Installation
EverMemOS uses a simple REST API. You can use any HTTP client:
# Sync (simple scripts)
pip install requests
# Async (production applications)
pip install aiohttp
# Alternative async client
pip install httpx
Basic Client Class
A reusable client class with proper configuration:
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import requests
class EverMemOSClient:
    """Synchronous client for the EverMemOS REST API.

    A thin wrapper around ``requests`` that handles base-URL joining,
    auth headers, and per-request timeouts. Reuse one instance rather
    than constructing a client per request.
    """

    def __init__(
        self,
        base_url: str = "https://api.evermind.ai",
        api_key: Optional[str] = None,
        timeout: int = 30
    ):
        """Configure the client.

        Args:
            base_url: API root; a trailing slash is stripped so URL
                joins below stay clean.
            api_key: Bearer token; optional for local deployments.
            timeout: Per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.headers = {"Content-Type": "application/json"}
        if api_key:
            self.headers["Authorization"] = f"Bearer {api_key}"

    def store_message(
        self,
        group_id: str,
        message_id: str,
        sender: str,
        content: str,
        group_name: str = "Conversation",
        sender_name: Optional[str] = None,
        create_time: Optional[str] = None,
        refer_list: Optional[List[Dict]] = None
    ) -> Dict[str, Any]:
        """Store a message in EverMemOS.

        Args:
            group_id: Conversation identifier.
            message_id: Unique message identifier within the group.
            sender: Sender user id; also used as display name when
                ``sender_name`` is omitted.
            content: Message text.
            group_name: Human-readable conversation name.
            sender_name: Display name override.
            create_time: ISO-8601 timestamp; defaults to now (UTC).
            refer_list: Optional list of referenced messages.

        Returns:
            Parsed JSON response body.

        Raises:
            requests.HTTPError: On any non-2xx response.
        """
        payload = {
            "group_id": group_id,
            "group_name": group_name,
            "message_id": message_id,
            "sender": sender,
            "sender_name": sender_name or sender,
            "content": content,
            # Fix: use a real UTC timestamp. The previous code stamped
            # *local* time and appended "Z", mislabelling the timezone.
            "create_time": create_time
            or datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "refer_list": refer_list or []
        }
        response = requests.post(
            f"{self.base_url}/api/v0/memories",
            json=payload,
            headers=self.headers,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def search(
        self,
        query: str,
        user_id: Optional[str] = None,
        group_ids: Optional[List[str]] = None,
        retrieve_method: str = "hybrid",
        top_k: int = 10,
        memory_types: Optional[List[str]] = None,
        current_time: Optional[str] = None
    ) -> Dict[str, Any]:
        """Search memories.

        Args:
            query: Free-text search query.
            user_id: Restrict results to one user's memories.
            group_ids: Restrict results to specific conversations.
            retrieve_method: Retrieval strategy (e.g. "hybrid").
            top_k: Maximum number of results.
            memory_types: Memory categories to search; defaults to
                episodic memories plus profiles.
            current_time: Reference time for temporal queries.

        Returns:
            Parsed JSON response body.

        Raises:
            requests.HTTPError: On any non-2xx response.
        """
        payload = {
            "query": query,
            "retrieve_method": retrieve_method,
            "top_k": top_k,
            "memory_types": memory_types or ["episodic_memory", "profile"]
        }
        if user_id:
            payload["user_id"] = user_id
        if group_ids:
            payload["group_ids"] = group_ids
        if current_time:
            payload["current_time"] = current_time
        # NOTE(review): the API appears to expect a JSON body on GET
        # (unusual but supported by requests) — confirm against server docs.
        response = requests.get(
            f"{self.base_url}/api/v0/memories/search",
            json=payload,
            headers=self.headers,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def get_memories(
        self,
        user_id: str,
        memory_type: str = "episodic_memory",
        page: int = 1,
        page_size: int = 20
    ) -> Dict[str, Any]:
        """Retrieve memories by user and type, paginated.

        Args:
            user_id: Owner of the memories.
            memory_type: Category to list.
            page: 1-based page index.
            page_size: Results per page.

        Returns:
            Parsed JSON response body.

        Raises:
            requests.HTTPError: On any non-2xx response.
        """
        payload = {
            "user_id": user_id,
            "memory_type": memory_type,
            "page": page,
            "page_size": page_size
        }
        response = requests.get(
            f"{self.base_url}/api/v0/memories",
            json=payload,
            headers=self.headers,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def set_conversation_meta(
        self,
        group_id: str,
        scene: str,
        user_details: List[Dict],
        group_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """Set conversation metadata.

        Args:
            group_id: Conversation identifier.
            scene: Scene/context label for the conversation.
            user_details: Per-participant metadata dicts.
            group_name: Optional human-readable conversation name.

        Returns:
            Parsed JSON response body.

        Raises:
            requests.HTTPError: On any non-2xx response.
        """
        payload = {
            "group_id": group_id,
            "scene": scene,
            "user_details": user_details
        }
        if group_name:
            payload["group_name"] = group_name
        response = requests.post(
            f"{self.base_url}/api/v0/memories/conversation-meta",
            json=payload,
            headers=self.headers,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()
# Usage
# Construct once and reuse; the client keeps base URL, auth and timeout.
client = EverMemOSClient(
    base_url="https://api.evermind.ai",
    api_key="your_api_key"  # Optional for local
)

# Store a message
# group_id/message_id must be stable identifiers; content is free text.
client.store_message(
    group_id="conv_001",
    message_id="msg_001",
    sender="user_alice",
    content="I prefer morning meetings"
)

# Search
# Scoped to one user; returns the parsed JSON response from the API.
results = client.search(
    query="meeting preferences",
    user_id="user_alice"
)
Async Client with aiohttp
For production applications handling concurrent requests:
import aiohttp
import asyncio
from typing import Optional, List, Dict, Any
from datetime import datetime
class AsyncEverMemOSClient:
    """Async client for the EverMemOS API with connection pooling.

    Maintains one shared ``aiohttp.ClientSession`` backed by a pooled
    ``TCPConnector``. Usable directly or as an async context manager.
    """

    def __init__(
        self,
        base_url: str = "https://api.evermind.ai",
        api_key: Optional[str] = None,
        timeout: int = 30,
        max_connections: int = 100
    ):
        """Configure the client.

        Args:
            base_url: API root; trailing slash stripped.
            api_key: Bearer token; optional for local deployments.
            timeout: Total per-request timeout in seconds.
            max_connections: Pool size (total and per-host).
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.headers = {"Content-Type": "application/json"}
        if api_key:
            self.headers["Authorization"] = f"Bearer {api_key}"
        # Remember the pool size so the connector can be rebuilt after close().
        self._max_connections = max_connections
        # Connection pool configuration
        self.connector = self._new_connector()
        self._session: Optional[aiohttp.ClientSession] = None

    def _new_connector(self) -> "aiohttp.TCPConnector":
        """Build a TCP connector with the configured pool limits."""
        return aiohttp.TCPConnector(
            limit=self._max_connections,
            limit_per_host=self._max_connections,
            keepalive_timeout=30
        )

    async def _get_session(self) -> "aiohttp.ClientSession":
        """Get or lazily (re)create the shared session."""
        if self._session is None or self._session.closed:
            if self.connector.closed:
                # Fix: ClientSession owns the connector we pass in and closes
                # it in close(); a client reused after close() must therefore
                # get a fresh connector, or every request would fail.
                self.connector = self._new_connector()
            self._session = aiohttp.ClientSession(
                connector=self.connector,
                timeout=self.timeout,
                headers=self.headers
            )
        return self._session

    async def close(self):
        """Close the client session (the owned connector closes with it)."""
        if self._session and not self._session.closed:
            await self._session.close()

    async def store_message(
        self,
        group_id: str,
        message_id: str,
        sender: str,
        content: str,
        **kwargs
    ) -> Dict[str, Any]:
        """Store a message asynchronously.

        Recognised kwargs: ``group_name``, ``sender_name``,
        ``create_time``, ``refer_list`` (same semantics as the sync
        client).

        Returns:
            Parsed JSON response body.

        Raises:
            aiohttp.ClientResponseError: On any non-2xx response.
        """
        session = await self._get_session()
        payload = {
            "group_id": group_id,
            "group_name": kwargs.get("group_name", "Conversation"),
            "message_id": message_id,
            "sender": sender,
            "sender_name": kwargs.get("sender_name", sender),
            "content": content,
            # Fix: real UTC timestamp; previously local time was stamped "Z".
            "create_time": kwargs.get(
                "create_time",
                datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
            ),
            "refer_list": kwargs.get("refer_list", [])
        }
        async with session.post(f"{self.base_url}/api/v0/memories", json=payload) as resp:
            resp.raise_for_status()
            return await resp.json()

    async def search(
        self,
        query: str,
        user_id: Optional[str] = None,
        group_ids: Optional[List[str]] = None,
        retrieve_method: str = "hybrid",
        top_k: int = 10,
        memory_types: Optional[List[str]] = None,
        current_time: Optional[str] = None
    ) -> Dict[str, Any]:
        """Search memories asynchronously.

        Args mirror the sync client's ``search``. Returns the parsed
        JSON response body; raises ``aiohttp.ClientResponseError`` on
        non-2xx responses.
        """
        session = await self._get_session()
        payload = {
            "query": query,
            "retrieve_method": retrieve_method,
            "top_k": top_k,
            "memory_types": memory_types or ["episodic_memory", "profile"]
        }
        if user_id:
            payload["user_id"] = user_id
        if group_ids:
            payload["group_ids"] = group_ids
        if current_time:
            payload["current_time"] = current_time
        # NOTE(review): GET with a JSON body matches the sync client and
        # (apparently) the server's contract — confirm against API docs.
        async with session.get(f"{self.base_url}/api/v0/memories/search", json=payload) as resp:
            resp.raise_for_status()
            return await resp.json()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Context-manager exit always releases the session.
        await self.close()
