refactor(core): make OpenAI converter stateless (follow-up to #3525) (#3550)

* refactor(core): make OpenAI converter stateless to prevent shared-state races

Follow-up to #3525. #3516 showed that OpenAIContentConverter's long-lived
per-pipeline state raced between concurrent streams; #3525 scoped the
streaming tool-call parser, this removes the remaining shared state.

- OpenAIContentConverter is now a module of stand-alone functions; the
  exported symbol is a namespace object preserved for call-site
  compatibility.
- New RequestContext (in types.ts, alongside PipelineConfig and
  ErrorHandler) carries model, modalities, startTime, and an optional
  per-stream toolCallParser. The pipeline builds one per request and
  threads it through every conversion call.
- errorHandler drops duration/isStreaming; duration is recomputed from
  startTime at error time and troubleshooting text is uniform.
- convertOpenAIChunkToGemini now throws if toolCallParser is missing so
  future misuse surfaces loudly instead of silently constructing a
  one-shot parser per chunk.

* test(core): align timeout expectations
This commit is contained in:
tanzhenxin 2026-04-24 12:28:03 +08:00 committed by GitHub
parent 5556699e43
commit 53293e4d85
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 1684 additions and 1586 deletions

View file

@ -215,13 +215,12 @@ describe('OpenAIContentGenerator Timeout Handling', () => {
expect(errorMessage).toContain('Reduce input length or complexity');
expect(errorMessage).toContain('Increase timeout in config');
expect(errorMessage).toContain('Check network connectivity');
expect(errorMessage).toContain('Consider using streaming mode');
}
});
});
describe('generateContentStream timeout handling', () => {
it('should handle streaming timeout errors', async () => {
it('should handle streaming timeout errors with the shared timeout message', async () => {
const timeoutError = new Error('Streaming timeout');
mockOpenAIClient.chat.completions.create.mockRejectedValue(timeoutError);
@ -233,11 +232,11 @@ describe('OpenAIContentGenerator Timeout Handling', () => {
await expect(
generator.generateContentStream(request, 'test-prompt-id'),
).rejects.toThrow(
/Streaming request timeout after \d+s\. Try reducing input length or increasing timeout in config\./,
/Request timeout after \d+s\. Try reducing input length or increasing timeout in config\./,
);
});
it('should include streaming-specific troubleshooting tips', async () => {
it('should include the shared troubleshooting tips for streaming timeouts', async () => {
const timeoutError = new Error('request timed out');
mockOpenAIClient.chat.completions.create.mockRejectedValue(timeoutError);
@ -251,9 +250,10 @@ describe('OpenAIContentGenerator Timeout Handling', () => {
} catch (error: unknown) {
const errorMessage =
error instanceof Error ? error.message : String(error);
expect(errorMessage).toContain('Streaming timeout troubleshooting:');
expect(errorMessage).toContain('Troubleshooting tips:');
expect(errorMessage).toContain('Reduce input length or complexity');
expect(errorMessage).toContain('Increase timeout in config');
expect(errorMessage).toContain('Check network connectivity');
expect(errorMessage).toContain('Consider using non-streaming mode');
}
});
});

View file

@ -41,15 +41,15 @@ vi.mock('../../utils/openaiLogger.js', () => ({
}));
const realConvertGeminiRequestToOpenAI =
OpenAIContentConverter.prototype.convertGeminiRequestToOpenAI;
OpenAIContentConverter.convertGeminiRequestToOpenAI;
const convertGeminiRequestToOpenAISpy = vi
.spyOn(OpenAIContentConverter.prototype, 'convertGeminiRequestToOpenAI')
.spyOn(OpenAIContentConverter, 'convertGeminiRequestToOpenAI')
.mockReturnValue([{ role: 'user', content: 'converted' }]);
const convertGeminiToolsToOpenAISpy = vi
.spyOn(OpenAIContentConverter.prototype, 'convertGeminiToolsToOpenAI')
.spyOn(OpenAIContentConverter, 'convertGeminiToolsToOpenAI')
.mockResolvedValue([{ type: 'function', function: { name: 'tool' } }]);
const convertGeminiResponseToOpenAISpy = vi
.spyOn(OpenAIContentConverter.prototype, 'convertGeminiResponseToOpenAI')
.spyOn(OpenAIContentConverter, 'convertGeminiResponseToOpenAI')
.mockReturnValue({
id: 'openai-response',
object: 'chat.completion',
@ -57,10 +57,6 @@ const convertGeminiResponseToOpenAISpy = vi
model: 'test-model',
choices: [],
} as OpenAI.Chat.ChatCompletion);
const setModalitiesSpy = vi.spyOn(
OpenAIContentConverter.prototype,
'setModalities',
);
const createConfig = (overrides: Record<string, unknown> = {}): Config => {
const configContent = {
@ -121,7 +117,6 @@ describe('LoggingContentGenerator', () => {
convertGeminiRequestToOpenAISpy.mockClear();
convertGeminiToolsToOpenAISpy.mockClear();
convertGeminiResponseToOpenAISpy.mockClear();
setModalitiesSpy.mockClear();
});
it('logs request/response, normalizes thought parts, and logs OpenAI interaction', async () => {
@ -409,13 +404,13 @@ describe('LoggingContentGenerator', () => {
});
it('uses generator modalities when converting logged OpenAI requests', async () => {
convertGeminiRequestToOpenAISpy.mockImplementationOnce(function (
this: OpenAIContentConverter,
convertGeminiRequestToOpenAISpy.mockImplementationOnce(
(request, requestContext, options) => realConvertGeminiRequestToOpenAI(
request,
requestContext,
options,
) {
return realConvertGeminiRequestToOpenAI.call(this, request, options);
});
),
);
const wrapped = createWrappedGenerator(
vi
@ -458,7 +453,14 @@ describe('LoggingContentGenerator', () => {
await generator.generateContent(request, 'prompt-5');
expect(setModalitiesSpy).toHaveBeenCalledWith({ image: true });
expect(convertGeminiRequestToOpenAISpy).toHaveBeenCalledWith(
request,
expect.objectContaining({
model: 'test-model',
modalities: { image: true },
}),
{ cleanOrphanToolCalls: false },
);
const openaiLoggerInstance = vi.mocked(OpenAILogger).mock.results[0]
?.value as { logInteraction: ReturnType<typeof vi.fn> };

View file

@ -39,6 +39,7 @@ import type {
InputModalities,
} from '../contentGenerator.js';
import { OpenAIContentConverter } from '../openaiContentGenerator/converter.js';
import type { RequestContext } from '../openaiContentGenerator/types.js';
import { OpenAILogger } from '../../utils/openaiLogger.js';
import {
getErrorMessage,
@ -295,14 +296,14 @@ export class LoggingContentGenerator implements ContentGenerator {
return undefined;
}
const converter = new OpenAIContentConverter(
request.model,
this.schemaCompliance,
);
converter.setModalities(this.modalities ?? {});
const messages = converter.convertGeminiRequestToOpenAI(request, {
const requestContext = this.createLoggingRequestContext(request.model);
const messages = OpenAIContentConverter.convertGeminiRequestToOpenAI(
request,
requestContext,
{
cleanOrphanToolCalls: false,
});
},
);
const openaiRequest: OpenAI.Chat.ChatCompletionCreateParams = {
model: request.model,
@ -310,8 +311,10 @@ export class LoggingContentGenerator implements ContentGenerator {
};
if (request.config?.tools) {
openaiRequest.tools = await converter.convertGeminiToolsToOpenAI(
openaiRequest.tools =
await OpenAIContentConverter.convertGeminiToolsToOpenAI(
request.config.tools,
this.schemaCompliance ?? 'auto',
);
}
@ -334,6 +337,14 @@ export class LoggingContentGenerator implements ContentGenerator {
return openaiRequest;
}
private createLoggingRequestContext(model: string): RequestContext {
return {
model,
modalities: this.modalities ?? {},
startTime: 0,
};
}
private async logOpenAIInteraction(
openaiRequest: OpenAI.Chat.ChatCompletionCreateParams | undefined,
response?: GenerateContentResponse,
@ -362,12 +373,10 @@ export class LoggingContentGenerator implements ContentGenerator {
response: GenerateContentResponse,
openaiRequest: OpenAI.Chat.ChatCompletionCreateParams,
): OpenAI.Chat.ChatCompletion {
const converter = new OpenAIContentConverter(
openaiRequest.model,
this.schemaCompliance,
return OpenAIContentConverter.convertGeminiResponseToOpenAI(
response,
this.createLoggingRequestContext(openaiRequest.model),
);
return converter.convertGeminiResponseToOpenAI(response);
}
private consolidateGeminiResponsesForLogging(

View file

@ -7,6 +7,7 @@
import { describe, it, expect, beforeEach } from 'vitest';
import { OpenAIContentConverter } from './converter.js';
import { StreamingToolCallParser } from './streamingToolCallParser.js';
import type { RequestContext } from './types.js';
import {
Type,
FinishReason,
@ -20,25 +21,40 @@ import type OpenAI from 'openai';
import { convertToFunctionResponse } from '../coreToolScheduler.js';
describe('OpenAIContentConverter', () => {
let converter: OpenAIContentConverter;
let converter: typeof OpenAIContentConverter;
let requestContext: RequestContext;
beforeEach(() => {
converter = new OpenAIContentConverter('test-model', 'auto', {
converter = OpenAIContentConverter;
requestContext = {
model: 'test-model',
modalities: {
image: true,
pdf: true,
audio: true,
video: true,
});
},
startTime: 0,
};
});
describe('createStreamContext', () => {
it('returns a fresh context with its own StreamingToolCallParser', () => {
const ctx1 = converter.createStreamContext();
const ctx2 = converter.createStreamContext();
function withStreamParser(
toolCallParser: StreamingToolCallParser = new StreamingToolCallParser(),
): RequestContext {
return {
...requestContext,
toolCallParser,
};
}
expect(ctx1.toolCallParser).toBeInstanceOf(StreamingToolCallParser);
expect(ctx2.toolCallParser).toBeInstanceOf(StreamingToolCallParser);
expect(ctx1.toolCallParser).not.toBe(ctx2.toolCallParser);
describe('stream-local parser state', () => {
it('creates fresh parser instances', () => {
const ctx1 = new StreamingToolCallParser();
const ctx2 = new StreamingToolCallParser();
expect(ctx1).toBeInstanceOf(StreamingToolCallParser);
expect(ctx2).toBeInstanceOf(StreamingToolCallParser);
expect(ctx1).not.toBe(ctx2);
});
it('isolates two contexts so writes to one do not leak into the other', () => {
@ -46,16 +62,16 @@ describe('OpenAIContentConverter', () => {
// Converter as an instance field, so two concurrent streams sharing
// the same Config.contentGenerator would overwrite each other's
// tool-call buffers. Per-stream contexts eliminate that contention.
const ctx1 = converter.createStreamContext();
const ctx2 = converter.createStreamContext();
const ctx1 = new StreamingToolCallParser();
const ctx2 = new StreamingToolCallParser();
ctx1.toolCallParser.addChunk(0, '{"a":1}', 'call_A', 'fn_A');
ctx2.toolCallParser.addChunk(0, '{"b":2}', 'call_B', 'fn_B');
ctx1.addChunk(0, '{"a":1}', 'call_A', 'fn_A');
ctx2.addChunk(0, '{"b":2}', 'call_B', 'fn_B');
expect(ctx1.toolCallParser.getBuffer(0)).toBe('{"a":1}');
expect(ctx2.toolCallParser.getBuffer(0)).toBe('{"b":2}');
expect(ctx1.toolCallParser.getToolCallMeta(0).id).toBe('call_A');
expect(ctx2.toolCallParser.getToolCallMeta(0).id).toBe('call_B');
expect(ctx1.getBuffer(0)).toBe('{"a":1}');
expect(ctx2.getBuffer(0)).toBe('{"b":2}');
expect(ctx1.getToolCallMeta(0).id).toBe('call_A');
expect(ctx2.getToolCallMeta(0).id).toBe('call_B');
});
it('demuxes interleaved chunks from two concurrent streams correctly (#3516)', () => {
@ -64,8 +80,8 @@ describe('OpenAIContentConverter', () => {
// interleaved at the event loop. Under the pre-fix architecture
// this corrupted both tool calls; under per-stream contexts each
// stream's chunks stay in their own parser and close cleanly.
const streamA = converter.createStreamContext();
const streamB = converter.createStreamContext();
const streamA = withStreamParser(new StreamingToolCallParser());
const streamB = withStreamParser(new StreamingToolCallParser());
const openerA = {
object: 'chat.completion.chunk',
@ -239,7 +255,10 @@ describe('OpenAIContentConverter', () => {
output: 'Raw output text',
});
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessage = messages.find((message) => message.role === 'tool');
expect(toolMessage).toBeDefined();
@ -257,7 +276,10 @@ describe('OpenAIContentConverter', () => {
error: 'Command failed',
});
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessage = messages.find((message) => message.role === 'tool');
expect(toolMessage).toBeDefined();
@ -275,7 +297,10 @@ describe('OpenAIContentConverter', () => {
data: { value: 42 },
});
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessage = messages.find((message) => message.role === 'tool');
expect(toolMessage).toBeDefined();
@ -327,7 +352,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
// Should have tool message with both text and image content
const toolMessage = messages.find((message) => message.role === 'tool');
@ -393,7 +421,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
// Should have tool message with both text and image content
const toolMessage = messages.find((message) => message.role === 'tool');
@ -455,7 +486,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
// Should have tool message with both text and file content
const toolMessage = messages.find((message) => message.role === 'tool');
@ -519,7 +553,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
// Should have tool message with both text and audio content
const toolMessage = messages.find((message) => message.role === 'tool');
@ -585,7 +622,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessage = messages.find((message) => message.role === 'tool');
expect(toolMessage).toBeDefined();
@ -645,7 +685,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessage = messages.find((message) => message.role === 'tool');
expect(toolMessage).toBeDefined();
@ -705,7 +748,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
// Should have tool message with both text and video content
const toolMessage = messages.find((message) => message.role === 'tool');
@ -769,7 +815,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessage = messages.find((message) => message.role === 'tool');
expect(toolMessage).toBeDefined();
@ -828,7 +877,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessage = messages.find((message) => message.role === 'tool');
expect(toolMessage).toBeDefined();
@ -886,7 +938,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessage = messages.find((message) => message.role === 'tool');
expect(toolMessage).toBeDefined();
@ -909,7 +964,10 @@ describe('OpenAIContentConverter', () => {
output: 'Plain text output',
});
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessage = messages.find((message) => message.role === 'tool');
expect(toolMessage).toBeDefined();
@ -961,7 +1019,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
// Should create an assistant message with tool call and a tool message with empty content
// This is required because OpenAI API expects every tool call to have a corresponding response
@ -1005,7 +1066,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const assistantMsg = messages.find((m) => m.role === 'assistant');
expect(assistantMsg).toBeDefined();
@ -1050,7 +1114,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const assistantMsg = messages.find((m) => m.role === 'assistant');
expect(assistantMsg).toBeDefined();
@ -1073,7 +1140,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const assistantMsg = messages.find((m) => m.role === 'assistant');
expect(assistantMsg).toBeDefined();
@ -1136,7 +1206,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessage = messages.find((m) => m.role === 'tool');
expect(toolMessage).toBeDefined();
@ -1204,7 +1277,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessage = messages.find((m) => m.role === 'tool');
expect(toolMessage).toBeDefined();
@ -1234,13 +1310,16 @@ describe('OpenAIContentConverter', () => {
describe('convertOpenAIResponseToGemini', () => {
it('should handle empty choices array without crashing', () => {
const response = converter.convertOpenAIResponseToGemini({
const response = converter.convertOpenAIResponseToGemini(
{
object: 'chat.completion',
id: 'chatcmpl-empty',
created: 123,
model: 'test-model',
choices: [],
} as unknown as OpenAI.Chat.ChatCompletion);
} as unknown as OpenAI.Chat.ChatCompletion,
requestContext,
);
expect(response.candidates).toEqual([]);
});
@ -1248,7 +1327,8 @@ describe('OpenAIContentConverter', () => {
describe('OpenAI -> Gemini reasoning content', () => {
it('should convert reasoning_content to a thought part for non-streaming responses', () => {
const response = converter.convertOpenAIResponseToGemini({
const response = converter.convertOpenAIResponseToGemini(
{
object: 'chat.completion',
id: 'chatcmpl-1',
created: 123,
@ -1265,7 +1345,9 @@ describe('OpenAIContentConverter', () => {
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletion);
} as unknown as OpenAI.Chat.ChatCompletion,
requestContext,
);
const parts = response.candidates?.[0]?.content?.parts;
expect(parts?.[0]).toEqual(
@ -1277,7 +1359,8 @@ describe('OpenAIContentConverter', () => {
});
it('should convert reasoning to a thought part for non-streaming responses', () => {
const response = converter.convertOpenAIResponseToGemini({
const response = converter.convertOpenAIResponseToGemini(
{
object: 'chat.completion',
id: 'chatcmpl-2',
created: 123,
@ -1294,7 +1377,9 @@ describe('OpenAIContentConverter', () => {
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletion);
} as unknown as OpenAI.Chat.ChatCompletion,
requestContext,
);
const parts = response.candidates?.[0]?.content?.parts;
expect(parts?.[0]).toEqual(
@ -1324,7 +1409,7 @@ describe('OpenAIContentConverter', () => {
],
model: 'gpt-test',
} as unknown as OpenAI.Chat.ChatCompletionChunk,
converter.createStreamContext(),
withStreamParser(new StreamingToolCallParser()),
);
const parts = chunk.candidates?.[0]?.content?.parts;
@ -1355,7 +1440,7 @@ describe('OpenAIContentConverter', () => {
],
model: 'gpt-test',
} as unknown as OpenAI.Chat.ChatCompletionChunk,
converter.createStreamContext(),
withStreamParser(new StreamingToolCallParser()),
);
const parts = chunk.candidates?.[0]?.content?.parts;
@ -1384,7 +1469,7 @@ describe('OpenAIContentConverter', () => {
],
model: 'gpt-test',
} as unknown as OpenAI.Chat.ChatCompletionChunk,
converter.createStreamContext(),
withStreamParser(new StreamingToolCallParser()),
);
const parts = chunk.candidates?.[0]?.content?.parts;
@ -1742,7 +1827,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
expect(messages).toHaveLength(1);
expect(messages[0].role).toBe('assistant');
@ -1768,7 +1856,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
expect(messages).toHaveLength(1);
expect(messages[0].role).toBe('assistant');
@ -1830,9 +1921,13 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request, {
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
{
cleanOrphanToolCalls: false,
});
},
);
// Should have: assistant (tool_call_1), tool (result_1), assistant (tool_call_2), tool (result_2)
expect(messages).toHaveLength(4);
@ -1861,7 +1956,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
expect(messages).toHaveLength(3);
expect(messages[0].role).toBe('assistant');
@ -1884,7 +1982,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
expect(messages).toHaveLength(1);
expect(messages[0].content).toBe('Text partAnother text');
@ -1909,7 +2010,10 @@ describe('OpenAIContentConverter', () => {
],
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
// Empty messages should be filtered out
expect(messages).toHaveLength(1);
@ -1928,15 +2032,21 @@ describe('MCP tool result end-to-end through OpenAI converter (issue #1520)', ()
* Verifies that multi-part MCP tool results are properly carried through
* into the OpenAI tool message, with no content leaking into user messages.
*/
let converter: OpenAIContentConverter;
let converter: typeof OpenAIContentConverter;
let requestContext: RequestContext;
beforeEach(() => {
converter = new OpenAIContentConverter('test-model', 'auto', {
converter = OpenAIContentConverter;
requestContext = {
model: 'test-model',
modalities: {
image: true,
pdf: true,
audio: true,
video: true,
});
},
startTime: 0,
};
});
it('should preserve MCP multi-text content in tool message (not leak to user message)', () => {
@ -1983,7 +2093,10 @@ describe('MCP tool result end-to-end through OpenAI converter (issue #1520)', ()
model: 'models/test',
contents,
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessages = messages.filter((m) => m.role === 'tool');
const userMessages = messages.filter((m) => m.role === 'user');
@ -2053,7 +2166,10 @@ describe('MCP tool result end-to-end through OpenAI converter (issue #1520)', ()
model: 'models/test',
contents,
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessages = messages.filter((m) => m.role === 'tool');
const userMessages = messages.filter((m) => m.role === 'user');
@ -2115,7 +2231,10 @@ describe('MCP tool result end-to-end through OpenAI converter (issue #1520)', ()
model: 'models/test',
contents,
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessages = messages.filter((m) => m.role === 'tool');
const userMessages = messages.filter((m) => m.role === 'user');
@ -2180,7 +2299,10 @@ describe('MCP tool result end-to-end through OpenAI converter (issue #1520)', ()
model: 'models/test',
contents,
};
const messages = converter.convertGeminiRequestToOpenAI(request);
const messages = converter.convertGeminiRequestToOpenAI(
request,
requestContext,
);
const toolMessages = messages.filter((m) => m.role === 'tool');
const userMessages = messages.filter((m) => m.role === 'user');
@ -2211,18 +2333,27 @@ describe('MCP tool result end-to-end through OpenAI converter (issue #1520)', ()
});
describe('Truncated tool call detection in streaming', () => {
let converter: OpenAIContentConverter;
let converter: typeof OpenAIContentConverter;
beforeEach(() => {
converter = new OpenAIContentConverter('test-model');
converter = OpenAIContentConverter;
});
function createStreamingRequestContext(model = 'test-model'): RequestContext {
return {
model,
modalities: {},
startTime: 0,
toolCallParser: new StreamingToolCallParser(),
};
}
/**
* Helper: feed streaming chunks then a final chunk with finish_reason,
* and return the Gemini response for the final chunk.
*/
function feedToolCallChunks(
conv: OpenAIContentConverter,
conv: typeof OpenAIContentConverter,
toolCallChunks: Array<{
index: number;
id?: string;
@ -2232,7 +2363,7 @@ describe('Truncated tool call detection in streaming', () => {
finishReason: string,
) {
// One stream-local context covers every chunk of this simulated stream.
const ctx = conv.createStreamContext();
const ctx = createStreamingRequestContext();
// Feed argument chunks (no finish_reason yet)
for (const tc of toolCallChunks) {
@ -2384,8 +2515,8 @@ describe('Truncated tool call detection in streaming', () => {
it('should detect truncation with multi-chunk streaming arguments', () => {
// Feed arguments in multiple small chunks like real streaming
const conv = new OpenAIContentConverter('test-model');
const ctx = conv.createStreamContext();
const conv = OpenAIContentConverter;
const ctx = createStreamingRequestContext();
// Chunk 1: start of JSON with tool metadata
conv.convertOpenAIChunkToGemini(
@ -2472,6 +2603,17 @@ describe('modality filtering', () => {
};
}
function makeRequestContext(
model: string,
modalities: RequestContext['modalities'],
): RequestContext {
return {
model,
modalities,
startTime: 0,
};
}
function getUserContentParts(
messages: OpenAI.Chat.ChatCompletionMessageParam[],
): Array<{ type: string; text?: string }> {
@ -2487,14 +2629,17 @@ describe('modality filtering', () => {
}
it('replaces image with placeholder when image modality is disabled', () => {
const conv = new OpenAIContentConverter('deepseek-chat', 'auto', {});
const conv = OpenAIContentConverter;
const request = makeRequest([
{
inlineData: { mimeType: 'image/png', data: 'abc123' },
displayName: 'screenshot.png',
} as unknown as Part,
]);
const messages = conv.convertGeminiRequestToOpenAI(request);
const messages = conv.convertGeminiRequestToOpenAI(
request,
makeRequestContext('deepseek-chat', {}),
);
const parts = getUserContentParts(messages);
expect(parts).toHaveLength(1);
expect(parts[0].type).toBe('text');
@ -2503,22 +2648,23 @@ describe('modality filtering', () => {
});
it('keeps image when image modality is enabled', () => {
const conv = new OpenAIContentConverter('gpt-4o', 'auto', { image: true });
const conv = OpenAIContentConverter;
const request = makeRequest([
{
inlineData: { mimeType: 'image/png', data: 'abc123' },
} as unknown as Part,
]);
const messages = conv.convertGeminiRequestToOpenAI(request);
const messages = conv.convertGeminiRequestToOpenAI(
request,
makeRequestContext('gpt-4o', { image: true }),
);
const parts = getUserContentParts(messages);
expect(parts).toHaveLength(1);
expect(parts[0].type).toBe('image_url');
});
it('replaces PDF with placeholder when pdf modality is disabled', () => {
const conv = new OpenAIContentConverter('test-model', 'auto', {
image: true,
});
const conv = OpenAIContentConverter;
const request = makeRequest([
{
inlineData: {
@ -2528,7 +2674,10 @@ describe('modality filtering', () => {
},
} as unknown as Part,
]);
const messages = conv.convertGeminiRequestToOpenAI(request);
const messages = conv.convertGeminiRequestToOpenAI(
request,
makeRequestContext('test-model', { image: true }),
);
const parts = getUserContentParts(messages);
expect(parts).toHaveLength(1);
expect(parts[0].type).toBe('text');
@ -2537,10 +2686,7 @@ describe('modality filtering', () => {
});
it('keeps PDF when pdf modality is enabled', () => {
const conv = new OpenAIContentConverter('claude-sonnet', 'auto', {
image: true,
pdf: true,
});
const conv = OpenAIContentConverter;
const request = makeRequest([
{
inlineData: {
@ -2550,20 +2696,26 @@ describe('modality filtering', () => {
},
} as unknown as Part,
]);
const messages = conv.convertGeminiRequestToOpenAI(request);
const messages = conv.convertGeminiRequestToOpenAI(
request,
makeRequestContext('claude-sonnet', { image: true, pdf: true }),
);
const parts = getUserContentParts(messages);
expect(parts).toHaveLength(1);
expect(parts[0].type).toBe('file');
});
it('replaces video with placeholder when video modality is disabled', () => {
const conv = new OpenAIContentConverter('test-model', 'auto', {});
const conv = OpenAIContentConverter;
const request = makeRequest([
{
inlineData: { mimeType: 'video/mp4', data: 'vid-data' },
} as unknown as Part,
]);
const messages = conv.convertGeminiRequestToOpenAI(request);
const messages = conv.convertGeminiRequestToOpenAI(
request,
makeRequestContext('test-model', {}),
);
const parts = getUserContentParts(messages);
expect(parts).toHaveLength(1);
expect(parts[0].type).toBe('text');
@ -2571,13 +2723,16 @@ describe('modality filtering', () => {
});
it('replaces audio with placeholder when audio modality is disabled', () => {
const conv = new OpenAIContentConverter('test-model', 'auto', {});
const conv = OpenAIContentConverter;
const request = makeRequest([
{
inlineData: { mimeType: 'audio/wav', data: 'audio-data' },
} as unknown as Part,
]);
const messages = conv.convertGeminiRequestToOpenAI(request);
const messages = conv.convertGeminiRequestToOpenAI(
request,
makeRequestContext('test-model', {}),
);
const parts = getUserContentParts(messages);
expect(parts).toHaveLength(1);
expect(parts[0].type).toBe('text');
@ -2585,7 +2740,7 @@ describe('modality filtering', () => {
});
it('handles mixed content: keeps text + supported media, replaces unsupported', () => {
const conv = new OpenAIContentConverter('gpt-4o', 'auto', { image: true });
const conv = OpenAIContentConverter;
const request = makeRequest([
{ text: 'Analyze these files' },
{
@ -2595,7 +2750,10 @@ describe('modality filtering', () => {
inlineData: { mimeType: 'video/mp4', data: 'vid-data' },
} as unknown as Part,
]);
const messages = conv.convertGeminiRequestToOpenAI(request);
const messages = conv.convertGeminiRequestToOpenAI(
request,
makeRequestContext('gpt-4o', { image: true }),
);
const parts = getUserContentParts(messages);
expect(parts).toHaveLength(3);
expect(parts[0].type).toBe('text');
@ -2606,13 +2764,16 @@ describe('modality filtering', () => {
});
it('defaults to text-only when no modalities are specified', () => {
const conv = new OpenAIContentConverter('unknown-model');
const conv = OpenAIContentConverter;
const request = makeRequest([
{
inlineData: { mimeType: 'image/png', data: 'img-data' },
} as unknown as Part,
]);
const messages = conv.convertGeminiRequestToOpenAI(request);
const messages = conv.convertGeminiRequestToOpenAI(
request,
makeRequestContext('unknown-model', {}),
);
const parts = getUserContentParts(messages);
expect(parts).toHaveLength(1);
expect(parts[0].type).toBe('text');

View file

@ -21,8 +21,7 @@ import { GenerateContentResponse, FinishReason } from '@google/genai';
import type OpenAI from 'openai';
import { safeJsonParse } from '../../utils/safeJsonParse.js';
import { createDebugLogger } from '../../utils/debugLogger.js';
import type { InputModalities } from '../contentGenerator.js';
import { StreamingToolCallParser } from './streamingToolCallParser.js';
import type { RequestContext } from './types.js';
import {
convertSchema,
type SchemaComplianceMode,
@ -91,71 +90,9 @@ type OpenAIContentPart =
| OpenAIContentPartFile;
/**
* Per-stream state for tool-call parsing. Created by
* `OpenAIContentConverter.createStreamContext()` at the start of each
* streaming response and passed into every `convertOpenAIChunkToGemini`
* call on that stream, so concurrent streams (parallel subagents, fork
* children, ) never share parser state.
* Convert Gemini tool parameters to OpenAI JSON Schema format.
*/
export interface ConverterStreamContext {
toolCallParser: StreamingToolCallParser;
}
/**
* Converter class for transforming data between Gemini and OpenAI formats
*/
export class OpenAIContentConverter {
private model: string;
private schemaCompliance: SchemaComplianceMode;
private modalities: InputModalities;
constructor(
model: string,
schemaCompliance: SchemaComplianceMode = 'auto',
modalities: InputModalities = {},
) {
this.model = model;
this.schemaCompliance = schemaCompliance;
this.modalities = modalities;
}
/**
* Update the model used for response metadata (modelVersion/logging) and any
* model-specific conversion behavior.
*/
setModel(model: string): void {
this.model = model;
}
/**
* Update the supported input modalities.
*/
setModalities(modalities: InputModalities): void {
this.modalities = modalities;
}
/**
* Create fresh per-stream state for processing one OpenAI streaming
* response. The returned context is passed into every
* `convertOpenAIChunkToGemini` call for that stream, then discarded.
*
* Previously the tool-call parser lived on the Converter instance and
* was shared by every caller of the singleton `Config.contentGenerator`.
* Concurrent streams (e.g. two subagents running in parallel after
* PR #3463) raced on that shared state: each stream's stream-start
* `reset()` wiped the other's partial tool-call buffers, chunks from
* different streams landed at the same `index=0` bucket, and
* `getCompletedToolCalls()` returned interleaved corrupt JSON that
* surfaced upstream as `NO_RESPONSE_TEXT` (issue #3516).
*/
createStreamContext(): ConverterStreamContext {
return { toolCallParser: new StreamingToolCallParser() };
}
/**
* Convert Gemini tool parameters to OpenAI JSON Schema format
*/
convertGeminiToolParametersToOpenAI(
export function convertGeminiToolParametersToOpenAI(
parameters: Record<string, unknown>,
): Record<string, unknown> | undefined {
if (!parameters || typeof parameters !== 'object') {
@ -222,10 +159,12 @@ export class OpenAIContentConverter {
/**
* Convert Gemini tools to OpenAI format for API compatibility.
* Handles both Gemini tools (using 'parameters' field) and MCP tools (using 'parametersJsonSchema' field).
* Handles both Gemini tools (using 'parameters' field) and MCP tools
* (using 'parametersJsonSchema' field).
*/
async convertGeminiToolsToOpenAI(
export async function convertGeminiToolsToOpenAI(
geminiTools: ToolListUnion,
schemaCompliance: SchemaComplianceMode = 'auto',
): Promise<OpenAI.Chat.ChatCompletionTool[]> {
const openAITools: OpenAI.Chat.ChatCompletionTool[] = [];
@ -256,13 +195,13 @@ export class OpenAIContentConverter {
parameters = paramsCopy;
} else if (func.parameters) {
// Gemini tool format - convert parameters to OpenAI format
parameters = this.convertGeminiToolParametersToOpenAI(
parameters = convertGeminiToolParametersToOpenAI(
func.parameters as Record<string, unknown>,
);
}
if (parameters) {
parameters = convertSchema(parameters, this.schemaCompliance);
parameters = convertSchema(parameters, schemaCompliance);
}
openAITools.push({
@ -282,25 +221,26 @@ export class OpenAIContentConverter {
}
/**
* Convert Gemini request to OpenAI message format
* Convert Gemini request to OpenAI message format.
*/
convertGeminiRequestToOpenAI(
export function convertGeminiRequestToOpenAI(
request: GenerateContentParameters,
requestContext: RequestContext,
options: { cleanOrphanToolCalls: boolean } = { cleanOrphanToolCalls: true },
): OpenAI.Chat.ChatCompletionMessageParam[] {
let messages: OpenAI.Chat.ChatCompletionMessageParam[] = [];
// Handle system instruction from config
this.addSystemInstructionMessage(request, messages);
addSystemInstructionMessage(request, messages);
// Handle contents
this.processContents(request.contents, messages);
processContents(request.contents, messages, requestContext);
// Clean up orphaned tool calls and merge consecutive assistant messages
if (options.cleanOrphanToolCalls) {
messages = this.cleanOrphanedToolCalls(messages);
messages = cleanOrphanedToolCalls(messages);
}
messages = this.mergeConsecutiveAssistantMessages(messages);
messages = mergeConsecutiveAssistantMessages(messages);
return messages;
}
@ -308,8 +248,9 @@ export class OpenAIContentConverter {
/**
* Convert Gemini response to OpenAI completion format (for logging).
*/
convertGeminiResponseToOpenAI(
export function convertGeminiResponseToOpenAI(
response: GenerateContentResponse,
requestContext: RequestContext,
): OpenAI.Chat.ChatCompletion {
const candidate = response.candidates?.[0];
const parts = (candidate?.content?.parts || []) as Part[];
@ -357,9 +298,7 @@ export class OpenAIContentConverter {
message.tool_calls = toolCalls;
}
const finishReason = this.mapGeminiFinishReasonToOpenAI(
candidate?.finishReason,
);
const finishReason = mapGeminiFinishReasonToOpenAI(candidate?.finishReason);
const usageMetadata = response.usageMetadata;
const usage: OpenAI.CompletionUsage = {
@ -389,7 +328,7 @@ export class OpenAIContentConverter {
id: response.responseId || `gemini-${Date.now()}`,
object: 'chat.completion',
created: createdSeconds,
model: response.modelVersion || this.model,
model: response.modelVersion || requestContext.model,
choices: [
{
index: 0,
@ -403,15 +342,15 @@ export class OpenAIContentConverter {
}
/**
* Extract and add system instruction message from request config
* Extract and add system instruction message from request config.
*/
private addSystemInstructionMessage(
function addSystemInstructionMessage(
request: GenerateContentParameters,
messages: OpenAI.Chat.ChatCompletionMessageParam[],
): void {
if (!request.config?.systemInstruction) return;
const systemText = this.extractTextFromContentUnion(
const systemText = extractTextFromContentUnion(
request.config.systemInstruction,
);
@ -424,34 +363,36 @@ export class OpenAIContentConverter {
}
/**
* Process contents and convert to OpenAI messages
* Process contents and convert to OpenAI messages.
*/
private processContents(
function processContents(
contents: ContentListUnion,
messages: ExtendedChatCompletionMessageParam[],
requestContext: RequestContext,
): void {
if (Array.isArray(contents)) {
for (const content of contents) {
this.processContent(content, messages);
processContent(content, messages, requestContext);
}
} else if (contents) {
this.processContent(contents, messages);
processContent(contents, messages, requestContext);
}
}
/**
* Process a single content item and convert to OpenAI message(s)
* Process a single content item and convert to OpenAI message(s).
*/
private processContent(
function processContent(
content: ContentUnion | PartUnion,
messages: ExtendedChatCompletionMessageParam[],
requestContext: RequestContext,
): void {
if (typeof content === 'string') {
messages.push({ role: 'user' as const, content });
return;
}
if (!this.isContentObject(content)) return;
if (!isContentObject(content)) return;
const parts = content.parts || [];
const role = content.role === 'model' ? 'assistant' : 'user';
@ -476,7 +417,7 @@ export class OpenAIContentConverter {
contentParts.push({ type: 'text' as const, text: part.text });
}
const mediaPart = this.createMediaContentPart(part);
const mediaPart = createMediaContentPart(part, requestContext);
if (mediaPart && role === 'user') {
contentParts.push(mediaPart);
}
@ -495,7 +436,10 @@ export class OpenAIContentConverter {
if (part.functionResponse && role === 'user') {
// Create tool message for the function response (with embedded media)
const toolMessage = this.createToolMessage(part.functionResponse);
const toolMessage = createToolMessage(
part.functionResponse,
requestContext,
);
if (toolMessage) {
messages.push(toolMessage);
}
@ -524,8 +468,7 @@ export class OpenAIContentConverter {
// Some OpenAI-compatible providers (e.g. Ollama) reject content: null
// when reasoning_content is present, returning HTTP 400.
// For tool-call-only messages we keep null to stay spec-compliant.
content:
assistantTextContent || (reasoningParts.length > 0 ? '' : null),
content: assistantTextContent || (reasoningParts.length > 0 ? '' : null),
};
if (toolCalls.length > 0) {
@ -550,7 +493,7 @@ export class OpenAIContentConverter {
}
}
private extractFunctionResponseContent(response: unknown): string {
function extractFunctionResponseContent(response: unknown): string {
if (response === null || response === undefined) {
return '';
}
@ -581,12 +524,13 @@ export class OpenAIContentConverter {
}
/**
* Create a tool message from function response (with embedded media parts)
* Create a tool message from function response (with embedded media parts).
*/
private createToolMessage(
function createToolMessage(
response: FunctionResponse,
requestContext: RequestContext,
): OpenAI.Chat.ChatCompletionToolMessageParam | null {
const textContent = this.extractFunctionResponseContent(response.response);
const textContent = extractFunctionResponseContent(response.response);
const contentParts: OpenAIContentPart[] = [];
// Add text content first if present
@ -596,7 +540,7 @@ export class OpenAIContentConverter {
// Add media parts from function response
for (const part of response.parts || []) {
const mediaPart = this.createMediaContentPart(part);
const mediaPart = createMediaContentPart(part, requestContext);
if (mediaPart) {
contentParts.push(mediaPart);
}
@ -628,15 +572,24 @@ export class OpenAIContentConverter {
* Create OpenAI media content part from Gemini part.
* Checks modality support before building each media type.
*/
private createMediaContentPart(part: Part): OpenAIContentPart | null {
function createMediaContentPart(
part: Part,
requestContext: RequestContext,
): OpenAIContentPart | null {
const { modalities } = requestContext;
if (part.inlineData?.mimeType && part.inlineData?.data) {
const mimeType = part.inlineData.mimeType;
const mediaType = this.getMediaType(mimeType);
const mediaType = getMediaType(mimeType);
const displayName = part.inlineData.displayName || mimeType;
if (mediaType === 'image') {
if (!this.modalities.image) {
return this.unsupportedModalityPlaceholder('image', displayName);
if (!modalities.image) {
return unsupportedModalityPlaceholder(
'image',
displayName,
requestContext,
);
}
const dataUrl = `data:${mimeType};base64,${part.inlineData.data}`;
return {
@ -646,8 +599,12 @@ export class OpenAIContentConverter {
}
if (mimeType === 'application/pdf') {
if (!this.modalities.pdf) {
return this.unsupportedModalityPlaceholder('pdf', displayName);
if (!modalities.pdf) {
return unsupportedModalityPlaceholder(
'pdf',
displayName,
requestContext,
);
}
const filename = part.inlineData.displayName || 'document.pdf';
return {
@ -660,10 +617,14 @@ export class OpenAIContentConverter {
}
if (mediaType === 'audio') {
if (!this.modalities.audio) {
return this.unsupportedModalityPlaceholder('audio', displayName);
if (!modalities.audio) {
return unsupportedModalityPlaceholder(
'audio',
displayName,
requestContext,
);
}
const format = this.getAudioFormat(mimeType);
const format = getAudioFormat(mimeType);
if (format) {
return {
type: 'input_audio' as const,
@ -676,8 +637,12 @@ export class OpenAIContentConverter {
}
if (mediaType === 'video') {
if (!this.modalities.video) {
return this.unsupportedModalityPlaceholder('video', displayName);
if (!modalities.video) {
return unsupportedModalityPlaceholder(
'video',
displayName,
requestContext,
);
}
return {
type: 'video_url' as const,
@ -697,11 +662,15 @@ export class OpenAIContentConverter {
const filename = part.fileData.displayName || 'file';
const fileUri = part.fileData.fileUri;
const mimeType = part.fileData.mimeType;
const mediaType = this.getMediaType(mimeType);
const mediaType = getMediaType(mimeType);
if (mediaType === 'image') {
if (!this.modalities.image) {
return this.unsupportedModalityPlaceholder('image', filename);
if (!modalities.image) {
return unsupportedModalityPlaceholder(
'image',
filename,
requestContext,
);
}
return {
type: 'image_url' as const,
@ -710,8 +679,8 @@ export class OpenAIContentConverter {
}
if (mimeType === 'application/pdf') {
if (!this.modalities.pdf) {
return this.unsupportedModalityPlaceholder('pdf', filename);
if (!modalities.pdf) {
return unsupportedModalityPlaceholder('pdf', filename, requestContext);
}
return {
type: 'file' as const,
@ -723,8 +692,12 @@ export class OpenAIContentConverter {
}
if (mediaType === 'video') {
if (!this.modalities.video) {
return this.unsupportedModalityPlaceholder('video', filename);
if (!modalities.video) {
return unsupportedModalityPlaceholder(
'video',
filename,
requestContext,
);
}
return {
type: 'video_url' as const,
@ -749,12 +722,13 @@ export class OpenAIContentConverter {
/**
* Create a text placeholder for unsupported modalities.
*/
private unsupportedModalityPlaceholder(
function unsupportedModalityPlaceholder(
modality: string,
displayName: string,
requestContext: RequestContext,
): OpenAIContentPart {
debugLogger.warn(
`Model '${this.model}' does not support ${modality} input. ` +
`Model '${requestContext.model}' does not support ${modality} input. ` +
`Replacing with text placeholder: ${displayName}`,
);
let hint: string;
@ -770,29 +744,20 @@ export class OpenAIContentConverter {
};
}
/**
* Determine media type from MIME type
*/
private getMediaType(mimeType: string): 'image' | 'audio' | 'video' | 'file' {
function getMediaType(mimeType: string): 'image' | 'audio' | 'video' | 'file' {
if (mimeType.startsWith('image/')) return 'image';
if (mimeType.startsWith('audio/')) return 'audio';
if (mimeType.startsWith('video/')) return 'video';
return 'file';
}
/**
* Convert MIME type to OpenAI audio format
*/
private getAudioFormat(mimeType: string): 'wav' | 'mp3' | null {
function getAudioFormat(mimeType: string): 'wav' | 'mp3' | null {
if (mimeType.includes('wav')) return 'wav';
if (mimeType.includes('mp3') || mimeType.includes('mpeg')) return 'mp3';
return null;
}
/**
* Type guard to check if content is a valid Content object
*/
private isContentObject(
function isContentObject(
content: unknown,
): content is { role: string; parts: Part[] } {
return (
@ -804,17 +769,14 @@ export class OpenAIContentConverter {
);
}
/**
* Extract text content from various Gemini content union types
*/
private extractTextFromContentUnion(contentUnion: unknown): string {
function extractTextFromContentUnion(contentUnion: unknown): string {
if (typeof contentUnion === 'string') {
return contentUnion;
}
if (Array.isArray(contentUnion)) {
return contentUnion
.map((item) => this.extractTextFromContentUnion(item))
.map((item) => extractTextFromContentUnion(item))
.filter(Boolean)
.join('\n');
}
@ -839,10 +801,11 @@ export class OpenAIContentConverter {
}
/**
* Convert OpenAI response to Gemini format
* Convert OpenAI response to Gemini format.
*/
convertOpenAIResponseToGemini(
export function convertOpenAIResponseToGemini(
openaiResponse: OpenAI.Chat.ChatCompletion,
requestContext: RequestContext,
): GenerateContentResponse {
const choice = openaiResponse.choices?.[0];
const response = new GenerateContentResponse();
@ -889,7 +852,7 @@ export class OpenAIContentConverter {
parts,
role: 'model' as const,
},
finishReason: this.mapOpenAIFinishReasonToGemini(
finishReason: mapOpenAIFinishReasonToGemini(
choice.finish_reason || 'stop',
),
index: 0,
@ -905,7 +868,7 @@ export class OpenAIContentConverter {
? openaiResponse.created.toString()
: new Date().getTime().toString();
response.modelVersion = this.model;
response.modelVersion = requestContext.model;
response.promptFeedback = { safetyRatings: [] };
// Add usage metadata if available
@ -951,18 +914,23 @@ export class OpenAIContentConverter {
/**
* Convert OpenAI stream chunk to Gemini format.
*
* `ctx` carries the tool-call parser for this stream. Callers MUST
* obtain it from `createStreamContext()` at the start of the stream
* and pass the same instance for every chunk of that stream. Concurrent
* streams MUST use distinct contexts or their tool-call buffers will
* interleave (issue #3516).
* `requestContext.toolCallParser` carries the tool-call parser for this
* stream. Callers MUST attach a fresh parser at stream start and pass the
* same instance for every chunk of that stream. Concurrent streams MUST use
* distinct parsers or their tool-call buffers will interleave (issue #3516).
*/
convertOpenAIChunkToGemini(
export function convertOpenAIChunkToGemini(
chunk: OpenAI.Chat.ChatCompletionChunk,
ctx: ConverterStreamContext,
requestContext: RequestContext,
): GenerateContentResponse {
const choice = chunk.choices?.[0];
const response = new GenerateContentResponse();
const toolCallParser = requestContext.toolCallParser;
if (!toolCallParser) {
throw new Error(
'convertOpenAIChunkToGemini requires requestContext.toolCallParser — attach a fresh StreamingToolCallParser at stream start.',
);
}
if (choice) {
const parts: Part[] = [];
@ -988,7 +956,7 @@ export class OpenAIContentConverter {
// Process the tool call chunk through the streaming parser
if (toolCall.function?.arguments) {
ctx.toolCallParser.addChunk(
toolCallParser.addChunk(
index,
toolCall.function.arguments,
toolCall.id,
@ -996,7 +964,7 @@ export class OpenAIContentConverter {
);
} else {
// Handle metadata-only chunks (id and/or name without arguments)
ctx.toolCallParser.addChunk(
toolCallParser.addChunk(
index,
'', // Empty chunk for metadata-only updates
toolCall.id,
@ -1012,9 +980,9 @@ export class OpenAIContentConverter {
// Detect truncation the provider may not report correctly.
// Some providers (e.g. DashScope/Qwen) send "stop" or "tool_calls"
// even when output was cut off mid-JSON due to max_tokens.
toolCallsTruncated = ctx.toolCallParser.hasIncompleteToolCalls();
toolCallsTruncated = toolCallParser.hasIncompleteToolCalls();
const completedToolCalls = ctx.toolCallParser.getCompletedToolCalls();
const completedToolCalls = toolCallParser.getCompletedToolCalls();
for (const toolCall of completedToolCalls) {
if (toolCall.name) {
@ -1029,10 +997,6 @@ export class OpenAIContentConverter {
});
}
}
// Parser is stream-local; it will be discarded with the
// ConverterStreamContext when the stream finishes. No manual
// reset needed.
}
// If tool call JSON was truncated, override to "length" so downstream
@ -1052,7 +1016,7 @@ export class OpenAIContentConverter {
safetyRatings: [],
};
if (effectiveFinishReason) {
candidate.finishReason = this.mapOpenAIFinishReasonToGemini(
candidate.finishReason = mapOpenAIFinishReasonToGemini(
effectiveFinishReason,
);
}
@ -1066,7 +1030,7 @@ export class OpenAIContentConverter {
? chunk.created.toString()
: new Date().getTime().toString();
response.modelVersion = this.model;
response.modelVersion = requestContext.model;
response.promptFeedback = { safetyRatings: [] };
// Add usage metadata if available in the chunk
@ -1109,10 +1073,7 @@ export class OpenAIContentConverter {
return response;
}
/**
* Map OpenAI finish reasons to Gemini finish reasons
*/
private mapOpenAIFinishReasonToGemini(
function mapOpenAIFinishReasonToGemini(
openaiReason: string | null,
): FinishReason {
if (!openaiReason) return FinishReason.FINISH_REASON_UNSPECIFIED;
@ -1126,7 +1087,7 @@ export class OpenAIContentConverter {
return mapping[openaiReason] || FinishReason.FINISH_REASON_UNSPECIFIED;
}
private mapGeminiFinishReasonToOpenAI(
function mapGeminiFinishReasonToOpenAI(
geminiReason?: FinishReason,
): 'stop' | 'length' | 'tool_calls' | 'content_filter' | 'function_call' {
if (!geminiReason) {
@ -1149,9 +1110,9 @@ export class OpenAIContentConverter {
}
/**
* Clean up orphaned tool calls from message history to prevent OpenAI API errors
* Clean up orphaned tool calls from message history to prevent OpenAI API errors.
*/
private cleanOrphanedToolCalls(
function cleanOrphanedToolCalls(
messages: OpenAI.Chat.ChatCompletionMessageParam[],
): OpenAI.Chat.ChatCompletionMessageParam[] {
const cleaned: OpenAI.Chat.ChatCompletionMessageParam[] = [];
@ -1300,9 +1261,9 @@ export class OpenAIContentConverter {
}
/**
* Merge consecutive assistant messages to combine split text and tool calls
* Merge consecutive assistant messages to combine split text and tool calls.
*/
private mergeConsecutiveAssistantMessages(
function mergeConsecutiveAssistantMessages(
messages: OpenAI.Chat.ChatCompletionMessageParam[],
): OpenAI.Chat.ChatCompletionMessageParam[] {
const merged: OpenAI.Chat.ChatCompletionMessageParam[] = [];
@ -1370,10 +1331,7 @@ export class OpenAIContentConverter {
if (combinedToolCalls.length > 0) {
(
lastMessage as OpenAI.Chat.ChatCompletionMessageParam & {
content:
| string
| OpenAI.Chat.ChatCompletionContentPart[]
| null;
content: string | OpenAI.Chat.ChatCompletionContentPart[] | null;
tool_calls?: OpenAI.Chat.ChatCompletionMessageToolCall[];
}
).tool_calls = combinedToolCalls;
@ -1389,4 +1347,12 @@ export class OpenAIContentConverter {
return merged;
}
}
export const OpenAIContentConverter = {
convertGeminiToolParametersToOpenAI,
convertGeminiToolsToOpenAI,
convertGeminiRequestToOpenAI,
convertGeminiResponseToOpenAI,
convertOpenAIResponseToGemini,
convertOpenAIChunkToGemini,
};

View file

@ -7,21 +7,20 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import type { GenerateContentParameters } from '@google/genai';
import { EnhancedErrorHandler } from './errorHandler.js';
import type { RequestContext } from './errorHandler.js';
import type { RequestContext } from './types.js';
describe('EnhancedErrorHandler', () => {
const fixedNow = 10_000;
let errorHandler: EnhancedErrorHandler;
let mockContext: RequestContext;
let mockRequest: GenerateContentParameters;
beforeEach(() => {
vi.spyOn(Date, 'now').mockReturnValue(fixedNow);
mockContext = {
userPromptId: 'test-prompt-id',
model: 'test-model',
authType: 'test-auth',
startTime: Date.now() - 5000,
duration: 5000,
isStreaming: false,
modalities: {},
startTime: fixedNow - 5000,
};
mockRequest = {
@ -194,7 +193,7 @@ describe('EnhancedErrorHandler', () => {
errorHandler = new EnhancedErrorHandler();
});
it('should build timeout error message for non-streaming requests', () => {
it('should build timeout error message', () => {
const timeoutError = new Error('timeout');
expect(() => {
@ -204,17 +203,6 @@ describe('EnhancedErrorHandler', () => {
);
});
it('should build timeout error message for streaming requests', () => {
const streamingContext = { ...mockContext, isStreaming: true };
const timeoutError = new Error('timeout');
expect(() => {
errorHandler.handle(timeoutError, streamingContext, mockRequest);
}).toThrow(
/Streaming request timeout after 5s\. Try reducing input length or increasing timeout in config\./,
);
});
it('should use original error message for non-timeout errors', () => {
const originalError = new Error('Original error message');
@ -245,7 +233,10 @@ describe('EnhancedErrorHandler', () => {
});
it('should handle different duration values correctly', () => {
const contextWithDifferentDuration = { ...mockContext, duration: 12345 };
const contextWithDifferentDuration = {
...mockContext,
startTime: fixedNow - 12345,
};
const timeoutError = new Error('timeout');
expect(() => {
@ -263,24 +254,13 @@ describe('EnhancedErrorHandler', () => {
errorHandler = new EnhancedErrorHandler();
});
it('should provide general troubleshooting tips for non-streaming requests', () => {
it('should provide generic troubleshooting tips', () => {
const timeoutError = new Error('timeout');
expect(() => {
errorHandler.handle(timeoutError, mockContext, mockRequest);
}).toThrow(
/Troubleshooting tips:\n- Reduce input length or complexity\n- Increase timeout in config: contentGenerator\.timeout\n- Check network connectivity\n- Consider using streaming mode for long responses/,
);
});
it('should provide streaming-specific troubleshooting tips for streaming requests', () => {
const streamingContext = { ...mockContext, isStreaming: true };
const timeoutError = new Error('timeout');
expect(() => {
errorHandler.handle(timeoutError, streamingContext, mockRequest);
}).toThrow(
/Streaming timeout troubleshooting:\n- Reduce input length or complexity\n- Increase timeout in config: contentGenerator\.timeout\n- Check network connectivity\n- Check network stability for streaming connections\n- Consider using non-streaming mode for very long inputs/,
/Troubleshooting tips:\n- Reduce input length or complexity\n- Increase timeout in config: contentGenerator\.timeout\n- Check network connectivity/,
);
});
});
@ -310,7 +290,7 @@ describe('EnhancedErrorHandler', () => {
});
it('should handle zero duration', () => {
const zeroContext = { ...mockContext, duration: 0 };
const zeroContext = { ...mockContext, startTime: fixedNow };
const timeoutError = new Error('timeout');
expect(() => {
@ -319,7 +299,7 @@ describe('EnhancedErrorHandler', () => {
});
it('should handle negative duration', () => {
const negativeContext = { ...mockContext, duration: -1000 };
const negativeContext = { ...mockContext, startTime: fixedNow + 1000 };
const timeoutError = new Error('timeout');
expect(() => {
@ -328,7 +308,7 @@ describe('EnhancedErrorHandler', () => {
});
it('should handle very large duration', () => {
const largeContext = { ...mockContext, duration: 999999 };
const largeContext = { ...mockContext, startTime: fixedNow - 999999 };
const timeoutError = new Error('timeout');
expect(() => {

View file

@ -6,29 +6,10 @@
import type { GenerateContentParameters } from '@google/genai';
import { createDebugLogger } from '../../utils/debugLogger.js';
import type { ErrorHandler, RequestContext } from './types.js';
const debugLogger = createDebugLogger('OPENAI_ERROR');
export interface RequestContext {
userPromptId: string;
model: string;
authType: string;
startTime: number;
duration: number;
isStreaming: boolean;
}
export interface ErrorHandler {
handle(
error: unknown,
context: RequestContext,
request: GenerateContentParameters,
): never;
shouldSuppressErrorLogging(
error: unknown,
request: GenerateContentParameters,
): boolean;
}
export type { ErrorHandler } from './types.js';
export class EnhancedErrorHandler implements ErrorHandler {
constructor(
@ -48,16 +29,13 @@ export class EnhancedErrorHandler implements ErrorHandler {
// Allow subclasses to suppress error logging for specific scenarios
if (!this.shouldSuppressErrorLogging(error, request)) {
const logPrefix = context.isStreaming
? 'OpenAI API Streaming Error:'
: 'OpenAI API Error:';
debugLogger.error(logPrefix, errorMessage);
debugLogger.error('OpenAI API Error:', errorMessage);
}
// Provide helpful timeout-specific error message
if (isTimeoutError) {
throw new Error(
`${errorMessage}\n\n${this.getTimeoutTroubleshootingTips(context)}`,
`${errorMessage}\n\n${this.getTimeoutTroubleshootingTips()}`,
);
}
@ -105,36 +83,21 @@ export class EnhancedErrorHandler implements ErrorHandler {
context: RequestContext,
isTimeoutError: boolean,
): string {
const durationSeconds = Math.round(context.duration / 1000);
const durationSeconds = Math.round((Date.now() - context.startTime) / 1000);
if (isTimeoutError) {
const prefix = context.isStreaming
? 'Streaming request timeout'
: 'Request timeout';
return `${prefix} after ${durationSeconds}s. Try reducing input length or increasing timeout in config.`;
return `Request timeout after ${durationSeconds}s. Try reducing input length or increasing timeout in config.`;
}
return error instanceof Error ? error.message : String(error);
}
private getTimeoutTroubleshootingTips(context: RequestContext): string {
const baseTitle = context.isStreaming
? 'Streaming timeout troubleshooting:'
: 'Troubleshooting tips:';
const baseTips = [
private getTimeoutTroubleshootingTips(): string {
const tips = [
'- Reduce input length or complexity',
'- Increase timeout in config: contentGenerator.timeout',
'- Check network connectivity',
];
const streamingSpecificTips = context.isStreaming
? [
'- Check network stability for streaming connections',
'- Consider using non-streaming mode for very long inputs',
]
: ['- Consider using streaming mode for long responses'];
return `${baseTitle}\n${[...baseTips, ...streamingSpecificTips].join('\n')}`;
return `Troubleshooting tips:\n${tips.join('\n')}`;
}
}

View file

@ -20,7 +20,8 @@ import {
} from './provider/index.js';
export { OpenAIContentGenerator } from './openaiContentGenerator.js';
export { ContentGenerationPipeline, type PipelineConfig } from './pipeline.js';
export { ContentGenerationPipeline } from './pipeline.js';
export type { ErrorHandler, PipelineConfig, RequestContext } from './types.js';
export {
type OpenAICompatibleProvider,
@ -91,4 +92,4 @@ export function determineProvider(
return new DefaultOpenAICompatibleProvider(contentGeneratorConfig, cliConfig);
}
export { type ErrorHandler, EnhancedErrorHandler } from './errorHandler.js';
export { EnhancedErrorHandler } from './errorHandler.js';

View file

@ -9,7 +9,7 @@ import type {
GenerateContentParameters,
GenerateContentResponse,
} from '@google/genai';
import type { PipelineConfig } from './pipeline.js';
import type { PipelineConfig } from './types.js';
import { ContentGenerationPipeline } from './pipeline.js';
import { EnhancedErrorHandler } from './errorHandler.js';
import { RequestTokenEstimator } from '../../utils/request-tokenizer/index.js';

View file

@ -18,8 +18,9 @@
* and chunks routed by `index: 0` interleaved into corrupt JSON.
*
* With the fix, `processStreamWithLogging` creates a fresh
* `ConverterStreamContext` at stream entry, so each concurrent generator
* has its own parser. This test would fail deterministically on pre-fix
* request context with its own `toolCallParser` at stream entry, so each
* concurrent generator has its own parser. This test would fail
* deterministically on pre-fix
* code because stream B's entry would wipe stream A's accumulator
* mid-flight, and A's finish chunk would emit zero function calls
* (`wasOutputTruncated`-style behavior).
@ -29,12 +30,11 @@ import { describe, it, expect, vi } from 'vitest';
import type OpenAI from 'openai';
import type { GenerateContentParameters } from '@google/genai';
import type { Part } from '@google/genai';
import type { PipelineConfig } from './pipeline.js';
import type { ErrorHandler, PipelineConfig } from './types.js';
import { ContentGenerationPipeline } from './pipeline.js';
import type { Config } from '../../config/config.js';
import type { ContentGeneratorConfig, AuthType } from '../contentGenerator.js';
import type { OpenAICompatibleProvider } from './provider/index.js';
import type { ErrorHandler } from './errorHandler.js';
type ChunkFactory = () => OpenAI.Chat.ChatCompletionChunk;

View file

@ -9,17 +9,23 @@ import { describe, it, expect, beforeEach, vi } from 'vitest';
import type OpenAI from 'openai';
import type { GenerateContentParameters } from '@google/genai';
import { GenerateContentResponse, Type, FinishReason } from '@google/genai';
import type { PipelineConfig } from './pipeline.js';
import type { ErrorHandler, PipelineConfig } from './types.js';
import { ContentGenerationPipeline, StreamContentError } from './pipeline.js';
import { OpenAIContentConverter } from './converter.js';
import { StreamingToolCallParser } from './streamingToolCallParser.js';
import type { Config } from '../../config/config.js';
import type { ContentGeneratorConfig, AuthType } from '../contentGenerator.js';
import type { OpenAICompatibleProvider } from './provider/index.js';
import type { ErrorHandler } from './errorHandler.js';
// Mock dependencies
vi.mock('./converter.js');
vi.mock('./converter.js', () => ({
OpenAIContentConverter: {
convertGeminiRequestToOpenAI: vi.fn(),
convertOpenAIResponseToGemini: vi.fn(),
convertOpenAIChunkToGemini: vi.fn(),
convertGeminiToolsToOpenAI: vi.fn(),
},
}));
vi.mock('openai');
describe('ContentGenerationPipeline', () => {
@ -27,7 +33,7 @@ describe('ContentGenerationPipeline', () => {
let mockConfig: PipelineConfig;
let mockProvider: OpenAICompatibleProvider;
let mockClient: OpenAI;
let mockConverter: OpenAIContentConverter;
let mockConverter: typeof OpenAIContentConverter;
let mockErrorHandler: ErrorHandler;
let mockContentGeneratorConfig: ContentGeneratorConfig;
let mockCliConfig: Config;
@ -45,19 +51,9 @@ describe('ContentGenerationPipeline', () => {
},
} as unknown as OpenAI;
// Mock converter. `createStreamContext` returns a fresh parser each
// stream; tests that don't care about tool-call buffers just ignore it.
mockConverter = {
setModel: vi.fn(),
setModalities: vi.fn(),
convertGeminiRequestToOpenAI: vi.fn(),
convertOpenAIResponseToGemini: vi.fn(),
convertOpenAIChunkToGemini: vi.fn(),
convertGeminiToolsToOpenAI: vi.fn(),
createStreamContext: vi.fn(() => ({
toolCallParser: new StreamingToolCallParser(),
})),
} as unknown as OpenAIContentConverter;
// Mock converter methods. The pipeline now snapshots request-scoped state
// into context and calls the stateless converter namespace directly.
mockConverter = OpenAIContentConverter;
// Mock provider
mockProvider = {
@ -87,11 +83,6 @@ describe('ContentGenerationPipeline', () => {
},
} as ContentGeneratorConfig;
// Mock the OpenAIContentConverter constructor
(OpenAIContentConverter as unknown as Mock).mockImplementation(
() => mockConverter,
);
mockConfig = {
cliConfig: mockCliConfig,
provider: mockProvider,
@ -105,12 +96,6 @@ describe('ContentGenerationPipeline', () => {
describe('constructor', () => {
it('should initialize with correct configuration', () => {
expect(mockProvider.buildClient).toHaveBeenCalled();
// Converter is constructed once and the model is updated per-request via setModel().
expect(OpenAIContentConverter).toHaveBeenCalledWith(
'test-model',
undefined,
{},
);
});
});
@ -152,11 +137,12 @@ describe('ContentGenerationPipeline', () => {
// Assert
expect(result).toBe(mockGeminiResponse);
expect(
(mockConverter as unknown as { setModel: Mock }).setModel,
).toHaveBeenCalledWith('test-model');
expect(mockConverter.convertGeminiRequestToOpenAI).toHaveBeenCalledWith(
request,
expect.objectContaining({
model: 'test-model',
modalities: {},
}),
);
expect(mockClient.chat.completions.create).toHaveBeenCalledWith(
expect.objectContaining({
@ -172,6 +158,10 @@ describe('ContentGenerationPipeline', () => {
);
expect(mockConverter.convertOpenAIResponseToGemini).toHaveBeenCalledWith(
mockOpenAIResponse,
expect.objectContaining({
model: 'test-model',
modalities: {},
}),
);
});
@ -211,9 +201,12 @@ describe('ContentGenerationPipeline', () => {
// Assert — request.model takes precedence over contentGeneratorConfig.model
expect(result).toBe(mockGeminiResponse);
expect(
(mockConverter as unknown as { setModel: Mock }).setModel,
).toHaveBeenCalledWith('override-model');
expect(mockConverter.convertGeminiRequestToOpenAI).toHaveBeenCalledWith(
request,
expect.objectContaining({
model: 'override-model',
}),
);
expect(mockClient.chat.completions.create).toHaveBeenCalledWith(
expect.objectContaining({
model: 'override-model',
@ -258,9 +251,12 @@ describe('ContentGenerationPipeline', () => {
// Assert — falls back to contentGeneratorConfig.model
expect(result).toBe(mockGeminiResponse);
expect(
(mockConverter as unknown as { setModel: Mock }).setModel,
).toHaveBeenCalledWith('test-model');
expect(mockConverter.convertGeminiRequestToOpenAI).toHaveBeenCalledWith(
request,
expect.objectContaining({
model: 'test-model',
}),
);
expect(mockClient.chat.completions.create).toHaveBeenCalledWith(
expect.objectContaining({
model: 'test-model',
@ -322,11 +318,15 @@ describe('ContentGenerationPipeline', () => {
// Assert
expect(result).toBe(mockGeminiResponse);
expect(
(mockConverter as unknown as { setModel: Mock }).setModel,
).toHaveBeenCalledWith('test-model');
expect(mockConverter.convertGeminiRequestToOpenAI).toHaveBeenCalledWith(
request,
expect.objectContaining({
model: 'test-model',
}),
);
expect(mockConverter.convertGeminiToolsToOpenAI).toHaveBeenCalledWith(
request.config!.tools,
'auto',
);
expect(mockClient.chat.completions.create).toHaveBeenCalledWith(
expect.objectContaining({
@ -611,9 +611,22 @@ describe('ContentGenerationPipeline', () => {
expect(results).toHaveLength(2);
expect(results[0]).toBe(mockGeminiResponse1);
expect(results[1]).toBe(mockGeminiResponse2);
// Parser is now created per-stream via createStreamContext — assert
// that the pipeline asked for a fresh one at stream entry.
expect(mockConverter.createStreamContext).toHaveBeenCalled();
const [, firstChunkContext] = (
mockConverter.convertOpenAIChunkToGemini as Mock
).mock.calls[0];
const [, secondChunkContext] = (
mockConverter.convertOpenAIChunkToGemini as Mock
).mock.calls[1];
expect(firstChunkContext).toEqual(
expect.objectContaining({
model: 'test-model',
modalities: {},
toolCallParser: expect.any(StreamingToolCallParser),
}),
);
expect(secondChunkContext.toolCallParser).toBe(
firstChunkContext.toolCallParser,
);
expect(mockClient.chat.completions.create).toHaveBeenCalledWith(
expect.objectContaining({
stream: true,
@ -725,10 +738,6 @@ describe('ContentGenerationPipeline', () => {
}
expect(results).toHaveLength(0); // No results due to error
// createStreamContext is called exactly once at stream entry; the
// error path no longer needs an explicit parser reset because the
// stream-local context is discarded when the generator unwinds.
expect(mockConverter.createStreamContext).toHaveBeenCalledTimes(1);
expect(mockErrorHandler.handle).toHaveBeenCalledWith(
testError,
expect.any(Object),

View file

@ -10,11 +10,10 @@ import {
type GenerateContentParameters,
GenerateContentResponse,
} from '@google/genai';
import type { Config } from '../../config/config.js';
import type { ContentGeneratorConfig } from '../contentGenerator.js';
import type { OpenAICompatibleProvider } from './provider/index.js';
import { OpenAIContentConverter } from './converter.js';
import type { ErrorHandler, RequestContext } from './errorHandler.js';
import { StreamingToolCallParser } from './streamingToolCallParser.js';
import type { PipelineConfig, RequestContext } from './types.js';
/**
* The OpenAI SDK adds an abort listener for every `chat.completions.create`
@ -46,44 +45,27 @@ export class StreamContentError extends Error {
}
}
export interface PipelineConfig {
cliConfig: Config;
provider: OpenAICompatibleProvider;
contentGeneratorConfig: ContentGeneratorConfig;
errorHandler: ErrorHandler;
}
export type { PipelineConfig } from './types.js';
export class ContentGenerationPipeline {
client: OpenAI;
private converter: OpenAIContentConverter;
private contentGeneratorConfig: ContentGeneratorConfig;
constructor(private config: PipelineConfig) {
this.contentGeneratorConfig = config.contentGeneratorConfig;
this.client = this.config.provider.buildClient();
this.converter = new OpenAIContentConverter(
this.contentGeneratorConfig.model,
this.contentGeneratorConfig.schemaCompliance,
this.contentGeneratorConfig.modalities ?? {},
);
}
async execute(
request: GenerateContentParameters,
userPromptId: string,
): Promise<GenerateContentResponse> {
// Use request.model when explicitly provided (e.g., fastModel for suggestion
// generation), falling back to the configured model as the default.
const effectiveModel = request.model || this.contentGeneratorConfig.model;
this.converter.setModel(effectiveModel);
this.converter.setModalities(this.contentGeneratorConfig.modalities ?? {});
raiseAbortListenerCap(request.config?.abortSignal);
return this.executeWithErrorHandling(
request,
userPromptId,
false,
effectiveModel,
async (openaiRequest) => {
async (openaiRequest, context) => {
const openaiResponse = (await this.client.chat.completions.create(
openaiRequest,
{
@ -92,7 +74,10 @@ export class ContentGenerationPipeline {
)) as OpenAI.Chat.ChatCompletion;
const geminiResponse =
this.converter.convertOpenAIResponseToGemini(openaiResponse);
OpenAIContentConverter.convertOpenAIResponseToGemini(
openaiResponse,
context,
);
return geminiResponse;
},
@ -103,15 +88,11 @@ export class ContentGenerationPipeline {
request: GenerateContentParameters,
userPromptId: string,
): Promise<AsyncGenerator<GenerateContentResponse>> {
const effectiveModel = request.model || this.contentGeneratorConfig.model;
this.converter.setModel(effectiveModel);
this.converter.setModalities(this.contentGeneratorConfig.modalities ?? {});
raiseAbortListenerCap(request.config?.abortSignal);
return this.executeWithErrorHandling(
request,
userPromptId,
true,
effectiveModel,
async (openaiRequest, context) => {
// Stage 1: Create OpenAI stream
const stream = (await this.client.chat.completions.create(
@ -143,13 +124,6 @@ export class ContentGenerationPipeline {
): AsyncGenerator<GenerateContentResponse> {
const collectedGeminiResponses: GenerateContentResponse[] = [];
// Stream-local parser state. Previously the tool-call parser lived on
// the Converter singleton and was reset at stream start — but that
// wiped concurrent streams' in-flight buffers (e.g. parallel subagents
// sharing the same Config.contentGenerator). Scoping it per-stream
// fixes issue #3516.
const streamCtx = this.converter.createStreamContext();
// State for handling chunk merging.
// pendingFinishResponse holds a finish chunk waiting to be merged with
// a subsequent usage-metadata chunk before yielding.
@ -174,9 +148,9 @@ export class ContentGenerationPipeline {
throw new StreamContentError(errorContent);
}
const response = this.converter.convertOpenAIChunkToGemini(
const response = OpenAIContentConverter.convertOpenAIChunkToGemini(
chunk,
streamCtx,
context,
);
// Stage 2b: Filter empty responses to avoid downstream issues
@ -237,13 +211,7 @@ export class ContentGenerationPipeline {
if (pendingFinishResponse && !finishYielded) {
yield pendingFinishResponse;
}
// Stage 2e: Stream completed successfully
context.duration = Date.now() - context.startTime;
} catch (error) {
// No manual parser cleanup needed — streamCtx is stream-local and
// becomes eligible for garbage collection once this generator unwinds.
// Re-throw StreamContentError directly so it can be handled by
// the caller's retry logic (e.g., TPM throttling retry in sendMessageStream)
if (error instanceof StreamContentError) {
@ -337,20 +305,23 @@ export class ContentGenerationPipeline {
private async buildRequest(
request: GenerateContentParameters,
userPromptId: string,
streaming: boolean = false,
effectiveModel: string,
context: RequestContext,
isStreaming: boolean,
): Promise<OpenAI.Chat.ChatCompletionCreateParams> {
const messages = this.converter.convertGeminiRequestToOpenAI(request);
const messages = OpenAIContentConverter.convertGeminiRequestToOpenAI(
request,
context,
);
// Apply provider-specific enhancements
const baseRequest: OpenAI.Chat.ChatCompletionCreateParams = {
model: effectiveModel,
model: context.model,
messages,
...this.buildGenerateContentConfig(request),
};
// Add streaming options if present
if (streaming) {
if (isStreaming) {
(
baseRequest as unknown as OpenAI.Chat.ChatCompletionCreateParamsStreaming
).stream = true;
@ -360,8 +331,10 @@ export class ContentGenerationPipeline {
// Add tools if present and non-empty.
// Some providers reject tools: [] (empty array), so skip when there are no tools.
if (request.config?.tools && request.config.tools.length > 0) {
baseRequest.tools = await this.converter.convertGeminiToolsToOpenAI(
baseRequest.tools =
await OpenAIContentConverter.convertGeminiToolsToOpenAI(
request.config.tools,
this.contentGeneratorConfig.schemaCompliance ?? 'auto',
);
}
@ -497,29 +470,22 @@ export class ContentGenerationPipeline {
request: GenerateContentParameters,
userPromptId: string,
isStreaming: boolean,
effectiveModel: string,
executor: (
openaiRequest: OpenAI.Chat.ChatCompletionCreateParams,
context: RequestContext,
) => Promise<T>,
): Promise<T> {
const context = this.createRequestContext(
userPromptId,
isStreaming,
effectiveModel,
);
const context = this.createRequestContext(request, isStreaming);
try {
const openaiRequest = await this.buildRequest(
request,
userPromptId,
context,
isStreaming,
effectiveModel,
);
const result = await executor(openaiRequest, context);
context.duration = Date.now() - context.startTime;
return result;
} catch (error) {
// Use shared error handling logic
@ -536,7 +502,6 @@ export class ContentGenerationPipeline {
context: RequestContext,
request: GenerateContentParameters,
): Promise<never> {
context.duration = Date.now() - context.startTime;
this.config.errorHandler.handle(error, context, request);
}
@ -544,17 +509,19 @@ export class ContentGenerationPipeline {
* Create request context with common properties
*/
private createRequestContext(
userPromptId: string,
request: GenerateContentParameters,
isStreaming: boolean,
effectiveModel: string,
): RequestContext {
const effectiveModel = request.model || this.contentGeneratorConfig.model;
const toolCallParser = isStreaming
? new StreamingToolCallParser()
: undefined;
return {
userPromptId,
model: effectiveModel,
authType: this.contentGeneratorConfig.authType || 'unknown',
modalities: this.contentGeneratorConfig.modalities ?? {},
startTime: Date.now(),
duration: 0,
isStreaming,
...(toolCallParser ? { toolCallParser } : {}),
};
}
}

View file

@ -0,0 +1,40 @@
/**
* @license
* Copyright 2025 Qwen
* SPDX-License-Identifier: Apache-2.0
*/
import type { GenerateContentParameters } from '@google/genai';
import type { Config } from '../../config/config.js';
import type {
ContentGeneratorConfig,
InputModalities,
} from '../contentGenerator.js';
import type { OpenAICompatibleProvider } from './provider/index.js';
import type { StreamingToolCallParser } from './streamingToolCallParser.js';
/**
 * Immutable per-request snapshot built once by the pipeline for each
 * generateContent / generateContentStream call and threaded through every
 * stateless converter function, replacing the converter's former shared
 * long-lived state.
 */
export interface RequestContext {
  /** Effective model id for this request (request.model, falling back to the configured default). */
  model: string;
  /** Input-modality configuration snapshotted from the content-generator config. */
  modalities: InputModalities;
  /** Epoch milliseconds when the request started; error handlers recompute duration from this. */
  startTime: number;
  /**
   * Stream-local tool-call parser, present only for streaming requests.
   * Scoping it per request keeps concurrent streams from clobbering each
   * other's in-flight tool-call buffers.
   */
  toolCallParser?: StreamingToolCallParser;
}
/**
 * Strategy contract for converting failures raised while generating content
 * into the errors ultimately surfaced to pipeline callers.
 */
export interface ErrorHandler {
  /**
   * Whether logging should be suppressed for this error/request pair —
   * presumably for expected or benign failures (TODO confirm against the
   * EnhancedErrorHandler implementation).
   */
  shouldSuppressErrorLogging(
    error: unknown,
    request: GenerateContentParameters,
  ): boolean;

  /**
   * Inspects the failure alongside its request context and original request,
   * then throws the error to surface. Declared `never`: it must not return
   * normally.
   */
  handle(
    error: unknown,
    context: RequestContext,
    request: GenerateContentParameters,
  ): never;
}
/**
 * Dependency bundle injected into the content-generation pipeline.
 */
export interface PipelineConfig {
  /** Global CLI configuration object. */
  cliConfig: Config;
  /** Generator settings (model, modalities, auth type, schema compliance). */
  contentGeneratorConfig: ContentGeneratorConfig;
  /** OpenAI-compatible provider; its buildClient() supplies the SDK client. */
  provider: OpenAICompatibleProvider;
  /** Error-mapping strategy invoked when a request or stream fails. */
  errorHandler: ErrorHandler;
}