# Supermemory Pipecat SDK

Memory-enhanced conversational AI pipelines with Supermemory and Pipecat.
## Installation

```bash
pip install supermemory-pipecat
```
## Quick Start

```python
import os

from pipecat.pipeline.pipeline import Pipeline
from pipecat.services.openai import OpenAILLMService, OpenAIUserContextAggregator
from supermemory_pipecat import SupermemoryPipecatService

# Create the memory service
memory = SupermemoryPipecatService(
    api_key=os.getenv("SUPERMEMORY_API_KEY"),
    user_id="user-123",             # Required: used as container_tag
    session_id="conversation-456",  # Optional: groups memories by session
)

# Create a pipeline with memory. `transport`, `stt`, `user_context`, and
# `llm` are assumed to be set up elsewhere, e.g. llm = OpenAILLMService(...)
pipeline = Pipeline([
    transport.input(),
    stt,
    user_context,
    memory,  # Automatically retrieves and injects relevant memories
    llm,
    transport.output(),
])
```
## Configuration

### Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| `user_id` | `str` | Yes | User identifier, used as the `container_tag` for memory scoping |
| `session_id` | `str` | No | Session/conversation ID for grouping memories |
| `api_key` | `str` | No | Supermemory API key (or set the `SUPERMEMORY_API_KEY` env var) |
| `params` | `InputParams` | No | Advanced configuration |
| `base_url` | `str` | No | Custom API endpoint |
### Advanced Configuration

```python
from supermemory_pipecat import SupermemoryPipecatService

memory = SupermemoryPipecatService(
    user_id="user-123",
    session_id="conv-456",
    params=SupermemoryPipecatService.InputParams(
        search_limit=10,       # Max memories to retrieve
        search_threshold=0.1,  # Similarity threshold
        mode="full",           # "profile", "query", or "full"
        system_prompt="Based on previous conversations, I recall:\n\n",
    ),
)
```
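A note on tuning: `search_threshold` presumably acts as a minimum similarity score, so raising it keeps only closer matches at the cost of recall, while `search_limit` caps how many memories are injected per turn.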
## Memory Modes

| Mode | Static Profile | Dynamic Profile | Search Results |
|---|---|---|---|
| `"profile"` | Yes | Yes | No |
| `"query"` | No | No | Yes |
| `"full"` | Yes | Yes | Yes |
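As an illustrative sketch (using only the `InputParams` fields shown above), a latency-sensitive voice agent might skip per-turn search and rely on the profile alone:

```python
from supermemory_pipecat import SupermemoryPipecatService

# Sketch: profile-only mode skips per-turn similarity search,
# trading recall of specific past messages for lower latency.
memory = SupermemoryPipecatService(
    user_id="user-123",
    params=SupermemoryPipecatService.InputParams(mode="profile"),
)
```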
## How It Works

1. **Intercepts context frames** - Listens for `LLMContextFrame` in the pipeline
2. **Tracks conversation** - Maintains a clean conversation history (no injected memories)
3. **Retrieves memories** - Queries the `/v4/profile` API with the user's message
4. **Injects memories** - Formats them and adds them to the LLM context as a system message
5. **Stores messages** - Sends the last user message to Supermemory (background, non-blocking)
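Steps 3 and 4 can be pictured with a small sketch. The helper below is hypothetical (the SDK does this internally); it only illustrates how retrieved memories might be folded into a system message using the configurable `system_prompt` prefix:

```python
# Hypothetical illustration of steps 3-4: formatting retrieved memories
# into a single system message. The real SDK performs this internally.
def build_memory_message(memories: list[str], system_prompt: str) -> dict:
    # Join each retrieved memory as a bullet under the configured prefix
    body = system_prompt + "\n".join(f"- {m}" for m in memories)
    return {"role": "system", "content": body}

msg = build_memory_message(
    ["Alice prefers metric units", "Alice lives in Berlin"],
    "Based on previous conversations, I recall:\n\n",
)
# -> {'role': 'system', 'content': 'Based on previous conversations, I recall:\n\n- Alice prefers ...'}
```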
## What Gets Stored

Only the last user message is sent to Supermemory:

```
User: What's the weather like today?
```

Stored as:

```json
{
  "content": "User: What's the weather like today?",
  "container_tags": ["user-123"],
  "custom_id": "conversation-456",
  "metadata": { "platform": "pipecat" }
}
```
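For reference, the background store roughly corresponds to a single POST of that payload. The sketch below uses `httpx` directly; the endpoint URL and auth header are assumptions for illustration, not part of this SDK's public API:

```python
# Rough sketch of the non-blocking background store (step 5 above).
# The endpoint URL and auth header are assumptions for illustration only.
import os

import httpx

async def store_message(payload: dict) -> None:
    async with httpx.AsyncClient() as client:
        await client.post(
            "https://api.supermemory.ai/v3/memories",  # assumed endpoint
            headers={"Authorization": f"Bearer {os.environ['SUPERMEMORY_API_KEY']}"},
            json=payload,
        )

# Inside the running pipeline, a fire-and-forget task would keep storage
# off the critical path:
# asyncio.create_task(store_message(payload))
```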
## Full Example

```python
import os

from fastapi import FastAPI, WebSocket
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.transports.websocket.fastapi import (
    FastAPIWebsocketTransport,
    FastAPIWebsocketParams,
)
from supermemory_pipecat import SupermemoryPipecatService

app = FastAPI()

@app.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    transport = FastAPIWebsocketTransport(
        websocket=websocket,
        params=FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
    )

    # Gemini Live for speech-to-speech
    llm = GeminiLiveLLMService(
        api_key=os.getenv("GEMINI_API_KEY"),
        model="models/gemini-2.5-flash-native-audio-preview-12-2025",
    )

    context = OpenAILLMContext([{"role": "system", "content": "You are a helpful assistant."}])
    context_aggregator = llm.create_context_aggregator(context)

    # Supermemory memory service
    memory = SupermemoryPipecatService(
        user_id="alice",
        session_id="session-123",
    )

    pipeline = Pipeline([
        transport.input(),
        context_aggregator.user(),
        memory,
        llm,
        transport.output(),
        context_aggregator.assistant(),
    ])

    runner = PipelineRunner()
    task = PipelineTask(pipeline)
    await runner.run(task)
```
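To try the example locally, a minimal entry point (assuming `uvicorn` is installed) might look like:

```python
# Minimal entry point for the example above; assumes uvicorn is installed.
import uvicorn

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
```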
## License

MIT