fix(core): scope StreamingToolCallParser per stream, not per Converter (#3516) (#3525)

* fix(core): scope StreamingToolCallParser per stream, not per Converter

Issue #3516 reports subagent failures with `Model stream ended with
empty response text` whose real root cause is concurrent streams
racing on a single shared tool-call parser.

Architecture before this change:

    Config (singleton)
      └── contentGenerator (OpenAIContentGenerator)
            └── ContentGenerationPipeline
                  └── OpenAIContentConverter
                        └── streamingToolCallParser  ← shared!

Any caller of `Config.getContentGenerator()` — foreground turns,
fork subagents, `run_in_background: true` subagents, ACP concurrent
Agent calls (PR #3463) — ends up using the same parser instance.
When two streams run concurrently, `processStreamWithLogging`'s
stream-start `resetStreamingToolCalls()` wipes the other stream's
in-flight buffers, and their chunks interleave at `index: 0`,
producing corrupt JSON like
`{"file_path": "/A{"file_path": "/B...` that even jsonrepair cannot
salvage. The corrupted tool calls are dropped entirely and the
stream surfaces upstream as `NO_RESPONSE_TEXT`.
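
A minimal, self-contained illustration (hypothetical buffer code, not
the real StreamingToolCallParser) of why one index-keyed buffer cannot
serve two concurrent writers:

    // Both streams append to the same index-0 bucket of a shared buffer.
    const shared = new Map<number, string>();
    const add = (index: number, fragment: string) =>
      shared.set(index, (shared.get(index) ?? '') + fragment);

    add(0, '{"file_path": "/A'); // stream A opens its tool call
    add(0, '{"file_path": "/B'); // stream B opens before A finishes
    add(0, '/a.ts"}');           // stream A closes
    add(0, '/b.ts"}');           // stream B closes

    console.log(shared.get(0));
    // {"file_path": "/A{"file_path": "/B/a.ts"}/b.ts"} -- unparseable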

Fix: move parser state from Converter instance field into
per-stream local state.

- Add `ConverterStreamContext` and `createStreamContext()` factory
  on `OpenAIContentConverter`. Each call returns a fresh context
  holding its own `StreamingToolCallParser`.
- `convertOpenAIChunkToGemini(chunk, ctx)` now takes the context
  as an explicit arg; all internal parser calls route through it.
- `ContentGenerationPipeline.processStreamWithLogging` creates one
  context at stream entry and passes it to every chunk conversion.
- Drop `OpenAIContentConverter.streamingToolCallParser` field.
- Drop `resetStreamingToolCalls()` — the context has stream-local
  lifetime, no manual reset needed. The two call sites in the
  pipeline (stream entry and error path) are removed.
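
In short, the intended per-stream call pattern (a sketch only, with
simplified types and illustrative import paths; the converter and
pipeline diffs below show the real code):

    import type OpenAI from 'openai';
    import { OpenAIContentConverter } from './converter.js';

    const converter = new OpenAIContentConverter('some-model');

    async function* toGemini(
      stream: AsyncIterable<OpenAI.Chat.ChatCompletionChunk>,
    ) {
      // One fresh context per stream; concurrent streams never share it.
      const ctx = converter.createStreamContext();
      for await (const chunk of stream) {
        yield converter.convertOpenAIChunkToGemini(chunk, ctx);
      }
      // ctx (and its StreamingToolCallParser) is simply dropped here;
      // no manual reset is required.
    }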

Tests:

- Replace the `resetStreamingToolCalls` suite with a
  `createStreamContext` suite asserting that distinct contexts are
  independent and writes to one never leak into the other.
- Add a regression test simulating two concurrent streams with
  interleaved chunks through the same Converter instance; both
  tool calls close cleanly with correct arguments and ids.
- All existing single-stream tests updated to obtain a context via
  `createStreamContext()` and pass it through to chunk conversion.
- `pipeline.test.ts` mocks updated accordingly.

packages/core test suite: 841 passed. No stale references to
`resetStreamingToolCalls` or the private parser field remain.

Refs #3516

* docs(core): clarify GC wording in per-stream context comment (copilot review)

* test(core): add pipeline-level integration test for concurrent streams

Complements the unit tests in converter.test.ts by driving the real
ContentGenerationPipeline + real OpenAIContentConverter (no mocks on
converter) through two streams that interleave on the event loop via
`setImmediate`-paced async generators.

Two scenarios:

1. Happy path — two concurrent executeStream invocations with their
   own tool-call chunks. Assert each stream emits its own function
   call with the correct id and args (not cross-contaminated from
   the sibling stream).

2. Error isolation — one stream hits `error_finish` mid-flight while
   a sibling stream is still accumulating tool-call chunks. Assert
   the sibling's function call still emits cleanly, covering the
   removed `resetStreamingToolCalls()` call in the error path of
   processStreamWithLogging.

Verified as a positive control: with the per-stream context fix
reverted (origin/main state), both tests fail with exactly the
bug shape users reported — one stream's function call is either
overwritten by the other's id/args, or is swallowed entirely when
the sibling stream's error path wipes the shared parser buffer.

Refs #3516
zhangxy-zju (committed by GitHub), 2026-04-22 20:32:30 +08:00
parent f2fac208ff
commit d40fe7cdba
5 changed files with 740 additions and 209 deletions


@@ -6,7 +6,7 @@
import { describe, it, expect, beforeEach } from 'vitest';
import { OpenAIContentConverter } from './converter.js';
import type { StreamingToolCallParser } from './streamingToolCallParser.js';
import { StreamingToolCallParser } from './streamingToolCallParser.js';
import {
Type,
FinishReason,
@@ -31,57 +31,170 @@ describe('OpenAIContentConverter', () => {
});
});
describe('resetStreamingToolCalls', () => {
it('should clear streaming tool calls accumulator', () => {
// Access private field for testing
const parser = (
converter as unknown as {
streamingToolCallParser: StreamingToolCallParser;
}
).streamingToolCallParser;
describe('createStreamContext', () => {
it('returns a fresh context with its own StreamingToolCallParser', () => {
const ctx1 = converter.createStreamContext();
const ctx2 = converter.createStreamContext();
// Add some test data to the parser
parser.addChunk(0, '{"arg": "value"}', 'test-id', 'test-function');
parser.addChunk(1, '{"arg2": "value2"}', 'test-id-2', 'test-function-2');
// Verify data is present
expect(parser.getBuffer(0)).toBe('{"arg": "value"}');
expect(parser.getBuffer(1)).toBe('{"arg2": "value2"}');
// Call reset method
converter.resetStreamingToolCalls();
// Verify data is cleared
expect(parser.getBuffer(0)).toBe('');
expect(parser.getBuffer(1)).toBe('');
expect(ctx1.toolCallParser).toBeInstanceOf(StreamingToolCallParser);
expect(ctx2.toolCallParser).toBeInstanceOf(StreamingToolCallParser);
expect(ctx1.toolCallParser).not.toBe(ctx2.toolCallParser);
});
it('should be safe to call multiple times', () => {
// Call reset multiple times
converter.resetStreamingToolCalls();
converter.resetStreamingToolCalls();
converter.resetStreamingToolCalls();
it('isolates two contexts so writes to one do not leak into the other', () => {
// Regression for issue #3516: previously the parser lived on the
// Converter as an instance field, so two concurrent streams sharing
// the same Config.contentGenerator would overwrite each other's
// tool-call buffers. Per-stream contexts eliminate that contention.
const ctx1 = converter.createStreamContext();
const ctx2 = converter.createStreamContext();
// Should not throw any errors
const parser = (
converter as unknown as {
streamingToolCallParser: StreamingToolCallParser;
}
).streamingToolCallParser;
expect(parser.getBuffer(0)).toBe('');
ctx1.toolCallParser.addChunk(0, '{"a":1}', 'call_A', 'fn_A');
ctx2.toolCallParser.addChunk(0, '{"b":2}', 'call_B', 'fn_B');
expect(ctx1.toolCallParser.getBuffer(0)).toBe('{"a":1}');
expect(ctx2.toolCallParser.getBuffer(0)).toBe('{"b":2}');
expect(ctx1.toolCallParser.getToolCallMeta(0).id).toBe('call_A');
expect(ctx2.toolCallParser.getToolCallMeta(0).id).toBe('call_B');
});
it('should be safe to call on empty accumulator', () => {
// Call reset on empty accumulator
converter.resetStreamingToolCalls();
it('demuxes interleaved chunks from two concurrent streams correctly (#3516)', () => {
// Real-world shape: two subagents share one Config (hence one
// Converter). Their OpenAI streams run concurrently; chunks arrive
// interleaved at the event loop. Under the pre-fix architecture
// this corrupted both tool calls; under per-stream contexts each
// stream's chunks stay in their own parser and close cleanly.
const streamA = converter.createStreamContext();
const streamB = converter.createStreamContext();
// Should not throw any errors
const parser = (
converter as unknown as {
streamingToolCallParser: StreamingToolCallParser;
}
).streamingToolCallParser;
expect(parser.getBuffer(0)).toBe('');
const openerA = {
object: 'chat.completion.chunk',
id: 'A-open',
created: 1,
model: 'test',
choices: [
{
index: 0,
delta: {
tool_calls: [
{
index: 0,
id: 'call_A',
type: 'function' as const,
function: {
name: 'read_file',
arguments: '{"file_path":"/a',
},
},
],
},
finish_reason: null,
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletionChunk;
const openerB = {
...openerA,
id: 'B-open',
choices: [
{
index: 0,
delta: {
tool_calls: [
{
index: 0,
id: 'call_B',
type: 'function' as const,
function: {
name: 'read_file',
arguments: '{"file_path":"/b',
},
},
],
},
finish_reason: null,
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletionChunk;
const contA = {
...openerA,
id: 'A-cont',
choices: [
{
index: 0,
delta: {
tool_calls: [{ index: 0, function: { arguments: '/x.ts"}' } }],
},
finish_reason: null,
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletionChunk;
const contB = {
...openerB,
id: 'B-cont',
choices: [
{
index: 0,
delta: {
tool_calls: [{ index: 0, function: { arguments: '/y.ts"}' } }],
},
finish_reason: null,
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletionChunk;
const finisher = (id: string) =>
({
object: 'chat.completion.chunk',
id,
created: 2,
model: 'test',
choices: [
{
index: 0,
delta: {},
finish_reason: 'tool_calls',
logprobs: null,
},
],
}) as unknown as OpenAI.Chat.ChatCompletionChunk;
// Interleave the two streams. Pre-fix this produced corrupt JSON
// because every chunk fed the same shared parser.
converter.convertOpenAIChunkToGemini(openerA, streamA);
converter.convertOpenAIChunkToGemini(openerB, streamB);
converter.convertOpenAIChunkToGemini(contA, streamA);
converter.convertOpenAIChunkToGemini(contB, streamB);
const resultA = converter.convertOpenAIChunkToGemini(
finisher('A-finish'),
streamA,
);
const resultB = converter.convertOpenAIChunkToGemini(
finisher('B-finish'),
streamB,
);
const fnA = resultA.candidates?.[0]?.content?.parts?.find(
(p: Part) => p.functionCall,
)?.functionCall;
const fnB = resultB.candidates?.[0]?.content?.parts?.find(
(p: Part) => p.functionCall,
)?.functionCall;
expect(fnA?.name).toBe('read_file');
expect(fnA?.args).toEqual({ file_path: '/a/x.ts' });
expect(fnA?.id).toBe('call_A');
expect(fnB?.name).toBe('read_file');
expect(fnB?.args).toEqual({ file_path: '/b/y.ts' });
expect(fnB?.id).toBe('call_B');
});
});
@@ -1193,23 +1306,26 @@ describe('OpenAIContentConverter', () => {
});
it('should convert streaming reasoning_content delta to a thought part', () => {
const chunk = converter.convertOpenAIChunkToGemini({
object: 'chat.completion.chunk',
id: 'chunk-1',
created: 456,
choices: [
{
index: 0,
delta: {
content: 'visible text',
reasoning_content: 'thinking...',
const chunk = converter.convertOpenAIChunkToGemini(
{
object: 'chat.completion.chunk',
id: 'chunk-1',
created: 456,
choices: [
{
index: 0,
delta: {
content: 'visible text',
reasoning_content: 'thinking...',
},
finish_reason: 'stop',
logprobs: null,
},
finish_reason: 'stop',
logprobs: null,
},
],
model: 'gpt-test',
} as unknown as OpenAI.Chat.ChatCompletionChunk);
],
model: 'gpt-test',
} as unknown as OpenAI.Chat.ChatCompletionChunk,
converter.createStreamContext(),
);
const parts = chunk.candidates?.[0]?.content?.parts;
expect(parts?.[0]).toEqual(
@@ -1221,23 +1337,26 @@ describe('OpenAIContentConverter', () => {
});
it('should convert streaming reasoning delta to a thought part', () => {
const chunk = converter.convertOpenAIChunkToGemini({
object: 'chat.completion.chunk',
id: 'chunk-1b',
created: 456,
choices: [
{
index: 0,
delta: {
content: 'visible text',
reasoning: 'thinking...',
const chunk = converter.convertOpenAIChunkToGemini(
{
object: 'chat.completion.chunk',
id: 'chunk-1b',
created: 456,
choices: [
{
index: 0,
delta: {
content: 'visible text',
reasoning: 'thinking...',
},
finish_reason: 'stop',
logprobs: null,
},
finish_reason: 'stop',
logprobs: null,
},
],
model: 'gpt-test',
} as unknown as OpenAI.Chat.ChatCompletionChunk);
],
model: 'gpt-test',
} as unknown as OpenAI.Chat.ChatCompletionChunk,
converter.createStreamContext(),
);
const parts = chunk.candidates?.[0]?.content?.parts;
expect(parts?.[0]).toEqual(
@@ -1249,21 +1368,24 @@ describe('OpenAIContentConverter', () => {
});
it('should not throw when streaming chunk has no delta', () => {
const chunk = converter.convertOpenAIChunkToGemini({
object: 'chat.completion.chunk',
id: 'chunk-2',
created: 456,
choices: [
{
index: 0,
// Some OpenAI-compatible providers may omit delta entirely.
delta: undefined,
finish_reason: null,
logprobs: null,
},
],
model: 'gpt-test',
} as unknown as OpenAI.Chat.ChatCompletionChunk);
const chunk = converter.convertOpenAIChunkToGemini(
{
object: 'chat.completion.chunk',
id: 'chunk-2',
created: 456,
choices: [
{
index: 0,
// Some OpenAI-compatible providers may omit delta entirely.
delta: undefined,
finish_reason: null,
logprobs: null,
},
],
model: 'gpt-test',
} as unknown as OpenAI.Chat.ChatCompletionChunk,
converter.createStreamContext(),
);
const parts = chunk.candidates?.[0]?.content?.parts;
expect(parts).toEqual([]);
@@ -2109,51 +2231,60 @@ describe('Truncated tool call detection in streaming', () => {
}>,
finishReason: string,
) {
// One stream-local context covers every chunk of this simulated stream.
const ctx = conv.createStreamContext();
// Feed argument chunks (no finish_reason yet)
for (const tc of toolCallChunks) {
conv.convertOpenAIChunkToGemini({
conv.convertOpenAIChunkToGemini(
{
object: 'chat.completion.chunk',
id: 'chunk-stream',
created: 100,
model: 'test-model',
choices: [
{
index: 0,
delta: {
tool_calls: [
{
index: tc.index,
id: tc.id,
type: 'function' as const,
function: {
name: tc.name,
arguments: tc.arguments,
},
},
],
},
finish_reason: null,
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletionChunk,
ctx,
);
}
// Final chunk with finish_reason
return conv.convertOpenAIChunkToGemini(
{
object: 'chat.completion.chunk',
id: 'chunk-stream',
created: 100,
id: 'chunk-final',
created: 101,
model: 'test-model',
choices: [
{
index: 0,
delta: {
tool_calls: [
{
index: tc.index,
id: tc.id,
type: 'function' as const,
function: {
name: tc.name,
arguments: tc.arguments,
},
},
],
},
finish_reason: null,
delta: {},
finish_reason: finishReason,
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletionChunk);
}
// Final chunk with finish_reason
return conv.convertOpenAIChunkToGemini({
object: 'chat.completion.chunk',
id: 'chunk-final',
created: 101,
model: 'test-model',
choices: [
{
index: 0,
delta: {},
finish_reason: finishReason,
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletionChunk);
} as unknown as OpenAI.Chat.ChatCompletionChunk,
ctx,
);
}
it('should override finishReason to MAX_TOKENS when tool call JSON is truncated and provider reports "stop"', () => {
@@ -2254,70 +2385,80 @@ describe('Truncated tool call detection in streaming', () => {
it('should detect truncation with multi-chunk streaming arguments', () => {
// Feed arguments in multiple small chunks like real streaming
const conv = new OpenAIContentConverter('test-model');
const ctx = conv.createStreamContext();
// Chunk 1: start of JSON with tool metadata
conv.convertOpenAIChunkToGemini({
object: 'chat.completion.chunk',
id: 'c1',
created: 100,
model: 'test-model',
choices: [
{
index: 0,
delta: {
tool_calls: [
{
index: 0,
id: 'call_1',
type: 'function' as const,
function: { name: 'write_file', arguments: '{"file_' },
},
],
conv.convertOpenAIChunkToGemini(
{
object: 'chat.completion.chunk',
id: 'c1',
created: 100,
model: 'test-model',
choices: [
{
index: 0,
delta: {
tool_calls: [
{
index: 0,
id: 'call_1',
type: 'function' as const,
function: { name: 'write_file', arguments: '{"file_' },
},
],
},
finish_reason: null,
logprobs: null,
},
finish_reason: null,
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletionChunk);
],
} as unknown as OpenAI.Chat.ChatCompletionChunk,
ctx,
);
// Chunk 2: more arguments
conv.convertOpenAIChunkToGemini({
object: 'chat.completion.chunk',
id: 'c2',
created: 100,
model: 'test-model',
choices: [
{
index: 0,
delta: {
tool_calls: [
{
index: 0,
function: { arguments: 'path": "/tmp/f.txt", "conten' },
},
],
conv.convertOpenAIChunkToGemini(
{
object: 'chat.completion.chunk',
id: 'c2',
created: 100,
model: 'test-model',
choices: [
{
index: 0,
delta: {
tool_calls: [
{
index: 0,
function: { arguments: 'path": "/tmp/f.txt", "conten' },
},
],
},
finish_reason: null,
logprobs: null,
},
finish_reason: null,
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletionChunk);
],
} as unknown as OpenAI.Chat.ChatCompletionChunk,
ctx,
);
// Final chunk: finish_reason "stop" but JSON is still incomplete
const result = conv.convertOpenAIChunkToGemini({
object: 'chat.completion.chunk',
id: 'c3',
created: 101,
model: 'test-model',
choices: [
{
index: 0,
delta: {},
finish_reason: 'stop',
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletionChunk);
const result = conv.convertOpenAIChunkToGemini(
{
object: 'chat.completion.chunk',
id: 'c3',
created: 101,
model: 'test-model',
choices: [
{
index: 0,
delta: {},
finish_reason: 'stop',
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletionChunk,
ctx,
);
expect(result.candidates?.[0]?.finishReason).toBe(FinishReason.MAX_TOKENS);
});


@@ -90,6 +90,17 @@ type OpenAIContentPart =
| OpenAIContentPartVideoUrl
| OpenAIContentPartFile;
/**
* Per-stream state for tool-call parsing. Created by
* `OpenAIContentConverter.createStreamContext()` at the start of each
* streaming response and passed into every `convertOpenAIChunkToGemini`
* call on that stream, so concurrent streams (parallel subagents, fork
* children) never share parser state.
*/
export interface ConverterStreamContext {
toolCallParser: StreamingToolCallParser;
}
/**
* Converter class for transforming data between Gemini and OpenAI formats
*/
@@ -97,8 +108,6 @@ export class OpenAIContentConverter {
private model: string;
private schemaCompliance: SchemaComplianceMode;
private modalities: InputModalities;
private streamingToolCallParser: StreamingToolCallParser =
new StreamingToolCallParser();
constructor(
model: string,
@@ -126,12 +135,21 @@
}
/**
* Reset streaming tool calls parser for new stream processing
* This should be called at the beginning of each stream to prevent
* data pollution from previous incomplete streams
* Create fresh per-stream state for processing one OpenAI streaming
* response. The returned context is passed into every
* `convertOpenAIChunkToGemini` call for that stream, then discarded.
*
* Previously the tool-call parser lived on the Converter instance and
* was shared by every caller of the singleton `Config.contentGenerator`.
* Concurrent streams (e.g. two subagents running in parallel after
* PR #3463) raced on that shared state: each stream's stream-start
* `reset()` wiped the other's partial tool-call buffers, chunks from
* different streams landed at the same `index=0` bucket, and
* `getCompletedToolCalls()` returned interleaved corrupt JSON that
* surfaced upstream as `NO_RESPONSE_TEXT` (issue #3516).
*/
resetStreamingToolCalls(): void {
this.streamingToolCallParser.reset();
createStreamContext(): ConverterStreamContext {
return { toolCallParser: new StreamingToolCallParser() };
}
/**
@@ -931,10 +949,17 @@
}
/**
* Convert OpenAI stream chunk to Gemini format
* Convert OpenAI stream chunk to Gemini format.
*
* `ctx` carries the tool-call parser for this stream. Callers MUST
* obtain it from `createStreamContext()` at the start of the stream
* and pass the same instance for every chunk of that stream. Concurrent
* streams MUST use distinct contexts or their tool-call buffers will
* interleave (issue #3516).
*/
convertOpenAIChunkToGemini(
chunk: OpenAI.Chat.ChatCompletionChunk,
ctx: ConverterStreamContext,
): GenerateContentResponse {
const choice = chunk.choices?.[0];
const response = new GenerateContentResponse();
@@ -956,14 +981,14 @@
}
}
// Handle tool calls using the streaming parser
// Handle tool calls using the stream-local parser
if (choice.delta?.tool_calls) {
for (const toolCall of choice.delta.tool_calls) {
const index = toolCall.index ?? 0;
// Process the tool call chunk through the streaming parser
if (toolCall.function?.arguments) {
this.streamingToolCallParser.addChunk(
ctx.toolCallParser.addChunk(
index,
toolCall.function.arguments,
toolCall.id,
@@ -971,7 +996,7 @@
);
} else {
// Handle metadata-only chunks (id and/or name without arguments)
this.streamingToolCallParser.addChunk(
ctx.toolCallParser.addChunk(
index,
'', // Empty chunk for metadata-only updates
toolCall.id,
@@ -987,11 +1012,9 @@
// Detect truncation the provider may not report correctly.
// Some providers (e.g. DashScope/Qwen) send "stop" or "tool_calls"
// even when output was cut off mid-JSON due to max_tokens.
toolCallsTruncated =
this.streamingToolCallParser.hasIncompleteToolCalls();
toolCallsTruncated = ctx.toolCallParser.hasIncompleteToolCalls();
const completedToolCalls =
this.streamingToolCallParser.getCompletedToolCalls();
const completedToolCalls = ctx.toolCallParser.getCompletedToolCalls();
for (const toolCall of completedToolCalls) {
if (toolCall.name) {
@@ -1007,8 +1030,9 @@
}
}
// Clear the parser for the next stream
this.streamingToolCallParser.reset();
// Parser is stream-local; it will be discarded with the
// ConverterStreamContext when the stream finishes. No manual
// reset needed.
}
// If tool call JSON was truncated, override to "length" so downstream


@@ -0,0 +1,350 @@
/**
* @license
* Copyright 2025 Qwen
* SPDX-License-Identifier: Apache-2.0
*/
/**
* Integration test deliberately does NOT mock `./converter.js`. Unlike
* `pipeline.test.ts` which stubs the converter, this suite drives the real
* `ContentGenerationPipeline` + real `OpenAIContentConverter` through two
* streams that interleave on the event loop, and asserts that tool-call
* arguments from one stream never bleed into the other's output.
*
* This is the regression test for issue #3516: before the per-stream
* parser scoping fix, the Converter singleton held a single
* `StreamingToolCallParser` instance. Two concurrent streams would share
* it; each stream's entry-time reset wiped the other's partial buffers,
* and chunks routed by `index: 0` interleaved into corrupt JSON.
*
* With the fix, `processStreamWithLogging` creates a fresh
* `ConverterStreamContext` at stream entry, so each concurrent generator
* has its own parser. This test would fail deterministically on pre-fix
* code because stream B's entry would wipe stream A's accumulator
* mid-flight, and A's finish chunk would emit zero function calls
* (`wasOutputTruncated`-style behavior).
*/
import { describe, it, expect, vi } from 'vitest';
import type OpenAI from 'openai';
import type { GenerateContentParameters } from '@google/genai';
import type { Part } from '@google/genai';
import type { PipelineConfig } from './pipeline.js';
import { ContentGenerationPipeline } from './pipeline.js';
import type { Config } from '../../config/config.js';
import type { ContentGeneratorConfig, AuthType } from '../contentGenerator.js';
import type { OpenAICompatibleProvider } from './provider/index.js';
import type { ErrorHandler } from './errorHandler.js';
type ChunkFactory = () => OpenAI.Chat.ChatCompletionChunk;
/**
* Build a slow stream that yields to the event loop between chunks.
* Without the `setImmediate` await, a `for await` loop on one stream
* drains synchronously and `Promise.all` degenerates to serial execution,
* which hides the cross-stream bug.
*/
async function* interleavingStream(
chunks: ChunkFactory[],
): AsyncGenerator<OpenAI.Chat.ChatCompletionChunk> {
for (const make of chunks) {
// Yield control so the sibling stream can advance one step before we do.
await new Promise((r) => setImmediate(r));
yield make();
}
}
function openerChunk(
id: string,
name: string,
firstArgs: string,
): OpenAI.Chat.ChatCompletionChunk {
return {
id: `${id}-opener`,
object: 'chat.completion.chunk',
created: 1,
model: 'test',
choices: [
{
index: 0,
delta: {
tool_calls: [
{
index: 0,
id,
type: 'function',
function: { name, arguments: firstArgs },
},
],
},
finish_reason: null,
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletionChunk;
}
function continuationChunk(
argsFragment: string,
): OpenAI.Chat.ChatCompletionChunk {
return {
id: 'cont',
object: 'chat.completion.chunk',
created: 1,
model: 'test',
choices: [
{
index: 0,
delta: {
tool_calls: [{ index: 0, function: { arguments: argsFragment } }],
},
finish_reason: null,
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletionChunk;
}
function finisherChunk(): OpenAI.Chat.ChatCompletionChunk {
return {
id: 'finish',
object: 'chat.completion.chunk',
created: 1,
model: 'test',
choices: [
{
index: 0,
delta: {},
finish_reason: 'tool_calls',
logprobs: null,
},
],
usage: { prompt_tokens: 10, completion_tokens: 20, total_tokens: 30 },
} as unknown as OpenAI.Chat.ChatCompletionChunk;
}
describe('ContentGenerationPipeline — concurrent streams (issue #3516)', () => {
function buildPipeline(
createStreamImpl: () => AsyncIterable<OpenAI.Chat.ChatCompletionChunk>,
) {
const mockClient = {
chat: {
completions: {
// Each call returns a fresh stream. The real Pipeline will
// invoke this twice — once per concurrent executeStream call.
create: vi.fn().mockImplementation(() => createStreamImpl()),
},
},
} as unknown as OpenAI;
const mockProvider: OpenAICompatibleProvider = {
buildClient: vi.fn().mockReturnValue(mockClient),
buildRequest: vi.fn().mockImplementation((req) => req),
buildHeaders: vi.fn().mockReturnValue({}),
getDefaultGenerationConfig: vi.fn().mockReturnValue({}),
} as unknown as OpenAICompatibleProvider;
const mockErrorHandler: ErrorHandler = {
handle: vi.fn().mockImplementation((error: unknown) => {
throw error;
}),
shouldSuppressErrorLogging: vi.fn().mockReturnValue(false),
} as unknown as ErrorHandler;
const contentGeneratorConfig: ContentGeneratorConfig = {
model: 'test-model',
authType: 'openai' as AuthType,
} as ContentGeneratorConfig;
const config: PipelineConfig = {
cliConfig: {} as Config,
provider: mockProvider,
contentGeneratorConfig,
errorHandler: mockErrorHandler,
};
return { pipeline: new ContentGenerationPipeline(config), mockClient };
}
it('two concurrent streams keep their tool-call buffers isolated', async () => {
// Queue of pending stream factories — each call to the mocked
// chat.completions.create consumes one.
const streamQueue: Array<
() => AsyncIterable<OpenAI.Chat.ChatCompletionChunk>
> = [];
streamQueue.push(() =>
interleavingStream([
() => openerChunk('call_A', 'read_file', '{"file_path":"/a'),
() => continuationChunk('/one.ts"}'),
() => finisherChunk(),
]),
);
streamQueue.push(() =>
interleavingStream([
() => openerChunk('call_B', 'read_file', '{"file_path":"/b'),
() => continuationChunk('/two.ts"}'),
() => finisherChunk(),
]),
);
const { pipeline } = buildPipeline(() => {
const next = streamQueue.shift();
if (!next) throw new Error('unexpected extra stream request');
return next();
});
const request: GenerateContentParameters = {
model: 'test-model',
contents: [{ role: 'user', parts: [{ text: 'read the files' }] }],
};
// Kick off both streams *before* consuming either, so the two generators
// are actually alive on the event loop at the same time.
const [streamA, streamB] = await Promise.all([
pipeline.executeStream(request, 'prompt-a'),
pipeline.executeStream(request, 'prompt-b'),
]);
// Interleaved consumption: alternate one chunk from each to maximize
// parser state overlap.
const collectedA: unknown[] = [];
const collectedB: unknown[] = [];
const aIter = streamA[Symbol.asyncIterator]();
const bIter = streamB[Symbol.asyncIterator]();
while (true) {
const [aNext, bNext] = await Promise.all([aIter.next(), bIter.next()]);
if (!aNext.done) collectedA.push(aNext.value);
if (!bNext.done) collectedB.push(bNext.value);
if (aNext.done && bNext.done) break;
}
const extractFunctionCall = (responses: unknown[]) => {
for (const resp of responses) {
const candidates = (
resp as { candidates?: Array<{ content?: { parts?: Part[] } }> }
).candidates;
const parts = candidates?.[0]?.content?.parts ?? [];
const fc = parts.find((p) => p.functionCall)?.functionCall;
if (fc) return fc;
}
return undefined;
};
const fnA = extractFunctionCall(collectedA);
const fnB = extractFunctionCall(collectedB);
// Pre-fix behaviour: at least one of these would either be undefined
// (buffer wiped by the other stream's reset) or carry the wrong args
// (other stream's chunks merged into this bucket).
expect(fnA?.name).toBe('read_file');
expect(fnA?.id).toBe('call_A');
expect(fnA?.args).toEqual({ file_path: '/a/one.ts' });
expect(fnB?.name).toBe('read_file');
expect(fnB?.id).toBe('call_B');
expect(fnB?.args).toEqual({ file_path: '/b/two.ts' });
});
it('an error in one stream does not poison a concurrent stream (no shared reset on error)', async () => {
// Stream A: normal tool call.
// Stream B: yields an `error_finish` chunk mid-flight, which the
// Pipeline wraps as StreamContentError.
// Pre-fix: the error path ran `resetStreamingToolCalls()` on the shared
// converter, wiping A's partial buffers. Post-fix: streamCtx is local
// to each generator, so A is untouched.
const streamQueue: Array<
() => AsyncIterable<OpenAI.Chat.ChatCompletionChunk>
> = [];
streamQueue.push(() =>
interleavingStream([
() => openerChunk('call_A', 'read_file', '{"file_path":"/x'),
() => continuationChunk('.ts"}'),
() => finisherChunk(),
]),
);
streamQueue.push(() =>
interleavingStream([
() => openerChunk('call_B', 'read_file', '{"file_path":"/y'),
// Inject an error_finish chunk — this triggers StreamContentError
// inside processStreamWithLogging's catch block.
() =>
({
id: 'err',
object: 'chat.completion.chunk',
created: 1,
model: 'test',
choices: [
{
index: 0,
delta: { content: 'rate limit' },
finish_reason: 'error_finish',
logprobs: null,
},
],
}) as unknown as OpenAI.Chat.ChatCompletionChunk,
]),
);
const { pipeline } = buildPipeline(() => {
const next = streamQueue.shift();
if (!next) throw new Error('unexpected extra stream request');
return next();
});
const request: GenerateContentParameters = {
model: 'test-model',
contents: [{ role: 'user', parts: [{ text: 'read the files' }] }],
};
const [streamA, streamB] = await Promise.all([
pipeline.executeStream(request, 'prompt-a'),
pipeline.executeStream(request, 'prompt-b'),
]);
const consumeA = (async () => {
const out: unknown[] = [];
for await (const r of streamA) out.push(r);
return out;
})();
const consumeB = (async () => {
try {
for await (const _ of streamB) {
/* drain */
}
return 'completed';
} catch (e) {
return e instanceof Error ? e.message : String(e);
}
})();
const [aResults, bOutcome] = await Promise.all([consumeA, consumeB]);
// Stream B blew up as expected.
expect(typeof bOutcome).toBe('string');
expect(bOutcome).toContain('rate limit');
// Stream A still emitted its function call cleanly, despite B's error
// path running concurrently. On pre-fix code the error path would have
// called converter.resetStreamingToolCalls(), wiping A's in-flight
// buffer and causing A to emit zero function calls.
const fnA = (() => {
for (const resp of aResults) {
const parts =
(resp as { candidates?: Array<{ content?: { parts?: Part[] } }> })
.candidates?.[0]?.content?.parts ?? [];
const fc = parts.find((p) => p.functionCall)?.functionCall;
if (fc) return fc;
}
return undefined;
})();
expect(fnA?.name).toBe('read_file');
expect(fnA?.id).toBe('call_A');
expect(fnA?.args).toEqual({ file_path: '/x.ts' });
});
});


@@ -12,6 +12,7 @@ import { GenerateContentResponse, Type, FinishReason } from '@google/genai';
import type { PipelineConfig } from './pipeline.js';
import { ContentGenerationPipeline, StreamContentError } from './pipeline.js';
import { OpenAIContentConverter } from './converter.js';
import { StreamingToolCallParser } from './streamingToolCallParser.js';
import type { Config } from '../../config/config.js';
import type { ContentGeneratorConfig, AuthType } from '../contentGenerator.js';
import type { OpenAICompatibleProvider } from './provider/index.js';
@@ -44,7 +45,8 @@ describe('ContentGenerationPipeline', () => {
},
} as unknown as OpenAI;
// Mock converter
// Mock converter. `createStreamContext` returns a fresh parser each
// stream; tests that don't care about tool-call buffers just ignore it.
mockConverter = {
setModel: vi.fn(),
setModalities: vi.fn(),
@@ -52,7 +54,9 @@
convertOpenAIResponseToGemini: vi.fn(),
convertOpenAIChunkToGemini: vi.fn(),
convertGeminiToolsToOpenAI: vi.fn(),
resetStreamingToolCalls: vi.fn(),
createStreamContext: vi.fn(() => ({
toolCallParser: new StreamingToolCallParser(),
})),
} as unknown as OpenAIContentConverter;
// Mock provider
@@ -607,7 +611,9 @@ describe('ContentGenerationPipeline', () => {
expect(results).toHaveLength(2);
expect(results[0]).toBe(mockGeminiResponse1);
expect(results[1]).toBe(mockGeminiResponse2);
expect(mockConverter.resetStreamingToolCalls).toHaveBeenCalled();
// Parser is now created per-stream via createStreamContext — assert
// that the pipeline asked for a fresh one at stream entry.
expect(mockConverter.createStreamContext).toHaveBeenCalled();
expect(mockClient.chat.completions.create).toHaveBeenCalledWith(
expect.objectContaining({
stream: true,
@@ -719,7 +725,10 @@
}
expect(results).toHaveLength(0); // No results due to error
expect(mockConverter.resetStreamingToolCalls).toHaveBeenCalledTimes(2); // Once at start, once on error
// createStreamContext is called exactly once at stream entry; the
// error path no longer needs an explicit parser reset because the
// stream-local context is discarded when the generator unwinds.
expect(mockConverter.createStreamContext).toHaveBeenCalledTimes(1);
expect(mockErrorHandler.handle).toHaveBeenCalledWith(
testError,
expect.any(Object),


@@ -143,8 +143,12 @@ export class ContentGenerationPipeline {
): AsyncGenerator<GenerateContentResponse> {
const collectedGeminiResponses: GenerateContentResponse[] = [];
// Reset streaming tool calls to prevent data pollution from previous streams
this.converter.resetStreamingToolCalls();
// Stream-local parser state. Previously the tool-call parser lived on
// the Converter singleton and was reset at stream start — but that
// wiped concurrent streams' in-flight buffers (e.g. parallel subagents
// sharing the same Config.contentGenerator). Scoping it per-stream
// fixes issue #3516.
const streamCtx = this.converter.createStreamContext();
// State for handling chunk merging.
// pendingFinishResponse holds a finish chunk waiting to be merged with
@@ -170,7 +174,10 @@
throw new StreamContentError(errorContent);
}
const response = this.converter.convertOpenAIChunkToGemini(chunk);
const response = this.converter.convertOpenAIChunkToGemini(
chunk,
streamCtx,
);
// Stage 2b: Filter empty responses to avoid downstream issues
if (
@@ -234,8 +241,8 @@
// Stage 2e: Stream completed successfully
context.duration = Date.now() - context.startTime;
} catch (error) {
// Clear streaming tool calls on error to prevent data pollution
this.converter.resetStreamingToolCalls();
// No manual parser cleanup needed — streamCtx is stream-local and
// becomes eligible for garbage collection once this generator unwinds.
// Re-throw StreamContentError directly so it can be handled by
// the caller's retry logic (e.g., TPM throttling retry in sendMessageStream)