Prerequisites
Install the EverOS SDK:

pip install everos
export EVEROS_API_KEY="your-api-key"
export EVEROS_BASE_URL="https://api.evermind.ai"
Data Format
EverOS accepts two message formats depending on whether you are importing group conversations (multi-participant) or personal conversations (single user with an assistant).

Group Conversation Format
Each message must include sender_id and sender_name to identify the participant:
{
"group_id": "conversation_001",
"messages": [
{
"role": "user",
"sender_id": "user_alice",
"sender_name": "Alice",
"timestamp": 1705312800000,
"content": "Let's discuss the new architecture proposal.",
"message_id": "msg_001"
},
{
"role": "user",
"sender_id": "user_bob",
"sender_name": "Bob",
"timestamp": 1705312890000,
"content": "I've reviewed it. I think we should consider microservices.",
"message_id": "msg_002"
}
]
}
Personal Conversation Format
For one-on-one conversations between a user and an assistant:

{
"user_id": "user_alice",
"messages": [
{
"role": "user",
"timestamp": 1705312800000,
"content": "I love hiking on weekends."
},
{
"role": "assistant",
"timestamp": 1705312801000,
"content": "That sounds wonderful! Do you have a favorite trail?"
}
]
}
Field Reference
Group messages:

| Field | Required | Description |
|---|---|---|
| role | Yes | "user" for all group participants |
| sender_id | Yes | Unique identifier for the sender |
| sender_name | Yes | Display name of the sender |
| timestamp | Yes | Unix timestamp in milliseconds |
| content | Yes | Message text content |
| message_id | No | Unique message identifier |
Personal messages:

| Field | Required | Description |
|---|---|---|
| role | Yes | "user" or "assistant" |
| timestamp | Yes | Unix timestamp in milliseconds |
| content | Yes | Message text content |
Converting Your Data
Convert your existing chat data to the EverOS format before importing:

import json
import time
from datetime import datetime
from typing import List, Dict, Any
def convert_slack_export(slack_messages: List[Dict], channel_name: str) -> Dict[str, Any]:
    """Convert a Slack export to the EverOS group conversation format.

    Args:
        slack_messages: Raw message dicts from a Slack channel export.
        channel_name: Slack channel name, used to build the group_id.

    Returns:
        A dict with "group_id" and chronologically sorted "messages".
    """
    messages = []
    for msg in slack_messages:
        # Slack "ts" is a stringified float of epoch seconds; guard against
        # a missing or empty value so one bad record doesn't abort the run.
        ts = float(msg.get("ts") or 0)
        # "user_profile" can be present but null in some exports, so
        # normalize to an empty dict before the nested lookup.
        profile = msg.get("user_profile") or {}
        messages.append({
            "role": "user",
            "sender_id": msg.get("user", "unknown"),
            "sender_name": profile.get("real_name", msg.get("user", "Unknown")),
            "timestamp": int(ts * 1000),  # EverOS expects milliseconds
            "content": msg.get("text", ""),
            "message_id": msg.get("ts", ""),
        })
    # EverOS relies on chronological order for boundary detection.
    messages.sort(key=lambda m: m["timestamp"])
    return {
        "group_id": f"slack_{channel_name}",
        "messages": messages,
    }
