Merge pull request #2367 from QwenLM/refactor/send-message-type

fix(core): strip orphaned user entries before retry to prevent API errors
This commit is contained in:
pomelo 2026-03-16 09:43:59 +08:00 committed by GitHub
commit 15424f263b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 296 additions and 68 deletions

View file

@ -20,6 +20,7 @@ import {
uiTelemetryService,
FatalInputError,
ApprovalMode,
SendMessageType,
} from '@qwen-code/qwen-code-core';
import type { Part } from '@google/genai';
import { runNonInteractive } from './nonInteractiveCli.js';
@ -250,7 +251,7 @@ describe('runNonInteractive', () => {
[{ text: 'Test input' }],
expect.any(AbortSignal),
'prompt-id-1',
{ isContinuation: false },
{ type: SendMessageType.UserQuery },
);
expect(processStdoutSpy).toHaveBeenCalledWith('Hello World');
expect(mockShutdownTelemetry).toHaveBeenCalled();
@ -300,21 +301,21 @@ describe('runNonInteractive', () => {
outputUpdateHandler: expect.any(Function),
}),
);
// Verify first call has isContinuation: false
// Verify first call has type: UserQuery
expect(mockGeminiClient.sendMessageStream).toHaveBeenNthCalledWith(
1,
[{ text: 'Use a tool' }],
expect.any(AbortSignal),
'prompt-id-2',
{ isContinuation: false },
{ type: SendMessageType.UserQuery },
);
// Verify second call (after tool execution) has isContinuation: true
// Verify second call (after tool execution) has type: ToolResult
expect(mockGeminiClient.sendMessageStream).toHaveBeenNthCalledWith(
2,
[{ text: 'Tool response' }],
expect.any(AbortSignal),
'prompt-id-2',
{ isContinuation: true },
{ type: SendMessageType.ToolResult },
);
expect(processStdoutSpy).toHaveBeenCalledWith('Final answer');
});
@ -383,7 +384,7 @@ describe('runNonInteractive', () => {
],
expect.any(AbortSignal),
'prompt-id-3',
{ isContinuation: true },
{ type: SendMessageType.ToolResult },
);
expect(processStdoutSpy).toHaveBeenCalledWith('Sorry, let me try again.');
});
@ -507,7 +508,7 @@ describe('runNonInteractive', () => {
processedParts,
expect.any(AbortSignal),
'prompt-id-7',
{ isContinuation: false },
{ type: SendMessageType.UserQuery },
);
// 6. Assert the final output is correct
@ -539,7 +540,7 @@ describe('runNonInteractive', () => {
[{ text: 'Test input' }],
expect.any(AbortSignal),
'prompt-id-1',
{ isContinuation: false },
{ type: SendMessageType.UserQuery },
);
// JSON adapter emits array of messages, last one is result with stats
@ -694,7 +695,7 @@ describe('runNonInteractive', () => {
[{ text: 'Empty response test' }],
expect.any(AbortSignal),
'prompt-id-empty',
{ isContinuation: false },
{ type: SendMessageType.UserQuery },
);
// JSON adapter emits array of messages, last one is result with stats
@ -881,7 +882,7 @@ describe('runNonInteractive', () => {
[{ text: 'Prompt from command' }],
expect.any(AbortSignal),
'prompt-id-slash',
{ isContinuation: false },
{ type: SendMessageType.UserQuery },
);
expect(processStdoutSpy).toHaveBeenCalledWith('Response from command');
@ -941,7 +942,7 @@ describe('runNonInteractive', () => {
[{ text: '/unknowncommand' }],
expect.any(AbortSignal),
'prompt-id-unknown',
{ isContinuation: false },
{ type: SendMessageType.UserQuery },
);
expect(processStdoutSpy).toHaveBeenCalledWith('Response to unknown');
@ -1299,7 +1300,7 @@ describe('runNonInteractive', () => {
[{ text: 'Message from stream-json input' }],
expect.any(AbortSignal),
'prompt-envelope',
{ isContinuation: false },
{ type: SendMessageType.UserQuery },
);
});
@ -1775,7 +1776,7 @@ describe('runNonInteractive', () => {
[{ text: 'Simple string content' }],
expect.any(AbortSignal),
'prompt-string-content',
{ isContinuation: false },
{ type: SendMessageType.UserQuery },
);
// UserMessage with array of text blocks
@ -1808,7 +1809,7 @@ describe('runNonInteractive', () => {
[{ text: 'First part' }, { text: 'Second part' }],
expect.any(AbortSignal),
'prompt-blocks-content',
{ isContinuation: false },
{ type: SendMessageType.UserQuery },
);
});
});

View file

@ -19,6 +19,7 @@ import {
uiTelemetryService,
parseAndFormatApiError,
createDebugLogger,
SendMessageType,
} from '@qwen-code/qwen-code-core';
import type { Content, Part, PartListUnion } from '@google/genai';
import type { CLIUserMessage, PermissionMode } from './nonInteractive/types.js';
@ -265,7 +266,11 @@ export async function runNonInteractive(
currentMessages[0]?.parts || [],
abortController.signal,
prompt_id,
{ isContinuation: !isFirstTurn },
{
type: isFirstTurn
? SendMessageType.UserQuery
: SendMessageType.ToolResult,
},
);
isFirstTurn = false;

View file

@ -28,6 +28,7 @@ import {
ApprovalMode,
AuthType,
GeminiEventType as ServerGeminiEventType,
SendMessageType,
ToolErrorType,
ToolConfirmationOutcome,
} from '@qwen-code/qwen-code-core';
@ -482,7 +483,7 @@ describe('useGeminiStream', () => {
expectedMergedResponse,
expect.any(AbortSignal),
'prompt-id-2',
{ isContinuation: true },
{ type: SendMessageType.ToolResult },
);
});
@ -806,7 +807,7 @@ describe('useGeminiStream', () => {
toolCallResponseParts,
expect.any(AbortSignal),
'prompt-id-4',
{ isContinuation: true },
{ type: SendMessageType.ToolResult },
);
});
@ -1122,7 +1123,7 @@ describe('useGeminiStream', () => {
'This is the actual prompt from the command file.',
expect.any(AbortSignal),
expect.any(String),
undefined,
{ type: SendMessageType.UserQuery },
);
expect(mockScheduleToolCalls).not.toHaveBeenCalled();
@ -1149,7 +1150,7 @@ describe('useGeminiStream', () => {
'',
expect.any(AbortSignal),
expect.any(String),
undefined,
{ type: SendMessageType.UserQuery },
);
});
});
@ -1168,7 +1169,7 @@ describe('useGeminiStream', () => {
'// This is a line comment',
expect.any(AbortSignal),
expect.any(String),
undefined,
{ type: SendMessageType.UserQuery },
);
});
});
@ -1187,7 +1188,7 @@ describe('useGeminiStream', () => {
'/* This is a block comment */',
expect.any(AbortSignal),
expect.any(String),
undefined,
{ type: SendMessageType.UserQuery },
);
});
});
@ -2091,7 +2092,7 @@ describe('useGeminiStream', () => {
processedQueryParts, // Argument 1: The parts array directly
expect.any(AbortSignal), // Argument 2: An AbortSignal
expect.any(String), // Argument 3: The prompt_id string
undefined, // Argument 4: Options (undefined for normal prompts)
{ type: SendMessageType.UserQuery }, // Argument 4: The options
);
});
@ -2776,7 +2777,7 @@ describe('useGeminiStream', () => {
'First query',
expect.any(AbortSignal),
expect.any(String),
undefined,
{ type: SendMessageType.UserQuery },
);
// Verify only the first query was added to history
@ -2828,14 +2829,14 @@ describe('useGeminiStream', () => {
'First query',
expect.any(AbortSignal),
expect.any(String),
undefined,
{ type: SendMessageType.UserQuery },
);
expect(mockSendMessageStream).toHaveBeenNthCalledWith(
2,
'Second query',
expect.any(AbortSignal),
expect.any(String),
undefined,
{ type: SendMessageType.UserQuery },
);
});
@ -2858,7 +2859,7 @@ describe('useGeminiStream', () => {
'Second query',
expect.any(AbortSignal),
expect.any(String),
undefined,
{ type: SendMessageType.UserQuery },
);
});
});

