pi-mono/packages/agent
2026-04-24 14:20:18 +02:00
..
src fix(ai): support xhigh for Codex GPT-5.5 2026-04-23 22:49:09 +02:00
test fix(typebox): migrate to v1 with extension compat (#3474) 2026-04-22 19:59:33 +02:00
CHANGELOG.md Add [Unreleased] section for next cycle 2026-04-24 14:20:18 +02:00
package.json Release v0.70.2 2026-04-24 14:19:20 +02:00
README.md fix(coding-agent,tui): drop typebox compiler shim and fix progress 2026-04-22 21:12:20 +02:00
tsconfig.build.json Agent package + coding agent WIP, refactored web-ui prompts 2025-10-17 11:47:01 +02:00
vitest.config.ts Simplify compaction: remove proactive abort, use Agent.continue() for retry 2025-12-09 21:43:49 +01:00

@mariozechner/pi-agent-core

Stateful agent with tool execution and event streaming. Built on @mariozechner/pi-ai.

Installation

npm install @mariozechner/pi-agent-core

Quick Start

import { Agent } from "@mariozechner/pi-agent-core";
import { getModel } from "@mariozechner/pi-ai";

const agent = new Agent({
  initialState: {
    systemPrompt: "You are a helpful assistant.",
    model: getModel("anthropic", "claude-sonnet-4-20250514"),
  },
});

agent.subscribe((event) => {
  if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") {
    // Stream just the new text chunk
    process.stdout.write(event.assistantMessageEvent.delta);
  }
});

await agent.prompt("Hello!");

Core Concepts

AgentMessage vs LLM Message

The agent works with AgentMessage, a flexible type that can include:

  • Standard LLM messages (user, assistant, toolResult)
  • Custom app-specific message types via declaration merging

LLMs only understand user, assistant, and toolResult. The convertToLlm function bridges this gap by filtering and transforming messages before each LLM call.

Message Flow

AgentMessage[] → transformContext() → AgentMessage[] → convertToLlm() → Message[] → LLM
                    (optional)                           (required)
  1. transformContext: Prune old messages, inject external context
  2. convertToLlm: Filter out UI-only messages, convert custom types to LLM format

Event Flow

The agent emits events for UI updates. Understanding the event sequence helps build responsive interfaces.

prompt() Event Sequence

When you call prompt("Hello"):

prompt("Hello")
├─ agent_start
├─ turn_start
├─ message_start   { message: userMessage }      // Your prompt
├─ message_end     { message: userMessage }
├─ message_start   { message: assistantMessage } // LLM starts responding
├─ message_update  { message: partial... }       // Streaming chunks
├─ message_update  { message: partial... }
├─ message_end     { message: assistantMessage } // Complete response
├─ turn_end        { message, toolResults: [] }
└─ agent_end       { messages: [...] }

With Tool Calls

If the assistant calls tools, the loop continues:

prompt("Read config.json")
├─ agent_start
├─ turn_start
├─ message_start/end  { userMessage }
├─ message_start      { assistantMessage with toolCall }
├─ message_update...
├─ message_end        { assistantMessage }
├─ tool_execution_start  { toolCallId, toolName, args }
├─ tool_execution_update { partialResult }           // If tool streams
├─ tool_execution_end    { toolCallId, result }
├─ message_start/end  { toolResultMessage }
├─ turn_end           { message, toolResults: [toolResult] }
│
├─ turn_start                                        // Next turn
├─ message_start      { assistantMessage }           // LLM responds to tool result
├─ message_update...
├─ message_end
├─ turn_end
└─ agent_end

Tool execution mode is configurable:

  • parallel (default): preflight tool calls sequentially, execute allowed tools concurrently, emit tool_execution_end as soon as each tool is finalized, then emit toolResult messages and turn_end.toolResults in assistant source order
  • sequential: execute tool calls one by one, matching the historical behavior

In parallel mode, tool completion events follow tool completion order, but persisted toolResult messages still follow assistant source order.

The mode can be set globally via toolExecution in the agent config, or per-tool via executionMode on AgentTool. If any tool call in a batch targets a tool with executionMode: "sequential", the entire batch executes sequentially regardless of the global setting.

The beforeToolCall hook runs after tool_execution_start and validated argument parsing. It can block execution. The afterToolCall hook runs after tool execution finishes and before tool_execution_end and final tool result message events are emitted.

Tools can also return terminate: true to hint that the automatic follow-up LLM call should be skipped. The loop only stops early when every finalized tool result in that batch sets terminate: true. Mixed batches continue normally.

When you use the Agent class, assistant message_end processing is treated as a barrier before tool preflight begins. That means beforeToolCall sees agent state that already includes the assistant message that requested the tool call.

continue() Event Sequence

continue() resumes from existing context without adding a new message. Use it for retries after errors.