def convert_discord_export(
    discord_messages: List[Dict], channel_name: str
) -> Dict[str, Any]:
    """Convert a Discord export to the EverOS group conversation format.

    Args:
        discord_messages: Raw message dicts from a Discord channel export.
        channel_name: Discord channel name, used to build the group_id.

    Returns:
        A dict with "group_id" and chronologically sorted "messages".
    """
    messages = []
    for msg in discord_messages:
        ts_str = msg.get("timestamp")
        if ts_str:
            # Discord timestamps are ISO-8601 with a trailing "Z";
            # fromisoformat() before Python 3.11 needs an explicit offset.
            ts_ms = int(
                datetime.fromisoformat(ts_str.replace("Z", "+00:00")).timestamp() * 1000
            )
        else:
            # Missing timestamp: fall back to the current UTC epoch time.
            # (Building a naive local isoformat string and labeling it "Z"
            # would silently shift the timestamp by the local UTC offset.)
            ts_ms = int(time.time() * 1000)
        # "author" can be present but null; normalize before nested lookups.
        author = msg.get("author") or {}
        messages.append({
            "role": "user",
            "sender_id": author.get("id", "unknown"),
            "sender_name": author.get("username", "Unknown"),
            "timestamp": ts_ms,
            "content": msg.get("content", ""),
            "message_id": msg.get("id", ""),
        })
    # EverOS relies on chronological order for boundary detection.
    messages.sort(key=lambda m: m["timestamp"])
    return {
        "group_id": f"discord_{channel_name}",
        "messages": messages,
    }
# Save to file
def save_conversation(data: Dict[str, Any], output_path: str):
    """Save a converted conversation to a JSON file.

    Args:
        data: Conversation dict in EverOS format.
        output_path: Destination file path.
    """
    # Write UTF-8 explicitly (the platform default encoding is not
    # guaranteed) and keep non-ASCII message content readable instead of
    # escaping it to \uXXXX sequences.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
# Example: convert a Slack export and write the result to disk.
slack_data = [...]  # Your Slack export (list of raw Slack message dicts)
converted = convert_slack_export(slack_data, "engineering")
save_conversation(converted, "slack_import.json")
Batch Import Script
Here is a complete batch import script using the EverOS SDK with async mode for efficient processing:

