diff --git a/docs/design/adaptive-output-token-escalation/adaptive-output-token-escalation-design.md b/docs/design/adaptive-output-token-escalation/adaptive-output-token-escalation-design.md index b24e6c10f..8b72bd190 100644 --- a/docs/design/adaptive-output-token-escalation/adaptive-output-token-escalation-design.md +++ b/docs/design/adaptive-output-token-escalation/adaptive-output-token-escalation-design.md @@ -1,6 +1,6 @@ # Adaptive Output Token Escalation Design -> Reduces GPU slot over-reservation by ~4x through a "low default + escalate on truncation" strategy for output tokens. +> Reduces GPU slot over-reservation by ~4x through a "low default + escalate on truncation" strategy for output tokens, with multi-turn recovery for responses that exceed even the escalated limit. ## Problem @@ -8,61 +8,81 @@ Every API request reserves a fixed GPU slot proportional to `max_tokens`. The pr ## Solution -Use a capped default of **8K** output tokens. When a response is truncated (the model hits `max_tokens`), automatically retry once with an escalated limit of **64K**. Since <1% of requests are actually truncated, this reduces average slot reservation significantly while preserving output quality for long responses. +Use a capped default of **8K** output tokens. When a response is truncated (the model hits `max_tokens`): + +1. **Escalate** to the model's full output limit (with 64K as a floor for unknown models) +2. If still truncated, **recover** by keeping the partial response in history and injecting a continuation message, up to 3 times +3. If recovery is exhausted, fall back to the tool scheduler's truncation guidance + +Since <1% of requests are actually truncated, this reduces average slot reservation significantly while preserving output quality for long responses. ## Architecture ``` - ┌─────────────────────────┐ - │ Request starts │ - │ max_tokens = 8K │ - └───────────┬─────────────┘ - │ - ▼ - ┌─────────────────────────┐ - │ Stream response │ - └───────────┬─────────────┘ - │ - ┌─────────┴─────────┐ - │ │ - finish_reason finish_reason - != MAX_TOKENS == MAX_TOKENS - │ │ - ▼ ▼ - ┌───────────┐ ┌─────────────────────┐ - │ Done │ │ Check conditions: │ - └───────────┘ │ - No user override? │ - │ - No env override? │ - │ - Not already │ - │ escalated? │ - └─────────┬───────────┘ - YES │ NO - ┌─────────┴────┐ - │ │ - ▼ ▼ - ┌─────────────┐ ┌──────────┐ - │ Pop partial │ │ Done │ - │ model resp │ │ (truncd) │ - │ from history│ └──────────┘ - │ │ - │ Yield RETRY │ - │ event │ - │ │ - │ Re-send │ - │ max_tokens │ - │ = 64K │ - └─────────────┘ +Request (max_tokens = 8K) +│ +▼ +┌─────────────────────────┐ +│ Response truncated? │──── No ──▶ Done ✓ +│ (MAX_TOKENS) │ +└───────────┬──────────────┘ + │ Yes + ▼ +┌──────────────────────────────────────────────────┐ +│ Layer 1: Escalate to model output limit │ +│ ┌────────────────────────────────────────────┐ │ +│ │ Pop partial response from history │ │ +│ │ RETRY (isContinuation: false → reset UI) │ │ +│ │ Re-send at max(64K, model output limit) │ │ +│ └────────────────────────────────────────────┘ │ +└───────────┬──────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────┐ +│ Still truncated? │──── No ──▶ Done ✓ +│ (MAX_TOKENS) │ +└───────────┬──────────────┘ + │ Yes + ▼ +┌──────────────────────────────────────────────────┐ +│ Layer 2: Multi-turn recovery (up to 3×) │ +│ ┌────────────────────────────────────────────┐ │ +│ │ Keep partial response in history │ │ +│ │ Push user message: "Resume directly..." 
│ │ +│ │ RETRY (isContinuation: true → keep UI buf) │ │ +│ │ Re-send with updated history │ │ +│ │ Model continues from where it left off │ │ +│ └──────────────┬─────────────────────────────┘ │ +│ │ │ +│ ┌──────┴──────┐ │ +│ │ Succeeded? │── Yes ──▶ Done ✓ │ +│ └──────┬──────┘ │ +│ │ No (still truncated) │ +│ ▼ │ +│ attempt < 3? ── Yes ──▶ loop back ↑ │ +└───────────┬──────────────────────────────────────┘ + │ No (exhausted) + ▼ +┌──────────────────────────────────────────────────┐ +│ Layer 3: Tool scheduler fallback │ +│ ┌────────────────────────────────────────────┐ │ +│ │ Reject truncated Edit/Write tool calls │ │ +│ │ Return guidance: "You MUST split into │ │ +│ │ smaller parts — write skeleton first, │ │ +│ │ then edit incrementally." │ │ +│ └────────────────────────────────────────────┘ │ +└──────────────────────────────────────────────────┘ ``` ## Token limit determination The effective `max_tokens` is resolved in the following priority order: -| Priority | Source | Value (known model) | Value (unknown model) | Escalation behavior | -| ----------- | ---------------------------------------------------- | ---------------------------- | --------------------- | ------------------------------ | -| 1 (highest) | User config (`samplingParams.max_tokens`) | `min(userValue, modelLimit)` | `userValue` | No escalation | -| 2 | Environment variable (`QWEN_CODE_MAX_OUTPUT_TOKENS`) | `min(envValue, modelLimit)` | `envValue` | No escalation | -| 3 (lowest) | Capped default | `min(modelLimit, 8K)` | `min(32K, 8K)` = 8K | Escalates to 64K on truncation | +| Priority | Source | Value (known model) | Value (unknown model) | Escalation behavior | +| ----------- | ---------------------------------------------------- | ---------------------------- | --------------------- | ----------------------------------------------- | +| 1 (highest) | User config (`samplingParams.max_tokens`) | `min(userValue, modelLimit)` | `userValue` | No escalation | +| 2 | Environment variable (`QWEN_CODE_MAX_OUTPUT_TOKENS`) | `min(envValue, modelLimit)` | `envValue` | No escalation | +| 3 (lowest) | Capped default | `min(modelLimit, 8K)` | `min(32K, 8K)` = 8K | Escalates to model limit (64K floor) + recovery | A "known model" is one that has an explicit entry in `OUTPUT_PATTERNS` (checked via `hasExplicitOutputLimit()`). For known models, the effective value is always capped at the model's declared output limit to avoid API errors. Unknown models (custom deployments, self-hosted endpoints) pass the user's value through directly, since the backend may support larger limits. @@ -88,9 +108,25 @@ The escalation logic lives in `geminiChat.ts`, placed **outside** the main retry 3. Guard checks pass: - maxTokensEscalated === false (prevent infinite escalation) - hasUserMaxTokensOverride === false (respect user intent) -4. Pop the partial model response from chat history -5. Yield RETRY event → UI discards partial output -6. Re-send the same request with maxOutputTokens: 64K +4. Compute escalated limit: max(ESCALATED_MAX_TOKENS, tokenLimit(model, 'output')) +5. Pop the partial model response from chat history +6. Yield RETRY event (isContinuation: false) → UI discards partial output and resets buffers +7. Re-send the same request with maxOutputTokens: escalatedLimit +``` + +### Recovery steps (geminiChat.ts) + +If the escalated response is also truncated (finishReason === MAX_TOKENS), the recovery loop runs up to `MAX_OUTPUT_RECOVERY_ATTEMPTS` (3) times: + +``` +1. 
Partial model response is already in history (pushed by processStreamResponse) +2. Push a recovery user message: OUTPUT_RECOVERY_MESSAGE +3. Yield RETRY event (isContinuation: true) → UI keeps text buffer for continuation +4. Re-send with updated history (model sees its partial output + recovery instruction) +5. If still truncated and attempts remain, loop back to step 1 +6. If recovery attempt throws (empty response, network error): + - Pop the dangling recovery message from history + - Break out of recovery loop ``` ### State cleanup on RETRY (turn.ts) @@ -102,14 +138,26 @@ When the `Turn` class receives a RETRY event, it clears accumulated state to pre - `debugResponses` — cleared to avoid stale debug data - `finishReason` — reset to `undefined` so the new response's finish reason is used +The `isContinuation` flag is passed through to the UI so it can decide whether to reset text buffers (escalation) or keep them (recovery). + ## Constants -Defined in `tokenLimits.ts`: +Defined in `geminiChat.ts` and `tokenLimits.ts`: -| Constant | Value | Purpose | -| --------------------------- | ------ | ------------------------------------------------------- | -| `CAPPED_DEFAULT_MAX_TOKENS` | 8,000 | Default output token limit when no user override is set | -| `ESCALATED_MAX_TOKENS` | 64,000 | Output token limit used on truncation retry | +| Constant | Value | Purpose | +| ------------------------------ | ------ | ------------------------------------------------------- | +| `CAPPED_DEFAULT_MAX_TOKENS` | 8,000 | Default output token limit when no user override is set | +| `ESCALATED_MAX_TOKENS` | 64,000 | Floor for escalation (used when model limit is unknown) | +| `MAX_OUTPUT_RECOVERY_ATTEMPTS` | 3 | Max multi-turn recovery attempts after escalation | + +The effective escalated limit is `max(ESCALATED_MAX_TOKENS, tokenLimit(model, 'output'))`: + +| Model | Escalated limit | +| ---------------- | --------------- | +| Claude Opus 4.6 | 131,072 (128K) | +| GPT-5 / o-series | 131,072 (128K) | +| Qwen3.x | 65,536 (64K) | +| Unknown models | 64,000 (floor) | ## Design decisions @@ -119,20 +167,22 @@ Defined in `tokenLimits.ts`: - 8K provides reasonable headroom for slightly longer responses without triggering unnecessary retries - Reduces average slot reservation from 32K to 8K (4x improvement) -### Why 64K escalated limit? +### Why escalate to model limit instead of fixed 64K? -- Covers the vast majority of long outputs that were truncated at 8K -- Matches the output limit of many modern models (Claude Sonnet, Gemini 3.x, Qwen3.x) -- Higher values (e.g., 128K) would negate slot optimization benefits for the <1% of requests that escalate +- Models with higher output limits (Claude Opus 128K, GPT-5 128K) were constrained to 64K unnecessarily +- Using the model's actual limit captures the vast majority of long outputs without a second retry +- `ESCALATED_MAX_TOKENS` (64K) serves as a floor for unknown models where `tokenLimit()` returns the default 32K -### Why not progressive escalation (8K → 16K → 32K → 64K)? +### Why multi-turn recovery instead of progressive escalation? 
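+
+To make the cost comparison concrete, the sketch below is a simplified, non-normative rendering of one recovery pass. It uses plain local types; `sendStream` and `recoverTruncatedOutput` are illustrative names standing in for `makeApiCallAndProcessStream` and the loop in `geminiChat.ts`, and the functionCall guard and history coalescing are omitted. Each attempt re-sends the existing history plus a single short recovery turn, rather than regenerating the whole response at a larger limit:
+
+```ts
+// Simplified shapes for illustration only.
+type Turn = { role: 'user' | 'model'; text: string };
+type SendStream = (
+  history: Turn[],
+) => Promise<{ text: string; truncated: boolean }>;
+
+async function recoverTruncatedOutput(
+  history: Turn[], // already ends with the truncated model turn
+  sendStream: SendStream,
+  recoveryMessage: string, // OUTPUT_RECOVERY_MESSAGE in geminiChat.ts
+  maxAttempts = 3, // MAX_OUTPUT_RECOVERY_ATTEMPTS
+): Promise<void> {
+  for (let attempt = 0; attempt < maxAttempts; attempt++) {
+    // Cheap: each attempt adds one short user turn and keeps the partial
+    // model output in history, instead of regenerating it at a higher limit.
+    history.push({ role: 'user', text: recoveryMessage });
+    const { text, truncated } = await sendStream(history);
+    history.push({ role: 'model', text });
+    if (!truncated) return; // continuation finished within the limit
+  }
+}
+```
+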
-- Each retry adds latency (the full response must be regenerated) -- A single retry is the simplest approach that captures almost all cases -- The <1% truncation rate at 8K means almost no requests need escalation; those that do are likely to need significantly more than 16K +- Progressive escalation (8K → 16K → 32K → 64K) requires regenerating the full response each time +- Multi-turn recovery keeps the partial response and lets the model continue, saving tokens and latency +- Recovery messages are cheap (~40 tokens each) compared to regenerating large responses +- The 3-attempt limit prevents infinite loops while covering most practical cases ### Why is escalation outside the retry loop? - Truncation is a success case, not an error - Errors from the escalated stream (rate limits, network failures) should propagate directly rather than being silently retried with incorrect parameters - Keeps the retry loop focused on its original purpose (transient error recovery) +- Recovery errors are caught separately to avoid aborting the entire conversation diff --git a/packages/cli/src/ui/hooks/useGeminiStream.ts b/packages/cli/src/ui/hooks/useGeminiStream.ts index c63b474dc..d680a736d 100644 --- a/packages/cli/src/ui/hooks/useGeminiStream.ts +++ b/packages/cli/src/ui/hooks/useGeminiStream.ts @@ -1190,10 +1190,26 @@ export const useGeminiStream = ( loopDetectedRef.current = true; break; case ServerGeminiEventType.Retry: - // Clear any pending partial content from the failed attempt - if (pendingHistoryItemRef.current) { - setPendingHistoryItem(null); + // On fresh restart (escalation / rate-limit / invalid stream), + // clear pending content and buffers to discard the failed attempt. + // On continuation (recovery), keep the pending gemini item AND + // buffers so the model's continuation text appends to them — + // otherwise handleContentEvent would see a null pending item, + // create a fresh one, and reset the buffer to just the new chunk, + // losing the partial text we meant to preserve. + if (!event.isContinuation) { + if (pendingHistoryItemRef.current) { + setPendingHistoryItem(null); + } + geminiMessageBuffer = ''; + thoughtBuffer = ''; } + // Always discard tool call requests from the truncated/failed + // attempt to prevent duplicate execution after escalation or + // recovery. The recovery path now skips turns that already + // contain a functionCall (see geminiChat.ts), so this only + // clears stale requests from pre-RETRY accumulation. + toolCallRequests.length = 0; // Show retry info if available (rate-limit / throttling errors) if (event.retryInfo) { startRetryCountdown(event.retryInfo); diff --git a/packages/core/src/core/coreToolScheduler.ts b/packages/core/src/core/coreToolScheduler.ts index 4abe34970..78bccb21c 100644 --- a/packages/core/src/core/coreToolScheduler.ts +++ b/packages/core/src/core/coreToolScheduler.ts @@ -71,20 +71,22 @@ import { IdeClient } from '../ide/ide-client.js'; const TRUNCATION_PARAM_GUIDANCE = 'Note: Your previous response was truncated due to max_tokens limit, ' + - 'which likely caused incomplete tool call parameters. ' + + 'which caused incomplete tool call parameters. ' + 'Please retry the tool call with complete parameters. 
' + 'If the content is too large for a single response, ' + - 'consider splitting it into smaller parts.'; + 'you MUST split it into smaller parts: ' + + 'first write_file with a skeleton/partial content, ' + + 'then use edit to add the remaining sections incrementally.'; const TRUNCATION_EDIT_REJECTION = 'Your previous response was truncated due to max_tokens limit, ' + - 'which likely produced incomplete file content. ' + + 'which produced incomplete file content. ' + 'The tool call has been rejected to prevent writing ' + 'truncated content to the file. ' + - 'Please retry the tool call with complete content. ' + - 'If the content is too large for a single response, ' + - 'consider splitting it into smaller parts ' + - '(e.g., write_file for initial content, then edit for additions).'; + 'You MUST split the content into smaller parts: ' + + 'first write_file with a skeleton/partial content, ' + + 'then use edit to add the remaining sections incrementally. ' + + 'Do NOT retry with the same large content.'; export type ValidatingToolCall = { status: 'validating'; diff --git a/packages/core/src/core/geminiChat.test.ts b/packages/core/src/core/geminiChat.test.ts index 03f63d167..39cfed3d5 100644 --- a/packages/core/src/core/geminiChat.test.ts +++ b/packages/core/src/core/geminiChat.test.ts @@ -2290,4 +2290,303 @@ describe('GeminiChat', async () => { expect(chat.getHistory()).toEqual([]); }); }); + + describe('output token recovery', () => { + function makeChunk( + parts: Array<{ text?: string; functionCall?: unknown }>, + finishReason?: string, + ): GenerateContentResponse { + return { + candidates: [ + { + content: { role: 'model', parts }, + ...(finishReason ? { finishReason } : {}), + }, + ], + } as unknown as GenerateContentResponse; + } + + function makeStream(chunks: GenerateContentResponse[]) { + return (async function* () { + for (const c of chunks) { + yield c; + } + })(); + } + + it('should enter recovery loop when escalated response is also truncated', async () => { + // Three streams: initial (MAX_TOKENS) → escalated (MAX_TOKENS) → + // recovery (STOP). + const streams = [ + makeStream([makeChunk([{ text: 'Hello' }], 'MAX_TOKENS')]), + makeStream([makeChunk([{ text: ' world' }], 'MAX_TOKENS')]), + makeStream([makeChunk([{ text: ' ending.' }], 'STOP')]), + ]; + let callIndex = 0; + vi.mocked(mockContentGenerator.generateContentStream).mockImplementation( + async () => streams[callIndex++]!, + ); + + const stream = await chat.sendMessageStream( + 'gemini-3-pro', + { message: 'write a long essay' }, + 'prompt-recovery', + ); + + const events: StreamEvent[] = []; + for await (const event of stream) { + events.push(event); + } + + const retries = events.filter((e) => e.type === StreamEventType.RETRY); + // One RETRY for escalation (isContinuation undefined/false), + // one for recovery (isContinuation true). + expect(retries.length).toBe(2); + expect(retries[0]!.type).toBe(StreamEventType.RETRY); + expect((retries[0] as { isContinuation?: boolean }).isContinuation).toBe( + undefined, + ); + expect((retries[1] as { isContinuation?: boolean }).isContinuation).toBe( + true, + ); + // API called 3 times: initial + escalation + recovery. + expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes( + 3, + ); + }); + + it('should skip recovery when truncated turn has a functionCall', async () => { + // Initial stream returns a functionCall + MAX_TOKENS. Escalated stream + // returns the same (functionCall + MAX_TOKENS). 
Recovery must NOT run + // because appending a user turn after functionCall is invalid. + const streams = [ + makeStream([ + makeChunk( + [ + { + functionCall: { name: 'write_file', args: { file_path: '/x' } }, + }, + ], + 'MAX_TOKENS', + ), + ]), + makeStream([ + makeChunk( + [ + { + functionCall: { name: 'write_file', args: { file_path: '/x' } }, + }, + ], + 'MAX_TOKENS', + ), + ]), + ]; + let callIndex = 0; + vi.mocked(mockContentGenerator.generateContentStream).mockImplementation( + async () => streams[callIndex++]!, + ); + + const stream = await chat.sendMessageStream( + 'gemini-3-pro', + { message: 'write a file' }, + 'prompt-recovery-skip', + ); + + const events: StreamEvent[] = []; + for await (const event of stream) { + events.push(event); + } + + // Only the escalation RETRY should fire; no continuation RETRY. + const continuations = events.filter( + (e) => + e.type === StreamEventType.RETRY && + (e as { isContinuation?: boolean }).isContinuation === true, + ); + expect(continuations.length).toBe(0); + + // API called twice: initial + escalation. No recovery calls. + expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes( + 2, + ); + + // History should end with the truncated model turn that has the + // functionCall. No dangling user recovery message. + const history = chat.getHistory(); + const lastEntry = history[history.length - 1]!; + expect(lastEntry.role).toBe('model'); + expect( + lastEntry.parts?.some((p) => 'functionCall' in p && p.functionCall), + ).toBe(true); + }); + + it('should cap recovery attempts at MAX_OUTPUT_RECOVERY_ATTEMPTS (3)', async () => { + // Every stream returns MAX_TOKENS with text (no functionCall). + vi.mocked(mockContentGenerator.generateContentStream).mockImplementation( + async () => makeStream([makeChunk([{ text: 'x' }], 'MAX_TOKENS')]), + ); + + const stream = await chat.sendMessageStream( + 'gemini-3-pro', + { message: 'infinite loop test' }, + 'prompt-recovery-cap', + ); + + // Consume + for await (const _ of stream) { + /* consume */ + } + + // 1 initial + 1 escalation + 3 recovery = 5 total. + expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes( + 5, + ); + }); + + it('should pop dangling recovery message and emit STOP chunk when recovery throws', async () => { + const streams = [ + makeStream([makeChunk([{ text: 'partial' }], 'MAX_TOKENS')]), + makeStream([makeChunk([{ text: 'still partial' }], 'MAX_TOKENS')]), + // Recovery stream throws (simulate by yielding no chunks; this makes + // processStreamResponse reject with NO_FINISH_REASON). + (async function* () { + /* empty stream */ + })(), + ]; + let callIndex = 0; + vi.mocked(mockContentGenerator.generateContentStream).mockImplementation( + async () => streams[callIndex++]!, + ); + + const stream = await chat.sendMessageStream( + 'gemini-3-pro', + { message: 'recovery fails' }, + 'prompt-recovery-fail', + ); + + const events: StreamEvent[] = []; + for await (const event of stream) { + events.push(event); + } + + // The last chunk should be the synthetic STOP chunk from the catch. + const chunkEvents = events.filter( + (e) => e.type === StreamEventType.CHUNK, + ); + const lastChunk = chunkEvents[chunkEvents.length - 1]!; + expect( + (lastChunk as { value: GenerateContentResponse }).value.candidates?.[0] + ?.finishReason, + ).toBe('STOP'); + + // History should NOT end with a dangling user recovery message, + // and roles must strictly alternate so providers don't reject the + // next turn with "consecutive same-role content" errors. 
+ const history = chat.getHistory(); + for (let i = 1; i < history.length; i++) { + expect(history[i]!.role).not.toBe(history[i - 1]!.role); + } + const lastEntry = history[history.length - 1]!; + // Last entry should be the escalated model response, not a user + // recovery message, and must carry actual parts so the turn is + // not an empty placeholder. + expect(lastEntry.role).toBe('model'); + expect(lastEntry.parts!.length).toBeGreaterThan(0); + }); + + it('should stop recovery mid-loop when a later iteration emits functionCall', async () => { + // Covers the cross-iteration guard: iter 1 returns plain text (recovery + // proceeds), iter 2 returns a functionCall (recovery must break before + // iter 3 pushes another user turn after the functionCall). + const streams = [ + makeStream([makeChunk([{ text: 'initial' }], 'MAX_TOKENS')]), + makeStream([makeChunk([{ text: 'escalated' }], 'MAX_TOKENS')]), + makeStream([makeChunk([{ text: 'recovery 1 text' }], 'MAX_TOKENS')]), + makeStream([ + makeChunk( + [ + { + functionCall: { name: 'write_file', args: { file_path: '/x' } }, + }, + ], + 'MAX_TOKENS', + ), + ]), + ]; + let callIndex = 0; + vi.mocked(mockContentGenerator.generateContentStream).mockImplementation( + async () => streams[callIndex++]!, + ); + + const stream = await chat.sendMessageStream( + 'gemini-3-pro', + { message: 'mixed recovery' }, + 'prompt-recovery-mixed', + ); + + for await (const _ of stream) { + /* consume */ + } + + // Should call: 1 initial + 1 escalation + 2 recovery (iter 1 text, + // iter 2 functionCall) = 4 total. The guard fires at the start of + // iter 3 before any further API call. + expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes( + 4, + ); + + // History must end on the functionCall model turn (not a dangling + // recovery user turn). + const history = chat.getHistory(); + const lastEntry = history[history.length - 1]!; + expect(lastEntry.role).toBe('model'); + expect( + lastEntry.parts?.some((p) => 'functionCall' in p && p.functionCall), + ).toBe(true); + }); + + it('should coalesce successful recovery iterations into the preceding model turn', async () => { + // Two recovery iterations then a clean STOP. Without coalescing, the + // internal OUTPUT_RECOVERY_MESSAGE would persist as a real user turn + // and bias every later model call. + const streams = [ + makeStream([makeChunk([{ text: 'A' }], 'MAX_TOKENS')]), + makeStream([makeChunk([{ text: 'B' }], 'MAX_TOKENS')]), + makeStream([makeChunk([{ text: 'C' }], 'MAX_TOKENS')]), + makeStream([makeChunk([{ text: 'D' }], 'STOP')]), + ]; + let callIndex = 0; + vi.mocked(mockContentGenerator.generateContentStream).mockImplementation( + async () => streams[callIndex++]!, + ); + + const stream = await chat.sendMessageStream( + 'gemini-3-pro', + { message: 'essay' }, + 'prompt-recovery-coalesce', + ); + for await (const _ of stream) { + /* consume */ + } + + const history = chat.getHistory(); + // Exactly one user turn + one model turn — the recovery pairs should + // be folded back into the preceding model entry. + expect(history.length).toBe(2); + expect(history[0]!.role).toBe('user'); + expect(history[1]!.role).toBe('model'); + + // The control prompt must NOT appear anywhere in durable history. 
+ const flattened = JSON.stringify(history); + expect(flattened).not.toContain('Resume directly'); + expect(flattened).not.toContain('Output token limit hit'); + + // All escalation + recovery content must be preserved in the merged + // model turn, in order (B escalation → C recovery-1 → D recovery-2). + const mergedText = (history[1]!.parts ?? []) + .map((p) => ('text' in p ? ((p as { text?: string }).text ?? '') : '')) + .join(''); + expect(mergedText).toBe('BCD'); + }); + }); }); diff --git a/packages/core/src/core/geminiChat.ts b/packages/core/src/core/geminiChat.ts index 070acf9f2..932fc7301 100644 --- a/packages/core/src/core/geminiChat.ts +++ b/packages/core/src/core/geminiChat.ts @@ -23,7 +23,7 @@ import { createDebugLogger } from '../utils/debugLogger.js'; import { parseAndFormatApiError } from '../utils/errorParsing.js'; import { isRateLimitError, type RetryInfo } from '../utils/rateLimit.js'; import type { Config } from '../config/config.js'; -import { ESCALATED_MAX_TOKENS } from './tokenLimits.js'; +import { ESCALATED_MAX_TOKENS, tokenLimit } from './tokenLimits.js'; import { hasCycleInSchema } from '../tools/tools.js'; import type { StructuredError } from './turn.js'; import { @@ -49,7 +49,14 @@ export enum StreamEventType { export type StreamEvent = | { type: StreamEventType.CHUNK; value: GenerateContentResponse } - | { type: StreamEventType.RETRY; retryInfo?: RetryInfo }; + | { + type: StreamEventType.RETRY; + retryInfo?: RetryInfo; + /** When true, the retry is a continuation (recovery) rather than a + * fresh restart (escalation). The UI should keep the accumulated text + * buffer so the continuation appends to it. */ + isContinuation?: boolean; + }; /** * Options for retrying due to invalid content from the model. @@ -76,6 +83,23 @@ const INVALID_STREAM_RETRY_CONFIG = { initialDelayMs: 2000, }; +/** + * Max recovery attempts when the escalated response is also truncated. + * Each attempt keeps the partial response in history and injects a recovery + * message so the model can continue from where it left off. + */ +const MAX_OUTPUT_RECOVERY_ATTEMPTS = 3; + +/** + * Recovery message injected as a user turn when the model's output is + * truncated even after token escalation. Instructs the model to resume + * without repeating itself and to break remaining work into smaller steps. + */ +const OUTPUT_RECOVERY_MESSAGE = + 'Output token limit hit. Resume directly — no apology, no recap of what ' + + 'you were doing. Pick up mid-thought if that is where the cut happened. ' + + 'Break remaining work into smaller pieces.'; + /** * Options for retrying on rate-limit throttling errors returned as stream content. * Fixed 60s delay matches the DashScope per-minute quota window. @@ -497,7 +521,11 @@ export class GeminiChat { } // Max output tokens escalation: if the retry loop succeeded with - // the capped default (8K) but hit MAX_TOKENS, retry once at 64K. + // the capped default (8K) but hit MAX_TOKENS, retry once at the + // model's full output limit. This ensures models with large output + // limits (e.g., 128K for Claude Opus, GPT-5) are fully utilized, + // while using ESCALATED_MAX_TOKENS (64K) as a floor for unknown + // models. // Placed outside the retry loop so that any errors from the // escalated stream propagate directly (not caught by retry logic). 
if ( @@ -507,8 +535,12 @@ export class GeminiChat { !hasUserMaxTokensOverride ) { maxTokensEscalated = true; + const escalatedLimit = Math.max( + ESCALATED_MAX_TOKENS, + tokenLimit(model, 'output'), + ); debugLogger.info( - `Output truncated at capped default. Escalating to ${ESCALATED_MAX_TOKENS} tokens.`, + `Output truncated at capped default. Escalating to ${escalatedLimit} tokens.`, ); // Remove partial model response from history // (processStreamResponse already pushed it) @@ -525,9 +557,10 @@ export class GeminiChat { ...params, config: { ...params.config, - maxOutputTokens: ESCALATED_MAX_TOKENS, + maxOutputTokens: escalatedLimit, }, }; + let escalatedFinishReason: string | undefined; const escalatedStream = await self.makeApiCallAndProcessStream( model, requestContents, @@ -535,8 +568,112 @@ export class GeminiChat { prompt_id, ); for await (const chunk of escalatedStream) { + const fr = chunk.candidates?.[0]?.finishReason; + if (fr) escalatedFinishReason = fr; yield { type: StreamEventType.CHUNK, value: chunk }; } + + // Recovery: if the escalated response is also truncated, keep the + // partial response in history and inject a recovery message so the + // model can continue from where it left off. + let recoveryCount = 0; + let successfulRecoveries = 0; + while ( + escalatedFinishReason === FinishReason.MAX_TOKENS && + recoveryCount < MAX_OUTPUT_RECOVERY_ATTEMPTS + ) { + // Skip recovery when the truncated turn already contains a + // functionCall. Injecting a plain user message between a + // functionCall and its functionResponse produces an invalid API + // sequence that providers commonly reject. The existing layer-3 + // tool scheduler fallback handles these cases correctly. + const lastEntry = self.history[self.history.length - 1]; + const hasFunctionCall = + lastEntry?.role === 'model' && + lastEntry.parts?.some((p) => p.functionCall) === true; + if (hasFunctionCall) { + debugLogger.info( + 'Skipping recovery: truncated turn contains functionCall; ' + + 'deferring to tool scheduler fallback.', + ); + break; + } + + recoveryCount++; + debugLogger.info( + `Output still truncated after escalation. ` + + `Recovery attempt ${recoveryCount}/${MAX_OUTPUT_RECOVERY_ATTEMPTS}.`, + ); + // The partial model response is already in history + // (pushed by processStreamResponse). Push a recovery user + // message so the model sees its partial output and continues. + self.history.push( + createUserContent([{ text: OUTPUT_RECOVERY_MESSAGE }]), + ); + // Signal UI/turn to clear pending (incomplete) tool calls. + // isContinuation tells the UI to keep the text buffer so the + // model's continuation appends to the previous partial output. + yield { type: StreamEventType.RETRY, isContinuation: true }; + // Re-send with the updated history (includes partial + recovery) + const recoveryContents = self.getHistory(true); + escalatedFinishReason = undefined; + try { + const recoveryStream = await self.makeApiCallAndProcessStream( + model, + recoveryContents, + escalatedParams, + prompt_id, + ); + for await (const chunk of recoveryStream) { + const fr = chunk.candidates?.[0]?.finishReason; + if (fr) escalatedFinishReason = fr; + yield { type: StreamEventType.CHUNK, value: chunk }; + } + // Iteration fully succeeded: both the user recovery turn and + // the model continuation turn are now in history and can be + // coalesced back into the preceding model entry after the loop. 
+ successfulRecoveries++; + } catch (recoveryError) { + // If a recovery attempt fails (e.g., empty response, network + // error), stop recovering and let the partial output stand. + // Pop the dangling recovery message to keep history valid. + if ( + self.history.length > 0 && + self.history[self.history.length - 1].role === 'user' + ) { + self.history.pop(); + } + debugLogger.warn( + `Recovery attempt ${recoveryCount} failed: ${recoveryError}`, + ); + // Emit a synthetic finish-reason chunk so the UI gets a + // terminal signal (Finished event) instead of a partial + // response with no end marker. Uses STOP because partial + // chunks from prior successful iterations are already in + // the transcript and represent the user-visible response. + yield { + type: StreamEventType.CHUNK, + value: { + candidates: [ + { + content: { role: 'model', parts: [] }, + finishReason: FinishReason.STOP, + }, + ], + } as unknown as GenerateContentResponse, + }; + break; + } + } + + // Coalesce completed recovery pairs back into the preceding model + // turn so the OUTPUT_RECOVERY_MESSAGE control prompt does not + // persist as a synthetic user turn in durable history. The user + // never sent that message, and leaving it in history would bias + // later turns and pollute compression / replay / export. + if (successfulRecoveries > 0) { + self.coalesceRecoveryPairs(successfulRecoveries); + } } if (lastError) { @@ -964,6 +1101,44 @@ export class GeminiChat { ], }); } + + /** + * Merge `pairCount` trailing (user_recovery, model_continuation) pairs back + * into the model turn that precedes them. Used after the output-token + * recovery loop so the internal OUTPUT_RECOVERY_MESSAGE control prompt + * does not persist in durable history as if the user sent it. + * + * Expected tail shape per iteration (walking from the back): + * [..., precedingModel, userRecovery, modelContinuation] + * + * If any pair doesn't match that shape the method bails defensively + * rather than corrupting history. + */ + private coalesceRecoveryPairs(pairCount: number): void { + for (let i = 0; i < pairCount; i++) { + const len = this.history.length; + if (len < 3) return; + + const modelContinuation = this.history[len - 1]!; + const userRecovery = this.history[len - 2]!; + const precedingModel = this.history[len - 3]!; + + if ( + modelContinuation.role !== 'model' || + userRecovery.role !== 'user' || + precedingModel.role !== 'model' + ) { + return; + } + + precedingModel.parts = [ + ...(precedingModel.parts ?? []), + ...(modelContinuation.parts ?? []), + ]; + // Drop the (userRecovery, modelContinuation) pair. + this.history.splice(len - 2, 2); + } + } } /** Visible for Testing */ diff --git a/packages/core/src/core/turn.ts b/packages/core/src/core/turn.ts index 78e8dac24..b0971d0b6 100644 --- a/packages/core/src/core/turn.ts +++ b/packages/core/src/core/turn.ts @@ -69,6 +69,9 @@ export enum GeminiEventType { export type ServerGeminiRetryEvent = { type: GeminiEventType.Retry; retryInfo?: RetryInfo; + /** When true, the retry is a continuation (recovery) rather than a fresh + * restart. The UI should keep accumulated text so the continuation appends. */ + isContinuation?: boolean; }; export interface StructuredError { @@ -298,6 +301,7 @@ export class Turn { yield { type: GeminiEventType.Retry, retryInfo: streamEvent.retryInfo, + isContinuation: streamEvent.isContinuation, }; continue; // Skip to the next event in the stream }