// pi-mono/packages/coding-agent/test/suite/agent-session-queue.test.ts
// 2026-05-07 15:59:42 +02:00
//
// 422 lines
// 13 KiB
// TypeScript

import type { AgentTool } from "@earendil-works/pi-agent-core";
import { fauxAssistantMessage, fauxToolCall } from "@earendil-works/pi-ai";
import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";
import { Type } from "typebox";
import { afterEach, describe, expect, it } from "vitest";
import { createHarness, getAssistantTexts, getMessageText, getUserTexts, type Harness } from "./harness.js";
/**
 * Creates a harness and immediately starts a prompt that can be held inside a tool call.
 *
 * A `wait` tool is placed in front of any caller-supplied tools. Its `execute` awaits an internal
 * promise and only returns (with the text `"released"`) once `releaseToolExecution` has been called.
 * `session.prompt("start")` is invoked before this function returns; it is not awaited here.
 *
 * NOTE(review): scripted responses are installed by the caller after this function resolves, i.e.
 * while the prompt is already in flight — presumably the harness does not request a response
 * before then; confirm against `createHarness`.
 *
 * @param options Optional extra tools and extension factories, both passed on to `createHarness`.
 * @returns `harness` — the created harness; `releaseToolExecution` — unblocks the `wait` tool;
 *   `promptPromise` — the pending `prompt("start")` call; `waitForToolStart` — resolves on the first
 *   `tool_execution_start` event whose tool name is `wait`.
 */