import asyncio
import json
import logging
import time
from pathlib import Path
from typing import List, Dict, Any, Optional
from everos import AsyncEverOS, NotFoundError
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def wait_for_task(
    client: AsyncEverOS,
    task_id: str,
    poll_interval: float = 2.0,
    max_attempts: int = 30,
) -> str:
    """Poll a task until it reaches a terminal state.

    Args:
        client: EverOS async client.
        task_id: Identifier of the task to poll.
        poll_interval: Seconds to wait between polls.
        max_attempts: Maximum number of polls before giving up.

    Returns:
        The final task status, "done" if the task record already expired,
        or "timeout" if max_attempts was exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            resp = await client.v1.tasks.retrieve(task_id)
        except NotFoundError:
            # Task completed and was cleaned up (short TTL).
            logger.info(" [attempt %d] task %s done (expired)", attempt, task_id)
            return "done"
        status = resp.data.status if resp.data else "unknown"
        logger.info(" [attempt %d] task %s status=%s", attempt, task_id, status)
        if status in ("success", "failed", "completed", "done"):
            return status
        # Don't burn a poll_interval after the final attempt; fall straight
        # through to the timeout result instead.
        if attempt < max_attempts:
            await asyncio.sleep(poll_interval)
    return "timeout"
class BatchImporter:
    """Batch import conversations into EverOS using the v1 SDK.

    Messages are sent in chunks through the async API, each chunk's task is
    polled to completion, and a flush is issued after every conversation to
    trigger memory extraction immediately.
    """

    def __init__(
        self,
        concurrency: int = 10,
        chunk_size: int = 50,
    ):
        """
        Args:
            concurrency: Maximum number of in-flight chunk uploads.
            chunk_size: Number of messages sent per API call.
        """
        self.client = AsyncEverOS()
        self.concurrency = concurrency
        self.chunk_size = chunk_size
        # Bounds concurrent _send_chunk calls across conversations.
        self.semaphore = asyncio.Semaphore(concurrency)
        # Running totals across all conversations imported by this instance.
        self.stats = {
            "messages_sent": 0,
            "messages_failed": 0,
            "conversations_processed": 0,
        }

    async def import_file(self, file_path: str) -> Dict[str, Any]:
        """Import a single conversation JSON file."""
        # Read as UTF-8 explicitly; exports commonly contain non-ASCII text
        # and the platform default encoding is not guaranteed to handle it.
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        return await self.import_conversation(data)

    async def import_conversation(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Import a conversation (group or personal).

        Args:
            data: Conversation dict; a "group_id" key marks a group
                conversation, otherwise "user_id" identifies the owner.

        Returns:
            Summary dict with conversation_id and sent/failed counts.
        """
        is_group = "group_id" in data
        conversation_id = data.get("group_id") or data.get("user_id")
        messages = data.get("messages", [])
        logger.info(
            f"Processing {'group' if is_group else 'personal'} "
            f"conversation: {conversation_id} ({len(messages)} messages)"
        )
        # 1. Set up group metadata if needed
        if is_group and data.get("group_meta"):
            await self._setup_group(data["group_id"], data["group_meta"])
        # 2. Import messages in chunks (sequential per conversation so
        # messages arrive in chronological order).
        total_sent = 0
        total_failed = 0
        for i in range(0, len(messages), self.chunk_size):
            chunk = messages[i : i + self.chunk_size]
            sent, failed = await self._send_chunk(data, chunk, is_group)
            total_sent += sent
            total_failed += failed
        # 3. Flush to trigger memory extraction immediately
        await self._flush(data, is_group)
        self.stats["messages_sent"] += total_sent
        self.stats["messages_failed"] += total_failed
        self.stats["conversations_processed"] += 1
        logger.info(
            f"Completed {conversation_id}: {total_sent} sent, {total_failed} failed"
        )
        return {
            "conversation_id": conversation_id,
            "messages_sent": total_sent,
            "messages_failed": total_failed,
        }

    async def _setup_group(self, group_id: str, group_meta: Dict):
        """Create or update group metadata.

        Failures are logged and swallowed so that a pre-existing group does
        not abort the import.
        """
        try:
            await self.client.v1.groups.create(
                group_id=group_id,
                name=group_meta.get("name", group_id),
                description=group_meta.get("description", ""),
            )
        except Exception as e:
            logger.warning(f"Failed to create group {group_id}: {e}")

    async def _send_chunk(
        self, data: Dict, chunk: List[Dict], is_group: bool
    ) -> tuple:
        """Send a chunk of messages using async mode.

        Returns:
            (sent, failed) message counts for this chunk.
        """
        # NOTE: the semaphore is held for the whole send-and-poll cycle,
        # which bounds API calls and task polling together.
        async with self.semaphore:
            try:
                if is_group:
                    response = await self.client.v1.memories.group.add(
                        group_id=data["group_id"],
                        async_mode=True,
                        messages=chunk,
                    )
                else:
                    response = await self.client.v1.memories.add(
                        user_id=data["user_id"],
                        async_mode=True,
                        messages=chunk,
                    )
                # Wait for the async ingestion task to complete.
                task_id = response.data.task_id if response.data else None
                if task_id:
                    status = await wait_for_task(self.client, task_id)
                    if status in ("failed", "timeout"):
                        logger.error(f"Task {task_id} ended with status: {status}")
                        return 0, len(chunk)
                return len(chunk), 0
            except Exception as e:
                # Boundary handler: count the whole chunk as failed and let
                # the caller continue with the remaining chunks.
                logger.error(f"Failed to send chunk: {e}")
                return 0, len(chunk)

    async def _flush(self, data: Dict, is_group: bool):
        """Flush to trigger memory extraction; log-and-continue on failure."""
        try:
            if is_group:
                await self.client.v1.memories.group.flush(
                    group_id=data["group_id"]
                )
            else:
                await self.client.v1.memories.flush(user_id=data["user_id"])
        except Exception as e:
            logger.warning(f"Flush failed: {e}")

    async def import_directory(self, directory: str) -> List[Dict]:
        """Import all JSON files in a directory, sorted, one at a time.

        A failed file is recorded in the results and does not stop the run.
        """
        path = Path(directory)
        files = sorted(path.glob("*.json"))
        logger.info(f"Found {len(files)} files to import")
        results = []
        for file_path in files:
            try:
                result = await self.import_file(str(file_path))
                results.append(result)
            except Exception as e:
                logger.error(f"Failed to import {file_path}: {e}")
                results.append({"file": str(file_path), "error": str(e)})
        return results

    def get_stats(self) -> Dict[str, int]:
        """Return a copy of the running import statistics."""
        return self.stats.copy()
