pi-mono/packages/agent/docs/observability.md
Mario Zechner f8d2c8aa6e
Some checks are pending
CI / build-check-test (push) Waiting to run
docs(agent): organize harness design todos
2026-05-16 22:07:20 +02:00

376 lines
8.4 KiB
Markdown

<!-- Synced from jot qe0ikdqs. Edit this file in-repo going forward. -->
# Pi Observability Design Notes
## Goal
Make `packages/ai` and `packages/agent`/harness observable without depending on OpenTelemetry, Sentry, or any APM vendor.
Pi should emit stable, structured lifecycle events. External listeners can convert those events into OTel spans, Sentry spans, logs, metrics, or custom telemetry.
## Mental model
A trace is one causal tree of work, e.g. one user turn.
A span is one timed operation in that tree. It is normally represented by IDs, not object pointers:
```ts
interface SpanRecord {
traceId: string;
spanId: string;
parentSpanId?: string;
name: string;
startTime: number;
endTime?: number;
attributes: Record<string, unknown>;
status: "ok" | "error";
}
```
Example tree:
```text
traceId=t1 spanId=s1 parent=- name=pi.agent.prompt
traceId=t1 spanId=s2 parent=s1 name=pi.agent.turn
traceId=t1 spanId=s3 parent=s2 name=pi.ai.provider.request
traceId=t1 spanId=s4 parent=s2 name=pi.agent.tool_call
traceId=t1 spanId=s5 parent=s4 name=pi.session.append_entry
```
## Async context
JavaScript has one event loop but multiple async chains can interleave. A single global `currentContext` breaks under concurrency.
`AsyncLocalStorage` is the Node equivalent of `ThreadLocal` for async continuations. It lets concurrent operations keep distinct current contexts:
```ts
await Promise.all([
runWithPiContext({ userId: "alice" }, () => harness.prompt("A")),
runWithPiContext({ userId: "bob" }, () => harness.prompt("B")),
]);
```
Deep code can then read the correct current context for the active async chain.
Pi must run in Node, Bun, browser, workers, and other JS runtimes, so ALS cannot be the core abstraction. It should be a runtime adapter.
## Core design
Pi owns a small runtime-agnostic observability abstraction:
```ts
export interface PiObservabilityContext {
traceId?: string;
currentSpanId?: string;
userContext?: Record<string, unknown>;
}
export interface PiObservabilityEvent {
type: "start" | "end" | "error" | "event";
name: string;
traceId: string;
spanId?: string;
parentSpanId?: string;
timestamp: number;
durationMs?: number;
context?: Record<string, unknown>;
payload?: Record<string, unknown>;
error?: { name: string; message: string };
}
export interface PiObservability {
getContext(): PiObservabilityContext | undefined;
runWithContext<T>(context: PiObservabilityContext, fn: () => T): T;
emit(event: PiObservabilityEvent): void;
hasSubscribers(): boolean;
}
```
Public API:
```ts
export function configurePiObservability(observability: PiObservability): void;
export function subscribePiObservability(listener: (event: PiObservabilityEvent) => void): () => void;
export function runWithPiContext<T>(userContext: Record<string, unknown>, fn: () => T): T;
export function traceOperation<T>(name: string, payload: Record<string, unknown>, fn: () => T): T;
```
`traceOperation()`:
1. reads the current context
2. creates `traceId` if missing
3. creates a new `spanId`
4. uses current span as `parentSpanId`
5. emits `start`
6. runs callback under child context
7. emits `end` or `error`
8. rethrows on error
Pseudo-code:
```ts
function traceOperation<T>(name: string, payload: Record<string, unknown>, fn: () => T): T {
const parent = getContext();
const traceId = parent?.traceId ?? createId();
const spanId = createId();
const parentSpanId = parent?.currentSpanId;
const child = { ...parent, traceId, currentSpanId: spanId };
emit({ type: "start", name, traceId, spanId, parentSpanId, timestamp: Date.now(), context: parent?.userContext, payload });
return runWithContext(child, () => {
try {
const result = fn();
// Promise-aware implementation emits end/error after settlement.
emit({ type: "end", name, traceId, spanId, parentSpanId, timestamp: Date.now(), context: child.userContext, payload });
return result;
} catch (error) {
emit({ type: "error", name, traceId, spanId, parentSpanId, timestamp: Date.now(), context: child.userContext, payload, error: serializeError(error) });
throw error;
}
});
}
```
## Runtime adapters
Core packages should not import Node-only APIs.
Possible implementations:
- Node adapter: `AsyncLocalStorage` for context, optional `diagnostics_channel` publishing.
- Browser/workers fallback: local subscriber set and limited/manual context propagation.
- Bun/Deno adapters: use runtime-specific async context if available.
For Node, diagnostics channels can be used as a passive event bus:
```ts
import { channel } from "diagnostics_channel";
channel("pi.observability").publish(event);
```
Subscribers can create OTel/Sentry spans without monkey-patching pi.
## What pi emits
Pi emits what happened. It does not create OTel/Sentry spans directly.
Initial minimal event names:
```text
pi.agent.prompt
pi.agent.skill
pi.agent.prompt_template
pi.agent.compaction
pi.agent.branch_navigation
pi.agent.session.append_entry
pi.ai.provider.request
```
Each operation emits:
```text
start
end
error
```
Later additions:
```text
pi.agent.turn
pi.agent.tool_call
pi.agent.queue_update
pi.ai.provider.retry
pi.ai.provider.first_token
pi.ai.provider.usage
pi.session.read
pi.session.write
```
## Minimal instrumentation points
### packages/agent
Wrap:
- `AgentHarness.prompt()`
- `AgentHarness.skill()`
- `AgentHarness.promptFromTemplate()`
- `AgentHarness.compact()`
- `AgentHarness.navigateTree()`
- `Session.appendTypedEntry()` or storage append facade
Example:
```ts
return traceOperation(
"pi.agent.prompt",
{
sessionId: turnState.sessionId,
provider: turnState.model.provider,
model: turnState.model.id,
promptLength: text.length,
imageCount: options?.images?.length ?? 0,
},
() => this.executeTurn(turnState, text, options),
);
```
Session write:
```ts
return traceOperation(
"pi.agent.session.append_entry",
{ entryType: entry.type },
async () => {
await this.unwrap(this.storage.appendEntry(entry));
return entry.id;
},
);
```
### packages/ai
Wrap common provider boundaries:
- `streamSimple()`
- `completeSimple()`
Example:
```ts
return traceOperation(
"pi.ai.provider.request",
{
api: model.api,
provider: model.provider,
model: model.id,
sessionId: options.sessionId,
reasoning: options.reasoning,
},
() => actualStreamSimple(model, context, options),
);
```
End/error payloads can include safe metadata:
- stop reason
- status code
- retry count
- input/output/total tokens
- cost total
- aborted/timeout flag
## Safety and redaction
Default payloads must be safe.
Safe by default:
- provider
- model
- API identifier
- session id
- entry type
- tool name
- status code
- stop reason
- token counts
- costs
- durations
Unsafe by default:
- prompts
- completions
- tool args
- tool results
- shell output
- file contents
- provider request payloads
- provider response bodies
- API keys
- headers
Content capture can be opt-in later with explicit redaction hooks.
## Listener behavior
Observability must never affect pi execution.
Subscriber errors should be swallowed or isolated. Harness hooks are control-plane and may affect execution; observability subscribers are passive and must not.
## User context
Users can associate arbitrary context with a turn:
```ts
await runWithPiContext(
{
userId: "u123",
orgId: "acme",
region: "eu",
},
() => harness.prompt("fix this"),
);
```
Every emitted event inside that async chain includes the context:
```ts
{
type: "start",
name: "pi.ai.provider.request",
traceId: "t1",
spanId: "s3",
parentSpanId: "s1",
context: {
userId: "u123",
orgId: "acme",
region: "eu",
},
payload: {
provider: "anthropic",
model: "claude-sonnet-4",
},
}
```
An OTel adapter can map this to span attributes. A Sentry adapter can map it to Sentry context/spans. A custom user can log JSON.
## Package story
Minimal initial package:
```text
packages/observability
runtime-agnostic context + traceOperation + subscribe
```
Then:
```text
packages/ai
emits pi.ai.* events
packages/agent
emits pi.agent.* / pi.session.* events
```
Optional later:
```text
packages/observability-node
AsyncLocalStorage + diagnostics_channel bridge
packages/otel
subscribes to pi events and creates OpenTelemetry spans
```
## Thesis
Pi defines a stable, safe event contract. Adapters define where events go.
This makes ai/harness observable without binding core packages to OTel, Sentry, Node-only APIs, or monkey-patching.