fix(core): auto-compact subagent context to prevent overflow (#3735)

* fix(core): auto-compact subagent context to prevent overflow

Subagent chats accumulated history without ever compacting: the
compaction logic lived in the main-session wrapper, so a long
multi-turn subagent run could hit "max context length exceeded"
without it ever firing.

Move compaction down into the chat layer so both the main agent and
subagents auto-compress at the configured threshold, and surface the
result via a new chat stream event that bridges into the existing
ChatCompressed UI path. The main-session wrapper still owns the full
/compress reset.
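
In sketch form, the two halves of that bridge look like this
(TypeScript, condensed from the geminiChat.ts and turn.ts hunks below;
variable names are illustrative, not the exact committed code):

    // GeminiChat.sendMessageStream: run compaction before the turn,
    // then surface a success as the first event in the stream.
    const info = await this.tryCompress(promptId, model, false, signal);
    if (info.compressionStatus === CompressionStatus.COMPRESSED) {
      yield { type: StreamEventType.COMPRESSED, info };
    }

    // Turn.run: translate the chat-level event into the existing
    // UI-level event so current ChatCompressed handlers keep working.
    if (streamEvent.type === 'compressed') {
      yield { type: GeminiEventType.ChatCompressed, value: streamEvent.info };
      continue;
    }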

Closes #3664

* fix(core): address subagent compaction review feedback

Code fixes:
- Seed `lastPromptTokenCount` on subagent chats so the first-send
  threshold gate sees the inherited history's true size.
- Add `COMPRESSION_FAILED_TOKEN_COUNT_ERROR` to the fail-latch chain
  so token-counting failures stop retrying compression every send (see
  the sketch after this list).
- Restore `FileReadCache.clear()` after compaction in both the manual
  /compress wrapper and the auto-compaction path inside GeminiChat,
  preventing post-summary `file_unchanged` placeholders from pointing
  at content the model can no longer retrieve.
- Refresh stale comment on the `compressed → ChatCompressed` bridge
  in turn.ts now that this path is the primary route, not a fallback.
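
In sketch form, the widened fail-latch gate inside GeminiChat.tryCompress
(statuses as they appear in the geminiChat.ts hunk below; condensed, not
the exact committed code):

    // Any of the three failure statuses latches auto-compaction off for
    // this chat, so we stop spending compression API calls every send.
    // A forced /compress still runs: the latch only gates unforced calls.
    const failed =
      info.compressionStatus ===
        CompressionStatus.COMPRESSION_FAILED_INFLATED_TOKEN_COUNT ||
      info.compressionStatus ===
        CompressionStatus.COMPRESSION_FAILED_EMPTY_SUMMARY ||
      info.compressionStatus ===
        CompressionStatus.COMPRESSION_FAILED_TOKEN_COUNT_ERROR; // newly added
    if (failed && !force) {
      this.hasFailedCompressionAttempt = true;
    }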

Tests:
- turn.test.ts asserts the compressed → ChatCompressed bridge.
- geminiChat.test.ts asserts COMPRESSED yields as the first stream
  event after auto-compaction succeeds.
- chatCompressionService.test.ts bumps originalTokenCount above the
  cheap-gate so the NOOP test exercises findCompressSplitPoint.
- client.test.ts asserts forceFullIdeContext flips when a
  ChatCompressed event flows through sendMessageStream's loop.

* chore(core): drop redundant StreamEvent cast and document auto-compaction trade-off

- Remove the `as StreamEvent` cast at the COMPRESSED yield site — the
  literal already matches the union member.
- Add a 4-line comment at the auto-compaction setHistory point directing
  readers to GeminiClient for the env-refresh trade-off rationale, so
  they don't have to chase the layering decision across files (the
  trade-off is sketched below).
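
In sketch form, the layering trade-off that comment points at (condensed
from the geminiChat.ts and client.ts hunks below; not the exact committed
code):

    // Auto path, GeminiChat.tryCompress: swap in the compressed history
    // in place. No env-context refresh happens at this layer.
    this.setHistory(newHistory);

    // Manual /compress, GeminiClient.tryCompressChat: rebuild the chat so
    // startChat() re-prepends a fresh env snapshot ahead of the compressed
    // history, then reseed the per-chat token count.
    const compressedHistory = this.getChat().getHistory();
    await this.startChat(compressedHistory);
    this.getChat().setLastPromptTokenCount(info.newTokenCount);
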
tanzhenxin 2026-05-06 14:09:07 +08:00 committed by GitHub
parent 808d0978eb
commit 8a1ed565a6
10 changed files with 1434 additions and 1028 deletions

View file

@@ -317,11 +317,17 @@ export class AgentCore {
}
try {
return new GeminiChat(
const chat = new GeminiChat(
this.runtimeContext,
generationConfig,
startHistory,
);
// Seed the per-chat token count so the auto-compaction threshold
// gate sees the inherited history's true size on the first send.
// Without this, fork subagents start at 0 and the gate NOOPs even
// when `startHistory` is already huge — first API call can 400.
chat.setLastPromptTokenCount(this.lastPromptTokenCount);
return chat;
} catch (error) {
await reportError(
error,
@@ -540,6 +546,18 @@ export class AgentCore {
continue;
}
// GeminiChat already mutated its own history; surface to the debug
// log so subagent compactions show up alongside the main session's.
if (streamEvent.type === 'compressed') {
this.runtimeContext
.getDebugLogger()
.debug(
`[AGENT-COMPACT] subagent=${this.subagentId} round=${turnCounter} ` +
`tokens ${streamEvent.info.originalTokenCount} -> ${streamEvent.info.newTokenCount}`,
);
continue;
}
// Handle chunk events
if (streamEvent.type === 'chunk') {
const resp = streamEvent.value;

View file

@@ -280,6 +280,7 @@ describe('subagent.ts', () => {
() =>
({
sendMessageStream: mockSendMessageStream,
setLastPromptTokenCount: vi.fn(),
}) as unknown as GeminiChat,
);
@@ -958,6 +959,7 @@ describe('subagent.ts', () => {
() =>
({
sendMessageStream: mockSendMessageStream,
setLastPromptTokenCount: vi.fn(),
}) as unknown as GeminiChat,
);
@@ -997,6 +999,7 @@ describe('subagent.ts', () => {
() =>
({
sendMessageStream: mockSendMessageStream,
setLastPromptTokenCount: vi.fn(),
}) as unknown as GeminiChat,
);
@@ -1061,6 +1064,7 @@ describe('subagent.ts', () => {
() =>
({
sendMessageStream: mockSendMessageStream,
setLastPromptTokenCount: vi.fn(),
}) as unknown as GeminiChat,
);

View file

@@ -29,12 +29,7 @@ import type { Config } from '../config/config.js';
import { ApprovalMode } from '../config/config.js';
import type { ModelsConfig } from '../models/modelsConfig.js';
import { retryWithBackoff } from '../utils/retry.js';
import {
CompressionStatus,
GeminiEventType,
Turn,
type ChatCompressionInfo,
} from './turn.js';
import { CompressionStatus, GeminiEventType, Turn } from './turn.js';
vi.mock('../utils/retry.js', () => ({
retryWithBackoff: vi.fn(async (fn) => await fn()),
@@ -161,6 +156,8 @@ vi.mock('../utils/generateContentResponseUtilities', () => ({
const mockUiTelemetryService = vi.hoisted(() => ({
setLastPromptTokenCount: vi.fn(),
getLastPromptTokenCount: vi.fn(),
reset: vi.fn(),
addEvent: vi.fn(),
}));
vi.mock('../telemetry/index.js', async (importOriginal) => {
@@ -264,15 +261,16 @@ describe('findCompressSplitPoint', () => {
expect(findCompressSplitPoint(history, 0.8)).toBe(4);
});
it('should return earlier splitpoint if no valid ones are after threshhold', () => {
it('compresses everything before the trailing in-flight functionCall', () => {
const history: Content[] = [
{ role: 'user', parts: [{ text: 'This is the first message.' }] },
{ role: 'model', parts: [{ text: 'This is the second message.' }] },
{ role: 'user', parts: [{ text: 'This is the third message.' }] },
{ role: 'model', parts: [{ functionCall: {} }] },
];
// Can't return 4 because the previous item has a function call.
expect(findCompressSplitPoint(history, 0.99)).toBe(2);
// Trailing m+fc is in-flight; the in-flight fallback compresses
// everything except the trailing fc (no preceding pair to retain).
expect(findCompressSplitPoint(history, 0.99)).toBe(3);
});
it('should handle a history with only one item', () => {
@@ -446,12 +444,48 @@ describe('Gemini Client (client.ts)', () => {
client = new GeminiClient(mockConfig);
await client.initialize();
vi.mocked(mockConfig.getGeminiClient).mockReturnValue(client);
// GeminiClient.sendMessageStream calls this.tryCompressChat (which now
// delegates to chat.tryCompress) before each turn. Most tests use a
// hand-rolled chat mock that doesn't implement tryCompress; default the
// wrapper to a NOOP so those tests don't crash. Tests that exercise
// compression directly (the delegation tests below, the
// emits-compression-event test) override this spy.
vi.spyOn(client, 'tryCompressChat').mockResolvedValue({
originalTokenCount: 0,
newTokenCount: 0,
compressionStatus: CompressionStatus.NOOP,
});
});
afterEach(() => {
vi.restoreAllMocks();
});
describe('initialize', () => {
it('seeds resumed chat with replayed prompt token count', async () => {
vi.mocked(mockConfig.getResumedSessionData).mockReturnValue({
conversation: {
sessionId: 'resumed-session-id',
projectHash: 'project-hash',
startTime: new Date(0).toISOString(),
lastUpdated: new Date(0).toISOString(),
messages: [],
},
filePath: '/test/session.jsonl',
lastCompletedUuid: null,
});
vi.mocked(uiTelemetryService.getLastPromptTokenCount).mockReturnValue(
123_456,
);
const resumedClient = new GeminiClient(mockConfig);
await resumedClient.initialize();
expect(resumedClient.getChat().getLastPromptTokenCount()).toBe(123_456);
});
});
describe('addHistory', () => {
it('should call chat.addHistory with the provided content', async () => {
const mockChat = {
@@ -642,6 +676,11 @@ describe('Gemini Client (client.ts)', () => {
mockChat = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
tryCompress: vi.fn().mockResolvedValue({
originalTokenCount: 0,
newTokenCount: 0,
compressionStatus: CompressionStatus.NOOP,
}),
};
client['chat'] = mockChat as GeminiChat;
});
@@ -768,66 +807,73 @@ describe('Gemini Client (client.ts)', () => {
});
});
describe('tryCompressChat', () => {
const mockGetHistory = vi.fn();
// tryCompressChat is now a thin wrapper around GeminiChat.tryCompress.
// The compression logic itself is exercised in chatCompressionService.test.ts
// (token math, threshold checks, hook firing) and geminiChat.test.ts (history
// mutation, recording, hasFailedCompressionAttempt). The tests below cover
// only what the wrapper itself adds: argument forwarding and the IDE-context
// flag flip.
describe('tryCompressChat (delegation)', () => {
beforeEach(() => {
client['chat'] = {
getHistory: mockGetHistory,
addHistory: vi.fn(),
setHistory: vi.fn(),
} as unknown as GeminiChat;
// The top-level beforeEach stubs tryCompressChat to NOOP for unrelated
// tests; restore the real implementation here so we can observe it.
vi.mocked(client.tryCompressChat).mockRestore();
});
function setup({
chatHistory = [
{ role: 'user', parts: [{ text: 'Long conversation' }] },
{ role: 'model', parts: [{ text: 'Long response' }] },
] as Content[],
originalTokenCount = 1000,
summaryText = 'This is a summary.',
// Token counts returned in usageMetadata to simulate what the API would return
// Default values ensure successful compression:
// newTokenCount = originalTokenCount - (compressionInputTokenCount - 1000) + compressionOutputTokenCount
// = 1000 - (1600 - 1000) + 50 = 1000 - 600 + 50 = 450 (< 1000, success)
compressionInputTokenCount = 1600,
compressionOutputTokenCount = 50,
} = {}) {
const mockOriginalChat: Partial<GeminiChat> = {
getHistory: vi.fn((_curated?: boolean) => chatHistory),
setHistory: vi.fn(),
};
client['chat'] = mockOriginalChat as GeminiChat;
it('forwards prompt id, model, force, and signal to chat.tryCompress', async () => {
const tryCompress = vi.fn().mockResolvedValue({
originalTokenCount: 0,
newTokenCount: 0,
compressionStatus: CompressionStatus.NOOP,
});
client['chat'] = {
tryCompress,
getHistory: vi.fn().mockReturnValue([]),
} as unknown as GeminiChat;
vi.mocked(mockConfig.getModel).mockReturnValue('the-model');
const signal = new AbortController().signal;
vi.mocked(uiTelemetryService.getLastPromptTokenCount).mockReturnValue(
originalTokenCount,
);
await client.tryCompressChat('p1', true, signal);
mockGenerateContentFn.mockResolvedValue({
candidates: [
{
content: {
role: 'model',
parts: [{ text: summaryText }],
},
},
],
usageMetadata: {
promptTokenCount: compressionInputTokenCount,
candidatesTokenCount: compressionOutputTokenCount,
totalTokenCount:
compressionInputTokenCount + compressionOutputTokenCount,
},
} as unknown as GenerateContentResponse);
expect(tryCompress).toHaveBeenCalledWith('p1', 'the-model', true, signal);
});
// Calculate what the new history will be
const splitPoint = findCompressSplitPoint(chatHistory, 0.7); // 1 - 0.3
const historyToKeep = chatHistory.slice(splitPoint);
it('flips forceFullIdeContext on a successful compression', async () => {
client['chat'] = {
tryCompress: vi.fn().mockResolvedValue({
originalTokenCount: 1000,
newTokenCount: 200,
compressionStatus: CompressionStatus.COMPRESSED,
}),
getHistory: vi.fn().mockReturnValue([]),
} as unknown as GeminiChat;
client['forceFullIdeContext'] = false;
// This is the history that the new chat will have.
// It includes the default startChat history + the extra history from tryCompressChat
const newCompressedHistory: Content[] = [
// Mocked envParts + canned response from startChat
await client.tryCompressChat('p2');
expect(client['forceFullIdeContext']).toBe(true);
});
it('re-prepends startup context and seeds the new chat after compression', async () => {
const compressedHistory: Content[] = [
{ role: 'user', parts: [{ text: 'summary' }] },
{ role: 'model', parts: [{ text: 'ok' }] },
];
const originalChat = client.getChat();
vi.spyOn(originalChat, 'tryCompress').mockImplementation(async () => {
originalChat.setHistory(compressedHistory);
return {
originalTokenCount: 1000,
newTokenCount: 200,
compressionStatus: CompressionStatus.COMPRESSED,
};
});
client['forceFullIdeContext'] = false;
await client.tryCompressChat('p4');
expect(client.getChat()).not.toBe(originalChat);
expect(client.getHistory()).toEqual([
{
role: 'user',
parts: [{ text: 'Mocked env context' }],
@@ -836,616 +882,71 @@ describe('Gemini Client (client.ts)', () => {
role: 'model',
parts: [{ text: 'Got it. Thanks for the context!' }],
},
// extraHistory from tryCompressChat
{
role: 'user',
parts: [{ text: summaryText }],
},
{
role: 'model',
parts: [{ text: 'Got it. Thanks for the additional context!' }],
},
...historyToKeep,
];
...compressedHistory,
]);
expect(client.getChat().getLastPromptTokenCount()).toBe(200);
expect(client['forceFullIdeContext']).toBe(true);
});
const mockNewChat: Partial<GeminiChat> = {
getHistory: vi.fn().mockReturnValue(newCompressedHistory),
setHistory: vi.fn(),
};
client['startChat'] = vi.fn().mockImplementation(async () => {
client['chat'] = mockNewChat as GeminiChat;
return mockNewChat as GeminiChat;
});
// New token count formula: originalTokenCount - (compressionInputTokenCount - 1000) + compressionOutputTokenCount
const estimatedNewTokenCount = Math.max(
0,
originalTokenCount -
(compressionInputTokenCount - 1000) +
compressionOutputTokenCount,
);
return {
client,
mockOriginalChat,
mockNewChat,
estimatedNewTokenCount,
};
}
describe('when compression inflates the token count', () => {
it('allows compression to be forced/manual after a failure', async () => {
// Call 1 (Fails): Setup with token counts that will inflate
// newTokenCount = originalTokenCount - (compressionInputTokenCount - 1000) + compressionOutputTokenCount
// = 100 - (1010 - 1000) + 200 = 100 - 10 + 200 = 290 > 100 (inflation)
const longSummary = 'long summary '.repeat(100);
const { client, estimatedNewTokenCount: inflatedTokenCount } = setup({
originalTokenCount: 100,
summaryText: longSummary,
compressionInputTokenCount: 1010,
compressionOutputTokenCount: 200,
});
expect(inflatedTokenCount).toBeGreaterThan(100); // Ensure setup is correct
await client.tryCompressChat('prompt-id-4', false); // Fails
// Call 2 (Forced): Re-setup with token counts that will compress
// newTokenCount = 100 - (1100 - 1000) + 50 = 100 - 100 + 50 = 50 <= 100 (compression)
const shortSummary = 'short';
const { estimatedNewTokenCount: compressedTokenCount } = setup({
originalTokenCount: 100,
summaryText: shortSummary,
compressionInputTokenCount: 1100,
compressionOutputTokenCount: 50,
});
expect(compressedTokenCount).toBeLessThanOrEqual(100); // Ensure setup is correct
const result = await client.tryCompressChat('prompt-id-4', true); // Forced
expect(result.compressionStatus).toBe(CompressionStatus.COMPRESSED);
expect(result.originalTokenCount).toBe(100);
// newTokenCount might be clamped to originalTokenCount due to tolerance logic
expect(result.newTokenCount).toBeLessThanOrEqual(100);
});
it('yields the result even if the compression inflated the tokens', async () => {
// newTokenCount = 100 - (1010 - 1000) + 200 = 100 - 10 + 200 = 290 > 100 (inflation)
const longSummary = 'long summary '.repeat(100);
const { client, estimatedNewTokenCount } = setup({
originalTokenCount: 100,
summaryText: longSummary,
compressionInputTokenCount: 1010,
compressionOutputTokenCount: 200,
});
expect(estimatedNewTokenCount).toBeGreaterThan(100); // Ensure setup is correct
// Mock contextWindowSize to ensure compression is triggered
vi.spyOn(client['config'], 'getContentGeneratorConfig').mockReturnValue(
{
model: 'test-model',
apiKey: 'test-key',
vertexai: false,
authType: AuthType.USE_GEMINI,
contextWindowSize: 100, // Set to same as originalTokenCount to ensure threshold is exceeded
},
);
const result = await client.tryCompressChat('prompt-id-4', false);
expect(result.compressionStatus).toBe(
CompressionStatus.COMPRESSION_FAILED_INFLATED_TOKEN_COUNT,
);
expect(result.originalTokenCount).toBe(100);
// The newTokenCount should be higher than original since compression failed due to inflation
expect(result.newTokenCount).toBeGreaterThan(100);
// IMPORTANT: The change in client.ts means setLastPromptTokenCount is NOT called on failure
expect(
uiTelemetryService.setLastPromptTokenCount,
).not.toHaveBeenCalled();
});
it('does not manipulate the source chat', async () => {
// newTokenCount = 100 - (1010 - 1000) + 200 = 100 - 10 + 200 = 290 > 100 (inflation)
const longSummary = 'long summary '.repeat(100);
const { client, mockOriginalChat, estimatedNewTokenCount } = setup({
originalTokenCount: 100,
summaryText: longSummary,
compressionInputTokenCount: 1010,
compressionOutputTokenCount: 200,
});
expect(estimatedNewTokenCount).toBeGreaterThan(100); // Ensure setup is correct
await client.tryCompressChat('prompt-id-4', false);
// On failure, the chat should NOT be replaced
expect(client['chat']).toBe(mockOriginalChat);
});
it('will not attempt to compress context after a failure', async () => {
// newTokenCount = 100 - (1010 - 1000) + 200 = 100 - 10 + 200 = 290 > 100 (inflation)
const longSummary = 'long summary '.repeat(100);
const { client, estimatedNewTokenCount } = setup({
originalTokenCount: 100,
summaryText: longSummary,
compressionInputTokenCount: 1010,
compressionOutputTokenCount: 200,
});
expect(estimatedNewTokenCount).toBeGreaterThan(100); // Ensure setup is correct
// Mock contextWindowSize to ensure compression is triggered
vi.spyOn(client['config'], 'getContentGeneratorConfig').mockReturnValue(
{
model: 'test-model',
apiKey: 'test-key',
vertexai: false,
authType: AuthType.USE_GEMINI,
contextWindowSize: 100, // Set to same as originalTokenCount to ensure threshold is exceeded
},
);
await client.tryCompressChat('prompt-id-4', false); // This fails and sets hasFailedCompressionAttempt = true
// This call should now be a NOOP
const result = await client.tryCompressChat('prompt-id-5', false);
// generateContent (for summary) should only have been called once
expect(mockGenerateContentFn).toHaveBeenCalledTimes(1);
expect(result).toEqual({
compressionStatus: CompressionStatus.NOOP,
newTokenCount: 0,
it('does not flip forceFullIdeContext when compression NOOPs', async () => {
client['chat'] = {
tryCompress: vi.fn().mockResolvedValue({
originalTokenCount: 0,
});
});
});
it('should not trigger summarization if token count is below threshold', async () => {
const MOCKED_TOKEN_LIMIT = 1000;
vi.spyOn(client['config'], 'getContentGeneratorConfig').mockReturnValue({
model: 'test-model',
apiKey: 'test-key',
vertexai: false,
authType: AuthType.USE_GEMINI,
contextWindowSize: MOCKED_TOKEN_LIMIT,
});
mockGetHistory.mockReturnValue([
{ role: 'user', parts: [{ text: '...history...' }] },
]);
const originalTokenCount = MOCKED_TOKEN_LIMIT * 0.699;
vi.mocked(uiTelemetryService.getLastPromptTokenCount).mockReturnValue(
originalTokenCount,
);
const initialChat = client.getChat();
const result = await client.tryCompressChat('prompt-id-2', false);
const newChat = client.getChat();
expect(result).toEqual({
compressionStatus: CompressionStatus.NOOP,
newTokenCount: originalTokenCount,
originalTokenCount,
});
expect(newChat).toBe(initialChat);
});
it('logs a telemetry event when compressing', async () => {
const { logChatCompression } = await import('../telemetry/loggers.js');
vi.mocked(logChatCompression).mockClear();
const MOCKED_TOKEN_LIMIT = 1000;
const MOCKED_CONTEXT_PERCENTAGE_THRESHOLD = 0.5;
vi.spyOn(client['config'], 'getContentGeneratorConfig').mockReturnValue({
model: 'test-model',
apiKey: 'test-key',
vertexai: false,
authType: AuthType.USE_GEMINI,
contextWindowSize: MOCKED_TOKEN_LIMIT,
});
vi.spyOn(client['config'], 'getChatCompression').mockReturnValue({
contextPercentageThreshold: MOCKED_CONTEXT_PERCENTAGE_THRESHOLD,
});
// Need multiple history items so there's something to compress
const history = [
{ role: 'user', parts: [{ text: '...history 1...' }] },
{ role: 'model', parts: [{ text: '...history 2...' }] },
{ role: 'user', parts: [{ text: '...history 3...' }] },
{ role: 'model', parts: [{ text: '...history 4...' }] },
];
mockGetHistory.mockReturnValue(history);
// Token count needs to be ABOVE the threshold to trigger compression
const originalTokenCount =
MOCKED_TOKEN_LIMIT * MOCKED_CONTEXT_PERCENTAGE_THRESHOLD + 1;
vi.mocked(uiTelemetryService.getLastPromptTokenCount).mockReturnValue(
originalTokenCount,
);
// Mock the summary response from the chat
// newTokenCount = 501 - (1400 - 1000) + 50 = 501 - 400 + 50 = 151 <= 501 (success)
const summaryText = 'This is a summary.';
mockGenerateContentFn.mockResolvedValue({
candidates: [
{
content: {
role: 'model',
parts: [{ text: summaryText }],
},
},
],
usageMetadata: {
promptTokenCount: 1400,
candidatesTokenCount: 50,
totalTokenCount: 1450,
},
} as unknown as GenerateContentResponse);
// Mock startChat to complete the compression flow
const splitPoint = findCompressSplitPoint(history, 0.7);
const historyToKeep = history.slice(splitPoint);
const newCompressedHistory: Content[] = [
{ role: 'user', parts: [{ text: 'Mocked env context' }] },
{ role: 'model', parts: [{ text: 'Got it. Thanks for the context!' }] },
{ role: 'user', parts: [{ text: summaryText }] },
{
role: 'model',
parts: [{ text: 'Got it. Thanks for the additional context!' }],
},
...historyToKeep,
];
const mockNewChat: Partial<GeminiChat> = {
getHistory: vi.fn().mockReturnValue(newCompressedHistory),
};
client['startChat'] = vi
.fn()
.mockResolvedValue(mockNewChat as GeminiChat);
await client.tryCompressChat('prompt-id-3', false);
expect(logChatCompression).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({
tokens_before: originalTokenCount,
newTokenCount: 0,
compressionStatus: CompressionStatus.NOOP,
}),
);
expect(uiTelemetryService.setLastPromptTokenCount).toHaveBeenCalled();
getHistory: vi.fn().mockReturnValue([]),
} as unknown as GeminiChat;
client['forceFullIdeContext'] = false;
await client.tryCompressChat('p3');
expect(client['forceFullIdeContext']).toBe(false);
});
it('should trigger summarization if token count is above threshold with contextPercentageThreshold setting', async () => {
const MOCKED_TOKEN_LIMIT = 1000;
const MOCKED_CONTEXT_PERCENTAGE_THRESHOLD = 0.5;
vi.spyOn(client['config'], 'getContentGeneratorConfig').mockReturnValue({
model: 'test-model',
apiKey: 'test-key',
vertexai: false,
authType: AuthType.USE_GEMINI,
contextWindowSize: MOCKED_TOKEN_LIMIT,
it('flips forceFullIdeContext when ChatCompressed flows through sendMessageStream', async () => {
// Auto-compaction lives inside chat.sendMessageStream and surfaces via
// the compressed → ChatCompressed bridge in turn.ts. The flip on this
// path is owned by the for-await loop in client.sendMessageStream, not
// by tryCompressChat — so this test feeds the event in directly.
vi.spyOn(client, 'tryCompressChat').mockResolvedValue({
originalTokenCount: 0,
newTokenCount: 0,
compressionStatus: CompressionStatus.NOOP,
});
vi.spyOn(client['config'], 'getChatCompression').mockReturnValue({
contextPercentageThreshold: MOCKED_CONTEXT_PERCENTAGE_THRESHOLD,
});
// Need multiple history items so there's something to compress
const history = [
{ role: 'user', parts: [{ text: '...history 1...' }] },
{ role: 'model', parts: [{ text: '...history 2...' }] },
{ role: 'user', parts: [{ text: '...history 3...' }] },
{ role: 'model', parts: [{ text: '...history 4...' }] },
];
mockGetHistory.mockReturnValue(history);
// Token count needs to be ABOVE the threshold to trigger compression
const originalTokenCount =
MOCKED_TOKEN_LIMIT * MOCKED_CONTEXT_PERCENTAGE_THRESHOLD + 1;
vi.mocked(uiTelemetryService.getLastPromptTokenCount).mockReturnValue(
originalTokenCount,
);
// Mock summary and new chat
const summaryText = 'This is a summary.';
const splitPoint = findCompressSplitPoint(history, 0.7);
const historyToKeep = history.slice(splitPoint);
const newCompressedHistory: Content[] = [
{ role: 'user', parts: [{ text: 'Mocked env context' }] },
{ role: 'model', parts: [{ text: 'Got it. Thanks for the context!' }] },
{ role: 'user', parts: [{ text: summaryText }] },
{
role: 'model',
parts: [{ text: 'Got it. Thanks for the additional context!' }],
},
...historyToKeep,
];
const mockNewChat: Partial<GeminiChat> = {
getHistory: vi.fn().mockReturnValue(newCompressedHistory),
};
client['startChat'] = vi.fn().mockImplementation(async () => {
client['chat'] = mockNewChat as GeminiChat;
return mockNewChat as GeminiChat;
});
// Mock the summary response from the chat
// newTokenCount = 501 - (1400 - 1000) + 50 = 501 - 400 + 50 = 151 <= 501 (success)
mockGenerateContentFn.mockResolvedValue({
candidates: [
{
content: {
role: 'model',
parts: [{ text: summaryText }],
mockTurnRunFn.mockReturnValue(
(async function* () {
yield {
type: GeminiEventType.ChatCompressed,
value: {
originalTokenCount: 1000,
newTokenCount: 200,
compressionStatus: CompressionStatus.COMPRESSED,
},
},
],
usageMetadata: {
promptTokenCount: 1400,
candidatesTokenCount: 50,
totalTokenCount: 1450,
},
} as unknown as GenerateContentResponse);
const initialChat = client.getChat();
const result = await client.tryCompressChat('prompt-id-3', false);
const newChat = client.getChat();
expect(mockGenerateContentFn).toHaveBeenCalled();
// Assert that summarization happened
expect(result.compressionStatus).toBe(CompressionStatus.COMPRESSED);
expect(result.originalTokenCount).toBe(originalTokenCount);
// newTokenCount might be clamped to originalTokenCount due to tolerance logic
expect(result.newTokenCount).toBeLessThanOrEqual(originalTokenCount);
// Assert that the chat was reset
expect(newChat).not.toBe(initialChat);
});
it('should not compress across a function call response', async () => {
const MOCKED_TOKEN_LIMIT = 1000;
vi.spyOn(client['config'], 'getContentGeneratorConfig').mockReturnValue({
model: 'test-model',
apiKey: 'test-key',
vertexai: false,
authType: AuthType.USE_GEMINI,
contextWindowSize: MOCKED_TOKEN_LIMIT,
});
const history: Content[] = [
{ role: 'user', parts: [{ text: '...history 1...' }] },
{ role: 'model', parts: [{ text: '...history 2...' }] },
{ role: 'user', parts: [{ text: '...history 3...' }] },
{ role: 'model', parts: [{ text: '...history 4...' }] },
{ role: 'user', parts: [{ text: '...history 5...' }] },
{ role: 'model', parts: [{ text: '...history 6...' }] },
{ role: 'user', parts: [{ text: '...history 7...' }] },
{ role: 'model', parts: [{ text: '...history 8...' }] },
// Normally we would break here, but we have a function response.
{
role: 'user',
parts: [{ functionResponse: { name: '...history 8...' } }],
},
{ role: 'model', parts: [{ text: '...history 10...' }] },
// Instead we will break here.
{ role: 'user', parts: [{ text: '...history 10...' }] },
];
mockGetHistory.mockReturnValue(history);
const originalTokenCount = 1000 * 0.7;
vi.mocked(uiTelemetryService.getLastPromptTokenCount).mockReturnValue(
originalTokenCount,
};
})(),
);
client['chat'] = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
} as unknown as GeminiChat;
client['forceFullIdeContext'] = false;
// Mock summary and new chat
const summaryText = 'This is a summary.';
const splitPoint = findCompressSplitPoint(history, 0.7); // This should be 10
expect(splitPoint).toBe(10); // Verify split point logic
const historyToKeep = history.slice(splitPoint); // Should keep last user message
expect(historyToKeep).toEqual([
{ role: 'user', parts: [{ text: '...history 10...' }] },
]);
const newCompressedHistory: Content[] = [
{ role: 'user', parts: [{ text: 'Mocked env context' }] },
{ role: 'model', parts: [{ text: 'Got it. Thanks for the context!' }] },
{ role: 'user', parts: [{ text: summaryText }] },
{
role: 'model',
parts: [{ text: 'Got it. Thanks for the additional context!' }],
},
...historyToKeep,
];
const mockNewChat: Partial<GeminiChat> = {
getHistory: vi.fn().mockReturnValue(newCompressedHistory),
};
client['startChat'] = vi.fn().mockImplementation(async () => {
client['chat'] = mockNewChat as GeminiChat;
return mockNewChat as GeminiChat;
});
// Mock the summary response from the chat
// newTokenCount = 700 - (1500 - 1000) + 50 = 700 - 500 + 50 = 250 <= 700 (success)
mockGenerateContentFn.mockResolvedValue({
candidates: [
{
content: {
role: 'model',
parts: [{ text: summaryText }],
},
},
],
usageMetadata: {
promptTokenCount: 1500,
candidatesTokenCount: 50,
totalTokenCount: 1550,
},
} as unknown as GenerateContentResponse);
const initialChat = client.getChat();
const result = await client.tryCompressChat('prompt-id-3', false);
const newChat = client.getChat();
expect(mockGenerateContentFn).toHaveBeenCalled();
// Assert that summarization happened
expect(result.compressionStatus).toBe(CompressionStatus.COMPRESSED);
expect(result.originalTokenCount).toBe(originalTokenCount);
// newTokenCount might be clamped to originalTokenCount due to tolerance logic
expect(result.newTokenCount).toBeLessThanOrEqual(originalTokenCount);
// Assert that the chat was reset
expect(newChat).not.toBe(initialChat);
// 1. standard start context message (env)
// 2. standard canned model response
// 3. compressed summary message (user)
// 4. standard canned model response
// 5. The last user message (historyToKeep)
expect(newChat.getHistory().length).toEqual(5);
});
it('should always trigger summarization when force is true, regardless of token count', async () => {
// Need multiple history items so there's something to compress
const history = [
{ role: 'user', parts: [{ text: '...history 1...' }] },
{ role: 'model', parts: [{ text: '...history 2...' }] },
{ role: 'user', parts: [{ text: '...history 3...' }] },
{ role: 'model', parts: [{ text: '...history 4...' }] },
];
mockGetHistory.mockReturnValue(history);
const originalTokenCount = 100; // Well below threshold, but > estimated new count
vi.mocked(uiTelemetryService.getLastPromptTokenCount).mockReturnValue(
originalTokenCount,
const stream = client.sendMessageStream(
[{ text: 'hi' }],
new AbortController().signal,
'prompt-auto-flip',
{ type: SendMessageType.UserQuery },
);
for await (const _ of stream) {
/* drain */
}
// Mock summary and new chat
const summaryText = 'This is a summary.';
const splitPoint = findCompressSplitPoint(history, 0.7);
const historyToKeep = history.slice(splitPoint);
const newCompressedHistory: Content[] = [
{ role: 'user', parts: [{ text: 'Mocked env context' }] },
{ role: 'model', parts: [{ text: 'Got it. Thanks for the context!' }] },
{ role: 'user', parts: [{ text: summaryText }] },
{
role: 'model',
parts: [{ text: 'Got it. Thanks for the additional context!' }],
},
...historyToKeep,
];
const mockNewChat: Partial<GeminiChat> = {
getHistory: vi.fn().mockReturnValue(newCompressedHistory),
};
client['startChat'] = vi.fn().mockImplementation(async () => {
client['chat'] = mockNewChat as GeminiChat;
return mockNewChat as GeminiChat;
});
// Mock the summary response from the chat
// newTokenCount = 100 - (1060 - 1000) + 20 = 100 - 60 + 20 = 60 <= 100 (success)
mockGenerateContentFn.mockResolvedValue({
candidates: [
{
content: {
role: 'model',
parts: [{ text: summaryText }],
},
},
],
usageMetadata: {
promptTokenCount: 1060,
candidatesTokenCount: 20,
totalTokenCount: 1080,
},
} as unknown as GenerateContentResponse);
const initialChat = client.getChat();
const result = await client.tryCompressChat('prompt-id-1', true); // force = true
const newChat = client.getChat();
expect(mockGenerateContentFn).toHaveBeenCalled();
expect(result.compressionStatus).toBe(CompressionStatus.COMPRESSED);
expect(result.originalTokenCount).toBe(originalTokenCount);
// newTokenCount might be clamped to originalTokenCount due to tolerance logic
expect(result.newTokenCount).toBeLessThanOrEqual(originalTokenCount);
// Assert that the chat was reset
expect(newChat).not.toBe(initialChat);
expect(client['forceFullIdeContext']).toBe(true);
});
});
describe('sendMessageStream', () => {
it('emits a compression event when the context was automatically compressed', async () => {
// Arrange
mockTurnRunFn.mockReturnValue(
(async function* () {
yield { type: 'content', value: 'Hello' };
})(),
);
const compressionInfo: ChatCompressionInfo = {
compressionStatus: CompressionStatus.COMPRESSED,
originalTokenCount: 1000,
newTokenCount: 500,
};
vi.spyOn(client, 'tryCompressChat').mockResolvedValueOnce(
compressionInfo,
);
// Act
const stream = client.sendMessageStream(
[{ text: 'Hi' }],
new AbortController().signal,
'prompt-id-1',
);
const events = await fromAsync(stream);
// Assert
expect(events).toContainEqual({
type: GeminiEventType.ChatCompressed,
value: compressionInfo,
});
});
it.each([
{
compressionStatus:
CompressionStatus.COMPRESSION_FAILED_INFLATED_TOKEN_COUNT,
},
{ compressionStatus: CompressionStatus.NOOP },
])(
'does not emit a compression event when the status is $compressionStatus',
async ({ compressionStatus }) => {
// Arrange
const mockStream = (async function* () {
yield { type: 'content', value: 'Hello' };
})();
mockTurnRunFn.mockReturnValue(mockStream);
const compressionInfo: ChatCompressionInfo = {
compressionStatus,
originalTokenCount: 1000,
newTokenCount: 500,
};
vi.spyOn(client, 'tryCompressChat').mockResolvedValueOnce(
compressionInfo,
);
// Act
const stream = client.sendMessageStream(
[{ text: 'Hi' }],
new AbortController().signal,
'prompt-id-1',
);
const events = await fromAsync(stream);
// Assert
expect(events).not.toContainEqual({
type: GeminiEventType.ChatCompressed,
value: expect.anything(),
});
},
);
it('should include editor context when ideMode is enabled', async () => {
// Arrange
vi.mocked(ideContextStore.get).mockReturnValue({

View file

@@ -42,7 +42,6 @@ import {
// Services
import {
ChatCompressionService,
COMPRESSION_PRESERVE_THRESHOLD,
COMPRESSION_TOKEN_THRESHOLD,
} from '../services/chatCompressionService.js';
@@ -186,12 +185,6 @@ export class GeminiClient {
*/
private perModelGeneratorCache = new Map<string, Promise<ContentGenerator>>();
/**
* At any point in this conversation, was compression triggered without
* being forced and did it fail?
*/
private hasFailedCompressionAttempt = false;
/**
* Promises for pending background memory tasks (dream / extract).
* Each promise resolves with a count of memory files touched (0 = nothing written).
@@ -224,6 +217,9 @@ export class GeminiClient {
resumedSessionData.conversation,
);
await this.startChat(resumedHistory);
this.getChat().setLastPromptTokenCount(
uiTelemetryService.getLastPromptTokenCount(),
);
} else {
await this.startChat();
}
@@ -369,7 +365,6 @@ export class GeminiClient {
async startChat(extraHistory?: Content[]): Promise<GeminiChat> {
this.forceFullIdeContext = true;
this.hasFailedCompressionAttempt = false;
// Clear stale cache params on session reset to prevent cross-session leakage
clearCacheSafeParams();
@@ -847,14 +842,10 @@ export class GeminiClient {
return new Turn(this.getChat(), prompt_id);
}
const compressed = await this.tryCompressChat(prompt_id, false, signal);
if (compressed.compressionStatus === CompressionStatus.COMPRESSED) {
yield { type: GeminiEventType.ChatCompressed, value: compressed };
}
// Check session token limit after compression.
// `lastPromptTokenCount` is treated as authoritative for the (possibly compressed) history;
// Auto-compaction happens inside GeminiChat.sendMessageStream and surfaces
// via the `compressed → ChatCompressed` bridge in turn.ts. Manual /compress
// still calls tryCompressChat directly for the full reset (env refresh +
// forceFullIdeContext flip).
const sessionTokenLimit = this.config.getSessionTokenLimit();
if (sessionTokenLimit > 0) {
const lastPromptTokenCount = uiTelemetryService.getLastPromptTokenCount();
@@ -1000,6 +991,13 @@ export class GeminiClient {
await arenaAgentClient.updateStatus();
}
// Re-send a full IDE context blob on the next regular message — auto
// compaction inside chat.sendMessageStream may have summarized away
// the previous IDE-context turn.
if (event.type === GeminiEventType.ChatCompressed) {
this.forceFullIdeContext = true;
}
yield event;
if (event.type === GeminiEventType.Error) {
if (arenaAgentClient) {
@@ -1391,58 +1389,36 @@ export class GeminiClient {
return generatorPromise;
}
/**
* Wrapper around {@link GeminiChat.tryCompress} that restores main-session
* startup context after successful compaction and flips the IDE full-context
* flag for the next regular message.
*/
async tryCompressChat(
prompt_id: string,
force: boolean = false,
signal?: AbortSignal,
): Promise<ChatCompressionInfo> {
const compressionService = new ChatCompressionService();
const { newHistory, info } = await compressionService.compress(
this.getChat(),
const info = await this.getChat().tryCompress(
prompt_id,
force,
this.config.getModel(),
this.config,
this.hasFailedCompressionAttempt,
force,
signal,
);
// Handle compression result
if (info.compressionStatus === CompressionStatus.COMPRESSED) {
// Success: update chat with new compressed history
if (newHistory) {
const chatRecordingService = this.config.getChatRecordingService();
chatRecordingService?.recordChatCompression({
info,
compressedHistory: newHistory,
});
await this.startChat(newHistory);
// Compaction rewrites the prompt history: prior full-Read tool
// results may have been summarised away, but the FileReadCache
// still believes those reads are "in this conversation". A
// follow-up Read could then return the file_unchanged
// placeholder pointing at content the model can no longer
// retrieve from its own context. Clear the cache so post-
// compaction Reads re-emit the bytes.
debugLogger.debug('[FILE_READ_CACHE] clear after tryCompressChat');
this.config.getFileReadCache().clear();
uiTelemetryService.setLastPromptTokenCount(info.newTokenCount);
this.forceFullIdeContext = true;
}
} else if (
info.compressionStatus ===
CompressionStatus.COMPRESSION_FAILED_INFLATED_TOKEN_COUNT ||
info.compressionStatus ===
CompressionStatus.COMPRESSION_FAILED_EMPTY_SUMMARY
) {
// Track failed attempts (only mark as failed if not forced)
if (!force) {
this.hasFailedCompressionAttempt = true;
}
const compressedHistory = this.getChat().getHistory();
await this.startChat(compressedHistory);
// startChat() creates a new GeminiChat without touching FileReadCache,
// so prior read_file results that were summarised away would still
// resolve to the file_unchanged placeholder. Clear so post-compaction
// Reads re-emit bytes the model can no longer see in history.
debugLogger.debug('[FILE_READ_CACHE] clear after tryCompressChat');
this.config.getFileReadCache().clear();
this.getChat().setLastPromptTokenCount(info.newTokenCount);
// Re-send a full IDE context blob on the next regular message —
// compression dropped the previous context turn from history.
this.forceFullIdeContext = true;
}
return info;
}
}

View file

@@ -22,6 +22,8 @@ import { StreamContentError } from './openaiContentGenerator/pipeline.js';
import type { Config } from '../config/config.js';
import { setSimulate429 } from '../utils/testUtils.js';
import { uiTelemetryService } from '../telemetry/uiTelemetry.js';
import { CompressionStatus, type ChatCompressionInfo } from './turn.js';
import { ChatCompressionService } from '../services/chatCompressionService.js';
// Mock fs module to prevent actual file system operations during tests
const mockFileSystem = new Map<string, string>();
@@ -119,6 +121,13 @@ describe('GeminiChat', async () => {
getTool: vi.fn(),
}),
getContentGenerator: vi.fn().mockReturnValue(mockContentGenerator),
getChatCompression: vi.fn().mockReturnValue(undefined),
getHookSystem: vi.fn().mockReturnValue(undefined),
getDebugLogger: vi
.fn()
.mockReturnValue({ debug: vi.fn(), warn: vi.fn(), info: vi.fn() }),
getApprovalMode: vi.fn().mockReturnValue('default'),
getFileReadCache: vi.fn().mockReturnValue({ clear: vi.fn() }),
} as unknown as Config;
// Disable 429 simulation for tests
@@ -1019,6 +1028,223 @@ describe('GeminiChat', async () => {
});
});
describe('auto-compression integration', () => {
function makeStreamResponse(text = 'ok') {
return (async function* () {
yield {
candidates: [
{
content: { parts: [{ text }], role: 'model' },
finishReason: 'STOP',
index: 0,
safetyRatings: [],
},
],
text: () => text,
} as unknown as GenerateContentResponse;
})();
}
it('releases the send-lock when auto-compression throws (no deadlock)', async () => {
const compressSpy = vi
.spyOn(ChatCompressionService.prototype, 'compress')
.mockRejectedValueOnce(new Error('compression API down'));
// First send: compression rejects, error propagates to caller. The
// streamDoneResolver must run so this.sendPromise resolves; otherwise
// every subsequent send blocks forever.
await expect(
chat.sendMessageStream(
'test-model',
{ message: 'first' },
'prompt-id-deadlock-1',
),
).rejects.toThrow('compression API down');
// Second send: compress returns NOOP, request goes through. If the
// lock leaked, this await would never resolve.
compressSpy.mockResolvedValueOnce({
newHistory: null,
info: {
originalTokenCount: 0,
newTokenCount: 0,
compressionStatus: CompressionStatus.NOOP,
},
});
vi.mocked(mockContentGenerator.generateContentStream).mockResolvedValue(
makeStreamResponse('second response'),
);
const stream = await chat.sendMessageStream(
'test-model',
{ message: 'second' },
'prompt-id-deadlock-2',
);
for await (const _ of stream) {
/* consume */
}
expect(compressSpy).toHaveBeenCalledTimes(2);
});
it('seeds inherited token count via setLastPromptTokenCount', async () => {
const subagentChat = new GeminiChat(mockConfig, config, [
{ role: 'user', parts: [{ text: 'inherited' }] },
{ role: 'model', parts: [{ text: 'inherited reply' }] },
]);
subagentChat.setLastPromptTokenCount(123_456);
expect(subagentChat.getLastPromptTokenCount()).toBe(123_456);
// The compression service receives the seeded count, so the threshold
// check sees the inherited size — not the constructor default of 0.
const compressSpy = vi
.spyOn(ChatCompressionService.prototype, 'compress')
.mockResolvedValue({
newHistory: null,
info: {
originalTokenCount: 123_456,
newTokenCount: 123_456,
compressionStatus: CompressionStatus.NOOP,
},
});
vi.mocked(mockContentGenerator.generateContentStream).mockResolvedValue(
makeStreamResponse(),
);
const stream = await subagentChat.sendMessageStream(
'test-model',
{ message: 'go' },
'prompt-id-seed',
);
for await (const _ of stream) {
/* consume */
}
expect(compressSpy).toHaveBeenCalledTimes(1);
expect(compressSpy.mock.calls[0][1].originalTokenCount).toBe(123_456);
});
it('yields a COMPRESSED stream event as the first event after auto-compression succeeds', async () => {
const compressedHistory: Content[] = [
{ role: 'user', parts: [{ text: 'summary' }] },
{ role: 'model', parts: [{ text: 'ok' }] },
];
vi.spyOn(
ChatCompressionService.prototype,
'compress',
).mockResolvedValueOnce({
newHistory: compressedHistory,
info: {
originalTokenCount: 1000,
newTokenCount: 200,
compressionStatus: CompressionStatus.COMPRESSED,
},
});
vi.mocked(mockContentGenerator.generateContentStream).mockResolvedValue(
makeStreamResponse('answer'),
);
const stream = await chat.sendMessageStream(
'test-model',
{ message: 'go' },
'prompt-id-yield-compressed',
);
const events: Array<{ type: StreamEventType }> = [];
for await (const event of stream) {
events.push(event as { type: StreamEventType });
}
expect(events.length).toBeGreaterThan(0);
expect(events[0].type).toBe(StreamEventType.COMPRESSED);
expect(
(events[0] as { type: StreamEventType; info: ChatCompressionInfo }).info
.compressionStatus,
).toBe(CompressionStatus.COMPRESSED);
expect(
(events[0] as { type: StreamEventType; info: ChatCompressionInfo }).info
.newTokenCount,
).toBe(200);
});
it('clears hasFailedCompressionAttempt after a forced successful compression', async () => {
const compressSpy = vi.spyOn(
ChatCompressionService.prototype,
'compress',
);
// Step 1: auto-compression fails — latch is set on the chat.
compressSpy.mockResolvedValueOnce({
newHistory: null,
info: {
originalTokenCount: 100_000,
newTokenCount: 100_000,
compressionStatus:
CompressionStatus.COMPRESSION_FAILED_INFLATED_TOKEN_COUNT,
},
});
vi.mocked(mockContentGenerator.generateContentStream).mockResolvedValue(
makeStreamResponse(),
);
const stream1 = await chat.sendMessageStream(
'test-model',
{ message: 'first' },
'prompt-latch-1',
);
for await (const _ of stream1) {
/* consume */
}
// Latch passed to service was false on this attempt; service marks it
// failed and tryCompress flips the chat's flag to true.
expect(compressSpy.mock.calls[0][1].hasFailedCompressionAttempt).toBe(
false,
);
// Step 2: a forced /compress succeeds. After this, the latch must
// be cleared so future auto-compressions are not suppressed.
compressSpy.mockResolvedValueOnce({
newHistory: [
{ role: 'user', parts: [{ text: 'summary' }] },
{ role: 'model', parts: [{ text: 'ack' }] },
],
info: {
originalTokenCount: 100_000,
newTokenCount: 30_000,
compressionStatus: CompressionStatus.COMPRESSED,
},
});
await chat.tryCompress('prompt-latch-force', 'test-model', true);
// tryCompress was called with force=true, so the service got latch=true
// (the gate is `hasFailedCompressionAttempt && !force`, force overrides).
expect(compressSpy.mock.calls[1][1].hasFailedCompressionAttempt).toBe(
true,
);
// Step 3: next auto-compression sees the cleared latch.
compressSpy.mockResolvedValueOnce({
newHistory: null,
info: {
originalTokenCount: 30_000,
newTokenCount: 30_000,
compressionStatus: CompressionStatus.NOOP,
},
});
vi.mocked(mockContentGenerator.generateContentStream).mockResolvedValue(
makeStreamResponse(),
);
const stream2 = await chat.sendMessageStream(
'test-model',
{ message: 'second' },
'prompt-latch-2',
);
for await (const _ of stream2) {
/* consume */
}
expect(compressSpy.mock.calls[2][1].hasFailedCompressionAttempt).toBe(
false,
);
});
});
describe('addHistory', () => {
it('should add a new content item to the history', () => {
const newContent: Content = {
@@ -2425,4 +2651,132 @@ describe('GeminiChat', async () => {
expect(mergedText).toBe('BCD');
});
});
// Compression logic is tested in chatCompressionService.test.ts; this
// suite covers per-chat state on GeminiChat: hasFailedCompressionAttempt
// stickiness, token-count mutation, history replacement, and conditional
// telemetry mirroring.
describe('tryCompress (per-chat state)', () => {
const userMsg = (text: string) => ({
role: 'user' as const,
parts: [{ text }],
});
const modelMsg = (text: string) => ({
role: 'model' as const,
parts: [{ text }],
});
/**
* Mock a successful compression: the service returns COMPRESSED with a
* fresh history. We don't go through the real
* `config.getContentGenerator().generateContent` path here; the service
* is mocked at the boundary.
*/
function mockCompressionService(
result: 'compressed' | 'failed-inflated' | 'noop',
) {
const compressSpy = vi.spyOn(
ChatCompressionService.prototype,
'compress',
);
if (result === 'compressed') {
compressSpy.mockResolvedValue({
newHistory: [userMsg('summary'), modelMsg('ok'), userMsg('latest')],
info: {
originalTokenCount: 1000,
newTokenCount: 200,
compressionStatus: CompressionStatus.COMPRESSED,
},
});
} else if (result === 'failed-inflated') {
compressSpy.mockResolvedValue({
newHistory: null,
info: {
originalTokenCount: 1000,
newTokenCount: 1100,
compressionStatus:
CompressionStatus.COMPRESSION_FAILED_INFLATED_TOKEN_COUNT,
},
});
} else {
compressSpy.mockResolvedValue({
newHistory: null,
info: {
originalTokenCount: 0,
newTokenCount: 0,
compressionStatus: CompressionStatus.NOOP,
},
});
}
return compressSpy;
}
it('replaces history and updates per-chat lastPromptTokenCount on COMPRESSED', async () => {
mockCompressionService('compressed');
chat.setHistory([userMsg('a'), modelMsg('b'), userMsg('c')]);
const info = await chat.tryCompress('p1', 'm1');
expect(info.compressionStatus).toBe(CompressionStatus.COMPRESSED);
expect(chat.getHistory()).toHaveLength(3);
expect(chat.getHistory()[0]).toEqual(userMsg('summary'));
expect(chat.getLastPromptTokenCount()).toBe(200);
});
it('mirrors lastPromptTokenCount to the global telemetry only when wired', async () => {
mockCompressionService('compressed');
// chat under test was constructed with telemetryService=uiTelemetryService.
await chat.tryCompress('p2', 'm1');
expect(uiTelemetryService.setLastPromptTokenCount).toHaveBeenCalledWith(
200,
);
// A subagent-style chat with no telemetryService must NOT touch the
// global singleton (per the constructor docstring; per-chat counter
// still updates).
const subagentChat = new GeminiChat(mockConfig, config, []);
vi.mocked(uiTelemetryService.setLastPromptTokenCount).mockClear();
mockCompressionService('compressed');
const info = await subagentChat.tryCompress('p3', 'm1');
expect(info.compressionStatus).toBe(CompressionStatus.COMPRESSED);
expect(subagentChat.getLastPromptTokenCount()).toBe(200);
expect(uiTelemetryService.setLastPromptTokenCount).not.toHaveBeenCalled();
});
it('marks hasFailedCompressionAttempt and suppresses subsequent unforced auto-compactions', async () => {
const compressSpy = mockCompressionService('failed-inflated');
const first = await chat.tryCompress('p1', 'm1');
expect(first.compressionStatus).toBe(
CompressionStatus.COMPRESSION_FAILED_INFLATED_TOKEN_COUNT,
);
expect(compressSpy).toHaveBeenCalledTimes(1);
// The next unforced call should reach the service with
// hasFailedCompressionAttempt=true; the service's threshold check then
// returns NOOP. The important thing here is that GeminiChat actually
// forwards the sticky flag.
compressSpy.mockClear();
compressSpy.mockResolvedValue({
newHistory: null,
info: {
originalTokenCount: 0,
newTokenCount: 0,
compressionStatus: CompressionStatus.NOOP,
},
});
await chat.tryCompress('p2', 'm1');
expect(compressSpy).toHaveBeenCalledTimes(1);
expect(compressSpy.mock.calls[0][1].hasFailedCompressionAttempt).toBe(
true,
);
});
it('forwards force=true to the compression service', async () => {
const compressSpy = mockCompressionService('compressed');
await chat.tryCompress('p1', 'm1', true);
expect(compressSpy.mock.calls[0][1].force).toBe(true);
});
});
});

View file

@@ -31,11 +31,13 @@ import {
logContentRetryFailure,
} from '../telemetry/loggers.js';
import { type ChatRecordingService } from '../services/chatRecordingService.js';
import { ChatCompressionService } from '../services/chatCompressionService.js';
import {
ContentRetryEvent,
ContentRetryFailureEvent,
} from '../telemetry/types.js';
import type { UiTelemetryService } from '../telemetry/uiTelemetry.js';
import { type ChatCompressionInfo, CompressionStatus } from './turn.js';
const debugLogger = createDebugLogger('QWEN_CODE_CHAT');
@@ -45,6 +47,11 @@ export enum StreamEventType {
/** A signal that a retry is about to happen. The UI should discard any partial
* content from the attempt that just failed. */
RETRY = 'retry',
/** Emitted once at the start of the stream when an automatic compression
* pass succeeded. Carries the compression result so callers (the main
* agent UI, subagent loop) can surface it without each call site running
* its own compaction step. */
COMPRESSED = 'compressed',
}
export type StreamEvent =
@@ -56,7 +63,8 @@ export type StreamEvent =
* fresh restart (escalation). The UI should keep the accumulated text
* buffer so the continuation appends to it. */
isContinuation?: boolean;
};
}
| { type: StreamEventType.COMPRESSED; info: ChatCompressionInfo };
/**
* Options for retrying due to invalid content from the model.
@@ -299,6 +307,22 @@ export class GeminiChat {
// model.
private sendPromise: Promise<void> = Promise.resolve();
/**
* Per-chat last-prompt-token-count, populated from `usageMetadata` on each
* model response. Used by the compaction threshold check so that subagents
* (which intentionally don't write to the global telemetry singleton) can
* still make compaction decisions based on their *own* context size.
*/
private lastPromptTokenCount = 0;
/**
* Per-chat sticky flag. After an unforced compression attempt fails (empty
* summary or inflated token count), automatic compaction is suppressed
* for the remainder of this chat to avoid burning compression API calls
* in a loop. Manual `/compress` still works (it passes `force=true`).
*/
private hasFailedCompressionAttempt = false;
/**
* Creates a new GeminiChat instance.
*
@@ -321,6 +345,96 @@ export class GeminiChat {
validateHistory(history);
}
/**
* Most recent prompt-token count reported by the model for *this* chat,
* mirroring the value in {@link UiTelemetryService} for the main session.
* Subagent chats have no telemetry service wired but still need a per-chat
* count for compaction decisions, so this is always populated regardless
* of whether the global telemetry is updated.
*/
getLastPromptTokenCount(): number {
return this.lastPromptTokenCount;
}
/**
* Seed the last-prompt-token-count for chats created with inherited
* history (forks, subagents, speculation). Without this, the auto-compress
* threshold check sees `0` and refuses to compress so the first API call
* can 400 from oversized history. Callers pass the parent chat's
* `getLastPromptTokenCount()` here.
*/
setLastPromptTokenCount(count: number): void {
this.lastPromptTokenCount = count;
}
/**
* Attempt to compress this chat's history.
*
* Returns the compression info regardless of outcome. On a successful
* compaction (`COMPRESSED`), this method has already mutated the chat's
* history, recorded the event to `chatRecordingService` (if wired), and
* updated both the per-chat token count and (when wired) the global
* telemetry singleton.
*/
async tryCompress(
promptId: string,
model: string,
force = false,
signal?: AbortSignal,
): Promise<ChatCompressionInfo> {
const service = new ChatCompressionService();
const { newHistory, info } = await service.compress(this, {
promptId,
force,
model,
config: this.config,
hasFailedCompressionAttempt: this.hasFailedCompressionAttempt,
originalTokenCount: this.lastPromptTokenCount,
signal,
});
if (info.compressionStatus === CompressionStatus.COMPRESSED && newHistory) {
this.chatRecordingService?.recordChatCompression({
info,
compressedHistory: newHistory,
});
// Auto-compaction replaces history in place — no env-context refresh
// here. Manual /compress goes through GeminiClient.tryCompressChat,
// which calls startChat() to re-prepend a fresh env snapshot. See
// GeminiClient.sendMessageStream for the rationale behind the split.
this.setHistory(newHistory);
// Compaction summarises away prior full-Read tool results, but the
// FileReadCache still treats those reads as "in this conversation".
// A follow-up Read could then return the file_unchanged placeholder
// pointing at content the model can no longer retrieve from history.
debugLogger.debug('[FILE_READ_CACHE] clear after auto tryCompress');
this.config.getFileReadCache().clear();
this.lastPromptTokenCount = info.newTokenCount;
// Mirror to the global singleton only when wired (main session).
// Subagents pass `telemetryService=undefined` to keep their context
// usage out of the main agent's UI counters.
this.telemetryService?.setLastPromptTokenCount(info.newTokenCount);
// Re-enable auto-compaction so a forced /compress recovers a chat
// that an earlier auto-attempt latched off.
this.hasFailedCompressionAttempt = false;
} else if (
info.compressionStatus ===
CompressionStatus.COMPRESSION_FAILED_INFLATED_TOKEN_COUNT ||
info.compressionStatus ===
CompressionStatus.COMPRESSION_FAILED_EMPTY_SUMMARY ||
info.compressionStatus ===
CompressionStatus.COMPRESSION_FAILED_TOKEN_COUNT_ERROR
) {
// Track failed attempts (only mark as failed if not forced) so we
// stop spending compression-API calls on a chat that can't shrink.
if (!force) {
this.hasFailedCompressionAttempt = true;
}
}
return info;
}
setSystemInstruction(sysInstr: string) {
this.generationConfig.systemInstruction = sysInstr;
}
@@ -360,6 +474,23 @@ export class GeminiChat {
});
this.sendPromise = streamDonePromise;
// The send-lock above is held but the generator's `finally` (which
// resolves it) has not run yet — if `tryCompress` throws, we must
// release the lock here or subsequent sends will block forever at
// `await this.sendPromise`.
let compressionInfo: ChatCompressionInfo;
try {
compressionInfo = await this.tryCompress(
prompt_id,
model,
false,
params.config?.abortSignal,
);
} catch (error) {
streamDoneResolver!();
throw error;
}
const userContent = createUserContent(params.message);
// Add user content to history ONCE before any attempts.
@@ -370,6 +501,20 @@ export class GeminiChat {
const self = this;
return (async function* () {
try {
// Surface a successful auto-compression to the caller as the first
// event in the stream. Failed/skipped compaction attempts are silent.
// Must be inside the try so that a consumer abandoning the stream
// immediately after this event still triggers the finally below;
// otherwise `streamDoneResolver` never fires and the next send hangs.
if (
compressionInfo.compressionStatus === CompressionStatus.COMPRESSED
) {
yield {
type: StreamEventType.COMPRESSED,
info: compressionInfo,
};
}
let lastError: unknown = new Error('Request failed after all retries.');
let rateLimitRetryCount = 0;
let invalidStreamRetryCount = 0;
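The placement depends on async-generator semantics worth spelling out: abandoning a for-await loop calls the generator's return(), which resumes it and runs the finally block. A self-contained sketch (all names hypothetical):
const releaseSendLock = () => console.log('lock released'); // stand-in for streamDoneResolver
async function* demoStream() {
  try {
    yield 'compressed';
    yield 'chunk';
  } finally {
    releaseSendLock(); // runs even if the consumer bails after one event
  }
}
async function consume() {
  for await (const event of demoStream()) {
    console.log(event); // 'compressed'
    break; // abandoning here still triggers the finally above
  }
}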
@@ -890,8 +1035,14 @@ export class GeminiChat {
// Some providers omit total_tokens or return 0 in streaming usage chunks.
const lastPromptTokenCount =
usageMetadata.totalTokenCount || usageMetadata.promptTokenCount;
if (lastPromptTokenCount && this.telemetryService) {
this.telemetryService.setLastPromptTokenCount(lastPromptTokenCount);
if (lastPromptTokenCount) {
// Always update the per-chat counter so this chat (including
// subagents) can make its own compaction decisions.
this.lastPromptTokenCount = lastPromptTokenCount;
// Mirror to the global telemetry only when wired — subagents
// pass `telemetryService=undefined` to keep their context usage
// out of the main session's UI counters.
this.telemetryService?.setLastPromptTokenCount(lastPromptTokenCount);
}
if (usageMetadata.cachedContentTokenCount && this.telemetryService) {
this.telemetryService.setLastCachedContentTokenCount(

@@ -9,7 +9,7 @@ import type {
ServerGeminiToolCallRequestEvent,
ServerGeminiErrorEvent,
} from './turn.js';
import { Turn, GeminiEventType } from './turn.js';
import { CompressionStatus, Turn, GeminiEventType } from './turn.js';
import type { GenerateContentResponse, Part, Content } from '@google/genai';
import { reportError } from '../utils/errorReporting.js';
import type { GeminiChat } from './geminiChat.js';
@@ -845,6 +845,38 @@ describe('Turn', () => {
{ type: GeminiEventType.Content, value: 'Success' },
]);
});
it('bridges a compressed stream event to a ChatCompressed event', async () => {
const compressionInfo = {
originalTokenCount: 1000,
newTokenCount: 200,
compressionStatus: CompressionStatus.COMPRESSED,
};
const mockResponseStream = (async function* () {
yield { type: StreamEventType.COMPRESSED, info: compressionInfo };
yield {
type: StreamEventType.CHUNK,
value: {
candidates: [{ content: { parts: [{ text: 'after' }] } }],
},
};
})();
mockSendMessageStream.mockResolvedValue(mockResponseStream);
const events = [];
for await (const event of turn.run(
'test-model',
[],
new AbortController().signal,
)) {
events.push(event);
}
expect(events).toEqual([
{ type: GeminiEventType.ChatCompressed, value: compressionInfo },
{ type: GeminiEventType.Content, value: 'after' },
]);
});
});
describe('getDebugResponses', () => {

@@ -306,6 +306,19 @@ export class Turn {
continue; // Skip to the next event in the stream
}
// Surface auto-compaction that fired inside chat.sendMessageStream
// as the top-level ChatCompressed event so existing UI handlers stay
// connected. This bridge is the primary path for auto-compaction
// events; manual /compress emits its own ChatCompressed in
// GeminiClient.tryCompressChat.
if (streamEvent.type === 'compressed') {
yield {
type: GeminiEventType.ChatCompressed,
value: streamEvent.info,
};
continue;
}
// Assuming other events are chunks with a `value` property
const resp = streamEvent.value as GenerateContentResponse;
if (!resp) continue; // Skip if there's no response body

File diff suppressed because it is too large

@@ -8,7 +8,6 @@ import type { Content } from '@google/genai';
import type { Config } from '../config/config.js';
import type { GeminiChat } from '../core/geminiChat.js';
import { type ChatCompressionInfo, CompressionStatus } from '../core/turn.js';
import { uiTelemetryService } from '../telemetry/uiTelemetry.js';
import { DEFAULT_TOKEN_LIMIT } from '../core/tokenLimits.js';
import { getCompressionPrompt } from '../core/prompts.js';
import { getResponseText } from '../utils/partUtils.js';
@@ -40,15 +39,79 @@ export const COMPRESSION_PRESERVE_THRESHOLD = 0.3;
*/
export const MIN_COMPRESSION_FRACTION = 0.05;
/**
* When the trailing entry is an in-flight `model+functionCall` and the regular
* scan finds no clean split past the target fraction, the splitter falls back
* to compressing everything except the last few entries. This constant sets
* how many most-recent complete `(model+functionCall, user+functionResponse)`
* tool rounds are retained as working context (the trailing in-flight call is
* always retained on top of these).
*/
export const TOOL_ROUND_RETAIN_COUNT = 2;
const hasFunctionCall = (content: Content | undefined): boolean =>
!!content?.parts?.some((part) => !!part.functionCall);
const hasFunctionResponse = (content: Content | undefined): boolean =>
!!content?.parts?.some((part) => !!part.functionResponse);
/**
* Walk backward from the trailing in-flight `model+functionCall` and return
* the index after which the most-recent `retainCount` complete tool-round
* pairs sit (plus the trailing fc itself). Used by the splitter's in-flight
* fallback path. Stops counting at the first non-pair encountered, so the
* retain count is best-effort: if there are fewer complete pairs than
* requested, all of them are retained.
*/
function splitPointRetainingTrailingPairs(
contents: Content[],
retainCount: number,
): number {
let pairsFound = 0;
let i = contents.length - 2;
while (i >= 1 && pairsFound < retainCount) {
if (hasFunctionCall(contents[i - 1]) && hasFunctionResponse(contents[i])) {
pairsFound += 1;
i -= 2;
} else {
break;
}
}
return contents.length - (2 * pairsFound + 1);
}
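A worked example of the return arithmetic, assuming retainCount = 2 and a six-entry history that ends in an in-flight call:
// index:      0        1         2        3         4        5
// history: [ user, model+fc, user+fr, model+fc, user+fr, model+fc ]
// Walking back from index 4 finds two complete (fc, fr) pairs, so:
//   contents.length - (2 * pairsFound + 1) = 6 - (2 * 2 + 1) = 1
// Entry 0 is compressed; entries 1..5 (two finished rounds plus the
// trailing in-flight call) are kept. The kept slice starts with model+fc,
// which is exactly the case the continuation bridge later handles.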
/**
* Returns the index of the oldest item to keep when compressing. May return
* contents.length, which indicates that everything should be compressed.
*
* The algorithm has two phases:
*
* 1. **Scan:** walk left-to-right looking for the first non-functionResponse
* user message that lands past `fraction` of total chars. That's the
* "clean" split the kept slice starts with a fresh user prompt.
*
* 2. **Fallbacks** (no clean split found): the gate that gets us here has
* already decided we need to compress, so all three fallbacks bias toward
* *more* compression rather than less:
*
* - last entry is `model` without functionCall → compress everything.
* - last entry is `user` with functionResponse → compress everything (the
* trailing tool round is complete; no orphans).
* - last entry is `model` with functionCall (in-flight) → compress
* everything except the trailing call plus the last `retainCount`
* complete tool rounds. The kept slice may start with `model+fc`;
* callers must inject a synthetic continuation user message between
* `summary_ack_model` and the kept slice to preserve role alternation.
*
* The pre-fallback returns of `lastSplitPoint` (compress less) only happen
* for malformed histories that don't end in user/model.
*
* Exported for testing purposes.
*/
export function findCompressSplitPoint(
contents: Content[],
fraction: number,
retainCount = TOOL_ROUND_RETAIN_COUNT,
): number {
if (fraction <= 0 || fraction >= 1) {
throw new Error('Fraction must be between 0 and 1');
@@ -58,14 +121,11 @@ export function findCompressSplitPoint(
const totalCharCount = charCounts.reduce((a, b) => a + b, 0);
const targetCharCount = totalCharCount * fraction;
let lastSplitPoint = 0; // 0 is always valid (compress nothing)
let lastSplitPoint = 0;
let cumulativeCharCount = 0;
for (let i = 0; i < contents.length; i++) {
const content = contents[i];
if (
content.role === 'user' &&
!content.parts?.some((part) => !!part.functionResponse)
) {
if (content.role === 'user' && !hasFunctionResponse(content)) {
if (cumulativeCharCount >= targetCharCount) {
return i;
}
@@ -74,48 +134,57 @@ export function findCompressSplitPoint(
cumulativeCharCount += charCounts[i];
}
// We found no split points after targetCharCount.
// Check if it's safe to compress everything.
const lastContent = contents[contents.length - 1];
if (
lastContent?.role === 'model' &&
!lastContent?.parts?.some((part) => part.functionCall)
) {
if (lastContent?.role === 'model') {
if (!hasFunctionCall(lastContent)) return contents.length;
return splitPointRetainingTrailingPairs(contents, retainCount);
}
if (lastContent?.role === 'user' && hasFunctionResponse(lastContent)) {
return contents.length;
}
// Also safe to compress everything if the last message completes a tool call
// sequence (all function calls have matching responses).
if (
lastContent?.role === 'user' &&
lastContent?.parts?.some((part) => !!part.functionResponse)
) {
return contents.length;
}
return lastSplitPoint;
}
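Since the function is exported for tests, the scan phase is easy to exercise directly. The history below is made up, sizes are approximate, and `Content` is the already-imported @google/genai type:
// Five entries of roughly equal serialized size, fraction 0.5.
const history: Content[] = [
  { role: 'user', parts: [{ text: 'first task ....' }] },  // i=0
  { role: 'model', parts: [{ text: 'done ..........' }] }, // i=1
  { role: 'user', parts: [{ text: 'second task ...' }] },  // i=2: ~40% of chars precede it, fails the gate
  { role: 'model', parts: [{ text: 'done ..........' }] }, // i=3
  { role: 'user', parts: [{ text: 'third task ....' }] },  // i=4: ~80% precede it, so 4 is returned
];
const split = findCompressSplitPoint(history, 0.5); // 4 -> entries 0..3 compress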
export interface CompressOptions {
promptId: string;
force: boolean;
model: string;
config: Config;
/**
* Whether a previous unforced compression attempt failed for this chat.
* Suppresses auto-compaction; manual `/compress` (force=true) overrides.
*/
hasFailedCompressionAttempt: boolean;
/**
* Most recent prompt token count for this chat. Compared against
* `threshold * contextWindowSize` for the auto-compaction gate. Callers
* source this from the per-chat counter (main session, subagents alike);
* the service does not read or write any global telemetry.
*/
originalTokenCount: number;
signal?: AbortSignal;
}
export class ChatCompressionService {
async compress(
chat: GeminiChat,
promptId: string,
force: boolean,
model: string,
config: Config,
hasFailedCompressionAttempt: boolean,
signal?: AbortSignal,
opts: CompressOptions,
): Promise<{ newHistory: Content[] | null; info: ChatCompressionInfo }> {
const curatedHistory = chat.getHistory(true);
const {
promptId,
force,
model,
config,
hasFailedCompressionAttempt,
originalTokenCount,
signal,
} = opts;
const threshold =
config.getChatCompression()?.contextPercentageThreshold ??
COMPRESSION_TOKEN_THRESHOLD;
// Regardless of `force`, don't do anything if the history is empty.
if (
curatedHistory.length === 0 ||
threshold <= 0 ||
(hasFailedCompressionAttempt && !force)
) {
// Cheap gates first — these don't need the curated history.
if (threshold <= 0 || (hasFailedCompressionAttempt && !force)) {
return {
newHistory: null,
info: {
@@ -126,9 +195,9 @@ export class ChatCompressionService {
};
}
const originalTokenCount = uiTelemetryService.getLastPromptTokenCount();
// Don't compress if not forced and we are under the limit.
// Don't compress if not forced and we are under the limit. This is the
// steady-state path on every send; we want to exit before paying for the
// full `getHistory(true)` clone below.
if (!force) {
const contextLimit =
config.getContentGeneratorConfig()?.contextWindowSize ??
@@ -145,6 +214,18 @@ export class ChatCompressionService {
}
}
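As a rough number check, with illustrative constants (neither value is the shipped default; those come from the content-generator config and COMPRESSION_TOKEN_THRESHOLD):
const contextLimit = 128_000; // assumed context window
const threshold = 0.7;        // assumed contextPercentageThreshold
// Auto-compaction becomes eligible once the last prompt token count crosses:
const compressAt = threshold * contextLimit; // 89,600 tokens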
const curatedHistory = chat.getHistory(true);
if (curatedHistory.length === 0) {
return {
newHistory: null,
info: {
originalTokenCount: 0,
newTokenCount: 0,
compressionStatus: CompressionStatus.NOOP,
},
};
}
// Fire PreCompact hook before compression begins
const hookSystem = config.getHookSystem();
if (hookSystem) {
@@ -181,6 +262,10 @@ export class ChatCompressionService {
const historyToCompress = historyForSplit.slice(0, splitPoint);
const historyToKeep = historyForSplit.slice(splitPoint);
// The in-flight fallback path may produce a kept slice starting with
// model+functionCall; the post-summary history needs a synthetic user
// between the summary's model_ack and the kept entries.
const keepNeedsContinuationBridge = historyToKeep[0]?.role === 'model';
if (historyToCompress.length === 0) {
return {
@@ -196,10 +281,6 @@ export class ChatCompressionService {
// Guard: if historyToCompress is too small relative to the total history,
// skip compression. This prevents futile API calls where the model receives
// almost no context and generates a useless "summary" that inflates tokens.
//
// Note: findCompressSplitPoint already computes charCounts internally but
// returns only the split index. We intentionally recompute here to keep
// the function signature simple; this is a minor, acceptable duplication.
const compressCharCount = historyToCompress.reduce(
(sum, c) => sum + JSON.stringify(c).length,
0,
@@ -274,6 +355,22 @@ export class ChatCompressionService {
role: 'model',
parts: [{ text: 'Got it. Thanks for the additional context!' }],
},
// When the kept slice starts with model+functionCall (because
// tool-round absorption pulled the only fresh user message into
// compress), inject a synthetic continuation prompt so the joined
// history alternates correctly.
...(keepNeedsContinuationBridge
? [
{
role: 'user' as const,
parts: [
{
text: 'Continue with the prior task using the context above.',
},
],
},
]
: []),
...historyToKeep,
];
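In the bridge case the joined history therefore takes this shape (contents abbreviated, purely illustrative):
// [ user:  '<compression summary>',
//   model: 'Got it. Thanks for the additional context!',
//   user:  'Continue with the prior task using the context above.', // bridge
//   model: { functionCall: ... },  // first kept entry
//   ...remaining retained tool rounds and the trailing in-flight call ]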
@@ -339,8 +436,6 @@ export class ChatCompressionService {
},
};
} else {
uiTelemetryService.setLastPromptTokenCount(newTokenCount);
// Fire SessionStart event after successful compression
try {
const permissionMode = String(