EverMemOS Python Integration Guide
This guide covers production-ready patterns for integrating EverMemOS into Python applications, including async/await, error handling, connection pooling, and best practices.

Installation

EverMemOS uses a simple REST API. You can use any HTTP client:
# Sync (simple scripts)
pip install requests

# Async (production applications)
pip install aiohttp

# Alternative async client
pip install httpx

Basic Client Class

A reusable client class with proper configuration:
import requests
from typing import Optional, List, Dict, Any
from datetime import datetime

class EverMemOSClient:
    """Synchronous client for the EverMemOS REST API.

    Wraps ``requests`` with a base URL, optional bearer-token auth, and a
    shared per-request timeout. Suitable for scripts and low-concurrency
    services; use the async client for concurrent workloads.
    """

    def __init__(
        self,
        base_url: str = "https://api.evermind.ai",
        api_key: Optional[str] = None,
        timeout: int = 30
    ):
        """
        Args:
            base_url: API root; any trailing slash is stripped.
            api_key: Optional bearer token (not needed for local deployments).
            timeout: Per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.headers = {"Content-Type": "application/json"}

        if api_key:
            self.headers["Authorization"] = f"Bearer {api_key}"

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as ISO-8601 with a ``Z`` suffix.

        Fix: the original used ``datetime.now()`` (naive *local* time) and
        appended ``"Z"``, mislabelling local timestamps as UTC.
        """
        from datetime import timezone  # local import keeps this snippet self-contained
        return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    def store_message(
        self,
        group_id: str,
        message_id: str,
        sender: str,
        content: str,
        group_name: str = "Conversation",
        sender_name: Optional[str] = None,
        create_time: Optional[str] = None,
        refer_list: Optional[List[Dict]] = None
    ) -> Dict[str, Any]:
        """Store a message in EverMemOS.

        Args:
            group_id: Conversation identifier.
            message_id: Unique id of this message within the conversation.
            sender: Sender id; doubles as display name when none is given.
            content: Message text.
            group_name: Human-readable conversation name.
            sender_name: Display name; defaults to ``sender``.
            create_time: ISO-8601 timestamp; defaults to now in UTC.
            refer_list: Optional reference entries; defaults to empty.

        Returns:
            Parsed JSON response body.

        Raises:
            requests.HTTPError: On any non-2xx response.
        """
        payload = {
            "group_id": group_id,
            "group_name": group_name,
            "message_id": message_id,
            "sender": sender,
            "sender_name": sender_name or sender,
            "content": content,
            "create_time": create_time or self._utc_now_iso(),
            "refer_list": refer_list or []
        }

        response = requests.post(
            f"{self.base_url}/api/v0/memories",
            json=payload,
            headers=self.headers,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def search(
        self,
        query: str,
        user_id: Optional[str] = None,
        group_ids: Optional[List[str]] = None,
        retrieve_method: str = "hybrid",
        top_k: int = 10,
        memory_types: Optional[List[str]] = None,
        current_time: Optional[str] = None
    ) -> Dict[str, Any]:
        """Search memories.

        NOTE(review): this sends a JSON body with a GET request, matching the
        API examples, but some proxies/servers drop GET bodies — confirm
        against the deployed gateway.

        Raises:
            requests.HTTPError: On any non-2xx response.
        """
        payload = {
            "query": query,
            "retrieve_method": retrieve_method,
            "top_k": top_k,
            "memory_types": memory_types or ["episodic_memory", "profile"]
        }

        # Optional filters are only sent when provided.
        if user_id:
            payload["user_id"] = user_id
        if group_ids:
            payload["group_ids"] = group_ids
        if current_time:
            payload["current_time"] = current_time

        response = requests.get(
            f"{self.base_url}/api/v0/memories/search",
            json=payload,
            headers=self.headers,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def get_memories(
        self,
        user_id: str,
        memory_type: str = "episodic_memory",
        page: int = 1,
        page_size: int = 20
    ) -> Dict[str, Any]:
        """Retrieve memories by user and type (paginated).

        Raises:
            requests.HTTPError: On any non-2xx response.
        """
        payload = {
            "user_id": user_id,
            "memory_type": memory_type,
            "page": page,
            "page_size": page_size
        }

        response = requests.get(
            f"{self.base_url}/api/v0/memories",
            json=payload,
            headers=self.headers,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def set_conversation_meta(
        self,
        group_id: str,
        scene: str,
        user_details: List[Dict],
        group_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """Set conversation metadata.

        Raises:
            requests.HTTPError: On any non-2xx response.
        """
        payload = {
            "group_id": group_id,
            "scene": scene,
            "user_details": user_details
        }

        if group_name:
            payload["group_name"] = group_name

        response = requests.post(
            f"{self.base_url}/api/v0/memories/conversation-meta",
            json=payload,
            headers=self.headers,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()


# Usage
# Instantiate once and reuse across calls; the headers (including the bearer
# token) are built in __init__ and shared by every request.
client = EverMemOSClient(
    base_url="https://api.evermind.ai",
    api_key="your_api_key"  # Optional for local
)

# Store a message
client.store_message(
    group_id="conv_001",
    message_id="msg_001",
    sender="user_alice",
    content="I prefer morning meetings"
)

# Search
results = client.search(
    query="meeting preferences",
    user_id="user_alice"
)

Async Client with aiohttp

For production applications handling concurrent requests:
import aiohttp
import asyncio
from typing import Optional, List, Dict, Any
from datetime import datetime

class AsyncEverMemOSClient:
    """Async client for the EverMemOS API with connection pooling.

    A single aiohttp session (created lazily) is shared across calls; use it
    as an async context manager to guarantee the session is closed.
    """

    def __init__(
        self,
        base_url: str = "https://api.evermind.ai",
        api_key: Optional[str] = None,
        timeout: int = 30,
        max_connections: int = 100
    ):
        """
        Args:
            base_url: API root; any trailing slash is stripped.
            api_key: Optional bearer token.
            timeout: Total per-request timeout in seconds.
            max_connections: Pool size (applied both in total and per host).
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.headers = {"Content-Type": "application/json"}

        if api_key:
            self.headers["Authorization"] = f"Bearer {api_key}"

        # Connection pool configuration.
        # NOTE(review): creating a TCPConnector outside a running event loop
        # is deprecated in recent aiohttp versions — confirm the target version.
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,
            limit_per_host=max_connections,
            keepalive_timeout=30
        )
        self._session: Optional[aiohttp.ClientSession] = None

    @staticmethod
    def _utc_now_iso() -> str:
        """Current UTC time as ISO-8601 with a ``Z`` suffix.

        Fix: the original stamped naive local ``datetime.now()`` and appended
        ``"Z"``, mislabelling local timestamps as UTC.
        """
        from datetime import timezone
        return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    async def _get_session(self) -> aiohttp.ClientSession:
        """Get or lazily (re)create the shared session.

        NOTE(review): close() also closes the connector the session owns, so a
        session recreated here after close() would reuse a closed connector —
        verify clients are not reused after close().
        """
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                connector=self.connector,
                timeout=self.timeout,
                headers=self.headers
            )
        return self._session

    async def close(self):
        """Close the client session (and its pooled connections)."""
        if self._session and not self._session.closed:
            await self._session.close()

    async def store_message(
        self,
        group_id: str,
        message_id: str,
        sender: str,
        content: str,
        **kwargs
    ) -> Dict[str, Any]:
        """Store a message asynchronously.

        Keyword Args:
            group_name, sender_name, create_time, refer_list: Same semantics
                as the synchronous client; ``create_time`` defaults to the
                current UTC time.

        Raises:
            aiohttp.ClientResponseError: On any non-2xx response.
        """
        session = await self._get_session()

        payload = {
            "group_id": group_id,
            "group_name": kwargs.get("group_name", "Conversation"),
            "message_id": message_id,
            "sender": sender,
            "sender_name": kwargs.get("sender_name", sender),
            "content": content,
            "create_time": kwargs.get("create_time") or self._utc_now_iso(),
            "refer_list": kwargs.get("refer_list", [])
        }

        async with session.post(f"{self.base_url}/api/v0/memories", json=payload) as resp:
            resp.raise_for_status()
            return await resp.json()

    async def search(
        self,
        query: str,
        user_id: Optional[str] = None,
        group_ids: Optional[List[str]] = None,
        retrieve_method: str = "hybrid",
        top_k: int = 10,
        memory_types: Optional[List[str]] = None,
        current_time: Optional[str] = None
    ) -> Dict[str, Any]:
        """Search memories asynchronously.

        NOTE(review): sends a JSON body with GET (matches the API examples);
        some proxies strip GET bodies — confirm in deployment.

        Raises:
            aiohttp.ClientResponseError: On any non-2xx response.
        """
        session = await self._get_session()

        payload = {
            "query": query,
            "retrieve_method": retrieve_method,
            "top_k": top_k,
            "memory_types": memory_types or ["episodic_memory", "profile"]
        }

        # Optional filters are only sent when provided.
        if user_id:
            payload["user_id"] = user_id
        if group_ids:
            payload["group_ids"] = group_ids
        if current_time:
            payload["current_time"] = current_time

        async with session.get(f"{self.base_url}/api/v0/memories/search", json=payload) as resp:
            resp.raise_for_status()
            return await resp.json()

    async def __aenter__(self):
        """Enter async context; the session is created lazily on first use."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session on context exit."""
        await self.close()


# Usage with context manager
async def main():
    """Demo: store ten messages concurrently, then run one search."""
    async with AsyncEverMemOSClient(base_url="https://api.evermind.ai") as client:
        # Store messages concurrently
        tasks = [
            client.store_message("conv_001", f"msg_{i}", "user_alice", f"Message {i}")
            for i in range(10)
        ]
        results = await asyncio.gather(*tasks)
        print(f"Stored {len(results)} messages")

        # Search
        # NOTE(review): an immediate search may not see just-stored messages
        # if indexing is asynchronous — confirm against the API's guarantees.
        search_result = await client.search("message", user_id="user_alice")
        print(f"Found {len(search_result.get('result', {}).get('memories', []))} memories")

asyncio.run(main())

Error Handling and Retries

Robust error handling with exponential backoff:
import asyncio
import aiohttp
from typing import Optional, TypeVar, Callable
import logging

# Module-level logger; the application configures handlers and levels.
logger = logging.getLogger(__name__)

# Generic return type for the retry helper.
T = TypeVar("T")


class EverMemOSError(Exception):
    """Base class for every error raised by the EverMemOS client."""


class RateLimitError(EverMemOSError):
    """The API reported that the rate limit was exceeded (HTTP 429)."""


class ServerError(EverMemOSError):
    """The API reported a server-side failure (HTTP 5xx)."""

async def with_retry(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retryable_exceptions: tuple = (aiohttp.ClientError, ServerError, asyncio.TimeoutError)
) -> T:
    """Await ``func()`` with exponential-backoff retries.

    Args:
        func: Zero-argument callable returning an awaitable.
        max_retries: Retries allowed after the first attempt.
        base_delay: Initial backoff in seconds.
        max_delay: Upper bound applied to every computed delay.
        retryable_exceptions: Exception types that trigger a retry; anything
            else propagates immediately.

    Raises:
        RateLimitError: When still rate-limited on the final attempt.
        Exception: The last retryable exception once attempts are exhausted.
    """
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except RateLimitError:
            # Fix: fail fast on the final attempt instead of sleeping one more
            # backoff period and then re-raising a stale saved reference.
            if attempt == max_retries:
                raise
            # Rate limits need longer backoff (4^n rather than 2^n).
            delay = min(base_delay * (4 ** attempt), max_delay)
            logger.warning(f"Rate limited, retrying in {delay}s (attempt {attempt + 1})")
            await asyncio.sleep(delay)
        except retryable_exceptions as e:
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            logger.warning(f"Request failed, retrying in {delay}s: {e}")
            await asyncio.sleep(delay)

    # Unreachable: the final loop iteration either returns or raises above.
    raise EverMemOSError("Max retries exceeded")


class RobustEverMemOSClient(AsyncEverMemOSClient):
    """Async client that retries transient failures automatically.

    Maps HTTP 429 to RateLimitError and 5xx to ServerError so the retry
    helper can apply the appropriate backoff policy to each.
    """

    async def _request(self, method: str, url: str, **kwargs) -> Dict:
        """Issue one HTTP request through the retry wrapper.

        Raises:
            RateLimitError: On HTTP 429.
            ServerError: On HTTP 5xx.
            aiohttp.ClientResponseError: On other non-2xx statuses.
        """
        session = await self._get_session()

        async def do_request():
            async with session.request(method, url, **kwargs) as resp:
                if resp.status == 429:
                    raise RateLimitError("Rate limit exceeded")
                if resp.status >= 500:
                    raise ServerError(f"Server error: {resp.status}")
                resp.raise_for_status()
                return await resp.json()

        return await with_retry(do_request)

    async def store_message(self, group_id: str, message_id: str, sender: str, content: str, **kwargs):
        """Store with automatic retry."""
        # Fix: use timezone-aware UTC — the original stamped naive local time
        # with a "Z" suffix. The local import also keeps this snippet
        # runnable on its own (its import block omitted datetime).
        from datetime import datetime, timezone

        payload = {
            "group_id": group_id,
            "message_id": message_id,
            "sender": sender,
            "content": content,
            "group_name": kwargs.get("group_name", "Conversation"),
            "sender_name": kwargs.get("sender_name", sender),
            "create_time": kwargs.get("create_time") or datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "refer_list": kwargs.get("refer_list", [])
        }
        return await self._request("POST", f"{self.base_url}/api/v0/memories", json=payload)

    async def search(self, query: str, **kwargs):
        """Search with automatic retry.

        Keyword Args mirror AsyncEverMemOSClient.search.
        """
        payload = {
            "query": query,
            "retrieve_method": kwargs.get("retrieve_method", "hybrid"),
            "top_k": kwargs.get("top_k", 10),
            "memory_types": kwargs.get("memory_types", ["episodic_memory", "profile"])
        }
        # Optional filters are forwarded only when truthy.
        for key in ["user_id", "group_ids", "current_time"]:
            if kwargs.get(key):
                payload[key] = kwargs[key]

        return await self._request("GET", f"{self.base_url}/api/v0/memories/search", json=payload)

Fire-and-Forget Message Storage

For non-critical message storage that shouldn’t block your main flow:
import asyncio
from typing import Dict, Any
import logging

logger = logging.getLogger(__name__)

class FireAndForgetStore:
    """Non-blocking message storage backed by a background queue.

    ``store()`` enqueues synchronously and never blocks; a single worker task
    drains the queue and writes each message through the async client.
    """

    def __init__(self, client: "AsyncEverMemOSClient", max_queue_size: int = 1000):
        """
        Args:
            client: Async client used to persist messages.
            max_queue_size: Queue capacity; beyond it messages are dropped.
        """
        self.client = client
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
        self._worker_task: Optional[asyncio.Task] = None

    async def start(self):
        """Start the background worker task."""
        self._worker_task = asyncio.create_task(self._worker())

    async def stop(self):
        """Drain the queue, then stop the worker.

        Fix: the original cancelled the worker first, which could interrupt a
        message mid-store; draining via ``queue.join()`` before cancelling
        guarantees every accepted message is attempted exactly once.
        """
        if self._worker_task:
            await self.queue.join()
            self._worker_task.cancel()
            try:
                await self._worker_task
            except asyncio.CancelledError:
                pass
            self._worker_task = None

        # Fallback: stop() called without start() — flush inline.
        while not self.queue.empty():
            await self._store_message(self.queue.get_nowait())

    async def _worker(self):
        """Background worker that processes the queue until cancelled."""
        while True:
            try:
                msg = await self.queue.get()
            except asyncio.CancelledError:
                break
            try:
                await self._store_message(msg)
            finally:
                # Always balance get() so queue.join() cannot deadlock.
                self.queue.task_done()

    async def _store_message(self, msg: Dict[str, Any]):
        """Store a single message, swallowing (but logging) failures."""
        try:
            await self.client.store_message(**msg)
        except Exception as e:
            logger.error(f"Store failed: {e}")

    def store(self, group_id: str, message_id: str, sender: str, content: str, **kwargs):
        """Queue a message for storage (non-blocking).

        Drops the message (with a warning) if the queue is full — storage
        here is deliberately best-effort.
        """
        msg = {
            "group_id": group_id,
            "message_id": message_id,
            "sender": sender,
            "content": content,
            **kwargs
        }
        try:
            self.queue.put_nowait(msg)
        except asyncio.QueueFull:
            logger.warning("Message queue full, dropping message")


# Usage
async def main():
    """Demo: queue 100 messages without blocking, then shut down cleanly."""
    client = AsyncEverMemOSClient()
    store = FireAndForgetStore(client)

    await store.start()

    # These don't block - messages are queued
    for i in range(100):
        store.store("conv_001", f"msg_{i}", "user", f"Message {i}")

    # Do other work while messages are stored in background
    await asyncio.sleep(1)

    await store.stop()
    await client.close()
    # NOTE(review): unlike the earlier example, this snippet never calls
    # asyncio.run(main()) — the caller is expected to schedule it.

Logging and Monitoring

Add observability to your EverMemOS integration:
import time
import logging
from functools import wraps
from typing import Callable, Any

# Dedicated logger for API-call telemetry.
logger = logging.getLogger("evermemos")

def log_request(func: Callable) -> Callable:
    """Decorator that logs the duration and outcome of an async API call.

    Structured fields (method, duration_ms, success, error) go into the log
    record's ``extra`` so downstream handlers can index them.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs) -> Any:
        name = func.__name__
        started = time.perf_counter()

        try:
            result = await func(*args, **kwargs)
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - started) * 1000
            logger.error(
                f"EverMemOS {name} failed: {exc}",
                extra={
                    "method": name,
                    "duration_ms": elapsed_ms,
                    "success": False,
                    "error": str(exc)
                }
            )
            raise

        elapsed_ms = (time.perf_counter() - started) * 1000
        logger.info(
            f"EverMemOS {name} completed",
            extra={
                "method": name,
                "duration_ms": elapsed_ms,
                "success": True
            }
        )
        return result

    return wrapper


class ObservableClient(AsyncEverMemOSClient):
    """Client with built-in logging."""

    # Each override simply delegates to the parent implementation;
    # @log_request wraps the awaited call with duration/outcome logging.
    @log_request
    async def store_message(self, *args, **kwargs):
        return await super().store_message(*args, **kwargs)

    @log_request
    async def search(self, *args, **kwargs):
        return await super().search(*args, **kwargs)

Best Practices Summary

  • Use connection pooling for high-throughput applications
  • Reuse client instances instead of creating new ones per request
  • Close sessions properly when shutting down
  • Implement exponential backoff for retries
  • Distinguish between retryable and non-retryable errors
  • Log failures for debugging
  • Use async clients for concurrent operations
  • Use fire-and-forget for non-critical storage
  • Batch operations when possible
  • Set appropriate timeouts (30s for search, 10s for store)
  • Agentic retrieval may need longer timeouts (60s+)
  • Handle timeout errors gracefully

Next Steps