fix(acp): run auto compression before model sends (#3698)

This commit is contained in:
JerryLee 2026-05-05 20:14:13 +10:00 committed by GitHub
parent 0501c7165b
commit 095a39a8d5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 1346 additions and 41 deletions

File diff suppressed because it is too large Load diff

View file

@ -21,10 +21,13 @@ import type {
HookExecutionRequest,
HookExecutionResponse,
MessageBus,
StreamEvent,
ChatCompressionInfo,
} from '@qwen-code/qwen-code-core';
import {
AuthType,
ApprovalMode,
CompressionStatus,
convertToFunctionResponse,
createDebugLogger,
DiscoveredMCPTool,
@ -108,6 +111,10 @@ import {
const debugLogger = createDebugLogger('SESSION');
/**
 * Outcome of a model send that runs auto-compression first: either a live
 * response stream (send proceeded) or a terminal ACP stop reason with no
 * stream (send was skipped due to cancellation or the session token limit).
 * The `stopReason?: never` arm makes the two variants mutually exclusive.
 */
type AutoCompressionSendResult =
  | { responseStream: AsyncGenerator<StreamEvent>; stopReason?: never }
  | { responseStream: null; stopReason: PromptResponse['stopReason'] };
/**
* Session represents an active conversation session with the AI model.
* It uses modular components for consistent event emission:
@ -134,6 +141,9 @@ export class Session implements SessionContext {
private cronProcessing = false;
private cronAbortController: AbortController | null = null;
private cronCompletion: Promise<void> | null = null;
private cronDisabledByTokenLimit = false;
private lastPromptTokenCount = 0;
private lastPromptTokenCountChat: GeminiChat | null = null;
// Modular components
private readonly historyReplayer: HistoryReplayer;
@ -295,9 +305,6 @@ export class Session implements SessionContext {
// Increment turn counter for each user prompt
this.turn += 1;
// Always fetch the current chat from GeminiClient so that /clear's
// resetChat() (which replaces the chat instance) is reflected here.
const chat = this.config.getGeminiClient()!.getChat();
const promptId = this.config.getSessionId() + '########' + this.turn;
// Extract text from all text blocks to construct the full prompt text for logging
@ -413,7 +420,7 @@ export class Session implements SessionContext {
while (nextMessage !== null) {
if (pendingSend.signal.aborted) {
chat.addHistory(nextMessage);
this.#getCurrentChat().addHistory(nextMessage);
return { stopReason: 'cancelled' };
}
@ -422,16 +429,19 @@ export class Session implements SessionContext {
const streamStartTime = Date.now();
try {
const responseStream = await chat.sendMessageStream(
this.config.getModel(),
{
message: nextMessage?.parts ?? [],
config: {
abortSignal: pendingSend.signal,
},
},
const sendResult = await this.#sendMessageStreamWithAutoCompression(
promptId,
nextMessage?.parts ?? [],
pendingSend.signal,
);
if (!sendResult.responseStream) {
this.#preserveUnsentMessageHistory(
nextMessage,
sendResult.stopReason === 'cancelled',
);
return { stopReason: sendResult.stopReason };
}
const responseStream = sendResult.responseStream;
nextMessage = null;
for await (const resp of responseStream) {
@ -510,6 +520,7 @@ export class Session implements SessionContext {
}
if (usageMetadata) {
this.#recordPromptTokenCount(usageMetadata);
// Kick off rewrite in background (non-blocking, runs parallel to tools)
if (this.messageRewriter) {
this.messageRewriter.flushTurn(pendingSend.signal);
@ -541,7 +552,6 @@ export class Session implements SessionContext {
// Fire Stop hook loop (aligned with core path in client.ts)
// This is triggered after model response completes with no pending tool calls
return this.#handleStopHookLoop(
chat,
pendingSend,
promptId,
hooksEnabled,
@ -557,20 +567,18 @@ export class Session implements SessionContext {
* If a Stop hook requests continuation, it sends a follow-up message and loops back.
* Maximum iterations (100) prevent infinite loops.
*
* @param chat - The GeminiChat instance
* @param pendingSend - The abort controller for the current prompt
* @param promptId - The prompt ID for tracking
* @param hooksEnabled - Whether hooks are enabled
* @param messageBus - The MessageBus for hook communication (may be undefined)
* @returns The stop reason ('end_turn' or 'cancelled')
* @returns The ACP stop reason for the prompt.
*/
async #handleStopHookLoop(
chat: GeminiChat,
pendingSend: AbortController,
promptId: string,
hooksEnabled: boolean,
messageBus: MessageBus | undefined,
): Promise<{ stopReason: 'end_turn' | 'cancelled' }> {
): Promise<{ stopReason: PromptResponse['stopReason'] }> {
const MAX_STOP_HOOK_ITERATIONS = 100;
let stopHookIterationCount = 0;
let stopHookReasons: string[] = [];
@ -586,7 +594,7 @@ export class Session implements SessionContext {
}
// Get response text from the chat history
const history = chat.getHistory();
const history = this.#getCurrentChat().getHistory();
const lastModelMessage = history
.filter((msg: Content) => msg.role === 'model')
.pop();
@ -666,16 +674,21 @@ export class Session implements SessionContext {
const streamStartTime = Date.now();
try {
const continueResponseStream = await chat.sendMessageStream(
this.config.getModel(),
{
message: nextMessage?.parts ?? [],
config: {
abortSignal: pendingSend.signal,
},
},
promptId + '_stop_hook_' + stopHookIterationCount,
);
const continueSendResult =
await this.#sendMessageStreamWithAutoCompression(
promptId + '_stop_hook_' + stopHookIterationCount,
nextMessage?.parts ?? [],
pendingSend.signal,
{ skipCompression: stopHookIterationCount > 1 },
);
if (!continueSendResult.responseStream) {
this.#preserveUnsentMessageHistory(
nextMessage,
continueSendResult.stopReason === 'cancelled',
);
return { stopReason: continueSendResult.stopReason };
}
const continueResponseStream = continueSendResult.responseStream;
nextMessage = null;
for await (const resp of continueResponseStream) {
@ -749,6 +762,7 @@ export class Session implements SessionContext {
}
if (usageMetadata) {
this.#recordPromptTokenCount(usageMetadata);
const durationMs = Date.now() - streamStartTime;
await this.messageEmitter.emitUsageMetadata(
usageMetadata,
@ -795,6 +809,245 @@ export class Session implements SessionContext {
await this.client.sessionUpdate(params);
}
/**
 * Resolves the active chat instance from the GeminiClient on every call.
 * Never cache the result: /clear's resetChat() swaps the chat instance on
 * the client, and a held reference would go stale.
 */
#getCurrentChat(): GeminiChat {
  const client = this.config.getGeminiClient()!;
  return client.getChat();
}
/**
 * Mirrors the core send path for ACP model sends.
 *
 * Attempts automatic chat compression first, checks the session token limit,
 * emits an ACP-visible notice when compression succeeds, and returns the ACP
 * stop reason when the provider send should be skipped because the request
 * was cancelled or the session token limit was exceeded.
 *
 * @param promptId - Identifier for this prompt, used for compression and logging.
 * @param message - The parts to send to the model.
 * @param abortSignal - Cancels both compression and the send itself.
 * @param options - `skipCompression` bypasses the compression attempt
 *   (used by repeated stop-hook continuations).
 * @returns Either a live response stream, or `{ responseStream: null }` with
 *   the stop reason ('cancelled' or 'max_tokens') when the send is dropped.
 */
async #sendMessageStreamWithAutoCompression(
  promptId: string,
  message: Part[],
  abortSignal: AbortSignal,
  options: { skipCompression?: boolean } = {},
): Promise<AutoCompressionSendResult> {
  const geminiClient = this.config.getGeminiClient()!;
  let compressionDiagnostic: string | null = null;
  let compressionInfo: ChatCompressionInfo | null = null;
  if (!options.skipCompression) {
    try {
      // Non-forced compression: the client decides whether the history is
      // close enough to the token limit to warrant summarizing.
      const compressed = await geminiClient.tryCompressChat(
        promptId,
        false,
        abortSignal,
      );
      compressionInfo = compressed;
      // Track the post-compression token count for the limit check below.
      this.#recordCompressionTokenCount(compressed);
      if (compressed.compressionStatus === CompressionStatus.COMPRESSED) {
        // Defer emitting the notice until after the token-limit gate, so a
        // dropped send reports only the limit error.
        compressionDiagnostic =
          `IMPORTANT: This conversation approached the input token limit for ${this.config.getModel()}. ` +
          `A compressed context will be sent for future messages (compressed from: ` +
          `${compressed.originalTokenCount ?? 'unknown'} to ` +
          `${compressed.newTokenCount ?? 'unknown'} tokens).`;
      }
    } catch (compressionError) {
      if (abortSignal.aborted || this.#isAbortError(compressionError)) {
        debugLogger.debug(`Auto-compression aborted for prompt ${promptId}`);
        return { responseStream: null, stopReason: 'cancelled' };
      }
      // Compression failure is non-fatal: log and send uncompressed.
      debugLogger.warn(
        `Auto-compression failed for prompt ${promptId}; proceeding without compression: ` +
          this.#formatError(compressionError),
      );
    }
  }
  // Re-check after the (possibly long) compression await.
  if (abortSignal.aborted) {
    debugLogger.debug(`Auto-compression aborted for prompt ${promptId}`);
    return { responseStream: null, stopReason: 'cancelled' };
  }
  if (!compressionInfo) {
    // No compression result to record; still reset the cached token count
    // if the chat instance was swapped (e.g. by /clear).
    this.#syncPromptTokenCountWithCurrentChat();
  }
  const sessionTokenLimit = this.config.getSessionTokenLimit();
  // A limit of 0 (or negative) means "no session token limit configured".
  if (sessionTokenLimit > 0) {
    const lastPromptTokenCount =
      this.#getPostCompressionTokenCount(compressionInfo);
    if (lastPromptTokenCount > sessionTokenLimit) {
      debugLogger.warn(
        `Session token limit exceeded for prompt ${promptId}: ` +
          `${lastPromptTokenCount} > ${sessionTokenLimit}. Send dropped.`,
      );
      await this.#emitAgentDiagnosticMessageSafely(
        `Session token limit exceeded: ${lastPromptTokenCount} tokens > ${sessionTokenLimit} limit. ` +
          'Please start a new session or increase the sessionTokenLimit in your settings.json.',
        `Failed to emit token limit diagnostic for prompt ${promptId}`,
      );
      return { responseStream: null, stopReason: 'max_tokens' };
    }
  }
  if (compressionDiagnostic) {
    await this.#emitAgentDiagnosticMessageSafely(
      compressionDiagnostic,
      `Failed to emit compression notification for prompt ${promptId}`,
    );
  }
  // Final abort check: emitting the diagnostic awaited the client.
  if (abortSignal.aborted) {
    debugLogger.debug(
      `Send aborted after compression diagnostic for prompt ${promptId}`,
    );
    return { responseStream: null, stopReason: 'cancelled' };
  }
  // Resolve the chat at send time so a post-compression chat swap is honored.
  const responseStream = await this.#getCurrentChat().sendMessageStream(
    this.config.getModel(),
    {
      message,
      config: {
        abortSignal,
      },
    },
    promptId,
  );
  return { responseStream };
}
/**
 * Records an unsent message into chat history after a send was skipped.
 *
 * When `preserveFullMessage` is true (cancellation), the whole message is
 * kept. Otherwise only functionResponse parts are preserved so tool results
 * are not lost, while other parts are dropped (with a debug log).
 */
#preserveUnsentMessageHistory(
  message: Content | null,
  preserveFullMessage: boolean,
): void {
  if (!message) {
    return;
  }
  const chat = this.#getCurrentChat();
  if (preserveFullMessage) {
    chat.addHistory(message);
    return;
  }
  const allParts = message.parts ?? [];
  const kept = allParts.filter(
    (part: Part) => 'functionResponse' in part && part.functionResponse,
  );
  const dropped = allParts.length - kept.length;
  if (dropped > 0) {
    debugLogger.debug(
      `Dropping ${dropped} non-functionResponse part(s) from unsent ACP message after send was skipped.`,
    );
  }
  if (kept.length > 0) {
    chat.addHistory({
      ...message,
      parts: kept,
    });
  }
}
/**
 * Updates `lastPromptTokenCount` from a compression result, after first
 * resetting the counter if the underlying chat instance was replaced.
 * Non-positive or missing counts are ignored.
 */
