Pi Observability Design Notes
Goal
Make packages/ai and packages/agent/harness observable without depending on OpenTelemetry, Sentry, or any APM vendor.
Pi should emit stable, structured lifecycle events. External listeners can convert those events into OTel spans, Sentry spans, logs, metrics, or custom telemetry.
Mental model
A trace is one causal tree of work, e.g. one user turn.
A span is one timed operation in that tree. It is normally represented by IDs, not object pointers:
interface SpanRecord {
  traceId: string;
  spanId: string;
  parentSpanId?: string;
  name: string;
  startTime: number;
  endTime?: number;
  attributes: Record<string, unknown>;
  status: "ok" | "error";
}
Example tree:
traceId=t1 spanId=s1 parent=- name=pi.agent.prompt
traceId=t1 spanId=s2 parent=s1 name=pi.agent.turn
traceId=t1 spanId=s3 parent=s2 name=pi.ai.provider.request
traceId=t1 spanId=s4 parent=s2 name=pi.agent.tool_call
traceId=t1 spanId=s5 parent=s4 name=pi.session.append_entry
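Because spans reference each other by ID, a listener can rebuild the tree from flat records. The following is a minimal sketch of that idea; `SpanRow` and `renderSpanTree` are hypothetical names used only for illustration, not part of pi.

```typescript
// Hypothetical subset of SpanRecord: only the fields needed to rebuild the tree.
interface SpanRow {
  traceId: string;
  spanId: string;
  parentSpanId?: string;
  name: string;
}

// Renders rows as indented lines, one per span, children under their parent.
function renderSpanTree(rows: SpanRow[]): string[] {
  // Group rows by parent span ID; root spans are grouped under `undefined`.
  const childrenOf = new Map<string | undefined, SpanRow[]>();
  for (const row of rows) {
    const siblings = childrenOf.get(row.parentSpanId) ?? [];
    siblings.push(row);
    childrenOf.set(row.parentSpanId, siblings);
  }

  const lines: string[] = [];
  const visit = (parentId: string | undefined, depth: number): void => {
    for (const row of childrenOf.get(parentId) ?? []) {
      lines.push(`${"  ".repeat(depth)}${row.name} (${row.spanId})`);
      visit(row.spanId, depth + 1);
    }
  };
  visit(undefined, 0);
  return lines;
}

// The example tree above, expressed as flat rows.
const exampleRows: SpanRow[] = [
  { traceId: "t1", spanId: "s1", name: "pi.agent.prompt" },
  { traceId: "t1", spanId: "s2", parentSpanId: "s1", name: "pi.agent.turn" },
  { traceId: "t1", spanId: "s3", parentSpanId: "s2", name: "pi.ai.provider.request" },
  { traceId: "t1", spanId: "s4", parentSpanId: "s2", name: "pi.agent.tool_call" },
  { traceId: "t1", spanId: "s5", parentSpanId: "s4", name: "pi.session.append_entry" },
];
const treeLines = renderSpanTree(exampleRows);
```

No object pointers are needed: the parent/child relationship is recovered from `parentSpanId` alone, which is what lets events cross process or adapter boundaries as plain data.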
Async context
JavaScript has one event loop, but multiple async chains can interleave on it. A single global currentContext breaks under concurrency.
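The failure mode can be reproduced in a few lines. This is an illustrative sketch only; `promptWithGlobal` is a hypothetical stand-in for any operation that sets a global context, awaits, and then reads it back.

```typescript
// A single module-level context shared by every async chain.
let currentContext: { userId: string } | undefined;

// Hypothetical operation: set the context, yield once, then read it back.
async function promptWithGlobal(userId: string): Promise<string> {
  currentContext = { userId };
  await Promise.resolve(); // yields, letting the other chain run and overwrite the global
  return currentContext ? currentContext.userId : "none";
}

// Both chains start before either one resumes after its await.
function runBothWithGlobal(): Promise<string[]> {
  return Promise.all([promptWithGlobal("alice"), promptWithGlobal("bob")]);
}
```

Here both chains observe "bob": the second call overwrites the global before the first one resumes, so the first chain reads the wrong user.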
AsyncLocalStorage is the Node equivalent of ThreadLocal for async continuations. It lets concurrent operations keep distinct current contexts:
await Promise.all([
  runWithPiContext({ userId: "alice" }, () => harness.prompt("A")),
  runWithPiContext({ userId: "bob" }, () => harness.prompt("B")),
]);
Deep code can then read the correct current context for the active async chain.
Pi must run in Node, Bun, browser, workers, and other JS runtimes, so ALS cannot be the core abstraction. It should be a runtime adapter.
Core design
Pi owns a small runtime-agnostic observability abstraction:
export interface PiObservabilityContext {
  traceId?: string;
  currentSpanId?: string;
  userContext?: Record<string, unknown>;
}
export interface PiObservabilityEvent {
  type: "start" | "end" | "error" | "event";
  name: string;
  traceId: string;
  spanId?: string;
  parentSpanId?: string;
  timestamp: number;
  durationMs?: number;
  context?: Record<string, unknown>;
  payload?: Record<string, unknown>;
  error?: { name: string; message: string };
}
export interface PiObservability {
  getContext(): PiObservabilityContext | undefined;
  runWithContext<T>(context: PiObservabilityContext, fn: () => T): T;
  emit(event: PiObservabilityEvent): void;
  hasSubscribers(): boolean;
}
Public API:
export function configurePiObservability(observability: PiObservability): void;
export function subscribePiObservability(listener: (event: PiObservabilityEvent) => void): () => void;
export function runWithPiContext<T>(userContext: Record<string, unknown>, fn: () => T): T;
export function traceOperation<T>(name: string, payload: Record<string, unknown>, fn: () => T): T;
traceOperation():
- reads the current context
- creates traceId if missing
- creates a new spanId
- uses current span as parentSpanId
- emits start
- runs callback under child context
- emits end or error
- rethrows on error
Pseudo-code:
function traceOperation<T>(name: string, payload: Record<string, unknown>, fn: () => T): T {
  const parent = getContext();
  const traceId = parent?.traceId ?? createId();
  const spanId = createId();
  const parentSpanId = parent?.currentSpanId;
  const child = { ...parent, traceId, currentSpanId: spanId };
  emit({ type: "start", name, traceId, spanId, parentSpanId, timestamp: Date.now(), context: parent?.userContext, payload });
  return runWithContext(child, () => {
    try {
      const result = fn();
      // Synchronous path only. A promise-aware implementation defers end/error
      // until the returned promise settles.
      emit({ type: "end", name, traceId, spanId, parentSpanId, timestamp: Date.now(), context: child.userContext, payload });
      return result;
    } catch (error) {
      emit({ type: "error", name, traceId, spanId, parentSpanId, timestamp: Date.now(), context: child.userContext, payload, error: serializeError(error) });
      throw error;
    }
  });
}
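The promise-aware variant mentioned in the pseudo-code comment could look roughly like the sketch below. It is self-contained for illustration, so several pieces are assumptions rather than pi's design: the context store is a plain variable (which is only correct for synchronous nesting and would be replaced by a runtime adapter), `emitted` is an in-memory array standing in for `emit`, the payload argument is omitted for brevity, and `createId` is a simple counter.

```typescript
type Ctx = { traceId?: string; currentSpanId?: string };
type Ev = {
  type: "start" | "end" | "error";
  name: string;
  traceId: string;
  spanId: string;
  parentSpanId?: string;
  timestamp: number;
  durationMs?: number;
  error?: { name: string; message: string };
};

const emitted: Ev[] = []; // stand-in for emit()
let current: Ctx | undefined; // stand-in context store, sync nesting only
let nextId = 0;
const createId = (): string => `id${++nextId}`;

function runWithContext<T>(ctx: Ctx, fn: () => T): T {
  const previous = current;
  current = ctx;
  try {
    return fn();
  } finally {
    current = previous;
  }
}

function isPromiseLike(value: unknown): value is PromiseLike<unknown> {
  return typeof value === "object" && value !== null
    && typeof (value as { then?: unknown }).then === "function";
}

function traceOperation<T>(name: string, fn: () => T): T {
  const parent = current;
  const traceId = parent?.traceId ?? createId();
  const spanId = createId();
  const base = { name, traceId, spanId, parentSpanId: parent?.currentSpanId };
  const startedAt = Date.now();
  emitted.push({ ...base, type: "start", timestamp: startedAt });

  const end = (): void => {
    const now = Date.now();
    emitted.push({ ...base, type: "end", timestamp: now, durationMs: now - startedAt });
  };
  const fail = (error: unknown): void => {
    const now = Date.now();
    const e = error instanceof Error ? error : new Error(String(error));
    emitted.push({
      ...base, type: "error", timestamp: now, durationMs: now - startedAt,
      error: { name: e.name, message: e.message },
    });
  };

  return runWithContext({ ...parent, traceId, currentSpanId: spanId }, () => {
    let result: T;
    try {
      result = fn();
    } catch (error) {
      fail(error); // synchronous throw
      throw error;
    }
    if (isPromiseLike(result)) {
      // Defer end/error until settlement, passing the outcome through unchanged.
      return Promise.resolve(result).then(
        (value) => { end(); return value; },
        (error) => { fail(error); throw error; },
      ) as unknown as T;
    }
    end(); // synchronous success
    return result;
  });
}
```

With this shape, a synchronous callback produces start and end before `traceOperation` returns, while an async callback produces start immediately and end or error only once its promise settles. Callers still receive the original value or rejection.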
Runtime adapters
Core packages should not import Node-only APIs.
Possible implementations:
- Node adapter: AsyncLocalStorage for context, optional diagnostics_channel publishing.
- Browser/workers fallback: local subscriber set and limited/manual context propagation.
- Bun/Deno adapters: use runtime-specific async context if available.
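As a rough illustration of the Node adapter, the context half of the abstraction can be backed by `AsyncLocalStorage` from `node:async_hooks`. The name `nodeContextAdapter` and the helper `readUserIdLater` are hypothetical; only `AsyncLocalStorage`, `run`, and `getStore` are real Node APIs.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

interface PiObservabilityContext {
  traceId?: string;
  currentSpanId?: string;
  userContext?: Record<string, unknown>;
}

// One store per process; each run() call scopes a context to its async chain.
const storage = new AsyncLocalStorage<PiObservabilityContext>();

// Hypothetical adapter covering getContext/runWithContext only.
const nodeContextAdapter = {
  getContext: (): PiObservabilityContext | undefined => storage.getStore(),
  runWithContext: <T>(context: PiObservabilityContext, fn: () => T): T =>
    storage.run(context, fn),
};

// Reads the context after a timer, i.e. in a later continuation of the chain.
async function readUserIdLater(): Promise<unknown> {
  await new Promise((resolve) => setTimeout(resolve, 5));
  const userContext = nodeContextAdapter.getContext()?.userContext;
  return userContext ? userContext["userId"] : undefined;
}

// Two interleaved chains, each with its own context.
const concurrentReads = Promise.all([
  nodeContextAdapter.runWithContext({ userContext: { userId: "alice" } }, readUserIdLater),
  nodeContextAdapter.runWithContext({ userContext: { userId: "bob" } }, readUserIdLater),
]);
```

Each chain reads back its own user ID even though the two interleave, and code running outside any `runWithContext` call sees no context at all. Keeping this in a separate adapter is what lets the core package avoid the Node-only import.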
For Node, diagnostics channels can be used as a passive event bus:
import { channel } from "diagnostics_channel";
channel("pi.observability").publish(event);
Subscribers can create OTel/Sentry spans without monkey-patching pi.
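On the receiving side, a subscriber might attach to the same channel as in the sketch below. The channel name comes from the snippet above; the event object and the `received` array are illustrative only. `subscribe`, `unsubscribe`, `channel`, `publish`, and `hasSubscribers` are the real `node:diagnostics_channel` API.

```typescript
import { channel, subscribe, unsubscribe } from "node:diagnostics_channel";

const CHANNEL_NAME = "pi.observability";
const received: unknown[] = [];

// A passive listener: it only records what was published.
const onMessage = (message: unknown): void => {
  received.push(message);
};
subscribe(CHANNEL_NAME, onMessage);

const piChannel = channel(CHANNEL_NAME);

// Publishers can skip building the event when nobody is listening.
if (piChannel.hasSubscribers) {
  piChannel.publish({
    type: "start",
    name: "pi.ai.provider.request",
    traceId: "t1",
    spanId: "s3",
    timestamp: Date.now(),
  });
}

// After unsubscribing, further messages are no longer delivered to onMessage.
unsubscribe(CHANNEL_NAME, onMessage);
piChannel.publish({ type: "end", name: "pi.ai.provider.request", traceId: "t1", spanId: "s3", timestamp: Date.now() });
```

Publishing is synchronous, so the `hasSubscribers` check is a cheap way to keep the no-listener path close to free.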
What pi emits
Pi emits what happened. It does not create OTel/Sentry spans directly.
Initial minimal event names:
pi.agent.prompt
pi.agent.skill
pi.agent.prompt_template
pi.agent.compaction
pi.agent.branch_navigation
pi.agent.session.append_entry
pi.ai.provider.request
Each operation emits:
start
end
error
Later additions:
pi.agent.turn
pi.agent.tool_call
pi.agent.queue_update
pi.ai.provider.retry
pi.ai.provider.first_token
pi.ai.provider.usage
pi.session.read
pi.session.write
Minimal instrumentation points
packages/agent
Wrap:
- AgentHarness.prompt()
- AgentHarness.skill()
- AgentHarness.promptFromTemplate()
- AgentHarness.compact()
- AgentHarness.navigateTree()
- Session.appendTypedEntry() or storage append facade
Example:
return traceOperation(
  "pi.agent.prompt",
  {
    sessionId: turnState.sessionId,
    provider: turnState.model.provider,
    model: turnState.model.id,
    promptLength: text.length,
    imageCount: options?.images?.length ?? 0,
  },
  () => this.executeTurn(turnState, text, options),
);
Session write:
return traceOperation(
  "pi.agent.session.append_entry",
  { entryType: entry.type },
  async () => {
    await this.unwrap(this.storage.appendEntry(entry));
    return entry.id;
  },
);
packages/ai
Wrap common provider boundaries:
- streamSimple()
- completeSimple()
Example:
return traceOperation(
  "pi.ai.provider.request",
  {
    api: model.api,
    provider: model.provider,
    model: model.id,
    sessionId: options.sessionId,
    reasoning: options.reasoning,
  },
  () => actualStreamSimple(model, context, options),
);
End/error payloads can include safe metadata:
- stop reason
- status code
- retry count
- input/output/total tokens
- cost total
- aborted/timeout flag
Safety and redaction
Default payloads must be safe.
Safe by default:
- provider
- model
- API identifier
- session id
- entry type
- tool name
- status code
- stop reason
- token counts
- costs
- durations
Unsafe by default:
- prompts
- completions
- tool args
- tool results
- shell output
- file contents
- provider request payloads
- provider response bodies
- API keys
- headers
Content capture can be opt-in later with explicit redaction hooks.
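One way to enforce "safe by default" is an allowlist applied before a payload is emitted. The sketch below is an assumption about how that could work, not a settled design: `SAFE_PAYLOAD_KEYS`, `toSafePayload`, and the exact key spellings are hypothetical, chosen to mirror the safe list above.

```typescript
// Hypothetical allowlist mirroring the "safe by default" list.
const SAFE_PAYLOAD_KEYS: ReadonlySet<string> = new Set([
  "provider", "model", "api", "sessionId", "entryType", "toolName",
  "statusCode", "stopReason", "inputTokens", "outputTokens", "totalTokens",
  "costTotal", "durationMs",
]);

function isPrimitive(value: unknown): boolean {
  return value === null
    || typeof value === "string"
    || typeof value === "number"
    || typeof value === "boolean";
}

// Returns a copy containing only allowlisted keys with primitive values.
// Nested objects are dropped even under a safe key, so content cannot be
// carried through inside them.
function toSafePayload(payload: Record<string, unknown>): Record<string, unknown> {
  const safe: Record<string, unknown> = {};
  for (const key of Object.keys(payload)) {
    const value = payload[key];
    if (SAFE_PAYLOAD_KEYS.has(key) && isPrimitive(value)) {
      safe[key] = value;
    }
  }
  return safe;
}
```

An allowlist fails closed: a new field added at an instrumentation point stays out of events until someone decides it is safe, which matches the default stated above better than a denylist of known-unsafe keys would.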
Listener behavior
Observability must never affect pi execution.
Subscriber errors should be swallowed or isolated. Harness hooks are control-plane and may affect execution; observability subscribers are passive and must not.
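A minimal sketch of subscriber isolation, assuming a simple in-memory subscriber set like the browser/workers fallback; `createEventBus` is a hypothetical name. It only guards against synchronous throws, so a listener that returns a rejecting promise would need separate handling.

```typescript
type Listener<E> = (event: E) => void;

function createEventBus<E>() {
  const listeners = new Set<Listener<E>>();
  return {
    // Returns an unsubscribe function, matching the subscribe API shape above.
    subscribe(listener: Listener<E>): () => void {
      listeners.add(listener);
      return () => {
        listeners.delete(listener);
      };
    },
    hasSubscribers(): boolean {
      return listeners.size > 0;
    },
    emit(event: E): void {
      // Snapshot first so a listener that unsubscribes during dispatch
      // does not disturb the iteration.
      for (const listener of Array.from(listeners)) {
        try {
          listener(event);
        } catch {
          // Swallowed on purpose: a failing subscriber must not affect pi execution.
        }
      }
    },
  };
}
```

A throwing listener neither stops delivery to the remaining listeners nor propagates into the instrumented code path.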
User context
Users can associate arbitrary context with a turn:
await runWithPiContext(
  {
    userId: "u123",
    orgId: "acme",
    region: "eu",
  },
  () => harness.prompt("fix this"),
);
Every emitted event inside that async chain includes the context:
{
  type: "start",
  name: "pi.ai.provider.request",
  traceId: "t1",
  spanId: "s3",
  parentSpanId: "s1",
  context: {
    userId: "u123",
    orgId: "acme",
    region: "eu",
  },
  payload: {
    provider: "anthropic",
    model: "claude-sonnet-4",
  },
}
An OTel adapter can map this to span attributes. A Sentry adapter can map it to Sentry context/spans. A custom listener can log it as JSON.
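To make the adapter idea concrete, here is a vendor-neutral sketch of a listener that folds start/end/error events into span records. `createSpanCollector` and the choice to merge context and payload into `attributes` are assumptions for illustration; a real OTel or Sentry adapter would call that vendor's own span API instead.

```typescript
interface PiEvent {
  type: "start" | "end" | "error" | "event";
  name: string;
  traceId: string;
  spanId?: string;
  parentSpanId?: string;
  timestamp: number;
  context?: Record<string, unknown>;
  payload?: Record<string, unknown>;
}

interface SpanRecord {
  traceId: string;
  spanId: string;
  parentSpanId?: string;
  name: string;
  startTime: number;
  endTime?: number;
  attributes: Record<string, unknown>;
  status: "ok" | "error";
}

function createSpanCollector() {
  const open = new Map<string, SpanRecord>(); // spans that have started but not finished
  const finished: SpanRecord[] = [];

  const onEvent = (event: PiEvent): void => {
    if (!event.spanId) return; // point events without a span are ignored here
    const key = `${event.traceId}:${event.spanId}`;
    if (event.type === "start") {
      open.set(key, {
        traceId: event.traceId,
        spanId: event.spanId,
        parentSpanId: event.parentSpanId,
        name: event.name,
        startTime: event.timestamp,
        // Assumption: user context and payload are flattened into one attribute bag.
        attributes: { ...event.context, ...event.payload },
        status: "ok",
      });
      return;
    }
    if (event.type === "event") return;
    const span = open.get(key);
    if (!span) return; // end/error without a matching start
    span.endTime = event.timestamp;
    span.status = event.type === "error" ? "error" : "ok";
    open.delete(key);
    finished.push(span);
  };

  return { onEvent, finished };
}
```

Passing `onEvent` to `subscribePiObservability` would be enough to wire it up; the collector never touches pi internals, only the event contract.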
Package story
Minimal initial package:
packages/observability
  runtime-agnostic context + traceOperation + subscribe
Then:
packages/ai
  emits pi.ai.* events
packages/agent
  emits pi.agent.* / pi.session.* events
Optional later:
packages/observability-node
  AsyncLocalStorage + diagnostics_channel bridge
packages/otel
  subscribes to pi events and creates OpenTelemetry spans
Thesis
Pi defines a stable, safe event contract. Adapters define where events go.
This makes ai/harness observable without binding core packages to OTel, Sentry, Node-only APIs, or monkey-patching.