# Usage with context manager
async def main():
    """Demo: store ten messages concurrently, then search them."""
    async with AsyncEverMemOSClient(base_url="https://api.evermind.ai") as client:
        # Fan the stores out and await them all at once.
        store_jobs = [
            client.store_message("conv_001", f"msg_{i}", "user_alice", f"Message {i}")
            for i in range(10)
        ]
        stored = await asyncio.gather(*store_jobs)
        print(f"Stored {len(stored)} messages")

        # Query what we just wrote.
        search_result = await client.search("message", user_id="user_alice")
        memories = search_result.get('result', {}).get('memories', [])
        print(f"Found {len(memories)} memories")

asyncio.run(main())
Error Handling and Retries
Robust error handling with exponential backoff:
import asyncio
import aiohttp
from typing import Optional, TypeVar, Callable
import logging
logger = logging.getLogger(__name__)

T = TypeVar('T')


class EverMemOSError(Exception):
    """Base exception for EverMemOS errors."""
    pass


class RateLimitError(EverMemOSError):
    """Rate limit exceeded (HTTP 429)."""
    pass


class ServerError(EverMemOSError):
    """Server-side error (HTTP 5xx)."""
    pass


async def with_retry(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retryable_exceptions: Optional[tuple] = None
) -> T:
    """Execute an async callable with exponential backoff retry.

    Args:
        func: Zero-argument coroutine function to invoke.
        max_retries: Retries after the initial attempt (total calls =
            ``max_retries + 1``).
        base_delay: Starting delay in seconds; doubled each retry, and
            quadrupled for rate limits, which need longer backoff.
        max_delay: Cap on any single delay.
        retryable_exceptions: Exception types that trigger a retry.
            Defaults to ``(aiohttp.ClientError, ServerError,
            asyncio.TimeoutError)``; resolved lazily so the default does
            not evaluate ``aiohttp`` at import/def time.

    Returns:
        Whatever ``func`` returns.

    Raises:
        The last exception once retries are exhausted; non-retryable
        exceptions propagate immediately.
    """
    if retryable_exceptions is None:
        # Fix: resolved here instead of in the signature so merely defining
        # this function does not require aiohttp to be importable.
        retryable_exceptions = (aiohttp.ClientError, ServerError, asyncio.TimeoutError)
    last_exception: Optional[BaseException] = None
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except RateLimitError as e:
            if attempt == max_retries:
                # Fix: out of attempts — fail now instead of sleeping one
                # last (pointless) backoff before giving up.
                raise
            # Rate limits need longer backoff than ordinary failures.
            delay = min(base_delay * (4 ** attempt), max_delay)
            logger.warning(f"Rate limited, retrying in {delay}s (attempt {attempt + 1})")
            await asyncio.sleep(delay)
            last_exception = e
        except retryable_exceptions as e:
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            logger.warning(f"Request failed, retrying in {delay}s: {e}")
            await asyncio.sleep(delay)
            last_exception = e
    # Unreachable in practice (the loop always returns or raises);
    # kept as a defensive safety net.
    raise last_exception or EverMemOSError("Max retries exceeded")
