Skip to main content
This guide covers production-ready patterns for integrating EverOS into Python applications using the official everos SDK, including async/await, error handling, and best practices.

Installation

pip install everos
The SDK handles connection management, retries, timeouts, and connection pooling out of the box.

Basic Usage (Sync)

Get started with the synchronous client:
import time
from everos import EverOS

# One client for the whole script; memory operations live under v1.memories.
client = EverOS(api_key="your-api-key")
memories = client.v1.memories

# Write path: a single user message stamped with the current time in
# epoch milliseconds.
preference_message = {
    "role": "user",
    "timestamp": int(time.time() * 1000),
    "content": "I prefer morning meetings before 10am",
}
response = memories.add(user_id="user_alice", messages=[preference_message])
print(response)

# Read path 1: vector search restricted to the same user (top_k=5).
results = memories.search(
    filters={"user_id": "user_alice"},
    query="meeting preferences",
    method="vector",
    top_k=5,
)
print(results)

# Read path 2: fetch that user's memories of type "profile".
profile = memories.get(filters={"user_id": "user_alice"}, memory_type="profile")
print(profile)

Async Client

For production applications handling concurrent requests, use the asynchronous client:
import asyncio
import time
from everos import AsyncEverOS

async def main():
    """Store ten messages concurrently, then run one vector search."""
    client = AsyncEverOS(api_key="your-api-key")
    memories = client.v1.memories

    # Store messages concurrently
    base_ms = int(time.time() * 1000)

    def add_message(index):
        # Offsetting by the index gives each message its own timestamp.
        return memories.add(
            user_id="user_alice",
            messages=[
                {
                    "role": "user",
                    "timestamp": base_ms + index,
                    "content": f"Message {index}",
                }
            ],
        )

    # All ten add() calls are created up front and awaited as one group.
    stored = await asyncio.gather(*map(add_message, range(10)))
    print(f"Stored {len(stored)} messages")

    # Search
    hits = await memories.search(
        filters={"user_id": "user_alice"},
        query="message",
        method="vector",
        top_k=10,
    )
    print(hits)

# Run the example only when this file is executed as a script, so that
# importing it does not start the event loop as a side effect.
if __name__ == "__main__":
    asyncio.run(main())

Async Mode with Task Polling

For long-running operations, use async_mode=True to get a task ID and poll for completion:
import asyncio
import time
from everos import AsyncEverOS, NotFoundError

# Module-level async client; `memories` and `tasks` are the two v1
# resources read by wait_for_task() and main() below.
client = AsyncEverOS(api_key="your-api-key")
memories, tasks = client.v1.memories, client.v1.tasks


async def wait_for_task(task_id: str, poll_interval: float = 2.0, max_attempts: int = 30) -> str:
    """Poll a task until it reaches a terminal status.

    A 404 (``NotFoundError``) means the task has been processed and cleared
    (short TTL) - treat as success.

    Args:
        task_id: Identifier passed to ``tasks.retrieve``.
        poll_interval: Seconds to sleep between two consecutive polls.
        max_attempts: Maximum number of polls before giving up.

    Returns:
        The terminal status reported for the task (``"success"``,
        ``"failed"``, ``"completed"`` or ``"done"``); ``"done"`` when the
        lookup raises ``NotFoundError``; ``"timeout"`` when no terminal
        status was seen within ``max_attempts`` polls.
    """
    terminal_statuses = ("success", "failed", "completed", "done")
    for attempt in range(1, max_attempts + 1):
        try:
            resp = await tasks.retrieve(task_id)
        except NotFoundError:
            print(f"  [attempt {attempt}] task completed (expired)")
            return "done"
        # A response without a data payload is reported as "unknown" and
        # polled again.
        status = resp.data.status if resp.data else "unknown"
        print(f"  [attempt {attempt}] status={status!r}")
        if status in terminal_statuses:
            return status
        # Sleep only when another poll will follow; sleeping after the last
        # attempt would merely delay the "timeout" result.
        if attempt < max_attempts:
            await asyncio.sleep(poll_interval)
    return "timeout"


