mirror of
https://github.com/badlogic/pi-mono.git
synced 2026-05-24 05:44:08 +00:00
407 lines
14 KiB
TypeScript
407 lines
14 KiB
TypeScript
import {
|
|
type AssistantMessage,
|
|
createAssistantMessageEventStream,
|
|
fauxAssistantMessage,
|
|
type Model,
|
|
} from "@earendil-works/pi-ai";
|
|
import { afterEach, describe, expect, it, vi } from "vitest";
|
|
import { createHarness, type Harness } from "./harness.js";
|
|
|
|
type SessionWithCompactionInternals = {
|
|
_checkCompaction: (assistantMessage: AssistantMessage, skipAbortedCheck?: boolean) => Promise<boolean>;
|
|
_runAutoCompaction: (reason: "overflow" | "threshold", willRetry: boolean) => Promise<boolean>;
|
|
};
|
|
|
|
function createUsage(totalTokens: number) {
|
|
return {
|
|
input: totalTokens,
|
|
output: 0,
|
|
cacheRead: 0,
|
|
cacheWrite: 0,
|
|
totalTokens,
|
|
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
|
};
|
|
}
|
|
|
|
function createAssistant(
|
|
harness: Harness,
|
|
options: {
|
|
stopReason?: AssistantMessage["stopReason"];
|
|
errorMessage?: string;
|
|
totalTokens?: number;
|
|
timestamp?: number;
|
|
},
|
|
): AssistantMessage {
|
|
const model = harness.getModel();
|
|
return {
|
|
...fauxAssistantMessage("", {
|
|
stopReason: options.stopReason,
|
|
errorMessage: options.errorMessage,
|
|
timestamp: options.timestamp,
|
|
}),
|
|
api: model.api,
|
|
provider: model.provider,
|
|
model: model.id,
|
|
usage: createUsage(options.totalTokens ?? 0),
|
|
};
|
|
}
|
|
|
|
function useSummaryStreamFn(harness: Harness, summary: string): () => number {
|
|
let callCount = 0;
|
|
harness.session.agent.streamFn = (model) => {
|
|
callCount++;
|
|
const stream = createAssistantMessageEventStream();
|
|
queueMicrotask(() => {
|
|
const message: AssistantMessage = {
|
|
...fauxAssistantMessage(summary),
|
|
api: model.api,
|
|
provider: model.provider,
|
|
model: model.id,
|
|
usage: createUsage(10),
|
|
};
|
|
stream.push({ type: "done", reason: "stop", message });
|
|
});
|
|
return stream;
|
|
};
|
|
return () => callCount;
|
|
}
|
|
|
|
function seedCompactableSession(harness: Harness): void {
|
|
const now = Date.now();
|
|
harness.sessionManager.appendMessage({
|
|
role: "user",
|
|
content: [{ type: "text", text: "message to compact" }],
|
|
timestamp: now - 1000,
|
|
});
|
|
harness.sessionManager.appendMessage(
|
|
createAssistant(harness, {
|
|
stopReason: "stop",
|
|
totalTokens: 100,
|
|
timestamp: now - 500,
|
|
}),
|
|
);
|
|
harness.session.agent.state.messages = harness.sessionManager.buildSessionContext().messages;
|
|
}
|
|
|
|
describe("AgentSession compaction characterization", () => {
|
|
const harnesses: Harness[] = [];
|
|
|
|
afterEach(() => {
|
|
vi.useRealTimers();
|
|
vi.restoreAllMocks();
|
|
while (harnesses.length > 0) {
|
|
harnesses.pop()?.cleanup();
|
|
}
|
|
});
|
|
|
|
it("manually compacts using an extension-provided summary", async () => {
|
|
const harness = await createHarness({
|
|
extensionFactories: [
|
|
(pi) => {
|
|
pi.on("session_before_compact", async (event) => ({
|
|
compaction: {
|
|
summary: "summary from extension",
|
|
firstKeptEntryId: event.preparation.firstKeptEntryId,
|
|
tokensBefore: event.preparation.tokensBefore,
|
|
details: { source: "extension" },
|
|
},
|
|
}));
|
|
},
|
|
],
|
|
});
|
|
harnesses.push(harness);
|
|
|
|
await harness.session.prompt("one");
|
|
await harness.session.prompt("two");
|
|
|
|
const result = await harness.session.compact();
|
|
const compactionEntries = harness.sessionManager.getEntries().filter((entry) => entry.type === "compaction");
|
|
|
|
expect(result.summary).toBe("summary from extension");
|
|
expect(compactionEntries).toHaveLength(1);
|
|
expect(harness.session.messages[0]?.role).toBe("compactionSummary");
|
|
});
|
|
|
|
it("throws when compacting without a model", async () => {
|
|
const harness = await createHarness();
|
|
harnesses.push(harness);
|
|
harness.session.agent.state.model = undefined as unknown as Model<any>;
|
|
|
|
await expect(harness.session.compact()).rejects.toThrow("No model selected");
|
|
});
|
|
|
|
it("throws when compacting without configured auth", async () => {
|
|
const harness = await createHarness({ withConfiguredAuth: false });
|
|
harnesses.push(harness);
|
|
|
|
await expect(harness.session.compact()).rejects.toThrow(`No API key found for ${harness.getModel().provider}.`);
|
|
});
|
|
|
|
it("manually compacts with a custom streamFn when registry auth is absent", async () => {
|
|
const harness = await createHarness({ withConfiguredAuth: false });
|
|
harnesses.push(harness);
|
|
seedCompactableSession(harness);
|
|
const getStreamCallCount = useSummaryStreamFn(harness, "summary from custom stream");
|
|
|
|
const result = await harness.session.compact();
|
|
|
|
expect(result.summary).toBe("summary from custom stream");
|
|
expect(getStreamCallCount()).toBe(1);
|
|
});
|
|
|
|
it("auto-compacts with a custom streamFn when registry auth is absent", async () => {
|
|
const harness = await createHarness({ withConfiguredAuth: false });
|
|
harnesses.push(harness);
|
|
seedCompactableSession(harness);
|
|
const getStreamCallCount = useSummaryStreamFn(harness, "auto summary from custom stream");
|
|
const sessionInternals = harness.session as unknown as SessionWithCompactionInternals;
|
|
|
|
await sessionInternals._runAutoCompaction("threshold", false);
|
|
|
|
const compactionEntries = harness.sessionManager.getEntries().filter((entry) => entry.type === "compaction");
|
|
expect(compactionEntries).toHaveLength(1);
|
|
expect(getStreamCallCount()).toBe(1);
|
|
});
|
|
|
|
it("cancels in-progress manual compaction when abortCompaction is called", async () => {
|
|
const harness = await createHarness({
|
|
extensionFactories: [
|
|
(pi) => {
|
|
pi.on("session_before_compact", async (event) => {
|
|
return await new Promise<{ cancel: true }>((resolve) => {
|
|
event.signal.addEventListener("abort", () => resolve({ cancel: true }), { once: true });
|
|
});
|
|
});
|
|
},
|
|
],
|
|
});
|
|
harnesses.push(harness);
|
|
|
|
await harness.session.prompt("one");
|
|
await harness.session.prompt("two");
|
|
|
|
const compactPromise = harness.session.compact();
|
|
await new Promise((resolve) => setTimeout(resolve, 0));
|
|
harness.session.abortCompaction();
|
|
|
|
await expect(compactPromise).rejects.toThrow("Compaction cancelled");
|
|
});
|
|
|
|
it("resumes after threshold compaction when only agent-level queued messages exist", async () => {
|
|
vi.useFakeTimers();
|
|
const harness = await createHarness({
|
|
settings: { compaction: { keepRecentTokens: 1 } },
|
|
extensionFactories: [
|
|
(pi) => {
|
|
pi.on("session_before_compact", async (event) => ({
|
|
compaction: {
|
|
summary: "auto compacted",
|
|
firstKeptEntryId: event.preparation.firstKeptEntryId,
|
|
tokensBefore: event.preparation.tokensBefore,
|
|
details: {},
|
|
},
|
|
}));
|
|
},
|
|
],
|
|
});
|
|
harnesses.push(harness);
|
|
harness.setResponses([fauxAssistantMessage("one"), fauxAssistantMessage("two")]);
|
|
await harness.session.prompt("first");
|
|
await harness.session.prompt("second");
|
|
|
|
harness.session.agent.followUp({
|
|
role: "custom",
|
|
customType: "test",
|
|
content: [{ type: "text", text: "queued custom" }],
|
|
display: false,
|
|
timestamp: Date.now(),
|
|
});
|
|
|
|
const sessionInternals = harness.session as unknown as SessionWithCompactionInternals;
|
|
|
|
await expect(sessionInternals._runAutoCompaction("threshold", false)).resolves.toBe(true);
|
|
});
|
|
|
|
it("does not retry overflow recovery more than once", async () => {
|
|
const harness = await createHarness();
|
|
harnesses.push(harness);
|
|
const sessionInternals = harness.session as unknown as SessionWithCompactionInternals;
|
|
const overflowMessage = createAssistant(harness, {
|
|
stopReason: "error",
|
|
errorMessage: "prompt is too long",
|
|
timestamp: Date.now(),
|
|
});
|
|
const runAutoCompactionSpy = vi.spyOn(sessionInternals, "_runAutoCompaction").mockResolvedValue(false);
|
|
const compactionErrors: string[] = [];
|
|
harness.session.subscribe((event) => {
|
|
if (event.type === "compaction_end" && event.errorMessage) {
|
|
compactionErrors.push(event.errorMessage);
|
|
}
|
|
});
|
|
|
|
await sessionInternals._checkCompaction(overflowMessage);
|
|
await sessionInternals._checkCompaction({ ...overflowMessage, timestamp: Date.now() + 1 });
|
|
|
|
expect(runAutoCompactionSpy).toHaveBeenCalledTimes(1);
|
|
expect(compactionErrors).toContain(
|
|
"Context overflow recovery failed after one compact-and-retry attempt. Try reducing context or switching to a larger-context model.",
|
|
);
|
|
});
|
|
|
|
it("ignores stale pre-compaction assistant usage on pre-prompt checks", async () => {
|
|
const harness = await createHarness();
|
|
harnesses.push(harness);
|
|
const sessionInternals = harness.session as unknown as SessionWithCompactionInternals;
|
|
const staleTimestamp = Date.now() - 10_000;
|
|
const staleAssistant = createAssistant(harness, {
|
|
stopReason: "stop",
|
|
totalTokens: 610_000,
|
|
timestamp: staleTimestamp,
|
|
});
|
|
|
|
harness.sessionManager.appendMessage({
|
|
role: "user",
|
|
content: [{ type: "text", text: "before compaction" }],
|
|
timestamp: staleTimestamp - 1000,
|
|
});
|
|
harness.sessionManager.appendMessage(staleAssistant);
|
|
const firstKeptEntryId = harness.sessionManager.getEntries()[0]!.id;
|
|
harness.sessionManager.appendCompaction(
|
|
"summary",
|
|
firstKeptEntryId,
|
|
staleAssistant.usage.totalTokens,
|
|
undefined,
|
|
false,
|
|
);
|
|
harness.sessionManager.appendMessage({
|
|
role: "user",
|
|
content: [{ type: "text", text: "after compaction" }],
|
|
timestamp: Date.now(),
|
|
});
|
|
|
|
const runAutoCompactionSpy = vi.spyOn(sessionInternals, "_runAutoCompaction").mockResolvedValue(false);
|
|
|
|
await sessionInternals._checkCompaction(staleAssistant, false);
|
|
|
|
expect(runAutoCompactionSpy).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it("triggers threshold compaction for error messages using the last successful usage", async () => {
|
|
const harness = await createHarness();
|
|
harnesses.push(harness);
|
|
const sessionInternals = harness.session as unknown as SessionWithCompactionInternals;
|
|
const successfulAssistant = createAssistant(harness, {
|
|
stopReason: "stop",
|
|
totalTokens: 190_000,
|
|
timestamp: Date.now(),
|
|
});
|
|
const errorAssistant = createAssistant(harness, {
|
|
stopReason: "error",
|
|
errorMessage: "529 overloaded",
|
|
timestamp: Date.now() + 1000,
|
|
});
|
|
harness.session.agent.state.messages = [
|
|
{ role: "user", content: [{ type: "text", text: "hello" }], timestamp: Date.now() - 1000 },
|
|
successfulAssistant,
|
|
{ role: "user", content: [{ type: "text", text: "retry" }], timestamp: Date.now() + 500 },
|
|
errorAssistant,
|
|
];
|
|
|
|
const runAutoCompactionSpy = vi.spyOn(sessionInternals, "_runAutoCompaction").mockResolvedValue(false);
|
|
|
|
await sessionInternals._checkCompaction(errorAssistant);
|
|
|
|
expect(runAutoCompactionSpy).toHaveBeenCalledWith("threshold", false);
|
|
});
|
|
|
|
it("does not trigger threshold compaction for error messages when no prior usage exists", async () => {
|
|
const harness = await createHarness();
|
|
harnesses.push(harness);
|
|
const sessionInternals = harness.session as unknown as SessionWithCompactionInternals;
|
|
const errorAssistant = createAssistant(harness, {
|
|
stopReason: "error",
|
|
errorMessage: "529 overloaded",
|
|
timestamp: Date.now(),
|
|
});
|
|
harness.session.agent.state.messages = [
|
|
{ role: "user", content: [{ type: "text", text: "hello" }], timestamp: Date.now() - 1000 },
|
|
errorAssistant,
|
|
];
|
|
|
|
const runAutoCompactionSpy = vi.spyOn(sessionInternals, "_runAutoCompaction").mockResolvedValue(false);
|
|
|
|
await sessionInternals._checkCompaction(errorAssistant);
|
|
|
|
expect(runAutoCompactionSpy).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it("does not trigger threshold compaction when only kept pre-compaction usage exists", async () => {
|
|
const harness = await createHarness();
|
|
harnesses.push(harness);
|
|
const sessionInternals = harness.session as unknown as SessionWithCompactionInternals;
|
|
const preCompactionTimestamp = Date.now() - 10_000;
|
|
const keptAssistant = createAssistant(harness, {
|
|
stopReason: "stop",
|
|
totalTokens: 190_000,
|
|
timestamp: preCompactionTimestamp,
|
|
});
|
|
|
|
harness.sessionManager.appendMessage({
|
|
role: "user",
|
|
content: [{ type: "text", text: "before compaction" }],
|
|
timestamp: preCompactionTimestamp - 1000,
|
|
});
|
|
harness.sessionManager.appendMessage(keptAssistant);
|
|
const firstKeptEntryId = harness.sessionManager.getEntries()[0]!.id;
|
|
harness.sessionManager.appendCompaction(
|
|
"summary",
|
|
firstKeptEntryId,
|
|
keptAssistant.usage.totalTokens,
|
|
undefined,
|
|
false,
|
|
);
|
|
|
|
const errorAssistant = createAssistant(harness, {
|
|
stopReason: "error",
|
|
errorMessage: "529 overloaded",
|
|
timestamp: Date.now(),
|
|
});
|
|
harness.session.agent.state.messages = [
|
|
{ role: "user", content: [{ type: "text", text: "kept user" }], timestamp: preCompactionTimestamp - 1000 },
|
|
keptAssistant,
|
|
{ role: "user", content: [{ type: "text", text: "new prompt" }], timestamp: Date.now() - 500 },
|
|
errorAssistant,
|
|
];
|
|
|
|
const runAutoCompactionSpy = vi.spyOn(sessionInternals, "_runAutoCompaction").mockResolvedValue(false);
|
|
|
|
await sessionInternals._checkCompaction(errorAssistant);
|
|
|
|
expect(runAutoCompactionSpy).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it("does not trigger threshold compaction below the threshold or when disabled", async () => {
|
|
const belowThresholdHarness = await createHarness({
|
|
settings: { compaction: { enabled: true, reserveTokens: 1000 } },
|
|
models: [{ id: "faux-1", contextWindow: 200_000 }],
|
|
});
|
|
harnesses.push(belowThresholdHarness);
|
|
const disabledHarness = await createHarness({ settings: { compaction: { enabled: false } } });
|
|
harnesses.push(disabledHarness);
|
|
|
|
const belowThresholdInternals = belowThresholdHarness.session as unknown as SessionWithCompactionInternals;
|
|
const disabledInternals = disabledHarness.session as unknown as SessionWithCompactionInternals;
|
|
const belowThresholdSpy = vi.spyOn(belowThresholdInternals, "_runAutoCompaction").mockResolvedValue(false);
|
|
const disabledSpy = vi.spyOn(disabledInternals, "_runAutoCompaction").mockResolvedValue(false);
|
|
|
|
await belowThresholdInternals._checkCompaction(
|
|
createAssistant(belowThresholdHarness, { stopReason: "stop", totalTokens: 1_000, timestamp: Date.now() }),
|
|
);
|
|
await disabledInternals._checkCompaction(
|
|
createAssistant(disabledHarness, { stopReason: "stop", totalTokens: 1_000_000, timestamp: Date.now() }),
|
|
);
|
|
|
|
expect(belowThresholdSpy).not.toHaveBeenCalled();
|
|
expect(disabledSpy).not.toHaveBeenCalled();
|
|
});
|
|
});
|