class RobustEverMemOSClient(AsyncEverMemOSClient):
    """Async client that retries failed requests automatically."""

    async def _request(self, method: str, url: str, **kwargs) -> Dict:
        """Issue one HTTP request, mapping status codes to the retry-aware
        exception hierarchy and delegating the retry loop to with_retry."""
        session = await self._get_session()

        async def attempt():
            async with session.request(method, url, **kwargs) as resp:
                # Translate 429 / 5xx into exceptions with_retry understands.
                if resp.status == 429:
                    raise RateLimitError("Rate limit exceeded")
                if resp.status >= 500:
                    raise ServerError(f"Server error: {resp.status}")
                resp.raise_for_status()
                return await resp.json()

        return await with_retry(attempt)

    async def store_message(self, group_id: str, message_id: str, sender: str, content: str, **kwargs):
        """Store with automatic retry."""
        payload = {
            "group_id": group_id,
            "message_id": message_id,
            "sender": sender,
            "content": content,
        }
        # Optional fields fall back to the same defaults as the base client.
        fallbacks = {
            "group_name": "Conversation",
            "sender_name": sender,
            "create_time": datetime.now().isoformat() + "Z",
            "refer_list": [],
        }
        for field, default in fallbacks.items():
            payload[field] = kwargs.get(field, default)
        return await self._request("POST", f"{self.base_url}/api/v0/memories", json=payload)

    async def search(self, query: str, **kwargs):
        """Search with automatic retry."""
        payload = {
            "query": query,
            "retrieve_method": kwargs.get("retrieve_method", "hybrid"),
            "top_k": kwargs.get("top_k", 10),
            "memory_types": kwargs.get("memory_types", ["episodic_memory", "profile"]),
        }
        # Forward the optional filters only when supplied and truthy.
        payload.update(
            (field, kwargs[field])
            for field in ("user_id", "group_ids", "current_time")
            if kwargs.get(field)
        )
        return await self._request("GET", f"{self.base_url}/api/v0/memories/search", json=payload)
