Merge branch 'main' into feat/debug-logging-refactor

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
tanzhenxin 2026-02-01 20:47:38 +08:00
commit 135df54f27
378 changed files with 40051 additions and 6776 deletions

View file

@ -35,6 +35,7 @@ export interface IControlContext {
permissionMode: PermissionMode;
sdkMcpServers: Set<string>;
mcpClients: Map<string, { client: Client; config: MCPServerConfig }>;
inputClosed: boolean;
onInterrupt?: () => void;
}
@ -52,6 +53,7 @@ export class ControlContext implements IControlContext {
permissionMode: PermissionMode;
sdkMcpServers: Set<string>;
mcpClients: Map<string, { client: Client; config: MCPServerConfig }>;
inputClosed: boolean;
onInterrupt?: () => void;
@ -71,6 +73,7 @@ export class ControlContext implements IControlContext {
this.permissionMode = options.permissionMode || 'default';
this.sdkMcpServers = new Set();
this.mcpClients = new Map();
this.inputClosed = false;
this.onInterrupt = options.onInterrupt;
}
}

View file

@ -42,6 +42,7 @@ function createMockContext(debugMode: boolean = false): IControlContext {
permissionMode: 'default',
sdkMcpServers: new Set<string>(),
mcpClients: new Map(),
inputClosed: false,
};
}
@ -637,6 +638,130 @@ describe('ControlDispatcher', () => {
});
});
describe('markInputClosed', () => {
it('should reject all pending outgoing requests when input closes', () => {
const requestId1 = 'reject-req-1';
const requestId2 = 'reject-req-2';
const resolve1 = vi.fn();
const resolve2 = vi.fn();
const reject1 = vi.fn();
const reject2 = vi.fn();
const timeoutId1 = setTimeout(() => {}, 1000);
const timeoutId2 = setTimeout(() => {}, 1000);
const clearTimeoutSpy = vi.spyOn(global, 'clearTimeout');
const register = (
dispatcher as unknown as {
registerOutgoingRequest: (
id: string,
controller: string,
resolve: (response: ControlResponse) => void,
reject: (error: Error) => void,
timeoutId: NodeJS.Timeout,
) => void;
}
).registerOutgoingRequest.bind(dispatcher);
register(requestId1, 'SystemController', resolve1, reject1, timeoutId1);
register(requestId2, 'SystemController', resolve2, reject2, timeoutId2);
dispatcher.markInputClosed();
expect(reject1).toHaveBeenCalledWith(
expect.objectContaining({ message: 'Input closed' }),
);
expect(reject2).toHaveBeenCalledWith(
expect.objectContaining({ message: 'Input closed' }),
);
expect(clearTimeoutSpy).toHaveBeenCalledWith(timeoutId1);
expect(clearTimeoutSpy).toHaveBeenCalledWith(timeoutId2);
});
it('should mark input as closed on context', () => {
dispatcher.markInputClosed();
expect(mockContext.inputClosed).toBe(true);
});
it('should handle empty pending requests gracefully', () => {
expect(() => dispatcher.markInputClosed()).not.toThrow();
});
it('should be idempotent when called multiple times', () => {
const requestId = 'idempotent-req';
const resolve = vi.fn();
const reject = vi.fn();
const timeoutId = setTimeout(() => {}, 1000);
(
dispatcher as unknown as {
registerOutgoingRequest: (
id: string,
controller: string,
resolve: (response: ControlResponse) => void,
reject: (error: Error) => void,
timeoutId: NodeJS.Timeout,
) => void;
}
).registerOutgoingRequest(
requestId,
'SystemController',
resolve,
reject,
timeoutId,
);
dispatcher.markInputClosed();
const firstRejectCount = vi.mocked(reject).mock.calls.length;
// Call again - should not reject again
dispatcher.markInputClosed();
const secondRejectCount = vi.mocked(reject).mock.calls.length;
expect(secondRejectCount).toBe(firstRejectCount);
});
it('should log input closure in debug mode', () => {
const context = createMockContext(true);
const consoleSpy = vi
.spyOn(console, 'error')
.mockImplementation(() => {});
const dispatcherWithDebug = new ControlDispatcher(context);
const requestId = 'reject-req-debug';
const resolve = vi.fn();
const reject = vi.fn();
const timeoutId = setTimeout(() => {}, 1000);
(
dispatcherWithDebug as unknown as {
registerOutgoingRequest: (
id: string,
controller: string,
resolve: (response: ControlResponse) => void,
reject: (error: Error) => void,
timeoutId: NodeJS.Timeout,
) => void;
}
).registerOutgoingRequest(
requestId,
'SystemController',
resolve,
reject,
timeoutId,
);
dispatcherWithDebug.markInputClosed();
expect(consoleSpy).toHaveBeenCalledWith(
expect.stringContaining(
'[ControlDispatcher] Input closed, rejecting 1 pending outgoing requests',
),
);
consoleSpy.mockRestore();
});
});
describe('shutdown', () => {
it('should cancel all pending incoming requests', () => {
const requestId1 = 'shutdown-req-1';

View file

@ -204,6 +204,36 @@ export class ControlDispatcher implements IPendingRequestRegistry {
}
}
/**
* Marks stdin as closed and rejects all pending outgoing requests.
* After this is called, new outgoing requests will be rejected immediately.
* This should be called when stdin closes to avoid waiting for responses.
*/
markInputClosed(): void {
if (this.context.inputClosed) {
return; // Already marked as closed
}
this.context.inputClosed = true;
const requestIds = Array.from(this.pendingOutgoingRequests.keys());
if (this.context.debugMode) {
console.error(
`[ControlDispatcher] Input closed, rejecting ${requestIds.length} pending outgoing requests`,
);
}
// Reject all currently pending outgoing requests
for (const id of requestIds) {
const pending = this.pendingOutgoingRequests.get(id);
if (pending) {
this.deregisterOutgoingRequest(id);
pending.reject(new Error('Input closed'));
}
}
}
/**
* Stops all pending requests and cleans up all controllers
*/
@ -238,7 +268,7 @@ export class ControlDispatcher implements IPendingRequestRegistry {
}
/**
* Registers an incoming request in the pending registry
* Registers an incoming request in the pending registry.
*/
registerIncomingRequest(
requestId: string,

View file

@ -128,6 +128,11 @@ export abstract class BaseController {
timeoutMs: number = DEFAULT_REQUEST_TIMEOUT_MS,
signal?: AbortSignal,
): Promise<ControlResponse> {
// Check if stream is closed
if (this.context.inputClosed) {
throw new Error('Input closed');
}
// Check if already aborted
if (signal?.aborted) {
throw new Error('Request aborted');

View file

@ -465,21 +465,27 @@ export class PermissionController extends BaseController {
'[PermissionController] Outgoing permission failed:',
error,
);
// On error, use default cancel message
// Extract error message
const errorMessage =
error instanceof Error ? error.message : String(error);
// On error, pass error message as cancel message
// Only pass payload for exec and mcp types that support it
const confirmationType = toolCall.confirmationDetails.type;
if (['edit', 'exec', 'mcp'].includes(confirmationType)) {
const execOrMcpDetails = toolCall.confirmationDetails as
| ToolExecuteConfirmationDetails
| ToolMcpConfirmationDetails;
await execOrMcpDetails.onConfirm(
ToolConfirmationOutcome.Cancel,
undefined,
);
await execOrMcpDetails.onConfirm(ToolConfirmationOutcome.Cancel, {
cancelMessage: `Error: ${errorMessage}`,
});
} else {
// For other types, don't pass payload (backward compatible)
await toolCall.confirmationDetails.onConfirm(
ToolConfirmationOutcome.Cancel,
{
cancelMessage: `Error: ${errorMessage}`,
},
);
}
} finally {

View file

@ -9,7 +9,7 @@ import type {
Config,
ServerGeminiStreamEvent,
} from '@qwen-code/qwen-code-core';
import { GeminiEventType } from '@qwen-code/qwen-code-core';
import { GeminiEventType, OutputFormat } from '@qwen-code/qwen-code-core';
import type { Part } from '@google/genai';
import { JsonOutputAdapter } from './JsonOutputAdapter.js';
@ -17,6 +17,7 @@ function createMockConfig(): Config {
return {
getSessionId: vi.fn().mockReturnValue('test-session-id'),
getModel: vi.fn().mockReturnValue('test-model'),
getOutputFormat: vi.fn().mockReturnValue('json'),
} as unknown as Config;
}
@ -415,6 +416,79 @@ describe('JsonOutputAdapter', () => {
expect(resultMessage.num_turns).toBe(1);
});
it('should emit success result as text to stdout in text mode', () => {
vi.mocked(mockConfig.getOutputFormat).mockReturnValue(OutputFormat.TEXT);
adapter.emitResult({
isError: false,
durationMs: 1000,
apiDurationMs: 800,
numTurns: 1,
});
expect(stdoutWriteSpy).toHaveBeenCalled();
const output = stdoutWriteSpy.mock.calls[0][0] as string;
expect(output).toBe('Response text');
});
it('should emit error result to stderr in text mode', () => {
const stderrWriteSpy = vi
.spyOn(process.stderr, 'write')
.mockImplementation(() => true);
vi.mocked(mockConfig.getOutputFormat).mockReturnValue(OutputFormat.TEXT);
adapter.emitResult({
isError: true,
errorMessage: 'Test error message',
durationMs: 500,
apiDurationMs: 300,
numTurns: 1,
});
expect(stderrWriteSpy).toHaveBeenCalled();
const output = stderrWriteSpy.mock.calls[0][0] as string;
expect(output).toBe('Test error message');
stderrWriteSpy.mockRestore();
});
it('should use custom summary in text mode', () => {
vi.mocked(mockConfig.getOutputFormat).mockReturnValue(OutputFormat.TEXT);
adapter.emitResult({
isError: false,
summary: 'Custom summary text',
durationMs: 1000,
apiDurationMs: 800,
numTurns: 1,
});
expect(stdoutWriteSpy).toHaveBeenCalled();
const output = stdoutWriteSpy.mock.calls[0][0] as string;
expect(output).toBe('Custom summary text');
});
it('should handle empty error message in text mode', () => {
const stderrWriteSpy = vi
.spyOn(process.stderr, 'write')
.mockImplementation(() => true);
vi.mocked(mockConfig.getOutputFormat).mockReturnValue(OutputFormat.TEXT);
adapter.emitResult({
isError: true,
durationMs: 500,
apiDurationMs: 300,
numTurns: 1,
});
expect(stderrWriteSpy).toHaveBeenCalled();
const output = stderrWriteSpy.mock.calls[0][0] as string;
// When no errorMessage is provided, the default 'Unknown error' is used
expect(output).toBe('Unknown error');
stderrWriteSpy.mockRestore();
});
it('should emit error result', () => {
adapter.emitResult({
isError: true,

View file

@ -67,9 +67,17 @@ export class JsonOutputAdapter
);
this.messages.push(resultMessage);
// Emit the entire messages array as JSON (includes all main agent + subagent messages)
const json = JSON.stringify(this.messages);
process.stdout.write(`${json}\n`);
if (this.config.getOutputFormat() === 'text') {
if (resultMessage.is_error) {
process.stderr.write(`${resultMessage.error?.message || ''}`);
} else {
process.stdout.write(`${resultMessage.result}`);
}
} else {
// Emit the entire messages array as JSON (includes all main agent + subagent messages)
const json = JSON.stringify(this.messages);
process.stdout.write(`${json}\n`);
}
}
emitMessage(message: CLIMessage): void {

View file

@ -153,6 +153,7 @@ describe('runNonInteractiveStreamJson', () => {
handleControlResponse: ReturnType<typeof vi.fn>;
handleCancel: ReturnType<typeof vi.fn>;
shutdown: ReturnType<typeof vi.fn>;
markInputClosed: ReturnType<typeof vi.fn>;
getPendingIncomingRequestCount: ReturnType<typeof vi.fn>;
waitForPendingIncomingRequests: ReturnType<typeof vi.fn>;
sdkMcpController: {
@ -192,6 +193,7 @@ describe('runNonInteractiveStreamJson', () => {
handleControlResponse: vi.fn(),
handleCancel: vi.fn(),
shutdown: vi.fn(),
markInputClosed: vi.fn(),
getPendingIncomingRequestCount: vi.fn().mockReturnValue(0),
waitForPendingIncomingRequests: vi.fn().mockResolvedValue(undefined),
sdkMcpController: {

View file

@ -558,7 +558,14 @@ class Session {
throw streamError;
}
// Stream ended - wait for all pending work before shutdown
// Stdin closed - mark input as closed in dispatcher
// This will reject all current pending outgoing requests AND any future requests
// that might be registered by async message handlers still running
if (this.dispatcher) {
this.dispatcher.markInputClosed();
}
// Wait for all pending work before shutdown
await this.waitForAllPendingWork();
await this.shutdown();
} catch (error) {