// After an error, retry from current state
await agent.continue();

The last message in context must be user or toolResult (not assistant).

Event Types

Event Description
agent_start Agent begins processing
agent_end Final event for the run. Awaited subscribers for this event still count toward settlement
turn_start New turn begins (one LLM call + tool executions)
turn_end Turn completes with assistant message and tool results
message_start Any message begins (user, assistant, toolResult)
message_update Assistant only. Includes assistantMessageEvent with delta
message_end Message completes
tool_execution_start Tool begins
tool_execution_update Tool streams progress
tool_execution_end Tool completes

Agent.subscribe() listeners are awaited in registration order. agent_end means no more loop events will be emitted, but await agent.waitForIdle() and await agent.prompt(...) only settle after awaited agent_end listeners finish.

Agent Options

const agent = new Agent({
  // Initial state
  initialState: {
    systemPrompt: string,
    model: Model<any>,
    thinkingLevel: "off" | "minimal" | "low" | "medium" | "high" | "xhigh",
    tools: AgentTool<any>[],
    messages: AgentMessage[],
  },

  // Convert AgentMessage[] to LLM Message[] (required for custom message types)
  convertToLlm: (messages) => messages.filter(...),

  // Transform context before convertToLlm (for pruning, compaction)
  transformContext: async (messages, signal) => pruneOldMessages(messages),

  // Steering mode: "one-at-a-time" (default) or "all"
  steeringMode: "one-at-a-time",

  // Follow-up mode: "one-at-a-time" (default) or "all"
  followUpMode: "one-at-a-time",

  // Custom stream function (for proxy backends)
  streamFn: streamProxy,

  // Session ID for provider caching
  sessionId: "session-123",

  // Dynamic API key resolution (for expiring OAuth tokens)
  getApiKey: async (provider) => refreshToken(),

  // Tool execution mode: "parallel" (default) or "sequential"
  toolExecution: "parallel",

  // Preflight each tool call after args are validated. Can block execution.
  beforeToolCall: async ({ toolCall, args, context }) => {
    if (toolCall.name === "bash") {
      return { block: true, reason: "bash is disabled" };
    }
  },

  // Postprocess each tool result before final tool events are emitted.
  afterToolCall: async ({ toolCall, result, isError, context }) => {
    if (toolCall.name === "notify_done" && !isError) {
      return { terminate: true };
    }
    if (!isError) {
      return { details: { ...result.details, audited: true } };
    }
  },

  // Custom thinking budgets for token-based providers
  thinkingBudgets: {
    minimal: 128,
    low: 512,
    medium: 1024,
    high: 2048,
  },
});

Agent State

interface AgentState {
  systemPrompt: string;
  model: Model<any>;
  thinkingLevel: ThinkingLevel;
  tools: AgentTool<any>[];
  messages: AgentMessage[];
  readonly isStreaming: boolean;
  readonly streamingMessage?: AgentMessage;
  readonly pendingToolCalls: ReadonlySet<string>;
  readonly errorMessage?: string;
}

Access state via agent.state.

Assigning agent.state.tools = [...] or agent.state.messages = [...] copies the top-level array before storing it. Mutating the returned array mutates the current agent state.

During streaming, agent.state.streamingMessage contains the current partial assistant message.

agent.state.isStreaming remains true until the run fully settles, including awaited agent_end subscribers.

Methods

Prompting

// Text prompt
await agent.prompt("Hello");

// With images
await agent.prompt("What's in this image?", [
  { type: "image", data: base64Data, mimeType: "image/jpeg" }
]);

// AgentMessage directly
await agent.prompt({ role: "user", content: "Hello", timestamp: Date.now() });

// Continue from current context (last message must be user or toolResult)
await agent.continue();

State Management

agent.state.systemPrompt = "New prompt";
agent.state.model = getModel("openai", "gpt-4o");
agent.state.thinkingLevel = "medium";
agent.state.tools = [myTool];
agent.toolExecution = "sequential";
agent.beforeToolCall = async ({ toolCall }) => undefined;
agent.afterToolCall = async ({ toolCall, result }) => undefined;
agent.state.messages = newMessages; // top-level array is copied
agent.state.messages.push(message);
agent.reset();

Session and Thinking Budgets

agent.sessionId = "session-123";

agent.thinkingBudgets = {
  minimal: 128,
  low: 512,
  medium: 1024,
  high: 2048,
};

Control

agent.abort();           // Cancel current operation
await agent.waitForIdle(); // Wait for completion

Events

const unsubscribe = agent.subscribe(async (event, signal) => {
  if (event.type === "agent_end") {
    // Final barrier work for the run
    await flushSessionState(signal);
  }
});
unsubscribe();

Steering and Follow-up

