mirror of
https://github.com/QwenLM/qwen-code.git
synced 2026-04-28 11:41:04 +00:00
fix(core): recover from truncated tool calls via multi-turn continuation (#3313)
* fix(core): recover from truncated tool calls via multi-turn continuation (#3049) When large tool calls (e.g., WriteFile with big HTML) exceed the output token limit, the model's response gets truncated and required parameters like file_path are missing. Previously this surfaced as a confusing "params must have required property" error. Three-layer defense: 1. Escalate to model's actual output limit (not fixed 64K). Models with 128K output (Claude Opus, GPT-5) now use their full capacity. 2. Multi-turn recovery: if the escalated response is still truncated, keep the partial response in history and inject a recovery message ("Resume directly — pick up mid-thought") so the model continues from where it left off. Up to 3 recovery attempts before falling back to the tool scheduler's guidance. 3. Stronger truncation guidance as fallback: "you MUST split" instead of "consider splitting". Also fixes: - Clear toolCallRequests on RETRY to prevent duplicate tool execution - Add isContinuation flag to RETRY events so the UI preserves text buffers during recovery (continuation) but resets them during escalation (fresh restart) - Catch errors during recovery to prevent dangling history entries * docs: update adaptive output token escalation design for recovery mechanism Update the design doc to reflect: - Escalation now targets model's actual output limit (64K floor) - Multi-turn recovery loop after escalation (up to 3 attempts) - isContinuation flag on RETRY events - Recovery error handling (pop dangling message, break) - Updated constants table and model-specific escalation limits - New design decision: why multi-turn recovery over progressive escalation * fix: remove competitor reference from code comment * fix: address review feedback on recovery mechanism Three correctness fixes from @tanzhenxin's review: 1. Partial text lost during continuation (useGeminiStream.ts): On continuation RETRY, setPendingHistoryItem(null) cleared the pending gemini item. The next Content event then saw a null pending item, created a fresh one, and reset geminiMessageBuffer = eventValue — discarding the preserved partial text. Now the pending item AND buffers are kept on continuation, so the continuation appends. 2. Recovery on truncated tool-call turns (geminiChat.ts): When the truncated turn already contains a complete functionCall, appending a user recovery message produces model(functionCall) → user(text) with no intervening functionResponse — an invalid API sequence. Now recovery skips turns with functionCall parts and defers to the tool scheduler's layer-3 fallback. 3. Recovery errors swallowed after partial chunks (geminiChat.ts): If a recovery attempt yielded chunks then failed, the catch block broke without emitting any terminal signal, leaving the UI with partial text and no Finished event. Now emits a synthetic finishReason=STOP chunk in the catch so the UI gets a proper terminal signal. * test: add coverage for output token recovery loop Four targeted tests for the recovery mechanism introduced in the truncated-tool-call-recovery PR: 1. Recovery loop fires when escalated response is also truncated: initial MAX_TOKENS → escalation MAX_TOKENS → recovery STOP. Verifies two RETRY events (one escalation, one continuation) and three API calls. 2. Recovery is skipped when truncated turn contains a functionCall: prevents the invalid model(functionCall) → user(text) sequence. Verifies no continuation RETRY and history ends with the functionCall intact. 3. Recovery attempts are capped at MAX_OUTPUT_RECOVERY_ATTEMPTS (3): persistent MAX_TOKENS triggers exactly 5 API calls (1 initial + 1 escalation + 3 recovery). 4. Recovery catch block emits synthetic STOP chunk and pops dangling user message: when a recovery attempt fails (empty stream → InvalidStreamError), the UI gets a terminal signal and history ends on the model turn, not a dangling user recovery message. * test: cover cross-iteration functionCall detection in recovery loop Existing tests cover the functionCall guard when both initial and escalated responses have functionCall. This adds a test for the cross-iteration case: iter 1 returns text (recovery proceeds), iter 2 returns functionCall (recovery must break before iter 3). Verifies: - API called exactly 4 times (1 initial + 1 escalation + 2 recovery) - History ends with the functionCall model turn, not a dangling user recovery message - Iter 3's user recovery message is never pushed (guard fires at top of loop before recoveryCount increment) * fix(core): cast synthetic STOP chunk via unknown for TS2352 The object literal {candidates, content, parts} doesn't structurally overlap enough with GenerateContentResponse for TypeScript's strict narrow cast. Casting through 'unknown' is required per TS2352. Build error from CI: src/core/geminiChat.ts(651,24): error TS2352: Conversion of type '...' to type 'GenerateContentResponse' may be a mistake because neither type sufficiently overlaps with the other. If this was intentional, convert the expression to 'unknown' first. * test(core): tighten recovery history integrity assertions Strengthen the "pop dangling recovery message" test to catch any future regression that leaves consecutive same-role entries or an empty last-model placeholder in history — conditions providers reject on the next turn. * fix(core): coalesce recovery pairs to avoid leaking control prompt Previously every output-token recovery iteration left a (user, model) pair in durable history where the user turn was the internal OUTPUT_RECOVERY_MESSAGE control prompt. That prompt was then visible to every later turn, biasing responses and polluting compression, replay, and export. Track successful recovery iterations and, after the recovery loop, fold each completed pair back into the preceding model turn via a new `coalesceRecoveryPairs` helper. Failed iterations already pop their user turn in the catch block, so they need no coalescing. Adds a targeted test that runs escalation + two successful recovery iterations + a clean STOP, and asserts the merged history has exactly one user turn and one model turn, no trace of the control prompt text, and content ordered as B (escalation) + C + D.
This commit is contained in:
parent
c25136f0ef
commit
519e5aa1de
6 changed files with 623 additions and 77 deletions
|
|
@ -1,6 +1,6 @@
|
|||
# Adaptive Output Token Escalation Design
|
||||
|
||||
> Reduces GPU slot over-reservation by ~4x through a "low default + escalate on truncation" strategy for output tokens.
|
||||
> Reduces GPU slot over-reservation by ~4x through a "low default + escalate on truncation" strategy for output tokens, with multi-turn recovery for responses that exceed even the escalated limit.
|
||||
|
||||
## Problem
|
||||
|
||||
|
|
@ -8,61 +8,81 @@ Every API request reserves a fixed GPU slot proportional to `max_tokens`. The pr
|
|||
|
||||
## Solution
|
||||
|
||||
Use a capped default of **8K** output tokens. When a response is truncated (the model hits `max_tokens`), automatically retry once with an escalated limit of **64K**. Since <1% of requests are actually truncated, this reduces average slot reservation significantly while preserving output quality for long responses.
|
||||
Use a capped default of **8K** output tokens. When a response is truncated (the model hits `max_tokens`):
|
||||
|
||||
1. **Escalate** to the model's full output limit (with 64K as a floor for unknown models)
|
||||
2. If still truncated, **recover** by keeping the partial response in history and injecting a continuation message, up to 3 times
|
||||
3. If recovery is exhausted, fall back to the tool scheduler's truncation guidance
|
||||
|
||||
Since <1% of requests are actually truncated, this reduces average slot reservation significantly while preserving output quality for long responses.
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
┌─────────────────────────┐
|
||||
│ Request starts │
|
||||
│ max_tokens = 8K │
|
||||
└───────────┬─────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────┐
|
||||
│ Stream response │
|
||||
└───────────┬─────────────┘
|
||||
│
|
||||
┌─────────┴─────────┐
|
||||
│ │
|
||||
finish_reason finish_reason
|
||||
!= MAX_TOKENS == MAX_TOKENS
|
||||
│ │
|
||||
▼ ▼
|
||||
┌───────────┐ ┌─────────────────────┐
|
||||
│ Done │ │ Check conditions: │
|
||||
└───────────┘ │ - No user override? │
|
||||
│ - No env override? │
|
||||
│ - Not already │
|
||||
│ escalated? │
|
||||
└─────────┬───────────┘
|
||||
YES │ NO
|
||||
┌─────────┴────┐
|
||||
│ │
|
||||
▼ ▼
|
||||
┌─────────────┐ ┌──────────┐
|
||||
│ Pop partial │ │ Done │
|
||||
│ model resp │ │ (truncd) │
|
||||
│ from history│ └──────────┘
|
||||
│ │
|
||||
│ Yield RETRY │
|
||||
│ event │
|
||||
│ │
|
||||
│ Re-send │
|
||||
│ max_tokens │
|
||||
│ = 64K │
|
||||
└─────────────┘
|
||||
Request (max_tokens = 8K)
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────┐
|
||||
│ Response truncated? │──── No ──▶ Done ✓
|
||||
│ (MAX_TOKENS) │
|
||||
└───────────┬──────────────┘
|
||||
│ Yes
|
||||
▼
|
||||
┌──────────────────────────────────────────────────┐
|
||||
│ Layer 1: Escalate to model output limit │
|
||||
│ ┌────────────────────────────────────────────┐ │
|
||||
│ │ Pop partial response from history │ │
|
||||
│ │ RETRY (isContinuation: false → reset UI) │ │
|
||||
│ │ Re-send at max(64K, model output limit) │ │
|
||||
│ └────────────────────────────────────────────┘ │
|
||||
└───────────┬──────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────┐
|
||||
│ Still truncated? │──── No ──▶ Done ✓
|
||||
│ (MAX_TOKENS) │
|
||||
└───────────┬──────────────┘
|
||||
│ Yes
|
||||
▼
|
||||
┌──────────────────────────────────────────────────┐
|
||||
│ Layer 2: Multi-turn recovery (up to 3×) │
|
||||
│ ┌────────────────────────────────────────────┐ │
|
||||
│ │ Keep partial response in history │ │
|
||||
│ │ Push user message: "Resume directly..." │ │
|
||||
│ │ RETRY (isContinuation: true → keep UI buf) │ │
|
||||
│ │ Re-send with updated history │ │
|
||||
│ │ Model continues from where it left off │ │
|
||||
│ └──────────────┬─────────────────────────────┘ │
|
||||
│ │ │
|
||||
│ ┌──────┴──────┐ │
|
||||
│ │ Succeeded? │── Yes ──▶ Done ✓ │
|
||||
│ └──────┬──────┘ │
|
||||
│ │ No (still truncated) │
|
||||
│ ▼ │
|
||||
│ attempt < 3? ── Yes ──▶ loop back ↑ │
|
||||
└───────────┬──────────────────────────────────────┘
|
||||
│ No (exhausted)
|
||||
▼
|
||||
┌──────────────────────────────────────────────────┐
|
||||
│ Layer 3: Tool scheduler fallback │
|
||||
│ ┌────────────────────────────────────────────┐ │
|
||||
│ │ Reject truncated Edit/Write tool calls │ │
|
||||
│ │ Return guidance: "You MUST split into │ │
|
||||
│ │ smaller parts — write skeleton first, │ │
|
||||
│ │ then edit incrementally." │ │
|
||||
│ └────────────────────────────────────────────┘ │
|
||||
└──────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## Token limit determination
|
||||
|
||||
The effective `max_tokens` is resolved in the following priority order:
|
||||
|
||||
| Priority | Source | Value (known model) | Value (unknown model) | Escalation behavior |
|
||||
| ----------- | ---------------------------------------------------- | ---------------------------- | --------------------- | ------------------------------ |
|
||||
| 1 (highest) | User config (`samplingParams.max_tokens`) | `min(userValue, modelLimit)` | `userValue` | No escalation |
|
||||
| 2 | Environment variable (`QWEN_CODE_MAX_OUTPUT_TOKENS`) | `min(envValue, modelLimit)` | `envValue` | No escalation |
|
||||
| 3 (lowest) | Capped default | `min(modelLimit, 8K)` | `min(32K, 8K)` = 8K | Escalates to 64K on truncation |
|
||||
| Priority | Source | Value (known model) | Value (unknown model) | Escalation behavior |
|
||||
| ----------- | ---------------------------------------------------- | ---------------------------- | --------------------- | ----------------------------------------------- |
|
||||
| 1 (highest) | User config (`samplingParams.max_tokens`) | `min(userValue, modelLimit)` | `userValue` | No escalation |
|
||||
| 2 | Environment variable (`QWEN_CODE_MAX_OUTPUT_TOKENS`) | `min(envValue, modelLimit)` | `envValue` | No escalation |
|
||||
| 3 (lowest) | Capped default | `min(modelLimit, 8K)` | `min(32K, 8K)` = 8K | Escalates to model limit (64K floor) + recovery |
|
||||
|
||||
A "known model" is one that has an explicit entry in `OUTPUT_PATTERNS` (checked via `hasExplicitOutputLimit()`). For known models, the effective value is always capped at the model's declared output limit to avoid API errors. Unknown models (custom deployments, self-hosted endpoints) pass the user's value through directly, since the backend may support larger limits.
|
||||
|
||||
|
|
@ -88,9 +108,25 @@ The escalation logic lives in `geminiChat.ts`, placed **outside** the main retry
|
|||
3. Guard checks pass:
|
||||
- maxTokensEscalated === false (prevent infinite escalation)
|
||||
- hasUserMaxTokensOverride === false (respect user intent)
|
||||
4. Pop the partial model response from chat history
|
||||
5. Yield RETRY event → UI discards partial output
|
||||
6. Re-send the same request with maxOutputTokens: 64K
|
||||
4. Compute escalated limit: max(ESCALATED_MAX_TOKENS, tokenLimit(model, 'output'))
|
||||
5. Pop the partial model response from chat history
|
||||
6. Yield RETRY event (isContinuation: false) → UI discards partial output and resets buffers
|
||||
7. Re-send the same request with maxOutputTokens: escalatedLimit
|
||||
```
|
||||
|
||||
### Recovery steps (geminiChat.ts)
|
||||
|
||||
If the escalated response is also truncated (finishReason === MAX_TOKENS), the recovery loop runs up to `MAX_OUTPUT_RECOVERY_ATTEMPTS` (3) times:
|
||||
|
||||
```
|
||||
1. Partial model response is already in history (pushed by processStreamResponse)
|
||||
2. Push a recovery user message: OUTPUT_RECOVERY_MESSAGE
|
||||
3. Yield RETRY event (isContinuation: true) → UI keeps text buffer for continuation
|
||||
4. Re-send with updated history (model sees its partial output + recovery instruction)
|
||||
5. If still truncated and attempts remain, loop back to step 1
|
||||
6. If recovery attempt throws (empty response, network error):
|
||||
- Pop the dangling recovery message from history
|
||||
- Break out of recovery loop
|
||||
```
|
||||
|
||||
### State cleanup on RETRY (turn.ts)
|
||||
|
|
@ -102,14 +138,26 @@ When the `Turn` class receives a RETRY event, it clears accumulated state to pre
|
|||
- `debugResponses` — cleared to avoid stale debug data
|
||||
- `finishReason` — reset to `undefined` so the new response's finish reason is used
|
||||
|
||||
The `isContinuation` flag is passed through to the UI so it can decide whether to reset text buffers (escalation) or keep them (recovery).
|
||||
|
||||
## Constants
|
||||
|
||||
Defined in `tokenLimits.ts`:
|
||||
Defined in `geminiChat.ts` and `tokenLimits.ts`:
|
||||
|
||||
| Constant | Value | Purpose |
|
||||
| --------------------------- | ------ | ------------------------------------------------------- |
|
||||
| `CAPPED_DEFAULT_MAX_TOKENS` | 8,000 | Default output token limit when no user override is set |
|
||||
| `ESCALATED_MAX_TOKENS` | 64,000 | Output token limit used on truncation retry |
|
||||
| Constant | Value | Purpose |
|
||||
| ------------------------------ | ------ | ------------------------------------------------------- |
|
||||
| `CAPPED_DEFAULT_MAX_TOKENS` | 8,000 | Default output token limit when no user override is set |
|
||||
| `ESCALATED_MAX_TOKENS` | 64,000 | Floor for escalation (used when model limit is unknown) |
|
||||
| `MAX_OUTPUT_RECOVERY_ATTEMPTS` | 3 | Max multi-turn recovery attempts after escalation |
|
||||
|
||||
The effective escalated limit is `max(ESCALATED_MAX_TOKENS, tokenLimit(model, 'output'))`:
|
||||
|
||||
| Model | Escalated limit |
|
||||
| ---------------- | --------------- |
|
||||
| Claude Opus 4.6 | 131,072 (128K) |
|
||||
| GPT-5 / o-series | 131,072 (128K) |
|
||||
| Qwen3.x | 65,536 (64K) |
|
||||
| Unknown models | 64,000 (floor) |
|
||||
|
||||
## Design decisions
|
||||
|
||||
|
|
@ -119,20 +167,22 @@ Defined in `tokenLimits.ts`:
|
|||
- 8K provides reasonable headroom for slightly longer responses without triggering unnecessary retries
|
||||
- Reduces average slot reservation from 32K to 8K (4x improvement)
|
||||
|
||||
### Why 64K escalated limit?
|
||||
### Why escalate to model limit instead of fixed 64K?
|
||||
|
||||
- Covers the vast majority of long outputs that were truncated at 8K
|
||||
- Matches the output limit of many modern models (Claude Sonnet, Gemini 3.x, Qwen3.x)
|
||||
- Higher values (e.g., 128K) would negate slot optimization benefits for the <1% of requests that escalate
|
||||
- Models with higher output limits (Claude Opus 128K, GPT-5 128K) were constrained to 64K unnecessarily
|
||||
- Using the model's actual limit captures the vast majority of long outputs without a second retry
|
||||
- `ESCALATED_MAX_TOKENS` (64K) serves as a floor for unknown models where `tokenLimit()` returns the default 32K
|
||||
|
||||
### Why not progressive escalation (8K → 16K → 32K → 64K)?
|
||||
### Why multi-turn recovery instead of progressive escalation?
|
||||
|
||||
- Each retry adds latency (the full response must be regenerated)
|
||||
- A single retry is the simplest approach that captures almost all cases
|
||||
- The <1% truncation rate at 8K means almost no requests need escalation; those that do are likely to need significantly more than 16K
|
||||
- Progressive escalation (8K → 16K → 32K → 64K) requires regenerating the full response each time
|
||||
- Multi-turn recovery keeps the partial response and lets the model continue, saving tokens and latency
|
||||
- Recovery messages are cheap (~40 tokens each) compared to regenerating large responses
|
||||
- The 3-attempt limit prevents infinite loops while covering most practical cases
|
||||
|
||||
### Why is escalation outside the retry loop?
|
||||
|
||||
- Truncation is a success case, not an error
|
||||
- Errors from the escalated stream (rate limits, network failures) should propagate directly rather than being silently retried with incorrect parameters
|
||||
- Keeps the retry loop focused on its original purpose (transient error recovery)
|
||||
- Recovery errors are caught separately to avoid aborting the entire conversation
|
||||
|
|
|
|||
|
|
@ -1190,10 +1190,26 @@ export const useGeminiStream = (
|
|||
loopDetectedRef.current = true;
|
||||
break;
|
||||
case ServerGeminiEventType.Retry:
|
||||
// Clear any pending partial content from the failed attempt
|
||||
if (pendingHistoryItemRef.current) {
|
||||
setPendingHistoryItem(null);
|
||||
// On fresh restart (escalation / rate-limit / invalid stream),
|
||||
// clear pending content and buffers to discard the failed attempt.
|
||||
// On continuation (recovery), keep the pending gemini item AND
|
||||
// buffers so the model's continuation text appends to them —
|
||||
// otherwise handleContentEvent would see a null pending item,
|
||||
// create a fresh one, and reset the buffer to just the new chunk,
|
||||
// losing the partial text we meant to preserve.
|
||||
if (!event.isContinuation) {
|
||||
if (pendingHistoryItemRef.current) {
|
||||
setPendingHistoryItem(null);
|
||||
}
|
||||
geminiMessageBuffer = '';
|
||||
thoughtBuffer = '';
|
||||
}
|
||||
// Always discard tool call requests from the truncated/failed
|
||||
// attempt to prevent duplicate execution after escalation or
|
||||
// recovery. The recovery path now skips turns that already
|
||||
// contain a functionCall (see geminiChat.ts), so this only
|
||||
// clears stale requests from pre-RETRY accumulation.
|
||||
toolCallRequests.length = 0;
|
||||
// Show retry info if available (rate-limit / throttling errors)
|
||||
if (event.retryInfo) {
|
||||
startRetryCountdown(event.retryInfo);
|
||||
|
|
|
|||
|
|
@ -71,20 +71,22 @@ import { IdeClient } from '../ide/ide-client.js';
|
|||
|
||||
const TRUNCATION_PARAM_GUIDANCE =
|
||||
'Note: Your previous response was truncated due to max_tokens limit, ' +
|
||||
'which likely caused incomplete tool call parameters. ' +
|
||||
'which caused incomplete tool call parameters. ' +
|
||||
'Please retry the tool call with complete parameters. ' +
|
||||
'If the content is too large for a single response, ' +
|
||||
'consider splitting it into smaller parts.';
|
||||
'you MUST split it into smaller parts: ' +
|
||||
'first write_file with a skeleton/partial content, ' +
|
||||
'then use edit to add the remaining sections incrementally.';
|
||||
|
||||
const TRUNCATION_EDIT_REJECTION =
|
||||
'Your previous response was truncated due to max_tokens limit, ' +
|
||||
'which likely produced incomplete file content. ' +
|
||||
'which produced incomplete file content. ' +
|
||||
'The tool call has been rejected to prevent writing ' +
|
||||
'truncated content to the file. ' +
|
||||
'Please retry the tool call with complete content. ' +
|
||||
'If the content is too large for a single response, ' +
|
||||
'consider splitting it into smaller parts ' +
|
||||
'(e.g., write_file for initial content, then edit for additions).';
|
||||
'You MUST split the content into smaller parts: ' +
|
||||
'first write_file with a skeleton/partial content, ' +
|
||||
'then use edit to add the remaining sections incrementally. ' +
|
||||
'Do NOT retry with the same large content.';
|
||||
|
||||
export type ValidatingToolCall = {
|
||||
status: 'validating';
|
||||
|
|
|
|||
|
|
@ -2290,4 +2290,303 @@ describe('GeminiChat', async () => {
|
|||
expect(chat.getHistory()).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('output token recovery', () => {
|
||||
function makeChunk(
|
||||
parts: Array<{ text?: string; functionCall?: unknown }>,
|
||||
finishReason?: string,
|
||||
): GenerateContentResponse {
|
||||
return {
|
||||
candidates: [
|
||||
{
|
||||
content: { role: 'model', parts },
|
||||
...(finishReason ? { finishReason } : {}),
|
||||
},
|
||||
],
|
||||
} as unknown as GenerateContentResponse;
|
||||
}
|
||||
|
||||
function makeStream(chunks: GenerateContentResponse[]) {
|
||||
return (async function* () {
|
||||
for (const c of chunks) {
|
||||
yield c;
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
it('should enter recovery loop when escalated response is also truncated', async () => {
|
||||
// Three streams: initial (MAX_TOKENS) → escalated (MAX_TOKENS) →
|
||||
// recovery (STOP).
|
||||
const streams = [
|
||||
makeStream([makeChunk([{ text: 'Hello' }], 'MAX_TOKENS')]),
|
||||
makeStream([makeChunk([{ text: ' world' }], 'MAX_TOKENS')]),
|
||||
makeStream([makeChunk([{ text: ' ending.' }], 'STOP')]),
|
||||
];
|
||||
let callIndex = 0;
|
||||
vi.mocked(mockContentGenerator.generateContentStream).mockImplementation(
|
||||
async () => streams[callIndex++]!,
|
||||
);
|
||||
|
||||
const stream = await chat.sendMessageStream(
|
||||
'gemini-3-pro',
|
||||
{ message: 'write a long essay' },
|
||||
'prompt-recovery',
|
||||
);
|
||||
|
||||
const events: StreamEvent[] = [];
|
||||
for await (const event of stream) {
|
||||
events.push(event);
|
||||
}
|
||||
|
||||
const retries = events.filter((e) => e.type === StreamEventType.RETRY);
|
||||
// One RETRY for escalation (isContinuation undefined/false),
|
||||
// one for recovery (isContinuation true).
|
||||
expect(retries.length).toBe(2);
|
||||
expect(retries[0]!.type).toBe(StreamEventType.RETRY);
|
||||
expect((retries[0] as { isContinuation?: boolean }).isContinuation).toBe(
|
||||
undefined,
|
||||
);
|
||||
expect((retries[1] as { isContinuation?: boolean }).isContinuation).toBe(
|
||||
true,
|
||||
);
|
||||
// API called 3 times: initial + escalation + recovery.
|
||||
expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes(
|
||||
3,
|
||||
);
|
||||
});
|
||||
|
||||
it('should skip recovery when truncated turn has a functionCall', async () => {
|
||||
// Initial stream returns a functionCall + MAX_TOKENS. Escalated stream
|
||||
// returns the same (functionCall + MAX_TOKENS). Recovery must NOT run
|
||||
// because appending a user turn after functionCall is invalid.
|
||||
const streams = [
|
||||
makeStream([
|
||||
makeChunk(
|
||||
[
|
||||
{
|
||||
functionCall: { name: 'write_file', args: { file_path: '/x' } },
|
||||
},
|
||||
],
|
||||
'MAX_TOKENS',
|
||||
),
|
||||
]),
|
||||
makeStream([
|
||||
makeChunk(
|
||||
[
|
||||
{
|
||||
functionCall: { name: 'write_file', args: { file_path: '/x' } },
|
||||
},
|
||||
],
|
||||
'MAX_TOKENS',
|
||||
),
|
||||
]),
|
||||
];
|
||||
let callIndex = 0;
|
||||
vi.mocked(mockContentGenerator.generateContentStream).mockImplementation(
|
||||
async () => streams[callIndex++]!,
|
||||
);
|
||||
|
||||
const stream = await chat.sendMessageStream(
|
||||
'gemini-3-pro',
|
||||
{ message: 'write a file' },
|
||||
'prompt-recovery-skip',
|
||||
);
|
||||
|
||||
const events: StreamEvent[] = [];
|
||||
for await (const event of stream) {
|
||||
events.push(event);
|
||||
}
|
||||
|
||||
// Only the escalation RETRY should fire; no continuation RETRY.
|
||||
const continuations = events.filter(
|
||||
(e) =>
|
||||
e.type === StreamEventType.RETRY &&
|
||||
(e as { isContinuation?: boolean }).isContinuation === true,
|
||||
);
|
||||
expect(continuations.length).toBe(0);
|
||||
|
||||
// API called twice: initial + escalation. No recovery calls.
|
||||
expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes(
|
||||
2,
|
||||
);
|
||||
|
||||
// History should end with the truncated model turn that has the
|
||||
// functionCall. No dangling user recovery message.
|
||||
const history = chat.getHistory();
|
||||
const lastEntry = history[history.length - 1]!;
|
||||
expect(lastEntry.role).toBe('model');
|
||||
expect(
|
||||
lastEntry.parts?.some((p) => 'functionCall' in p && p.functionCall),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it('should cap recovery attempts at MAX_OUTPUT_RECOVERY_ATTEMPTS (3)', async () => {
|
||||
// Every stream returns MAX_TOKENS with text (no functionCall).
|
||||
vi.mocked(mockContentGenerator.generateContentStream).mockImplementation(
|
||||
async () => makeStream([makeChunk([{ text: 'x' }], 'MAX_TOKENS')]),
|
||||
);
|
||||
|
||||
const stream = await chat.sendMessageStream(
|
||||
'gemini-3-pro',
|
||||
{ message: 'infinite loop test' },
|
||||
'prompt-recovery-cap',
|
||||
);
|
||||
|
||||
// Consume
|
||||
for await (const _ of stream) {
|
||||
/* consume */
|
||||
}
|
||||
|
||||
// 1 initial + 1 escalation + 3 recovery = 5 total.
|
||||
expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes(
|
||||
5,
|
||||
);
|
||||
});
|
||||
|
||||
it('should pop dangling recovery message and emit STOP chunk when recovery throws', async () => {
|
||||
const streams = [
|
||||
makeStream([makeChunk([{ text: 'partial' }], 'MAX_TOKENS')]),
|
||||
makeStream([makeChunk([{ text: 'still partial' }], 'MAX_TOKENS')]),
|
||||
// Recovery stream throws (simulate by yielding no chunks; this makes
|
||||
// processStreamResponse reject with NO_FINISH_REASON).
|
||||
(async function* () {
|
||||
/* empty stream */
|
||||
})(),
|
||||
];
|
||||
let callIndex = 0;
|
||||
vi.mocked(mockContentGenerator.generateContentStream).mockImplementation(
|
||||
async () => streams[callIndex++]!,
|
||||
);
|
||||
|
||||
const stream = await chat.sendMessageStream(
|
||||
'gemini-3-pro',
|
||||
{ message: 'recovery fails' },
|
||||
'prompt-recovery-fail',
|
||||
);
|
||||
|
||||
const events: StreamEvent[] = [];
|
||||
for await (const event of stream) {
|
||||
events.push(event);
|
||||
}
|
||||
|
||||
// The last chunk should be the synthetic STOP chunk from the catch.
|
||||
const chunkEvents = events.filter(
|
||||
(e) => e.type === StreamEventType.CHUNK,
|
||||
);
|
||||
const lastChunk = chunkEvents[chunkEvents.length - 1]!;
|
||||
expect(
|
||||
(lastChunk as { value: GenerateContentResponse }).value.candidates?.[0]
|
||||
?.finishReason,
|
||||
).toBe('STOP');
|
||||
|
||||
// History should NOT end with a dangling user recovery message,
|
||||
// and roles must strictly alternate so providers don't reject the
|
||||
// next turn with "consecutive same-role content" errors.
|
||||
const history = chat.getHistory();
|
||||
for (let i = 1; i < history.length; i++) {
|
||||
expect(history[i]!.role).not.toBe(history[i - 1]!.role);
|
||||
}
|
||||
const lastEntry = history[history.length - 1]!;
|
||||
// Last entry should be the escalated model response, not a user
|
||||
// recovery message, and must carry actual parts so the turn is
|
||||
// not an empty placeholder.
|
||||
expect(lastEntry.role).toBe('model');
|
||||
expect(lastEntry.parts!.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it('should stop recovery mid-loop when a later iteration emits functionCall', async () => {
|
||||
// Covers the cross-iteration guard: iter 1 returns plain text (recovery
|
||||
// proceeds), iter 2 returns a functionCall (recovery must break before
|
||||
// iter 3 pushes another user turn after the functionCall).
|
||||
const streams = [
|
||||
makeStream([makeChunk([{ text: 'initial' }], 'MAX_TOKENS')]),
|
||||
makeStream([makeChunk([{ text: 'escalated' }], 'MAX_TOKENS')]),
|
||||
makeStream([makeChunk([{ text: 'recovery 1 text' }], 'MAX_TOKENS')]),
|
||||
makeStream([
|
||||
makeChunk(
|
||||
[
|
||||
{
|
||||
functionCall: { name: 'write_file', args: { file_path: '/x' } },
|
||||
},
|
||||
],
|
||||
'MAX_TOKENS',
|
||||
),
|
||||
]),
|
||||
];
|
||||
let callIndex = 0;
|
||||
vi.mocked(mockContentGenerator.generateContentStream).mockImplementation(
|
||||
async () => streams[callIndex++]!,
|
||||
);
|
||||
|
||||
const stream = await chat.sendMessageStream(
|
||||
'gemini-3-pro',
|
||||
{ message: 'mixed recovery' },
|
||||
'prompt-recovery-mixed',
|
||||
);
|
||||
|
||||
for await (const _ of stream) {
|
||||
/* consume */
|
||||
}
|
||||
|
||||
// Should call: 1 initial + 1 escalation + 2 recovery (iter 1 text,
|
||||
// iter 2 functionCall) = 4 total. The guard fires at the start of
|
||||
// iter 3 before any further API call.
|
||||
expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes(
|
||||
4,
|
||||
);
|
||||
|
||||
// History must end on the functionCall model turn (not a dangling
|
||||
// recovery user turn).
|
||||
const history = chat.getHistory();
|
||||
const lastEntry = history[history.length - 1]!;
|
||||
expect(lastEntry.role).toBe('model');
|
||||
expect(
|
||||
lastEntry.parts?.some((p) => 'functionCall' in p && p.functionCall),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it('should coalesce successful recovery iterations into the preceding model turn', async () => {
|
||||
// Two recovery iterations then a clean STOP. Without coalescing, the
|
||||
// internal OUTPUT_RECOVERY_MESSAGE would persist as a real user turn
|
||||
// and bias every later model call.
|
||||
const streams = [
|
||||
makeStream([makeChunk([{ text: 'A' }], 'MAX_TOKENS')]),
|
||||
makeStream([makeChunk([{ text: 'B' }], 'MAX_TOKENS')]),
|
||||
makeStream([makeChunk([{ text: 'C' }], 'MAX_TOKENS')]),
|
||||
makeStream([makeChunk([{ text: 'D' }], 'STOP')]),
|
||||
];
|
||||
let callIndex = 0;
|
||||
vi.mocked(mockContentGenerator.generateContentStream).mockImplementation(
|
||||
async () => streams[callIndex++]!,
|
||||
);
|
||||
|
||||
const stream = await chat.sendMessageStream(
|
||||
'gemini-3-pro',
|
||||
{ message: 'essay' },
|
||||
'prompt-recovery-coalesce',
|
||||
);
|
||||
for await (const _ of stream) {
|
||||
/* consume */
|
||||
}
|
||||
|
||||
const history = chat.getHistory();
|
||||
// Exactly one user turn + one model turn — the recovery pairs should
|
||||
// be folded back into the preceding model entry.
|
||||
expect(history.length).toBe(2);
|
||||
expect(history[0]!.role).toBe('user');
|
||||
expect(history[1]!.role).toBe('model');
|
||||
|
||||
// The control prompt must NOT appear anywhere in durable history.
|
||||
const flattened = JSON.stringify(history);
|
||||
expect(flattened).not.toContain('Resume directly');
|
||||
expect(flattened).not.toContain('Output token limit hit');
|
||||
|
||||
// All escalation + recovery content must be preserved in the merged
|
||||
// model turn, in order (B escalation → C recovery-1 → D recovery-2).
|
||||
const mergedText = (history[1]!.parts ?? [])
|
||||
.map((p) => ('text' in p ? ((p as { text?: string }).text ?? '') : ''))
|
||||
.join('');
|
||||
expect(mergedText).toBe('BCD');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ import { createDebugLogger } from '../utils/debugLogger.js';
|
|||
import { parseAndFormatApiError } from '../utils/errorParsing.js';
|
||||
import { isRateLimitError, type RetryInfo } from '../utils/rateLimit.js';
|
||||
import type { Config } from '../config/config.js';
|
||||
import { ESCALATED_MAX_TOKENS } from './tokenLimits.js';
|
||||
import { ESCALATED_MAX_TOKENS, tokenLimit } from './tokenLimits.js';
|
||||
import { hasCycleInSchema } from '../tools/tools.js';
|
||||
import type { StructuredError } from './turn.js';
|
||||
import {
|
||||
|
|
@ -49,7 +49,14 @@ export enum StreamEventType {
|
|||
|
||||
export type StreamEvent =
|
||||
| { type: StreamEventType.CHUNK; value: GenerateContentResponse }
|
||||
| { type: StreamEventType.RETRY; retryInfo?: RetryInfo };
|
||||
| {
|
||||
type: StreamEventType.RETRY;
|
||||
retryInfo?: RetryInfo;
|
||||
/** When true, the retry is a continuation (recovery) rather than a
|
||||
* fresh restart (escalation). The UI should keep the accumulated text
|
||||
* buffer so the continuation appends to it. */
|
||||
isContinuation?: boolean;
|
||||
};
|
||||
|
||||
/**
|
||||
* Options for retrying due to invalid content from the model.
|
||||
|
|
@ -76,6 +83,23 @@ const INVALID_STREAM_RETRY_CONFIG = {
|
|||
initialDelayMs: 2000,
|
||||
};
|
||||
|
||||
/**
|
||||
* Max recovery attempts when the escalated response is also truncated.
|
||||
* Each attempt keeps the partial response in history and injects a recovery
|
||||
* message so the model can continue from where it left off.
|
||||
*/
|
||||
const MAX_OUTPUT_RECOVERY_ATTEMPTS = 3;
|
||||
|
||||
/**
|
||||
* Recovery message injected as a user turn when the model's output is
|
||||
* truncated even after token escalation. Instructs the model to resume
|
||||
* without repeating itself and to break remaining work into smaller steps.
|
||||
*/
|
||||
const OUTPUT_RECOVERY_MESSAGE =
|
||||
'Output token limit hit. Resume directly — no apology, no recap of what ' +
|
||||
'you were doing. Pick up mid-thought if that is where the cut happened. ' +
|
||||
'Break remaining work into smaller pieces.';
|
||||
|
||||
/**
|
||||
* Options for retrying on rate-limit throttling errors returned as stream content.
|
||||
* Fixed 60s delay matches the DashScope per-minute quota window.
|
||||
|
|
@ -497,7 +521,11 @@ export class GeminiChat {
|
|||
}
|
||||
|
||||
// Max output tokens escalation: if the retry loop succeeded with
|
||||
// the capped default (8K) but hit MAX_TOKENS, retry once at 64K.
|
||||
// the capped default (8K) but hit MAX_TOKENS, retry once at the
|
||||
// model's full output limit. This ensures models with large output
|
||||
// limits (e.g., 128K for Claude Opus, GPT-5) are fully utilized,
|
||||
// while using ESCALATED_MAX_TOKENS (64K) as a floor for unknown
|
||||
// models.
|
||||
// Placed outside the retry loop so that any errors from the
|
||||
// escalated stream propagate directly (not caught by retry logic).
|
||||
if (
|
||||
|
|
@ -507,8 +535,12 @@ export class GeminiChat {
|
|||
!hasUserMaxTokensOverride
|
||||
) {
|
||||
maxTokensEscalated = true;
|
||||
const escalatedLimit = Math.max(
|
||||
ESCALATED_MAX_TOKENS,
|
||||
tokenLimit(model, 'output'),
|
||||
);
|
||||
debugLogger.info(
|
||||
`Output truncated at capped default. Escalating to ${ESCALATED_MAX_TOKENS} tokens.`,
|
||||
`Output truncated at capped default. Escalating to ${escalatedLimit} tokens.`,
|
||||
);
|
||||
// Remove partial model response from history
|
||||
// (processStreamResponse already pushed it)
|
||||
|
|
@ -525,9 +557,10 @@ export class GeminiChat {
|
|||
...params,
|
||||
config: {
|
||||
...params.config,
|
||||
maxOutputTokens: ESCALATED_MAX_TOKENS,
|
||||
maxOutputTokens: escalatedLimit,
|
||||
},
|
||||
};
|
||||
let escalatedFinishReason: string | undefined;
|
||||
const escalatedStream = await self.makeApiCallAndProcessStream(
|
||||
model,
|
||||
requestContents,
|
||||
|
|
@ -535,8 +568,112 @@ export class GeminiChat {
|
|||
prompt_id,
|
||||
);
|
||||
for await (const chunk of escalatedStream) {
|
||||
const fr = chunk.candidates?.[0]?.finishReason;
|
||||
if (fr) escalatedFinishReason = fr;
|
||||
yield { type: StreamEventType.CHUNK, value: chunk };
|
||||
}
|
||||
|
||||
// Recovery: if the escalated response is also truncated, keep the
|
||||
// partial response in history and inject a recovery message so the
|
||||
// model can continue from where it left off.
|
||||
let recoveryCount = 0;
|
||||
let successfulRecoveries = 0;
|
||||
while (
|
||||
escalatedFinishReason === FinishReason.MAX_TOKENS &&
|
||||
recoveryCount < MAX_OUTPUT_RECOVERY_ATTEMPTS
|
||||
) {
|
||||
// Skip recovery when the truncated turn already contains a
|
||||
// functionCall. Injecting a plain user message between a
|
||||
// functionCall and its functionResponse produces an invalid API
|
||||
// sequence that providers commonly reject. The existing layer-3
|
||||
// tool scheduler fallback handles these cases correctly.
|
||||
const lastEntry = self.history[self.history.length - 1];
|
||||
const hasFunctionCall =
|
||||
lastEntry?.role === 'model' &&
|
||||
lastEntry.parts?.some((p) => p.functionCall) === true;
|
||||
if (hasFunctionCall) {
|
||||
debugLogger.info(
|
||||
'Skipping recovery: truncated turn contains functionCall; ' +
|
||||
'deferring to tool scheduler fallback.',
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
recoveryCount++;
|
||||
debugLogger.info(
|
||||
`Output still truncated after escalation. ` +
|
||||
`Recovery attempt ${recoveryCount}/${MAX_OUTPUT_RECOVERY_ATTEMPTS}.`,
|
||||
);
|
||||
// The partial model response is already in history
|
||||
// (pushed by processStreamResponse). Push a recovery user
|
||||
// message so the model sees its partial output and continues.
|
||||
self.history.push(
|
||||
createUserContent([{ text: OUTPUT_RECOVERY_MESSAGE }]),
|
||||
);
|
||||
// Signal UI/turn to clear pending (incomplete) tool calls.
|
||||
// isContinuation tells the UI to keep the text buffer so the
|
||||
// model's continuation appends to the previous partial output.
|
||||
yield { type: StreamEventType.RETRY, isContinuation: true };
|
||||
// Re-send with the updated history (includes partial + recovery)
|
||||
const recoveryContents = self.getHistory(true);
|
||||
escalatedFinishReason = undefined;
|
||||
try {
|
||||
const recoveryStream = await self.makeApiCallAndProcessStream(
|
||||
model,
|
||||
recoveryContents,
|
||||
escalatedParams,
|
||||
prompt_id,
|
||||
);
|
||||
for await (const chunk of recoveryStream) {
|
||||
const fr = chunk.candidates?.[0]?.finishReason;
|
||||
if (fr) escalatedFinishReason = fr;
|
||||
yield { type: StreamEventType.CHUNK, value: chunk };
|
||||
}
|
||||
// Iteration fully succeeded: both the user recovery turn and
|
||||
// the model continuation turn are now in history and can be
|
||||
// coalesced back into the preceding model entry after the loop.
|
||||
successfulRecoveries++;
|
||||
} catch (recoveryError) {
|
||||
// If a recovery attempt fails (e.g., empty response, network
|
||||
// error), stop recovering and let the partial output stand.
|
||||
// Pop the dangling recovery message to keep history valid.
|
||||
if (
|
||||
self.history.length > 0 &&
|
||||
self.history[self.history.length - 1].role === 'user'
|
||||
) {
|
||||
self.history.pop();
|
||||
}
|
||||
debugLogger.warn(
|
||||
`Recovery attempt ${recoveryCount} failed: ${recoveryError}`,
|
||||
);
|
||||
// Emit a synthetic finish-reason chunk so the UI gets a
|
||||
// terminal signal (Finished event) instead of a partial
|
||||
// response with no end marker. Uses STOP because partial
|
||||
// chunks from prior successful iterations are already in
|
||||
// the transcript and represent the user-visible response.
|
||||
yield {
|
||||
type: StreamEventType.CHUNK,
|
||||
value: {
|
||||
candidates: [
|
||||
{
|
||||
content: { role: 'model', parts: [] },
|
||||
finishReason: FinishReason.STOP,
|
||||
},
|
||||
],
|
||||
} as unknown as GenerateContentResponse,
|
||||
};
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Coalesce completed recovery pairs back into the preceding model
|
||||
// turn so the OUTPUT_RECOVERY_MESSAGE control prompt does not
|
||||
// persist as a synthetic user turn in durable history. The user
|
||||
// never sent that message, and leaving it in history would bias
|
||||
// later turns and pollute compression / replay / export.
|
||||
if (successfulRecoveries > 0) {
|
||||
self.coalesceRecoveryPairs(successfulRecoveries);
|
||||
}
|
||||
}
|
||||
|
||||
if (lastError) {
|
||||
|
|
@ -964,6 +1101,44 @@ export class GeminiChat {
|
|||
],
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge `pairCount` trailing (user_recovery, model_continuation) pairs back
|
||||
* into the model turn that precedes them. Used after the output-token
|
||||
* recovery loop so the internal OUTPUT_RECOVERY_MESSAGE control prompt
|
||||
* does not persist in durable history as if the user sent it.
|
||||
*
|
||||
* Expected tail shape per iteration (walking from the back):
|
||||
* [..., precedingModel, userRecovery, modelContinuation]
|
||||
*
|
||||
* If any pair doesn't match that shape the method bails defensively
|
||||
* rather than corrupting history.
|
||||
*/
|
||||
private coalesceRecoveryPairs(pairCount: number): void {
|
||||
for (let i = 0; i < pairCount; i++) {
|
||||
const len = this.history.length;
|
||||
if (len < 3) return;
|
||||
|
||||
const modelContinuation = this.history[len - 1]!;
|
||||
const userRecovery = this.history[len - 2]!;
|
||||
const precedingModel = this.history[len - 3]!;
|
||||
|
||||
if (
|
||||
modelContinuation.role !== 'model' ||
|
||||
userRecovery.role !== 'user' ||
|
||||
precedingModel.role !== 'model'
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
precedingModel.parts = [
|
||||
...(precedingModel.parts ?? []),
|
||||
...(modelContinuation.parts ?? []),
|
||||
];
|
||||
// Drop the (userRecovery, modelContinuation) pair.
|
||||
this.history.splice(len - 2, 2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Visible for Testing */
|
||||
|
|
|
|||
|
|
@ -69,6 +69,9 @@ export enum GeminiEventType {
|
|||
export type ServerGeminiRetryEvent = {
|
||||
type: GeminiEventType.Retry;
|
||||
retryInfo?: RetryInfo;
|
||||
/** When true, the retry is a continuation (recovery) rather than a fresh
|
||||
* restart. The UI should keep accumulated text so the continuation appends. */
|
||||
isContinuation?: boolean;
|
||||
};
|
||||
|
||||
export interface StructuredError {
|
||||
|
|
@ -298,6 +301,7 @@ export class Turn {
|
|||
yield {
|
||||
type: GeminiEventType.Retry,
|
||||
retryInfo: streamEvent.retryInfo,
|
||||
isContinuation: streamEvent.isContinuation,
|
||||
};
|
||||
continue; // Skip to the next event in the stream
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue