/**
 * Tests for `agentLoop` and `agentLoopContinue` driven by a mock LLM stream.
 *
 * Every test supplies its own stream function that pushes a single terminal
 * "done" event from a microtask, so no real provider is contacted.
 *
 * NOTE(review): the `AgentTool` annotations below carry no type arguments. If
 * `AgentTool` is generic over its parameter schema / details type, they may
 * need to be spelled `AgentTool<typeof toolSchema, ...>` for `params` to be
 * typed - confirm against ../src/types.js.
 */
import {
	type AssistantMessage,
	type AssistantMessageEvent,
	EventStream,
	type Message,
	type Model,
	type UserMessage,
} from "@earendil-works/pi-ai";
import { Type } from "typebox";
import { describe, expect, it } from "vitest";
import { agentLoop, agentLoopContinue } from "../src/agent-loop.js";
import type { AgentContext, AgentEvent, AgentLoopConfig, AgentMessage, AgentTool } from "../src/types.js";

// Mock stream for testing - mimics MockAssistantStream
// Completes on a "done" or "error" event and resolves to that event's payload.
class MockAssistantStream extends EventStream<AssistantMessageEvent, AssistantMessage> {
	constructor() {
		super(
			(event) => event.type === "done" || event.type === "error",
			(event) => {
				if (event.type === "done") return event.message;
				if (event.type === "error") return event.error;
				throw new Error("Unexpected event type");
			},
		);
	}
}

/** Builds a usage record with every counter and cost set to zero. */
function createUsage() {
	return {
		input: 0,
		output: 0,
		cacheRead: 0,
		cacheWrite: 0,
		totalTokens: 0,
		cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
	};
}

/** Builds a fixed mock model descriptor; the base URL uses the reserved `.invalid` TLD. */
function createModel(): Model<"openai-responses"> {
	return {
		id: "mock",
		name: "mock",
		api: "openai-responses",
		provider: "openai",
		baseUrl: "https://example.invalid",
		reasoning: false,
		input: ["text"],
		cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
		contextWindow: 8192,
		maxTokens: 2048,
	};
}

/**
 * Builds an assistant message with the given content.
 *
 * @param content - Content blocks of the message.
 * @param stopReason - Stop reason to record; defaults to "stop".
 */
function createAssistantMessage(
	content: AssistantMessage["content"],
	stopReason: AssistantMessage["stopReason"] = "stop",
): AssistantMessage {
	return {
		role: "assistant",
		content,
		api: "openai-responses",
		provider: "openai",
		model: "mock",
		usage: createUsage(),
		stopReason,
		timestamp: Date.now(),
	};
}

/** Builds a user message whose content is the given plain string. */
function createUserMessage(text: string): UserMessage {
	return {
		role: "user",
		content: text,
		timestamp: Date.now(),
	};
}

// Simple identity converter for tests - just passes through standard messages
function identityConverter(messages: AgentMessage[]): Message[] {
	return messages.filter((m) => m.role === "user" || m.role === "assistant" || m.role === "toolResult") as Message[];
}

