mirror of
https://github.com/badlogic/pi-mono.git
synced 2026-05-26 07:44:27 +00:00
376 lines
8.4 KiB
Markdown
376 lines
8.4 KiB
Markdown
<!-- Synced from jot qe0ikdqs. Edit this file in-repo going forward. -->
|
|
|
|
# Pi Observability Design Notes
|
|
|
|
## Goal
|
|
|
|
Make `packages/ai` and `packages/agent`/harness observable without depending on OpenTelemetry, Sentry, or any APM vendor.
|
|
|
|
Pi should emit stable, structured lifecycle events. External listeners can convert those events into OTel spans, Sentry spans, logs, metrics, or custom telemetry.
|
|
|
|
## Mental model
|
|
|
|
A trace is one causal tree of work, e.g. one user turn.
|
|
|
|
A span is one timed operation in that tree. It is normally represented by IDs, not object pointers:
|
|
|
|
```ts
|
|
interface SpanRecord {
|
|
traceId: string;
|
|
spanId: string;
|
|
parentSpanId?: string;
|
|
name: string;
|
|
startTime: number;
|
|
endTime?: number;
|
|
attributes: Record<string, unknown>;
|
|
status: "ok" | "error";
|
|
}
|
|
```
|
|
|
|
Example tree:
|
|
|
|
```text
|
|
traceId=t1 spanId=s1 parent=- name=pi.agent.prompt
|
|
traceId=t1 spanId=s2 parent=s1 name=pi.agent.turn
|
|
traceId=t1 spanId=s3 parent=s2 name=pi.ai.provider.request
|
|
traceId=t1 spanId=s4 parent=s2 name=pi.agent.tool_call
|
|
traceId=t1 spanId=s5 parent=s4 name=pi.session.append_entry
|
|
```
|
|
|
|
## Async context
|
|
|
|
JavaScript has one event loop but multiple async chains can interleave. A single global `currentContext` breaks under concurrency.
|
|
|
|
`AsyncLocalStorage` is the Node equivalent of `ThreadLocal` for async continuations. It lets concurrent operations keep distinct current contexts:
|
|
|
|
```ts
|
|
await Promise.all([
|
|
runWithPiContext({ userId: "alice" }, () => harness.prompt("A")),
|
|
runWithPiContext({ userId: "bob" }, () => harness.prompt("B")),
|
|
]);
|
|
```
|
|
|
|
Deep code can then read the correct current context for the active async chain.
|
|
|
|
Pi must run in Node, Bun, browser, workers, and other JS runtimes, so ALS cannot be the core abstraction. It should be a runtime adapter.
|
|
|
|
## Core design
|
|
|
|
Pi owns a small runtime-agnostic observability abstraction:
|
|
|
|
```ts
|
|
export interface PiObservabilityContext {
|
|
traceId?: string;
|
|
currentSpanId?: string;
|
|
userContext?: Record<string, unknown>;
|
|
}
|
|
|
|
export interface PiObservabilityEvent {
|
|
type: "start" | "end" | "error" | "event";
|
|
name: string;
|
|
traceId: string;
|
|
spanId?: string;
|
|
parentSpanId?: string;
|
|
timestamp: number;
|
|
durationMs?: number;
|
|
context?: Record<string, unknown>;
|
|
payload?: Record<string, unknown>;
|
|
error?: { name: string; message: string };
|
|
}
|
|
|
|
export interface PiObservability {
|
|
getContext(): PiObservabilityContext | undefined;
|
|
runWithContext<T>(context: PiObservabilityContext, fn: () => T): T;
|
|
emit(event: PiObservabilityEvent): void;
|
|
hasSubscribers(): boolean;
|
|
}
|
|
```
|
|
|
|
Public API:
|
|
|
|
```ts
|
|
export function configurePiObservability(observability: PiObservability): void;
|
|
export function subscribePiObservability(listener: (event: PiObservabilityEvent) => void): () => void;
|
|
export function runWithPiContext<T>(userContext: Record<string, unknown>, fn: () => T): T;
|
|
export function traceOperation<T>(name: string, payload: Record<string, unknown>, fn: () => T): T;
|
|
```
|
|
|
|
`traceOperation()`:
|
|
|
|
1. reads the current context
|
|
2. creates `traceId` if missing
|
|
3. creates a new `spanId`
|
|
4. uses current span as `parentSpanId`
|
|
5. emits `start`
|
|
6. runs callback under child context
|
|
7. emits `end` or `error`
|
|
8. rethrows on error
|
|
|
|
Pseudo-code:
|
|
|
|
```ts
|
|
function traceOperation<T>(name: string, payload: Record<string, unknown>, fn: () => T): T {
|
|
const parent = getContext();
|
|
const traceId = parent?.traceId ?? createId();
|
|
const spanId = createId();
|
|
const parentSpanId = parent?.currentSpanId;
|
|
|
|
const child = { ...parent, traceId, currentSpanId: spanId };
|
|
|
|
emit({ type: "start", name, traceId, spanId, parentSpanId, timestamp: Date.now(), context: parent?.userContext, payload });
|
|
|
|
return runWithContext(child, () => {
|
|
try {
|
|
const result = fn();
|
|
// Promise-aware implementation emits end/error after settlement.
|
|
emit({ type: "end", name, traceId, spanId, parentSpanId, timestamp: Date.now(), context: child.userContext, payload });
|
|
return result;
|
|
} catch (error) {
|
|
emit({ type: "error", name, traceId, spanId, parentSpanId, timestamp: Date.now(), context: child.userContext, payload, error: serializeError(error) });
|
|
throw error;
|
|
}
|
|
});
|
|
}
|
|
```
|
|
|
|
## Runtime adapters
|
|
|
|
Core packages should not import Node-only APIs.
|
|
|
|
Possible implementations:
|
|
|
|
- Node adapter: `AsyncLocalStorage` for context, optional `diagnostics_channel` publishing.
|
|
- Browser/workers fallback: local subscriber set and limited/manual context propagation.
|
|
- Bun/Deno adapters: use runtime-specific async context if available.
|
|
|
|
For Node, diagnostics channels can be used as a passive event bus:
|
|
|
|
```ts
|
|
import { channel } from "diagnostics_channel";
|
|
channel("pi.observability").publish(event);
|
|
```
|
|
|
|
Subscribers can create OTel/Sentry spans without monkey-patching pi.
|
|
|
|
## What pi emits
|
|
|
|
Pi emits what happened. It does not create OTel/Sentry spans directly.
|
|
|
|
Initial minimal event names:
|
|
|
|
```text
|
|
pi.agent.prompt
|
|
pi.agent.skill
|
|
pi.agent.prompt_template
|
|
pi.agent.compaction
|
|
pi.agent.branch_navigation
|
|
pi.agent.session.append_entry
|
|
pi.ai.provider.request
|
|
```
|
|
|
|
Each operation emits:
|
|
|
|
```text
|
|
start
|
|
end
|
|
error
|
|
```
|
|
|
|
Later additions:
|
|
|
|
```text
|
|
pi.agent.turn
|
|
pi.agent.tool_call
|
|
pi.agent.queue_update
|
|
pi.ai.provider.retry
|
|
pi.ai.provider.first_token
|
|
pi.ai.provider.usage
|
|
pi.session.read
|
|
pi.session.write
|
|
```
|
|
|
|
## Minimal instrumentation points
|
|
|
|
### packages/agent
|
|
|
|
Wrap:
|
|
|
|
- `AgentHarness.prompt()`
|
|
- `AgentHarness.skill()`
|
|
- `AgentHarness.promptFromTemplate()`
|
|
- `AgentHarness.compact()`
|
|
- `AgentHarness.navigateTree()`
|
|
- `Session.appendTypedEntry()` or storage append facade
|
|
|
|
Example:
|
|
|
|
```ts
|
|
return traceOperation(
|
|
"pi.agent.prompt",
|
|
{
|
|
sessionId: turnState.sessionId,
|
|
provider: turnState.model.provider,
|
|
model: turnState.model.id,
|
|
promptLength: text.length,
|
|
imageCount: options?.images?.length ?? 0,
|
|
},
|
|
() => this.executeTurn(turnState, text, options),
|
|
);
|
|
```
|
|
|
|
Session write:
|
|
|
|
```ts
|
|
return traceOperation(
|
|
"pi.agent.session.append_entry",
|
|
{ entryType: entry.type },
|
|
async () => {
|
|
await this.unwrap(this.storage.appendEntry(entry));
|
|
return entry.id;
|
|
},
|
|
);
|
|
```
|
|
|
|
### packages/ai
|
|
|
|
Wrap common provider boundaries:
|
|
|
|
- `streamSimple()`
|
|
- `completeSimple()`
|
|
|
|
Example:
|
|
|
|
```ts
|
|
return traceOperation(
|
|
"pi.ai.provider.request",
|
|
{
|
|
api: model.api,
|
|
provider: model.provider,
|
|
model: model.id,
|
|
sessionId: options.sessionId,
|
|
reasoning: options.reasoning,
|
|
},
|
|
() => actualStreamSimple(model, context, options),
|
|
);
|
|
```
|
|
|
|
End/error payloads can include safe metadata:
|
|
|
|
- stop reason
|
|
- status code
|
|
- retry count
|
|
- input/output/total tokens
|
|
- cost total
|
|
- aborted/timeout flag
|
|
|
|
## Safety and redaction
|
|
|
|
Default payloads must be safe.
|
|
|
|
Safe by default:
|
|
|
|
- provider
|
|
- model
|
|
- API identifier
|
|
- session id
|
|
- entry type
|
|
- tool name
|
|
- status code
|
|
- stop reason
|
|
- token counts
|
|
- costs
|
|
- durations
|
|
|
|
Unsafe by default:
|
|
|
|
- prompts
|
|
- completions
|
|
- tool args
|
|
- tool results
|
|
- shell output
|
|
- file contents
|
|
- provider request payloads
|
|
- provider response bodies
|
|
- API keys
|
|
- headers
|
|
|
|
Content capture can be opt-in later with explicit redaction hooks.
|
|
|
|
## Listener behavior
|
|
|
|
Observability must never affect pi execution.
|
|
|
|
Subscriber errors should be swallowed or isolated. Harness hooks are control-plane and may affect execution; observability subscribers are passive and must not.
|
|
|
|
## User context
|
|
|
|
Users can associate arbitrary context with a turn:
|
|
|
|
```ts
|
|
await runWithPiContext(
|
|
{
|
|
userId: "u123",
|
|
orgId: "acme",
|
|
region: "eu",
|
|
},
|
|
() => harness.prompt("fix this"),
|
|
);
|
|
```
|
|
|
|
Every emitted event inside that async chain includes the context:
|
|
|
|
```ts
|
|
{
|
|
type: "start",
|
|
name: "pi.ai.provider.request",
|
|
traceId: "t1",
|
|
spanId: "s3",
|
|
parentSpanId: "s1",
|
|
context: {
|
|
userId: "u123",
|
|
orgId: "acme",
|
|
region: "eu",
|
|
},
|
|
payload: {
|
|
provider: "anthropic",
|
|
model: "claude-sonnet-4",
|
|
},
|
|
}
|
|
```
|
|
|
|
An OTel adapter can map this to span attributes. A Sentry adapter can map it to Sentry context/spans. A custom user can log JSON.
|
|
|
|
## Package story
|
|
|
|
Minimal initial package:
|
|
|
|
```text
|
|
packages/observability
|
|
runtime-agnostic context + traceOperation + subscribe
|
|
```
|
|
|
|
Then:
|
|
|
|
```text
|
|
packages/ai
|
|
emits pi.ai.* events
|
|
|
|
packages/agent
|
|
emits pi.agent.* / pi.session.* events
|
|
```
|
|
|
|
Optional later:
|
|
|
|
```text
|
|
packages/observability-node
|
|
AsyncLocalStorage + diagnostics_channel bridge
|
|
|
|
packages/otel
|
|
subscribes to pi events and creates OpenTelemetry spans
|
|
```
|
|
|
|
## Thesis
|
|
|
|
Pi defines a stable, safe event contract. Adapters define where events go.
|
|
|
|
This makes ai/harness observable without binding core packages to OTel, Sentry, Node-only APIs, or monkey-patching.
|