diff --git a/packages/core/src/core/openaiContentGenerator/converter.test.ts b/packages/core/src/core/openaiContentGenerator/converter.test.ts index 86d629026..09b49e412 100644 --- a/packages/core/src/core/openaiContentGenerator/converter.test.ts +++ b/packages/core/src/core/openaiContentGenerator/converter.test.ts @@ -6,7 +6,7 @@ import { describe, it, expect, beforeEach } from 'vitest'; import { OpenAIContentConverter } from './converter.js'; -import type { StreamingToolCallParser } from './streamingToolCallParser.js'; +import { StreamingToolCallParser } from './streamingToolCallParser.js'; import { Type, FinishReason, @@ -31,57 +31,170 @@ describe('OpenAIContentConverter', () => { }); }); - describe('resetStreamingToolCalls', () => { - it('should clear streaming tool calls accumulator', () => { - // Access private field for testing - const parser = ( - converter as unknown as { - streamingToolCallParser: StreamingToolCallParser; - } - ).streamingToolCallParser; + describe('createStreamContext', () => { + it('returns a fresh context with its own StreamingToolCallParser', () => { + const ctx1 = converter.createStreamContext(); + const ctx2 = converter.createStreamContext(); - // Add some test data to the parser - parser.addChunk(0, '{"arg": "value"}', 'test-id', 'test-function'); - parser.addChunk(1, '{"arg2": "value2"}', 'test-id-2', 'test-function-2'); - - // Verify data is present - expect(parser.getBuffer(0)).toBe('{"arg": "value"}'); - expect(parser.getBuffer(1)).toBe('{"arg2": "value2"}'); - - // Call reset method - converter.resetStreamingToolCalls(); - - // Verify data is cleared - expect(parser.getBuffer(0)).toBe(''); - expect(parser.getBuffer(1)).toBe(''); + expect(ctx1.toolCallParser).toBeInstanceOf(StreamingToolCallParser); + expect(ctx2.toolCallParser).toBeInstanceOf(StreamingToolCallParser); + expect(ctx1.toolCallParser).not.toBe(ctx2.toolCallParser); }); - it('should be safe to call multiple times', () => { - // Call reset multiple times - converter.resetStreamingToolCalls(); - converter.resetStreamingToolCalls(); - converter.resetStreamingToolCalls(); + it('isolates two contexts so writes to one do not leak into the other', () => { + // Regression for issue #3516: previously the parser lived on the + // Converter as an instance field, so two concurrent streams sharing + // the same Config.contentGenerator would overwrite each other's + // tool-call buffers. Per-stream contexts eliminate that contention. + const ctx1 = converter.createStreamContext(); + const ctx2 = converter.createStreamContext(); - // Should not throw any errors - const parser = ( - converter as unknown as { - streamingToolCallParser: StreamingToolCallParser; - } - ).streamingToolCallParser; - expect(parser.getBuffer(0)).toBe(''); + ctx1.toolCallParser.addChunk(0, '{"a":1}', 'call_A', 'fn_A'); + ctx2.toolCallParser.addChunk(0, '{"b":2}', 'call_B', 'fn_B'); + + expect(ctx1.toolCallParser.getBuffer(0)).toBe('{"a":1}'); + expect(ctx2.toolCallParser.getBuffer(0)).toBe('{"b":2}'); + expect(ctx1.toolCallParser.getToolCallMeta(0).id).toBe('call_A'); + expect(ctx2.toolCallParser.getToolCallMeta(0).id).toBe('call_B'); }); - it('should be safe to call on empty accumulator', () => { - // Call reset on empty accumulator - converter.resetStreamingToolCalls(); + it('demuxes interleaved chunks from two concurrent streams correctly (#3516)', () => { + // Real-world shape: two subagents share one Config (hence one + // Converter). Their OpenAI streams run concurrently; chunks arrive + // interleaved at the event loop. 
Under the pre-fix architecture + // this corrupted both tool calls; under per-stream contexts each + // stream's chunks stay in their own parser and close cleanly. + const streamA = converter.createStreamContext(); + const streamB = converter.createStreamContext(); - // Should not throw any errors - const parser = ( - converter as unknown as { - streamingToolCallParser: StreamingToolCallParser; - } - ).streamingToolCallParser; - expect(parser.getBuffer(0)).toBe(''); + const openerA = { + object: 'chat.completion.chunk', + id: 'A-open', + created: 1, + model: 'test', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { + index: 0, + id: 'call_A', + type: 'function' as const, + function: { + name: 'read_file', + arguments: '{"file_path":"/a', + }, + }, + ], + }, + finish_reason: null, + logprobs: null, + }, + ], + } as unknown as OpenAI.Chat.ChatCompletionChunk; + + const openerB = { + ...openerA, + id: 'B-open', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { + index: 0, + id: 'call_B', + type: 'function' as const, + function: { + name: 'read_file', + arguments: '{"file_path":"/b', + }, + }, + ], + }, + finish_reason: null, + logprobs: null, + }, + ], + } as unknown as OpenAI.Chat.ChatCompletionChunk; + + const contA = { + ...openerA, + id: 'A-cont', + choices: [ + { + index: 0, + delta: { + tool_calls: [{ index: 0, function: { arguments: '/x.ts"}' } }], + }, + finish_reason: null, + logprobs: null, + }, + ], + } as unknown as OpenAI.Chat.ChatCompletionChunk; + + const contB = { + ...openerB, + id: 'B-cont', + choices: [ + { + index: 0, + delta: { + tool_calls: [{ index: 0, function: { arguments: '/y.ts"}' } }], + }, + finish_reason: null, + logprobs: null, + }, + ], + } as unknown as OpenAI.Chat.ChatCompletionChunk; + + const finisher = (id: string) => + ({ + object: 'chat.completion.chunk', + id, + created: 2, + model: 'test', + choices: [ + { + index: 0, + delta: {}, + finish_reason: 'tool_calls', + logprobs: null, + }, + ], + }) as unknown as OpenAI.Chat.ChatCompletionChunk; + + // Interleave the two streams. Pre-fix this produced corrupt JSON + // because every chunk fed the same shared parser. 
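+      // Feed order: A-open, B-open, A-cont, B-cont; both streams use
+      // tool_call index 0, so a single shared parser would have merged
+      // them into one buffer.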
+ converter.convertOpenAIChunkToGemini(openerA, streamA); + converter.convertOpenAIChunkToGemini(openerB, streamB); + converter.convertOpenAIChunkToGemini(contA, streamA); + converter.convertOpenAIChunkToGemini(contB, streamB); + + const resultA = converter.convertOpenAIChunkToGemini( + finisher('A-finish'), + streamA, + ); + const resultB = converter.convertOpenAIChunkToGemini( + finisher('B-finish'), + streamB, + ); + + const fnA = resultA.candidates?.[0]?.content?.parts?.find( + (p: Part) => p.functionCall, + )?.functionCall; + const fnB = resultB.candidates?.[0]?.content?.parts?.find( + (p: Part) => p.functionCall, + )?.functionCall; + + expect(fnA?.name).toBe('read_file'); + expect(fnA?.args).toEqual({ file_path: '/a/x.ts' }); + expect(fnA?.id).toBe('call_A'); + + expect(fnB?.name).toBe('read_file'); + expect(fnB?.args).toEqual({ file_path: '/b/y.ts' }); + expect(fnB?.id).toBe('call_B'); }); }); @@ -1193,23 +1306,26 @@ describe('OpenAIContentConverter', () => { }); it('should convert streaming reasoning_content delta to a thought part', () => { - const chunk = converter.convertOpenAIChunkToGemini({ - object: 'chat.completion.chunk', - id: 'chunk-1', - created: 456, - choices: [ - { - index: 0, - delta: { - content: 'visible text', - reasoning_content: 'thinking...', + const chunk = converter.convertOpenAIChunkToGemini( + { + object: 'chat.completion.chunk', + id: 'chunk-1', + created: 456, + choices: [ + { + index: 0, + delta: { + content: 'visible text', + reasoning_content: 'thinking...', + }, + finish_reason: 'stop', + logprobs: null, }, - finish_reason: 'stop', - logprobs: null, - }, - ], - model: 'gpt-test', - } as unknown as OpenAI.Chat.ChatCompletionChunk); + ], + model: 'gpt-test', + } as unknown as OpenAI.Chat.ChatCompletionChunk, + converter.createStreamContext(), + ); const parts = chunk.candidates?.[0]?.content?.parts; expect(parts?.[0]).toEqual( @@ -1221,23 +1337,26 @@ describe('OpenAIContentConverter', () => { }); it('should convert streaming reasoning delta to a thought part', () => { - const chunk = converter.convertOpenAIChunkToGemini({ - object: 'chat.completion.chunk', - id: 'chunk-1b', - created: 456, - choices: [ - { - index: 0, - delta: { - content: 'visible text', - reasoning: 'thinking...', + const chunk = converter.convertOpenAIChunkToGemini( + { + object: 'chat.completion.chunk', + id: 'chunk-1b', + created: 456, + choices: [ + { + index: 0, + delta: { + content: 'visible text', + reasoning: 'thinking...', + }, + finish_reason: 'stop', + logprobs: null, }, - finish_reason: 'stop', - logprobs: null, - }, - ], - model: 'gpt-test', - } as unknown as OpenAI.Chat.ChatCompletionChunk); + ], + model: 'gpt-test', + } as unknown as OpenAI.Chat.ChatCompletionChunk, + converter.createStreamContext(), + ); const parts = chunk.candidates?.[0]?.content?.parts; expect(parts?.[0]).toEqual( @@ -1249,21 +1368,24 @@ describe('OpenAIContentConverter', () => { }); it('should not throw when streaming chunk has no delta', () => { - const chunk = converter.convertOpenAIChunkToGemini({ - object: 'chat.completion.chunk', - id: 'chunk-2', - created: 456, - choices: [ - { - index: 0, - // Some OpenAI-compatible providers may omit delta entirely. 
- delta: undefined, - finish_reason: null, - logprobs: null, - }, - ], - model: 'gpt-test', - } as unknown as OpenAI.Chat.ChatCompletionChunk); + const chunk = converter.convertOpenAIChunkToGemini( + { + object: 'chat.completion.chunk', + id: 'chunk-2', + created: 456, + choices: [ + { + index: 0, + // Some OpenAI-compatible providers may omit delta entirely. + delta: undefined, + finish_reason: null, + logprobs: null, + }, + ], + model: 'gpt-test', + } as unknown as OpenAI.Chat.ChatCompletionChunk, + converter.createStreamContext(), + ); const parts = chunk.candidates?.[0]?.content?.parts; expect(parts).toEqual([]); @@ -2109,51 +2231,60 @@ describe('Truncated tool call detection in streaming', () => { }>, finishReason: string, ) { + // One stream-local context covers every chunk of this simulated stream. + const ctx = conv.createStreamContext(); + // Feed argument chunks (no finish_reason yet) for (const tc of toolCallChunks) { - conv.convertOpenAIChunkToGemini({ + conv.convertOpenAIChunkToGemini( + { + object: 'chat.completion.chunk', + id: 'chunk-stream', + created: 100, + model: 'test-model', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { + index: tc.index, + id: tc.id, + type: 'function' as const, + function: { + name: tc.name, + arguments: tc.arguments, + }, + }, + ], + }, + finish_reason: null, + logprobs: null, + }, + ], + } as unknown as OpenAI.Chat.ChatCompletionChunk, + ctx, + ); + } + + // Final chunk with finish_reason + return conv.convertOpenAIChunkToGemini( + { object: 'chat.completion.chunk', - id: 'chunk-stream', - created: 100, + id: 'chunk-final', + created: 101, model: 'test-model', choices: [ { index: 0, - delta: { - tool_calls: [ - { - index: tc.index, - id: tc.id, - type: 'function' as const, - function: { - name: tc.name, - arguments: tc.arguments, - }, - }, - ], - }, - finish_reason: null, + delta: {}, + finish_reason: finishReason, logprobs: null, }, ], - } as unknown as OpenAI.Chat.ChatCompletionChunk); - } - - // Final chunk with finish_reason - return conv.convertOpenAIChunkToGemini({ - object: 'chat.completion.chunk', - id: 'chunk-final', - created: 101, - model: 'test-model', - choices: [ - { - index: 0, - delta: {}, - finish_reason: finishReason, - logprobs: null, - }, - ], - } as unknown as OpenAI.Chat.ChatCompletionChunk); + } as unknown as OpenAI.Chat.ChatCompletionChunk, + ctx, + ); } it('should override finishReason to MAX_TOKENS when tool call JSON is truncated and provider reports "stop"', () => { @@ -2254,70 +2385,80 @@ describe('Truncated tool call detection in streaming', () => { it('should detect truncation with multi-chunk streaming arguments', () => { // Feed arguments in multiple small chunks like real streaming const conv = new OpenAIContentConverter('test-model'); + const ctx = conv.createStreamContext(); // Chunk 1: start of JSON with tool metadata - conv.convertOpenAIChunkToGemini({ - object: 'chat.completion.chunk', - id: 'c1', - created: 100, - model: 'test-model', - choices: [ - { - index: 0, - delta: { - tool_calls: [ - { - index: 0, - id: 'call_1', - type: 'function' as const, - function: { name: 'write_file', arguments: '{"file_' }, - }, - ], + conv.convertOpenAIChunkToGemini( + { + object: 'chat.completion.chunk', + id: 'c1', + created: 100, + model: 'test-model', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { + index: 0, + id: 'call_1', + type: 'function' as const, + function: { name: 'write_file', arguments: '{"file_' }, + }, + ], + }, + finish_reason: null, + logprobs: null, }, - finish_reason: null, - 
logprobs: null, - }, - ], - } as unknown as OpenAI.Chat.ChatCompletionChunk); + ], + } as unknown as OpenAI.Chat.ChatCompletionChunk, + ctx, + ); // Chunk 2: more arguments - conv.convertOpenAIChunkToGemini({ - object: 'chat.completion.chunk', - id: 'c2', - created: 100, - model: 'test-model', - choices: [ - { - index: 0, - delta: { - tool_calls: [ - { - index: 0, - function: { arguments: 'path": "/tmp/f.txt", "conten' }, - }, - ], + conv.convertOpenAIChunkToGemini( + { + object: 'chat.completion.chunk', + id: 'c2', + created: 100, + model: 'test-model', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { + index: 0, + function: { arguments: 'path": "/tmp/f.txt", "conten' }, + }, + ], + }, + finish_reason: null, + logprobs: null, }, - finish_reason: null, - logprobs: null, - }, - ], - } as unknown as OpenAI.Chat.ChatCompletionChunk); + ], + } as unknown as OpenAI.Chat.ChatCompletionChunk, + ctx, + ); // Final chunk: finish_reason "stop" but JSON is still incomplete - const result = conv.convertOpenAIChunkToGemini({ - object: 'chat.completion.chunk', - id: 'c3', - created: 101, - model: 'test-model', - choices: [ - { - index: 0, - delta: {}, - finish_reason: 'stop', - logprobs: null, - }, - ], - } as unknown as OpenAI.Chat.ChatCompletionChunk); + const result = conv.convertOpenAIChunkToGemini( + { + object: 'chat.completion.chunk', + id: 'c3', + created: 101, + model: 'test-model', + choices: [ + { + index: 0, + delta: {}, + finish_reason: 'stop', + logprobs: null, + }, + ], + } as unknown as OpenAI.Chat.ChatCompletionChunk, + ctx, + ); expect(result.candidates?.[0]?.finishReason).toBe(FinishReason.MAX_TOKENS); }); diff --git a/packages/core/src/core/openaiContentGenerator/converter.ts b/packages/core/src/core/openaiContentGenerator/converter.ts index 82439c8fa..dcab2c449 100644 --- a/packages/core/src/core/openaiContentGenerator/converter.ts +++ b/packages/core/src/core/openaiContentGenerator/converter.ts @@ -90,6 +90,17 @@ type OpenAIContentPart = | OpenAIContentPartVideoUrl | OpenAIContentPartFile; +/** + * Per-stream state for tool-call parsing. Created by + * `OpenAIContentConverter.createStreamContext()` at the start of each + * streaming response and passed into every `convertOpenAIChunkToGemini` + * call on that stream, so concurrent streams (parallel subagents, fork + * children, …) never share parser state. + */ +export interface ConverterStreamContext { + toolCallParser: StreamingToolCallParser; +} + /** * Converter class for transforming data between Gemini and OpenAI formats */ @@ -97,8 +108,6 @@ export class OpenAIContentConverter { private model: string; private schemaCompliance: SchemaComplianceMode; private modalities: InputModalities; - private streamingToolCallParser: StreamingToolCallParser = - new StreamingToolCallParser(); constructor( model: string, @@ -126,12 +135,21 @@ export class OpenAIContentConverter { } /** - * Reset streaming tool calls parser for new stream processing - * This should be called at the beginning of each stream to prevent - * data pollution from previous incomplete streams + * Create fresh per-stream state for processing one OpenAI streaming + * response. The returned context is passed into every + * `convertOpenAIChunkToGemini` call for that stream, then discarded. + * + * Previously the tool-call parser lived on the Converter instance and + * was shared by every caller of the singleton `Config.contentGenerator`. + * Concurrent streams (e.g. 
two subagents running in parallel after + * PR #3463) raced on that shared state: each stream's stream-start + * `reset()` wiped the other's partial tool-call buffers, chunks from + * different streams landed at the same `index=0` bucket, and + * `getCompletedToolCalls()` returned interleaved corrupt JSON that + * surfaced upstream as `NO_RESPONSE_TEXT` (issue #3516). */ - resetStreamingToolCalls(): void { - this.streamingToolCallParser.reset(); + createStreamContext(): ConverterStreamContext { + return { toolCallParser: new StreamingToolCallParser() }; } /** @@ -931,10 +949,17 @@ export class OpenAIContentConverter { } /** - * Convert OpenAI stream chunk to Gemini format + * Convert OpenAI stream chunk to Gemini format. + * + * `ctx` carries the tool-call parser for this stream. Callers MUST + * obtain it from `createStreamContext()` at the start of the stream + * and pass the same instance for every chunk of that stream. Concurrent + * streams MUST use distinct contexts or their tool-call buffers will + * interleave (issue #3516). */ convertOpenAIChunkToGemini( chunk: OpenAI.Chat.ChatCompletionChunk, + ctx: ConverterStreamContext, ): GenerateContentResponse { const choice = chunk.choices?.[0]; const response = new GenerateContentResponse(); @@ -956,14 +981,14 @@ export class OpenAIContentConverter { } } - // Handle tool calls using the streaming parser + // Handle tool calls using the stream-local parser if (choice.delta?.tool_calls) { for (const toolCall of choice.delta.tool_calls) { const index = toolCall.index ?? 0; // Process the tool call chunk through the streaming parser if (toolCall.function?.arguments) { - this.streamingToolCallParser.addChunk( + ctx.toolCallParser.addChunk( index, toolCall.function.arguments, toolCall.id, @@ -971,7 +996,7 @@ export class OpenAIContentConverter { ); } else { // Handle metadata-only chunks (id and/or name without arguments) - this.streamingToolCallParser.addChunk( + ctx.toolCallParser.addChunk( index, '', // Empty chunk for metadata-only updates toolCall.id, @@ -987,11 +1012,9 @@ export class OpenAIContentConverter { // Detect truncation the provider may not report correctly. // Some providers (e.g. DashScope/Qwen) send "stop" or "tool_calls" // even when output was cut off mid-JSON due to max_tokens. - toolCallsTruncated = - this.streamingToolCallParser.hasIncompleteToolCalls(); + toolCallsTruncated = ctx.toolCallParser.hasIncompleteToolCalls(); - const completedToolCalls = - this.streamingToolCallParser.getCompletedToolCalls(); + const completedToolCalls = ctx.toolCallParser.getCompletedToolCalls(); for (const toolCall of completedToolCalls) { if (toolCall.name) { @@ -1007,8 +1030,9 @@ export class OpenAIContentConverter { } } - // Clear the parser for the next stream - this.streamingToolCallParser.reset(); + // Parser is stream-local; it will be discarded with the + // ConverterStreamContext when the stream finishes. No manual + // reset needed. } // If tool call JSON was truncated, override to "length" so downstream diff --git a/packages/core/src/core/openaiContentGenerator/pipeline.concurrent.test.ts b/packages/core/src/core/openaiContentGenerator/pipeline.concurrent.test.ts new file mode 100644 index 000000000..0c83ab47c --- /dev/null +++ b/packages/core/src/core/openaiContentGenerator/pipeline.concurrent.test.ts @@ -0,0 +1,350 @@ +/** + * @license + * Copyright 2025 Qwen + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * Integration test — deliberately does NOT mock `./converter.js`. 
Unlike + * `pipeline.test.ts` which stubs the converter, this suite drives the real + * `ContentGenerationPipeline` + real `OpenAIContentConverter` through two + * streams that interleave on the event loop, and asserts that tool-call + * arguments from one stream never bleed into the other's output. + * + * This is the regression test for issue #3516: before the per-stream + * parser scoping fix, the Converter singleton held a single + * `StreamingToolCallParser` instance. Two concurrent streams would share + * it; each stream's entry-time reset wiped the other's partial buffers, + * and chunks routed by `index: 0` interleaved into corrupt JSON. + * + * With the fix, `processStreamWithLogging` creates a fresh + * `ConverterStreamContext` at stream entry, so each concurrent generator + * has its own parser. This test would fail deterministically on pre-fix + * code because stream B's entry would wipe stream A's accumulator + * mid-flight, and A's finish chunk would emit zero function calls + * (`wasOutputTruncated`-style behavior). + */ + +import { describe, it, expect, vi } from 'vitest'; +import type OpenAI from 'openai'; +import type { GenerateContentParameters } from '@google/genai'; +import type { Part } from '@google/genai'; +import type { PipelineConfig } from './pipeline.js'; +import { ContentGenerationPipeline } from './pipeline.js'; +import type { Config } from '../../config/config.js'; +import type { ContentGeneratorConfig, AuthType } from '../contentGenerator.js'; +import type { OpenAICompatibleProvider } from './provider/index.js'; +import type { ErrorHandler } from './errorHandler.js'; + +type ChunkFactory = () => OpenAI.Chat.ChatCompletionChunk; + +/** + * Build a slow stream that yields to the event loop between chunks. + * Without the `setImmediate` await, a `for await` loop on one stream + * drains synchronously and `Promise.all` degenerates to serial execution, + * which hides the cross-stream bug. + */ +async function* interleavingStream( + chunks: ChunkFactory[], +): AsyncGenerator { + for (const make of chunks) { + // Yield control so the sibling stream can advance one step before we do. 
+ await new Promise((r) => setImmediate(r)); + yield make(); + } +} + +function openerChunk( + id: string, + name: string, + firstArgs: string, +): OpenAI.Chat.ChatCompletionChunk { + return { + id: `${id}-opener`, + object: 'chat.completion.chunk', + created: 1, + model: 'test', + choices: [ + { + index: 0, + delta: { + tool_calls: [ + { + index: 0, + id, + type: 'function', + function: { name, arguments: firstArgs }, + }, + ], + }, + finish_reason: null, + logprobs: null, + }, + ], + } as unknown as OpenAI.Chat.ChatCompletionChunk; +} + +function continuationChunk( + argsFragment: string, +): OpenAI.Chat.ChatCompletionChunk { + return { + id: 'cont', + object: 'chat.completion.chunk', + created: 1, + model: 'test', + choices: [ + { + index: 0, + delta: { + tool_calls: [{ index: 0, function: { arguments: argsFragment } }], + }, + finish_reason: null, + logprobs: null, + }, + ], + } as unknown as OpenAI.Chat.ChatCompletionChunk; +} + +function finisherChunk(): OpenAI.Chat.ChatCompletionChunk { + return { + id: 'finish', + object: 'chat.completion.chunk', + created: 1, + model: 'test', + choices: [ + { + index: 0, + delta: {}, + finish_reason: 'tool_calls', + logprobs: null, + }, + ], + usage: { prompt_tokens: 10, completion_tokens: 20, total_tokens: 30 }, + } as unknown as OpenAI.Chat.ChatCompletionChunk; +} + +describe('ContentGenerationPipeline — concurrent streams (issue #3516)', () => { + function buildPipeline( + createStreamImpl: () => AsyncIterable, + ) { + const mockClient = { + chat: { + completions: { + // Each call returns a fresh stream. The real Pipeline will + // invoke this twice — once per concurrent executeStream call. + create: vi.fn().mockImplementation(() => createStreamImpl()), + }, + }, + } as unknown as OpenAI; + + const mockProvider: OpenAICompatibleProvider = { + buildClient: vi.fn().mockReturnValue(mockClient), + buildRequest: vi.fn().mockImplementation((req) => req), + buildHeaders: vi.fn().mockReturnValue({}), + getDefaultGenerationConfig: vi.fn().mockReturnValue({}), + } as unknown as OpenAICompatibleProvider; + + const mockErrorHandler: ErrorHandler = { + handle: vi.fn().mockImplementation((error: unknown) => { + throw error; + }), + shouldSuppressErrorLogging: vi.fn().mockReturnValue(false), + } as unknown as ErrorHandler; + + const contentGeneratorConfig: ContentGeneratorConfig = { + model: 'test-model', + authType: 'openai' as AuthType, + } as ContentGeneratorConfig; + + const config: PipelineConfig = { + cliConfig: {} as Config, + provider: mockProvider, + contentGeneratorConfig, + errorHandler: mockErrorHandler, + }; + + return { pipeline: new ContentGenerationPipeline(config), mockClient }; + } + + it('two concurrent streams keep their tool-call buffers isolated', async () => { + // Queue of pending stream factories — each call to the mocked + // chat.completions.create consumes one. 
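+    // Push order matters: the prompt-a request must consume the first
+    // factory (call_A) and prompt-b the second (call_B); the assertions
+    // below rely on that pairing.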
+ const streamQueue: Array< + () => AsyncIterable + > = []; + + streamQueue.push(() => + interleavingStream([ + () => openerChunk('call_A', 'read_file', '{"file_path":"/a'), + () => continuationChunk('/one.ts"}'), + () => finisherChunk(), + ]), + ); + streamQueue.push(() => + interleavingStream([ + () => openerChunk('call_B', 'read_file', '{"file_path":"/b'), + () => continuationChunk('/two.ts"}'), + () => finisherChunk(), + ]), + ); + + const { pipeline } = buildPipeline(() => { + const next = streamQueue.shift(); + if (!next) throw new Error('unexpected extra stream request'); + return next(); + }); + + const request: GenerateContentParameters = { + model: 'test-model', + contents: [{ role: 'user', parts: [{ text: 'read the files' }] }], + }; + + // Kick off both streams *before* consuming either, so the two generators + // are actually alive on the event loop at the same time. + const [streamA, streamB] = await Promise.all([ + pipeline.executeStream(request, 'prompt-a'), + pipeline.executeStream(request, 'prompt-b'), + ]); + + // Interleaved consumption: alternate one chunk from each to maximize + // parser state overlap. + const collectedA: unknown[] = []; + const collectedB: unknown[] = []; + + const aIter = streamA[Symbol.asyncIterator](); + const bIter = streamB[Symbol.asyncIterator](); + + while (true) { + const [aNext, bNext] = await Promise.all([aIter.next(), bIter.next()]); + if (!aNext.done) collectedA.push(aNext.value); + if (!bNext.done) collectedB.push(bNext.value); + if (aNext.done && bNext.done) break; + } + + const extractFunctionCall = (responses: unknown[]) => { + for (const resp of responses) { + const candidates = ( + resp as { candidates?: Array<{ content?: { parts?: Part[] } }> } + ).candidates; + const parts = candidates?.[0]?.content?.parts ?? []; + const fc = parts.find((p) => p.functionCall)?.functionCall; + if (fc) return fc; + } + return undefined; + }; + + const fnA = extractFunctionCall(collectedA); + const fnB = extractFunctionCall(collectedB); + + // Pre-fix behaviour: at least one of these would either be undefined + // (buffer wiped by the other stream's reset) or carry the wrong args + // (other stream's chunks merged into this bucket). + expect(fnA?.name).toBe('read_file'); + expect(fnA?.id).toBe('call_A'); + expect(fnA?.args).toEqual({ file_path: '/a/one.ts' }); + + expect(fnB?.name).toBe('read_file'); + expect(fnB?.id).toBe('call_B'); + expect(fnB?.args).toEqual({ file_path: '/b/two.ts' }); + }); + + it('an error in one stream does not poison a concurrent stream (no shared reset on error)', async () => { + // Stream A: normal tool call. + // Stream B: yields an `error_finish` chunk mid-flight, which the + // Pipeline wraps as StreamContentError. + // Pre-fix: the error path ran `resetStreamingToolCalls()` on the shared + // converter, wiping A's partial buffers. Post-fix: streamCtx is local + // to each generator, so A is untouched. + const streamQueue: Array< + () => AsyncIterable + > = []; + + streamQueue.push(() => + interleavingStream([ + () => openerChunk('call_A', 'read_file', '{"file_path":"/x'), + () => continuationChunk('.ts"}'), + () => finisherChunk(), + ]), + ); + + streamQueue.push(() => + interleavingStream([ + () => openerChunk('call_B', 'read_file', '{"file_path":"/y'), + // Inject an error_finish chunk — this triggers StreamContentError + // inside processStreamWithLogging's catch block. 
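+        // Its delta.content ('rate limit') ends up in the StreamContentError
+        // message, which the bOutcome assertion checks below.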
+ () => + ({ + id: 'err', + object: 'chat.completion.chunk', + created: 1, + model: 'test', + choices: [ + { + index: 0, + delta: { content: 'rate limit' }, + finish_reason: 'error_finish', + logprobs: null, + }, + ], + }) as unknown as OpenAI.Chat.ChatCompletionChunk, + ]), + ); + + const { pipeline } = buildPipeline(() => { + const next = streamQueue.shift(); + if (!next) throw new Error('unexpected extra stream request'); + return next(); + }); + + const request: GenerateContentParameters = { + model: 'test-model', + contents: [{ role: 'user', parts: [{ text: 'read the files' }] }], + }; + + const [streamA, streamB] = await Promise.all([ + pipeline.executeStream(request, 'prompt-a'), + pipeline.executeStream(request, 'prompt-b'), + ]); + + const consumeA = (async () => { + const out: unknown[] = []; + for await (const r of streamA) out.push(r); + return out; + })(); + const consumeB = (async () => { + try { + for await (const _ of streamB) { + /* drain */ + } + return 'completed'; + } catch (e) { + return e instanceof Error ? e.message : String(e); + } + })(); + + const [aResults, bOutcome] = await Promise.all([consumeA, consumeB]); + + // Stream B blew up as expected. + expect(typeof bOutcome).toBe('string'); + expect(bOutcome).toContain('rate limit'); + + // Stream A still emitted its function call cleanly, despite B's error + // path running concurrently. On pre-fix code the error path would have + // called converter.resetStreamingToolCalls(), wiping A's in-flight + // buffer and causing A to emit zero function calls. + const fnA = (() => { + for (const resp of aResults) { + const parts = + (resp as { candidates?: Array<{ content?: { parts?: Part[] } }> }) + .candidates?.[0]?.content?.parts ?? []; + const fc = parts.find((p) => p.functionCall)?.functionCall; + if (fc) return fc; + } + return undefined; + })(); + + expect(fnA?.name).toBe('read_file'); + expect(fnA?.id).toBe('call_A'); + expect(fnA?.args).toEqual({ file_path: '/x.ts' }); + }); +}); diff --git a/packages/core/src/core/openaiContentGenerator/pipeline.test.ts b/packages/core/src/core/openaiContentGenerator/pipeline.test.ts index 442cd1b36..235d23c51 100644 --- a/packages/core/src/core/openaiContentGenerator/pipeline.test.ts +++ b/packages/core/src/core/openaiContentGenerator/pipeline.test.ts @@ -12,6 +12,7 @@ import { GenerateContentResponse, Type, FinishReason } from '@google/genai'; import type { PipelineConfig } from './pipeline.js'; import { ContentGenerationPipeline, StreamContentError } from './pipeline.js'; import { OpenAIContentConverter } from './converter.js'; +import { StreamingToolCallParser } from './streamingToolCallParser.js'; import type { Config } from '../../config/config.js'; import type { ContentGeneratorConfig, AuthType } from '../contentGenerator.js'; import type { OpenAICompatibleProvider } from './provider/index.js'; @@ -44,7 +45,8 @@ describe('ContentGenerationPipeline', () => { }, } as unknown as OpenAI; - // Mock converter + // Mock converter. `createStreamContext` returns a fresh parser each + // stream; tests that don't care about tool-call buffers just ignore it. 
mockConverter = { setModel: vi.fn(), setModalities: vi.fn(), @@ -52,7 +54,9 @@ describe('ContentGenerationPipeline', () => { convertOpenAIResponseToGemini: vi.fn(), convertOpenAIChunkToGemini: vi.fn(), convertGeminiToolsToOpenAI: vi.fn(), - resetStreamingToolCalls: vi.fn(), + createStreamContext: vi.fn(() => ({ + toolCallParser: new StreamingToolCallParser(), + })), } as unknown as OpenAIContentConverter; // Mock provider @@ -607,7 +611,9 @@ describe('ContentGenerationPipeline', () => { expect(results).toHaveLength(2); expect(results[0]).toBe(mockGeminiResponse1); expect(results[1]).toBe(mockGeminiResponse2); - expect(mockConverter.resetStreamingToolCalls).toHaveBeenCalled(); + // Parser is now created per-stream via createStreamContext — assert + // that the pipeline asked for a fresh one at stream entry. + expect(mockConverter.createStreamContext).toHaveBeenCalled(); expect(mockClient.chat.completions.create).toHaveBeenCalledWith( expect.objectContaining({ stream: true, @@ -719,7 +725,10 @@ describe('ContentGenerationPipeline', () => { } expect(results).toHaveLength(0); // No results due to error - expect(mockConverter.resetStreamingToolCalls).toHaveBeenCalledTimes(2); // Once at start, once on error + // createStreamContext is called exactly once at stream entry; the + // error path no longer needs an explicit parser reset because the + // stream-local context is discarded when the generator unwinds. + expect(mockConverter.createStreamContext).toHaveBeenCalledTimes(1); expect(mockErrorHandler.handle).toHaveBeenCalledWith( testError, expect.any(Object), diff --git a/packages/core/src/core/openaiContentGenerator/pipeline.ts b/packages/core/src/core/openaiContentGenerator/pipeline.ts index e359b8d49..77ff96f2f 100644 --- a/packages/core/src/core/openaiContentGenerator/pipeline.ts +++ b/packages/core/src/core/openaiContentGenerator/pipeline.ts @@ -143,8 +143,12 @@ export class ContentGenerationPipeline { ): AsyncGenerator { const collectedGeminiResponses: GenerateContentResponse[] = []; - // Reset streaming tool calls to prevent data pollution from previous streams - this.converter.resetStreamingToolCalls(); + // Stream-local parser state. Previously the tool-call parser lived on + // the Converter singleton and was reset at stream start — but that + // wiped concurrent streams' in-flight buffers (e.g. parallel subagents + // sharing the same Config.contentGenerator). Scoping it per-stream + // fixes issue #3516. + const streamCtx = this.converter.createStreamContext(); // State for handling chunk merging. // pendingFinishResponse holds a finish chunk waiting to be merged with @@ -170,7 +174,10 @@ export class ContentGenerationPipeline { throw new StreamContentError(errorContent); } - const response = this.converter.convertOpenAIChunkToGemini(chunk); + const response = this.converter.convertOpenAIChunkToGemini( + chunk, + streamCtx, + ); // Stage 2b: Filter empty responses to avoid downstream issues if ( @@ -234,8 +241,8 @@ export class ContentGenerationPipeline { // Stage 2e: Stream completed successfully context.duration = Date.now() - context.startTime; } catch (error) { - // Clear streaming tool calls on error to prevent data pollution - this.converter.resetStreamingToolCalls(); + // No manual parser cleanup needed — streamCtx is stream-local and + // becomes eligible for garbage collection once this generator unwinds. // Re-throw StreamContentError directly so it can be handled by // the caller's retry logic (e.g., TPM throttling retry in sendMessageStream)