From 53293e4d85a7f2218c973d11eafb019a8b2c4e0a Mon Sep 17 00:00:00 2001
From: tanzhenxin
Date: Fri, 24 Apr 2026 12:28:03 +0800
Subject: [PATCH] refactor(core): make OpenAI converter stateless (follow-up to
 #3525) (#3550)

* refactor(core): make OpenAI converter stateless to prevent shared-state races

Follow-up to #3525. #3516 showed that OpenAIContentConverter's long-lived
per-pipeline state raced between concurrent streams; #3525 scoped the
streaming tool-call parser, and this change removes the remaining shared
state.

- OpenAIContentConverter is now a module of stand-alone functions; the
  exported symbol is a namespace object preserved for call-site
  compatibility.
- New RequestContext (in types.ts, alongside PipelineConfig and
  ErrorHandler) carries model, modalities, startTime, and an optional
  per-stream toolCallParser. The pipeline builds one per request and
  threads it through every conversion call.
- errorHandler drops duration/isStreaming; duration is recomputed from
  startTime at error time, and the troubleshooting text is now uniform.
- convertOpenAIChunkToGemini now throws if toolCallParser is missing so
  future misuse surfaces loudly instead of silently constructing a
  one-shot parser per chunk.

* test(core): align timeout expectations
---
 .../__tests__/openaiTimeoutHandling.test.ts   |  12 +-
 .../loggingContentGenerator.test.ts           |  36 +-
 .../loggingContentGenerator.ts                |  39 +-
 .../openaiContentGenerator/converter.test.ts  | 429 ++-
 .../core/openaiContentGenerator/converter.ts  | 2392 ++++++++---------
 .../errorHandler.test.ts                      |  50 +-
 .../openaiContentGenerator/errorHandler.ts    |  55 +-
 .../src/core/openaiContentGenerator/index.ts  |   5 +-
 .../openaiContentGenerator.ts                 |   2 +-
 .../pipeline.concurrent.test.ts               |   8 +-
 .../openaiContentGenerator/pipeline.test.ts   | 103 +-
 .../core/openaiContentGenerator/pipeline.ts   |  99 +-
 .../src/core/openaiContentGenerator/types.ts  |  40 +
 13 files changed, 1684 insertions(+), 1586 deletions(-)
 create mode 100644 packages/core/src/core/openaiContentGenerator/types.ts

diff --git a/packages/core/src/core/__tests__/openaiTimeoutHandling.test.ts b/packages/core/src/core/__tests__/openaiTimeoutHandling.test.ts
index 62e3e39ed..86995d622 100644
--- a/packages/core/src/core/__tests__/openaiTimeoutHandling.test.ts
+++ b/packages/core/src/core/__tests__/openaiTimeoutHandling.test.ts
@@ -215,13 +215,12 @@ describe('OpenAIContentGenerator Timeout Handling', () => {
       expect(errorMessage).toContain('Reduce input length or complexity');
       expect(errorMessage).toContain('Increase timeout in config');
       expect(errorMessage).toContain('Check network connectivity');
-      expect(errorMessage).toContain('Consider using streaming mode');
       }
     });
   });
 
   describe('generateContentStream timeout handling', () => {
-    it('should handle streaming timeout errors', async () => {
+    it('should handle streaming timeout errors with the shared timeout message', async () => {
       const timeoutError = new Error('Streaming timeout');
       mockOpenAIClient.chat.completions.create.mockRejectedValue(timeoutError);
 
@@ -233,11 +232,11 @@
       await expect(
         generator.generateContentStream(request, 'test-prompt-id'),
       ).rejects.toThrow(
-        /Streaming request timeout after \d+s\. Try reducing input length or increasing timeout in config\./,
+        /Request timeout after \d+s\. Try reducing input length or increasing timeout in config\./,
       );
     });
 
-    it('should include streaming-specific troubleshooting tips', async () => {
+    it('should include the shared troubleshooting tips for streaming timeouts', async () => {
       const timeoutError = new Error('request timed out');
       mockOpenAIClient.chat.completions.create.mockRejectedValue(timeoutError);
 
@@ -251,9 +250,10 @@
       } catch (error: unknown) {
         const errorMessage =
           error instanceof Error ? error.message : String(error);
-        expect(errorMessage).toContain('Streaming timeout troubleshooting:');
+        expect(errorMessage).toContain('Troubleshooting tips:');
+        expect(errorMessage).toContain('Reduce input length or complexity');
+        expect(errorMessage).toContain('Increase timeout in config');
         expect(errorMessage).toContain('Check network connectivity');
-        expect(errorMessage).toContain('Consider using non-streaming mode');
       }
     });
   });
diff --git a/packages/core/src/core/loggingContentGenerator/loggingContentGenerator.test.ts b/packages/core/src/core/loggingContentGenerator/loggingContentGenerator.test.ts
index 624f04f3b..bd1bea8b7 100644
--- a/packages/core/src/core/loggingContentGenerator/loggingContentGenerator.test.ts
+++ b/packages/core/src/core/loggingContentGenerator/loggingContentGenerator.test.ts
@@ -41,15 +41,15 @@ vi.mock('../../utils/openaiLogger.js', () => ({
 }));
 
 const realConvertGeminiRequestToOpenAI =
-  OpenAIContentConverter.prototype.convertGeminiRequestToOpenAI;
+  OpenAIContentConverter.convertGeminiRequestToOpenAI;
 const convertGeminiRequestToOpenAISpy = vi
-  .spyOn(OpenAIContentConverter.prototype, 'convertGeminiRequestToOpenAI')
+  .spyOn(OpenAIContentConverter, 'convertGeminiRequestToOpenAI')
   .mockReturnValue([{ role: 'user', content: 'converted' }]);
 const convertGeminiToolsToOpenAISpy = vi
-  .spyOn(OpenAIContentConverter.prototype, 'convertGeminiToolsToOpenAI')
+  .spyOn(OpenAIContentConverter, 'convertGeminiToolsToOpenAI')
   .mockResolvedValue([{ type: 'function', function: { name: 'tool' } }]);
 const convertGeminiResponseToOpenAISpy = vi
-  .spyOn(OpenAIContentConverter.prototype, 'convertGeminiResponseToOpenAI')
+  .spyOn(OpenAIContentConverter, 'convertGeminiResponseToOpenAI')
   .mockReturnValue({
     id: 'openai-response',
     object: 'chat.completion',
@@ -57,10 +57,6 @@ const convertGeminiResponseToOpenAISpy = vi
     model: 'test-model',
     choices: [],
   } as OpenAI.Chat.ChatCompletion);
-const setModalitiesSpy = vi.spyOn(
-  OpenAIContentConverter.prototype,
-  'setModalities',
-);
 
 const createConfig = (overrides: Record<string, unknown> = {}): Config => {
   const configContent = {
@@ -121,7 +117,6 @@ describe('LoggingContentGenerator', () => {
     convertGeminiRequestToOpenAISpy.mockClear();
     convertGeminiToolsToOpenAISpy.mockClear();
     convertGeminiResponseToOpenAISpy.mockClear();
-    setModalitiesSpy.mockClear();
   });
 
   it('logs request/response, normalizes thought parts, and logs OpenAI interaction', async () => {
@@ -409,13 +404,13 @@ describe('LoggingContentGenerator', () => {
   });
 
   it('uses generator modalities when converting logged OpenAI requests', async () => {
-    convertGeminiRequestToOpenAISpy.mockImplementationOnce(function (
-      this: OpenAIContentConverter,
-      request,
-      options,
-    ) {
-      return realConvertGeminiRequestToOpenAI.call(this, request, options);
-    });
+    convertGeminiRequestToOpenAISpy.mockImplementationOnce(
+      (request, requestContext, options) => realConvertGeminiRequestToOpenAI(
+        request,
+        requestContext,
+        options,
+      ),
+    );
 
     const wrapped = createWrappedGenerator(
       vi
@@ -458,7 +453,14 @@ describe('LoggingContentGenerator', () => {
     await generator.generateContent(request, 'prompt-5');
 
-    expect(setModalitiesSpy).toHaveBeenCalledWith({ image: true });
+    expect(convertGeminiRequestToOpenAISpy).toHaveBeenCalledWith(
+      request,
+      expect.objectContaining({
+        model: 'test-model',
+        modalities: { image: true },
+      }),
+      { cleanOrphanToolCalls: false },
+    );
 
     const openaiLoggerInstance = vi.mocked(OpenAILogger).mock.results[0]
       ?.value as { logInteraction: ReturnType<typeof vi.fn> };
 
diff --git a/packages/core/src/core/loggingContentGenerator/loggingContentGenerator.ts b/packages/core/src/core/loggingContentGenerator/loggingContentGenerator.ts
index d6e2db4b9..4527b760c 100644
--- a/packages/core/src/core/loggingContentGenerator/loggingContentGenerator.ts
+++ b/packages/core/src/core/loggingContentGenerator/loggingContentGenerator.ts
@@ -39,6 +39,7 @@ import type {
   InputModalities,
 } from '../contentGenerator.js';
 import { OpenAIContentConverter } from '../openaiContentGenerator/converter.js';
+import type { RequestContext } from '../openaiContentGenerator/types.js';
 import { OpenAILogger } from '../../utils/openaiLogger.js';
 import {
   getErrorMessage,
@@ -295,14 +296,14 @@ export class LoggingContentGenerator implements ContentGenerator {
       return undefined;
     }
 
-    const converter = new OpenAIContentConverter(
-      request.model,
-      this.schemaCompliance,
+    const requestContext = this.createLoggingRequestContext(request.model);
+    const messages = OpenAIContentConverter.convertGeminiRequestToOpenAI(
+      request,
+      requestContext,
+      {
+        cleanOrphanToolCalls: false,
+      },
     );
-    converter.setModalities(this.modalities ?? {});
-    const messages = converter.convertGeminiRequestToOpenAI(request, {
-      cleanOrphanToolCalls: false,
-    });
 
     const openaiRequest: OpenAI.Chat.ChatCompletionCreateParams = {
       model: request.model,
@@ -310,9 +311,11 @@ export class LoggingContentGenerator implements ContentGenerator {
     };
 
     if (request.config?.tools) {
-      openaiRequest.tools = await converter.convertGeminiToolsToOpenAI(
-        request.config.tools,
-      );
+      openaiRequest.tools =
+        await OpenAIContentConverter.convertGeminiToolsToOpenAI(
+          request.config.tools,
+          this.schemaCompliance ?? 'auto',
+        );
     }
 
     if (request.config?.temperature !== undefined) {
@@ -334,6 +337,14 @@ export class LoggingContentGenerator implements ContentGenerator {
     return openaiRequest;
   }
 
+  private createLoggingRequestContext(model: string): RequestContext {
+    return {
+      model,
+      modalities: this.modalities ?? 
{}, + startTime: 0, + }; + } + private async logOpenAIInteraction( openaiRequest: OpenAI.Chat.ChatCompletionCreateParams | undefined, response?: GenerateContentResponse, @@ -362,12 +373,10 @@ export class LoggingContentGenerator implements ContentGenerator { response: GenerateContentResponse, openaiRequest: OpenAI.Chat.ChatCompletionCreateParams, ): OpenAI.Chat.ChatCompletion { - const converter = new OpenAIContentConverter( - openaiRequest.model, - this.schemaCompliance, + return OpenAIContentConverter.convertGeminiResponseToOpenAI( + response, + this.createLoggingRequestContext(openaiRequest.model), ); - - return converter.convertGeminiResponseToOpenAI(response); } private consolidateGeminiResponsesForLogging( diff --git a/packages/core/src/core/openaiContentGenerator/converter.test.ts b/packages/core/src/core/openaiContentGenerator/converter.test.ts index 09b49e412..f9bc0666f 100644 --- a/packages/core/src/core/openaiContentGenerator/converter.test.ts +++ b/packages/core/src/core/openaiContentGenerator/converter.test.ts @@ -7,6 +7,7 @@ import { describe, it, expect, beforeEach } from 'vitest'; import { OpenAIContentConverter } from './converter.js'; import { StreamingToolCallParser } from './streamingToolCallParser.js'; +import type { RequestContext } from './types.js'; import { Type, FinishReason, @@ -20,25 +21,40 @@ import type OpenAI from 'openai'; import { convertToFunctionResponse } from '../coreToolScheduler.js'; describe('OpenAIContentConverter', () => { - let converter: OpenAIContentConverter; + let converter: typeof OpenAIContentConverter; + let requestContext: RequestContext; beforeEach(() => { - converter = new OpenAIContentConverter('test-model', 'auto', { - image: true, - pdf: true, - audio: true, - video: true, - }); + converter = OpenAIContentConverter; + requestContext = { + model: 'test-model', + modalities: { + image: true, + pdf: true, + audio: true, + video: true, + }, + startTime: 0, + }; }); - describe('createStreamContext', () => { - it('returns a fresh context with its own StreamingToolCallParser', () => { - const ctx1 = converter.createStreamContext(); - const ctx2 = converter.createStreamContext(); + function withStreamParser( + toolCallParser: StreamingToolCallParser = new StreamingToolCallParser(), + ): RequestContext { + return { + ...requestContext, + toolCallParser, + }; + } - expect(ctx1.toolCallParser).toBeInstanceOf(StreamingToolCallParser); - expect(ctx2.toolCallParser).toBeInstanceOf(StreamingToolCallParser); - expect(ctx1.toolCallParser).not.toBe(ctx2.toolCallParser); + describe('stream-local parser state', () => { + it('creates fresh parser instances', () => { + const ctx1 = new StreamingToolCallParser(); + const ctx2 = new StreamingToolCallParser(); + + expect(ctx1).toBeInstanceOf(StreamingToolCallParser); + expect(ctx2).toBeInstanceOf(StreamingToolCallParser); + expect(ctx1).not.toBe(ctx2); }); it('isolates two contexts so writes to one do not leak into the other', () => { @@ -46,16 +62,16 @@ describe('OpenAIContentConverter', () => { // Converter as an instance field, so two concurrent streams sharing // the same Config.contentGenerator would overwrite each other's // tool-call buffers. Per-stream contexts eliminate that contention. 
- const ctx1 = converter.createStreamContext(); - const ctx2 = converter.createStreamContext(); + const ctx1 = new StreamingToolCallParser(); + const ctx2 = new StreamingToolCallParser(); - ctx1.toolCallParser.addChunk(0, '{"a":1}', 'call_A', 'fn_A'); - ctx2.toolCallParser.addChunk(0, '{"b":2}', 'call_B', 'fn_B'); + ctx1.addChunk(0, '{"a":1}', 'call_A', 'fn_A'); + ctx2.addChunk(0, '{"b":2}', 'call_B', 'fn_B'); - expect(ctx1.toolCallParser.getBuffer(0)).toBe('{"a":1}'); - expect(ctx2.toolCallParser.getBuffer(0)).toBe('{"b":2}'); - expect(ctx1.toolCallParser.getToolCallMeta(0).id).toBe('call_A'); - expect(ctx2.toolCallParser.getToolCallMeta(0).id).toBe('call_B'); + expect(ctx1.getBuffer(0)).toBe('{"a":1}'); + expect(ctx2.getBuffer(0)).toBe('{"b":2}'); + expect(ctx1.getToolCallMeta(0).id).toBe('call_A'); + expect(ctx2.getToolCallMeta(0).id).toBe('call_B'); }); it('demuxes interleaved chunks from two concurrent streams correctly (#3516)', () => { @@ -64,8 +80,8 @@ describe('OpenAIContentConverter', () => { // interleaved at the event loop. Under the pre-fix architecture // this corrupted both tool calls; under per-stream contexts each // stream's chunks stay in their own parser and close cleanly. - const streamA = converter.createStreamContext(); - const streamB = converter.createStreamContext(); + const streamA = withStreamParser(new StreamingToolCallParser()); + const streamB = withStreamParser(new StreamingToolCallParser()); const openerA = { object: 'chat.completion.chunk', @@ -239,7 +255,10 @@ describe('OpenAIContentConverter', () => { output: 'Raw output text', }); - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessage = messages.find((message) => message.role === 'tool'); expect(toolMessage).toBeDefined(); @@ -257,7 +276,10 @@ describe('OpenAIContentConverter', () => { error: 'Command failed', }); - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessage = messages.find((message) => message.role === 'tool'); expect(toolMessage).toBeDefined(); @@ -275,7 +297,10 @@ describe('OpenAIContentConverter', () => { data: { value: 42 }, }); - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessage = messages.find((message) => message.role === 'tool'); expect(toolMessage).toBeDefined(); @@ -327,7 +352,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); // Should have tool message with both text and image content const toolMessage = messages.find((message) => message.role === 'tool'); @@ -393,7 +421,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); // Should have tool message with both text and image content const toolMessage = messages.find((message) => message.role === 'tool'); @@ -455,7 +486,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); // Should 
have tool message with both text and file content const toolMessage = messages.find((message) => message.role === 'tool'); @@ -519,7 +553,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); // Should have tool message with both text and audio content const toolMessage = messages.find((message) => message.role === 'tool'); @@ -585,7 +622,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessage = messages.find((message) => message.role === 'tool'); expect(toolMessage).toBeDefined(); @@ -645,7 +685,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessage = messages.find((message) => message.role === 'tool'); expect(toolMessage).toBeDefined(); @@ -705,7 +748,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); // Should have tool message with both text and video content const toolMessage = messages.find((message) => message.role === 'tool'); @@ -769,7 +815,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessage = messages.find((message) => message.role === 'tool'); expect(toolMessage).toBeDefined(); @@ -828,7 +877,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessage = messages.find((message) => message.role === 'tool'); expect(toolMessage).toBeDefined(); @@ -886,7 +938,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessage = messages.find((message) => message.role === 'tool'); expect(toolMessage).toBeDefined(); @@ -909,7 +964,10 @@ describe('OpenAIContentConverter', () => { output: 'Plain text output', }); - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessage = messages.find((message) => message.role === 'tool'); expect(toolMessage).toBeDefined(); @@ -961,7 +1019,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); // Should create an assistant message with tool call and a tool message with empty content // This is required because OpenAI API expects every tool call to have a corresponding response @@ -1005,7 +1066,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + 
requestContext, + ); const assistantMsg = messages.find((m) => m.role === 'assistant'); expect(assistantMsg).toBeDefined(); @@ -1050,7 +1114,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const assistantMsg = messages.find((m) => m.role === 'assistant'); expect(assistantMsg).toBeDefined(); @@ -1073,7 +1140,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const assistantMsg = messages.find((m) => m.role === 'assistant'); expect(assistantMsg).toBeDefined(); @@ -1136,7 +1206,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessage = messages.find((m) => m.role === 'tool'); expect(toolMessage).toBeDefined(); @@ -1204,7 +1277,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessage = messages.find((m) => m.role === 'tool'); expect(toolMessage).toBeDefined(); @@ -1234,13 +1310,16 @@ describe('OpenAIContentConverter', () => { describe('convertOpenAIResponseToGemini', () => { it('should handle empty choices array without crashing', () => { - const response = converter.convertOpenAIResponseToGemini({ - object: 'chat.completion', - id: 'chatcmpl-empty', - created: 123, - model: 'test-model', - choices: [], - } as unknown as OpenAI.Chat.ChatCompletion); + const response = converter.convertOpenAIResponseToGemini( + { + object: 'chat.completion', + id: 'chatcmpl-empty', + created: 123, + model: 'test-model', + choices: [], + } as unknown as OpenAI.Chat.ChatCompletion, + requestContext, + ); expect(response.candidates).toEqual([]); }); @@ -1248,24 +1327,27 @@ describe('OpenAIContentConverter', () => { describe('OpenAI -> Gemini reasoning content', () => { it('should convert reasoning_content to a thought part for non-streaming responses', () => { - const response = converter.convertOpenAIResponseToGemini({ - object: 'chat.completion', - id: 'chatcmpl-1', - created: 123, - model: 'gpt-test', - choices: [ - { - index: 0, - message: { - role: 'assistant', - content: 'final answer', - reasoning_content: 'chain-of-thought', + const response = converter.convertOpenAIResponseToGemini( + { + object: 'chat.completion', + id: 'chatcmpl-1', + created: 123, + model: 'gpt-test', + choices: [ + { + index: 0, + message: { + role: 'assistant', + content: 'final answer', + reasoning_content: 'chain-of-thought', + }, + finish_reason: 'stop', + logprobs: null, }, - finish_reason: 'stop', - logprobs: null, - }, - ], - } as unknown as OpenAI.Chat.ChatCompletion); + ], + } as unknown as OpenAI.Chat.ChatCompletion, + requestContext, + ); const parts = response.candidates?.[0]?.content?.parts; expect(parts?.[0]).toEqual( @@ -1277,24 +1359,27 @@ describe('OpenAIContentConverter', () => { }); it('should convert reasoning to a thought part for non-streaming responses', () => { - const response = converter.convertOpenAIResponseToGemini({ - object: 'chat.completion', - id: 'chatcmpl-2', - created: 123, - model: 'gpt-test', - choices: [ - { - index: 0, - 
message: { - role: 'assistant', - content: 'final answer', - reasoning: 'chain-of-thought', + const response = converter.convertOpenAIResponseToGemini( + { + object: 'chat.completion', + id: 'chatcmpl-2', + created: 123, + model: 'gpt-test', + choices: [ + { + index: 0, + message: { + role: 'assistant', + content: 'final answer', + reasoning: 'chain-of-thought', + }, + finish_reason: 'stop', + logprobs: null, }, - finish_reason: 'stop', - logprobs: null, - }, - ], - } as unknown as OpenAI.Chat.ChatCompletion); + ], + } as unknown as OpenAI.Chat.ChatCompletion, + requestContext, + ); const parts = response.candidates?.[0]?.content?.parts; expect(parts?.[0]).toEqual( @@ -1324,7 +1409,7 @@ describe('OpenAIContentConverter', () => { ], model: 'gpt-test', } as unknown as OpenAI.Chat.ChatCompletionChunk, - converter.createStreamContext(), + withStreamParser(new StreamingToolCallParser()), ); const parts = chunk.candidates?.[0]?.content?.parts; @@ -1355,7 +1440,7 @@ describe('OpenAIContentConverter', () => { ], model: 'gpt-test', } as unknown as OpenAI.Chat.ChatCompletionChunk, - converter.createStreamContext(), + withStreamParser(new StreamingToolCallParser()), ); const parts = chunk.candidates?.[0]?.content?.parts; @@ -1384,7 +1469,7 @@ describe('OpenAIContentConverter', () => { ], model: 'gpt-test', } as unknown as OpenAI.Chat.ChatCompletionChunk, - converter.createStreamContext(), + withStreamParser(new StreamingToolCallParser()), ); const parts = chunk.candidates?.[0]?.content?.parts; @@ -1742,7 +1827,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); expect(messages).toHaveLength(1); expect(messages[0].role).toBe('assistant'); @@ -1768,7 +1856,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); expect(messages).toHaveLength(1); expect(messages[0].role).toBe('assistant'); @@ -1830,9 +1921,13 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request, { - cleanOrphanToolCalls: false, - }); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + { + cleanOrphanToolCalls: false, + }, + ); // Should have: assistant (tool_call_1), tool (result_1), assistant (tool_call_2), tool (result_2) expect(messages).toHaveLength(4); @@ -1861,7 +1956,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); expect(messages).toHaveLength(3); expect(messages[0].role).toBe('assistant'); @@ -1884,7 +1982,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); expect(messages).toHaveLength(1); expect(messages[0].content).toBe('Text partAnother text'); @@ -1909,7 +2010,10 @@ describe('OpenAIContentConverter', () => { ], }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); // Empty messages should be filtered out expect(messages).toHaveLength(1); @@ -1928,15 +2032,21 @@ 
describe('MCP tool result end-to-end through OpenAI converter (issue #1520)', () * Verifies that multi-part MCP tool results are properly carried through * into the OpenAI tool message, with no content leaking into user messages. */ - let converter: OpenAIContentConverter; + let converter: typeof OpenAIContentConverter; + let requestContext: RequestContext; beforeEach(() => { - converter = new OpenAIContentConverter('test-model', 'auto', { - image: true, - pdf: true, - audio: true, - video: true, - }); + converter = OpenAIContentConverter; + requestContext = { + model: 'test-model', + modalities: { + image: true, + pdf: true, + audio: true, + video: true, + }, + startTime: 0, + }; }); it('should preserve MCP multi-text content in tool message (not leak to user message)', () => { @@ -1983,7 +2093,10 @@ describe('MCP tool result end-to-end through OpenAI converter (issue #1520)', () model: 'models/test', contents, }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessages = messages.filter((m) => m.role === 'tool'); const userMessages = messages.filter((m) => m.role === 'user'); @@ -2053,7 +2166,10 @@ describe('MCP tool result end-to-end through OpenAI converter (issue #1520)', () model: 'models/test', contents, }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessages = messages.filter((m) => m.role === 'tool'); const userMessages = messages.filter((m) => m.role === 'user'); @@ -2115,7 +2231,10 @@ describe('MCP tool result end-to-end through OpenAI converter (issue #1520)', () model: 'models/test', contents, }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessages = messages.filter((m) => m.role === 'tool'); const userMessages = messages.filter((m) => m.role === 'user'); @@ -2180,7 +2299,10 @@ describe('MCP tool result end-to-end through OpenAI converter (issue #1520)', () model: 'models/test', contents, }; - const messages = converter.convertGeminiRequestToOpenAI(request); + const messages = converter.convertGeminiRequestToOpenAI( + request, + requestContext, + ); const toolMessages = messages.filter((m) => m.role === 'tool'); const userMessages = messages.filter((m) => m.role === 'user'); @@ -2211,18 +2333,27 @@ describe('MCP tool result end-to-end through OpenAI converter (issue #1520)', () }); describe('Truncated tool call detection in streaming', () => { - let converter: OpenAIContentConverter; + let converter: typeof OpenAIContentConverter; beforeEach(() => { - converter = new OpenAIContentConverter('test-model'); + converter = OpenAIContentConverter; }); + function createStreamingRequestContext(model = 'test-model'): RequestContext { + return { + model, + modalities: {}, + startTime: 0, + toolCallParser: new StreamingToolCallParser(), + }; + } + /** * Helper: feed streaming chunks then a final chunk with finish_reason, * and return the Gemini response for the final chunk. */ function feedToolCallChunks( - conv: OpenAIContentConverter, + conv: typeof OpenAIContentConverter, toolCallChunks: Array<{ index: number; id?: string; @@ -2232,7 +2363,7 @@ describe('Truncated tool call detection in streaming', () => { finishReason: string, ) { // One stream-local context covers every chunk of this simulated stream. 
- const ctx = conv.createStreamContext(); + const ctx = createStreamingRequestContext(); // Feed argument chunks (no finish_reason yet) for (const tc of toolCallChunks) { @@ -2384,8 +2515,8 @@ describe('Truncated tool call detection in streaming', () => { it('should detect truncation with multi-chunk streaming arguments', () => { // Feed arguments in multiple small chunks like real streaming - const conv = new OpenAIContentConverter('test-model'); - const ctx = conv.createStreamContext(); + const conv = OpenAIContentConverter; + const ctx = createStreamingRequestContext(); // Chunk 1: start of JSON with tool metadata conv.convertOpenAIChunkToGemini( @@ -2472,6 +2603,17 @@ describe('modality filtering', () => { }; } + function makeRequestContext( + model: string, + modalities: RequestContext['modalities'], + ): RequestContext { + return { + model, + modalities, + startTime: 0, + }; + } + function getUserContentParts( messages: OpenAI.Chat.ChatCompletionMessageParam[], ): Array<{ type: string; text?: string }> { @@ -2487,14 +2629,17 @@ describe('modality filtering', () => { } it('replaces image with placeholder when image modality is disabled', () => { - const conv = new OpenAIContentConverter('deepseek-chat', 'auto', {}); + const conv = OpenAIContentConverter; const request = makeRequest([ { inlineData: { mimeType: 'image/png', data: 'abc123' }, displayName: 'screenshot.png', } as unknown as Part, ]); - const messages = conv.convertGeminiRequestToOpenAI(request); + const messages = conv.convertGeminiRequestToOpenAI( + request, + makeRequestContext('deepseek-chat', {}), + ); const parts = getUserContentParts(messages); expect(parts).toHaveLength(1); expect(parts[0].type).toBe('text'); @@ -2503,22 +2648,23 @@ describe('modality filtering', () => { }); it('keeps image when image modality is enabled', () => { - const conv = new OpenAIContentConverter('gpt-4o', 'auto', { image: true }); + const conv = OpenAIContentConverter; const request = makeRequest([ { inlineData: { mimeType: 'image/png', data: 'abc123' }, } as unknown as Part, ]); - const messages = conv.convertGeminiRequestToOpenAI(request); + const messages = conv.convertGeminiRequestToOpenAI( + request, + makeRequestContext('gpt-4o', { image: true }), + ); const parts = getUserContentParts(messages); expect(parts).toHaveLength(1); expect(parts[0].type).toBe('image_url'); }); it('replaces PDF with placeholder when pdf modality is disabled', () => { - const conv = new OpenAIContentConverter('test-model', 'auto', { - image: true, - }); + const conv = OpenAIContentConverter; const request = makeRequest([ { inlineData: { @@ -2528,7 +2674,10 @@ describe('modality filtering', () => { }, } as unknown as Part, ]); - const messages = conv.convertGeminiRequestToOpenAI(request); + const messages = conv.convertGeminiRequestToOpenAI( + request, + makeRequestContext('test-model', { image: true }), + ); const parts = getUserContentParts(messages); expect(parts).toHaveLength(1); expect(parts[0].type).toBe('text'); @@ -2537,10 +2686,7 @@ describe('modality filtering', () => { }); it('keeps PDF when pdf modality is enabled', () => { - const conv = new OpenAIContentConverter('claude-sonnet', 'auto', { - image: true, - pdf: true, - }); + const conv = OpenAIContentConverter; const request = makeRequest([ { inlineData: { @@ -2550,20 +2696,26 @@ describe('modality filtering', () => { }, } as unknown as Part, ]); - const messages = conv.convertGeminiRequestToOpenAI(request); + const messages = conv.convertGeminiRequestToOpenAI( + request, + 
makeRequestContext('claude-sonnet', { image: true, pdf: true }), + ); const parts = getUserContentParts(messages); expect(parts).toHaveLength(1); expect(parts[0].type).toBe('file'); }); it('replaces video with placeholder when video modality is disabled', () => { - const conv = new OpenAIContentConverter('test-model', 'auto', {}); + const conv = OpenAIContentConverter; const request = makeRequest([ { inlineData: { mimeType: 'video/mp4', data: 'vid-data' }, } as unknown as Part, ]); - const messages = conv.convertGeminiRequestToOpenAI(request); + const messages = conv.convertGeminiRequestToOpenAI( + request, + makeRequestContext('test-model', {}), + ); const parts = getUserContentParts(messages); expect(parts).toHaveLength(1); expect(parts[0].type).toBe('text'); @@ -2571,13 +2723,16 @@ describe('modality filtering', () => { }); it('replaces audio with placeholder when audio modality is disabled', () => { - const conv = new OpenAIContentConverter('test-model', 'auto', {}); + const conv = OpenAIContentConverter; const request = makeRequest([ { inlineData: { mimeType: 'audio/wav', data: 'audio-data' }, } as unknown as Part, ]); - const messages = conv.convertGeminiRequestToOpenAI(request); + const messages = conv.convertGeminiRequestToOpenAI( + request, + makeRequestContext('test-model', {}), + ); const parts = getUserContentParts(messages); expect(parts).toHaveLength(1); expect(parts[0].type).toBe('text'); @@ -2585,7 +2740,7 @@ describe('modality filtering', () => { }); it('handles mixed content: keeps text + supported media, replaces unsupported', () => { - const conv = new OpenAIContentConverter('gpt-4o', 'auto', { image: true }); + const conv = OpenAIContentConverter; const request = makeRequest([ { text: 'Analyze these files' }, { @@ -2595,7 +2750,10 @@ describe('modality filtering', () => { inlineData: { mimeType: 'video/mp4', data: 'vid-data' }, } as unknown as Part, ]); - const messages = conv.convertGeminiRequestToOpenAI(request); + const messages = conv.convertGeminiRequestToOpenAI( + request, + makeRequestContext('gpt-4o', { image: true }), + ); const parts = getUserContentParts(messages); expect(parts).toHaveLength(3); expect(parts[0].type).toBe('text'); @@ -2606,13 +2764,16 @@ describe('modality filtering', () => { }); it('defaults to text-only when no modalities are specified', () => { - const conv = new OpenAIContentConverter('unknown-model'); + const conv = OpenAIContentConverter; const request = makeRequest([ { inlineData: { mimeType: 'image/png', data: 'img-data' }, } as unknown as Part, ]); - const messages = conv.convertGeminiRequestToOpenAI(request); + const messages = conv.convertGeminiRequestToOpenAI( + request, + makeRequestContext('unknown-model', {}), + ); const parts = getUserContentParts(messages); expect(parts).toHaveLength(1); expect(parts[0].type).toBe('text'); diff --git a/packages/core/src/core/openaiContentGenerator/converter.ts b/packages/core/src/core/openaiContentGenerator/converter.ts index dcab2c449..035c7bd9a 100644 --- a/packages/core/src/core/openaiContentGenerator/converter.ts +++ b/packages/core/src/core/openaiContentGenerator/converter.ts @@ -21,8 +21,7 @@ import { GenerateContentResponse, FinishReason } from '@google/genai'; import type OpenAI from 'openai'; import { safeJsonParse } from '../../utils/safeJsonParse.js'; import { createDebugLogger } from '../../utils/debugLogger.js'; -import type { InputModalities } from '../contentGenerator.js'; -import { StreamingToolCallParser } from './streamingToolCallParser.js'; +import type { RequestContext } 
from './types.js';
 import {
   convertSchema,
   type SchemaComplianceMode,
@@ -91,1302 +90,1269 @@ type OpenAIContentPart =
   | OpenAIContentPartFile;
 
 /**
- * Per-stream state for tool-call parsing. Created by
- * `OpenAIContentConverter.createStreamContext()` at the start of each
- * streaming response and passed into every `convertOpenAIChunkToGemini`
- * call on that stream, so concurrent streams (parallel subagents, fork
- * children, …) never share parser state.
+ * Convert Gemini tool parameters to OpenAI JSON Schema format.
 */
-export interface ConverterStreamContext {
-  toolCallParser: StreamingToolCallParser;
-}
-
-/**
- * Converter class for transforming data between Gemini and OpenAI formats
- */
-export class OpenAIContentConverter {
-  private model: string;
-  private schemaCompliance: SchemaComplianceMode;
-  private modalities: InputModalities;
-
-  constructor(
-    model: string,
-    schemaCompliance: SchemaComplianceMode = 'auto',
-    modalities: InputModalities = {},
-  ) {
-    this.model = model;
-    this.schemaCompliance = schemaCompliance;
-    this.modalities = modalities;
+export function convertGeminiToolParametersToOpenAI(
+  parameters: Record<string, unknown>,
+): Record<string, unknown> | undefined {
+  if (!parameters || typeof parameters !== 'object') {
+    return parameters;
   }
 
-  /**
-   * Update the model used for response metadata (modelVersion/logging) and any
-   * model-specific conversion behavior.
-   */
-  setModel(model: string): void {
-    this.model = model;
-  }
+  const converted = JSON.parse(JSON.stringify(parameters));
 
-  /**
-   * Update the supported input modalities.
-   */
-  setModalities(modalities: InputModalities): void {
-    this.modalities = modalities;
-  }
-
-  /**
-   * Create fresh per-stream state for processing one OpenAI streaming
-   * response. The returned context is passed into every
-   * `convertOpenAIChunkToGemini` call for that stream, then discarded.
-   *
-   * Previously the tool-call parser lived on the Converter instance and
-   * was shared by every caller of the singleton `Config.contentGenerator`.
-   * Concurrent streams (e.g. two subagents running in parallel after
-   * PR #3463) raced on that shared state: each stream's stream-start
-   * `reset()` wiped the other's partial tool-call buffers, chunks from
-   * different streams landed at the same `index=0` bucket, and
-   * `getCompletedToolCalls()` returned interleaved corrupt JSON that
-   * surfaced upstream as `NO_RESPONSE_TEXT` (issue #3516). 
-   */
-  createStreamContext(): ConverterStreamContext {
-    return { toolCallParser: new StreamingToolCallParser() };
-  }
-
-  /**
-   * Convert Gemini tool parameters to OpenAI JSON Schema format
-   */
-  convertGeminiToolParametersToOpenAI(
-    parameters: Record<string, unknown>,
-  ): Record<string, unknown> | undefined {
-    if (!parameters || typeof parameters !== 'object') {
-      return parameters;
+  const convertTypes = (obj: unknown): unknown => {
+    if (typeof obj !== 'object' || obj === null) {
+      return obj;
     }
 
-    const converted = JSON.parse(JSON.stringify(parameters));
+    if (Array.isArray(obj)) {
+      return obj.map(convertTypes);
+    }
 
-    const convertTypes = (obj: unknown): unknown => {
-      if (typeof obj !== 'object' || obj === null) {
-        return obj;
-      }
-
-      if (Array.isArray(obj)) {
-        return obj.map(convertTypes);
-      }
-
-      const result: Record<string, unknown> = {};
-      for (const [key, value] of Object.entries(obj)) {
-        if (key === 'type' && typeof value === 'string') {
-          // Convert Gemini types to OpenAI JSON Schema types
-          const lowerValue = value.toLowerCase();
-          if (lowerValue === 'integer') {
-            result[key] = 'integer';
-          } else if (lowerValue === 'number') {
-            result[key] = 'number';
-          } else {
-            result[key] = lowerValue;
-          }
-        } else if (
-          key === 'minimum' ||
-          key === 'maximum' ||
-          key === 'multipleOf'
-        ) {
-          // Ensure numeric constraints are actual numbers, not strings
-          if (typeof value === 'string' && !isNaN(Number(value))) {
-            result[key] = Number(value);
-          } else {
-            result[key] = value;
-          }
-        } else if (
-          key === 'minLength' ||
-          key === 'maxLength' ||
-          key === 'minItems' ||
-          key === 'maxItems'
-        ) {
-          // Ensure length constraints are integers, not strings
-          if (typeof value === 'string' && !isNaN(Number(value))) {
-            result[key] = parseInt(value, 10);
-          } else {
-            result[key] = value;
-          }
-        } else if (typeof value === 'object') {
-          result[key] = convertTypes(value);
+    const result: Record<string, unknown> = {};
+    for (const [key, value] of Object.entries(obj)) {
+      if (key === 'type' && typeof value === 'string') {
+        // Convert Gemini types to OpenAI JSON Schema types
+        const lowerValue = value.toLowerCase();
+        if (lowerValue === 'integer') {
+          result[key] = 'integer';
+        } else if (lowerValue === 'number') {
+          result[key] = 'number';
+        } else {
+          result[key] = lowerValue;
+        }
+      } else if (
+        key === 'minimum' ||
+        key === 'maximum' ||
+        key === 'multipleOf'
+      ) {
+        // Ensure numeric constraints are actual numbers, not strings
+        if (typeof value === 'string' && !isNaN(Number(value))) {
+          result[key] = Number(value);
         } else {
           result[key] = value;
-        }
-      }
-      return result;
-    };
-
-    return convertTypes(converted) as Record<string, unknown> | undefined;
-  }
-
-  /**
-   * Convert Gemini tools to OpenAI format for API compatibility.
-   * Handles both Gemini tools (using 'parameters' field) and MCP tools (using 'parametersJsonSchema' field). 
-   */
-  async convertGeminiToolsToOpenAI(
-    geminiTools: ToolListUnion,
-  ): Promise<OpenAI.Chat.ChatCompletionTool[]> {
-    const openAITools: OpenAI.Chat.ChatCompletionTool[] = [];
-
-    for (const tool of geminiTools) {
-      let actualTool: Tool;
-
-      // Handle CallableTool vs Tool
-      if ('tool' in tool) {
-        // This is a CallableTool
-        actualTool = await (tool as CallableTool).tool();
-      } else {
-        // This is already a Tool
-        actualTool = tool as Tool;
-      }
-
-      if (actualTool.functionDeclarations) {
-        for (const func of actualTool.functionDeclarations) {
-          if (func.name && func.description) {
-            let parameters: Record<string, unknown> | undefined;
-
-            // Handle both Gemini tools (parameters) and MCP tools (parametersJsonSchema)
-            if (func.parametersJsonSchema) {
-              // MCP tool format - use parametersJsonSchema directly
-              // Create a shallow copy to avoid mutating the original object
-              const paramsCopy = {
-                ...(func.parametersJsonSchema as Record<string, unknown>),
-              };
-              parameters = paramsCopy;
-            } else if (func.parameters) {
-              // Gemini tool format - convert parameters to OpenAI format
-              parameters = this.convertGeminiToolParametersToOpenAI(
-                func.parameters as Record<string, unknown>,
-              );
-            }
-
-            if (parameters) {
-              parameters = convertSchema(parameters, this.schemaCompliance);
-            }
-
-            openAITools.push({
-              type: 'function',
-              function: {
-                name: func.name,
-                description: func.description,
-                parameters,
-              },
-            });
-          }
-        }
-      }
-    }
-
-    return openAITools;
-  }
-
-  /**
-   * Convert Gemini request to OpenAI message format
-   */
-  convertGeminiRequestToOpenAI(
-    request: GenerateContentParameters,
-    options: { cleanOrphanToolCalls: boolean } = { cleanOrphanToolCalls: true },
-  ): OpenAI.Chat.ChatCompletionMessageParam[] {
-    let messages: OpenAI.Chat.ChatCompletionMessageParam[] = [];
-
-    // Handle system instruction from config
-    this.addSystemInstructionMessage(request, messages);
-
-    // Handle contents
-    this.processContents(request.contents, messages);
-
-    // Clean up orphaned tool calls and merge consecutive assistant messages
-    if (options.cleanOrphanToolCalls) {
-      messages = this.cleanOrphanedToolCalls(messages);
-    }
-    messages = this.mergeConsecutiveAssistantMessages(messages);
-
-    return messages;
-  }
-
-  /**
-   * Convert Gemini response to OpenAI completion format (for logging). 
-   */
-  convertGeminiResponseToOpenAI(
-    response: GenerateContentResponse,
-  ): OpenAI.Chat.ChatCompletion {
-    const candidate = response.candidates?.[0];
-    const parts = (candidate?.content?.parts || []) as Part[];
-
-    // Parse parts inline
-    const thoughtParts: string[] = [];
-    const contentParts: string[] = [];
-    const toolCalls: OpenAI.Chat.ChatCompletionMessageToolCall[] = [];
-    let toolCallIndex = 0;
-
-    for (const part of parts) {
-      if (typeof part === 'string') {
-        contentParts.push(part);
-      } else if ('text' in part && part.text) {
-        if ('thought' in part && part.thought) {
-          thoughtParts.push(part.text);
+      } else if (
+        key === 'minLength' ||
+        key === 'maxLength' ||
+        key === 'minItems' ||
+        key === 'maxItems'
+      ) {
+        // Ensure length constraints are integers, not strings
+        if (typeof value === 'string' && !isNaN(Number(value))) {
+          result[key] = parseInt(value, 10);
         } else {
-          contentParts.push(part.text);
+          result[key] = value;
         }
-      } else if ('functionCall' in part && part.functionCall) {
-        toolCalls.push({
-          id: part.functionCall.id || `call_${toolCallIndex}`,
-          type: 'function' as const,
-          function: {
-            name: part.functionCall.name || '',
-            arguments: JSON.stringify(part.functionCall.args || {}),
-          },
-        });
-        toolCallIndex += 1;
+      } else if (typeof value === 'object') {
+        result[key] = convertTypes(value);
+      } else {
+        result[key] = value;
+      }
+    }
+    return result;
+  };
+
+  return convertTypes(converted) as Record<string, unknown> | undefined;
+}
+
+/**
+ * Convert Gemini tools to OpenAI format for API compatibility.
+ * Handles both Gemini tools (using 'parameters' field) and MCP tools
+ * (using 'parametersJsonSchema' field).
+ */
+export async function convertGeminiToolsToOpenAI(
+  geminiTools: ToolListUnion,
+  schemaCompliance: SchemaComplianceMode = 'auto',
+): Promise<OpenAI.Chat.ChatCompletionTool[]> {
+  const openAITools: OpenAI.Chat.ChatCompletionTool[] = [];
+
+  for (const tool of geminiTools) {
+    let actualTool: Tool;
+
+    // Handle CallableTool vs Tool
+    if ('tool' in tool) {
+      // This is a CallableTool
+      actualTool = await (tool as CallableTool).tool();
+    } else {
+      // This is already a Tool
+      actualTool = tool as Tool;
+    }
+
+    if (actualTool.functionDeclarations) {
+      for (const func of actualTool.functionDeclarations) {
+        if (func.name && func.description) {
+          let parameters: Record<string, unknown> | undefined;
+
+          // Handle both Gemini tools (parameters) and MCP tools (parametersJsonSchema)
+          if (func.parametersJsonSchema) {
+            // MCP tool format - use parametersJsonSchema directly
+            // Create a shallow copy to avoid mutating the original object
+            const paramsCopy = {
+              ...(func.parametersJsonSchema as Record<string, unknown>),
+            };
+            parameters = paramsCopy;
+          } else if (func.parameters) {
+            // Gemini tool format - convert parameters to OpenAI format
+            parameters = convertGeminiToolParametersToOpenAI(
+              func.parameters as Record<string, unknown>,
+            );
+          }
+
+          if (parameters) {
+            parameters = convertSchema(parameters, schemaCompliance);
+          }
+
+          openAITools.push({
+            type: 'function',
+            function: {
+              name: func.name,
+              description: func.description,
+              parameters,
+            },
+          });
+        }
+      }
+    }
+  }
+
+  return openAITools;
+}
+
+/**
+ * Convert Gemini request to OpenAI message format. 
+ */ +export function convertGeminiRequestToOpenAI( + request: GenerateContentParameters, + requestContext: RequestContext, + options: { cleanOrphanToolCalls: boolean } = { cleanOrphanToolCalls: true }, +): OpenAI.Chat.ChatCompletionMessageParam[] { + let messages: OpenAI.Chat.ChatCompletionMessageParam[] = []; + + // Handle system instruction from config + addSystemInstructionMessage(request, messages); + + // Handle contents + processContents(request.contents, messages, requestContext); + + // Clean up orphaned tool calls and merge consecutive assistant messages + if (options.cleanOrphanToolCalls) { + messages = cleanOrphanedToolCalls(messages); + } + messages = mergeConsecutiveAssistantMessages(messages); + + return messages; +} + +/** + * Convert Gemini response to OpenAI completion format (for logging). + */ +export function convertGeminiResponseToOpenAI( + response: GenerateContentResponse, + requestContext: RequestContext, +): OpenAI.Chat.ChatCompletion { + const candidate = response.candidates?.[0]; + const parts = (candidate?.content?.parts || []) as Part[]; + + // Parse parts inline + const thoughtParts: string[] = []; + const contentParts: string[] = []; + const toolCalls: OpenAI.Chat.ChatCompletionMessageToolCall[] = []; + let toolCallIndex = 0; + + for (const part of parts) { + if (typeof part === 'string') { + contentParts.push(part); + } else if ('text' in part && part.text) { + if ('thought' in part && part.thought) { + thoughtParts.push(part.text); + } else { + contentParts.push(part.text); + } + } else if ('functionCall' in part && part.functionCall) { + toolCalls.push({ + id: part.functionCall.id || `call_${toolCallIndex}`, + type: 'function' as const, + function: { + name: part.functionCall.name || '', + arguments: JSON.stringify(part.functionCall.args || {}), + }, + }); + toolCallIndex += 1; + } + } + + const message: ExtendedCompletionMessage = { + role: 'assistant', + content: contentParts.join('') || null, + refusal: null, + }; + + const reasoningContent = thoughtParts.join(''); + if (reasoningContent) { + message.reasoning_content = reasoningContent; + } + + if (toolCalls.length > 0) { + message.tool_calls = toolCalls; + } + + const finishReason = mapGeminiFinishReasonToOpenAI(candidate?.finishReason); + + const usageMetadata = response.usageMetadata; + const usage: OpenAI.CompletionUsage = { + prompt_tokens: usageMetadata?.promptTokenCount || 0, + completion_tokens: usageMetadata?.candidatesTokenCount || 0, + total_tokens: usageMetadata?.totalTokenCount || 0, + }; + + if (usageMetadata?.cachedContentTokenCount !== undefined) { + ( + usage as OpenAI.CompletionUsage & { + prompt_tokens_details?: { cached_tokens?: number }; + } + ).prompt_tokens_details = { + cached_tokens: usageMetadata.cachedContentTokenCount, + }; + } + + const createdMs = response.createTime + ? Number(response.createTime) + : Date.now(); + const createdSeconds = Number.isFinite(createdMs) + ? Math.floor(createdMs / 1000) + : Math.floor(Date.now() / 1000); + + return { + id: response.responseId || `gemini-${Date.now()}`, + object: 'chat.completion', + created: createdSeconds, + model: response.modelVersion || requestContext.model, + choices: [ + { + index: 0, + message, + finish_reason: finishReason, + logprobs: null, + }, + ], + usage, + }; +} + +/** + * Extract and add system instruction message from request config. 
+ */ +function addSystemInstructionMessage( + request: GenerateContentParameters, + messages: OpenAI.Chat.ChatCompletionMessageParam[], +): void { + if (!request.config?.systemInstruction) return; + + const systemText = extractTextFromContentUnion( + request.config.systemInstruction, + ); + + if (systemText) { + messages.push({ + role: 'system' as const, + content: systemText, + }); + } +} + +/** + * Process contents and convert to OpenAI messages. + */ +function processContents( + contents: ContentListUnion, + messages: ExtendedChatCompletionMessageParam[], + requestContext: RequestContext, +): void { + if (Array.isArray(contents)) { + for (const content of contents) { + processContent(content, messages, requestContext); + } + } else if (contents) { + processContent(contents, messages, requestContext); + } +} + +/** + * Process a single content item and convert to OpenAI message(s). + */ +function processContent( + content: ContentUnion | PartUnion, + messages: ExtendedChatCompletionMessageParam[], + requestContext: RequestContext, +): void { + if (typeof content === 'string') { + messages.push({ role: 'user' as const, content }); + return; + } + + if (!isContentObject(content)) return; + const parts = content.parts || []; + const role = content.role === 'model' ? 'assistant' : 'user'; + + const contentParts: OpenAIContentPart[] = []; + const reasoningParts: string[] = []; + const toolCalls: OpenAI.Chat.ChatCompletionMessageToolCall[] = []; + let toolCallIndex = 0; + + for (const part of parts) { + if (typeof part === 'string') { + contentParts.push({ type: 'text' as const, text: part }); + continue; + } + + if ('text' in part && 'thought' in part && part.thought) { + if (role === 'assistant' && part.text) { + reasoningParts.push(part.text); } } - const message: ExtendedCompletionMessage = { - role: 'assistant', - content: contentParts.join('') || null, - refusal: null, - }; - - const reasoningContent = thoughtParts.join(''); - if (reasoningContent) { - message.reasoning_content = reasoningContent; + if ('text' in part && part.text && !('thought' in part && part.thought)) { + contentParts.push({ type: 'text' as const, text: part.text }); } + const mediaPart = createMediaContentPart(part, requestContext); + if (mediaPart && role === 'user') { + contentParts.push(mediaPart); + } + + if ('functionCall' in part && part.functionCall && role === 'assistant') { + toolCalls.push({ + id: part.functionCall.id || `call_${toolCallIndex}`, + type: 'function' as const, + function: { + name: part.functionCall.name || '', + arguments: JSON.stringify(part.functionCall.args || {}), + }, + }); + toolCallIndex += 1; + } + + if (part.functionResponse && role === 'user') { + // Create tool message for the function response (with embedded media) + const toolMessage = createToolMessage( + part.functionResponse, + requestContext, + ); + if (toolMessage) { + messages.push(toolMessage); + } + } + } + + if (role === 'assistant') { + if ( + contentParts.length === 0 && + toolCalls.length === 0 && + reasoningParts.length === 0 + ) { + return; + } + + const assistantTextContent = contentParts + .filter( + (part): part is OpenAI.Chat.ChatCompletionContentPartText => + part.type === 'text', + ) + .map((part) => part.text) + .join(''); + const assistantMessage: ExtendedChatCompletionAssistantMessageParam = { + role: 'assistant', + // When there is reasoning content but no text, use "" instead of null. + // Some OpenAI-compatible providers (e.g. 
Ollama) reject content: null + // when reasoning_content is present, returning HTTP 400. + // For tool-call-only messages we keep null to stay spec-compliant. + content: assistantTextContent || (reasoningParts.length > 0 ? '' : null), + }; + if (toolCalls.length > 0) { - message.tool_calls = toolCalls; + assistantMessage.tool_calls = toolCalls; } - const finishReason = this.mapGeminiFinishReasonToOpenAI( - candidate?.finishReason, - ); - - const usageMetadata = response.usageMetadata; - const usage: OpenAI.CompletionUsage = { - prompt_tokens: usageMetadata?.promptTokenCount || 0, - completion_tokens: usageMetadata?.candidatesTokenCount || 0, - total_tokens: usageMetadata?.totalTokenCount || 0, - }; - - if (usageMetadata?.cachedContentTokenCount !== undefined) { - ( - usage as OpenAI.CompletionUsage & { - prompt_tokens_details?: { cached_tokens?: number }; - } - ).prompt_tokens_details = { - cached_tokens: usageMetadata.cachedContentTokenCount, - }; + const reasoningContent = reasoningParts.join(''); + if (reasoningContent) { + assistantMessage.reasoning_content = reasoningContent; } - const createdMs = response.createTime - ? Number(response.createTime) - : Date.now(); - const createdSeconds = Number.isFinite(createdMs) - ? Math.floor(createdMs / 1000) - : Math.floor(Date.now() / 1000); - - return { - id: response.responseId || `gemini-${Date.now()}`, - object: 'chat.completion', - created: createdSeconds, - model: response.modelVersion || this.model, - choices: [ - { - index: 0, - message, - finish_reason: finishReason, - logprobs: null, - }, - ], - usage, - }; + messages.push(assistantMessage); + return; } - /** - * Extract and add system instruction message from request config - */ - private addSystemInstructionMessage( - request: GenerateContentParameters, - messages: OpenAI.Chat.ChatCompletionMessageParam[], - ): void { - if (!request.config?.systemInstruction) return; - - const systemText = this.extractTextFromContentUnion( - request.config.systemInstruction, - ); - - if (systemText) { - messages.push({ - role: 'system' as const, - content: systemText, - }); - } + if (contentParts.length > 0) { + messages.push({ + role: 'user', + content: + contentParts as unknown as OpenAI.Chat.ChatCompletionContentPart[], + }); } +} - /** - * Process contents and convert to OpenAI messages - */ - private processContents( - contents: ContentListUnion, - messages: ExtendedChatCompletionMessageParam[], - ): void { - if (Array.isArray(contents)) { - for (const content of contents) { - this.processContent(content, messages); - } - } else if (contents) { - this.processContent(contents, messages); - } - } - - /** - * Process a single content item and convert to OpenAI message(s) - */ - private processContent( - content: ContentUnion | PartUnion, - messages: ExtendedChatCompletionMessageParam[], - ): void { - if (typeof content === 'string') { - messages.push({ role: 'user' as const, content }); - return; - } - - if (!this.isContentObject(content)) return; - const parts = content.parts || []; - const role = content.role === 'model' ? 
'assistant' : 'user'; - - const contentParts: OpenAIContentPart[] = []; - const reasoningParts: string[] = []; - const toolCalls: OpenAI.Chat.ChatCompletionMessageToolCall[] = []; - let toolCallIndex = 0; - - for (const part of parts) { - if (typeof part === 'string') { - contentParts.push({ type: 'text' as const, text: part }); - continue; - } - - if ('text' in part && 'thought' in part && part.thought) { - if (role === 'assistant' && part.text) { - reasoningParts.push(part.text); - } - } - - if ('text' in part && part.text && !('thought' in part && part.thought)) { - contentParts.push({ type: 'text' as const, text: part.text }); - } - - const mediaPart = this.createMediaContentPart(part); - if (mediaPart && role === 'user') { - contentParts.push(mediaPart); - } - - if ('functionCall' in part && part.functionCall && role === 'assistant') { - toolCalls.push({ - id: part.functionCall.id || `call_${toolCallIndex}`, - type: 'function' as const, - function: { - name: part.functionCall.name || '', - arguments: JSON.stringify(part.functionCall.args || {}), - }, - }); - toolCallIndex += 1; - } - - if (part.functionResponse && role === 'user') { - // Create tool message for the function response (with embedded media) - const toolMessage = this.createToolMessage(part.functionResponse); - if (toolMessage) { - messages.push(toolMessage); - } - } - } - - if (role === 'assistant') { - if ( - contentParts.length === 0 && - toolCalls.length === 0 && - reasoningParts.length === 0 - ) { - return; - } - - const assistantTextContent = contentParts - .filter( - (part): part is OpenAI.Chat.ChatCompletionContentPartText => - part.type === 'text', - ) - .map((part) => part.text) - .join(''); - const assistantMessage: ExtendedChatCompletionAssistantMessageParam = { - role: 'assistant', - // When there is reasoning content but no text, use "" instead of null. - // Some OpenAI-compatible providers (e.g. Ollama) reject content: null - // when reasoning_content is present, returning HTTP 400. - // For tool-call-only messages we keep null to stay spec-compliant. - content: - assistantTextContent || (reasoningParts.length > 0 ? '' : null), - }; - - if (toolCalls.length > 0) { - assistantMessage.tool_calls = toolCalls; - } - - const reasoningContent = reasoningParts.join(''); - if (reasoningContent) { - assistantMessage.reasoning_content = reasoningContent; - } - - messages.push(assistantMessage); - return; - } - - if (contentParts.length > 0) { - messages.push({ - role: 'user', - content: - contentParts as unknown as OpenAI.Chat.ChatCompletionContentPart[], - }); - } - } - - private extractFunctionResponseContent(response: unknown): string { - if (response === null || response === undefined) { - return ''; - } - - if (typeof response === 'string') { - return response; - } - - if (typeof response === 'object') { - const responseObject = response as Record; - const output = responseObject['output']; - if (typeof output === 'string') { - return output; - } - - const error = responseObject['error']; - if (typeof error === 'string') { - return error; - } - } - - try { - const serialized = JSON.stringify(response); - return serialized ?? 
String(response); - } catch { - return String(response); - } - } - - /** - * Create a tool message from function response (with embedded media parts) - */ - private createToolMessage( - response: FunctionResponse, - ): OpenAI.Chat.ChatCompletionToolMessageParam | null { - const textContent = this.extractFunctionResponseContent(response.response); - const contentParts: OpenAIContentPart[] = []; - - // Add text content first if present - if (textContent) { - contentParts.push({ type: 'text' as const, text: textContent }); - } - - // Add media parts from function response - for (const part of response.parts || []) { - const mediaPart = this.createMediaContentPart(part); - if (mediaPart) { - contentParts.push(mediaPart); - } - } - - // IMPORTANT: Always return a tool message, even if content is empty - // OpenAI API requires that every tool call has a corresponding tool response - // Empty tool results are valid (e.g., reading an empty file, successful operations with no output) - if (contentParts.length === 0) { - // Return empty string for empty tool results - return { - role: 'tool' as const, - tool_call_id: response.id || '', - content: '', - }; - } - - // Cast to OpenAI type - some OpenAI-compatible APIs support richer content in tool messages - return { - role: 'tool' as const, - tool_call_id: response.id || '', - content: contentParts as unknown as - | string - | OpenAI.Chat.ChatCompletionContentPartText[], - }; - } - - /** - * Create OpenAI media content part from Gemini part. - * Checks modality support before building each media type. - */ - private createMediaContentPart(part: Part): OpenAIContentPart | null { - if (part.inlineData?.mimeType && part.inlineData?.data) { - const mimeType = part.inlineData.mimeType; - const mediaType = this.getMediaType(mimeType); - const displayName = part.inlineData.displayName || mimeType; - - if (mediaType === 'image') { - if (!this.modalities.image) { - return this.unsupportedModalityPlaceholder('image', displayName); - } - const dataUrl = `data:${mimeType};base64,${part.inlineData.data}`; - return { - type: 'image_url' as const, - image_url: { url: dataUrl }, - }; - } - - if (mimeType === 'application/pdf') { - if (!this.modalities.pdf) { - return this.unsupportedModalityPlaceholder('pdf', displayName); - } - const filename = part.inlineData.displayName || 'document.pdf'; - return { - type: 'file' as const, - file: { - filename, - file_data: `data:${mimeType};base64,${part.inlineData.data}`, - }, - }; - } - - if (mediaType === 'audio') { - if (!this.modalities.audio) { - return this.unsupportedModalityPlaceholder('audio', displayName); - } - const format = this.getAudioFormat(mimeType); - if (format) { - return { - type: 'input_audio' as const, - input_audio: { - data: `data:${mimeType};base64,${part.inlineData.data}`, - format, - }, - }; - } - } - - if (mediaType === 'video') { - if (!this.modalities.video) { - return this.unsupportedModalityPlaceholder('video', displayName); - } - return { - type: 'video_url' as const, - video_url: { - url: `data:${mimeType};base64,${part.inlineData.data}`, - }, - }; - } - - return { - type: 'text' as const, - text: `Unsupported inline media type: ${mimeType} (${displayName}).`, - }; - } - - if (part.fileData?.mimeType && part.fileData?.fileUri) { - const filename = part.fileData.displayName || 'file'; - const fileUri = part.fileData.fileUri; - const mimeType = part.fileData.mimeType; - const mediaType = this.getMediaType(mimeType); - - if (mediaType === 'image') { - if (!this.modalities.image) { - return 
this.unsupportedModalityPlaceholder('image', filename); - } - return { - type: 'image_url' as const, - image_url: { url: fileUri }, - }; - } - - if (mimeType === 'application/pdf') { - if (!this.modalities.pdf) { - return this.unsupportedModalityPlaceholder('pdf', filename); - } - return { - type: 'file' as const, - file: { - filename, - file_data: fileUri, - }, - }; - } - - if (mediaType === 'video') { - if (!this.modalities.video) { - return this.unsupportedModalityPlaceholder('video', filename); - } - return { - type: 'video_url' as const, - video_url: { - url: fileUri, - }, - }; - } - - const displayNameStr = part.fileData.displayName - ? ` (${part.fileData.displayName})` - : ''; - return { - type: 'text' as const, - text: `Unsupported file media type: ${mimeType}${displayNameStr}.`, - }; - } - - return null; - } - - /** - * Create a text placeholder for unsupported modalities. - */ - private unsupportedModalityPlaceholder( - modality: string, - displayName: string, - ): OpenAIContentPart { - debugLogger.warn( - `Model '${this.model}' does not support ${modality} input. ` + - `Replacing with text placeholder: ${displayName}`, - ); - let hint: string; - if (modality === 'pdf') { - hint = - 'This model does not support PDF input directly. The read_file tool cannot extract PDF content either. To extract text from the PDF file, try using skills if applicable, or guide user to install pdf skill by running this slash command:\n/extensions install https://github.com/anthropics/skills:document-skills'; - } else { - hint = `This model does not support ${modality} input. The read_file tool cannot process this type of file either. To handle this file, try using skills if applicable, or any tools installed at system wide, or let the user know you cannot process this type of file.`; - } - return { - type: 'text' as const, - text: `[Unsupported ${modality} file: "${displayName}". 
${hint}]`, - }; - } - - /** - * Determine media type from MIME type - */ - private getMediaType(mimeType: string): 'image' | 'audio' | 'video' | 'file' { - if (mimeType.startsWith('image/')) return 'image'; - if (mimeType.startsWith('audio/')) return 'audio'; - if (mimeType.startsWith('video/')) return 'video'; - return 'file'; - } - - /** - * Convert MIME type to OpenAI audio format - */ - private getAudioFormat(mimeType: string): 'wav' | 'mp3' | null { - if (mimeType.includes('wav')) return 'wav'; - if (mimeType.includes('mp3') || mimeType.includes('mpeg')) return 'mp3'; - return null; - } - - /** - * Type guard to check if content is a valid Content object - */ - private isContentObject( - content: unknown, - ): content is { role: string; parts: Part[] } { - return ( - typeof content === 'object' && - content !== null && - 'role' in content && - 'parts' in content && - Array.isArray((content as Record)['parts']) - ); - } - - /** - * Extract text content from various Gemini content union types - */ - private extractTextFromContentUnion(contentUnion: unknown): string { - if (typeof contentUnion === 'string') { - return contentUnion; - } - - if (Array.isArray(contentUnion)) { - return contentUnion - .map((item) => this.extractTextFromContentUnion(item)) - .filter(Boolean) - .join('\n'); - } - - if (typeof contentUnion === 'object' && contentUnion !== null) { - if ('parts' in contentUnion) { - const content = contentUnion as Content; - return ( - content.parts - ?.map((part: Part) => { - if (typeof part === 'string') return part; - if ('text' in part) return part.text || ''; - return ''; - }) - .filter(Boolean) - .join('\n') || '' - ); - } - } - +function extractFunctionResponseContent(response: unknown): string { + if (response === null || response === undefined) { return ''; } - /** - * Convert OpenAI response to Gemini format - */ - convertOpenAIResponseToGemini( - openaiResponse: OpenAI.Chat.ChatCompletion, - ): GenerateContentResponse { - const choice = openaiResponse.choices?.[0]; - const response = new GenerateContentResponse(); - - if (choice) { - const parts: Part[] = []; - - // Handle reasoning content (thoughts) - const reasoningText = - (choice.message as ExtendedCompletionMessage).reasoning_content ?? - (choice.message as ExtendedCompletionMessage).reasoning; - if (reasoningText) { - parts.push({ text: reasoningText, thought: true }); - } - - // Handle text content - if (choice.message.content) { - parts.push({ text: choice.message.content }); - } - - // Handle tool calls - if (choice.message.tool_calls) { - for (const toolCall of choice.message.tool_calls) { - if (toolCall.function) { - let args: Record = {}; - if (toolCall.function.arguments) { - args = safeJsonParse(toolCall.function.arguments, {}); - } - - parts.push({ - functionCall: { - id: toolCall.id, - name: toolCall.function.name, - args, - }, - }); - } - } - } - - response.candidates = [ - { - content: { - parts, - role: 'model' as const, - }, - finishReason: this.mapOpenAIFinishReasonToGemini( - choice.finish_reason || 'stop', - ), - index: 0, - safetyRatings: [], - }, - ]; - } else { - response.candidates = []; - } - - response.responseId = openaiResponse.id; - response.createTime = openaiResponse.created - ? 
openaiResponse.created.toString() - : new Date().getTime().toString(); - - response.modelVersion = this.model; - response.promptFeedback = { safetyRatings: [] }; - - // Add usage metadata if available - if (openaiResponse.usage) { - const usage = openaiResponse.usage; - - const promptTokens = usage.prompt_tokens || 0; - const completionTokens = usage.completion_tokens || 0; - const totalTokens = usage.total_tokens || 0; - // Support both formats: prompt_tokens_details.cached_tokens (OpenAI standard) - // and cached_tokens (some models return it at top level) - const extendedUsage = usage as ExtendedCompletionUsage; - const cachedTokens = - usage.prompt_tokens_details?.cached_tokens ?? - extendedUsage.cached_tokens ?? - 0; - const thinkingTokens = - usage.completion_tokens_details?.reasoning_tokens || 0; - - // If we only have total tokens but no breakdown, estimate the split - // Typically input is ~70% and output is ~30% for most conversations - let finalPromptTokens = promptTokens; - let finalCompletionTokens = completionTokens; - - if (totalTokens > 0 && promptTokens === 0 && completionTokens === 0) { - // Estimate: assume 70% input, 30% output - finalPromptTokens = Math.round(totalTokens * 0.7); - finalCompletionTokens = Math.round(totalTokens * 0.3); - } - - response.usageMetadata = { - promptTokenCount: finalPromptTokens, - candidatesTokenCount: finalCompletionTokens, - totalTokenCount: totalTokens, - cachedContentTokenCount: cachedTokens, - thoughtsTokenCount: thinkingTokens, - }; - } - + if (typeof response === 'string') { return response; } - /** - * Convert OpenAI stream chunk to Gemini format. - * - * `ctx` carries the tool-call parser for this stream. Callers MUST - * obtain it from `createStreamContext()` at the start of the stream - * and pass the same instance for every chunk of that stream. Concurrent - * streams MUST use distinct contexts or their tool-call buffers will - * interleave (issue #3516). - */ - convertOpenAIChunkToGemini( - chunk: OpenAI.Chat.ChatCompletionChunk, - ctx: ConverterStreamContext, - ): GenerateContentResponse { - const choice = chunk.choices?.[0]; - const response = new GenerateContentResponse(); + if (typeof response === 'object') { + const responseObject = response as Record; + const output = responseObject['output']; + if (typeof output === 'string') { + return output; + } - if (choice) { - const parts: Part[] = []; + const error = responseObject['error']; + if (typeof error === 'string') { + return error; + } + } - const reasoningText = - (choice.delta as ExtendedCompletionChunkDelta)?.reasoning_content ?? - (choice.delta as ExtendedCompletionChunkDelta)?.reasoning; - if (reasoningText) { - parts.push({ text: reasoningText, thought: true }); + try { + const serialized = JSON.stringify(response); + return serialized ?? String(response); + } catch { + return String(response); + } +} + +/** + * Create a tool message from function response (with embedded media parts). 
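+ *
+ * A rough shape sketch (values illustrative, not from a real transcript): a
+ * functionResponse of
+ *   { id: 'call_1', response: { output: 'done' } }
+ * maps to
+ *   { role: 'tool', tool_call_id: 'call_1', content: [{ type: 'text', text: 'done' }] }
+ * while an empty response still yields a tool message with content: ''.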
+ */ +function createToolMessage( + response: FunctionResponse, + requestContext: RequestContext, +): OpenAI.Chat.ChatCompletionToolMessageParam | null { + const textContent = extractFunctionResponseContent(response.response); + const contentParts: OpenAIContentPart[] = []; + + // Add text content first if present + if (textContent) { + contentParts.push({ type: 'text' as const, text: textContent }); + } + + // Add media parts from function response + for (const part of response.parts || []) { + const mediaPart = createMediaContentPart(part, requestContext); + if (mediaPart) { + contentParts.push(mediaPart); + } + } + + // IMPORTANT: Always return a tool message, even if content is empty + // OpenAI API requires that every tool call has a corresponding tool response + // Empty tool results are valid (e.g., reading an empty file, successful operations with no output) + if (contentParts.length === 0) { + // Return empty string for empty tool results + return { + role: 'tool' as const, + tool_call_id: response.id || '', + content: '', + }; + } + + // Cast to OpenAI type - some OpenAI-compatible APIs support richer content in tool messages + return { + role: 'tool' as const, + tool_call_id: response.id || '', + content: contentParts as unknown as + | string + | OpenAI.Chat.ChatCompletionContentPartText[], + }; +} + +/** + * Create OpenAI media content part from Gemini part. + * Checks modality support before building each media type. + */ +function createMediaContentPart( + part: Part, + requestContext: RequestContext, +): OpenAIContentPart | null { + const { modalities } = requestContext; + + if (part.inlineData?.mimeType && part.inlineData?.data) { + const mimeType = part.inlineData.mimeType; + const mediaType = getMediaType(mimeType); + const displayName = part.inlineData.displayName || mimeType; + + if (mediaType === 'image') { + if (!modalities.image) { + return unsupportedModalityPlaceholder( + 'image', + displayName, + requestContext, + ); } + const dataUrl = `data:${mimeType};base64,${part.inlineData.data}`; + return { + type: 'image_url' as const, + image_url: { url: dataUrl }, + }; + } - // Handle text content - if (choice.delta?.content) { - if (typeof choice.delta.content === 'string') { - parts.push({ text: choice.delta.content }); - } + if (mimeType === 'application/pdf') { + if (!modalities.pdf) { + return unsupportedModalityPlaceholder( + 'pdf', + displayName, + requestContext, + ); } + const filename = part.inlineData.displayName || 'document.pdf'; + return { + type: 'file' as const, + file: { + filename, + file_data: `data:${mimeType};base64,${part.inlineData.data}`, + }, + }; + } - // Handle tool calls using the stream-local parser - if (choice.delta?.tool_calls) { - for (const toolCall of choice.delta.tool_calls) { - const index = toolCall.index ?? 
0;
+    if (mediaType === 'audio') {
+      if (!modalities.audio) {
+        return unsupportedModalityPlaceholder(
+          'audio',
+          displayName,
+          requestContext,
+        );
+      }
+      const format = getAudioFormat(mimeType);
+      if (format) {
+        return {
+          type: 'input_audio' as const,
+          input_audio: {
+            data: `data:${mimeType};base64,${part.inlineData.data}`,
+            format,
+          },
+        };
+      }
+    }
-
-        // Process the tool call chunk through the streaming parser
-        if (toolCall.function?.arguments) {
-          ctx.toolCallParser.addChunk(
-            index,
-            toolCall.function.arguments,
-            toolCall.id,
-            toolCall.function.name,
-          );
-        } else {
-          // Handle metadata-only chunks (id and/or name without arguments)
-          ctx.toolCallParser.addChunk(
-            index,
-            '', // Empty chunk for metadata-only updates
-            toolCall.id,
-            toolCall.function?.name,
-          );
+    if (mediaType === 'video') {
+      if (!modalities.video) {
+        return unsupportedModalityPlaceholder(
+          'video',
+          displayName,
+          requestContext,
+        );
+      }
+      return {
+        type: 'video_url' as const,
+        video_url: {
+          url: `data:${mimeType};base64,${part.inlineData.data}`,
+        },
+      };
+    }
+
+    return {
+      type: 'text' as const,
+      text: `Unsupported inline media type: ${mimeType} (${displayName}).`,
+    };
+  }
+
+  if (part.fileData?.mimeType && part.fileData?.fileUri) {
+    const filename = part.fileData.displayName || 'file';
+    const fileUri = part.fileData.fileUri;
+    const mimeType = part.fileData.mimeType;
+    const mediaType = getMediaType(mimeType);
+
+    if (mediaType === 'image') {
+      if (!modalities.image) {
+        return unsupportedModalityPlaceholder(
+          'image',
+          filename,
+          requestContext,
+        );
+      }
+      return {
+        type: 'image_url' as const,
+        image_url: { url: fileUri },
+      };
+    }
+
+    if (mimeType === 'application/pdf') {
+      if (!modalities.pdf) {
+        return unsupportedModalityPlaceholder('pdf', filename, requestContext);
+      }
+      return {
+        type: 'file' as const,
+        file: {
+          filename,
+          file_data: fileUri,
+        },
+      };
+    }
+
+    if (mediaType === 'video') {
+      if (!modalities.video) {
+        return unsupportedModalityPlaceholder(
+          'video',
+          filename,
+          requestContext,
+        );
+      }
+      return {
+        type: 'video_url' as const,
+        video_url: {
+          url: fileUri,
+        },
+      };
+    }
+
+    const displayNameStr = part.fileData.displayName
+      ? ` (${part.fileData.displayName})`
+      : '';
+    return {
+      type: 'text' as const,
+      text: `Unsupported file media type: ${mimeType}${displayNameStr}.`,
+    };
+  }
+
+  return null;
+}
+
+/**
+ * Create a text placeholder for unsupported modalities.
+ */
+function unsupportedModalityPlaceholder(
+  modality: string,
+  displayName: string,
+  requestContext: RequestContext,
+): OpenAIContentPart {
+  debugLogger.warn(
+    `Model '${requestContext.model}' does not support ${modality} input. ` +
+      `Replacing with text placeholder: ${displayName}`,
+  );
+  let hint: string;
+  if (modality === 'pdf') {
+    hint =
+      'This model does not support PDF input directly. The read_file tool cannot extract PDF content either. To extract text from the PDF file, try using skills if applicable, or guide the user to install the pdf skill by running this slash command:\n/extensions install https://github.com/anthropics/skills:document-skills';
+  } else {
+    hint = `This model does not support ${modality} input. The read_file tool cannot process this type of file either. To handle this file, try using skills if applicable, or any tools installed system-wide, or let the user know you cannot process this type of file.`;
+  }
+  return {
+    type: 'text' as const,
+    text: `[Unsupported ${modality} file: "${displayName}". 
${hint}]`, + }; +} + +function getMediaType(mimeType: string): 'image' | 'audio' | 'video' | 'file' { + if (mimeType.startsWith('image/')) return 'image'; + if (mimeType.startsWith('audio/')) return 'audio'; + if (mimeType.startsWith('video/')) return 'video'; + return 'file'; +} + +function getAudioFormat(mimeType: string): 'wav' | 'mp3' | null { + if (mimeType.includes('wav')) return 'wav'; + if (mimeType.includes('mp3') || mimeType.includes('mpeg')) return 'mp3'; + return null; +} + +function isContentObject( + content: unknown, +): content is { role: string; parts: Part[] } { + return ( + typeof content === 'object' && + content !== null && + 'role' in content && + 'parts' in content && + Array.isArray((content as Record)['parts']) + ); +} + +function extractTextFromContentUnion(contentUnion: unknown): string { + if (typeof contentUnion === 'string') { + return contentUnion; + } + + if (Array.isArray(contentUnion)) { + return contentUnion + .map((item) => extractTextFromContentUnion(item)) + .filter(Boolean) + .join('\n'); + } + + if (typeof contentUnion === 'object' && contentUnion !== null) { + if ('parts' in contentUnion) { + const content = contentUnion as Content; + return ( + content.parts + ?.map((part: Part) => { + if (typeof part === 'string') return part; + if ('text' in part) return part.text || ''; + return ''; + }) + .filter(Boolean) + .join('\n') || '' + ); + } + } + + return ''; +} + +/** + * Convert OpenAI response to Gemini format. + */ +export function convertOpenAIResponseToGemini( + openaiResponse: OpenAI.Chat.ChatCompletion, + requestContext: RequestContext, +): GenerateContentResponse { + const choice = openaiResponse.choices?.[0]; + const response = new GenerateContentResponse(); + + if (choice) { + const parts: Part[] = []; + + // Handle reasoning content (thoughts) + const reasoningText = + (choice.message as ExtendedCompletionMessage).reasoning_content ?? + (choice.message as ExtendedCompletionMessage).reasoning; + if (reasoningText) { + parts.push({ text: reasoningText, thought: true }); + } + + // Handle text content + if (choice.message.content) { + parts.push({ text: choice.message.content }); + } + + // Handle tool calls + if (choice.message.tool_calls) { + for (const toolCall of choice.message.tool_calls) { + if (toolCall.function) { + let args: Record = {}; + if (toolCall.function.arguments) { + args = safeJsonParse(toolCall.function.arguments, {}); } + + parts.push({ + functionCall: { + id: toolCall.id, + name: toolCall.function.name, + args, + }, + }); } } + } - // Only emit function calls when streaming is complete (finish_reason is present) - let toolCallsTruncated = false; - if (choice.finish_reason) { - // Detect truncation the provider may not report correctly. - // Some providers (e.g. DashScope/Qwen) send "stop" or "tool_calls" - // even when output was cut off mid-JSON due to max_tokens. - toolCallsTruncated = ctx.toolCallParser.hasIncompleteToolCalls(); - - const completedToolCalls = ctx.toolCallParser.getCompletedToolCalls(); - - for (const toolCall of completedToolCalls) { - if (toolCall.name) { - parts.push({ - functionCall: { - id: - toolCall.id || - `call_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`, - name: toolCall.name, - args: toolCall.args, - }, - }); - } - } - - // Parser is stream-local; it will be discarded with the - // ConverterStreamContext when the stream finishes. No manual - // reset needed. 
- } - - // If tool call JSON was truncated, override to "length" so downstream - // (turn.ts) correctly sets wasOutputTruncated=true. - const effectiveFinishReason = - toolCallsTruncated && choice.finish_reason !== 'length' - ? 'length' - : choice.finish_reason; - - // Only include finishReason key if finish_reason is present - const candidate: Candidate = { + response.candidates = [ + { content: { parts, role: 'model' as const, }, + finishReason: mapOpenAIFinishReasonToGemini( + choice.finish_reason || 'stop', + ), index: 0, safetyRatings: [], - }; - if (effectiveFinishReason) { - candidate.finishReason = this.mapOpenAIFinishReasonToGemini( - effectiveFinishReason, - ); - } - response.candidates = [candidate]; - } else { - response.candidates = []; - } - - response.responseId = chunk.id; - response.createTime = chunk.created - ? chunk.created.toString() - : new Date().getTime().toString(); - - response.modelVersion = this.model; - response.promptFeedback = { safetyRatings: [] }; - - // Add usage metadata if available in the chunk - if (chunk.usage) { - const usage = chunk.usage; - - const promptTokens = usage.prompt_tokens || 0; - const completionTokens = usage.completion_tokens || 0; - const totalTokens = usage.total_tokens || 0; - const thinkingTokens = - usage.completion_tokens_details?.reasoning_tokens || 0; - // Support both formats: prompt_tokens_details.cached_tokens (OpenAI standard) - // and cached_tokens (some models return it at top level) - const extendedUsage = usage as ExtendedCompletionUsage; - const cachedTokens = - usage.prompt_tokens_details?.cached_tokens ?? - extendedUsage.cached_tokens ?? - 0; - - // If we only have total tokens but no breakdown, estimate the split - // Typically input is ~70% and output is ~30% for most conversations - let finalPromptTokens = promptTokens; - let finalCompletionTokens = completionTokens; - - if (totalTokens > 0 && promptTokens === 0 && completionTokens === 0) { - // Estimate: assume 70% input, 30% output - finalPromptTokens = Math.round(totalTokens * 0.7); - finalCompletionTokens = Math.round(totalTokens * 0.3); - } - - response.usageMetadata = { - promptTokenCount: finalPromptTokens, - candidatesTokenCount: finalCompletionTokens, - thoughtsTokenCount: thinkingTokens, - totalTokenCount: totalTokens, - cachedContentTokenCount: cachedTokens, - }; - } - - return response; + }, + ]; + } else { + response.candidates = []; } - /** - * Map OpenAI finish reasons to Gemini finish reasons - */ - private mapOpenAIFinishReasonToGemini( - openaiReason: string | null, - ): FinishReason { - if (!openaiReason) return FinishReason.FINISH_REASON_UNSPECIFIED; - const mapping: Record = { - stop: FinishReason.STOP, - length: FinishReason.MAX_TOKENS, - content_filter: FinishReason.SAFETY, - function_call: FinishReason.STOP, - tool_calls: FinishReason.STOP, + response.responseId = openaiResponse.id; + response.createTime = openaiResponse.created + ? 
openaiResponse.created.toString() + : new Date().getTime().toString(); + + response.modelVersion = requestContext.model; + response.promptFeedback = { safetyRatings: [] }; + + // Add usage metadata if available + if (openaiResponse.usage) { + const usage = openaiResponse.usage; + + const promptTokens = usage.prompt_tokens || 0; + const completionTokens = usage.completion_tokens || 0; + const totalTokens = usage.total_tokens || 0; + // Support both formats: prompt_tokens_details.cached_tokens (OpenAI standard) + // and cached_tokens (some models return it at top level) + const extendedUsage = usage as ExtendedCompletionUsage; + const cachedTokens = + usage.prompt_tokens_details?.cached_tokens ?? + extendedUsage.cached_tokens ?? + 0; + const thinkingTokens = + usage.completion_tokens_details?.reasoning_tokens || 0; + + // If we only have total tokens but no breakdown, estimate the split + // Typically input is ~70% and output is ~30% for most conversations + let finalPromptTokens = promptTokens; + let finalCompletionTokens = completionTokens; + + if (totalTokens > 0 && promptTokens === 0 && completionTokens === 0) { + // Estimate: assume 70% input, 30% output + finalPromptTokens = Math.round(totalTokens * 0.7); + finalCompletionTokens = Math.round(totalTokens * 0.3); + } + + response.usageMetadata = { + promptTokenCount: finalPromptTokens, + candidatesTokenCount: finalCompletionTokens, + totalTokenCount: totalTokens, + cachedContentTokenCount: cachedTokens, + thoughtsTokenCount: thinkingTokens, }; - return mapping[openaiReason] || FinishReason.FINISH_REASON_UNSPECIFIED; } - private mapGeminiFinishReasonToOpenAI( - geminiReason?: FinishReason, - ): 'stop' | 'length' | 'tool_calls' | 'content_filter' | 'function_call' { - if (!geminiReason) { - return 'stop'; - } + return response; +} - switch (geminiReason) { - case FinishReason.STOP: - return 'stop'; - case FinishReason.MAX_TOKENS: - return 'length'; - case FinishReason.SAFETY: - return 'content_filter'; - default: - if (geminiReason === ('RECITATION' as FinishReason)) { - return 'content_filter'; - } - return 'stop'; - } +/** + * Convert OpenAI stream chunk to Gemini format. + * + * `requestContext.toolCallParser` carries the tool-call parser for this + * stream. Callers MUST attach a fresh parser at stream start and pass the + * same instance for every chunk of that stream. Concurrent streams MUST use + * distinct parsers or their tool-call buffers will interleave (issue #3516). 
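+ *
+ * Minimal usage sketch (stream and model name illustrative):
+ *
+ *   const ctx: RequestContext = {
+ *     model: 'my-model',
+ *     modalities: {},
+ *     startTime: Date.now(),
+ *     toolCallParser: new StreamingToolCallParser(), // fresh, one per stream
+ *   };
+ *   for await (const chunk of openaiStream) {
+ *     const response = convertOpenAIChunkToGemini(chunk, ctx);
+ *   }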
+ */ +export function convertOpenAIChunkToGemini( + chunk: OpenAI.Chat.ChatCompletionChunk, + requestContext: RequestContext, +): GenerateContentResponse { + const choice = chunk.choices?.[0]; + const response = new GenerateContentResponse(); + const toolCallParser = requestContext.toolCallParser; + if (!toolCallParser) { + throw new Error( + 'convertOpenAIChunkToGemini requires requestContext.toolCallParser — attach a fresh StreamingToolCallParser at stream start.', + ); } - /** - * Clean up orphaned tool calls from message history to prevent OpenAI API errors - */ - private cleanOrphanedToolCalls( - messages: OpenAI.Chat.ChatCompletionMessageParam[], - ): OpenAI.Chat.ChatCompletionMessageParam[] { - const cleaned: OpenAI.Chat.ChatCompletionMessageParam[] = []; - const toolCallIds = new Set(); - const toolResponseIds = new Set(); + if (choice) { + const parts: Part[] = []; - // First pass: collect all tool call IDs and tool response IDs - for (const message of messages) { - if ( - message.role === 'assistant' && - 'tool_calls' in message && - message.tool_calls - ) { - for (const toolCall of message.tool_calls) { - if (toolCall.id) { - toolCallIds.add(toolCall.id); - } - } - } else if ( - message.role === 'tool' && - 'tool_call_id' in message && - message.tool_call_id - ) { - toolResponseIds.add(message.tool_call_id); + const reasoningText = + (choice.delta as ExtendedCompletionChunkDelta)?.reasoning_content ?? + (choice.delta as ExtendedCompletionChunkDelta)?.reasoning; + if (reasoningText) { + parts.push({ text: reasoningText, thought: true }); + } + + // Handle text content + if (choice.delta?.content) { + if (typeof choice.delta.content === 'string') { + parts.push({ text: choice.delta.content }); } } - // Second pass: filter out orphaned messages - for (const message of messages) { - if ( - message.role === 'assistant' && - 'tool_calls' in message && - message.tool_calls - ) { - // Filter out tool calls that don't have corresponding responses - const validToolCalls = message.tool_calls.filter( - (toolCall) => toolCall.id && toolResponseIds.has(toolCall.id), - ); + // Handle tool calls using the stream-local parser + if (choice.delta?.tool_calls) { + for (const toolCall of choice.delta.tool_calls) { + const index = toolCall.index ?? 
0; - if (validToolCalls.length > 0) { - // Keep the message but only with valid tool calls - const cleanedMessage = { ...message }; - ( - cleanedMessage as OpenAI.Chat.ChatCompletionMessageParam & { - tool_calls?: OpenAI.Chat.ChatCompletionMessageToolCall[]; - } - ).tool_calls = validToolCalls; - cleaned.push(cleanedMessage); - } else if ( - typeof message.content === 'string' && - message.content.trim() - ) { - // Keep the message if it has text content, but remove tool calls - const cleanedMessage = { ...message }; - delete ( - cleanedMessage as OpenAI.Chat.ChatCompletionMessageParam & { - tool_calls?: OpenAI.Chat.ChatCompletionMessageToolCall[]; - } - ).tool_calls; - cleaned.push(cleanedMessage); + // Process the tool call chunk through the streaming parser + if (toolCall.function?.arguments) { + toolCallParser.addChunk( + index, + toolCall.function.arguments, + toolCall.id, + toolCall.function.name, + ); + } else { + // Handle metadata-only chunks (id and/or name without arguments) + toolCallParser.addChunk( + index, + '', // Empty chunk for metadata-only updates + toolCall.id, + toolCall.function?.name, + ); } - // If no valid tool calls and no content, skip the message entirely + } + } + + // Only emit function calls when streaming is complete (finish_reason is present) + let toolCallsTruncated = false; + if (choice.finish_reason) { + // Detect truncation the provider may not report correctly. + // Some providers (e.g. DashScope/Qwen) send "stop" or "tool_calls" + // even when output was cut off mid-JSON due to max_tokens. + toolCallsTruncated = toolCallParser.hasIncompleteToolCalls(); + + const completedToolCalls = toolCallParser.getCompletedToolCalls(); + + for (const toolCall of completedToolCalls) { + if (toolCall.name) { + parts.push({ + functionCall: { + id: + toolCall.id || + `call_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`, + name: toolCall.name, + args: toolCall.args, + }, + }); + } + } + } + + // If tool call JSON was truncated, override to "length" so downstream + // (turn.ts) correctly sets wasOutputTruncated=true. + const effectiveFinishReason = + toolCallsTruncated && choice.finish_reason !== 'length' + ? 'length' + : choice.finish_reason; + + // Only include finishReason key if finish_reason is present + const candidate: Candidate = { + content: { + parts, + role: 'model' as const, + }, + index: 0, + safetyRatings: [], + }; + if (effectiveFinishReason) { + candidate.finishReason = mapOpenAIFinishReasonToGemini( + effectiveFinishReason, + ); + } + response.candidates = [candidate]; + } else { + response.candidates = []; + } + + response.responseId = chunk.id; + response.createTime = chunk.created + ? chunk.created.toString() + : new Date().getTime().toString(); + + response.modelVersion = requestContext.model; + response.promptFeedback = { safetyRatings: [] }; + + // Add usage metadata if available in the chunk + if (chunk.usage) { + const usage = chunk.usage; + + const promptTokens = usage.prompt_tokens || 0; + const completionTokens = usage.completion_tokens || 0; + const totalTokens = usage.total_tokens || 0; + const thinkingTokens = + usage.completion_tokens_details?.reasoning_tokens || 0; + // Support both formats: prompt_tokens_details.cached_tokens (OpenAI standard) + // and cached_tokens (some models return it at top level) + const extendedUsage = usage as ExtendedCompletionUsage; + const cachedTokens = + usage.prompt_tokens_details?.cached_tokens ?? + extendedUsage.cached_tokens ?? 
+      0;
+
+    // If we only have total tokens but no breakdown, estimate the split
+    // Typically input is ~70% and output is ~30% for most conversations
+    let finalPromptTokens = promptTokens;
+    let finalCompletionTokens = completionTokens;
+
+    if (totalTokens > 0 && promptTokens === 0 && completionTokens === 0) {
+      // Estimate: assume 70% input, 30% output
+      finalPromptTokens = Math.round(totalTokens * 0.7);
+      finalCompletionTokens = Math.round(totalTokens * 0.3);
+    }
+
+    response.usageMetadata = {
+      promptTokenCount: finalPromptTokens,
+      candidatesTokenCount: finalCompletionTokens,
+      thoughtsTokenCount: thinkingTokens,
+      totalTokenCount: totalTokens,
+      cachedContentTokenCount: cachedTokens,
+    };
+  }
+
+  return response;
+}
+
+function mapOpenAIFinishReasonToGemini(
+  openaiReason: string | null,
+): FinishReason {
+  if (!openaiReason) return FinishReason.FINISH_REASON_UNSPECIFIED;
+  const mapping: Record<string, FinishReason> = {
+    stop: FinishReason.STOP,
+    length: FinishReason.MAX_TOKENS,
+    content_filter: FinishReason.SAFETY,
+    function_call: FinishReason.STOP,
+    tool_calls: FinishReason.STOP,
+  };
+  return mapping[openaiReason] || FinishReason.FINISH_REASON_UNSPECIFIED;
+}
+
+function mapGeminiFinishReasonToOpenAI(
+  geminiReason?: FinishReason,
+): 'stop' | 'length' | 'tool_calls' | 'content_filter' | 'function_call' {
+  if (!geminiReason) {
+    return 'stop';
+  }
+
+  switch (geminiReason) {
+    case FinishReason.STOP:
+      return 'stop';
+    case FinishReason.MAX_TOKENS:
+      return 'length';
+    case FinishReason.SAFETY:
+      return 'content_filter';
+    default:
+      if (geminiReason === ('RECITATION' as FinishReason)) {
+        return 'content_filter';
+      }
+      return 'stop';
+  }
+}
+
+/**
+ * Clean up orphaned tool calls from message history to prevent OpenAI API errors.
+ */
+function cleanOrphanedToolCalls(
+  messages: OpenAI.Chat.ChatCompletionMessageParam[],
+): OpenAI.Chat.ChatCompletionMessageParam[] {
+  const cleaned: OpenAI.Chat.ChatCompletionMessageParam[] = [];
+  const toolCallIds = new Set<string>();
+  const toolResponseIds = new Set<string>();
+
+  // First pass: collect all tool call IDs and tool response IDs
+  for (const message of messages) {
+    if (
+      message.role === 'assistant' &&
+      'tool_calls' in message &&
+      message.tool_calls
+    ) {
+      for (const toolCall of message.tool_calls) {
+        if (toolCall.id) {
+          toolCallIds.add(toolCall.id);
+        }
+      }
+    } else if (
+      message.role === 'tool' &&
+      'tool_call_id' in message &&
+      message.tool_call_id
+    ) {
+      toolResponseIds.add(message.tool_call_id);
+    }
+  }
+
+  // Second pass: filter out orphaned messages
+  for (const message of messages) {
+    if (
+      message.role === 'assistant' &&
+      'tool_calls' in message &&
+      message.tool_calls
+    ) {
+      // Filter out tool calls that don't have corresponding responses
+      const validToolCalls = message.tool_calls.filter(
+        (toolCall) => toolCall.id && toolResponseIds.has(toolCall.id),
+      );
+
+      if (validToolCalls.length > 0) {
+        // Keep the message but only with valid tool calls
+        const cleanedMessage = { ...message };
+        (
+          cleanedMessage as OpenAI.Chat.ChatCompletionMessageParam & {
+            tool_calls?: OpenAI.Chat.ChatCompletionMessageToolCall[];
+          }
+        ).tool_calls = validToolCalls;
+        cleaned.push(cleanedMessage);
+      } else if (
+        typeof message.content === 'string' &&
+        message.content.trim()
+      ) {
+        // Keep 
the message if it has text content, but remove tool calls + const cleanedMessage = { ...message }; + delete ( + cleanedMessage as OpenAI.Chat.ChatCompletionMessageParam & { + tool_calls?: OpenAI.Chat.ChatCompletionMessageToolCall[]; + } + ).tool_calls; + cleaned.push(cleanedMessage); + } + // If no valid tool calls and no content, skip the message entirely + } else if ( + message.role === 'tool' && + 'tool_call_id' in message && + message.tool_call_id + ) { + // Only keep tool responses that have corresponding tool calls + if (toolCallIds.has(message.tool_call_id)) { cleaned.push(message); } + } else { + // Keep all other messages as-is + cleaned.push(message); } - - // Final validation: ensure every assistant message with tool_calls has corresponding tool responses - const finalCleaned: OpenAI.Chat.ChatCompletionMessageParam[] = []; - const finalToolCallIds = new Set(); - - // Collect all remaining tool call IDs - for (const message of cleaned) { - if ( - message.role === 'assistant' && - 'tool_calls' in message && - message.tool_calls - ) { - for (const toolCall of message.tool_calls) { - if (toolCall.id) { - finalToolCallIds.add(toolCall.id); - } - } - } - } - - // Verify all tool calls have responses - const finalToolResponseIds = new Set(); - for (const message of cleaned) { - if ( - message.role === 'tool' && - 'tool_call_id' in message && - message.tool_call_id - ) { - finalToolResponseIds.add(message.tool_call_id); - } - } - - // Remove any remaining orphaned tool calls - for (const message of cleaned) { - if ( - message.role === 'assistant' && - 'tool_calls' in message && - message.tool_calls - ) { - const finalValidToolCalls = message.tool_calls.filter( - (toolCall) => toolCall.id && finalToolResponseIds.has(toolCall.id), - ); - - if (finalValidToolCalls.length > 0) { - const cleanedMessage = { ...message }; - ( - cleanedMessage as OpenAI.Chat.ChatCompletionMessageParam & { - tool_calls?: OpenAI.Chat.ChatCompletionMessageToolCall[]; - } - ).tool_calls = finalValidToolCalls; - finalCleaned.push(cleanedMessage); - } else if ( - typeof message.content === 'string' && - message.content.trim() - ) { - const cleanedMessage = { ...message }; - delete ( - cleanedMessage as OpenAI.Chat.ChatCompletionMessageParam & { - tool_calls?: OpenAI.Chat.ChatCompletionMessageToolCall[]; - } - ).tool_calls; - finalCleaned.push(cleanedMessage); - } - } else { - finalCleaned.push(message); - } - } - - return finalCleaned; } - /** - * Merge consecutive assistant messages to combine split text and tool calls - */ - private mergeConsecutiveAssistantMessages( - messages: OpenAI.Chat.ChatCompletionMessageParam[], - ): OpenAI.Chat.ChatCompletionMessageParam[] { - const merged: OpenAI.Chat.ChatCompletionMessageParam[] = []; + // Final validation: ensure every assistant message with tool_calls has corresponding tool responses + const finalCleaned: OpenAI.Chat.ChatCompletionMessageParam[] = []; + const finalToolCallIds = new Set(); - for (const message of messages) { - if (message.role === 'assistant' && merged.length > 0) { - const lastMessage = merged[merged.length - 1]; + // Collect all remaining tool call IDs + for (const message of cleaned) { + if ( + message.role === 'assistant' && + 'tool_calls' in message && + message.tool_calls + ) { + for (const toolCall of message.tool_calls) { + if (toolCall.id) { + finalToolCallIds.add(toolCall.id); + } + } + } + } - // If the last message is also an assistant message, merge them - if (lastMessage.role === 'assistant') { - const lastToolCalls = - 'tool_calls' in 
lastMessage ? lastMessage.tool_calls || [] : []; - const currentToolCalls = - 'tool_calls' in message ? message.tool_calls || [] : []; - // Combine content - const lastContent = lastMessage.content; - const currentContent = message.content; + // Verify all tool calls have responses + const finalToolResponseIds = new Set(); + for (const message of cleaned) { + if ( + message.role === 'tool' && + 'tool_call_id' in message && + message.tool_call_id + ) { + finalToolResponseIds.add(message.tool_call_id); + } + } - // Determine if we should use array format (if either content is an array) - const useArrayFormat = - Array.isArray(lastContent) || Array.isArray(currentContent); + // Remove any remaining orphaned tool calls + for (const message of cleaned) { + if ( + message.role === 'assistant' && + 'tool_calls' in message && + message.tool_calls + ) { + const finalValidToolCalls = message.tool_calls.filter( + (toolCall) => toolCall.id && finalToolResponseIds.has(toolCall.id), + ); - let combinedContent: - | string - | OpenAI.Chat.ChatCompletionContentPart[] - | null; - - if (useArrayFormat) { - // Convert both to array format and merge - const lastParts = Array.isArray(lastContent) - ? lastContent - : typeof lastContent === 'string' && lastContent - ? [{ type: 'text' as const, text: lastContent }] - : []; - - const currentParts = Array.isArray(currentContent) - ? currentContent - : typeof currentContent === 'string' && currentContent - ? [{ type: 'text' as const, text: currentContent }] - : []; - - combinedContent = [ - ...lastParts, - ...currentParts, - ] as OpenAI.Chat.ChatCompletionContentPart[]; - } else { - // Both are strings or null, merge as strings - const lastText = typeof lastContent === 'string' ? lastContent : ''; - const currentText = - typeof currentContent === 'string' ? currentContent : ''; - const mergedText = [lastText, currentText].filter(Boolean).join(''); - combinedContent = mergedText || null; + if (finalValidToolCalls.length > 0) { + const cleanedMessage = { ...message }; + ( + cleanedMessage as OpenAI.Chat.ChatCompletionMessageParam & { + tool_calls?: OpenAI.Chat.ChatCompletionMessageToolCall[]; } + ).tool_calls = finalValidToolCalls; + finalCleaned.push(cleanedMessage); + } else if ( + typeof message.content === 'string' && + message.content.trim() + ) { + const cleanedMessage = { ...message }; + delete ( + cleanedMessage as OpenAI.Chat.ChatCompletionMessageParam & { + tool_calls?: OpenAI.Chat.ChatCompletionMessageToolCall[]; + } + ).tool_calls; + finalCleaned.push(cleanedMessage); + } + } else { + finalCleaned.push(message); + } + } - // Combine tool calls - const combinedToolCalls = [...lastToolCalls, ...currentToolCalls]; + return finalCleaned; +} - // Update the last message with combined data +/** + * Merge consecutive assistant messages to combine split text and tool calls. + */ +function mergeConsecutiveAssistantMessages( + messages: OpenAI.Chat.ChatCompletionMessageParam[], +): OpenAI.Chat.ChatCompletionMessageParam[] { + const merged: OpenAI.Chat.ChatCompletionMessageParam[] = []; + + for (const message of messages) { + if (message.role === 'assistant' && merged.length > 0) { + const lastMessage = merged[merged.length - 1]; + + // If the last message is also an assistant message, merge them + if (lastMessage.role === 'assistant') { + const lastToolCalls = + 'tool_calls' in lastMessage ? lastMessage.tool_calls || [] : []; + const currentToolCalls = + 'tool_calls' in message ? 
message.tool_calls || [] : []; + // Combine content + const lastContent = lastMessage.content; + const currentContent = message.content; + + // Determine if we should use array format (if either content is an array) + const useArrayFormat = + Array.isArray(lastContent) || Array.isArray(currentContent); + + let combinedContent: + | string + | OpenAI.Chat.ChatCompletionContentPart[] + | null; + + if (useArrayFormat) { + // Convert both to array format and merge + const lastParts = Array.isArray(lastContent) + ? lastContent + : typeof lastContent === 'string' && lastContent + ? [{ type: 'text' as const, text: lastContent }] + : []; + + const currentParts = Array.isArray(currentContent) + ? currentContent + : typeof currentContent === 'string' && currentContent + ? [{ type: 'text' as const, text: currentContent }] + : []; + + combinedContent = [ + ...lastParts, + ...currentParts, + ] as OpenAI.Chat.ChatCompletionContentPart[]; + } else { + // Both are strings or null, merge as strings + const lastText = typeof lastContent === 'string' ? lastContent : ''; + const currentText = + typeof currentContent === 'string' ? currentContent : ''; + const mergedText = [lastText, currentText].filter(Boolean).join(''); + combinedContent = mergedText || null; + } + + // Combine tool calls + const combinedToolCalls = [...lastToolCalls, ...currentToolCalls]; + + // Update the last message with combined data + ( + lastMessage as OpenAI.Chat.ChatCompletionMessageParam & { + content: string | OpenAI.Chat.ChatCompletionContentPart[] | null; + tool_calls?: OpenAI.Chat.ChatCompletionMessageToolCall[]; + } + ).content = combinedContent || null; + if (combinedToolCalls.length > 0) { ( lastMessage as OpenAI.Chat.ChatCompletionMessageParam & { content: string | OpenAI.Chat.ChatCompletionContentPart[] | null; tool_calls?: OpenAI.Chat.ChatCompletionMessageToolCall[]; } - ).content = combinedContent || null; - if (combinedToolCalls.length > 0) { - ( - lastMessage as OpenAI.Chat.ChatCompletionMessageParam & { - content: - | string - | OpenAI.Chat.ChatCompletionContentPart[] - | null; - tool_calls?: OpenAI.Chat.ChatCompletionMessageToolCall[]; - } - ).tool_calls = combinedToolCalls; - } - - continue; // Skip adding the current message since it's been merged + ).tool_calls = combinedToolCalls; } - } - // Add the message as-is if no merging is needed - merged.push(message); + continue; // Skip adding the current message since it's been merged + } } - return merged; + // Add the message as-is if no merging is needed + merged.push(message); } + + return merged; } + +export const OpenAIContentConverter = { + convertGeminiToolParametersToOpenAI, + convertGeminiToolsToOpenAI, + convertGeminiRequestToOpenAI, + convertGeminiResponseToOpenAI, + convertOpenAIResponseToGemini, + convertOpenAIChunkToGemini, +}; diff --git a/packages/core/src/core/openaiContentGenerator/errorHandler.test.ts b/packages/core/src/core/openaiContentGenerator/errorHandler.test.ts index a75b43ca8..a844cbfb5 100644 --- a/packages/core/src/core/openaiContentGenerator/errorHandler.test.ts +++ b/packages/core/src/core/openaiContentGenerator/errorHandler.test.ts @@ -7,21 +7,20 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import type { GenerateContentParameters } from '@google/genai'; import { EnhancedErrorHandler } from './errorHandler.js'; -import type { RequestContext } from './errorHandler.js'; +import type { RequestContext } from './types.js'; describe('EnhancedErrorHandler', () => { + const fixedNow = 10_000; let errorHandler: 
EnhancedErrorHandler; let mockContext: RequestContext; let mockRequest: GenerateContentParameters; beforeEach(() => { + vi.spyOn(Date, 'now').mockReturnValue(fixedNow); mockContext = { - userPromptId: 'test-prompt-id', model: 'test-model', - authType: 'test-auth', - startTime: Date.now() - 5000, - duration: 5000, - isStreaming: false, + modalities: {}, + startTime: fixedNow - 5000, }; mockRequest = { @@ -194,7 +193,7 @@ describe('EnhancedErrorHandler', () => { errorHandler = new EnhancedErrorHandler(); }); - it('should build timeout error message for non-streaming requests', () => { + it('should build timeout error message', () => { const timeoutError = new Error('timeout'); expect(() => { @@ -204,17 +203,6 @@ describe('EnhancedErrorHandler', () => { ); }); - it('should build timeout error message for streaming requests', () => { - const streamingContext = { ...mockContext, isStreaming: true }; - const timeoutError = new Error('timeout'); - - expect(() => { - errorHandler.handle(timeoutError, streamingContext, mockRequest); - }).toThrow( - /Streaming request timeout after 5s\. Try reducing input length or increasing timeout in config\./, - ); - }); - it('should use original error message for non-timeout errors', () => { const originalError = new Error('Original error message'); @@ -245,7 +233,10 @@ describe('EnhancedErrorHandler', () => { }); it('should handle different duration values correctly', () => { - const contextWithDifferentDuration = { ...mockContext, duration: 12345 }; + const contextWithDifferentDuration = { + ...mockContext, + startTime: fixedNow - 12345, + }; const timeoutError = new Error('timeout'); expect(() => { @@ -263,24 +254,13 @@ describe('EnhancedErrorHandler', () => { errorHandler = new EnhancedErrorHandler(); }); - it('should provide general troubleshooting tips for non-streaming requests', () => { + it('should provide generic troubleshooting tips', () => { const timeoutError = new Error('timeout'); expect(() => { errorHandler.handle(timeoutError, mockContext, mockRequest); }).toThrow( - /Troubleshooting tips:\n- Reduce input length or complexity\n- Increase timeout in config: contentGenerator\.timeout\n- Check network connectivity\n- Consider using streaming mode for long responses/, - ); - }); - - it('should provide streaming-specific troubleshooting tips for streaming requests', () => { - const streamingContext = { ...mockContext, isStreaming: true }; - const timeoutError = new Error('timeout'); - - expect(() => { - errorHandler.handle(timeoutError, streamingContext, mockRequest); - }).toThrow( - /Streaming timeout troubleshooting:\n- Reduce input length or complexity\n- Increase timeout in config: contentGenerator\.timeout\n- Check network connectivity\n- Check network stability for streaming connections\n- Consider using non-streaming mode for very long inputs/, + /Troubleshooting tips:\n- Reduce input length or complexity\n- Increase timeout in config: contentGenerator\.timeout\n- Check network connectivity/, ); }); }); @@ -310,7 +290,7 @@ describe('EnhancedErrorHandler', () => { }); it('should handle zero duration', () => { - const zeroContext = { ...mockContext, duration: 0 }; + const zeroContext = { ...mockContext, startTime: fixedNow }; const timeoutError = new Error('timeout'); expect(() => { @@ -319,7 +299,7 @@ describe('EnhancedErrorHandler', () => { }); it('should handle negative duration', () => { - const negativeContext = { ...mockContext, duration: -1000 }; + const negativeContext = { ...mockContext, startTime: fixedNow + 1000 }; const timeoutError 
= new Error('timeout'); expect(() => { @@ -328,7 +308,7 @@ describe('EnhancedErrorHandler', () => { }); it('should handle very large duration', () => { - const largeContext = { ...mockContext, duration: 999999 }; + const largeContext = { ...mockContext, startTime: fixedNow - 999999 }; const timeoutError = new Error('timeout'); expect(() => { diff --git a/packages/core/src/core/openaiContentGenerator/errorHandler.ts b/packages/core/src/core/openaiContentGenerator/errorHandler.ts index 8a7509187..231722ac4 100644 --- a/packages/core/src/core/openaiContentGenerator/errorHandler.ts +++ b/packages/core/src/core/openaiContentGenerator/errorHandler.ts @@ -6,29 +6,10 @@ import type { GenerateContentParameters } from '@google/genai'; import { createDebugLogger } from '../../utils/debugLogger.js'; +import type { ErrorHandler, RequestContext } from './types.js'; const debugLogger = createDebugLogger('OPENAI_ERROR'); - -export interface RequestContext { - userPromptId: string; - model: string; - authType: string; - startTime: number; - duration: number; - isStreaming: boolean; -} - -export interface ErrorHandler { - handle( - error: unknown, - context: RequestContext, - request: GenerateContentParameters, - ): never; - shouldSuppressErrorLogging( - error: unknown, - request: GenerateContentParameters, - ): boolean; -} +export type { ErrorHandler } from './types.js'; export class EnhancedErrorHandler implements ErrorHandler { constructor( @@ -48,16 +29,13 @@ export class EnhancedErrorHandler implements ErrorHandler { // Allow subclasses to suppress error logging for specific scenarios if (!this.shouldSuppressErrorLogging(error, request)) { - const logPrefix = context.isStreaming - ? 'OpenAI API Streaming Error:' - : 'OpenAI API Error:'; - debugLogger.error(logPrefix, errorMessage); + debugLogger.error('OpenAI API Error:', errorMessage); } // Provide helpful timeout-specific error message if (isTimeoutError) { throw new Error( - `${errorMessage}\n\n${this.getTimeoutTroubleshootingTips(context)}`, + `${errorMessage}\n\n${this.getTimeoutTroubleshootingTips()}`, ); } @@ -105,36 +83,21 @@ export class EnhancedErrorHandler implements ErrorHandler { context: RequestContext, isTimeoutError: boolean, ): string { - const durationSeconds = Math.round(context.duration / 1000); + const durationSeconds = Math.round((Date.now() - context.startTime) / 1000); if (isTimeoutError) { - const prefix = context.isStreaming - ? 'Streaming request timeout' - : 'Request timeout'; - return `${prefix} after ${durationSeconds}s. Try reducing input length or increasing timeout in config.`; + return `Request timeout after ${durationSeconds}s. Try reducing input length or increasing timeout in config.`; } return error instanceof Error ? error.message : String(error); } - private getTimeoutTroubleshootingTips(context: RequestContext): string { - const baseTitle = context.isStreaming - ? 'Streaming timeout troubleshooting:' - : 'Troubleshooting tips:'; - - const baseTips = [ + private getTimeoutTroubleshootingTips(): string { + const tips = [ '- Reduce input length or complexity', '- Increase timeout in config: contentGenerator.timeout', '- Check network connectivity', ]; - - const streamingSpecificTips = context.isStreaming - ? 
[ - '- Check network stability for streaming connections', - '- Consider using non-streaming mode for very long inputs', - ] - : ['- Consider using streaming mode for long responses']; - - return `${baseTitle}\n${[...baseTips, ...streamingSpecificTips].join('\n')}`; + return `Troubleshooting tips:\n${tips.join('\n')}`; } } diff --git a/packages/core/src/core/openaiContentGenerator/index.ts b/packages/core/src/core/openaiContentGenerator/index.ts index fee32a049..15b403808 100644 --- a/packages/core/src/core/openaiContentGenerator/index.ts +++ b/packages/core/src/core/openaiContentGenerator/index.ts @@ -20,7 +20,8 @@ import { } from './provider/index.js'; export { OpenAIContentGenerator } from './openaiContentGenerator.js'; -export { ContentGenerationPipeline, type PipelineConfig } from './pipeline.js'; +export { ContentGenerationPipeline } from './pipeline.js'; +export type { ErrorHandler, PipelineConfig, RequestContext } from './types.js'; export { type OpenAICompatibleProvider, @@ -91,4 +92,4 @@ export function determineProvider( return new DefaultOpenAICompatibleProvider(contentGeneratorConfig, cliConfig); } -export { type ErrorHandler, EnhancedErrorHandler } from './errorHandler.js'; +export { EnhancedErrorHandler } from './errorHandler.js'; diff --git a/packages/core/src/core/openaiContentGenerator/openaiContentGenerator.ts b/packages/core/src/core/openaiContentGenerator/openaiContentGenerator.ts index 5c8479bc0..17d751e42 100644 --- a/packages/core/src/core/openaiContentGenerator/openaiContentGenerator.ts +++ b/packages/core/src/core/openaiContentGenerator/openaiContentGenerator.ts @@ -9,7 +9,7 @@ import type { GenerateContentParameters, GenerateContentResponse, } from '@google/genai'; -import type { PipelineConfig } from './pipeline.js'; +import type { PipelineConfig } from './types.js'; import { ContentGenerationPipeline } from './pipeline.js'; import { EnhancedErrorHandler } from './errorHandler.js'; import { RequestTokenEstimator } from '../../utils/request-tokenizer/index.js'; diff --git a/packages/core/src/core/openaiContentGenerator/pipeline.concurrent.test.ts b/packages/core/src/core/openaiContentGenerator/pipeline.concurrent.test.ts index 0c83ab47c..ff36916a4 100644 --- a/packages/core/src/core/openaiContentGenerator/pipeline.concurrent.test.ts +++ b/packages/core/src/core/openaiContentGenerator/pipeline.concurrent.test.ts @@ -18,8 +18,9 @@ * and chunks routed by `index: 0` interleaved into corrupt JSON. * * With the fix, `processStreamWithLogging` creates a fresh - * `ConverterStreamContext` at stream entry, so each concurrent generator - * has its own parser. This test would fail deterministically on pre-fix + * request context with its own `toolCallParser` at stream entry, so each + * concurrent generator has its own parser. This test would fail + * deterministically on pre-fix * code because stream B's entry would wipe stream A's accumulator * mid-flight, and A's finish chunk would emit zero function calls * (`wasOutputTruncated`-style behavior). 
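For reference, the interleaving hazard described in the comment above, as a minimal sketch (not part of the patch); it uses only the `StreamingToolCallParser` methods exercised elsewhere in this diff (`addChunk`, `getCompletedToolCalls`), and the JSON fragments are invented:

    // Pre-fix shape: one long-lived parser shared by two concurrent streams.
    const shared = new StreamingToolCallParser();
    shared.addChunk(0, '{"path":', 'call_A', 'read_file'); // stream A, tool-call index 0
    shared.addChunk(0, '{"command":', 'call_B', 'run_shell'); // stream B also routes to index 0
    shared.addChunk(0, '"a.txt"}'); // stream A's tail lands in the shared index-0 buffer
    // The index-0 accumulator now holds interleaved, corrupt JSON.

    // Post-fix shape: each request context owns its parser, so the same chunks
    // accumulate independently and stream A completes {"path":"a.txt"} cleanly.
    const parserA = new StreamingToolCallParser();
    const parserB = new StreamingToolCallParser();
    parserA.addChunk(0, '{"path":', 'call_A', 'read_file');
    parserB.addChunk(0, '{"command":', 'call_B', 'run_shell');
    parserA.addChunk(0, '"a.txt"}');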
@@ -29,12 +30,11 @@

 import { describe, it, expect, vi } from 'vitest';
 import type OpenAI from 'openai';
 import type { GenerateContentParameters } from '@google/genai';
 import type { Part } from '@google/genai';
-import type { PipelineConfig } from './pipeline.js';
+import type { ErrorHandler, PipelineConfig } from './types.js';
 import { ContentGenerationPipeline } from './pipeline.js';
 import type { Config } from '../../config/config.js';
 import type { ContentGeneratorConfig, AuthType } from '../contentGenerator.js';
 import type { OpenAICompatibleProvider } from './provider/index.js';
-import type { ErrorHandler } from './errorHandler.js';

 type ChunkFactory = () => OpenAI.Chat.ChatCompletionChunk;

diff --git a/packages/core/src/core/openaiContentGenerator/pipeline.test.ts b/packages/core/src/core/openaiContentGenerator/pipeline.test.ts
index 235d23c51..473dbe3f0 100644
--- a/packages/core/src/core/openaiContentGenerator/pipeline.test.ts
+++ b/packages/core/src/core/openaiContentGenerator/pipeline.test.ts
@@ -9,17 +9,23 @@

 import { describe, it, expect, beforeEach, vi } from 'vitest';
 import type OpenAI from 'openai';
 import type { GenerateContentParameters } from '@google/genai';
 import { GenerateContentResponse, Type, FinishReason } from '@google/genai';
-import type { PipelineConfig } from './pipeline.js';
+import type { ErrorHandler, PipelineConfig } from './types.js';
 import { ContentGenerationPipeline, StreamContentError } from './pipeline.js';
 import { OpenAIContentConverter } from './converter.js';
 import { StreamingToolCallParser } from './streamingToolCallParser.js';
 import type { Config } from '../../config/config.js';
 import type { ContentGeneratorConfig, AuthType } from '../contentGenerator.js';
 import type { OpenAICompatibleProvider } from './provider/index.js';
-import type { ErrorHandler } from './errorHandler.js';

 // Mock dependencies
-vi.mock('./converter.js');
+vi.mock('./converter.js', () => ({
+  OpenAIContentConverter: {
+    convertGeminiRequestToOpenAI: vi.fn(),
+    convertOpenAIResponseToGemini: vi.fn(),
+    convertOpenAIChunkToGemini: vi.fn(),
+    convertGeminiToolsToOpenAI: vi.fn(),
+  },
+}));
 vi.mock('openai');

 describe('ContentGenerationPipeline', () => {
@@ -27,7 +33,7 @@ describe('ContentGenerationPipeline', () => {
   let pipeline: ContentGenerationPipeline;
   let mockConfig: PipelineConfig;
   let mockProvider: OpenAICompatibleProvider;
   let mockClient: OpenAI;
-  let mockConverter: OpenAIContentConverter;
+  let mockConverter: typeof OpenAIContentConverter;
   let mockErrorHandler: ErrorHandler;
   let mockContentGeneratorConfig: ContentGeneratorConfig;
   let mockCliConfig: Config;
@@ -45,19 +51,9 @@ describe('ContentGenerationPipeline', () => {
       },
     } as unknown as OpenAI;

-    // Mock converter. `createStreamContext` returns a fresh parser each
-    // stream; tests that don't care about tool-call buffers just ignore it.
-    mockConverter = {
-      setModel: vi.fn(),
-      setModalities: vi.fn(),
-      convertGeminiRequestToOpenAI: vi.fn(),
-      convertOpenAIResponseToGemini: vi.fn(),
-      convertOpenAIChunkToGemini: vi.fn(),
-      convertGeminiToolsToOpenAI: vi.fn(),
-      createStreamContext: vi.fn(() => ({
-        toolCallParser: new StreamingToolCallParser(),
-      })),
-    } as unknown as OpenAIContentConverter;
+    // Mock converter methods. The pipeline now snapshots request-scoped state
+    // into context and calls the stateless converter namespace directly.
+    mockConverter = OpenAIContentConverter;

     // Mock provider
     mockProvider = {
@@ -87,11 +83,6 @@ describe('ContentGenerationPipeline', () => {
       },
     } as ContentGeneratorConfig;

-    // Mock the OpenAIContentConverter constructor
-    (OpenAIContentConverter as unknown as Mock).mockImplementation(
-      () => mockConverter,
-    );
-
     mockConfig = {
      cliConfig: mockCliConfig,
      provider: mockProvider,
@@ -105,12 +96,6 @@ describe('ContentGenerationPipeline', () => {
   describe('constructor', () => {
     it('should initialize with correct configuration', () => {
       expect(mockProvider.buildClient).toHaveBeenCalled();
-      // Converter is constructed once and the model is updated per-request via setModel().
-      expect(OpenAIContentConverter).toHaveBeenCalledWith(
-        'test-model',
-        undefined,
-        {},
-      );
     });
   });
@@ -152,11 +137,12 @@ describe('ContentGenerationPipeline', () => {
       // Assert
       expect(result).toBe(mockGeminiResponse);
-      expect(
-        (mockConverter as unknown as { setModel: Mock }).setModel,
-      ).toHaveBeenCalledWith('test-model');
       expect(mockConverter.convertGeminiRequestToOpenAI).toHaveBeenCalledWith(
         request,
+        expect.objectContaining({
+          model: 'test-model',
+          modalities: {},
+        }),
       );
       expect(mockClient.chat.completions.create).toHaveBeenCalledWith(
         expect.objectContaining({
@@ -172,6 +158,10 @@ describe('ContentGenerationPipeline', () => {
       );
       expect(mockConverter.convertOpenAIResponseToGemini).toHaveBeenCalledWith(
         mockOpenAIResponse,
+        expect.objectContaining({
+          model: 'test-model',
+          modalities: {},
+        }),
       );
     });
@@ -211,9 +201,12 @@ describe('ContentGenerationPipeline', () => {
       // Assert — request.model takes precedence over contentGeneratorConfig.model
       expect(result).toBe(mockGeminiResponse);
-      expect(
-        (mockConverter as unknown as { setModel: Mock }).setModel,
-      ).toHaveBeenCalledWith('override-model');
+      expect(mockConverter.convertGeminiRequestToOpenAI).toHaveBeenCalledWith(
+        request,
+        expect.objectContaining({
+          model: 'override-model',
+        }),
+      );
       expect(mockClient.chat.completions.create).toHaveBeenCalledWith(
         expect.objectContaining({
           model: 'override-model',
@@ -258,9 +251,12 @@ describe('ContentGenerationPipeline', () => {
       // Assert — falls back to contentGeneratorConfig.model
       expect(result).toBe(mockGeminiResponse);
-      expect(
-        (mockConverter as unknown as { setModel: Mock }).setModel,
-      ).toHaveBeenCalledWith('test-model');
+      expect(mockConverter.convertGeminiRequestToOpenAI).toHaveBeenCalledWith(
+        request,
+        expect.objectContaining({
+          model: 'test-model',
+        }),
+      );
       expect(mockClient.chat.completions.create).toHaveBeenCalledWith(
         expect.objectContaining({
           model: 'test-model',
@@ -322,11 +318,15 @@ describe('ContentGenerationPipeline', () => {
       // Assert
       expect(result).toBe(mockGeminiResponse);
-      expect(
-        (mockConverter as unknown as { setModel: Mock }).setModel,
-      ).toHaveBeenCalledWith('test-model');
+      expect(mockConverter.convertGeminiRequestToOpenAI).toHaveBeenCalledWith(
+        request,
+        expect.objectContaining({
+          model: 'test-model',
+        }),
+      );
       expect(mockConverter.convertGeminiToolsToOpenAI).toHaveBeenCalledWith(
         request.config!.tools,
+        'auto',
       );
       expect(mockClient.chat.completions.create).toHaveBeenCalledWith(
         expect.objectContaining({
@@ -611,9 +611,22 @@ describe('ContentGenerationPipeline', () => {
       expect(results).toHaveLength(2);
       expect(results[0]).toBe(mockGeminiResponse1);
       expect(results[1]).toBe(mockGeminiResponse2);
-      // Parser is now created per-stream via createStreamContext — assert
-      // that the pipeline asked for a fresh one at stream entry.
-      expect(mockConverter.createStreamContext).toHaveBeenCalled();
+      const [, firstChunkContext] = (
+        mockConverter.convertOpenAIChunkToGemini as Mock
+      ).mock.calls[0];
+      const [, secondChunkContext] = (
+        mockConverter.convertOpenAIChunkToGemini as Mock
+      ).mock.calls[1];
+      expect(firstChunkContext).toEqual(
+        expect.objectContaining({
+          model: 'test-model',
+          modalities: {},
+          toolCallParser: expect.any(StreamingToolCallParser),
+        }),
+      );
+      expect(secondChunkContext.toolCallParser).toBe(
+        firstChunkContext.toolCallParser,
+      );
       expect(mockClient.chat.completions.create).toHaveBeenCalledWith(
         expect.objectContaining({
           stream: true,
@@ -725,10 +738,6 @@ describe('ContentGenerationPipeline', () => {
       }

       expect(results).toHaveLength(0); // No results due to error
-      // createStreamContext is called exactly once at stream entry; the
-      // error path no longer needs an explicit parser reset because the
-      // stream-local context is discarded when the generator unwinds.
-      expect(mockConverter.createStreamContext).toHaveBeenCalledTimes(1);
       expect(mockErrorHandler.handle).toHaveBeenCalledWith(
         testError,
         expect.any(Object),
diff --git a/packages/core/src/core/openaiContentGenerator/pipeline.ts b/packages/core/src/core/openaiContentGenerator/pipeline.ts
index 77ff96f2f..728968f9a 100644
--- a/packages/core/src/core/openaiContentGenerator/pipeline.ts
+++ b/packages/core/src/core/openaiContentGenerator/pipeline.ts
@@ -10,11 +10,10 @@ import {
   type GenerateContentParameters,
   GenerateContentResponse,
 } from '@google/genai';
-import type { Config } from '../../config/config.js';
 import type { ContentGeneratorConfig } from '../contentGenerator.js';
-import type { OpenAICompatibleProvider } from './provider/index.js';
 import { OpenAIContentConverter } from './converter.js';
-import type { ErrorHandler, RequestContext } from './errorHandler.js';
+import { StreamingToolCallParser } from './streamingToolCallParser.js';
+import type { PipelineConfig, RequestContext } from './types.js';

 /**
  * The OpenAI SDK adds an abort listener for every `chat.completions.create`
@@ -46,44 +45,27 @@ export class StreamContentError extends Error {
   }
 }

-export interface PipelineConfig {
-  cliConfig: Config;
-  provider: OpenAICompatibleProvider;
-  contentGeneratorConfig: ContentGeneratorConfig;
-  errorHandler: ErrorHandler;
-}
+export type { PipelineConfig } from './types.js';

 export class ContentGenerationPipeline {
   client: OpenAI;
-  private converter: OpenAIContentConverter;
   private contentGeneratorConfig: ContentGeneratorConfig;

   constructor(private config: PipelineConfig) {
     this.contentGeneratorConfig = config.contentGeneratorConfig;
     this.client = this.config.provider.buildClient();
-    this.converter = new OpenAIContentConverter(
-      this.contentGeneratorConfig.model,
-      this.contentGeneratorConfig.schemaCompliance,
-      this.contentGeneratorConfig.modalities ?? {},
-    );
   }

   async execute(
     request: GenerateContentParameters,
     userPromptId: string,
   ): Promise<GenerateContentResponse> {
-    // Use request.model when explicitly provided (e.g., fastModel for suggestion
-    // generation), falling back to the configured model as the default.
-    const effectiveModel = request.model || this.contentGeneratorConfig.model;
-    this.converter.setModel(effectiveModel);
-    this.converter.setModalities(this.contentGeneratorConfig.modalities ?? {});
     raiseAbortListenerCap(request.config?.abortSignal);

     return this.executeWithErrorHandling(
       request,
       userPromptId,
       false,
-      effectiveModel,
-      async (openaiRequest) => {
+      async (openaiRequest, context) => {
         const openaiResponse = (await this.client.chat.completions.create(
           openaiRequest,
           {
@@ -92,7 +74,10 @@ export class ContentGenerationPipeline {
         )) as OpenAI.Chat.ChatCompletion;

         const geminiResponse =
-          this.converter.convertOpenAIResponseToGemini(openaiResponse);
+          OpenAIContentConverter.convertOpenAIResponseToGemini(
+            openaiResponse,
+            context,
+          );

         return geminiResponse;
       },
@@ -103,15 +88,11 @@ export class ContentGenerationPipeline {
     request: GenerateContentParameters,
     userPromptId: string,
   ): Promise<AsyncGenerator<GenerateContentResponse>> {
-    const effectiveModel = request.model || this.contentGeneratorConfig.model;
-    this.converter.setModel(effectiveModel);
-    this.converter.setModalities(this.contentGeneratorConfig.modalities ?? {});
     raiseAbortListenerCap(request.config?.abortSignal);

     return this.executeWithErrorHandling(
       request,
       userPromptId,
       true,
-      effectiveModel,
       async (openaiRequest, context) => {
         // Stage 1: Create OpenAI stream
         const stream = (await this.client.chat.completions.create(
@@ -143,13 +124,6 @@ export class ContentGenerationPipeline {
   ): AsyncGenerator<GenerateContentResponse> {
     const collectedGeminiResponses: GenerateContentResponse[] = [];

-    // Stream-local parser state. Previously the tool-call parser lived on
-    // the Converter singleton and was reset at stream start — but that
-    // wiped concurrent streams' in-flight buffers (e.g. parallel subagents
-    // sharing the same Config.contentGenerator). Scoping it per-stream
-    // fixes issue #3516.
-    const streamCtx = this.converter.createStreamContext();
-
     // State for handling chunk merging.
     // pendingFinishResponse holds a finish chunk waiting to be merged with
     // a subsequent usage-metadata chunk before yielding.
@@ -174,9 +148,9 @@ export class ContentGenerationPipeline {
           throw new StreamContentError(errorContent);
         }

-        const response = this.converter.convertOpenAIChunkToGemini(
+        const response = OpenAIContentConverter.convertOpenAIChunkToGemini(
           chunk,
-          streamCtx,
+          context,
         );

         // Stage 2b: Filter empty responses to avoid downstream issues
@@ -237,13 +211,7 @@ export class ContentGenerationPipeline {
       if (pendingFinishResponse && !finishYielded) {
         yield pendingFinishResponse;
       }
-
-      // Stage 2e: Stream completed successfully
-      context.duration = Date.now() - context.startTime;
     } catch (error) {
-      // No manual parser cleanup needed — streamCtx is stream-local and
-      // becomes eligible for garbage collection once this generator unwinds.
-
       // Re-throw StreamContentError directly so it can be handled by
       // the caller's retry logic (e.g., TPM throttling retry in sendMessageStream)
       if (error instanceof StreamContentError) {
@@ -337,20 +305,23 @@ export class ContentGenerationPipeline {
   private async buildRequest(
     request: GenerateContentParameters,
     userPromptId: string,
-    streaming: boolean = false,
-    effectiveModel: string,
+    context: RequestContext,
+    isStreaming: boolean,
   ): Promise<OpenAI.Chat.ChatCompletionCreateParams> {
-    const messages = this.converter.convertGeminiRequestToOpenAI(request);
+    const messages = OpenAIContentConverter.convertGeminiRequestToOpenAI(
+      request,
+      context,
+    );

     // Apply provider-specific enhancements
     const baseRequest: OpenAI.Chat.ChatCompletionCreateParams = {
-      model: effectiveModel,
+      model: context.model,
       messages,
       ...this.buildGenerateContentConfig(request),
     };

     // Add streaming options if present
-    if (streaming) {
+    if (isStreaming) {
       (
         baseRequest as unknown as OpenAI.Chat.ChatCompletionCreateParamsStreaming
       ).stream = true;
@@ -360,9 +331,11 @@ export class ContentGenerationPipeline {
     // Add tools if present and non-empty.
     // Some providers reject tools: [] (empty array), so skip when there are no tools.
     if (request.config?.tools && request.config.tools.length > 0) {
-      baseRequest.tools = await this.converter.convertGeminiToolsToOpenAI(
-        request.config.tools,
-      );
+      baseRequest.tools =
+        await OpenAIContentConverter.convertGeminiToolsToOpenAI(
+          request.config.tools,
+          this.contentGeneratorConfig.schemaCompliance ?? 'auto',
+        );
     }

     // Let provider enhance the request (e.g., add metadata, cache control)
@@ -497,29 +470,22 @@ export class ContentGenerationPipeline {
     request: GenerateContentParameters,
     userPromptId: string,
     isStreaming: boolean,
-    effectiveModel: string,
     executor: (
       openaiRequest: OpenAI.Chat.ChatCompletionCreateParams,
       context: RequestContext,
     ) => Promise<T>,
   ): Promise<T> {
-    const context = this.createRequestContext(
-      userPromptId,
-      isStreaming,
-      effectiveModel,
-    );
+    const context = this.createRequestContext(request, isStreaming);

     try {
       const openaiRequest = await this.buildRequest(
         request,
         userPromptId,
+        context,
         isStreaming,
-        effectiveModel,
       );

       const result = await executor(openaiRequest, context);
-
-      context.duration = Date.now() - context.startTime;
       return result;
     } catch (error) {
       // Use shared error handling logic
@@ -536,7 +502,6 @@ export class ContentGenerationPipeline {
     context: RequestContext,
     request: GenerateContentParameters,
   ): Promise<never> {
-    context.duration = Date.now() - context.startTime;
     this.config.errorHandler.handle(error, context, request);
   }

   /**
    * Create request context with common properties
    */
   private createRequestContext(
-    userPromptId: string,
+    request: GenerateContentParameters,
     isStreaming: boolean,
-    effectiveModel: string,
   ): RequestContext {
+    const effectiveModel = request.model || this.contentGeneratorConfig.model;
+    const toolCallParser = isStreaming
+      ? new StreamingToolCallParser()
+      : undefined;
+
     return {
-      userPromptId,
       model: effectiveModel,
-      authType: this.contentGeneratorConfig.authType || 'unknown',
+      modalities: this.contentGeneratorConfig.modalities ?? {},
       startTime: Date.now(),
-      duration: 0,
-      isStreaming,
+      ...(toolCallParser ? { toolCallParser } : {}),
     };
   }
 }
diff --git a/packages/core/src/core/openaiContentGenerator/types.ts b/packages/core/src/core/openaiContentGenerator/types.ts
new file mode 100644
index 000000000..630612172
--- /dev/null
+++ b/packages/core/src/core/openaiContentGenerator/types.ts
@@ -0,0 +1,40 @@
+/**
+ * @license
+ * Copyright 2025 Qwen
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import type { GenerateContentParameters } from '@google/genai';
+import type { Config } from '../../config/config.js';
+import type {
+  ContentGeneratorConfig,
+  InputModalities,
+} from '../contentGenerator.js';
+import type { OpenAICompatibleProvider } from './provider/index.js';
+import type { StreamingToolCallParser } from './streamingToolCallParser.js';
+
+export interface RequestContext {
+  model: string;
+  modalities: InputModalities;
+  startTime: number;
+  toolCallParser?: StreamingToolCallParser;
+}
+
+export interface ErrorHandler {
+  handle(
+    error: unknown,
+    context: RequestContext,
+    request: GenerateContentParameters,
+  ): never;
+  shouldSuppressErrorLogging(
+    error: unknown,
+    request: GenerateContentParameters,
+  ): boolean;
+}
+
+export interface PipelineConfig {
+  cliConfig: Config;
+  provider: OpenAICompatibleProvider;
+  contentGeneratorConfig: ContentGeneratorConfig;
+  errorHandler: ErrorHandler;
+}
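[Reviewer note, not part of the patch] For call sites migrating off the old class instance, the intended pattern is: build one `RequestContext` per request, then pass it to every stateless converter call for that request. A minimal sketch under that assumption; `chunk` is a placeholder for a chunk read from a live completion stream, not a value defined by this change:

```ts
import type OpenAI from 'openai';
import { OpenAIContentConverter } from './converter.js';
import { StreamingToolCallParser } from './streamingToolCallParser.js';
import type { RequestContext } from './types.js';

// Placeholder input: in the pipeline this comes from iterating the stream.
declare const chunk: OpenAI.Chat.ChatCompletionChunk;

// One context per request, reused for every chunk of the same stream.
const context: RequestContext = {
  model: 'test-model',
  modalities: {},
  startTime: Date.now(),
  toolCallParser: new StreamingToolCallParser(), // omit for non-streaming calls
};

// Per the commit message, convertOpenAIChunkToGemini throws when
// context.toolCallParser is missing, so misuse fails loudly instead of
// silently constructing a one-shot parser per chunk.
const response = OpenAIContentConverter.convertOpenAIChunkToGemini(
  chunk,
  context,
);
void response; // unused in this standalone sketch
```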