# Usage
async def main():
    """Example driver: import one file, then a whole directory."""
    importer = BatchImporter(
        concurrency=20,
        chunk_size=50,
    )
    # Import a single file
    result = await importer.import_file("conversation_001.json")
    print(f"Result: {result}")
    # Import a directory (files are processed one at a time)
    results = await importer.import_directory("./exports/")
    print(f"Imported {len(results)} conversations")
    print(f"Stats: {importer.get_stats()}")

if __name__ == "__main__":
    asyncio.run(main())
Monitoring Progress
Add progress tracking for large imports:

from tqdm.asyncio import tqdm
class MonitoredBatchImporter(BatchImporter):
    """Batch importer that renders a tqdm progress bar per conversation."""

    async def import_conversation(self, data: dict) -> dict:
        """Import a conversation while updating a progress bar.

        Same contract as BatchImporter.import_conversation; the bar advances
        by one chunk's worth of messages after each send.
        """
        is_group = "group_id" in data
        conversation_id = data.get("group_id") or data.get("user_id")
        messages = data.get("messages", [])
        if is_group and data.get("group_meta"):
            await self._setup_group(data["group_id"], data["group_meta"])
        sent_count = 0
        failed_count = 0
        with tqdm(total=len(messages), desc=f"Importing {conversation_id}") as pbar:
            for start in range(0, len(messages), self.chunk_size):
                batch = messages[start : start + self.chunk_size]
                ok, bad = await self._send_chunk(data, batch, is_group)
                sent_count += ok
                failed_count += bad
                pbar.update(len(batch))
        await self._flush(data, is_group)
        self.stats["messages_sent"] += sent_count
        self.stats["messages_failed"] += failed_count
        self.stats["conversations_processed"] += 1
        return {
            "conversation_id": conversation_id,
            "messages_sent": sent_count,
            "messages_failed": failed_count,
        }
Handling Large Conversations
For very large conversations, process in smaller chunks to manage memory and avoid timeouts:

async def import_large_conversation(
    importer: BatchImporter,
    data: dict,
    chunk_size: int = 1000,
) -> dict:
    """Import a large conversation in stages to manage memory.

    Each stage carries chunk_size messages and is run as a full
    import_conversation call on the given importer.
    """
    messages = data.get("messages", [])
    # Ceiling division: number of stages needed to cover all messages.
    total_chunks = (len(messages) + chunk_size - 1) // chunk_size
    logger.info(f"Processing {len(messages)} messages in {total_chunks} stages")
    total_sent = 0
    total_failed = 0
    for i in range(0, len(messages), chunk_size):
        chunk = messages[i : i + chunk_size]
        stage_data = {**data, "messages": chunk}
        # Only set up group metadata on the first stage
        if i > 0:
            stage_data.pop("group_meta", None)
        # NOTE(review): each stage runs a full import_conversation, so the
        # importer flushes and increments "conversations_processed" once per
        # stage rather than once per conversation — confirm that is intended.
        result = await importer.import_conversation(stage_data)
        total_sent += result["messages_sent"]
        total_failed += result["messages_failed"]
        logger.info(f"Stage {i // chunk_size + 1}/{total_chunks} complete")
    return {
        "conversation_id": data.get("group_id") or data.get("user_id"),
        "messages_sent": total_sent,
        "messages_failed": total_failed,
    }
Error Recovery
Handle partial failures with checkpointing:

