Documentation Index: fetch the complete documentation index at https://docs.evermind.ai/llms.txt
Use this file to discover all available pages before exploring further.
This guide covers production-ready patterns for integrating EverOS into Python applications using the official everos SDK, including async/await, error handling, and best practices.
Installation
The SDK handles connection management, retries, timeouts, and connection pooling out of the box.
Basic Usage (Sync)
Get started with the synchronous client:
import time
from everos import EverOS
# One synchronous client; `memories` is the resource handle used for every call below.
client = EverOS(api_key="your-api-key")
memories = client.v1.memories

# Store a conversation message (timestamp is milliseconds since the epoch)
now_ms = int(time.time() * 1000)
response = memories.add(
    user_id="user_alice",
    messages=[
        {
            "role": "user",
            "timestamp": now_ms,
            "content": "I prefer morning meetings before 10am",
        }
    ],
)
print(response)

# Search memories
results = memories.search(
    filters={"user_id": "user_alice"},
    query="meeting preferences",
    method="vector",
    top_k=5,
)
print(results)

# Retrieve memories by type
profile = memories.get(filters={"user_id": "user_alice"}, memory_type="profile")
print(profile)
Async Client
For production applications handling concurrent requests:
import asyncio
import time
from everos import AsyncEverOS
async def main():
    """Store ten messages concurrently, then run one vector search.

    The ten ``memories.add`` coroutines are created up front and awaited
    together with ``asyncio.gather``, so the requests overlap instead of
    running one after another.
    """
    client = AsyncEverOS(api_key="your-api-key")
    memories = client.v1.memories

    # Store messages concurrently
    now_ms = int(time.time() * 1000)
    tasks = [
        memories.add(
            user_id="user_alice",
            messages=[
                {
                    "role": "user",
                    # Offset by i so each message gets a distinct timestamp.
                    "timestamp": now_ms + i,
                    "content": f"Message {i}",
                }
            ],
        )
        for i in range(10)
    ]
    results = await asyncio.gather(*tasks)
    print(f"Stored {len(results)} messages")

    # Search
    search_result = await memories.search(
        filters={"user_id": "user_alice"},
        query="message",
        method="vector",
        top_k=10,
    )
    print(search_result)


asyncio.run(main())
Async Mode with Task Polling
For long-running operations, use async_mode=True to get a task ID and poll for completion:
import asyncio
import time
from everos import AsyncEverOS, NotFoundError
# Shared async client for this example. `memories` is used by main() to submit
# messages; `tasks` is used by wait_for_task() to look up task status.
client = AsyncEverOS( api_key = "your-api-key" )
memories = client.v1.memories
tasks = client.v1.tasks
async def wait_for_task(task_id: str, poll_interval: float = 2.0, max_attempts: int = 30) -> str:
    """Poll a task until completion.

    A 404 means the task has been processed and cleared (short TTL) - treat as success.

    Uses the module-level ``tasks`` resource to look the task up.

    Args:
        task_id: Identifier of the task to poll.
        poll_interval: Seconds to wait between two consecutive polls.
        max_attempts: Maximum number of polls before giving up.

    Returns:
        The terminal status reported by the API ("success", "failed",
        "completed" or "done"), "done" when the lookup raises
        ``NotFoundError``, or "timeout" when no terminal status was seen
        within ``max_attempts`` polls.
    """
    terminal_statuses = ("success", "failed", "completed", "done")

    for attempt in range(1, max_attempts + 1):
        try:
            resp = await tasks.retrieve(task_id)
        except NotFoundError:
            print(f" [attempt {attempt}] task completed (expired)")
            return "done"

        # A response without a data payload carries no status; keep polling.
        status = resp.data.status if resp.data else "unknown"
        print(f" [attempt {attempt}] status={status!r}")
        if status in terminal_statuses:
            return status

        # Only sleep when another poll will follow; sleeping after the last
        # attempt would just delay the "timeout" result.
        if attempt < max_attempts:
            await asyncio.sleep(poll_interval)

    return "timeout"
async def main():
    """Submit a three-message conversation in async mode and poll its task.

    The add call is made with ``async_mode=True``; when the response carries a
    task id, ``wait_for_task`` is used to poll until the task finishes.
    """
    now_ms = int(time.time() * 1000)

    # async_mode=True returns HTTP 202 with a task_id
    response = await memories.add(
        user_id="user_alice",
        async_mode=True,
        messages=[
            {
                "role": "user",
                "timestamp": now_ms,
                "content": "I love hiking on weekends, especially in the mountains.",
            },
            {
                "role": "assistant",
                "timestamp": now_ms + 1000,
                "content": "That sounds wonderful! Do you have a favorite trail?",
            },
            {
                "role": "user",
                "timestamp": now_ms + 2000,
                "content": "Yes, the mountain trails near the lake.",
            },
        ],
    )
    print("add response:", response)

    # Poll until the task is complete
    task_id = response.data.task_id if response.data else None
    if task_id:
        final_status = await wait_for_task(task_id)
        print(f"Task finished with status: {final_status!r}")