#recordCompressionTokenCount(info: ChatCompressionInfo): void {
  this.#syncPromptTokenCountWithCurrentChat();
  const count = this.#extractCompressionTokenCount(info);
  if (count === null || count <= 0) {
    return;
  }
  this.lastPromptTokenCount = count;
}
/**
 * Updates `lastPromptTokenCount` from provider usage metadata, preferring
 * `promptTokenCount` and falling back to `totalTokenCount`. Resets the
 * counter first if the chat instance was replaced. Non-positive or missing
 * values are ignored.
 */
#recordPromptTokenCount(
  usageMetadata: GenerateContentResponseUsageMetadata,
): void {
  this.#syncPromptTokenCountWithCurrentChat();
  const observed =
    usageMetadata.promptTokenCount ?? usageMetadata.totalTokenCount;
  if (observed === undefined || observed <= 0) {
    return;
  }
  this.lastPromptTokenCount = observed;
}
/**
 * Best available token count after an (optional) compression attempt:
 * the count extracted from the compression result, or the last recorded
 * prompt token count when the result carries no usable count.
 */
#getPostCompressionTokenCount(info: ChatCompressionInfo | null): number {
  return this.#extractCompressionTokenCount(info) ?? this.lastPromptTokenCount;
}
/**
 * Pulls a usable token count out of a compression result.
 *
 * - COMPRESSED: the post-compression count is authoritative (null if it is
 *   not positive).
 * - Other statuses: fall back to original, then new count; a zero count on
 *   a NOOP is treated as "no information" and yields null.
 */