Fire-and-Forget Message Storage
For non-critical message storage that shouldn't block your main flow:
import asyncio
from typing import Dict, Any
import logging
logger = logging.getLogger(__name__)
class FireAndForgetStore:
    """Non-blocking message storage with background queue.

    Messages queued via store() are written to EverMemOS by a single
    background worker task, so callers never wait on network I/O.
    Delivery is best-effort: failures are logged, and when the queue is
    full new messages are dropped.
    """

    def __init__(self, client: AsyncEverMemOSClient, max_queue_size: int = 1000):
        # Bounded queue — backpressure is handled by dropping, not blocking.
        self.client = client
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
        self._worker_task: Optional[asyncio.Task] = None

    async def start(self):
        """Start the background worker."""
        # Must be called from within a running event loop.
        self._worker_task = asyncio.create_task(self._worker())

    async def stop(self):
        """Stop the worker and flush remaining messages."""
        if self._worker_task:
            # Cancel first, then await so the task finishes unwinding
            # before we drain what it did not get to.
            self._worker_task.cancel()
            try:
                await self._worker_task
            except asyncio.CancelledError:
                pass
        # Flush remaining
        while not self.queue.empty():
            msg = await self.queue.get()
            await self._store_message(msg)

    async def _worker(self):
        """Background worker that processes the queue."""
        while True:
            try:
                msg = await self.queue.get()
                await self._store_message(msg)
                self.queue.task_done()
            except asyncio.CancelledError:
                # Normal shutdown path, triggered by stop().
                break
            except Exception as e:
                # Keep the worker alive for subsequent messages.
                logger.error(f"Failed to store message: {e}")

    async def _store_message(self, msg: Dict[str, Any]):
        """Store a single message."""
        try:
            await self.client.store_message(**msg)
        except Exception as e:
            # Fire-and-forget contract: never propagate store failures.
            logger.error(f"Store failed: {e}")

    def store(self, group_id: str, message_id: str, sender: str, content: str, **kwargs):
        """Queue a message for storage (non-blocking).

        Extra kwargs are forwarded verbatim to client.store_message.
        """
        msg = {
            "group_id": group_id,
            "message_id": message_id,
            "sender": sender,
            "content": content,
            **kwargs
        }
        try:
            self.queue.put_nowait(msg)
        except asyncio.QueueFull:
            # Dropped silently by design; callers are not notified.
            logger.warning("Message queue full, dropping message")