async function createWaitingHarness(
	options: {
		tools?: AgentTool[];
		extensionFactories?: Harness["session"]["extensionRunner"] extends never
			? never
			: Array<(pi: ExtensionAPI) => void>;
	} = {},
): Promise<{
	harness: Harness;
	releaseToolExecution: () => void;
	promptPromise: Promise<void>;
	waitForToolStart: Promise<void>;
}> {
	// Gate that keeps the `wait` tool suspended; `openGate` is filled in by the promise executor.
	let openGate: (() => void) | undefined;
	const gate = new Promise<void>((resolve) => {
		openGate = resolve;
	});

	const blockingTool: AgentTool = {
		name: "wait",
		label: "Wait",
		description: "Wait for release",
		parameters: Type.Object({}),
		execute: async () => {
			await gate;
			return {
				content: [{ type: "text", text: "released" }],
				details: {},
			};
		},
	};

	const extraTools = options.tools ?? [];
	const harness = await createHarness({
		tools: [blockingTool, ...extraTools],
		extensionFactories: options.extensionFactories,
	});

	// Subscribe before prompting so the start event of the `wait` tool cannot be missed.
	const waitForToolStart = new Promise<void>((resolve) => {
		const stopListening = harness.session.subscribe((event) => {
			if (event.type !== "tool_execution_start" || event.toolName !== "wait") {
				return;
			}
			// One-shot listener: detach as soon as the event of interest has been seen.
			stopListening();
			resolve();
		});
	});

	const promptPromise = harness.session.prompt("start");

	return {
		harness,
		releaseToolExecution: () => openGate?.(),
		promptPromise,
		waitForToolStart,
	};
}
describe("AgentSession queue characterization", () => {
const harnesses: Harness[] = [];
// Tear down every harness registered by the test that just ran, most recently added first.
// NOTE(review): the result of cleanup() is not awaited — confirm that it is synchronous.
afterEach(() => {
	for (let harness = harnesses.pop(); harness !== undefined; harness = harnesses.pop()) {
		harness.cleanup();
	}
});
// Prompting with a registered slash command while idle runs the handler once with the text that
// follows the command name, uses no scripted response, and leaves the message list empty.
it("dispatches extension commands immediately when prompted while idle", async () => {
	// Argument string received by each invocation of the command handler.
	const receivedArgs: string[] = [];

	const harness = await createHarness({
		extensionFactories: [
			(pi) => {
				pi.registerCommand("testcmd", {
					description: "Test command",
					handler: async (args) => {
						receivedArgs.push(args);
					},
				});
			},
		],
	});
	harnesses.push(harness);

	await harness.session.prompt("/testcmd hello world");

	expect(receivedArgs).toEqual(["hello world"]);
	expect(harness.getPendingResponseCount()).toBe(0);
	expect(harness.session.messages).toEqual([]);
});
// Verifies that a user message sent through the extension API with `deliverAs: "steer"` while the
// `wait` tool is executing is present in the context handed to the following scripted response.
it("delivers extension-origin steering messages before the next LLM call", async () => {
// Captured from the extension factory so the test can later send a message as the extension.
let extensionApi: ExtensionAPI | undefined;
const waiting = await createWaitingHarness({
extensionFactories: [
(pi) => {
extensionApi = pi;
},
],
});
const { harness, waitForToolStart, promptPromise, releaseToolExecution } = waiting;
harnesses.push(harness);
harness.setResponses([
// First response: request the blocking `wait` tool.
fauxAssistantMessage(fauxToolCall("wait", {}), { stopReason: "toolUse" }),
// Second response: report whether a user message with the text "steer now" is in the context.
(context) => {
const sawSteer = context.messages.some(
(message) => message.role === "user" && getMessageText(message) === "steer now",
);
return fauxAssistantMessage(sawSteer ? "saw steer" : "missing steer");
},
]);
await waitForToolStart;
// Yield one macrotask after the tool start event before sending the steering message.
await new Promise((resolve) => setTimeout(resolve, 0));
// NOTE(review): optional chaining turns this into a silent no-op if the factory never ran;
// the user-text assertion below is then expected to fail instead.
extensionApi?.sendUserMessage("steer now", { deliverAs: "steer" });
releaseToolExecution();
await promptPromise;
expect(getUserTexts(harness)).toEqual(["start", "steer now"]);
expect(getAssistantTexts(harness)).toContain("saw steer");
});
// Queues one follow-up while the `wait` tool is blocked, then checks the recorded user texts and
// what the second scripted response saw in its context.
it("delivers follow-up messages only after the current run finishes", async () => {
	const { harness, waitForToolStart, promptPromise, releaseToolExecution } = await createWaitingHarness();
	harnesses.push(harness);

	// One entry per assistant message visible to the second scripted response: the newline-joined
	// text parts of that message (an empty string when it has no text parts).
	const assistantTextsAtSecondCall: string[] = [];
	harness.setResponses([
		fauxAssistantMessage(fauxToolCall("wait", {}), { stopReason: "toolUse" }),
		(context) => {
			for (const message of context.messages) {
				if (message.role !== "assistant") {
					continue;
				}
				const textParts: string[] = [];
				for (const part of message.content) {
					if (part.type === "text") {
						textParts.push(part.text);
					}
				}
				assistantTextsAtSecondCall.push(textParts.join("\n"));
			}
			return fauxAssistantMessage("follow-up response");
		},
	]);

	await waitForToolStart;
	await harness.session.followUp("after current run");
	releaseToolExecution();
	await promptPromise;

	expect(getUserTexts(harness)).toEqual(["start", "after current run"]);
	expect(assistantTextsAtSecondCall).toContain("");
	expect(getAssistantTexts(harness)).toContain("follow-up response");
});
// Two steering messages queued during the blocked tool call: both appear as user texts in queue
// order, and each scripted reply after the tool call shows up as its own assistant text.
it("delivers multiple steering messages in order in one-at-a-time mode", async () => {
	const { harness, waitForToolStart, promptPromise, releaseToolExecution } = await createWaitingHarness();
	harnesses.push(harness);

	// Script: the blocking tool call, followed by one reply per steering message.
	harness.setResponses([
		fauxAssistantMessage(fauxToolCall("wait", {}), { stopReason: "toolUse" }),
		fauxAssistantMessage("handled steer 1"),
		fauxAssistantMessage("handled steer 2"),
	]);

	await waitForToolStart;
	for (const steerText of ["steer 1", "steer 2"]) {
		await harness.session.steer(steerText);
	}
	releaseToolExecution();
	await promptPromise;

	expect(getUserTexts(harness)).toEqual(["start", "steer 1", "steer 2"]);
	expect(getAssistantTexts(harness)).toEqual(["", "handled steer 1", "handled steer 2"]);
});
// Two follow-ups queued during the blocked tool call: both appear as user texts in queue order,
// and the assistant texts list the original turn's reply before the two follow-up replies.
it("delivers multiple follow-up messages in order in one-at-a-time mode", async () => {
	const { harness, waitForToolStart, promptPromise, releaseToolExecution } = await createWaitingHarness();
	harnesses.push(harness);

	// Script: the blocking tool call, the reply closing the original turn, then one reply per follow-up.
	harness.setResponses([
		fauxAssistantMessage(fauxToolCall("wait", {}), { stopReason: "toolUse" }),
		fauxAssistantMessage("original turn complete"),
		fauxAssistantMessage("handled follow-up 1"),
		fauxAssistantMessage("handled follow-up 2"),
	]);

	await waitForToolStart;
	for (const followUpText of ["follow-up 1", "follow-up 2"]) {
		await harness.session.followUp(followUpText);
	}
	releaseToolExecution();
	await promptPromise;

	expect(getUserTexts(harness)).toEqual(["start", "follow-up 1", "follow-up 2"]);
	const expectedAssistantTexts = ["", "original turn complete", "handled follow-up 1", "handled follow-up 2"];
	expect(getAssistantTexts(harness)).toEqual(expectedAssistantTexts);
});
// With steering mode "all", both queued steering messages are already present as user messages
// in the context of the single scripted response that follows the tool call.
it("delivers all steering messages in one batch in all mode", async () => {
	const { harness, waitForToolStart, promptPromise, releaseToolExecution } = await createWaitingHarness();
	harnesses.push(harness);
	harness.session.setSteeringMode("all");

	// User texts found in the context of the second scripted response; replaced on every call.
	let userTextsSeenByModel: string[] = [];
	harness.setResponses([
		fauxAssistantMessage(fauxToolCall("wait", {}), { stopReason: "toolUse" }),
		(context) => {
			const collected: string[] = [];
			for (const message of context.messages) {
				if (message.role === "user") {
					collected.push(getMessageText(message));
				}
			}
			userTextsSeenByModel = collected;
			return fauxAssistantMessage("batched steer response");
		},
	]);

	await waitForToolStart;
	for (const steerText of ["steer 1", "steer 2"]) {
		await harness.session.steer(steerText);
	}
	releaseToolExecution();
	await promptPromise;

	expect(userTextsSeenByModel).toEqual(["start", "steer 1", "steer 2"]);
	expect(getAssistantTexts(harness)).toEqual(["", "batched steer response"]);
});
// With follow-up mode "all", both queued follow-ups are present as user messages in the context
// of the scripted response that comes after the original turn's reply.
it("delivers all follow-up messages in one batch in all mode", async () => {
	const { harness, waitForToolStart, promptPromise, releaseToolExecution } = await createWaitingHarness();
	harnesses.push(harness);
	harness.session.setFollowUpMode("all");

	// User texts found in the context of the third scripted response; replaced on every call.
	let userTextsSeenByModel: string[] = [];
	harness.setResponses([
		fauxAssistantMessage(fauxToolCall("wait", {}), { stopReason: "toolUse" }),
		fauxAssistantMessage("original turn complete"),
		(context) => {
			const collected: string[] = [];
			for (const message of context.messages) {
				if (message.role === "user") {
					collected.push(getMessageText(message));
				}
			}
			userTextsSeenByModel = collected;
			return fauxAssistantMessage("batched follow-up response");
		},
	]);

	await waitForToolStart;
	for (const followUpText of ["follow-up 1", "follow-up 2"]) {
		await harness.session.followUp(followUpText);
	}
	releaseToolExecution();
	await promptPromise;

	expect(userTextsSeenByModel).toEqual(["start", "follow-up 1", "follow-up 2"]);
	const expectedAssistantTexts = ["", "original turn complete", "batched follow-up response"];
	expect(getAssistantTexts(harness)).toEqual(expectedAssistantTexts);
});
// A custom message sent with `deliverAs: "steer"` during the blocked tool call must reach the next
// scripted response as a user message with a matching text part, and must be kept in the session
// with role "custom".
it("queues custom messages with deliverAs steer while streaming", async () => {
	const { harness, waitForToolStart, promptPromise, releaseToolExecution } = await createWaitingHarness();
	harnesses.push(harness);

	// Set by the second scripted response from what it finds in its context.
	let sawCustomMessage = false;
	harness.setResponses([
		fauxAssistantMessage(fauxToolCall("wait", {}), { stopReason: "toolUse" }),
		(context) => {
			sawCustomMessage = context.messages.some((message) => {
				// Only user messages with structured (non-string) content are candidates.
				if (message.role !== "user" || typeof message.content === "string") {
					return false;
				}
				return message.content.some((part) => part.type === "text" && part.text === "steer custom");
			});
			return fauxAssistantMessage("done");
		},
	]);

	await waitForToolStart;
	await harness.session.sendCustomMessage(
		{ customType: "queue-test", content: "steer custom", display: true, details: { value: 1 } },
		{ deliverAs: "steer" },
	);
	releaseToolExecution();
	await promptPromise;

	expect(sawCustomMessage).toBe(true);
	const storedAsCustom = harness.session.messages.some(
		(message) => message.role === "custom" && message.customType === "queue-test",
	);
	expect(storedAsCustom).toBe(true);
});
// A custom message sent with `deliverAs: "followUp"` during the blocked tool call must reach the
// third scripted response as a user message with a matching text part, and must be kept in the
// session with role "custom".
it("queues custom messages with deliverAs followUp while streaming", async () => {
	const { harness, waitForToolStart, promptPromise, releaseToolExecution } = await createWaitingHarness();
	harnesses.push(harness);

	// Set by the third scripted response from what it finds in its context.
	let sawCustomMessage = false;
	harness.setResponses([
		fauxAssistantMessage(fauxToolCall("wait", {}), { stopReason: "toolUse" }),
		fauxAssistantMessage("original turn complete"),
		(context) => {
			sawCustomMessage = context.messages.some((message) => {
				// Only user messages with structured (non-string) content are candidates.
				if (message.role !== "user" || typeof message.content === "string") {
					return false;
				}
				return message.content.some((part) => part.type === "text" && part.text === "follow-up custom");
			});
			return fauxAssistantMessage("done");
		},
	]);

	await waitForToolStart;
	await harness.session.sendCustomMessage(
		{ customType: "queue-test", content: "follow-up custom", display: true, details: { value: 1 } },
		{ deliverAs: "followUp" },
	);
	releaseToolExecution();
	await promptPromise;

	expect(sawCustomMessage).toBe(true);
	const storedAsCustom = harness.session.messages.some(
		(message) => message.role === "custom" && message.customType === "queue-test",
	);
	expect(storedAsCustom).toBe(true);
});
// A custom message sent with `deliverAs: "nextTurn"` before any prompt must be visible to the
// response of the next prompt, and the session must end up with user, custom, assistant roles.
it("injects nextTurn custom messages into the next prompt", async () => {
	const harness = await createHarness();
	harnesses.push(harness);

	await harness.session.sendCustomMessage(
		{ customType: "next-turn", content: "carry this", display: true, details: {} },
		{ deliverAs: "nextTurn" },
	);

	// Set by the scripted response from what it finds in its context.
	let sawCustomMessage = false;
	harness.setResponses([
		(context) => {
			sawCustomMessage = context.messages.some((message) => {
				// Only user messages with structured (non-string) content are candidates.
				if (message.role !== "user" || typeof message.content === "string") {
					return false;
				}
				return message.content.some((part) => part.type === "text" && part.text === "carry this");
			});
			return fauxAssistantMessage("done");
		},
	]);

	await harness.session.prompt("normal prompt");

	expect(sawCustomMessage).toBe(true);
	const roles = harness.session.messages.map((message) => message.role);
	expect(roles).toEqual(["user", "custom", "assistant"]);
});
// Checks the queue counter around a steered message: it reads 1 right after `steer("queued")`,
// the value sampled inside the `message_start` event for that user message is 0, and it is 0
// again once the prompt has finished.
it("updates pendingMessageCount and removes queued text before message_start is emitted", async () => {
const waiting = await createWaitingHarness();
const { harness, waitForToolStart, promptPromise, releaseToolExecution } = waiting;
harnesses.push(harness);
// Values of `pendingMessageCount` sampled each time the queued user message starts.
const countsAtQueuedMessageStart: number[] = [];
harness.setResponses([
fauxAssistantMessage(fauxToolCall("wait", {}), { stopReason: "toolUse" }),
fauxAssistantMessage("done"),
]);
// Sample the counter whenever a user `message_start` event carrying the text "queued" arrives.
// NOTE(review): the handle returned by subscribe() is discarded here — presumably harness
// cleanup drops remaining listeners; confirm.
harness.session.subscribe((event) => {
if (
event.type === "message_start" &&
event.message.role === "user" &&
getMessageText(event.message) === "queued"
) {
countsAtQueuedMessageStart.push(harness.session.pendingMessageCount);
}
});
await waitForToolStart;
await harness.session.steer("queued");
// The tool is still blocked, so the steered message is expected to be pending.
expect(harness.session.pendingMessageCount).toBe(1);
releaseToolExecution();
await promptPromise;
// Exactly one matching `message_start` was observed, and the counter was already 0 at that time.
expect(countsAtQueuedMessageStart).toEqual([0]);
expect(harness.session.pendingMessageCount).toBe(0);
});
// Passing a registered extension command to steer() must reject with the fixed "cannot be queued" error.
it("throws when queueing an extension command with steer", async () => {
	const harness = await createHarness({
		extensionFactories: [
			(pi) => {
				// The handler body is irrelevant here; the command only needs to be registered.
				pi.registerCommand("testcmd", {
					description: "Test command",
					handler: async () => {},
				});
			},
		],
	});
	harnesses.push(harness);

	const expectedError =
		'Extension command "/testcmd" cannot be queued. Use prompt() or execute the command when not streaming.';
	const steerAttempt = harness.session.steer("/testcmd queued");
	await expect(steerAttempt).rejects.toThrow(expectedError);
});
// Passing a registered extension command to followUp() must reject with the fixed "cannot be queued" error.
it("throws when queueing an extension command with followUp", async () => {
	const harness = await createHarness({
		extensionFactories: [
			(pi) => {
				// The handler body is irrelevant here; the command only needs to be registered.
				pi.registerCommand("testcmd", {
					description: "Test command",
					handler: async () => {},
				});
			},
		],
	});
	harnesses.push(harness);

	const expectedError =
		'Extension command "/testcmd" cannot be queued. Use prompt() or execute the command when not streaming.';
	const followUpAttempt = harness.session.followUp("/testcmd queued");
	await expect(followUpAttempt).rejects.toThrow(expectedError);
});
});