Batch Importing Conversation History
When you have existing conversation history to import, batch processing lets you efficiently ingest large volumes of messages into EverOS. This guide covers data formats, the v1 SDK for batch imports, and monitoring.

Prerequisites

Install the EverOS SDK:
pip install everos
Set your API key and base URL as environment variables (or pass them to the client constructor):
export EVEROS_API_KEY="your-api-key"
export EVEROS_BASE_URL="https://api.evermind.ai"

Data Format

EverOS accepts two message formats depending on whether you are importing group conversations (multi-participant) or personal conversations (single user with an assistant).

Group Conversation Format

Each message must include sender_id and sender_name to identify the participant:
{
  "group_id": "conversation_001",
  "messages": [
    {
      "role": "user",
      "sender_id": "user_alice",
      "sender_name": "Alice",
      "timestamp": 1705312800000,
      "content": "Let's discuss the new architecture proposal.",
      "message_id": "msg_001"
    },
    {
      "role": "user",
      "sender_id": "user_bob",
      "sender_name": "Bob",
      "timestamp": 1705312890000,
      "content": "I've reviewed it. I think we should consider microservices.",
      "message_id": "msg_002"
    }
  ]
}

Personal Conversation Format

For one-on-one conversations between a user and an assistant:
{
  "user_id": "user_alice",
  "messages": [
    {
      "role": "user",
      "timestamp": 1705312800000,
      "content": "I love hiking on weekends."
    },
    {
      "role": "assistant",
      "timestamp": 1705312801000,
      "content": "That sounds wonderful! Do you have a favorite trail?"
    }
  ]
}

Field Reference

Group messages:
| Field | Required | Description |
| --- | --- | --- |
| role | Yes | "user" for all group participants |
| sender_id | Yes | Unique identifier for the sender |
| sender_name | Yes | Display name of the sender |
| timestamp | Yes | Unix timestamp in milliseconds |
| content | Yes | Message text content |
| message_id | No | Unique message identifier |
Personal messages:
| Field | Required | Description |
| --- | --- | --- |
| role | Yes | "user" or "assistant" |
| timestamp | Yes | Unix timestamp in milliseconds |
| content | Yes | Message text content |

Converting Your Data

Convert your existing chat data to the EverOS format before importing:
import json
import time
from datetime import datetime
from typing import List, Dict, Any

def convert_slack_export(slack_messages: List[Dict], channel_name: str) -> Dict[str, Any]:
    """Convert a Slack channel export into the EverOS group conversation format.

    Args:
        slack_messages: Raw Slack message dicts as produced by a Slack export.
        channel_name: Slack channel name, used to build the group_id.

    Returns:
        A dict with "group_id" and chronologically sorted "messages".
    """
    def to_everos(msg: Dict) -> Dict[str, Any]:
        # Slack's "ts" is a float-like string of epoch seconds; EverOS wants ms.
        profile = msg.get("user_profile", {})
        return {
            "role": "user",
            "sender_id": msg.get("user", "unknown"),
            "sender_name": profile.get("real_name", msg.get("user", "Unknown")),
            "timestamp": int(float(msg.get("ts", 0)) * 1000),
            "content": msg.get("text", ""),
            "message_id": msg.get("ts", ""),
        }

    converted = [to_everos(m) for m in slack_messages]
    # EverOS relies on chronological order for boundary detection.
    converted.sort(key=lambda entry: entry["timestamp"])

    return {"group_id": f"slack_{channel_name}", "messages": converted}