import json
from pathlib import Path
class RecoverableBatchImporter(BatchImporter):
    """Importer with checkpoint/resume support.

    Progress is persisted to a JSON checkpoint file so an interrupted run
    can skip completed conversations and resume partially-imported ones.
    """

    def __init__(self, checkpoint_file: str = "import_checkpoint.json", **kwargs):
        """
        Args:
            checkpoint_file: Path of the JSON file holding import progress.
            **kwargs: Forwarded to BatchImporter (concurrency, chunk_size).
        """
        super().__init__(**kwargs)
        self.checkpoint_file = checkpoint_file
        self.checkpoint = self._load_checkpoint()

    def _load_checkpoint(self) -> dict:
        """Load the checkpoint, starting fresh if it is missing or corrupt."""
        if Path(self.checkpoint_file).exists():
            try:
                with open(self.checkpoint_file, "r", encoding="utf-8") as f:
                    return json.load(f)
            except (json.JSONDecodeError, OSError) as e:
                # A truncated file (e.g. a crash mid-write) should not abort
                # the whole import run; fall back to an empty checkpoint.
                logger.warning(f"Ignoring unreadable checkpoint: {e}")
        return {"completed": [], "last_message_index": {}}

    def _save_checkpoint(self):
        """Persist the in-memory checkpoint to disk."""
        with open(self.checkpoint_file, "w", encoding="utf-8") as f:
            json.dump(self.checkpoint, f, indent=2)

    async def import_conversation(self, data: dict) -> dict:
        """Import with checkpointing.

        Skips conversations already marked completed and resumes
        partially-imported ones from the last recorded message index.
        """
        conversation_id = data.get("group_id") or data.get("user_id")
        # Skip if already completed
        if conversation_id in self.checkpoint["completed"]:
            logger.info(f"Skipping {conversation_id} (already completed)")
            return {"conversation_id": conversation_id, "skipped": True}
        # Resume from last checkpoint
        last_index = self.checkpoint["last_message_index"].get(conversation_id, 0)
        messages = data.get("messages", [])
        if last_index > 0:
            logger.info(f"Resuming {conversation_id} from message {last_index}")
            data = {**data, "messages": messages[last_index:]}
        result = await super().import_conversation(data)
        # Update checkpoint. NOTE(review): advancing by messages_sent assumes
        # failures occur only at the tail of the stream; a mid-stream failed
        # chunk could be skipped on resume — confirm this trade-off is OK.
        new_index = last_index + result.get("messages_sent", 0)
        self.checkpoint["last_message_index"][conversation_id] = new_index
        if (
            result.get("messages_failed", 0) == 0
            and conversation_id not in self.checkpoint["completed"]
        ):
            # Guard against duplicate entries if the same conversation is
            # imported twice in one process lifetime.
            self.checkpoint["completed"].append(conversation_id)
        self._save_checkpoint()
        return result

    def clear_checkpoint(self):
        """Reset in-memory progress and delete the checkpoint file."""
        self.checkpoint = {"completed": [], "last_message_index": {}}
        if Path(self.checkpoint_file).exists():
            Path(self.checkpoint_file).unlink()
Best Practices
Ordering
Always sort messages chronologically before import. EverOS uses message timestamps for boundary detection.
messages.sort(key=lambda m: m["timestamp"])
Chunk Size

The SDK sends messages in batches. Adjust chunk_size based on your use case:

# Small conversations (< 500 messages)
chunk_size = 50
# Large conversations (> 10,000 messages)
chunk_size = 100
Concurrency
Balance concurrency to avoid overwhelming the server. Higher concurrency speeds up imports but may trigger rate limits.
# Conservative
concurrency = 5
# Aggressive (for large batch imports)
concurrency = 50
Memory Management
For large imports, process files one at a time rather than loading all into memory.
for file_path in files:
await importer.import_file(file_path)
# File data is released after each import
Flush After Import
Always call flush after importing a conversation to trigger memory extraction immediately. Without flushing, EverOS waits for the boundary detection timeout before processing.
# Group flush
await client.v1.memories.group.flush(group_id="group_001")
# Personal flush
await client.v1.memories.flush(user_id="user_010")
Next Steps
Python Integration
Production-ready client patterns
Team Collaboration
Use imported group chat memories