describe("agentLoop with AgentMessage", () => {
	it("should emit events with AgentMessage types", async () => {
		const context: AgentContext = {
			systemPrompt: "You are helpful.",
			messages: [],
			tools: [],
		};

		const userPrompt: AgentMessage = createUserMessage("Hello");

		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
		};

		const streamFn = () => {
			const stream = new MockAssistantStream();
			queueMicrotask(() => {
				const message = createAssistantMessage([{ type: "text", text: "Hi there!" }]);
				stream.push({ type: "done", reason: "stop", message });
			});
			return stream;
		};

		const events: AgentEvent[] = [];
		const stream = agentLoop([userPrompt], context, config, undefined, streamFn);

		for await (const event of stream) {
			events.push(event);
		}

		const messages = await stream.result();

		// Should have user message and assistant message
		expect(messages.length).toBe(2);
		expect(messages[0].role).toBe("user");
		expect(messages[1].role).toBe("assistant");

		// Verify event sequence
		const eventTypes = events.map((e) => e.type);
		expect(eventTypes).toContain("agent_start");
		expect(eventTypes).toContain("turn_start");
		expect(eventTypes).toContain("message_start");
		expect(eventTypes).toContain("message_end");
		expect(eventTypes).toContain("turn_end");
		expect(eventTypes).toContain("agent_end");
	});

	it("should handle custom message types via convertToLlm", async () => {
		// Create a custom message type
		interface CustomNotification {
			role: "notification";
			text: string;
			timestamp: number;
		}

		const notification: CustomNotification = {
			role: "notification",
			text: "This is a notification",
			timestamp: Date.now(),
		};

		const context: AgentContext = {
			systemPrompt: "You are helpful.",
			messages: [notification as unknown as AgentMessage], // Custom message in context
			tools: [],
		};

		const userPrompt: AgentMessage = createUserMessage("Hello");

		let convertedMessages: Message[] = [];
		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: (messages) => {
				// Filter out notifications, convert rest
				convertedMessages = messages
					.filter((m) => (m as { role: string }).role !== "notification")
					.filter((m) => m.role === "user" || m.role === "assistant" || m.role === "toolResult") as Message[];
				return convertedMessages;
			},
		};

		const streamFn = () => {
			const stream = new MockAssistantStream();
			queueMicrotask(() => {
				const message = createAssistantMessage([{ type: "text", text: "Response" }]);
				stream.push({ type: "done", reason: "stop", message });
			});
			return stream;
		};

		const events: AgentEvent[] = [];
		const stream = agentLoop([userPrompt], context, config, undefined, streamFn);

		for await (const event of stream) {
			events.push(event);
		}

		// The notification should have been filtered out in convertToLlm
		expect(convertedMessages.length).toBe(1); // Only user message
		expect(convertedMessages[0].role).toBe("user");
	});

	it("should apply transformContext before convertToLlm", async () => {
		const context: AgentContext = {
			systemPrompt: "You are helpful.",
			messages: [
				createUserMessage("old message 1"),
				createAssistantMessage([{ type: "text", text: "old response 1" }]),
				createUserMessage("old message 2"),
				createAssistantMessage([{ type: "text", text: "old response 2" }]),
			],
			tools: [],
		};

		const userPrompt: AgentMessage = createUserMessage("new message");

		let transformedMessages: AgentMessage[] = [];
		let convertedMessages: Message[] = [];

		const config: AgentLoopConfig = {
			model: createModel(),
			transformContext: async (messages) => {
				// Keep only last 2 messages (prune old ones)
				transformedMessages = messages.slice(-2);
				return transformedMessages;
			},
			convertToLlm: (messages) => {
				convertedMessages = messages.filter(
					(m) => m.role === "user" || m.role === "assistant" || m.role === "toolResult",
				) as Message[];
				return convertedMessages;
			},
		};

		const streamFn = () => {
			const stream = new MockAssistantStream();
			queueMicrotask(() => {
				const message = createAssistantMessage([{ type: "text", text: "Response" }]);
				stream.push({ type: "done", reason: "stop", message });
			});
			return stream;
		};

		const stream = agentLoop([userPrompt], context, config, undefined, streamFn);

		for await (const _ of stream) {
			// consume
		}

		// transformContext should have been called first, keeping only last 2
		expect(transformedMessages.length).toBe(2);
		// Then convertToLlm receives the pruned messages
		expect(convertedMessages.length).toBe(2);
	});

	it("should handle tool calls and results", async () => {
		const toolSchema = Type.Object({ value: Type.String() });
		const executed: string[] = [];
		const tool: AgentTool = {
			name: "echo",
			label: "Echo",
			description: "Echo tool",
			parameters: toolSchema,
			async execute(_toolCallId, params) {
				executed.push(params.value);
				return {
					content: [{ type: "text", text: `echoed: ${params.value}` }],
					details: { value: params.value },
				};
			},
		};

		const context: AgentContext = {
			systemPrompt: "",
			messages: [],
			tools: [tool],
		};

		const userPrompt: AgentMessage = createUserMessage("echo something");

		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
		};

		let callIndex = 0;
		const streamFn = () => {
			const stream = new MockAssistantStream();
			queueMicrotask(() => {
				if (callIndex === 0) {
					// First call: return tool call
					const message = createAssistantMessage(
						[{ type: "toolCall", id: "tool-1", name: "echo", arguments: { value: "hello" } }],
						"toolUse",
					);
					stream.push({ type: "done", reason: "toolUse", message });
				} else {
					// Second call: return final response
					const message = createAssistantMessage([{ type: "text", text: "done" }]);
					stream.push({ type: "done", reason: "stop", message });
				}
				callIndex++;
			});
			return stream;
		};

		const events: AgentEvent[] = [];
		const stream = agentLoop([userPrompt], context, config, undefined, streamFn);

		for await (const event of stream) {
			events.push(event);
		}

		// Tool should have been executed
		expect(executed).toEqual(["hello"]);

		// Should have tool execution events
		const toolStart = events.find((e) => e.type === "tool_execution_start");
		const toolEnd = events.find((e) => e.type === "tool_execution_end");
		expect(toolStart).toBeDefined();
		expect(toolEnd).toBeDefined();
		if (toolEnd?.type === "tool_execution_end") {
			expect(toolEnd.isError).toBe(false);
		}
	});

	it("should execute mutated beforeToolCall args without revalidation", async () => {
		const toolSchema = Type.Object({ value: Type.String() });
		// The hook below swaps the string argument for a number, so both must fit.
		const executed: Array<string | number> = [];
		const tool: AgentTool = {
			name: "echo",
			label: "Echo",
			description: "Echo tool",
			parameters: toolSchema,
			async execute(_toolCallId, params) {
				executed.push(params.value as string | number);
				return {
					content: [{ type: "text", text: `echoed: ${String(params.value)}` }],
					details: { value: params.value as string | number },
				};
			},
		};

		const context: AgentContext = {
			systemPrompt: "",
			messages: [],
			tools: [tool],
		};

		const userPrompt: AgentMessage = createUserMessage("echo something");
		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
			beforeToolCall: async ({ args }) => {
				const mutableArgs = args as { value: string | number };
				mutableArgs.value = 123;
				return undefined;
			},
		};

		let callIndex = 0;
		const streamFn = () => {
			const stream = new MockAssistantStream();
			queueMicrotask(() => {
				if (callIndex === 0) {
					const message = createAssistantMessage(
						[{ type: "toolCall", id: "tool-1", name: "echo", arguments: { value: "hello" } }],
						"toolUse",
					);
					stream.push({ type: "done", reason: "toolUse", message });
				} else {
					const message = createAssistantMessage([{ type: "text", text: "done" }]);
					stream.push({ type: "done", reason: "stop", message });
				}
				callIndex++;
			});
			return stream;
		};

		const stream = agentLoop([userPrompt], context, config, undefined, streamFn);
		for await (const _event of stream) {
			// consume
		}

		expect(executed).toEqual([123]);
	});

	it("should prepare tool arguments for validation", async () => {
		const replaceSchema = Type.Object({ oldText: Type.String(), newText: Type.String() });
		const toolSchema = Type.Object({ edits: Type.Array(replaceSchema) });
		// One entry per tool execution, each holding that call's list of edits.
		const executed: Array<Array<{ oldText: string; newText: string }>> = [];
		const tool: AgentTool = {
			name: "edit",
			label: "Edit",
			description: "Edit tool",
			parameters: toolSchema,
			prepareArguments(args) {
				if (!args || typeof args !== "object") {
					return args as { edits: { oldText: string; newText: string }[] };
				}
				const input = args as {
					edits?: Array<{ oldText: string; newText: string }>;
					oldText?: string;
					newText?: string;
				};
				if (typeof input.oldText !== "string" || typeof input.newText !== "string") {
					return args as { edits: { oldText: string; newText: string }[] };
				}
				// Fold the flat oldText/newText pair into the edits array.
				return {
					edits: [...(input.edits ?? []), { oldText: input.oldText, newText: input.newText }],
				};
			},
			async execute(_toolCallId, params) {
				executed.push(params.edits);
				return {
					content: [{ type: "text", text: `edited ${params.edits.length}` }],
					details: { count: params.edits.length },
				};
			},
		};

		const context: AgentContext = {
			systemPrompt: "",
			messages: [],
			tools: [tool],
		};

		const userPrompt: AgentMessage = createUserMessage("edit something");
		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
		};

		let callIndex = 0;
		const streamFn = () => {
			const stream = new MockAssistantStream();
			queueMicrotask(() => {
				if (callIndex === 0) {
					const message = createAssistantMessage(
						[
							{
								type: "toolCall",
								id: "tool-1",
								name: "edit",
								arguments: { oldText: "before", newText: "after" },
							},
						],
						"toolUse",
					);
					stream.push({ type: "done", reason: "toolUse", message });
				} else {
					const message = createAssistantMessage([{ type: "text", text: "done" }]);
					stream.push({ type: "done", reason: "stop", message });
				}
				callIndex++;
			});
			return stream;
		};

		const stream = agentLoop([userPrompt], context, config, undefined, streamFn);
		for await (const _event of stream) {
			// consume
		}

		expect(executed).toEqual([[{ oldText: "before", newText: "after" }]]);
	});

	it("should emit tool_execution_end in completion order but persist tool results in source order", async () => {
		const toolSchema = Type.Object({ value: Type.String() });
		let firstResolved = false;
		let parallelObserved = false;
		let releaseFirst: (() => void) | undefined;
		// Gate that keeps the "first" call pending until the timer below releases it.
		const firstDone = new Promise<void>((resolve) => {
			releaseFirst = resolve;
		});

		const tool: AgentTool = {
			name: "echo",
			label: "Echo",
			description: "Echo tool",
			parameters: toolSchema,
			async execute(_toolCallId, params) {
				if (params.value === "first") {
					await firstDone;
					firstResolved = true;
				}
				if (params.value === "second" && !firstResolved) {
					parallelObserved = true;
				}
				return {
					content: [{ type: "text", text: `echoed: ${params.value}` }],
					details: { value: params.value },
				};
			},
		};

		const context: AgentContext = {
			systemPrompt: "",
			messages: [],
			tools: [tool],
		};

		const userPrompt: AgentMessage = createUserMessage("echo both");
		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
			toolExecution: "parallel",
		};

		let callIndex = 0;
		const stream = agentLoop([userPrompt], context, config, undefined, () => {
			const mockStream = new MockAssistantStream();
			queueMicrotask(() => {
				if (callIndex === 0) {
					const message = createAssistantMessage(
						[
							{ type: "toolCall", id: "tool-1", name: "echo", arguments: { value: "first" } },
							{ type: "toolCall", id: "tool-2", name: "echo", arguments: { value: "second" } },
						],
						"toolUse",
					);
					mockStream.push({ type: "done", reason: "toolUse", message });
					setTimeout(() => releaseFirst?.(), 20);
				} else {
					const message = createAssistantMessage([{ type: "text", text: "done" }]);
					mockStream.push({ type: "done", reason: "stop", message });
				}
				callIndex++;
			});
			return mockStream;
		});

		const events: AgentEvent[] = [];
		for await (const event of stream) {
			events.push(event);
		}

		const toolExecutionEndIds = events.flatMap((event) => {
			if (event.type !== "tool_execution_end") {
				return [];
			}
			return [event.toolCallId];
		});
		const toolResultIds = events.flatMap((event) => {
			if (event.type !== "message_end" || event.message.role !== "toolResult") {
				return [];
			}
			return [event.message.toolCallId];
		});
		const turnToolResultIds = events.flatMap((event) => {
			if (event.type !== "turn_end") {
				return [];
			}
			return event.toolResults.map((toolResult) => toolResult.toolCallId);
		});

		expect(parallelObserved).toBe(true);
		expect(toolExecutionEndIds).toEqual(["tool-2", "tool-1"]);
		expect(toolResultIds).toEqual(["tool-1", "tool-2"]);
		expect(turnToolResultIds).toEqual(["tool-1", "tool-2"]);
	});

	it("should inject queued messages after all tool calls complete", async () => {
		const toolSchema = Type.Object({ value: Type.String() });
		const executed: string[] = [];
		const tool: AgentTool = {
			name: "echo",
			label: "Echo",
			description: "Echo tool",
			parameters: toolSchema,
			async execute(_toolCallId, params) {
				executed.push(params.value);
				return {
					content: [{ type: "text", text: `ok:${params.value}` }],
					details: { value: params.value },
				};
			},
		};

		const context: AgentContext = {
			systemPrompt: "",
			messages: [],
			tools: [tool],
		};

		const userPrompt: AgentMessage = createUserMessage("start");
		const queuedUserMessage: AgentMessage = createUserMessage("interrupt");

		let queuedDelivered = false;
		let callIndex = 0;
		let sawInterruptInContext = false;

		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
			toolExecution: "sequential",
			getSteeringMessages: async () => {
				// Return steering message after tool execution has started.
				if (executed.length >= 1 && !queuedDelivered) {
					queuedDelivered = true;
					return [queuedUserMessage];
				}
				return [];
			},
		};

		const events: AgentEvent[] = [];
		const stream = agentLoop([userPrompt], context, config, undefined, (_model, ctx, _options) => {
			// Check if interrupt message is in context on second call
			if (callIndex === 1) {
				sawInterruptInContext = ctx.messages.some(
					(m) => m.role === "user" && typeof m.content === "string" && m.content === "interrupt",
				);
			}

			const mockStream = new MockAssistantStream();
			queueMicrotask(() => {
				if (callIndex === 0) {
					// First call: return two tool calls
					const message = createAssistantMessage(
						[
							{ type: "toolCall", id: "tool-1", name: "echo", arguments: { value: "first" } },
							{ type: "toolCall", id: "tool-2", name: "echo", arguments: { value: "second" } },
						],
						"toolUse",
					);
					mockStream.push({ type: "done", reason: "toolUse", message });
				} else {
					// Second call: return final response
					const message = createAssistantMessage([{ type: "text", text: "done" }]);
					mockStream.push({ type: "done", reason: "stop", message });
				}
				callIndex++;
			});
			return mockStream;
		});

		for await (const event of stream) {
			events.push(event);
		}

		// Both tools should execute before steering is injected
		expect(executed).toEqual(["first", "second"]);

		const toolEnds = events.filter(
			(e): e is Extract<AgentEvent, { type: "tool_execution_end" }> => e.type === "tool_execution_end",
		);
		expect(toolEnds.length).toBe(2);
		expect(toolEnds[0].isError).toBe(false);
		expect(toolEnds[1].isError).toBe(false);

		// Queued message should appear in events after both tool result messages
		const eventSequence = events.flatMap((event) => {
			if (event.type !== "message_start") return [];
			if (event.message.role === "toolResult") return [`tool:${event.message.toolCallId}`];
			if (event.message.role === "user" && typeof event.message.content === "string") {
				return [event.message.content];
			}
			return [];
		});
		expect(eventSequence).toContain("interrupt");
		expect(eventSequence.indexOf("tool:tool-1")).toBeLessThan(eventSequence.indexOf("interrupt"));
		expect(eventSequence.indexOf("tool:tool-2")).toBeLessThan(eventSequence.indexOf("interrupt"));

		// Interrupt message should be in context when second LLM call is made
		expect(sawInterruptInContext).toBe(true);
	});

	it("should force sequential execution when a tool has executionMode=sequential even with default parallel config", async () => {
		const toolSchema = Type.Object({ value: Type.String() });
		let firstResolved = false;
		let parallelObserved = false;
		let releaseFirst: (() => void) | undefined;
		// Gate that keeps the "first" call pending until the timer below releases it.
		const firstDone = new Promise<void>((resolve) => {
			releaseFirst = resolve;
		});

		const slowTool: AgentTool = {
			name: "slow",
			label: "Slow",
			description: "Slow tool",
			parameters: toolSchema,
			executionMode: "sequential",
			async execute(_toolCallId, params) {
				if (params.value === "first") {
					await firstDone;
					firstResolved = true;
				}
				if (params.value === "second" && !firstResolved) {
					parallelObserved = true;
				}
				return {
					content: [{ type: "text", text: `slow: ${params.value}` }],
					details: { value: params.value },
				};
			},
		};

		const context: AgentContext = {
			systemPrompt: "",
			messages: [],
			tools: [slowTool],
		};

		const userPrompt: AgentMessage = createUserMessage("run both");
		// config is parallel (default), but tool forces sequential
		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
		};

		let callIndex = 0;
		const stream = agentLoop([userPrompt], context, config, undefined, () => {
			const mockStream = new MockAssistantStream();
			queueMicrotask(() => {
				if (callIndex === 0) {
					const message = createAssistantMessage(
						[
							{ type: "toolCall", id: "tool-1", name: "slow", arguments: { value: "first" } },
							{ type: "toolCall", id: "tool-2", name: "slow", arguments: { value: "second" } },
						],
						"toolUse",
					);
					mockStream.push({ type: "done", reason: "toolUse", message });
					setTimeout(() => releaseFirst?.(), 20);
				} else {
					const message = createAssistantMessage([{ type: "text", text: "done" }]);
					mockStream.push({ type: "done", reason: "stop", message });
				}
				callIndex++;
			});
			return mockStream;
		});

		const events: AgentEvent[] = [];
		for await (const event of stream) {
			events.push(event);
		}

		// With sequential execution, second tool should NOT start before first finishes
		expect(parallelObserved).toBe(false);

		const toolResultIds = events.flatMap((event) => {
			if (event.type !== "message_end" || event.message.role !== "toolResult") {
				return [];
			}
			return [event.message.toolCallId];
		});
		expect(toolResultIds).toEqual(["tool-1", "tool-2"]);
	});

	it("should force sequential execution when one of multiple tools has executionMode=sequential", async () => {
		const toolSchema = Type.Object({ value: Type.String() });
		const executionOrder: string[] = [];
		let releaseSlow: (() => void) | undefined;
		// Gate that keeps the slow "a" call pending until the timer below releases it.
		const slowDone = new Promise<void>((resolve) => {
			releaseSlow = resolve;
		});

		const slowTool: AgentTool = {
			name: "slow",
			label: "Slow",
			description: "Slow tool",
			parameters: toolSchema,
			executionMode: "sequential",
			async execute(_toolCallId, params) {
				executionOrder.push(`slow:${params.value}`);
				if (params.value === "a") {
					await slowDone;
				}
				return {
					content: [{ type: "text", text: `slow: ${params.value}` }],
					details: { value: params.value },
				};
			},
		};

		const fastTool: AgentTool = {
			name: "fast",
			label: "Fast",
			description: "Fast tool",
			parameters: toolSchema,
			// no executionMode = defaults to parallel
			async execute(_toolCallId, params) {
				executionOrder.push(`fast:${params.value}`);
				return {
					content: [{ type: "text", text: `fast: ${params.value}` }],
					details: { value: params.value },
				};
			},
		};

		const context: AgentContext = {
			systemPrompt: "",
			messages: [],
			tools: [slowTool, fastTool],
		};

		const userPrompt: AgentMessage = createUserMessage("run both");
		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
			// parallel by default, but slowTool forces sequential
		};

		let callIndex = 0;
		const stream = agentLoop([userPrompt], context, config, undefined, () => {
			const mockStream = new MockAssistantStream();
			queueMicrotask(() => {
				if (callIndex === 0) {
					const message = createAssistantMessage(
						[
							{ type: "toolCall", id: "tool-1", name: "slow", arguments: { value: "a" } },
							{ type: "toolCall", id: "tool-2", name: "fast", arguments: { value: "b" } },
						],
						"toolUse",
					);
					mockStream.push({ type: "done", reason: "toolUse", message });
					setTimeout(() => releaseSlow?.(), 20);
				} else {
					const message = createAssistantMessage([{ type: "text", text: "done" }]);
					mockStream.push({ type: "done", reason: "stop", message });
				}
				callIndex++;
			});
			return mockStream;
		});

		const events: AgentEvent[] = [];
		for await (const event of stream) {
			events.push(event);
		}

		// Fast tool should NOT run before slow tool finishes
		expect(executionOrder[0]).toBe("slow:a");
		expect(executionOrder).toContain("fast:b");
	});

	it("should allow parallel execution when all tools have executionMode=parallel", async () => {
		const toolSchema = Type.Object({ value: Type.String() });
		let firstResolved = false;
		let parallelObserved = false;
		let releaseFirst: (() => void) | undefined;
		// Gate that keeps the "first" call pending until the timer below releases it.
		const firstDone = new Promise<void>((resolve) => {
			releaseFirst = resolve;
		});

		const tool: AgentTool = {
			name: "echo",
			label: "Echo",
			description: "Echo tool",
			parameters: toolSchema,
			executionMode: "parallel",
			async execute(_toolCallId, params) {
				if (params.value === "first") {
					await firstDone;
					firstResolved = true;
				}
				if (params.value === "second" && !firstResolved) {
					parallelObserved = true;
				}
				return {
					content: [{ type: "text", text: `echoed: ${params.value}` }],
					details: { value: params.value },
				};
			},
		};

		const context: AgentContext = {
			systemPrompt: "",
			messages: [],
			tools: [tool],
		};

		const userPrompt: AgentMessage = createUserMessage("echo both");
		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
		};

		let callIndex = 0;
		const stream = agentLoop([userPrompt], context, config, undefined, () => {
			const mockStream = new MockAssistantStream();
			queueMicrotask(() => {
				if (callIndex === 0) {
					const message = createAssistantMessage(
						[
							{ type: "toolCall", id: "tool-1", name: "echo", arguments: { value: "first" } },
							{ type: "toolCall", id: "tool-2", name: "echo", arguments: { value: "second" } },
						],
						"toolUse",
					);
					mockStream.push({ type: "done", reason: "toolUse", message });
					setTimeout(() => releaseFirst?.(), 20);
				} else {
					const message = createAssistantMessage([{ type: "text", text: "done" }]);
					mockStream.push({ type: "done", reason: "stop", message });
				}
				callIndex++;
			});
			return mockStream;
		});

		const events: AgentEvent[] = [];
		for await (const event of stream) {
			events.push(event);
		}

		// With executionMode=parallel, second tool should start before first finishes
		expect(parallelObserved).toBe(true);
	});

	it("should use prepareNextTurn snapshot before continuing", async () => {
		const toolSchema = Type.Object({ value: Type.String() });
		const tool: AgentTool = {
			name: "echo",
			label: "Echo",
			description: "Echo tool",
			parameters: toolSchema,
			async execute(_toolCallId, params) {
				return {
					content: [{ type: "text", text: `echoed: ${params.value}` }],
					details: { value: params.value },
				};
			},
		};

		const context: AgentContext = {
			systemPrompt: "first prompt",
			messages: [],
			tools: [tool],
		};

		let convertedSecondTurnSystemPrompt = "";
		let prepared = false;
		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
			prepareNextTurn: async ({ context: currentContext }) => {
				// Swap the system prompt exactly once; later calls leave the context alone.
				if (prepared) return undefined;
				prepared = true;
				return {
					context: {
						systemPrompt: "second prompt",
						messages: currentContext.messages.slice(),
						tools: currentContext.tools,
					},
				};
			},
		};

		let llmCalls = 0;
		const stream = agentLoop([createUserMessage("echo something")], context, config, undefined, (_model, ctx) => {
			llmCalls++;
			if (llmCalls === 2) {
				convertedSecondTurnSystemPrompt = ctx.systemPrompt ?? "";
			}
			const mockStream = new MockAssistantStream();
			queueMicrotask(() => {
				if (llmCalls === 1) {
					mockStream.push({
						type: "done",
						reason: "toolUse",
						message: createAssistantMessage(
							[{ type: "toolCall", id: "tool-1", name: "echo", arguments: { value: "hello" } }],
							"toolUse",
						),
					});
				} else {
					mockStream.push({
						type: "done",
						reason: "stop",
						message: createAssistantMessage([{ type: "text", text: "done" }]),
					});
				}
			});
			return mockStream;
		});

		for await (const _event of stream) {
			// consume
		}

		expect(llmCalls).toBe(2);
		expect(convertedSecondTurnSystemPrompt).toBe("second prompt");
	});

	it("should stop after the current turn when shouldStopAfterTurn returns true", async () => {
		const toolSchema = Type.Object({ value: Type.String() });
		const executed: string[] = [];
		const tool: AgentTool = {
			name: "echo",
			label: "Echo",
			description: "Echo tool",
			parameters: toolSchema,
			async execute(_toolCallId, params) {
				executed.push(params.value);
				return {
					content: [{ type: "text", text: `echoed: ${params.value}` }],
					details: { value: params.value },
				};
			},
		};

		const context: AgentContext = {
			systemPrompt: "",
			messages: [],
			tools: [tool],
		};

		let steeringPolls = 0;
		let followUpPolls = 0;
		let callbackToolResultIds: string[] = [];
		let callbackContextRoles: string[] = [];
		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
			getSteeringMessages: async () => {
				steeringPolls++;
				return [];
			},
			getFollowUpMessages: async () => {
				followUpPolls++;
				return [createUserMessage("follow up should stay queued")];
			},
			shouldStopAfterTurn: async ({ message, toolResults, context }) => {
				expect(message.role).toBe("assistant");
				callbackToolResultIds = toolResults.map((toolResult) => toolResult.toolCallId);
				callbackContextRoles = context.messages.map((contextMessage) => contextMessage.role);
				return true;
			},
		};

		let llmCalls = 0;
		const stream = agentLoop([createUserMessage("echo something")], context, config, undefined, () => {
			llmCalls++;
			const mockStream = new MockAssistantStream();
			queueMicrotask(() => {
				if (llmCalls === 1) {
					const message = createAssistantMessage(
						[{ type: "toolCall", id: "tool-1", name: "echo", arguments: { value: "hello" } }],
						"toolUse",
					);
					mockStream.push({ type: "done", reason: "toolUse", message });
				} else {
					mockStream.push({
						type: "done",
						reason: "stop",
						message: createAssistantMessage([{ type: "text", text: "should not run" }]),
					});
				}
			});
			return mockStream;
		});

		const events: AgentEvent[] = [];
		for await (const event of stream) {
			events.push(event);
		}
		const messages = await stream.result();

		expect(llmCalls).toBe(1);
		expect(executed).toEqual(["hello"]);
		expect(steeringPolls).toBe(1);
		expect(followUpPolls).toBe(0);
		expect(callbackToolResultIds).toEqual(["tool-1"]);
		expect(callbackContextRoles).toEqual(["user", "assistant", "toolResult"]);
		expect(messages.map((message) => message.role)).toEqual(["user", "assistant", "toolResult"]);
		expect(events.map((event) => event.type)).toEqual([
			"agent_start",
			"turn_start",
			"message_start",
			"message_end",
			"message_start",
			"message_end",
			"tool_execution_start",
			"tool_execution_end",
			"message_start",
			"message_end",
			"turn_end",
			"agent_end",
		]);
	});

	it("should stop after a tool batch when every tool result sets terminate=true", async () => {
		const toolSchema = Type.Object({ value: Type.String() });
		const tool: AgentTool = {
			name: "echo",
			label: "Echo",
			description: "Echo tool",
			parameters: toolSchema,
			async execute(_toolCallId, params) {
				return {
					content: [{ type: "text", text: `echoed: ${params.value}` }],
					details: { value: params.value },
					terminate: true,
				};
			},
		};

		const context: AgentContext = {
			systemPrompt: "",
			messages: [],
			tools: [tool],
		};

		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
		};

		let llmCalls = 0;
		const stream = agentLoop([createUserMessage("echo something")], context, config, undefined, () => {
			llmCalls++;
			const mockStream = new MockAssistantStream();
			queueMicrotask(() => {
				const message = createAssistantMessage(
					[{ type: "toolCall", id: "tool-1", name: "echo", arguments: { value: "hello" } }],
					"toolUse",
				);
				mockStream.push({ type: "done", reason: "toolUse", message });
			});
			return mockStream;
		});

		const events: AgentEvent[] = [];
		for await (const event of stream) {
			events.push(event);
		}
		const messages = await stream.result();

		expect(llmCalls).toBe(1);
		expect(messages.map((message) => message.role)).toEqual(["user", "assistant", "toolResult"]);
		expect(events.filter((event) => event.type === "turn_end")).toHaveLength(1);
	});

	it("should continue after parallel tool calls when not all tool results terminate", async () => {
		const toolSchema = Type.Object({ value: Type.String() });
		const tool: AgentTool = {
			name: "echo",
			label: "Echo",
			description: "Echo tool",
			parameters: toolSchema,
			async execute(_toolCallId, params) {
				return {
					content: [{ type: "text", text: `echoed: ${params.value}` }],
					details: { value: params.value },
					// Only the "first" call asks to terminate; "second" does not.
					terminate: params.value === "first",
				};
			},
		};

		const context: AgentContext = {
			systemPrompt: "",
			messages: [],
			tools: [tool],
		};

		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
			toolExecution: "parallel",
		};

		let callIndex = 0;
		const stream = agentLoop([createUserMessage("echo both")], context, config, undefined, () => {
			const mockStream = new MockAssistantStream();
			queueMicrotask(() => {
				if (callIndex === 0) {
					const message = createAssistantMessage(
						[
							{ type: "toolCall", id: "tool-1", name: "echo", arguments: { value: "first" } },
							{ type: "toolCall", id: "tool-2", name: "echo", arguments: { value: "second" } },
						],
						"toolUse",
					);
					mockStream.push({ type: "done", reason: "toolUse", message });
				} else {
					const message = createAssistantMessage([{ type: "text", text: "done" }]);
					mockStream.push({ type: "done", reason: "stop", message });
				}
				callIndex++;
			});
			return mockStream;
		});

		for await (const _event of stream) {
			// consume
		}
		const messages = await stream.result();

		expect(callIndex).toBe(2);
		expect(messages.map((message) => message.role)).toEqual([
			"user",
			"assistant",
			"toolResult",
			"toolResult",
			"assistant",
		]);
	});

	it("should allow afterToolCall to mark a tool batch as terminating", async () => {
		const toolSchema = Type.Object({ value: Type.String() });
		const tool: AgentTool = {
			name: "echo",
			label: "Echo",
			description: "Echo tool",
			parameters: toolSchema,
			async execute(_toolCallId, params) {
				return {
					content: [{ type: "text", text: `echoed: ${params.value}` }],
					details: { value: params.value },
				};
			},
		};

		const context: AgentContext = {
			systemPrompt: "",
			messages: [],
			tools: [tool],
		};

		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
			afterToolCall: async () => ({ terminate: true }),
		};

		let llmCalls = 0;
		const stream = agentLoop([createUserMessage("echo something")], context, config, undefined, () => {
			llmCalls++;
			const mockStream = new MockAssistantStream();
			queueMicrotask(() => {
				const message = createAssistantMessage(
					[{ type: "toolCall", id: "tool-1", name: "echo", arguments: { value: "hello" } }],
					"toolUse",
				);
				mockStream.push({ type: "done", reason: "toolUse", message });
			});
			return mockStream;
		});

		for await (const _event of stream) {
			// consume
		}

		expect(llmCalls).toBe(1);
	});
});