def convert_discord_export(
    discord_messages: List[Dict], channel_name: str
) -> Dict[str, Any]:
    """Convert a Discord export into the EverOS group conversation format.

    Args:
        discord_messages: Raw Discord message dicts (ISO-8601 "timestamp",
            nested "author" object, "content", "id").
        channel_name: Discord channel name, used to build the group_id.

    Returns:
        A dict with "group_id" and chronologically sorted "messages".
    """
    # Local import keeps this snippet self-contained.
    from datetime import timezone

    messages = []

    for msg in discord_messages:
        # Bug fix: the previous fallback used naive local time labelled as
        # UTC ("Z"), skewing the epoch by the local UTC offset. Use an
        # aware UTC timestamp instead (isoformat already carries "+00:00").
        ts_str = msg.get("timestamp", datetime.now(timezone.utc).isoformat())
        # fromisoformat() does not accept a trailing "Z" on older Pythons,
        # so normalize it to an explicit offset first.
        ts_ms = int(datetime.fromisoformat(ts_str.replace("Z", "+00:00")).timestamp() * 1000)

        messages.append({
            "role": "user",
            "sender_id": msg.get("author", {}).get("id", "unknown"),
            "sender_name": msg.get("author", {}).get("username", "Unknown"),
            "timestamp": ts_ms,
            "content": msg.get("content", ""),
            "message_id": msg.get("id", ""),
        })

    # EverOS relies on chronological order for boundary detection.
    messages.sort(key=lambda m: m["timestamp"])

    return {
        "group_id": f"discord_{channel_name}",
        "messages": messages
    }


# Save to file
def save_conversation(data: Dict[str, Any], output_path: str):
    """Save a converted conversation to a JSON file.

    Args:
        data: Conversation dict in EverOS format.
        output_path: Destination file path.
    """
    # JSON is UTF-8 by spec; don't rely on the platform default encoding
    # (e.g. cp1252 on Windows). ensure_ascii=False keeps non-ASCII message
    # content human-readable instead of \uXXXX-escaped.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

# Example: convert a Slack export and write it to disk for batch import
slack_data = [...]  # Your Slack export (replace with the real list of message dicts)
converted = convert_slack_export(slack_data, "engineering")
save_conversation(converted, "slack_import.json")

Batch Import Script

Here is a complete batch import script using the EverOS SDK with async mode for efficient processing:
import asyncio
import json
import logging
import time
from pathlib import Path
from typing import List, Dict, Any, Optional

from everos import AsyncEverOS, NotFoundError

# Root logging config plus a module-level logger (conventional for a script).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def wait_for_task(
    client: AsyncEverOS,
    task_id: str,
    poll_interval: float = 2.0,
    max_attempts: int = 30,
) -> str:
    """Poll a server-side task until it reaches a terminal state.

    Args:
        client: EverOS async client used to retrieve the task.
        task_id: Identifier of the task to poll.
        poll_interval: Seconds to sleep between polls.
        max_attempts: Maximum number of polls before giving up.

    Returns:
        The final status string; "done" when the task record has already
        expired server-side, "timeout" when max_attempts polls are exhausted.
    """
    terminal_states = ("success", "failed", "completed", "done")

    attempt = 0
    while attempt < max_attempts:
        try:
            resp = await client.v1.tasks.retrieve(task_id)
        except NotFoundError:
            # Task completed and was cleaned up (short TTL)
            logger.info(f"  [attempt {attempt + 1}] task {task_id} done (expired)")
            return "done"

        status = resp.data.status if resp.data else "unknown"
        logger.info(f"  [attempt {attempt + 1}] task {task_id} status={status}")
        if status in terminal_states:
            return status

        await asyncio.sleep(poll_interval)
        attempt += 1

    return "timeout"


