From 095a39a8d58c25de40d6e2a6cda331b6f2cbdfe5 Mon Sep 17 00:00:00 2001 From: JerryLee <223425819+Jerry2003826@users.noreply.github.com> Date: Tue, 5 May 2026 20:14:13 +1000 Subject: [PATCH] fix(acp): run auto compression before model sends (#3698) --- .../acp-integration/session/Session.test.ts | 1039 ++++++++++++++++- .../src/acp-integration/session/Session.ts | 348 +++++- 2 files changed, 1346 insertions(+), 41 deletions(-) diff --git a/packages/cli/src/acp-integration/session/Session.test.ts b/packages/cli/src/acp-integration/session/Session.test.ts index d324ee697..99f34d2eb 100644 --- a/packages/cli/src/acp-integration/session/Session.test.ts +++ b/packages/cli/src/acp-integration/session/Session.test.ts @@ -46,6 +46,20 @@ function createStreamWithChunks( })(); } +function expectCompressBeforeSend( + compressMock: ReturnType, + sendMock: ReturnType, + callIndex: number, +) { + expect(compressMock.mock.invocationCallOrder.length).toBeGreaterThan( + callIndex, + ); + expect(sendMock.mock.invocationCallOrder.length).toBeGreaterThan(callIndex); + expect(compressMock.mock.invocationCallOrder[callIndex]).toBeLessThan( + sendMock.mock.invocationCallOrder[callIndex], + ); +} + describe('Session', () => { let mockChat: GeminiChat; let mockConfig: Config; @@ -56,6 +70,10 @@ describe('Session', () => { let currentAuthType: AuthType; let switchModelSpy: ReturnType; let getAvailableCommandsSpy: ReturnType; + let mockGeminiClient: { + getChat: ReturnType; + tryCompressChat: ReturnType; + }; let mockToolRegistry: { getTool: ReturnType; ensureTool: ReturnType; @@ -75,6 +93,14 @@ describe('Session', () => { addHistory: vi.fn(), getHistory: vi.fn().mockReturnValue([]), } as unknown as GeminiChat; + mockGeminiClient = { + getChat: vi.fn().mockReturnValue(mockChat), + tryCompressChat: vi.fn().mockResolvedValue({ + originalTokenCount: 0, + newTokenCount: 0, + compressionStatus: core.CompressionStatus.NOOP, + }), + }; mockToolRegistry = { getTool: vi.fn(), @@ -102,6 +128,7 @@ describe('Session', () => { recordUserMessage: vi.fn(), recordUiTelemetryEvent: vi.fn(), recordToolResult: vi.fn(), + recordSlashCommand: vi.fn(), }), getToolRegistry: vi.fn().mockReturnValue(mockToolRegistry), // #buildInitialSystemReminders iterates listSubagents() on every @@ -117,9 +144,8 @@ describe('Session', () => { getDebugMode: vi.fn().mockReturnValue(false), getAuthType: vi.fn().mockImplementation(() => currentAuthType), isCronEnabled: vi.fn().mockReturnValue(false), - getGeminiClient: vi - .fn() - .mockReturnValue({ getChat: vi.fn().mockReturnValue(mockChat) }), + getSessionTokenLimit: vi.fn().mockReturnValue(0), + getGeminiClient: vi.fn().mockReturnValue(mockGeminiClient), } as unknown as Config; mockClient = { @@ -155,6 +181,7 @@ describe('Session', () => { mockConfig = undefined as unknown as Config; mockClient = undefined as unknown as AgentSideConnection; mockSettings = undefined as unknown as LoadedSettings; + mockGeminiClient = undefined as unknown as typeof mockGeminiClient; mockToolRegistry = undefined as unknown as typeof mockToolRegistry; vi.restoreAllMocks(); vi.clearAllTimers(); @@ -328,6 +355,1012 @@ describe('Session', () => { }); describe('prompt', () => { + describe('auto-compress', () => { + it('runs automatic compression before sending an ACP prompt', async () => { + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValue(createEmptyStream()); + + await session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }); + + expect(mockGeminiClient.tryCompressChat).toHaveBeenCalledWith( + 'test-session-id########1', + false, + expect.any(AbortSignal), + ); + + const sendMessageStream = mockChat.sendMessageStream as ReturnType< + typeof vi.fn + >; + expectCompressBeforeSend( + mockGeminiClient.tryCompressChat, + sendMessageStream, + 0, + ); + }); + + it('uses the current chat after automatic compression replaces it', async () => { + const compressedChat = { + sendMessageStream: vi.fn().mockResolvedValue(createEmptyStream()), + addHistory: vi.fn(), + getHistory: vi.fn().mockReturnValue([]), + } as unknown as GeminiChat; + + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValue(createEmptyStream()); + mockGeminiClient.tryCompressChat.mockImplementation(async () => { + mockGeminiClient.getChat.mockReturnValue(compressedChat); + return { + originalTokenCount: 1000, + newTokenCount: 200, + compressionStatus: core.CompressionStatus.COMPRESSED, + }; + }); + + await session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }); + + expect(mockChat.sendMessageStream).not.toHaveBeenCalled(); + expect(compressedChat.sendMessageStream).toHaveBeenCalledWith( + 'qwen3-code-plus', + { + message: expect.any(Array), + config: { abortSignal: expect.any(AbortSignal) }, + }, + 'test-session-id########1', + ); + }); + + it('emits an ACP-visible update when automatic compression succeeds', async () => { + mockGeminiClient.tryCompressChat.mockResolvedValueOnce({ + originalTokenCount: 1200, + newTokenCount: 450, + compressionStatus: core.CompressionStatus.COMPRESSED, + }); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValue(createEmptyStream()); + + await session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }); + + expect(mockClient.sessionUpdate).toHaveBeenCalledWith({ + sessionId: 'test-session-id', + update: { + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: + 'IMPORTANT: This conversation approached the input token limit for qwen3-code-plus. ' + + 'A compressed context will be sent for future messages (compressed from: 1200 to 450 tokens).', + }, + }, + }); + }); + + it('continues sending when automatic compression fails', async () => { + mockGeminiClient.tryCompressChat.mockRejectedValueOnce( + new Error('compression rate limited'), + ); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValue(createEmptyStream()); + + await session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }); + + expect(mockGeminiClient.tryCompressChat).toHaveBeenCalledWith( + 'test-session-id########1', + false, + expect.any(AbortSignal), + ); + expect(mockChat.sendMessageStream).toHaveBeenCalledWith( + 'qwen3-code-plus', + { + message: expect.any(Array), + config: { abortSignal: expect.any(AbortSignal) }, + }, + 'test-session-id########1', + ); + }); + + it('does not use global UI telemetry when compression fails before local token counts exist', async () => { + mockConfig.getSessionTokenLimit = vi.fn().mockReturnValue(100); + vi.spyOn( + core.uiTelemetryService, + 'getLastPromptTokenCount', + ).mockReturnValue(101); + mockGeminiClient.tryCompressChat.mockRejectedValueOnce( + new Error('compression rate limited'), + ); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValue(createEmptyStream()); + + await expect( + session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }), + ).resolves.toEqual({ stopReason: 'end_turn' }); + + expect(mockChat.sendMessageStream).toHaveBeenCalledTimes(1); + expect(mockClient.sessionUpdate).not.toHaveBeenCalledWith( + expect.objectContaining({ + update: expect.objectContaining({ + sessionUpdate: 'agent_message_chunk', + content: expect.objectContaining({ + text: expect.stringContaining('Session token limit exceeded'), + }), + }), + }), + ); + }); + + it('returns cancelled when automatic compression is aborted', async () => { + mockConfig.getSessionTokenLimit = vi.fn().mockReturnValue(100); + mockGeminiClient.tryCompressChat.mockImplementation( + async (_promptId: string, _force: boolean, signal: AbortSignal) => + new Promise((_, reject) => { + signal.addEventListener('abort', () => { + const abortError = new Error('aborted'); + abortError.name = 'AbortError'; + reject(abortError); + }); + }), + ); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValue(createEmptyStream()); + + const promptPromise = session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }); + await vi.waitFor(() => { + expect(mockGeminiClient.tryCompressChat).toHaveBeenCalled(); + }); + + await session.cancelPendingPrompt(); + + await expect(promptPromise).resolves.toEqual({ + stopReason: 'cancelled', + }); + expect(mockChat.sendMessageStream).not.toHaveBeenCalled(); + expect(mockChat.addHistory).toHaveBeenCalledWith({ + role: 'user', + parts: expect.any(Array), + }); + expect(mockClient.sessionUpdate).not.toHaveBeenCalledWith({ + sessionId: 'test-session-id', + update: { + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: + 'Session token limit exceeded: 101 tokens > 100 limit. ' + + 'Please start a new session or increase the sessionTokenLimit in your settings.json.', + }, + }, + }); + }); + + it('uses compression token info instead of global UI telemetry for the session limit', async () => { + mockConfig.getSessionTokenLimit = vi.fn().mockReturnValue(100); + vi.spyOn( + core.uiTelemetryService, + 'getLastPromptTokenCount', + ).mockReturnValue(999); + mockGeminiClient.tryCompressChat.mockResolvedValueOnce({ + originalTokenCount: 50, + newTokenCount: 50, + compressionStatus: core.CompressionStatus.NOOP, + }); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValue(createEmptyStream()); + + await session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }); + + expect(mockChat.sendMessageStream).toHaveBeenCalledTimes(1); + }); + + it('falls back to the previous prompt token count when compression returns zero token info', async () => { + mockConfig.getSessionTokenLimit = vi.fn().mockReturnValue(100); + mockGeminiClient.tryCompressChat.mockResolvedValue({ + originalTokenCount: 0, + newTokenCount: 0, + compressionStatus: core.CompressionStatus.NOOP, + }); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValueOnce( + createStreamWithChunks([ + { + type: core.StreamEventType.CHUNK, + value: { + usageMetadata: { + totalTokenCount: 101, + promptTokenCount: 101, + }, + }, + }, + ]), + ) + .mockResolvedValueOnce(createEmptyStream()); + + await expect( + session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'first' }], + }), + ).resolves.toEqual({ stopReason: 'end_turn' }); + await expect( + session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'second' }], + }), + ).resolves.toEqual({ stopReason: 'max_tokens' }); + + expect(mockChat.sendMessageStream).toHaveBeenCalledTimes(1); + }); + + it('falls back to the previous prompt token count when compressed token info is zero', async () => { + mockConfig.getSessionTokenLimit = vi.fn().mockReturnValue(100); + mockGeminiClient.tryCompressChat + .mockResolvedValueOnce({ + originalTokenCount: 50, + newTokenCount: 50, + compressionStatus: core.CompressionStatus.NOOP, + }) + .mockResolvedValueOnce({ + originalTokenCount: 1200, + newTokenCount: 0, + compressionStatus: core.CompressionStatus.COMPRESSED, + }); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValueOnce( + createStreamWithChunks([ + { + type: core.StreamEventType.CHUNK, + value: { + usageMetadata: { + totalTokenCount: 101, + promptTokenCount: 101, + }, + }, + }, + ]), + ) + .mockResolvedValueOnce(createEmptyStream()); + + await expect( + session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'first' }], + }), + ).resolves.toEqual({ stopReason: 'end_turn' }); + await expect( + session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'second' }], + }), + ).resolves.toEqual({ stopReason: 'max_tokens' }); + + expect(mockChat.sendMessageStream).toHaveBeenCalledTimes(1); + }); + + it('records prompt token count instead of total token count for later session-limit checks', async () => { + mockConfig.getSessionTokenLimit = vi.fn().mockReturnValue(100); + mockGeminiClient.tryCompressChat + .mockResolvedValueOnce({ + originalTokenCount: 0, + newTokenCount: 0, + compressionStatus: core.CompressionStatus.NOOP, + }) + .mockRejectedValueOnce(new Error('compression unavailable')); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValueOnce( + createStreamWithChunks([ + { + type: core.StreamEventType.CHUNK, + value: { + usageMetadata: { + totalTokenCount: 500, + promptTokenCount: 50, + }, + }, + }, + ]), + ) + .mockResolvedValueOnce(createEmptyStream()); + + await expect( + session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'long response' }], + }), + ).resolves.toEqual({ stopReason: 'end_turn' }); + await expect( + session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'next prompt' }], + }), + ).resolves.toEqual({ stopReason: 'end_turn' }); + + expect(mockChat.sendMessageStream).toHaveBeenCalledTimes(2); + }); + + it('resets the session-local token count when the active chat instance changes', async () => { + const clearedChat = { + sendMessageStream: vi.fn().mockResolvedValue(createEmptyStream()), + addHistory: vi.fn(), + getHistory: vi.fn().mockReturnValue([]), + } as unknown as GeminiChat; + mockConfig.getSessionTokenLimit = vi.fn().mockReturnValue(100); + mockGeminiClient.tryCompressChat + .mockResolvedValueOnce({ + originalTokenCount: 50, + newTokenCount: 50, + compressionStatus: core.CompressionStatus.NOOP, + }) + .mockRejectedValueOnce(new Error('compression unavailable')); + mockChat.sendMessageStream = vi.fn().mockResolvedValueOnce( + createStreamWithChunks([ + { + type: core.StreamEventType.CHUNK, + value: { + usageMetadata: { + totalTokenCount: 500, + promptTokenCount: 101, + }, + }, + }, + ]), + ); + + await expect( + session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'before clear' }], + }), + ).resolves.toEqual({ stopReason: 'end_turn' }); + + mockGeminiClient.getChat.mockReturnValue(clearedChat); + + await expect( + session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'after clear' }], + }), + ).resolves.toEqual({ stopReason: 'end_turn' }); + + expect(clearedChat.sendMessageStream).toHaveBeenCalledTimes(1); + }); + + it('continues sending when the compression notification fails', async () => { + mockGeminiClient.tryCompressChat.mockResolvedValueOnce({ + originalTokenCount: 1200, + newTokenCount: 450, + compressionStatus: core.CompressionStatus.COMPRESSED, + }); + mockClient.sessionUpdate = vi + .fn() + .mockRejectedValueOnce(new Error('client disconnected')); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValue(createEmptyStream()); + + await session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }); + + expect(mockChat.sendMessageStream).toHaveBeenCalledTimes(1); + }); + + it('stops before sending when the compressed prompt exceeds the session token limit', async () => { + mockConfig.getSessionTokenLimit = vi.fn().mockReturnValue(100); + mockGeminiClient.tryCompressChat.mockResolvedValueOnce({ + originalTokenCount: 1200, + newTokenCount: 101, + compressionStatus: core.CompressionStatus.COMPRESSED, + }); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValue(createEmptyStream()); + + await expect( + session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }), + ).resolves.toEqual({ stopReason: 'max_tokens' }); + + expect(mockGeminiClient.tryCompressChat).toHaveBeenCalled(); + expect(mockChat.sendMessageStream).not.toHaveBeenCalled(); + expect(mockChat.addHistory).not.toHaveBeenCalled(); + expect(mockClient.sessionUpdate).not.toHaveBeenCalledWith({ + sessionId: 'test-session-id', + update: { + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: + 'IMPORTANT: This conversation approached the input token limit for qwen3-code-plus. ' + + 'A compressed context will be sent for future messages (compressed from: 1200 to 101 tokens).', + }, + }, + }); + expect(mockClient.sessionUpdate).toHaveBeenCalledWith({ + sessionId: 'test-session-id', + update: { + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: + 'Session token limit exceeded: 101 tokens > 100 limit. ' + + 'Please start a new session or increase the sessionTokenLimit in your settings.json.', + }, + }, + }); + }); + + it('stops without throwing when the token-limit diagnostic fails', async () => { + mockConfig.getSessionTokenLimit = vi.fn().mockReturnValue(100); + mockGeminiClient.tryCompressChat.mockResolvedValueOnce({ + originalTokenCount: 101, + newTokenCount: 101, + compressionStatus: core.CompressionStatus.NOOP, + }); + mockClient.sessionUpdate = vi + .fn() + .mockRejectedValueOnce(new Error('client disconnected')); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValue(createEmptyStream()); + + await expect( + session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }), + ).resolves.toEqual({ stopReason: 'max_tokens' }); + + expect(mockChat.sendMessageStream).not.toHaveBeenCalled(); + expect(mockChat.addHistory).not.toHaveBeenCalled(); + }); + + it('also runs automatic compression before tool response follow-up sends', async () => { + const executeSpy = vi.fn().mockResolvedValue({ + llmContent: 'file contents', + returnDisplay: 'file contents', + }); + const tool = { + name: 'read_file', + kind: core.Kind.Read, + build: vi.fn().mockReturnValue({ + params: { path: '/tmp/test.txt' }, + getDefaultPermission: vi.fn().mockResolvedValue('allow'), + getDescription: vi.fn().mockReturnValue('Read file'), + toolLocations: vi.fn().mockReturnValue([]), + execute: executeSpy, + }), + }; + + mockToolRegistry.getTool.mockReturnValue(tool); + mockConfig.getApprovalMode = vi.fn().mockReturnValue(ApprovalMode.YOLO); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValueOnce( + createStreamWithChunks([ + { + type: core.StreamEventType.CHUNK, + value: { + functionCalls: [ + { + id: 'call-1', + name: 'read_file', + args: { path: '/tmp/test.txt' }, + }, + ], + }, + }, + ]), + ) + .mockResolvedValueOnce(createEmptyStream()); + + await session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'read file' }], + }); + + expect(mockChat.sendMessageStream).toHaveBeenCalledTimes(2); + expect(mockGeminiClient.tryCompressChat).toHaveBeenCalledTimes(2); + expect(mockGeminiClient.tryCompressChat).toHaveBeenNthCalledWith( + 2, + 'test-session-id########1', + false, + expect.any(AbortSignal), + ); + + const sendMessageStream = mockChat.sendMessageStream as ReturnType< + typeof vi.fn + >; + expectCompressBeforeSend( + mockGeminiClient.tryCompressChat, + sendMessageStream, + 1, + ); + }); + + it('stops tool response follow-up before sending when the session token limit is exceeded', async () => { + const executeSpy = vi.fn().mockResolvedValue({ + llmContent: 'file contents', + returnDisplay: 'file contents', + }); + const tool = { + name: 'read_file', + kind: core.Kind.Read, + build: vi.fn().mockReturnValue({ + params: { path: '/tmp/test.txt' }, + getDefaultPermission: vi.fn().mockResolvedValue('allow'), + getDescription: vi.fn().mockReturnValue('Read file'), + toolLocations: vi.fn().mockReturnValue([]), + execute: executeSpy, + }), + }; + + mockToolRegistry.getTool.mockReturnValue(tool); + mockConfig.getApprovalMode = vi.fn().mockReturnValue(ApprovalMode.YOLO); + mockConfig.getSessionTokenLimit = vi.fn().mockReturnValue(100); + mockGeminiClient.tryCompressChat + .mockResolvedValueOnce({ + originalTokenCount: 50, + newTokenCount: 50, + compressionStatus: core.CompressionStatus.NOOP, + }) + .mockResolvedValueOnce({ + originalTokenCount: 101, + newTokenCount: 101, + compressionStatus: core.CompressionStatus.NOOP, + }); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValueOnce( + createStreamWithChunks([ + { + type: core.StreamEventType.CHUNK, + value: { + functionCalls: [ + { + id: 'call-1', + name: 'read_file', + args: { path: '/tmp/test.txt' }, + }, + ], + }, + }, + ]), + ) + .mockResolvedValueOnce(createEmptyStream()); + + await expect( + session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'read file' }], + }), + ).resolves.toEqual({ stopReason: 'max_tokens' }); + + expect(executeSpy).toHaveBeenCalledTimes(1); + expect(mockGeminiClient.tryCompressChat).toHaveBeenCalledTimes(2); + expect(mockGeminiClient.tryCompressChat).toHaveBeenNthCalledWith( + 2, + 'test-session-id########1', + false, + expect.any(AbortSignal), + ); + expect(mockChat.sendMessageStream).toHaveBeenCalledTimes(1); + expect(mockChat.addHistory).toHaveBeenCalledWith({ + role: 'user', + parts: [ + expect.objectContaining({ + functionResponse: expect.objectContaining({ + id: 'call-1', + name: 'read_file', + }), + }), + ], + }); + expect(mockClient.sessionUpdate).toHaveBeenCalledWith({ + sessionId: 'test-session-id', + update: { + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: + 'Session token limit exceeded: 101 tokens > 100 limit. ' + + 'Please start a new session or increase the sessionTokenLimit in your settings.json.', + }, + }, + }); + }); + + it('runs automatic compression before Stop-hook continuation sends', async () => { + const messageBus = { + request: vi + .fn() + .mockResolvedValueOnce({ + success: true, + output: { + decision: 'block', + reason: 'Continue after Stop hook', + }, + }) + .mockResolvedValueOnce({ + success: true, + output: {}, + }), + }; + mockConfig.getMessageBus = vi.fn().mockReturnValue(messageBus); + mockConfig.getDisableAllHooks = vi.fn().mockReturnValue(false); + mockConfig.hasHooksForEvent = vi + .fn() + .mockImplementation((eventName: string) => eventName === 'Stop'); + mockChat.getHistory = vi + .fn() + .mockReturnValue([ + { role: 'model', parts: [{ text: 'response text' }] }, + ]); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValueOnce(createEmptyStream()) + .mockResolvedValueOnce(createEmptyStream()); + + await session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }); + + expect(mockChat.sendMessageStream).toHaveBeenCalledTimes(2); + expect(mockGeminiClient.tryCompressChat).toHaveBeenNthCalledWith( + 2, + 'test-session-id########1_stop_hook_1', + false, + expect.any(AbortSignal), + ); + + const sendMessageStream = mockChat.sendMessageStream as ReturnType< + typeof vi.fn + >; + expectCompressBeforeSend( + mockGeminiClient.tryCompressChat, + sendMessageStream, + 1, + ); + }); + + it('skips automatic compression after the first Stop-hook continuation', async () => { + const messageBus = { + request: vi + .fn() + .mockResolvedValueOnce({ + success: true, + output: { + decision: 'block', + reason: 'Continue after first Stop hook', + }, + }) + .mockResolvedValueOnce({ + success: true, + output: { + decision: 'block', + reason: 'Continue after second Stop hook', + }, + }) + .mockResolvedValueOnce({ + success: true, + output: {}, + }), + }; + mockConfig.getMessageBus = vi.fn().mockReturnValue(messageBus); + mockConfig.getDisableAllHooks = vi.fn().mockReturnValue(false); + mockConfig.hasHooksForEvent = vi + .fn() + .mockImplementation((eventName: string) => eventName === 'Stop'); + mockChat.getHistory = vi + .fn() + .mockReturnValue([ + { role: 'model', parts: [{ text: 'response text' }] }, + ]); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValueOnce(createEmptyStream()) + .mockResolvedValueOnce(createEmptyStream()) + .mockResolvedValueOnce(createEmptyStream()); + + await session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }); + + expect(mockChat.sendMessageStream).toHaveBeenCalledTimes(3); + expect(mockGeminiClient.tryCompressChat).toHaveBeenCalledTimes(2); + expect(mockGeminiClient.tryCompressChat).toHaveBeenNthCalledWith( + 2, + 'test-session-id########1_stop_hook_1', + false, + expect.any(AbortSignal), + ); + expect(mockGeminiClient.tryCompressChat).not.toHaveBeenCalledWith( + 'test-session-id########1_stop_hook_2', + false, + expect.any(AbortSignal), + ); + + const sendMessageStream = mockChat.sendMessageStream as ReturnType< + typeof vi.fn + >; + expect(sendMessageStream.mock.calls[2]?.[2]).toBe( + 'test-session-id########1_stop_hook_2', + ); + }); + + it('stops Stop-hook continuation before sending when the session token limit is exceeded', async () => { + const messageBus = { + request: vi + .fn() + .mockResolvedValueOnce({ + success: true, + output: { + decision: 'block', + reason: 'Continue after Stop hook', + }, + }) + .mockResolvedValueOnce({ + success: true, + output: {}, + }), + }; + mockConfig.getMessageBus = vi.fn().mockReturnValue(messageBus); + mockConfig.getDisableAllHooks = vi.fn().mockReturnValue(false); + mockConfig.hasHooksForEvent = vi + .fn() + .mockImplementation((eventName: string) => eventName === 'Stop'); + mockConfig.getSessionTokenLimit = vi.fn().mockReturnValue(100); + mockGeminiClient.tryCompressChat + .mockResolvedValueOnce({ + originalTokenCount: 50, + newTokenCount: 50, + compressionStatus: core.CompressionStatus.NOOP, + }) + .mockResolvedValueOnce({ + originalTokenCount: 101, + newTokenCount: 101, + compressionStatus: core.CompressionStatus.NOOP, + }); + mockChat.getHistory = vi + .fn() + .mockReturnValue([ + { role: 'model', parts: [{ text: 'response text' }] }, + ]); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValue(createEmptyStream()); + + await expect( + session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }), + ).resolves.toEqual({ stopReason: 'max_tokens' }); + + expect(mockGeminiClient.tryCompressChat).toHaveBeenCalledTimes(2); + expect(mockGeminiClient.tryCompressChat).toHaveBeenNthCalledWith( + 2, + 'test-session-id########1_stop_hook_1', + false, + expect.any(AbortSignal), + ); + expect(mockChat.sendMessageStream).toHaveBeenCalledTimes(1); + expect(mockClient.sessionUpdate).toHaveBeenCalledWith({ + sessionId: 'test-session-id', + update: { + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: + 'Session token limit exceeded: 101 tokens > 100 limit. ' + + 'Please start a new session or increase the sessionTokenLimit in your settings.json.', + }, + }, + }); + }); + + it('runs automatic compression before cron-fired ACP prompt sends', async () => { + const scheduler = { + size: 1, + start: vi.fn((callback: (job: { prompt: string }) => void) => { + callback({ prompt: 'scheduled prompt' }); + }), + stop: vi.fn(), + getExitSummary: vi.fn().mockReturnValue(undefined), + }; + mockConfig.isCronEnabled = vi.fn().mockReturnValue(true); + mockConfig.getCronScheduler = vi.fn().mockReturnValue(scheduler); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValueOnce(createEmptyStream()) + .mockResolvedValueOnce(createEmptyStream()); + + await session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }); + + await vi.waitFor(() => { + expect(mockChat.sendMessageStream).toHaveBeenCalledTimes(2); + }); + + expect(scheduler.start).toHaveBeenCalledTimes(1); + expect(mockGeminiClient.tryCompressChat).toHaveBeenNthCalledWith( + 1, + 'test-session-id########1', + false, + expect.any(AbortSignal), + ); + expect(mockGeminiClient.tryCompressChat).toHaveBeenNthCalledWith( + 2, + expect.stringMatching(/^test-session-id########cron\d+$/), + false, + expect.any(AbortSignal), + ); + + const sendMessageStream = mockChat.sendMessageStream as ReturnType< + typeof vi.fn + >; + expectCompressBeforeSend( + mockGeminiClient.tryCompressChat, + sendMessageStream, + 1, + ); + }); + + it('stops cron-fired ACP prompt before sending when the session token limit is exceeded', async () => { + let cronCallback: ((job: { prompt: string }) => void) | undefined; + const scheduler = { + size: 1, + start: vi.fn((callback: (job: { prompt: string }) => void) => { + cronCallback = callback; + callback({ prompt: 'scheduled prompt' }); + }), + stop: vi.fn(), + getExitSummary: vi.fn().mockReturnValue(undefined), + }; + mockConfig.isCronEnabled = vi.fn().mockReturnValue(true); + mockConfig.getCronScheduler = vi.fn().mockReturnValue(scheduler); + mockConfig.getSessionTokenLimit = vi.fn().mockReturnValue(100); + mockGeminiClient.tryCompressChat + .mockResolvedValueOnce({ + originalTokenCount: 50, + newTokenCount: 50, + compressionStatus: core.CompressionStatus.NOOP, + }) + .mockResolvedValueOnce({ + originalTokenCount: 101, + newTokenCount: 101, + compressionStatus: core.CompressionStatus.NOOP, + }); + mockChat.sendMessageStream = vi + .fn() + .mockResolvedValue(createEmptyStream()); + + await session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'hello' }], + }); + + await vi.waitFor(() => { + expect(mockGeminiClient.tryCompressChat).toHaveBeenCalledTimes(2); + }); + + expect(scheduler.start).toHaveBeenCalledTimes(1); + expect(mockGeminiClient.tryCompressChat).toHaveBeenNthCalledWith( + 2, + expect.stringMatching(/^test-session-id########cron\d+$/), + false, + expect.any(AbortSignal), + ); + expect(mockChat.sendMessageStream).toHaveBeenCalledTimes(1); + expect(mockClient.sessionUpdate).toHaveBeenCalledWith({ + sessionId: 'test-session-id', + update: { + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: + 'Session token limit exceeded: 101 tokens > 100 limit. ' + + 'Please start a new session or increase the sessionTokenLimit in your settings.json.', + }, + }, + }); + expect(scheduler.stop).toHaveBeenCalledTimes(1); + await vi.waitFor(() => { + expect(mockClient.sessionUpdate).toHaveBeenCalledWith({ + sessionId: 'test-session-id', + update: { + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: 'Cron jobs disabled for the rest of this session due to token limit. Restart the session to re-enable.', + }, + }, + }); + }); + + const sessionUpdateMock = mockClient.sessionUpdate as ReturnType< + typeof vi.fn + >; + const tokenLimitDiagnosticCount = () => + sessionUpdateMock.mock.calls.filter((call) => { + const notification = call[0] as { + update?: { + sessionUpdate?: string; + content?: { type?: string; text?: string }; + }; + }; + return ( + notification.update?.sessionUpdate === 'agent_message_chunk' && + notification.update.content?.type === 'text' && + notification.update.content.text?.includes( + 'Session token limit exceeded', + ) + ); + }).length; + const diagnosticCountBefore = tokenLimitDiagnosticCount(); + + cronCallback?.({ prompt: 'scheduled prompt again' }); + await Promise.resolve(); + + expect(mockGeminiClient.tryCompressChat).toHaveBeenCalledTimes(2); + expect(tokenLimitDiagnosticCount()).toBe(diagnosticCountBefore); + }); + + it('does not auto-compress slash commands handled without a model send', async () => { + vi.mocked( + nonInteractiveCliCommands.handleSlashCommand, + ).mockResolvedValueOnce({ + type: 'message', + messageType: 'info', + content: 'Already compressed.', + }); + mockChat.sendMessageStream = vi.fn(); + + await session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: '/compress' }], + }); + + expect(mockGeminiClient.tryCompressChat).not.toHaveBeenCalled(); + expect(mockChat.sendMessageStream).not.toHaveBeenCalled(); + }); + }); + it('passes resolved paths to read_many_files tool', async () => { const tempDir = await fs.mkdtemp( path.join(os.tmpdir(), 'qwen-acp-session-'), diff --git a/packages/cli/src/acp-integration/session/Session.ts b/packages/cli/src/acp-integration/session/Session.ts index 299754cd2..9b1c77e04 100644 --- a/packages/cli/src/acp-integration/session/Session.ts +++ b/packages/cli/src/acp-integration/session/Session.ts @@ -21,10 +21,13 @@ import type { HookExecutionRequest, HookExecutionResponse, MessageBus, + StreamEvent, + ChatCompressionInfo, } from '@qwen-code/qwen-code-core'; import { AuthType, ApprovalMode, + CompressionStatus, convertToFunctionResponse, createDebugLogger, DiscoveredMCPTool, @@ -108,6 +111,10 @@ import { const debugLogger = createDebugLogger('SESSION'); +type AutoCompressionSendResult = + | { responseStream: AsyncGenerator; stopReason?: never } + | { responseStream: null; stopReason: PromptResponse['stopReason'] }; + /** * Session represents an active conversation session with the AI model. * It uses modular components for consistent event emission: @@ -134,6 +141,9 @@ export class Session implements SessionContext { private cronProcessing = false; private cronAbortController: AbortController | null = null; private cronCompletion: Promise | null = null; + private cronDisabledByTokenLimit = false; + private lastPromptTokenCount = 0; + private lastPromptTokenCountChat: GeminiChat | null = null; // Modular components private readonly historyReplayer: HistoryReplayer; @@ -295,9 +305,6 @@ export class Session implements SessionContext { // Increment turn counter for each user prompt this.turn += 1; - // Always fetch the current chat from GeminiClient so that /clear's - // resetChat() (which replaces the chat instance) is reflected here. - const chat = this.config.getGeminiClient()!.getChat(); const promptId = this.config.getSessionId() + '########' + this.turn; // Extract text from all text blocks to construct the full prompt text for logging @@ -413,7 +420,7 @@ export class Session implements SessionContext { while (nextMessage !== null) { if (pendingSend.signal.aborted) { - chat.addHistory(nextMessage); + this.#getCurrentChat().addHistory(nextMessage); return { stopReason: 'cancelled' }; } @@ -422,16 +429,19 @@ export class Session implements SessionContext { const streamStartTime = Date.now(); try { - const responseStream = await chat.sendMessageStream( - this.config.getModel(), - { - message: nextMessage?.parts ?? [], - config: { - abortSignal: pendingSend.signal, - }, - }, + const sendResult = await this.#sendMessageStreamWithAutoCompression( promptId, + nextMessage?.parts ?? [], + pendingSend.signal, ); + if (!sendResult.responseStream) { + this.#preserveUnsentMessageHistory( + nextMessage, + sendResult.stopReason === 'cancelled', + ); + return { stopReason: sendResult.stopReason }; + } + const responseStream = sendResult.responseStream; nextMessage = null; for await (const resp of responseStream) { @@ -510,6 +520,7 @@ export class Session implements SessionContext { } if (usageMetadata) { + this.#recordPromptTokenCount(usageMetadata); // Kick off rewrite in background (non-blocking, runs parallel to tools) if (this.messageRewriter) { this.messageRewriter.flushTurn(pendingSend.signal); @@ -541,7 +552,6 @@ export class Session implements SessionContext { // Fire Stop hook loop (aligned with core path in client.ts) // This is triggered after model response completes with no pending tool calls return this.#handleStopHookLoop( - chat, pendingSend, promptId, hooksEnabled, @@ -557,20 +567,18 @@ export class Session implements SessionContext { * If a Stop hook requests continuation, it sends a follow-up message and loops back. * Maximum iterations (100) prevent infinite loops. * - * @param chat - The GeminiChat instance * @param pendingSend - The abort controller for the current prompt * @param promptId - The prompt ID for tracking * @param hooksEnabled - Whether hooks are enabled * @param messageBus - The MessageBus for hook communication (may be undefined) - * @returns The stop reason ('end_turn' or 'cancelled') + * @returns The ACP stop reason for the prompt. */ async #handleStopHookLoop( - chat: GeminiChat, pendingSend: AbortController, promptId: string, hooksEnabled: boolean, messageBus: MessageBus | undefined, - ): Promise<{ stopReason: 'end_turn' | 'cancelled' }> { + ): Promise<{ stopReason: PromptResponse['stopReason'] }> { const MAX_STOP_HOOK_ITERATIONS = 100; let stopHookIterationCount = 0; let stopHookReasons: string[] = []; @@ -586,7 +594,7 @@ export class Session implements SessionContext { } // Get response text from the chat history - const history = chat.getHistory(); + const history = this.#getCurrentChat().getHistory(); const lastModelMessage = history .filter((msg: Content) => msg.role === 'model') .pop(); @@ -666,16 +674,21 @@ export class Session implements SessionContext { const streamStartTime = Date.now(); try { - const continueResponseStream = await chat.sendMessageStream( - this.config.getModel(), - { - message: nextMessage?.parts ?? [], - config: { - abortSignal: pendingSend.signal, - }, - }, - promptId + '_stop_hook_' + stopHookIterationCount, - ); + const continueSendResult = + await this.#sendMessageStreamWithAutoCompression( + promptId + '_stop_hook_' + stopHookIterationCount, + nextMessage?.parts ?? [], + pendingSend.signal, + { skipCompression: stopHookIterationCount > 1 }, + ); + if (!continueSendResult.responseStream) { + this.#preserveUnsentMessageHistory( + nextMessage, + continueSendResult.stopReason === 'cancelled', + ); + return { stopReason: continueSendResult.stopReason }; + } + const continueResponseStream = continueSendResult.responseStream; nextMessage = null; for await (const resp of continueResponseStream) { @@ -749,6 +762,7 @@ export class Session implements SessionContext { } if (usageMetadata) { + this.#recordPromptTokenCount(usageMetadata); const durationMs = Date.now() - streamStartTime; await this.messageEmitter.emitUsageMetadata( usageMetadata, @@ -795,6 +809,245 @@ export class Session implements SessionContext { await this.client.sessionUpdate(params); } + #getCurrentChat(): GeminiChat { + return this.config.getGeminiClient()!.getChat(); + } + + /** + * Mirrors the core send path for ACP model sends. + * + * Attempts automatic chat compression first, checks the session token limit, + * emits an ACP-visible notice when compression succeeds, and returns the ACP + * stop reason when the provider send should be skipped because the request + * was cancelled or the session token limit was exceeded. + */ + async #sendMessageStreamWithAutoCompression( + promptId: string, + message: Part[], + abortSignal: AbortSignal, + options: { skipCompression?: boolean } = {}, + ): Promise { + const geminiClient = this.config.getGeminiClient()!; + let compressionDiagnostic: string | null = null; + let compressionInfo: ChatCompressionInfo | null = null; + if (!options.skipCompression) { + try { + const compressed = await geminiClient.tryCompressChat( + promptId, + false, + abortSignal, + ); + compressionInfo = compressed; + this.#recordCompressionTokenCount(compressed); + if (compressed.compressionStatus === CompressionStatus.COMPRESSED) { + compressionDiagnostic = + `IMPORTANT: This conversation approached the input token limit for ${this.config.getModel()}. ` + + `A compressed context will be sent for future messages (compressed from: ` + + `${compressed.originalTokenCount ?? 'unknown'} to ` + + `${compressed.newTokenCount ?? 'unknown'} tokens).`; + } + } catch (compressionError) { + if (abortSignal.aborted || this.#isAbortError(compressionError)) { + debugLogger.debug(`Auto-compression aborted for prompt ${promptId}`); + return { responseStream: null, stopReason: 'cancelled' }; + } + debugLogger.warn( + `Auto-compression failed for prompt ${promptId}; proceeding without compression: ` + + this.#formatError(compressionError), + ); + } + } + + if (abortSignal.aborted) { + debugLogger.debug(`Auto-compression aborted for prompt ${promptId}`); + return { responseStream: null, stopReason: 'cancelled' }; + } + + if (!compressionInfo) { + this.#syncPromptTokenCountWithCurrentChat(); + } + + const sessionTokenLimit = this.config.getSessionTokenLimit(); + if (sessionTokenLimit > 0) { + const lastPromptTokenCount = + this.#getPostCompressionTokenCount(compressionInfo); + if (lastPromptTokenCount > sessionTokenLimit) { + debugLogger.warn( + `Session token limit exceeded for prompt ${promptId}: ` + + `${lastPromptTokenCount} > ${sessionTokenLimit}. Send dropped.`, + ); + await this.#emitAgentDiagnosticMessageSafely( + `Session token limit exceeded: ${lastPromptTokenCount} tokens > ${sessionTokenLimit} limit. ` + + 'Please start a new session or increase the sessionTokenLimit in your settings.json.', + `Failed to emit token limit diagnostic for prompt ${promptId}`, + ); + return { responseStream: null, stopReason: 'max_tokens' }; + } + } + + if (compressionDiagnostic) { + await this.#emitAgentDiagnosticMessageSafely( + compressionDiagnostic, + `Failed to emit compression notification for prompt ${promptId}`, + ); + } + + if (abortSignal.aborted) { + debugLogger.debug( + `Send aborted after compression diagnostic for prompt ${promptId}`, + ); + return { responseStream: null, stopReason: 'cancelled' }; + } + + const responseStream = await this.#getCurrentChat().sendMessageStream( + this.config.getModel(), + { + message, + config: { + abortSignal, + }, + }, + promptId, + ); + return { responseStream }; + } + + #preserveUnsentMessageHistory( + message: Content | null, + preserveFullMessage: boolean, + ): void { + if (!message) return; + + if (preserveFullMessage) { + this.#getCurrentChat().addHistory(message); + return; + } + + const functionResponseParts = + message.parts?.filter( + (part: Part) => 'functionResponse' in part && part.functionResponse, + ) ?? []; + const droppedParts = + (message.parts?.length ?? 0) - functionResponseParts.length; + if (droppedParts > 0) { + debugLogger.debug( + `Dropping ${droppedParts} non-functionResponse part(s) from unsent ACP message after send was skipped.`, + ); + } + if (functionResponseParts.length > 0) { + this.#getCurrentChat().addHistory({ + ...message, + parts: functionResponseParts, + }); + } + } + + #recordCompressionTokenCount(info: ChatCompressionInfo): void { + this.#syncPromptTokenCountWithCurrentChat(); + const tokenCount = this.#extractCompressionTokenCount(info); + if (tokenCount !== null && tokenCount > 0) { + this.lastPromptTokenCount = tokenCount; + } + } + + #recordPromptTokenCount( + usageMetadata: GenerateContentResponseUsageMetadata, + ): void { + this.#syncPromptTokenCountWithCurrentChat(); + const tokenCount = + usageMetadata.promptTokenCount ?? usageMetadata.totalTokenCount; + if (tokenCount !== undefined && tokenCount > 0) { + this.lastPromptTokenCount = tokenCount; + } + } + + #getPostCompressionTokenCount(info: ChatCompressionInfo | null): number { + const tokenCount = this.#extractCompressionTokenCount(info); + if (tokenCount !== null) { + return tokenCount; + } + + return this.lastPromptTokenCount; + } + + #extractCompressionTokenCount( + info: ChatCompressionInfo | null, + ): number | null { + if (!info) { + return null; + } + if (info.compressionStatus === CompressionStatus.COMPRESSED) { + return info.newTokenCount > 0 ? info.newTokenCount : null; + } + const tokenCount = info.originalTokenCount ?? info.newTokenCount ?? null; + if (tokenCount === 0 && info.compressionStatus === CompressionStatus.NOOP) { + return null; + } + return tokenCount; + } + + #syncPromptTokenCountWithCurrentChat(): void { + const chat = this.#getCurrentChat(); + if ( + this.lastPromptTokenCountChat && + this.lastPromptTokenCountChat !== chat + ) { + this.lastPromptTokenCount = 0; + } + this.lastPromptTokenCountChat = chat; + } + + #isAbortError(error: unknown): boolean { + return ( + (error instanceof Error && error.name === 'AbortError') || + (typeof DOMException !== 'undefined' && + error instanceof DOMException && + error.name === 'AbortError') || + (typeof error === 'object' && + error !== null && + 'name' in error && + (error as { name?: unknown }).name === 'AbortError') + ); + } + + #formatError(error: unknown): string { + if (error instanceof Error) { + const parts = [error.message]; + const cause = (error as Error & { cause?: unknown }).cause; + if (cause instanceof Error) { + parts.push(`cause: ${cause.message}`); + } + const status = (error as Error & { status?: unknown }).status; + if (status !== undefined) { + parts.push(`status: ${String(status)}`); + } + return parts.join(' | '); + } + try { + return JSON.stringify(error) ?? String(error); + } catch { + return String(error); + } + } + + async #emitAgentDiagnosticMessageSafely( + text: string, + failureContext: string, + ): Promise { + try { + await this.#emitAgentDiagnosticMessage(text); + } catch (notifyError) { + debugLogger.warn(`${failureContext}: ${this.#formatError(notifyError)}`); + } + } + + async #emitAgentDiagnosticMessage(text: string): Promise { + await this.sendUpdate({ + sessionUpdate: 'agent_message_chunk', + content: { type: 'text', text }, + }); + } + /** * Starts the cron scheduler if cron is enabled and jobs exist. * The scheduler runs in the background, pushing fired prompts into @@ -802,10 +1055,12 @@ export class Session implements SessionContext { */ #startCronSchedulerIfNeeded(): void { if (!this.config.isCronEnabled()) return; + if (this.cronDisabledByTokenLimit) return; const scheduler = this.config.getCronScheduler(); if (scheduler.size === 0) return; scheduler.start((job: { prompt: string }) => { + if (this.cronDisabledByTokenLimit) return; this.cronQueue.push(job.prompt); void this.#drainCronQueue(); }); @@ -885,17 +1140,22 @@ export class Session implements SessionContext { null; const streamStartTime = Date.now(); - const responseStream = await this.config - .getGeminiClient()! - .getChat() - .sendMessageStream( - this.config.getModel(), - { - message: nextMessage.parts ?? [], - config: { abortSignal: ac.signal }, - }, - promptId, + const sendResult = await this.#sendMessageStreamWithAutoCompression( + promptId, + nextMessage.parts ?? [], + ac.signal, + ); + if (!sendResult.responseStream) { + this.#preserveUnsentMessageHistory( + nextMessage, + sendResult.stopReason === 'cancelled', ); + if (sendResult.stopReason === 'max_tokens') { + this.#stopCronAfterTokenLimit(); + } + return; + } + const responseStream = sendResult.responseStream; nextMessage = null; for await (const resp of responseStream) { @@ -933,6 +1193,7 @@ export class Session implements SessionContext { } if (usageMetadata) { + this.#recordPromptTokenCount(usageMetadata); // Kick off rewrite in background (non-blocking) if (this.messageRewriter) { this.messageRewriter.flushTurn(ac.signal); @@ -968,6 +1229,17 @@ export class Session implements SessionContext { ); } + #stopCronAfterTokenLimit(): void { + this.cronDisabledByTokenLimit = true; + this.cronQueue = []; + if (!this.config.isCronEnabled()) return; + this.config.getCronScheduler().stop(); + void this.#emitAgentDiagnosticMessageSafely( + 'Cron jobs disabled for the rest of this session due to token limit. Restart the session to re-enable.', + 'Failed to emit cron-disabled diagnostic', + ); + } + async sendAvailableCommandsUpdate(): Promise { const abortController = new AbortController(); try {