asyncio.run(main())
Error Handling
The SDK provides typed exceptions for different error scenarios:
import time
import logging
from everos import EverOS, NotFoundError, BadRequestError, InternalServerError
# Module-level logger and a single shared sync client; `memories` is the
# resource handle that store_with_handling() and search_with_handling() call.
logger = logging.getLogger( __name__ )
client = EverOS( api_key = "your-api-key" )
memories = client.v1.memories
def store_with_handling(user_id: str, content: str) -> object:
    """Store a message with structured error handling.

    Every failure is logged and then re-raised unchanged, so callers still see
    the original exception.

    Args:
        user_id: User the message is stored for.
        content: Text of the message; it is sent with role "user" and the
            current time in epoch milliseconds.

    Returns:
        The response returned by ``memories.add``.

    Raises:
        BadRequestError: Invalid parameters (logged at error level).
        NotFoundError: Resource not found (logged at warning level).
        InternalServerError: Server-side failure (logged at error level).
        Exception: Any other failure, logged as unexpected.
    """
    try:
        response = memories.add(
            user_id=user_id,
            messages=[
                {
                    "role": "user",
                    "timestamp": int(time.time() * 1000),
                    "content": content,
                }
            ],
        )
    except BadRequestError as e:
        # Invalid parameters - do not retry
        logger.error("Bad request for %s: %s", user_id, e)
        raise
    except NotFoundError as e:
        # Resource not found
        logger.warning("Resource not found: %s", e)
        raise
    except InternalServerError as e:
        # Server-side error - the SDK already retries these automatically
        logger.error("Server error: %s", e)
        raise
    except Exception as e:
        logger.error("Unexpected error: %s", e)
        raise
    else:
        logger.info("Stored memory for %s", user_id)
        return response
def search_with_handling(user_id: str, query: str) -> dict:
    """Search with structured error handling.

    Runs a vector search limited to ``user_id`` and returns the top five
    results. Failures are logged and re-raised unchanged.

    Args:
        user_id: User whose memories are searched.
        query: Free-text search query.

    Returns:
        Whatever ``memories.search`` returns.
        NOTE(review): annotated as ``dict``; the SDK's actual response type is
        not visible here - confirm.

    Raises:
        BadRequestError: Invalid search parameters.
        Exception: Any other failure raised by the search call.
    """
    try:
        return memories.search(
            filters={"user_id": user_id},
            query=query,
            method="vector",
            top_k=5,
        )
    except BadRequestError as e:
        logger.error("Invalid search params: %s", e)
        raise
    except Exception as e:
        logger.error("Search failed: %s", e)
        raise
Fire-and-Forget Message Storage
For non-critical message storage that should not block your main flow:
import asyncio
import time
import logging
from typing import Optional
from everos import AsyncEverOS
# Module-level logger; FireAndForgetStore reports failed stores and dropped
# messages through it.
logger = logging.getLogger( __name__ )
class FireAndForgetStore:
    """Non-blocking message storage with background queue.

    ``store()`` puts a message on a bounded queue and returns immediately; one
    background task takes messages off the queue and sends each through
    ``client.v1.memories.add``. Delivery is best-effort: a message is dropped
    (with a warning) when the queue is full, and a failed send is logged, not
    retried.
    """

    def __init__(self, client: "AsyncEverOS", max_queue_size: int = 1000):
        """Bind the store to ``client`` and create an empty bounded queue.

        Args:
            client: Async SDK client whose ``v1.memories`` resource is used.
            max_queue_size: Maximum number of messages waiting to be sent.
        """
        self.client = client
        self.memories = client.v1.memories
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
        self._worker_task: Optional[asyncio.Task] = None

    async def start(self):
        """Start the background worker (no-op if one is already running)."""
        if self._worker_task is None or self._worker_task.done():
            self._worker_task = asyncio.create_task(self._worker())

    async def stop(self):
        """Stop the worker and flush remaining messages.

        Waits for the worker to finish everything already queued, including
        the message it is sending right now, and only then cancels it.
        Cancelling first would abort the in-flight send and lose that message.
        """
        worker = self._worker_task
        if worker is not None and not worker.done():
            try:
                await self.queue.join()
            finally:
                # Also reached when stop() itself is cancelled while waiting.
                worker.cancel()
            try:
                await worker
            except asyncio.CancelledError:
                pass
        self._worker_task = None

        # Flush remaining (worker was never started or had already exited)
        while not self.queue.empty():
            msg = self.queue.get_nowait()
            try:
                await self._store_message(msg)
            finally:
                self.queue.task_done()

    async def _worker(self):
        """Background worker that processes the queue."""
        while True:
            try:
                msg = await self.queue.get()
            except asyncio.CancelledError:
                break
            try:
                await self._store_message(msg)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Failed to store message: {e}")
            finally:
                # Always balance get() so queue.join() in stop() can return.
                self.queue.task_done()

    async def _store_message(self, msg: dict):
        """Store a single message via the SDK.

        ``msg`` must contain "user_id" and "content"; "role" defaults to
        "user" and "timestamp" to the current time in epoch milliseconds.
        Failures are logged and swallowed.
        """
        try:
            await self.memories.add(
                user_id=msg["user_id"],
                messages=[
                    {
                        "role": msg.get("role", "user"),
                        "timestamp": msg.get("timestamp", int(time.time() * 1000)),
                        "content": msg["content"],
                    }
                ],
            )
        except Exception as e:
            logger.error(f"Store failed: {e}")

    def store(self, user_id: str, content: str, **kwargs):
        """Queue a message for storage (non-blocking).

        Extra keyword arguments (for example ``role`` or ``timestamp``) are
        carried along in the queued message. If the queue is full the message
        is dropped and a warning is logged.
        """
        msg = {"user_id": user_id, "content": content, **kwargs}
        try:
            self.queue.put_nowait(msg)
        except asyncio.QueueFull:
            logger.warning("Message queue full, dropping message")