class BatchImporter:
    """Batch import conversations into EverOS using the v1 SDK.

    Conversations are imported chunk-by-chunk in async mode; each chunk's
    server-side task is polled to completion before the next chunk is sent.
    A semaphore caps the number of in-flight requests when callers run
    several conversations concurrently.
    """

    def __init__(
        self,
        concurrency: int = 10,
        chunk_size: int = 50,
    ):
        """
        Args:
            concurrency: Maximum number of concurrent chunk uploads.
            chunk_size: Number of messages sent per API call.
        """
        self.client = AsyncEverOS()
        self.concurrency = concurrency
        self.chunk_size = chunk_size
        self.semaphore = asyncio.Semaphore(concurrency)

        # Running totals across all conversations imported by this instance.
        self.stats = {
            "messages_sent": 0,
            "messages_failed": 0,
            "conversations_processed": 0,
        }

    async def import_file(self, file_path: str) -> Dict[str, Any]:
        """Import a single conversation JSON file."""
        # Bug fix: JSON files are UTF-8 by spec; relying on the platform
        # default encoding (e.g. cp1252 on Windows) breaks on non-ASCII
        # message content.
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        return await self.import_conversation(data)

    async def import_conversation(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Import a conversation (group or personal).

        Args:
            data: Conversation dict with "group_id" (group) or "user_id"
                (personal), a "messages" list, and optional "group_meta".

        Returns:
            Summary dict with conversation_id, messages_sent, messages_failed.
        """
        # A "group_id" key distinguishes group conversations from personal ones.
        is_group = "group_id" in data
        conversation_id = data.get("group_id") or data.get("user_id")
        messages = data.get("messages", [])

        logger.info(
            f"Processing {'group' if is_group else 'personal'} "
            f"conversation: {conversation_id} ({len(messages)} messages)"
        )

        # 1. Set up group metadata if needed
        if is_group and data.get("group_meta"):
            await self._setup_group(data["group_id"], data["group_meta"])

        # 2. Import messages in chunks
        total_sent = 0
        total_failed = 0

        for i in range(0, len(messages), self.chunk_size):
            chunk = messages[i : i + self.chunk_size]
            sent, failed = await self._send_chunk(data, chunk, is_group)
            total_sent += sent
            total_failed += failed

        # 3. Flush to trigger memory extraction immediately
        await self._flush(data, is_group)

        self.stats["messages_sent"] += total_sent
        self.stats["messages_failed"] += total_failed
        self.stats["conversations_processed"] += 1

        logger.info(
            f"Completed {conversation_id}: {total_sent} sent, {total_failed} failed"
        )

        return {
            "conversation_id": conversation_id,
            "messages_sent": total_sent,
            "messages_failed": total_failed,
        }

    async def _setup_group(self, group_id: str, group_meta: Dict):
        """Create or update group metadata.

        Best-effort: failures (e.g. the group already exists) are logged,
        not raised, so they do not abort the import.
        """
        try:
            await self.client.v1.groups.create(
                group_id=group_id,
                name=group_meta.get("name", group_id),
                description=group_meta.get("description", ""),
            )
        except Exception as e:
            logger.warning(f"Failed to create group {group_id}: {e}")

    async def _send_chunk(
        self, data: Dict, chunk: List[Dict], is_group: bool
    ) -> tuple:
        """Send one chunk of messages in async mode and wait for its task.

        Returns:
            A (sent, failed) pair of message counts; the whole chunk is
            counted as failed when the request or its server-side task fails.
        """
        async with self.semaphore:
            try:
                if is_group:
                    response = await self.client.v1.memories.group.add(
                        group_id=data["group_id"],
                        async_mode=True,
                        messages=chunk,
                    )
                else:
                    response = await self.client.v1.memories.add(
                        user_id=data["user_id"],
                        async_mode=True,
                        messages=chunk,
                    )

                # Wait for the async ingestion task to complete before
                # counting the chunk as sent.
                task_id = response.data.task_id if response.data else None
                if task_id:
                    status = await wait_for_task(self.client, task_id)
                    if status in ("failed", "timeout"):
                        logger.error(f"Task {task_id} ended with status: {status}")
                        return 0, len(chunk)

                return len(chunk), 0

            except Exception as e:
                logger.error(f"Failed to send chunk: {e}")
                return 0, len(chunk)

    async def _flush(self, data: Dict, is_group: bool):
        """Flush buffered messages to trigger memory extraction.

        Best-effort: a flush failure is logged, not raised, since the
        messages themselves were already ingested.
        """
        try:
            if is_group:
                await self.client.v1.memories.group.flush(
                    group_id=data["group_id"]
                )
            else:
                await self.client.v1.memories.flush(user_id=data["user_id"])
        except Exception as e:
            logger.warning(f"Flush failed: {e}")

    async def import_directory(self, directory: str) -> List[Dict]:
        """Import all *.json files in a directory, one file at a time.

        Files are processed sequentially (sorted by name) so only one file's
        data is held in memory at a time. Per-file failures are recorded in
        the result list rather than aborting the run.
        """
        path = Path(directory)
        files = sorted(path.glob("*.json"))

        logger.info(f"Found {len(files)} files to import")

        results = []
        for file_path in files:
            try:
                result = await self.import_file(str(file_path))
                results.append(result)
            except Exception as e:
                logger.error(f"Failed to import {file_path}: {e}")
                results.append({"file": str(file_path), "error": str(e)})

        return results

    def get_stats(self) -> Dict[str, int]:
        """Return a copy of the running import statistics."""
        return self.stats.copy()


# Usage
async def main():
    """Example driver: import one file, then a whole directory of exports."""
    importer = BatchImporter(concurrency=20, chunk_size=50)

    # Single-file import
    result = await importer.import_file("conversation_001.json")
    print(f"Result: {result}")

    # Bulk import of every *.json file under ./exports/
    results = await importer.import_directory("./exports/")
    print(f"Imported {len(results)} conversations")
    print(f"Stats: {importer.get_stats()}")


if __name__ == "__main__":
    asyncio.run(main())

Monitoring Progress

Add progress tracking for large imports:
from tqdm.asyncio import tqdm

class MonitoredBatchImporter(BatchImporter):
    """Batch importer that renders a tqdm progress bar per conversation."""

    async def import_conversation(self, data: dict) -> dict:
        """Import a conversation, advancing a progress bar chunk by chunk.

        Same contract as BatchImporter.import_conversation; only the
        progress reporting differs.
        """
        is_group = "group_id" in data
        conversation_id = data.get("group_id") or data.get("user_id")
        all_messages = data.get("messages", [])

        if is_group and data.get("group_meta"):
            await self._setup_group(data["group_id"], data["group_meta"])

        sent_total = 0
        failed_total = 0

        with tqdm(total=len(all_messages), desc=f"Importing {conversation_id}") as pbar:
            offset = 0
            while offset < len(all_messages):
                piece = all_messages[offset : offset + self.chunk_size]
                sent, failed = await self._send_chunk(data, piece, is_group)
                sent_total += sent
                failed_total += failed
                pbar.update(len(piece))
                offset += self.chunk_size

        await self._flush(data, is_group)

        self.stats["messages_sent"] += sent_total
        self.stats["messages_failed"] += failed_total
        self.stats["conversations_processed"] += 1

        return {
            "conversation_id": conversation_id,
            "messages_sent": sent_total,
            "messages_failed": failed_total,
        }

Handling Large Conversations

For very large conversations, process in smaller chunks to manage memory and avoid timeouts:
async def import_large_conversation(
    importer: BatchImporter,
    data: dict,
    chunk_size: int = 1000,
) -> dict:
    """Import a very large conversation in stages to manage memory.

    Args:
        importer: BatchImporter instance to delegate each stage to.
        data: Conversation dict in EverOS format.
        chunk_size: Number of messages per stage.

    Returns:
        Summary dict with conversation_id, messages_sent, messages_failed.
    """
    messages = data.get("messages", [])
    # Ceiling division: number of stages needed to cover all messages.
    total_chunks = (len(messages) + chunk_size - 1) // chunk_size

    logger.info(f"Processing {len(messages)} messages in {total_chunks} stages")

    totals = {"sent": 0, "failed": 0}

    for stage_num, start in enumerate(range(0, len(messages), chunk_size), start=1):
        stage_data = dict(data)
        stage_data["messages"] = messages[start : start + chunk_size]

        # Group metadata only needs to be created once, on the first stage.
        if start > 0:
            stage_data.pop("group_meta", None)

        outcome = await importer.import_conversation(stage_data)
        totals["sent"] += outcome["messages_sent"]
        totals["failed"] += outcome["messages_failed"]

        logger.info(f"Stage {stage_num}/{total_chunks} complete")

    return {
        "conversation_id": data.get("group_id") or data.get("user_id"),
        "messages_sent": totals["sent"],
        "messages_failed": totals["failed"],
    }

Error Recovery

Handle partial failures with checkpointing:
import json
from pathlib import Path


class RecoverableBatchImporter(BatchImporter):
    """Importer with checkpoint/resume support.

    Progress is persisted to a JSON checkpoint file after every conversation,
    so an interrupted run can be restarted without re-importing conversations
    that already completed.
    """

    def __init__(self, checkpoint_file: str = "import_checkpoint.json", **kwargs):
        """
        Args:
            checkpoint_file: Path of the JSON checkpoint file.
            **kwargs: Forwarded to BatchImporter (concurrency, chunk_size).
        """
        super().__init__(**kwargs)
        self.checkpoint_file = checkpoint_file
        self.checkpoint = self._load_checkpoint()

    def _load_checkpoint(self) -> dict:
        """Load the checkpoint from file, or return a fresh empty one."""
        if Path(self.checkpoint_file).exists():
            with open(self.checkpoint_file, "r", encoding="utf-8") as f:
                return json.load(f)
        return {"completed": [], "last_message_index": {}}

    def _save_checkpoint(self):
        """Persist the checkpoint to disk."""
        with open(self.checkpoint_file, "w", encoding="utf-8") as f:
            json.dump(self.checkpoint, f, indent=2)

    async def import_conversation(self, data: dict) -> dict:
        """Import a conversation, skipping/resuming from the checkpoint.

        Returns the base-class result dict, or {"skipped": True} when the
        conversation was already completed in a previous run.
        """
        conversation_id = data.get("group_id") or data.get("user_id")

        # Skip if already completed
        if conversation_id in self.checkpoint["completed"]:
            logger.info(f"Skipping {conversation_id} (already completed)")
            return {"conversation_id": conversation_id, "skipped": True}

        # Resume from last checkpoint
        last_index = self.checkpoint["last_message_index"].get(conversation_id, 0)
        messages = data.get("messages", [])

        if last_index > 0:
            logger.info(f"Resuming {conversation_id} from message {last_index}")
            data = {**data, "messages": messages[last_index:]}

        result = await super().import_conversation(data)

        # Bug fix: only advance the resume index when every message in this
        # run succeeded. Failures can happen in a middle chunk while later
        # chunks succeed, so "last_index + messages_sent" could skip the
        # failed messages permanently on resume. Re-sending already-imported
        # messages on retry is safer than silently losing data.
        if result.get("messages_failed", 0) == 0:
            self.checkpoint["last_message_index"][conversation_id] = (
                last_index + result.get("messages_sent", 0)
            )
            if conversation_id not in self.checkpoint["completed"]:
                self.checkpoint["completed"].append(conversation_id)

        self._save_checkpoint()
        return result

    def clear_checkpoint(self):
        """Reset in-memory state and delete the checkpoint file if present."""
        self.checkpoint = {"completed": [], "last_message_index": {}}
        if Path(self.checkpoint_file).exists():
            Path(self.checkpoint_file).unlink()

Best Practices

Always sort messages chronologically before import. EverOS uses message timestamps for boundary detection.
messages.sort(key=lambda m: m["timestamp"])
The SDK sends messages in batches. Adjust chunk_size based on your use case:
# Small conversations (< 500 messages)
chunk_size = 50

# Large conversations (> 10,000 messages)
chunk_size = 100
Balance concurrency to avoid overwhelming the server. Higher concurrency speeds up imports but may trigger rate limits.
# Conservative
concurrency = 5

# Aggressive (for large batch imports)
concurrency = 50
For large imports, process files one at a time rather than loading all into memory.
for file_path in files:
    await importer.import_file(file_path)
    # File data is released after each import
Always call flush after importing a conversation to trigger memory extraction immediately. Without flushing, EverOS waits for the boundary detection timeout before processing.
# Group flush
await client.v1.memories.group.flush(group_id="group_001")

# Personal flush
await client.v1.memories.flush(user_id="user_010")

Next Steps

Python Integration

Production-ready client patterns

Team Collaboration

Use imported group chat memories