Skip to main content
When you have existing conversation history to import, batch processing lets you efficiently ingest large volumes of messages into EverMemOS. This guide covers data formats, concurrent processing, and monitoring.

Data Format: GroupChatFormat

EverMemOS uses a JSON format called GroupChatFormat for batch imports. Each file contains messages from a single conversation group.
{
  "group_id": "conversation_001",
  "group_name": "Team Engineering",
  "scene": "group_chat",
  "user_details": [
    {"user_id": "user_alice", "user_name": "Alice", "role": "tech_lead"},
    {"user_id": "user_bob", "user_name": "Bob", "role": "engineer"}
  ],
  "messages": [
    {
      "message_id": "msg_001",
      "create_time": "2024-01-15T10:00:00Z",
      "sender": "user_alice",
      "sender_name": "Alice",
      "content": "Let's discuss the new architecture proposal.",
      "refer_list": []
    },
    {
      "message_id": "msg_002",
      "create_time": "2024-01-15T10:01:30Z",
      "sender": "user_bob",
      "sender_name": "Bob",
      "content": "I've reviewed it. I think we should consider microservices.",
      "refer_list": [
        {
          "message_id": "msg_001",
          "content": "Let's discuss the new architecture proposal."
        }
      ]
    }
  ]
}

Field Reference

FieldRequiredDescription
group_idYesUnique conversation identifier
group_nameYesHuman-readable conversation name
sceneNoassistant or group_chat (default: group_chat)
user_detailsNoParticipant information with roles
messagesYesArray of messages in chronological order

Message Fields

FieldRequiredDescription
message_idYesUnique message identifier
create_timeYesISO 8601 timestamp
senderYesUser ID of sender
sender_nameNoDisplay name of sender
contentYesMessage text content
refer_listNoReferenced messages (replies)

Converting Your Data

Convert your existing chat data to GroupChatFormat:
import json
from datetime import datetime
from typing import List, Dict, Any

def convert_slack_export(slack_messages: List[Dict]) -> Dict[str, Any]:
    """Convert Slack export to GroupChatFormat."""
    messages = []

    for msg in slack_messages:
        converted = {
            "message_id": msg.get("ts", ""),  # Slack uses timestamp as ID
            "create_time": datetime.fromtimestamp(float(msg.get("ts", 0))).isoformat() + "Z",
            "sender": msg.get("user", "unknown"),
            "sender_name": msg.get("user_profile", {}).get("real_name", msg.get("user", "Unknown")),
            "content": msg.get("text", ""),
            "refer_list": []
        }

        # Handle thread replies
        if msg.get("thread_ts") and msg.get("thread_ts") != msg.get("ts"):
            converted["refer_list"] = [{
                "message_id": msg.get("thread_ts"),
                "content": ""  # Will be filled during processing
            }]

        messages.append(converted)

    return {
        "group_id": f"slack_{slack_messages[0].get('channel', 'unknown')}",
        "group_name": "Imported from Slack",
        "scene": "group_chat",
        "messages": sorted(messages, key=lambda m: m["create_time"])
    }


def convert_discord_export(discord_messages: List[Dict], channel_name: str) -> Dict[str, Any]:
    """Convert Discord export to GroupChatFormat."""
    messages = []

    for msg in discord_messages:
        converted = {
            "message_id": msg.get("id", ""),
            "create_time": msg.get("timestamp", datetime.now().isoformat() + "Z"),
            "sender": msg.get("author", {}).get("id", "unknown"),
            "sender_name": msg.get("author", {}).get("username", "Unknown"),
            "content": msg.get("content", ""),
            "refer_list": []
        }

        # Handle replies
        if msg.get("referenced_message"):
            ref = msg["referenced_message"]
            converted["refer_list"] = [{
                "message_id": ref.get("id", ""),
                "content": ref.get("content", "")[:100]  # Truncate long content
            }]

        messages.append(converted)

    return {
        "group_id": f"discord_{channel_name}",
        "group_name": channel_name,
        "scene": "group_chat",
        "messages": sorted(messages, key=lambda m: m["create_time"])
    }


# Save to file
def save_group_chat(data: Dict[str, Any], output_path: str):
    """Save GroupChatFormat to JSON file."""
    with open(output_path, "w") as f:
        json.dump(data, f, indent=2)

# Example
slack_data = [...]  # Your Slack export
converted = convert_slack_export(slack_data)
save_group_chat(converted, "slack_import.json")

Batch Import Script