describe("agentLoopContinue with AgentMessage", () => {
	it("should throw when context has no messages", () => {
		const context: AgentContext = {
			systemPrompt: "You are helpful.",
			messages: [],
			tools: [],
		};

		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
		};

		expect(() => agentLoopContinue(context, config)).toThrow("Cannot continue: no messages in context");
	});

	it("should continue from existing context without emitting user message events", async () => {
		const userMessage: AgentMessage = createUserMessage("Hello");

		const context: AgentContext = {
			systemPrompt: "You are helpful.",
			messages: [userMessage],
			tools: [],
		};

		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: identityConverter,
		};

		const streamFn = () => {
			const stream = new MockAssistantStream();
			queueMicrotask(() => {
				const message = createAssistantMessage([{ type: "text", text: "Response" }]);
				stream.push({ type: "done", reason: "stop", message });
			});
			return stream;
		};

		const events: AgentEvent[] = [];
		const stream = agentLoopContinue(context, config, undefined, streamFn);

		for await (const event of stream) {
			events.push(event);
		}

		const messages = await stream.result();

		// Should only return the new assistant message (not the existing user message)
		expect(messages.length).toBe(1);
		expect(messages[0].role).toBe("assistant");

		// Should NOT have user message events (that's the key difference from agentLoop)
		const messageEndEvents = events.filter((e) => e.type === "message_end");
		expect(messageEndEvents.length).toBe(1);
		expect((messageEndEvents[0] as any).message.role).toBe("assistant");
	});

	it("should allow custom message types as last message (caller responsibility)", async () => {
		// Custom message that will be converted to user message by convertToLlm
		interface CustomMessage {
			role: "custom";
			text: string;
			timestamp: number;
		}

		const customMessage: CustomMessage = {
			role: "custom",
			text: "Hook content",
			timestamp: Date.now(),
		};

		const context: AgentContext = {
			systemPrompt: "You are helpful.",
			messages: [customMessage as unknown as AgentMessage],
			tools: [],
		};

		const config: AgentLoopConfig = {
			model: createModel(),
			convertToLlm: (messages) => {
				// Convert custom to user message
				return messages
					.map((m) => {
						if ((m as any).role === "custom") {
							return {
								role: "user" as const,
								content: (m as any).text,
								timestamp: m.timestamp,
							};
						}
						return m;
					})
					.filter((m) => m.role === "user" || m.role === "assistant" || m.role === "toolResult") as Message[];
			},
		};

		const streamFn = () => {
			const stream = new MockAssistantStream();
			queueMicrotask(() => {
				const message = createAssistantMessage([{ type: "text", text: "Response to custom message" }]);
				stream.push({ type: "done", reason: "stop", message });
			});
			return stream;
		};

		// Should not throw - the custom message will be converted to user message
		const stream = agentLoopContinue(context, config, undefined, streamFn);

		const events: AgentEvent[] = [];
		for await (const event of stream) {
			events.push(event);
		}

		const messages = await stream.result();
		expect(messages.length).toBe(1);
		expect(messages[0].role).toBe("assistant");
	});
});