# Usage
async def main():
    """Queue 100 messages without blocking, then shut the store down."""
    client = AsyncEverOS(api_key="your-api-key")
    store = FireAndForgetStore(client)
    await store.start()

    # These don't block - messages are queued
    for i in range(100):
        store.store("user_alice", f"Message {i}")

    # Do other work while messages are stored in background
    await asyncio.sleep(1)
    await store.stop()


asyncio.run(main())
Logging and Monitoring
Add observability to your EverOS integration:
import time
import logging
from functools import wraps
from typing import Callable, Any
# Logger named "everos"; log_operation writes its success and failure records here.
logger = logging.getLogger( "everos" )
def log_operation(func: Callable) -> Callable:
    """Decorator to log SDK operations.

    Each call is timed with ``time.perf_counter``. On success an info record
    is logged; on failure an error record is logged and the exception is
    re-raised. Both records carry ``method``, ``duration_ms`` and ``success``
    in ``extra`` (failures also carry ``error``).

    Coroutine functions are wrapped with an ``async`` wrapper so the logged
    duration covers the awaited call, not just creating the coroutine object.

    Args:
        func: Sync or async callable to wrap.

    Returns:
        A wrapper with the same call signature and return value as ``func``.
    """
    import inspect  # local import: only needed to detect coroutine functions

    method_name = getattr(func, "__name__", repr(func))

    def _duration_ms(start: float) -> float:
        return round((time.perf_counter() - start) * 1000, 2)

    def _log_success(start: float) -> None:
        logger.info(
            "EverOS %s completed",
            method_name,
            extra={
                "method": method_name,
                "duration_ms": _duration_ms(start),
                "success": True,
            },
        )

    def _log_failure(start: float, exc: Exception) -> None:
        logger.error(
            "EverOS %s failed: %s",
            method_name,
            exc,
            extra={
                "method": method_name,
                "duration_ms": _duration_ms(start),
                "success": False,
                "error": str(exc),
            },
        )

    if inspect.iscoroutinefunction(func):

        @wraps(func)
        async def async_wrapper(*args, **kwargs) -> Any:
            start = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                _log_failure(start, e)
                raise
            _log_success(start)
            return result

        return async_wrapper

    @wraps(func)
    def wrapper(*args, **kwargs) -> Any:
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            _log_failure(start, e)
            raise
        _log_success(start)
        return result

    return wrapper
# Usage: wrap SDK calls in your application layer
from everos import EverOS
# Shared sync client; `memories` is the resource handle the decorated
# store_memory() and search_memory() helpers call.
client = EverOS( api_key = "your-api-key" )
memories = client.v1.memories
@log_operation
def store_memory(user_id: str, content: str):
    """Record ``content`` as a single "user" message for ``user_id``.

    The message is stamped with the current time in epoch milliseconds, and
    the call is timed and logged by ``log_operation``.
    """
    message = {
        "role": "user",
        "timestamp": int(time.time() * 1000),
        "content": content,
    }
    return memories.add(user_id=user_id, messages=[message])
@log_operation
def search_memory(user_id: str, query: str):
    """Run a vector search over ``user_id``'s memories, returning up to five hits.

    The call is timed and logged by ``log_operation``.
    """
    search_args = {
        "filters": {"user_id": user_id},
        "query": query,
        "method": "vector",
        "top_k": 5,
    }
    return memories.search(**search_args)
Best Practices Summary
- Create a single EverOS or AsyncEverOS instance and reuse it across your application
- The SDK handles connection pooling and session management internally
- Use AsyncEverOS for production applications handling concurrent requests
- Use typed exceptions (BadRequestError, NotFoundError, InternalServerError) for structured handling
- The SDK includes built-in retry logic for transient failures
- Log failures with context for debugging
- Use method="vector" for semantic similarity search
- Use method="keyword" for exact term matching
- Use method="agentic" for LLM-guided search (may need longer timeouts)
- Filter by memory_types to narrow results (e.g., ["episodic_memory", "profile"])
Next Steps
Batch Processing: import conversation history at scale
Agentic Retrieval: LLM-guided search with longer timeouts