#extractCompressionTokenCount(
  info: ChatCompressionInfo | null,
): number | null {
  if (info === null) {
    return null;
  }
  if (info.compressionStatus === CompressionStatus.COMPRESSED) {
    const { newTokenCount } = info;
    return newTokenCount > 0 ? newTokenCount : null;
  }
  const fallback = info.originalTokenCount ?? info.newTokenCount ?? null;
  const isEmptyNoop =
    fallback === 0 && info.compressionStatus === CompressionStatus.NOOP;
  return isEmptyNoop ? null : fallback;
}
/**
 * Resets `lastPromptTokenCount` when the chat instance has been swapped
 * (e.g. by /clear's resetChat), since counts from the old chat no longer
 * describe the current history. Always remembers the current chat.
 */
#syncPromptTokenCountWithCurrentChat(): void {
  const chat = this.#getCurrentChat();
  const previous = this.lastPromptTokenCountChat;
  if (previous !== null && previous !== chat) {
    this.lastPromptTokenCount = 0;
  }
  this.lastPromptTokenCountChat = chat;
}
/**
 * Detects an abort-style error regardless of its concrete shape: an Error
 * named 'AbortError', a DOMException named 'AbortError' (when DOMException
 * exists in this runtime), or any other object carrying name: 'AbortError'.
 */