Steering messages let you interrupt the agent while tools are running. Follow-up messages let you queue work after the agent would otherwise stop.

agent.steeringMode = "one-at-a-time";
agent.followUpMode = "one-at-a-time";

// While agent is running tools
agent.steer({
  role: "user",
  content: "Stop! Do this instead.",
  timestamp: Date.now(),
});

// After the agent finishes its current work
agent.followUp({
  role: "user",
  content: "Also summarize the result.",
  timestamp: Date.now(),
});

const steeringMode = agent.steeringMode;
const followUpMode = agent.followUpMode;

agent.clearSteeringQueue();
agent.clearFollowUpQueue();
agent.clearAllQueues();

Use clearSteeringQueue, clearFollowUpQueue, or clearAllQueues to drop queued messages.

When steering messages are detected after a turn completes:

  1. All tool calls from the current assistant message have already finished
  2. Steering messages are injected
  3. The LLM responds on the next turn

Follow-up messages are checked only when there are no more tool calls and no steering messages. If any are queued, they are injected and another turn runs.

Custom Message Types

Extend AgentMessage via declaration merging:

declare module "@mariozechner/pi-agent-core" {
  interface CustomAgentMessages {
    notification: { role: "notification"; text: string; timestamp: number };
  }
}

// Now valid
const msg: AgentMessage = { role: "notification", text: "Info", timestamp: Date.now() };

Handle custom types in convertToLlm:

const agent = new Agent({
  convertToLlm: (messages) => messages.flatMap(m => {
    if (m.role === "notification") return []; // Filter out
    return [m];
  }),
});

Tools

Define tools using AgentTool:

import { Type } from "typebox";

const readFileTool: AgentTool = {
  name: "read_file",
  label: "Read File",  // For UI display
  description: "Read a file's contents",
  parameters: Type.Object({
    path: Type.String({ description: "File path" }),
  }),
  // Override execution mode for this tool (optional).
  // "sequential" forces the entire batch to run one at a time.
  // "parallel" allows concurrent execution with other tool calls.
  // If omitted, the global toolExecution config applies.
  executionMode: "sequential",
  execute: async (toolCallId, params, signal, onUpdate) => {
    const content = await fs.readFile(params.path, "utf-8");

    // Optional: stream progress
    onUpdate?.({ content: [{ type: "text", text: "Reading..." }], details: {} });

    // Optional: add `terminate: true` here to skip the automatic follow-up LLM call
    // when every finalized tool result in the batch does the same.
    return {
      content: [{ type: "text", text: content }],
      details: { path: params.path, size: content.length },
    };
  },
};

agent.state.tools = [readFileTool];

Error Handling

Throw an error when a tool fails. Do not return error messages as content.

execute: async (toolCallId, params, signal, onUpdate) => {
  if (!fs.existsSync(params.path)) {
    throw new Error(`File not found: ${params.path}`);
  }
  // Return content only on success
  return { content: [{ type: "text", text: "..." }] };
}

Thrown errors are caught by the agent and reported to the LLM as tool errors with isError: true.

Return terminate: true from execute() or afterToolCall to hint that the agent should stop after the current tool batch. This only takes effect when every finalized tool result in the batch is terminating. The hint is runtime-only; emitted toolResult transcript messages remain standard LLM tool results.

Proxy Usage

For browser apps that proxy through a backend:

import { Agent, streamProxy } from "@mariozechner/pi-agent-core";

const agent = new Agent({
  streamFn: (model, context, options) =>
    streamProxy(model, context, {
      ...options,
      authToken: "...",
      proxyUrl: "https://your-server.com",
    }),
});

Low-Level API

For direct control without the Agent class:

import { agentLoop, agentLoopContinue } from "@mariozechner/pi-agent-core";

const context: AgentContext = {
  systemPrompt: "You are helpful.",
  messages: [],
  tools: [],
};

const config: AgentLoopConfig = {
  model: getModel("openai", "gpt-4o"),
  convertToLlm: (msgs) => msgs.filter(m => ["user", "assistant", "toolResult"].includes(m.role)),
  toolExecution: "parallel",  // overridden by per-tool executionMode if set
  beforeToolCall: async ({ toolCall, args, context }) => undefined,
  afterToolCall: async ({ toolCall, result, isError, context }) => undefined,
};

const userMessage = { role: "user", content: "Hello", timestamp: Date.now() };

for await (const event of agentLoop([userMessage], context, config)) {
  console.log(event.type);
}

// Continue from existing context
for await (const event of agentLoopContinue(context, config)) {
  console.log(event.type);
}

These low-level streams are observational. They preserve event order, but they do not wait for your async event handling to settle before later producer phases continue. If you need message processing to act as a barrier before tool preflight, use the Agent class instead of raw agentLoop() or agentLoopContinue().

License

MIT