async def main():
    """Submit a three-turn conversation with async_mode=True and wait for its task."""
    start_ms = int(time.time() * 1000)

    # (role, offset in ms from start_ms, content) for each turn.
    turns = [
        ("user", 0, "I love hiking on weekends, especially in the mountains."),
        ("assistant", 1000, "That sounds wonderful! Do you have a favorite trail?"),
        ("user", 2000, "Yes, the mountain trails near the lake."),
    ]
    conversation = [
        {"role": role, "timestamp": start_ms + offset_ms, "content": text}
        for role, offset_ms, text in turns
    ]

    # async_mode=True returns HTTP 202 with a task_id
    response = await memories.add(
        user_id="user_alice",
        async_mode=True,
        messages=conversation,
    )
    print("add response:", response)

    # Without a task id in the response there is nothing to poll.
    task_id = response.data.task_id if response.data else None
    if not task_id:
        return

    # Poll until the task is complete
    final_status = await wait_for_task(task_id)
    print(f"Task finished with status: {final_status!r}")

# Run the example only when this file is executed as a script, so that
# importing it does not start the event loop as a side effect.
if __name__ == "__main__":
    asyncio.run(main())

Error Handling

The SDK provides typed exceptions for different error scenarios:
import time
import logging
from everos import EverOS, NotFoundError, BadRequestError, InternalServerError

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Shared client; `memories` is the v1 memories resource that the handler
# functions below call.
client = EverOS(api_key="your-api-key")
memories = client.v1.memories


def store_with_handling(user_id: str, content: str) -> object:
    """Store a message with structured error handling.

    Args:
        user_id: User the message is stored under.
        content: Text of the message; it is sent with role ``"user"`` and
            the current time in epoch milliseconds.

    Returns:
        The response object returned by ``memories.add``.

    Raises:
        BadRequestError: Invalid parameters; logged and re-raised.
        NotFoundError: Resource not found; logged and re-raised.
        InternalServerError: Server-side error; logged and re-raised.
        Exception: Any other failure; logged and re-raised.
    """
    message = {
        "role": "user",
        "timestamp": int(time.time() * 1000),
        "content": content,
    }
    # Keep the try body down to the SDK call so the handlers below only ever
    # see failures of the request itself.
    try:
        response = memories.add(user_id=user_id, messages=[message])

    except BadRequestError as e:
        # Invalid parameters - do not retry
        logger.error(f"Bad request for {user_id}: {e}")
        raise

    except NotFoundError as e:
        # Resource not found
        logger.warning(f"Resource not found: {e}")
        raise

    except InternalServerError as e:
        # Server-side error - the SDK already retries these automatically
        logger.error(f"Server error: {e}")
        raise

    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise

    else:
        logger.info(f"Stored memory for {user_id}")
        return response


def search_with_handling(user_id: str, query: str) -> dict:
    """Run a vector search for one user, logging any failure before re-raising it."""
    search_args = {
        "filters": {"user_id": user_id},
        "query": query,
        "method": "vector",
        "top_k": 5,
    }
    try:
        found = memories.search(**search_args)
    except BadRequestError as e:
        # Rejected parameters get their own message; the error still propagates.
        logger.error(f"Invalid search params: {e}")
        raise
    except Exception as e:
        logger.error(f"Search failed: {e}")
        raise
    return found

Fire-and-Forget Message Storage

For non-critical message storage that should not block your main flow, queue messages and let a background task store them:
import asyncio
import time
import logging
from typing import Optional
from everos import AsyncEverOS

# Module-level logger named after this module; FireAndForgetStore reports
# dropped and failed messages through it.
logger = logging.getLogger(__name__)