#isAbortError(error: unknown): boolean {
  if (error instanceof Error && error.name === 'AbortError') {
    return true;
  }
  if (
    typeof DOMException !== 'undefined' &&
    error instanceof DOMException &&
    error.name === 'AbortError'
  ) {
    return true;
  }
  return (
    typeof error === 'object' &&
    error !== null &&
    'name' in error &&
    (error as { name?: unknown }).name === 'AbortError'
  );
}
/**
 * Renders an unknown error as a single log-friendly line.
 *
 * Error instances become `message | cause: … | status: …` (cause only when
 * it is itself an Error; status only when present). Anything else is
 * JSON-stringified, falling back to String() when stringification yields
 * undefined or throws (e.g. circular structures).
 */
#formatError(error: unknown): string {
  if (error instanceof Error) {
    const pieces: string[] = [error.message];
    const { cause } = error as Error & { cause?: unknown };
    if (cause instanceof Error) {
      pieces.push(`cause: ${cause.message}`);
    }
    const { status } = error as Error & { status?: unknown };
    if (status !== undefined) {
      pieces.push(`status: ${String(status)}`);
    }
    return pieces.join(' | ');
  }
  try {
    return JSON.stringify(error) ?? String(error);
  } catch {
    return String(error);
  }
}
/**
 * Emits a diagnostic message to the client, swallowing emission failures so
 * a broken notification channel never aborts the calling send path. Failures
 * are logged as a warning prefixed with `failureContext`.
 */
async #emitAgentDiagnosticMessageSafely(
  text: string,
  failureContext: string,
): Promise<void> {
  try {
    await this.#emitAgentDiagnosticMessage(text);
  } catch (emitError: unknown) {
    debugLogger.warn(`${failureContext}: ${this.#formatError(emitError)}`);
  }
}
/**
 * Sends a plain-text diagnostic to the ACP client as an agent message chunk.
 * May reject if the session update fails; callers that must not fail should
 * use #emitAgentDiagnosticMessageSafely instead.
 */
async #emitAgentDiagnosticMessage(text: string): Promise<void> {
  await this.sendUpdate({
    sessionUpdate: 'agent_message_chunk',
    content: { type: 'text', text },
  });
}
/**
* Starts the cron scheduler if cron is enabled and jobs exist.
* The scheduler runs in the background, pushing fired prompts into
@ -802,10 +1055,12 @@ export class Session implements SessionContext {
*/
#startCronSchedulerIfNeeded(): void {
if (!this.config.isCronEnabled()) return;
if (this.cronDisabledByTokenLimit) return;
const scheduler = this.config.getCronScheduler();
if (scheduler.size === 0) return;
scheduler.start((job: { prompt: string }) => {
if (this.cronDisabledByTokenLimit) return;
this.cronQueue.push(job.prompt);
void this.#drainCronQueue();
});
@ -885,17 +1140,22 @@ export class Session implements SessionContext {
null;
const streamStartTime = Date.now();
const responseStream = await this.config
.getGeminiClient()!
.getChat()
.sendMessageStream(
this.config.getModel(),
{
message: nextMessage.parts ?? [],
config: { abortSignal: ac.signal },
},
promptId,
const sendResult = await this.#sendMessageStreamWithAutoCompression(
promptId,
nextMessage.parts ?? [],
ac.signal,
);
if (!sendResult.responseStream) {
this.#preserveUnsentMessageHistory(
nextMessage,
sendResult.stopReason === 'cancelled',
);
if (sendResult.stopReason === 'max_tokens') {
this.#stopCronAfterTokenLimit();
}
return;
}
const responseStream = sendResult.responseStream;
nextMessage = null;
for await (const resp of responseStream) {
@ -933,6 +1193,7 @@ export class Session implements SessionContext {
}
if (usageMetadata) {
this.#recordPromptTokenCount(usageMetadata);
// Kick off rewrite in background (non-blocking)
if (this.messageRewriter) {
this.messageRewriter.flushTurn(ac.signal);
@ -968,6 +1229,17 @@ export class Session implements SessionContext {
);
}
#stopCronAfterTokenLimit(): void {
this.cronDisabledByTokenLimit = true;
this.cronQueue = [];
if (!this.config.isCronEnabled()) return;
this.config.getCronScheduler().stop();
void this.#emitAgentDiagnosticMessageSafely(
'Cron jobs disabled for the rest of this session due to token limit. Restart the session to re-enable.',
'Failed to emit cron-disabled diagnostic',
);
}
async sendAvailableCommandsUpdate(): Promise<void> {
const abortController = new AbortController();
try {