Data Format: GroupChatFormat
EverMemOS uses a JSON format called GroupChatFormat for batch imports. Each file contains the messages of a single conversation group.
```json
{
  "group_id": "conversation_001",
  "group_name": "Team Engineering",
  "scene": "group_chat",
  "user_details": [
    {"user_id": "user_alice", "user_name": "Alice", "role": "tech_lead"},
    {"user_id": "user_bob", "user_name": "Bob", "role": "engineer"}
  ],
  "messages": [
    {
      "message_id": "msg_001",
      "create_time": "2024-01-15T10:00:00Z",
      "sender": "user_alice",
      "sender_name": "Alice",
      "content": "Let's discuss the new architecture proposal.",
      "refer_list": []
    },
    {
      "message_id": "msg_002",
      "create_time": "2024-01-15T10:01:30Z",
      "sender": "user_bob",
      "sender_name": "Bob",
      "content": "I've reviewed it. I think we should consider microservices.",
      "refer_list": [
        {
          "message_id": "msg_001",
          "content": "Let's discuss the new architecture proposal."
        }
      ]
    }
  ]
}
```
Field Reference
| Field | Required | Description |
|---|---|---|
| `group_id` | Yes | Unique conversation identifier |
| `group_name` | Yes | Human-readable conversation name |
| `scene` | No | `assistant` or `group_chat` (default: `group_chat`) |
| `user_details` | No | Participant information with roles |
| `messages` | Yes | Array of messages in chronological order |
Message Fields
| Field | Required | Description |
|---|---|---|
| `message_id` | Yes | Unique message identifier |
| `create_time` | Yes | ISO 8601 timestamp |
| `sender` | Yes | User ID of the sender |
| `sender_name` | No | Display name of the sender |
| `content` | Yes | Message text content |
| `refer_list` | No | Referenced messages (for replies) |
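If you generate these files programmatically, a quick structural check catches missing required fields before import. A minimal validation sketch mirroring the two tables above (the function name and error format are our own, not part of EverMemOS):

```python
from typing import Any, Dict, List

REQUIRED_GROUP_FIELDS = ("group_id", "group_name", "messages")
REQUIRED_MESSAGE_FIELDS = ("message_id", "create_time", "sender", "content")

def validate_group_chat(data: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the file is structurally valid."""
    errors = [f"missing group field: {f}" for f in REQUIRED_GROUP_FIELDS if f not in data]
    for i, msg in enumerate(data.get("messages", [])):
        for f in REQUIRED_MESSAGE_FIELDS:
            if f not in msg:
                errors.append(f"message {i}: missing field: {f}")
    return errors
```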
Converting Your Data
Convert your existing chat data to GroupChatFormat:
```python
import json
from datetime import datetime, timezone
from typing import List, Dict, Any

def convert_slack_export(slack_messages: List[Dict]) -> Dict[str, Any]:
    """Convert a Slack export to GroupChatFormat."""
    messages = []
    for msg in slack_messages:
        converted = {
            "message_id": msg.get("ts", ""),  # Slack uses the timestamp as ID
            # Convert the epoch timestamp to an ISO 8601 UTC string
            "create_time": datetime.fromtimestamp(
                float(msg.get("ts", 0)), tz=timezone.utc
            ).isoformat().replace("+00:00", "Z"),
            "sender": msg.get("user", "unknown"),
            "sender_name": msg.get("user_profile", {}).get("real_name", msg.get("user", "Unknown")),
            "content": msg.get("text", ""),
            "refer_list": []
        }
        # Handle thread replies
        if msg.get("thread_ts") and msg.get("thread_ts") != msg.get("ts"):
            converted["refer_list"] = [{
                "message_id": msg.get("thread_ts"),
                "content": ""  # Will be filled during processing
            }]
        messages.append(converted)

    return {
        "group_id": f"slack_{slack_messages[0].get('channel', 'unknown')}",
        "group_name": "Imported from Slack",
        "scene": "group_chat",
        "messages": sorted(messages, key=lambda m: m["create_time"])
    }

def convert_discord_export(discord_messages: List[Dict], channel_name: str) -> Dict[str, Any]:
    """Convert a Discord export to GroupChatFormat."""
    messages = []
    for msg in discord_messages:
        converted = {
            "message_id": msg.get("id", ""),
            "create_time": msg.get("timestamp", datetime.now(timezone.utc).isoformat()),
            "sender": msg.get("author", {}).get("id", "unknown"),
            "sender_name": msg.get("author", {}).get("username", "Unknown"),
            "content": msg.get("content", ""),
            "refer_list": []
        }
        # Handle replies
        if msg.get("referenced_message"):
            ref = msg["referenced_message"]
            converted["refer_list"] = [{
                "message_id": ref.get("id", ""),
                "content": ref.get("content", "")[:100]  # Truncate long content
            }]
        messages.append(converted)

    return {
        "group_id": f"discord_{channel_name}",
        "group_name": channel_name,
        "scene": "group_chat",
        "messages": sorted(messages, key=lambda m: m["create_time"])
    }

def save_group_chat(data: Dict[str, Any], output_path: str):
    """Save GroupChatFormat data to a JSON file."""
    with open(output_path, "w") as f:
        json.dump(data, f, indent=2)

# Example
slack_data = [...]  # Your Slack export
converted = convert_slack_export(slack_data)
save_group_chat(converted, "slack_import.json")
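The Discord converter follows the same pattern. A usage sketch (the file names and channel name are placeholders for your own export):

```python
with open("discord_export.json") as f:
    discord_data = json.load(f)  # list of message dicts from your Discord exporter

converted = convert_discord_export(discord_data, channel_name="general")
save_group_chat(converted, "discord_import.json")
```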
Batch Import Script
Here's a complete batch import script:
```python
import asyncio
import aiohttp
import json
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class BatchImporter:
    """Batch import conversations into EverMemOS."""

    def __init__(
        self,
        base_url: str = "https://api.evermind.ai",
        api_key: Optional[str] = None,
        concurrency: int = 10,
        rate_limit_delay: float = 0.1  # seconds between requests
    ):
        self.base_url = base_url.rstrip("/")
        self.headers = {"Content-Type": "application/json"}
        if api_key:
            self.headers["Authorization"] = f"Bearer {api_key}"
        self.concurrency = concurrency
        self.rate_limit_delay = rate_limit_delay
        self.semaphore = asyncio.Semaphore(concurrency)

        # Statistics
        self.stats = {
            "messages_sent": 0,
            "messages_failed": 0,
            "conversations_processed": 0
        }

    async def import_file(self, file_path: str) -> Dict[str, Any]:
        """Import a single GroupChatFormat JSON file."""
        with open(file_path, "r") as f:
            data = json.load(f)
        return await self.import_conversation(data)

    async def import_conversation(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Import a conversation from GroupChatFormat data."""
        group_id = data["group_id"]
        logger.info(f"Processing conversation: {group_id}")

        async with aiohttp.ClientSession(headers=self.headers) as session:
            # 1. Set conversation metadata
            if data.get("user_details"):
                await self._set_metadata(session, data)

            # 2. Import messages concurrently
            messages = data.get("messages", [])
            tasks = [
                self._send_message(session, group_id, data.get("group_name", "Import"), msg)
                for msg in messages
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)

        # Count successes and failures
        successes = sum(1 for r in results if not isinstance(r, Exception))
        failures = sum(1 for r in results if isinstance(r, Exception))

        self.stats["messages_sent"] += successes
        self.stats["messages_failed"] += failures
        self.stats["conversations_processed"] += 1

        logger.info(f"Completed {group_id}: {successes} sent, {failures} failed")
        return {
            "group_id": group_id,
            "messages_sent": successes,
            "messages_failed": failures
        }

    async def _set_metadata(self, session: aiohttp.ClientSession, data: Dict):
        """Set conversation metadata."""
        meta = {
            "group_id": data["group_id"],
            "group_name": data.get("group_name", "Import"),
            "scene": data.get("scene", "group_chat"),
            "user_details": data.get("user_details", [])
        }
        try:
            async with session.post(
                f"{self.base_url}/api/v0/memories/conversation-meta",
                json=meta
            ) as resp:
                resp.raise_for_status()
        except Exception as e:
            logger.warning(f"Failed to set metadata for {data['group_id']}: {e}")

    async def _send_message(
        self,
        session: aiohttp.ClientSession,
        group_id: str,
        group_name: str,
        message: Dict
    ):
        """Send a single message with rate limiting."""
        async with self.semaphore:
            payload = {
                "group_id": group_id,
                "group_name": group_name,
                "message_id": message["message_id"],
                "create_time": message["create_time"],
                "sender": message["sender"],
                "sender_name": message.get("sender_name", message["sender"]),
                "content": message["content"],
                "refer_list": message.get("refer_list", [])
            }
            try:
                async with session.post(
                    f"{self.base_url}/api/v0/memories",
                    json=payload
                ) as resp:
                    resp.raise_for_status()
                    result = await resp.json()
                await asyncio.sleep(self.rate_limit_delay)
                return result
            except Exception as e:
                logger.error(f"Failed to send message {message['message_id']}: {e}")
                raise

    async def import_directory(self, directory: str) -> List[Dict]:
        """Import all JSON files in a directory."""
        path = Path(directory)
        files = list(path.glob("*.json"))
        logger.info(f"Found {len(files)} files to import")

        results = []
        for file_path in files:
            try:
                result = await self.import_file(str(file_path))
                results.append(result)
            except Exception as e:
                logger.error(f"Failed to import {file_path}: {e}")
                results.append({"file": str(file_path), "error": str(e)})
        return results

    def get_stats(self) -> Dict[str, int]:
        """Get import statistics."""
        return self.stats.copy()

# Usage
async def main():
    importer = BatchImporter(
        base_url="https://api.evermind.ai",
        concurrency=20,
        rate_limit_delay=0.05  # 50ms between requests
    )

    # Import a single file
    result = await importer.import_file("conversation_001.json")
    print(f"Result: {result}")

    # Import a directory
    results = await importer.import_directory("./exports/")
    print(f"Imported {len(results)} conversations")
    print(f"Stats: {importer.get_stats()}")

if __name__ == "__main__":
    asyncio.run(main())
```
Monitoring Progress
Add progress tracking for large imports:
```python
import asyncio
import aiohttp
from typing import Any, Dict
from tqdm.asyncio import tqdm

class MonitoredBatchImporter(BatchImporter):
    """Batch importer with progress bars."""

    async def import_conversation(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Import with a progress bar."""
        group_id = data["group_id"]
        messages = data.get("messages", [])

        async with aiohttp.ClientSession(headers=self.headers) as session:
            if data.get("user_details"):
                await self._set_metadata(session, data)

            # Progress bar for messages
            results = []
            async for result in tqdm(
                self._send_messages_generator(session, group_id, data.get("group_name", "Import"), messages),
                total=len(messages),
                desc=f"Importing {group_id}"
            ):
                results.append(result)

        successes = sum(1 for r in results if not isinstance(r, Exception))
        failures = len(results) - successes
        self.stats["messages_sent"] += successes
        self.stats["messages_failed"] += failures
        self.stats["conversations_processed"] += 1

        return {"group_id": group_id, "messages_sent": successes, "messages_failed": failures}

    async def _send_messages_generator(self, session, group_id, group_name, messages):
        """Yield results (or exceptions) one at a time as messages are sent."""
        for msg in messages:
            try:
                result = await self._send_message(session, group_id, group_name, msg)
                yield result
            except Exception as e:
                yield e
```
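For whole-directory runs, you may also want a file-level bar on top of the per-message bars. A minimal sketch using the standard synchronous tqdm API (the base class's per-file error handling is omitted for brevity):

```python
from pathlib import Path
from typing import Dict, List
from tqdm import tqdm as sync_tqdm  # standard synchronous progress bar

class DirectoryMonitoredImporter(MonitoredBatchImporter):
    """Adds a file-level progress bar on top of the per-message bars."""

    async def import_directory(self, directory: str) -> List[Dict]:
        files = list(Path(directory).glob("*.json"))
        results = []
        for file_path in sync_tqdm(files, desc="Files"):
            results.append(await self.import_file(str(file_path)))
        return results
```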
Handling Large Conversations
For very large conversations, process in chunks:
```python
async def import_large_conversation(
    importer: BatchImporter,
    data: Dict[str, Any],
    chunk_size: int = 1000
) -> Dict[str, Any]:
    """Import a large conversation in chunks to manage memory."""
    messages = data.get("messages", [])
    total_chunks = (len(messages) + chunk_size - 1) // chunk_size
    logger.info(f"Processing {len(messages)} messages in {total_chunks} chunks")

    total_sent = 0
    total_failed = 0

    for i in range(0, len(messages), chunk_size):
        chunk = messages[i:i + chunk_size]
        chunk_data = {**data, "messages": chunk}

        # Don't reset metadata for subsequent chunks
        if i > 0:
            chunk_data.pop("user_details", None)

        result = await importer.import_conversation(chunk_data)
        total_sent += result["messages_sent"]
        total_failed += result["messages_failed"]
        logger.info(f"Chunk {i // chunk_size + 1}/{total_chunks} complete")

    return {
        "group_id": data["group_id"],
        "messages_sent": total_sent,
        "messages_failed": total_failed
    }
```
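Chunking still assumes the whole file fits in memory, since json.load reads it all at once. If even that is too large, a streaming parser such as the third-party ijson package (pip install ijson) can yield messages incrementally; a sketch under that assumption:

```python
import ijson  # streaming JSON parser; assumes the package is installed

async def import_streamed_file(importer: BatchImporter, file_path: str,
                               group_id: str, group_name: str,
                               chunk_size: int = 1000):
    """Stream messages from a huge GroupChatFormat file without loading it fully."""
    chunk = []
    with open(file_path, "rb") as f:
        # Iterate over each element of the top-level "messages" array
        for msg in ijson.items(f, "messages.item"):
            chunk.append(msg)
            if len(chunk) >= chunk_size:
                await importer.import_conversation(
                    {"group_id": group_id, "group_name": group_name, "messages": chunk}
                )
                chunk = []
    if chunk:  # flush the final partial chunk
        await importer.import_conversation(
            {"group_id": group_id, "group_name": group_name, "messages": chunk}
        )
```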
Error Recovery
Handle partial failures with checkpointing:
```python
import json
from pathlib import Path

class RecoverableBatchImporter(BatchImporter):
    """Importer with checkpoint/resume support."""

    def __init__(self, checkpoint_file: str = "import_checkpoint.json", **kwargs):
        super().__init__(**kwargs)
        self.checkpoint_file = checkpoint_file
        self.checkpoint = self._load_checkpoint()

    def _load_checkpoint(self) -> Dict:
        """Load the checkpoint from file."""
        if Path(self.checkpoint_file).exists():
            with open(self.checkpoint_file, "r") as f:
                return json.load(f)
        return {"completed_groups": [], "last_message_ids": {}}

    def _save_checkpoint(self):
        """Save the checkpoint to file."""
        with open(self.checkpoint_file, "w") as f:
            json.dump(self.checkpoint, f, indent=2)

    async def import_conversation(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Import with checkpointing."""
        group_id = data["group_id"]

        # Skip if already completed
        if group_id in self.checkpoint["completed_groups"]:
            logger.info(f"Skipping {group_id} (already completed)")
            return {"group_id": group_id, "skipped": True}

        # Filter already-sent messages
        last_msg_id = self.checkpoint["last_message_ids"].get(group_id)
        messages = data.get("messages", [])

        if last_msg_id:
            # Find the position after the last sent message
            for i, msg in enumerate(messages):
                if msg["message_id"] == last_msg_id:
                    messages = messages[i + 1:]
                    logger.info(f"Resuming {group_id} from message {i + 1}")
                    break
            data = {**data, "messages": messages}

        result = await super().import_conversation(data)

        # Update checkpoint
        if messages:
            self.checkpoint["last_message_ids"][group_id] = messages[-1]["message_id"]
        if result.get("messages_failed", 0) == 0:
            self.checkpoint["completed_groups"].append(group_id)
        self._save_checkpoint()

        return result

    def clear_checkpoint(self):
        """Clear the checkpoint file."""
        self.checkpoint = {"completed_groups": [], "last_message_ids": {}}
        if Path(self.checkpoint_file).exists():
            Path(self.checkpoint_file).unlink()
```
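Checkpointing resumes a crashed run, but transient errors (timeouts, rate-limit responses) are often better handled by retrying in place. A sketch layering exponential backoff over _send_message (the retry counts and delays are illustrative assumptions, not EverMemOS recommendations):

```python
import asyncio

class RetryingBatchImporter(BatchImporter):
    """Retries transient per-message failures with exponential backoff."""

    def __init__(self, max_retries: int = 3, backoff_base: float = 0.5, **kwargs):
        super().__init__(**kwargs)
        self.max_retries = max_retries
        self.backoff_base = backoff_base

    async def _send_message(self, session, group_id, group_name, message):
        for attempt in range(self.max_retries + 1):
            try:
                return await super()._send_message(session, group_id, group_name, message)
            except Exception:
                if attempt == self.max_retries:
                    raise  # out of retries; caller counts this as a failure
                # 0.5s, 1s, 2s, ... between attempts
                await asyncio.sleep(self.backoff_base * (2 ** attempt))
```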
Best Practices
Ordering

Always sort messages chronologically before import. EverMemOS uses message timestamps for boundary detection.

```python
messages = sorted(messages, key=lambda m: m["create_time"])
```
Rate Limiting

Use appropriate delays between requests to avoid overwhelming the server.

```python
# Local development: 10-50 ms
rate_limit_delay = 0.01

# Cloud production: 50-100 ms
rate_limit_delay = 0.05
```
Concurrency

Balance concurrency with rate limits. Higher concurrency speeds up imports but may trigger rate limits.

```python
# Conservative
concurrency = 5

# Aggressive (with short delays)
concurrency = 50
```
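As a rough way to reason about these two knobs together: with the semaphore design above, each of the concurrency slots completes about one request per (request latency + rate_limit_delay) seconds, so the throughput ceiling is roughly concurrency divided by that sum. A back-of-the-envelope helper (the latency figure is an assumption; measure your own):

```python
def estimated_throughput(concurrency: int, latency_s: float, delay_s: float) -> float:
    """Rough upper bound on messages/second for the semaphore-based importer."""
    return concurrency / (latency_s + delay_s)

# e.g. 20 slots, ~100 ms request latency, 50 ms delay -> ~133 msg/s
print(estimated_throughput(20, 0.1, 0.05))
```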
Memory Management

For large imports, process files one at a time rather than loading them all into memory.

```python
for file_path in files:
    await importer.import_file(file_path)
    # File data is released after each import
```