class FireAndForgetStore:
    """Non-blocking message storage with background queue.

    ``store()`` enqueues a message without awaiting; a single background
    task started by ``start()`` drains the queue through
    ``client.v1.memories.add``. Storage is best-effort: failures are logged
    rather than raised to the caller, and messages are dropped (with a
    warning) when the queue is full.
    """

    # Sentinel enqueued by stop(); the worker exits when it dequeues it, i.e.
    # only after every message queued before the stop request was handled.
    _STOP = object()

    def __init__(self, client: "AsyncEverOS", max_queue_size: int = 1000):
        """Bind the store to ``client`` and create the bounded queue.

        Args:
            client: Async SDK client; its ``v1.memories`` resource is used
                for every write.
            max_queue_size: Maximum number of pending messages; further
                ``store()`` calls drop the message while the queue is full.
        """
        self.client = client
        self.memories = client.v1.memories
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
        self._worker_task: Optional[asyncio.Task] = None

    async def start(self):
        """Start the background worker (no-op if one is already running)."""
        # A second worker would leave the first one orphaned and unstoppable.
        if self._worker_task is not None and not self._worker_task.done():
            return
        self._worker_task = asyncio.create_task(self._worker())

    async def stop(self):
        """Stop the worker and flush remaining messages.

        The worker is asked to stop through a queue sentinel instead of
        being cancelled, so a message whose store call is in flight is
        completed rather than lost. Anything still queued afterwards (for
        example when the worker was never started) is stored from here.
        """
        worker, self._worker_task = self._worker_task, None
        if worker is not None and not worker.done():
            await self.queue.put(self._STOP)
            # asyncio.wait() does not re-raise the task's own outcome, so a
            # worker that was cancelled elsewhere cannot abort the flush.
            await asyncio.wait({worker})

        # Flush remaining
        while not self.queue.empty():
            msg = self.queue.get_nowait()
            try:
                if msg is not self._STOP:
                    await self._store_message(msg)
            finally:
                self.queue.task_done()

    async def _worker(self):
        """Background worker that processes the queue until the stop sentinel."""
        while True:
            msg = await self.queue.get()
            try:
                if msg is self._STOP:
                    return
                await self._store_message(msg)
            except asyncio.CancelledError:
                # Let cancellation end the task instead of being logged as
                # an ordinary storage failure.
                raise
            except Exception as e:
                logger.error(f"Failed to store message: {e}")
            finally:
                # Every get() is matched by exactly one task_done().
                self.queue.task_done()

    async def _store_message(self, msg: dict):
        """Store a single message via the SDK.

        ``msg`` must contain ``user_id`` and ``content``; ``role`` defaults
        to ``"user"`` and ``timestamp`` to the current time in epoch
        milliseconds. Failures are logged and swallowed (best-effort).
        """
        try:
            await self.memories.add(
                user_id=msg["user_id"],
                messages=[
                    {
                        "role": msg.get("role", "user"),
                        "timestamp": msg.get("timestamp", int(time.time() * 1000)),
                        "content": msg["content"],
                    }
                ],
            )
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error(f"Store failed: {e}")

    def store(self, user_id: str, content: str, **kwargs):
        """Queue a message for storage (non-blocking).

        Extra keyword arguments (e.g. ``role``, ``timestamp``) are carried
        along in the queued message. The message is dropped with a warning
        when the queue is full.
        """
        msg = {"user_id": user_id, "content": content, **kwargs}
        try:
            self.queue.put_nowait(msg)
        except asyncio.QueueFull:
            logger.warning("Message queue full, dropping message")


# Usage
async def main():
    """Queue 100 messages without blocking, then shut the store down."""
    client = AsyncEverOS(api_key="your-api-key")
    store = FireAndForgetStore(client)

    await store.start()
    try:
        # These don't block - messages are queued
        for i in range(100):
            store.store("user_alice", f"Message {i}")

        # Do other work while messages are stored in background
        await asyncio.sleep(1)
    finally:
        # Always stop the store, even if the work above raises, so the
        # worker task is not left running and queued messages get flushed.
        await store.stop()