# Usage
async def main():
    """Demo: queue 100 messages without blocking the main coroutine."""
    # NOTE(review): this snippet defines main() but never schedules it
    # (no asyncio.run call is shown) — run it explicitly in real code.
    client = AsyncEverMemOSClient()
    store = FireAndForgetStore(client)
    await store.start()

    # These don't block - messages are queued
    for i in range(100):
        store.store("conv_001", f"msg_{i}", "user", f"Message {i}")

    # Do other work while messages are stored in background
    await asyncio.sleep(1)

    # Drain the queue, then release the HTTP session.
    await store.stop()
    await client.close()
Logging and Monitoring
Add observability to your EverMemOS integration:
import time
import logging
from functools import wraps
from typing import Callable, Any
logger = logging.getLogger("evermemos")


def log_request(func: Callable) -> Callable:
    """Decorator that logs the timing and outcome of an async API call.

    Emits an info record on success and an error record on failure,
    both carrying the method name, duration in milliseconds, and a
    success flag in ``extra``. Exceptions are re-raised untouched.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs) -> Any:
        started = time.perf_counter()
        name = func.__name__
        try:
            outcome = await func(*args, **kwargs)
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - started) * 1000
            logger.error(
                f"EverMemOS {name} failed: {exc}",
                extra={
                    "method": name,
                    "duration_ms": elapsed_ms,
                    "success": False,
                    "error": str(exc)
                }
            )
            # Never swallow the caller's exception.
            raise
        elapsed_ms = (time.perf_counter() - started) * 1000
        logger.info(
            f"EverMemOS {name} completed",
            extra={
                "method": name,
                "duration_ms": elapsed_ms,
                "success": True
            }
        )
        return outcome
    return wrapper
class ObservableClient(AsyncEverMemOSClient):
    """Client with built-in logging.

    Wraps the two hot-path methods in log_request so every call emits
    a timing/outcome log record; behavior is otherwise delegated
    unchanged to the parent client.
    """

    @log_request
    async def store_message(self, *args, **kwargs):
        # Decorator logs duration and success/failure; parent does the work.
        return await super().store_message(*args, **kwargs)

    @log_request
    async def search(self, *args, **kwargs):
        return await super().search(*args, **kwargs)
Best Practices Summary
Connection Management
- Use connection pooling for high-throughput applications
- Reuse client instances instead of creating new ones per request
- Close sessions properly when shutting down
Error Handling
- Implement exponential backoff for retries
- Distinguish between retryable and non-retryable errors
- Log failures for debugging
Performance
- Use async clients for concurrent operations
- Use fire-and-forget for non-critical storage
- Batch operations when possible
Timeouts
- Set appropriate timeouts (30s for search, 10s for store)
- Agentic retrieval may need longer timeouts (60s+)
- Handle timeout errors gracefully