# Pi Observability Design Notes ## Goal Make `packages/ai` and `packages/agent`/harness observable without depending on OpenTelemetry, Sentry, or any APM vendor. Pi should emit stable, structured lifecycle events. External listeners can convert those events into OTel spans, Sentry spans, logs, metrics, or custom telemetry. ## Mental model A trace is one causal tree of work, e.g. one user turn. A span is one timed operation in that tree. It is normally represented by IDs, not object pointers: ```ts interface SpanRecord { traceId: string; spanId: string; parentSpanId?: string; name: string; startTime: number; endTime?: number; attributes: Record; status: "ok" | "error"; } ``` Example tree: ```text traceId=t1 spanId=s1 parent=- name=pi.agent.prompt traceId=t1 spanId=s2 parent=s1 name=pi.agent.turn traceId=t1 spanId=s3 parent=s2 name=pi.ai.provider.request traceId=t1 spanId=s4 parent=s2 name=pi.agent.tool_call traceId=t1 spanId=s5 parent=s4 name=pi.session.append_entry ``` ## Async context JavaScript has one event loop but multiple async chains can interleave. A single global `currentContext` breaks under concurrency. `AsyncLocalStorage` is the Node equivalent of `ThreadLocal` for async continuations. It lets concurrent operations keep distinct current contexts: ```ts await Promise.all([ runWithPiContext({ userId: "alice" }, () => harness.prompt("A")), runWithPiContext({ userId: "bob" }, () => harness.prompt("B")), ]); ``` Deep code can then read the correct current context for the active async chain. Pi must run in Node, Bun, browser, workers, and other JS runtimes, so ALS cannot be the core abstraction. It should be a runtime adapter. ## Core design Pi owns a small runtime-agnostic observability abstraction: ```ts export interface PiObservabilityContext { traceId?: string; currentSpanId?: string; userContext?: Record; } export interface PiObservabilityEvent { type: "start" | "end" | "error" | "event"; name: string; traceId: string; spanId?: string; parentSpanId?: string; timestamp: number; durationMs?: number; context?: Record; payload?: Record; error?: { name: string; message: string }; } export interface PiObservability { getContext(): PiObservabilityContext | undefined; runWithContext(context: PiObservabilityContext, fn: () => T): T; emit(event: PiObservabilityEvent): void; hasSubscribers(): boolean; } ``` Public API: ```ts export function configurePiObservability(observability: PiObservability): void; export function subscribePiObservability(listener: (event: PiObservabilityEvent) => void): () => void; export function runWithPiContext(userContext: Record, fn: () => T): T; export function traceOperation(name: string, payload: Record, fn: () => T): T; ``` `traceOperation()`: 1. reads the current context 2. creates `traceId` if missing 3. creates a new `spanId` 4. uses current span as `parentSpanId` 5. emits `start` 6. runs callback under child context 7. emits `end` or `error` 8. rethrows on error Pseudo-code: ```ts function traceOperation(name: string, payload: Record, fn: () => T): T { const parent = getContext(); const traceId = parent?.traceId ?? createId(); const spanId = createId(); const parentSpanId = parent?.currentSpanId; const child = { ...parent, traceId, currentSpanId: spanId }; emit({ type: "start", name, traceId, spanId, parentSpanId, timestamp: Date.now(), context: parent?.userContext, payload }); return runWithContext(child, () => { try { const result = fn(); // Promise-aware implementation emits end/error after settlement. emit({ type: "end", name, traceId, spanId, parentSpanId, timestamp: Date.now(), context: child.userContext, payload }); return result; } catch (error) { emit({ type: "error", name, traceId, spanId, parentSpanId, timestamp: Date.now(), context: child.userContext, payload, error: serializeError(error) }); throw error; } }); } ``` ## Runtime adapters Core packages should not import Node-only APIs. Possible implementations: - Node adapter: `AsyncLocalStorage` for context, optional `diagnostics_channel` publishing. - Browser/workers fallback: local subscriber set and limited/manual context propagation. - Bun/Deno adapters: use runtime-specific async context if available. For Node, diagnostics channels can be used as a passive event bus: ```ts import { channel } from "diagnostics_channel"; channel("pi.observability").publish(event); ``` Subscribers can create OTel/Sentry spans without monkey-patching pi. ## What pi emits Pi emits what happened. It does not create OTel/Sentry spans directly. Initial minimal event names: ```text pi.agent.prompt pi.agent.skill pi.agent.prompt_template pi.agent.compaction pi.agent.branch_navigation pi.agent.session.append_entry pi.ai.provider.request ``` Each operation emits: ```text start end error ``` Later additions: ```text pi.agent.turn pi.agent.tool_call pi.agent.queue_update pi.ai.provider.retry pi.ai.provider.first_token pi.ai.provider.usage pi.session.read pi.session.write ``` ## Minimal instrumentation points ### packages/agent Wrap: - `AgentHarness.prompt()` - `AgentHarness.skill()` - `AgentHarness.promptFromTemplate()` - `AgentHarness.compact()` - `AgentHarness.navigateTree()` - `Session.appendTypedEntry()` or storage append facade Example: ```ts return traceOperation( "pi.agent.prompt", { sessionId: turnState.sessionId, provider: turnState.model.provider, model: turnState.model.id, promptLength: text.length, imageCount: options?.images?.length ?? 0, }, () => this.executeTurn(turnState, text, options), ); ``` Session write: ```ts return traceOperation( "pi.agent.session.append_entry", { entryType: entry.type }, async () => { await this.unwrap(this.storage.appendEntry(entry)); return entry.id; }, ); ``` ### packages/ai Wrap common provider boundaries: - `streamSimple()` - `completeSimple()` Example: ```ts return traceOperation( "pi.ai.provider.request", { api: model.api, provider: model.provider, model: model.id, sessionId: options.sessionId, reasoning: options.reasoning, }, () => actualStreamSimple(model, context, options), ); ``` End/error payloads can include safe metadata: - stop reason - status code - retry count - input/output/total tokens - cost total - aborted/timeout flag ## Safety and redaction Default payloads must be safe. Safe by default: - provider - model - API identifier - session id - entry type - tool name - status code - stop reason - token counts - costs - durations Unsafe by default: - prompts - completions - tool args - tool results - shell output - file contents - provider request payloads - provider response bodies - API keys - headers Content capture can be opt-in later with explicit redaction hooks. ## Listener behavior Observability must never affect pi execution. Subscriber errors should be swallowed or isolated. Harness hooks are control-plane and may affect execution; observability subscribers are passive and must not. ## User context Users can associate arbitrary context with a turn: ```ts await runWithPiContext( { userId: "u123", orgId: "acme", region: "eu", }, () => harness.prompt("fix this"), ); ``` Every emitted event inside that async chain includes the context: ```ts { type: "start", name: "pi.ai.provider.request", traceId: "t1", spanId: "s3", parentSpanId: "s1", context: { userId: "u123", orgId: "acme", region: "eu", }, payload: { provider: "anthropic", model: "claude-sonnet-4", }, } ``` An OTel adapter can map this to span attributes. A Sentry adapter can map it to Sentry context/spans. A custom user can log JSON. ## Package story Minimal initial package: ```text packages/observability runtime-agnostic context + traceOperation + subscribe ``` Then: ```text packages/ai emits pi.ai.* events packages/agent emits pi.agent.* / pi.session.* events ``` Optional later: ```text packages/observability-node AsyncLocalStorage + diagnostics_channel bridge packages/otel subscribes to pi events and creates OpenTelemetry spans ``` ## Thesis Pi defines a stable, safe event contract. Adapters define where events go. This makes ai/harness observable without binding core packages to OTel, Sentry, Node-only APIs, or monkey-patching.