# Run the example only when this file is executed as a script, so that
# importing it does not start the event loop as a side effect.
if __name__ == "__main__":
    asyncio.run(main())

Logging and Monitoring

Add observability to your EverOS integration:
import time
import logging
from functools import wraps
from typing import Callable, Any

# Logger used by log_operation for its success/failure records.
# NOTE(review): the name "everos" may coincide with the logger namespace the
# SDK package itself uses - confirm, or pick an application-specific name.
logger = logging.getLogger("everos")


def log_operation(func: Callable) -> Callable:
    """Decorator to log SDK operations.

    Each call of the wrapped callable is timed. On success an INFO record
    is emitted, on failure an ERROR record is emitted and the exception is
    re-raised unchanged. Both records carry ``method``, ``duration_ms`` and
    ``success`` in ``extra``; failures also carry ``error``.

    Works for plain functions and for ``async def`` functions. For the
    latter the wrapper is itself a coroutine function, so the measured
    duration and the failure handling cover the awaited call rather than
    just the creation of the coroutine object.

    Args:
        func: The function or coroutine function to wrap.

    Returns:
        A wrapper with the same call signature and metadata as ``func``.
    """
    import inspect  # local import: only needed for the coroutine check

    method_name = getattr(func, "__name__", repr(func))

    def _elapsed_ms(start: float) -> float:
        # Milliseconds since `start`, rounded for compact log records.
        return round((time.perf_counter() - start) * 1000, 2)

    def _log_success(start: float) -> None:
        logger.info(
            f"EverOS {method_name} completed",
            extra={
                "method": method_name,
                "duration_ms": _elapsed_ms(start),
                "success": True,
            },
        )

    def _log_failure(start: float, exc: Exception) -> None:
        logger.error(
            f"EverOS {method_name} failed: {exc}",
            extra={
                "method": method_name,
                "duration_ms": _elapsed_ms(start),
                "success": False,
                "error": str(exc),
            },
        )

    if inspect.iscoroutinefunction(func):

        @wraps(func)
        async def async_wrapper(*args, **kwargs) -> Any:
            start = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                _log_failure(start, e)
                raise
            _log_success(start)
            return result

        return async_wrapper

    @wraps(func)
    def wrapper(*args, **kwargs) -> Any:
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            _log_failure(start, e)
            raise
        _log_success(start)
        return result

    return wrapper


# Usage: wrap SDK calls in your application layer
from everos import EverOS

# Shared client; `memories` is the v1 memories resource used by the
# decorated functions below.
client = EverOS(api_key="your-api-key")
memories = client.v1.memories


@log_operation
def store_memory(user_id: str, content: str):
    """Store ``content`` as one user message for ``user_id`` and return the SDK response."""
    # The timestamp is the current time in epoch milliseconds.
    message = {
        "role": "user",
        "timestamp": int(time.time() * 1000),
        "content": content,
    }
    return memories.add(user_id=user_id, messages=[message])


@log_operation
def search_memory(user_id: str, query: str):
    """Run a vector search (top_k=5) over the memories of ``user_id``."""
    user_filter = {"user_id": user_id}
    hits = memories.search(
        filters=user_filter,
        query=query,
        method="vector",
        top_k=5,
    )
    return hits

Best Practices Summary

  • Create a single EverOS or AsyncEverOS instance and reuse it across your application
  • The SDK handles connection pooling and session management internally
  • Use AsyncEverOS for production applications handling concurrent requests
  • Use typed exceptions (BadRequestError, NotFoundError, InternalServerError) for structured handling
  • The SDK includes built-in retry logic for transient failures
  • Log failures with context for debugging
  • Use AsyncEverOS with asyncio.gather for concurrent operations
  • Use fire-and-forget patterns for non-critical storage
  • Use async_mode=True for bulk imports to avoid blocking on memory extraction

Next Steps

Batch Processing

Import conversation history at scale

Agentic Retrieval

LLM-guided search with longer timeouts