Here’s a complete batch import script:
import asyncio
import aiohttp
import json
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class BatchImporter:
    """Batch import conversations into EverMemOS."""

    def __init__(
        self,
        base_url: str = "https://api.evermind.ai",
        api_key: Optional[str] = None,
        concurrency: int = 10,
        rate_limit_delay: float = 0.1  # seconds between requests
    ):
        self.base_url = base_url.rstrip("/")
        self.headers = {"Content-Type": "application/json"}
        if api_key:
            self.headers["Authorization"] = f"Bearer {api_key}"

        self.concurrency = concurrency
        self.rate_limit_delay = rate_limit_delay
        self.semaphore = asyncio.Semaphore(concurrency)

        # Statistics
        self.stats = {
            "messages_sent": 0,
            "messages_failed": 0,
            "conversations_processed": 0
        }

    async def import_file(self, file_path: str) -> Dict[str, Any]:
        """Import a single GroupChatFormat JSON file."""
        with open(file_path, "r") as f:
            data = json.load(f)

        return await self.import_conversation(data)

    async def import_conversation(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Import a conversation from GroupChatFormat data."""
        group_id = data["group_id"]
        logger.info(f"Processing conversation: {group_id}")

        async with aiohttp.ClientSession(headers=self.headers) as session:
            # 1. Set conversation metadata
            if data.get("user_details"):
                await self._set_metadata(session, data)

            # 2. Import messages concurrently
            messages = data.get("messages", [])
            tasks = [
                self._send_message(session, group_id, data.get("group_name", "Import"), msg)
                for msg in messages
            ]

            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Count successes and failures
            successes = sum(1 for r in results if not isinstance(r, Exception))
            failures = sum(1 for r in results if isinstance(r, Exception))

            self.stats["messages_sent"] += successes
            self.stats["messages_failed"] += failures
            self.stats["conversations_processed"] += 1

            logger.info(f"Completed {group_id}: {successes} sent, {failures} failed")

            return {
                "group_id": group_id,
                "messages_sent": successes,
                "messages_failed": failures
            }

    async def _set_metadata(self, session: aiohttp.ClientSession, data: Dict):
        """Set conversation metadata."""
        meta = {
            "group_id": data["group_id"],
            "group_name": data.get("group_name", "Import"),
            "scene": data.get("scene", "group_chat"),
            "user_details": data.get("user_details", [])
        }

        try:
            async with session.post(
                f"{self.base_url}/api/v0/memories/conversation-meta",
                json=meta
            ) as resp:
                resp.raise_for_status()
        except Exception as e:
            logger.warning(f"Failed to set metadata for {data['group_id']}: {e}")

    async def _send_message(
        self,
        session: aiohttp.ClientSession,
        group_id: str,
        group_name: str,
        message: Dict
    ):
        """Send a single message with rate limiting."""
        async with self.semaphore:
            payload = {
                "group_id": group_id,
                "group_name": group_name,
                "message_id": message["message_id"],
                "create_time": message["create_time"],
                "sender": message["sender"],
                "sender_name": message.get("sender_name", message["sender"]),
                "content": message["content"],
                "refer_list": message.get("refer_list", [])
            }

            try:
                async with session.post(
                    f"{self.base_url}/api/v0/memories",
                    json=payload
                ) as resp:
                    resp.raise_for_status()
                    await asyncio.sleep(self.rate_limit_delay)
                    return await resp.json()
            except Exception as e:
                logger.error(f"Failed to send message {message['message_id']}: {e}")
                raise

    async def import_directory(self, directory: str) -> List[Dict]:
        """Import all JSON files in a directory."""
        path = Path(directory)
        files = list(path.glob("*.json"))

        logger.info(f"Found {len(files)} files to import")

        results = []
        for file_path in files:
            try:
                result = await self.import_file(str(file_path))
                results.append(result)
            except Exception as e:
                logger.error(f"Failed to import {file_path}: {e}")
                results.append({"file": str(file_path), "error": str(e)})

        return results

    def get_stats(self) -> Dict[str, int]:
        """Get import statistics."""
        return self.stats.copy()


# Usage
async def main():
    importer = BatchImporter(
        base_url="https://api.evermind.ai",
        concurrency=20,
        rate_limit_delay=0.05  # 50ms between requests
    )

    # Import a single file
    result = await importer.import_file("conversation_001.json")
    print(f"Result: {result}")

    # Import a directory
    results = await importer.import_directory("./exports/")
    print(f"Imported {len(results)} conversations")
    print(f"Stats: {importer.get_stats()}")


if __name__ == "__main__":
    asyncio.run(main())

Monitoring Progress

Add progress tracking for large imports:
from tqdm.asyncio import tqdm
import asyncio

class MonitoredBatchImporter(BatchImporter):
    """Batch importer with progress bars."""

    async def import_conversation(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Import with progress bar."""
        group_id = data["group_id"]
        messages = data.get("messages", [])

        async with aiohttp.ClientSession(headers=self.headers) as session:
            if data.get("user_details"):
                await self._set_metadata(session, data)

            # Progress bar for messages
            results = []
            async for result in tqdm(
                self._send_messages_generator(session, group_id, data.get("group_name", "Import"), messages),
                total=len(messages),
                desc=f"Importing {group_id}"
            ):
                results.append(result)

            successes = sum(1 for r in results if not isinstance(r, Exception))
            failures = len(results) - successes

            self.stats["messages_sent"] += successes
            self.stats["messages_failed"] += failures
            self.stats["conversations_processed"] += 1

            return {"group_id": group_id, "messages_sent": successes, "messages_failed": failures}

    async def _send_messages_generator(self, session, group_id, group_name, messages):
        """Generator that yields results as messages are sent."""
        for msg in messages:
            try:
                result = await self._send_message(session, group_id, group_name, msg)
                yield result
            except Exception as e:
                yield e

Handling Large Conversations

For very large conversations, process in chunks:
async def import_large_conversation(
    importer: BatchImporter,
    data: Dict[str, Any],
    chunk_size: int = 1000
) -> Dict[str, Any]:
    """Import large conversation in chunks to manage memory."""
    messages = data.get("messages", [])
    total_chunks = (len(messages) + chunk_size - 1) // chunk_size

    logger.info(f"Processing {len(messages)} messages in {total_chunks} chunks")

    total_sent = 0
    total_failed = 0

    for i in range(0, len(messages), chunk_size):
        chunk = messages[i:i + chunk_size]
        chunk_data = {**data, "messages": chunk}

        # Don't reset metadata for subsequent chunks
        if i > 0:
            chunk_data.pop("user_details", None)

        result = await importer.import_conversation(chunk_data)
        total_sent += result["messages_sent"]
        total_failed += result["messages_failed"]

        logger.info(f"Chunk {i // chunk_size + 1}/{total_chunks} complete")

    return {
        "group_id": data["group_id"],
        "messages_sent": total_sent,
        "messages_failed": total_failed
    }

Error Recovery

Handle partial failures with checkpointing:
import json
from pathlib import Path

class RecoverableBatchImporter(BatchImporter):
    """Importer with checkpoint/resume support."""

    def __init__(self, checkpoint_file: str = "import_checkpoint.json", **kwargs):
        super().__init__(**kwargs)
        self.checkpoint_file = checkpoint_file
        self.checkpoint = self._load_checkpoint()

    def _load_checkpoint(self) -> Dict:
        """Load checkpoint from file."""
        if Path(self.checkpoint_file).exists():
            with open(self.checkpoint_file, "r") as f:
                return json.load(f)
        return {"completed_groups": [], "last_message_ids": {}}

    def _save_checkpoint(self):
        """Save checkpoint to file."""
        with open(self.checkpoint_file, "w") as f:
            json.dump(self.checkpoint, f, indent=2)

    async def import_conversation(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Import with checkpointing."""
        group_id = data["group_id"]

        # Skip if already completed
        if group_id in self.checkpoint["completed_groups"]:
            logger.info(f"Skipping {group_id} (already completed)")
            return {"group_id": group_id, "skipped": True}

        # Filter already-sent messages
        last_msg_id = self.checkpoint["last_message_ids"].get(group_id)
        messages = data.get("messages", [])

        if last_msg_id:
            # Find position after last sent message
            for i, msg in enumerate(messages):
                if msg["message_id"] == last_msg_id:
                    messages = messages[i + 1:]
                    logger.info(f"Resuming {group_id} from message {i + 1}")
                    break

            data = {**data, "messages": messages}

        result = await super().import_conversation(data)

        # Update checkpoint
        if messages:
            self.checkpoint["last_message_ids"][group_id] = messages[-1]["message_id"]

        if result.get("messages_failed", 0) == 0:
            self.checkpoint["completed_groups"].append(group_id)

        self._save_checkpoint()
        return result

    def clear_checkpoint(self):
        """Clear checkpoint file."""
        self.checkpoint = {"completed_groups": [], "last_message_ids": {}}
        if Path(self.checkpoint_file).exists():
            Path(self.checkpoint_file).unlink()

Best Practices

Always sort messages chronologically before import. EverMemOS uses message timestamps for boundary detection.
messages = sorted(messages, key=lambda m: m["create_time"])
Use appropriate delays between requests to avoid overwhelming the server.
# Local development: 10-50ms
rate_limit_delay = 0.01

# Cloud production: 50-100ms
rate_limit_delay = 0.05
Balance concurrency with rate limits. Higher concurrency speeds up imports but may trigger rate limits.
# Conservative
concurrency = 5

# Aggressive (with short delays)
concurrency = 50
For large imports, process files one at a time rather than loading all into memory.
for file_path in files:
    await importer.import_file(file_path)
    # File data is released after each import

Next Steps