View file

@ -19,14 +19,17 @@ import type {
} from '@qwen-code/qwen-code-core';
import {
GeminiEventType as ServerGeminiEventType,
SendMessageType,
createDebugLogger,
getErrorMessage,
isNodeError,
MessageSenderType,
logUserPrompt,
logUserRetry,
GitService,
UnauthorizedError,
UserPromptEvent,
UserRetryEvent,
logConversationFinishedEvent,
ConversationFinishedEvent,
ApprovalMode,
@ -1082,19 +1085,22 @@ export const useGeminiStream = (
const submitQuery = useCallback(
async (
query: PartListUnion,
options?: { isContinuation: boolean; skipPreparation?: boolean },
submitType: SendMessageType = SendMessageType.UserQuery,
prompt_id?: string,
) => {
// Prevent concurrent executions of submitQuery, but allow continuations
// which are part of the same logical flow (tool responses)
if (isSubmittingQueryRef.current && !options?.isContinuation) {
if (
isSubmittingQueryRef.current &&
submitType !== SendMessageType.ToolResult
) {
return;
}
if (
(streamingState === StreamingState.Responding ||
streamingState === StreamingState.WaitingForConfirmation) &&
!options?.isContinuation
submitType !== SendMessageType.ToolResult
)
return;
@ -1104,7 +1110,7 @@ export const useGeminiStream = (
const userMessageTimestamp = Date.now();
// Reset quota error flag when starting a new query (not a continuation)
if (!options?.isContinuation) {
if (submitType !== SendMessageType.ToolResult) {
setModelSwitchedFromQuotaError(false);
// Commit any pending retry error to history (without hint) since the
// user is starting a new conversation turn.
@ -1127,14 +1133,15 @@ export const useGeminiStream = (
}
return promptIdContext.run(prompt_id, async () => {
const { queryToSend, shouldProceed } = options?.skipPreparation
? { queryToSend: query, shouldProceed: true }
: await prepareQueryForGemini(
query,
userMessageTimestamp,
abortSignal,
prompt_id!,
);
const { queryToSend, shouldProceed } =
submitType === SendMessageType.Retry
? { queryToSend: query, shouldProceed: true }
: await prepareQueryForGemini(
query,
userMessageTimestamp,
abortSignal,
prompt_id!,
);
if (!shouldProceed || queryToSend === null) {
isSubmittingQueryRef.current = false;
@ -1142,7 +1149,7 @@ export const useGeminiStream = (
}
// Check image format support for non-continuations
if (!options?.isContinuation) {
if (submitType === SendMessageType.UserQuery) {
const formatCheck = checkImageFormatsSupport(queryToSend);
if (formatCheck.hasUnsupportedFormats) {
addItem(
@ -1159,7 +1166,7 @@ export const useGeminiStream = (
lastPromptRef.current = finalQueryToSend;
lastPromptErroredRef.current = false;
if (!options?.isContinuation) {
if (submitType === SendMessageType.UserQuery) {
// trigger new prompt event for session stats in CLI
startNewPrompt();
@ -1180,6 +1187,10 @@ export const useGeminiStream = (
setThought(null);
}
if (submitType === SendMessageType.Retry) {
logUserRetry(config, new UserRetryEvent(prompt_id));
}
setIsResponding(true);
setInitError(null);
@ -1188,7 +1199,7 @@ export const useGeminiStream = (
finalQueryToSend,
abortSignal,
prompt_id!,
options,
{ type: submitType },
);
const processingStatus = await processGeminiStreamEvents(
@ -1276,7 +1287,7 @@ export const useGeminiStream = (
*
* When conditions are met:
* - Clears any pending auto-retry countdown to avoid duplicate retries
* - Re-submits the last query with skipPreparation: true for faster retry
* - Re-submits the last query with isRetry: true, reusing the same prompt_id
*
* This function is exposed via UIActionsContext and triggered by InputPrompt
* when the user presses Ctrl+Y (bound to Command.RETRY_LAST in keyBindings.ts).
@ -1303,10 +1314,7 @@ export const useGeminiStream = (
clearRetryCountdown();
await submitQuery(lastPrompt, {
isContinuation: false,
skipPreparation: true,
});
await submitQuery(lastPrompt, SendMessageType.Retry);
}, [streamingState, addItem, clearRetryCountdown, submitQuery]);
const handleApprovalModeChange = useCallback(
@ -1452,13 +1460,7 @@ export const useGeminiStream = (
return;
}
submitQuery(
responsesToSend,
{
isContinuation: true,
},
prompt_ids[0],
);
submitQuery(responsesToSend, SendMessageType.ToolResult, prompt_ids[0]);
},
[
isResponding,

View file

@ -15,7 +15,7 @@ import {
} from 'vitest';
import type { Content, GenerateContentResponse, Part } from '@google/genai';
import { GeminiClient } from './client.js';
import { GeminiClient, SendMessageType } from './client.js';
import { findCompressSplitPoint } from '../services/chatCompressionService.js';
import {
AuthType,
@ -1551,7 +1551,7 @@ Other open files:
[{ text: 'Start conversation' }],
signal,
'prompt-id-3',
{ isContinuation: false },
{ type: SendMessageType.UserQuery },
Number.MAX_SAFE_INTEGER, // Bypass the MAX_TURNS protection
);
@ -2304,6 +2304,70 @@ Other open files:
// Assert - loop detection methods should not be called when skipLoopDetection is true
expect(ldMock.addAndCheck).not.toHaveBeenCalled();
});
describe('retry sendMessageType', () => {
it('should call stripOrphanedUserEntriesFromHistory before executing', async () => {
const mockChat: Partial<GeminiChat> = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
setHistory: vi.fn(),
stripThoughtsFromHistory: vi.fn(),
stripOrphanedUserEntriesFromHistory: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;
const mockStream = (async function* () {
yield { type: 'content', value: 'retry response' };
})();
mockTurnRunFn.mockReturnValue(mockStream);
// Act: send with retry type
const stream = client.sendMessageStream(
[{ text: 'second message' }],
new AbortController().signal,
'prompt-retry',
{ type: SendMessageType.Retry },
);
for await (const _ of stream) {
/* consume */
}
// Assert: the cleanup method was called
expect(
mockChat.stripOrphanedUserEntriesFromHistory,
).toHaveBeenCalledOnce();
});
it('should not increment sessionTurnCount for retry', async () => {
const mockChat: Partial<GeminiChat> = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
setHistory: vi.fn(),
stripThoughtsFromHistory: vi.fn(),
stripOrphanedUserEntriesFromHistory: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;
const mockStream = (async function* () {
yield { type: 'content', value: 'ok' };
})();
mockTurnRunFn.mockReturnValue(mockStream);
const turnCountBefore = client['sessionTurnCount'];
const stream = client.sendMessageStream(
[{ text: 'retry' }],
new AbortController().signal,
'prompt-retry-3',
{ type: SendMessageType.Retry },
);
for await (const _ of stream) {
/* consume */
}
expect(client['sessionTurnCount']).toBe(turnCountBefore);
});
});
});
describe('generateContent', () => {

View file

@ -85,6 +85,17 @@ import type { StopHookOutput } from '../hooks/types.js';
const MAX_TURNS = 100;
export enum SendMessageType {
UserQuery = 'userQuery',
ToolResult = 'toolResult',
Retry = 'retry',
Hook = 'hook',
}
export interface SendMessageOptions {
type: SendMessageType;
}
export class GeminiClient {
private chat?: GeminiChat;
private sessionTurnCount = 0;
@ -152,6 +163,10 @@ export class GeminiClient {
this.getChat().stripThoughtsFromHistory();
}
private stripOrphanedUserEntriesFromHistory() {
this.getChat().stripOrphanedUserEntriesFromHistory();
}
setHistory(history: Content[]) {
this.getChat().setHistory(history);
this.forceFullIdeContext = true;
@ -414,13 +429,19 @@ export class GeminiClient {
request: PartListUnion,
signal: AbortSignal,
prompt_id: string,
options?: { isContinuation: boolean },
options?: SendMessageOptions,
turns: number = MAX_TURNS,
): AsyncGenerator<ServerGeminiStreamEvent, Turn> {
const messageType = options?.type ?? SendMessageType.UserQuery;
if (messageType === SendMessageType.Retry) {
this.stripOrphanedUserEntriesFromHistory();
}
// Fire UserPromptSubmit hook through MessageBus (only if hooks are enabled)
const hooksEnabled = this.config.getEnableHooks();
const messageBus = this.config.getMessageBus();
if (hooksEnabled && messageBus) {
if (messageType !== SendMessageType.Retry && hooksEnabled && messageBus) {
const promptText = partToString(request);
const response = await messageBus.request<
HookExecutionRequest,
@ -462,7 +483,7 @@ export class GeminiClient {
}
}
if (!options?.isContinuation) {
if (messageType === SendMessageType.UserQuery) {
this.loopDetector.reset(prompt_id);
this.lastPromptId = prompt_id;
@ -472,14 +493,18 @@ export class GeminiClient {
// strip thoughts from history before sending the message
this.stripThoughtsFromHistory();
}
this.sessionTurnCount++;
if (
this.config.getMaxSessionTurns() > 0 &&
this.sessionTurnCount > this.config.getMaxSessionTurns()
) {
yield { type: GeminiEventType.MaxSessionTurns };
return new Turn(this.getChat(), prompt_id);
if (messageType !== SendMessageType.Retry) {
this.sessionTurnCount++;
if (
this.config.getMaxSessionTurns() > 0 &&
this.sessionTurnCount > this.config.getMaxSessionTurns()
) {
yield { type: GeminiEventType.MaxSessionTurns };
return new Turn(this.getChat(), prompt_id);
}
}
// Ensure turns never exceeds MAX_TURNS to prevent infinite loops
const boundedTurns = Math.min(turns, MAX_TURNS);
if (!boundedTurns) {
@ -543,7 +568,7 @@ export class GeminiClient {
// append system reminders to the request
let requestToSent = await flatMapTextParts(request, async (text) => [text]);
if (!options?.isContinuation) {
if (messageType === SendMessageType.UserQuery) {
const systemReminders = [];
// add subagent system reminder if there are subagents
@ -636,7 +661,7 @@ export class GeminiClient {
continueRequest,
signal,
prompt_id,
{ isContinuation: true },
{ type: SendMessageType.Hook },
boundedTurns - 1,
);
}

View file

@ -1718,4 +1718,73 @@ describe('GeminiChat', async () => {
]);
});
});
describe('stripOrphanedUserEntriesFromHistory', () => {
it('should pop a single trailing user entry', () => {
chat.setHistory([
{ role: 'user', parts: [{ text: 'first message' }] },
{ role: 'model', parts: [{ text: 'first response' }] },
{ role: 'user', parts: [{ text: 'orphaned message' }] },
]);
chat.stripOrphanedUserEntriesFromHistory();
expect(chat.getHistory()).toEqual([
{ role: 'user', parts: [{ text: 'first message' }] },
{ role: 'model', parts: [{ text: 'first response' }] },
]);
});
it('should pop multiple trailing user entries', () => {
chat.setHistory([
{ role: 'user', parts: [{ text: 'query' }] },
{
role: 'model',
parts: [{ functionCall: { name: 'tool', args: {} } }],
},
{ role: 'user', parts: [{ text: 'IDE context' }] },
{
role: 'user',
parts: [
{
functionResponse: {
name: 'tool',
response: { result: 'ok' },
},
},
],
},
]);
chat.stripOrphanedUserEntriesFromHistory();
expect(chat.getHistory()).toEqual([
{ role: 'user', parts: [{ text: 'query' }] },
{
role: 'model',
parts: [{ functionCall: { name: 'tool', args: {} } }],
},
]);
});
it('should be a no-op when last entry is a model response', () => {
const history = [
{ role: 'user', parts: [{ text: 'hello' }] },
{ role: 'model', parts: [{ text: 'hi' }] },
];
chat.setHistory([...history]);
chat.stripOrphanedUserEntriesFromHistory();
expect(chat.getHistory()).toEqual(history);
});
it('should handle empty history', () => {
chat.setHistory([]);
chat.stripOrphanedUserEntriesFromHistory();
expect(chat.getHistory()).toEqual([]);
});
});
});

View file

@ -571,6 +571,20 @@ export class GeminiChat {
.filter((content) => content.parts && content.parts.length > 0);
}
/**
* Pop all orphaned trailing user entries from chat history.
* In a valid conversation the last entry is always a model response;
* any trailing user entries are leftovers from a request that failed.
*/
stripOrphanedUserEntriesFromHistory(): void {
while (
this.history.length > 0 &&
this.history[this.history.length - 1]!.role === 'user'
) {
this.history.pop();
}
}
setTools(tools: Tool[]): void {
this.generationConfig.tools = tools;
}

View file

@ -7,6 +7,7 @@
export const SERVICE_NAME = 'qwen-code';
export const EVENT_USER_PROMPT = 'qwen-code.user_prompt';
export const EVENT_USER_RETRY = 'qwen-code.user_retry';
export const EVENT_TOOL_CALL = 'qwen-code.tool_call';
export const EVENT_API_REQUEST = 'qwen-code.api_request';
export const EVENT_API_ERROR = 'qwen-code.api_error';

View file

@ -27,6 +27,7 @@ export {
export {
logStartSession,
logUserPrompt,
logUserRetry,
logToolCall,
logApiRequest,
logApiError,
@ -54,6 +55,7 @@ export {
SlashCommandStatus,
EndSessionEvent,
UserPromptEvent,
UserRetryEvent,
ApiRequestEvent,
ApiErrorEvent,
ApiResponseEvent,

View file

@ -20,6 +20,7 @@ import {
EVENT_IDE_CONNECTION,
EVENT_TOOL_CALL,
EVENT_USER_PROMPT,
EVENT_USER_RETRY,
EVENT_FLASH_FALLBACK,
EVENT_NEXT_SPEAKER_CHECK,
SERVICE_NAME,
@ -66,6 +67,7 @@ import type {
StartSessionEvent,
ToolCallEvent,
UserPromptEvent,
UserRetryEvent,
FlashFallbackEvent,
NextSpeakerCheckEvent,
LoopDetectedEvent,
@ -169,6 +171,25 @@ export function logUserPrompt(config: Config, event: UserPromptEvent): void {
logger.emit(logRecord);
}
export function logUserRetry(config: Config, event: UserRetryEvent): void {
QwenLogger.getInstance(config)?.logRetryEvent(event);
if (!isTelemetrySdkInitialized()) return;
const attributes: LogAttributes = {
...getCommonAttributes(config),
'event.name': EVENT_USER_RETRY,
'event.timestamp': new Date().toISOString(),
prompt_id: event.prompt_id,
};
const logger = logs.getLogger(SERVICE_NAME);
const logRecord: LogRecord = {
body: `User retry.`,
attributes,
};
logger.emit(logRecord);
}
export function logToolCall(config: Config, event: ToolCallEvent): void {
const uiEvent = {
...event,

View file

@ -42,6 +42,7 @@ import type {
AuthEvent,
SkillLaunchEvent,
UserFeedbackEvent,
UserRetryEvent,
RipgrepFallbackEvent,
EndSessionEvent,
ExtensionUpdateEvent,
@ -465,7 +466,6 @@ export class QwenLogger {
logNewPromptEvent(event: UserPromptEvent): void {
const rumEvent = this.createActionEvent('user', 'new_prompt', {
properties: {
auth_type: event.auth_type,
prompt_id: event.prompt_id,
prompt_length: event.prompt_length,
},
@ -475,6 +475,17 @@ export class QwenLogger {
this.flushIfNeeded();
}
logRetryEvent(event: UserRetryEvent): void {
const rumEvent = this.createActionEvent('user', 'retry', {
properties: {
prompt_id: event.prompt_id,
},
});
this.enqueueLogEvent(rumEvent);
this.flushIfNeeded();
}
logSlashCommandEvent(event: SlashCommandEvent): void {
const rumEvent = this.createActionEvent('user', 'slash_command', {
properties: {

View file

@ -148,6 +148,18 @@ export class UserPromptEvent implements BaseTelemetryEvent {
}
}
export class UserRetryEvent implements BaseTelemetryEvent {
'event.name': 'user_retry';
'event.timestamp': string;
prompt_id: string;
constructor(prompt_id: string) {
this['event.name'] = 'user_retry';
this['event.timestamp'] = new Date().toISOString();
this.prompt_id = prompt_id;
}
}
export class ToolCallEvent implements BaseTelemetryEvent {
'event.name': 'tool_call';
'event.timestamp': string;