mirror of
https://github.com/QwenLM/qwen-code.git
synced 2026-05-05 15:31:27 +00:00
fix: cli input stream handling and error management, improve e2e and unit tests
This commit is contained in:
parent
6eb16c0bcf
commit
f578ff07a2
13 changed files with 741 additions and 150 deletions
|
|
@ -35,6 +35,7 @@ export interface IControlContext {
|
|||
permissionMode: PermissionMode;
|
||||
sdkMcpServers: Set<string>;
|
||||
mcpClients: Map<string, { client: Client; config: MCPServerConfig }>;
|
||||
inputClosed: boolean;
|
||||
|
||||
onInterrupt?: () => void;
|
||||
}
|
||||
|
|
@ -52,6 +53,7 @@ export class ControlContext implements IControlContext {
|
|||
permissionMode: PermissionMode;
|
||||
sdkMcpServers: Set<string>;
|
||||
mcpClients: Map<string, { client: Client; config: MCPServerConfig }>;
|
||||
inputClosed: boolean;
|
||||
|
||||
onInterrupt?: () => void;
|
||||
|
||||
|
|
@ -71,6 +73,7 @@ export class ControlContext implements IControlContext {
|
|||
this.permissionMode = options.permissionMode || 'default';
|
||||
this.sdkMcpServers = new Set();
|
||||
this.mcpClients = new Map();
|
||||
this.inputClosed = false;
|
||||
this.onInterrupt = options.onInterrupt;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ function createMockContext(debugMode: boolean = false): IControlContext {
|
|||
permissionMode: 'default',
|
||||
sdkMcpServers: new Set<string>(),
|
||||
mcpClients: new Map(),
|
||||
inputClosed: false,
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -637,6 +638,130 @@ describe('ControlDispatcher', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('markInputClosed', () => {
|
||||
it('should reject all pending outgoing requests when input closes', () => {
|
||||
const requestId1 = 'reject-req-1';
|
||||
const requestId2 = 'reject-req-2';
|
||||
const resolve1 = vi.fn();
|
||||
const resolve2 = vi.fn();
|
||||
const reject1 = vi.fn();
|
||||
const reject2 = vi.fn();
|
||||
const timeoutId1 = setTimeout(() => {}, 1000);
|
||||
const timeoutId2 = setTimeout(() => {}, 1000);
|
||||
const clearTimeoutSpy = vi.spyOn(global, 'clearTimeout');
|
||||
|
||||
const register = (
|
||||
dispatcher as unknown as {
|
||||
registerOutgoingRequest: (
|
||||
id: string,
|
||||
controller: string,
|
||||
resolve: (response: ControlResponse) => void,
|
||||
reject: (error: Error) => void,
|
||||
timeoutId: NodeJS.Timeout,
|
||||
) => void;
|
||||
}
|
||||
).registerOutgoingRequest.bind(dispatcher);
|
||||
|
||||
register(requestId1, 'SystemController', resolve1, reject1, timeoutId1);
|
||||
register(requestId2, 'SystemController', resolve2, reject2, timeoutId2);
|
||||
|
||||
dispatcher.markInputClosed();
|
||||
|
||||
expect(reject1).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ message: 'Input closed' }),
|
||||
);
|
||||
expect(reject2).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ message: 'Input closed' }),
|
||||
);
|
||||
expect(clearTimeoutSpy).toHaveBeenCalledWith(timeoutId1);
|
||||
expect(clearTimeoutSpy).toHaveBeenCalledWith(timeoutId2);
|
||||
});
|
||||
|
||||
it('should mark input as closed on context', () => {
|
||||
dispatcher.markInputClosed();
|
||||
expect(mockContext.inputClosed).toBe(true);
|
||||
});
|
||||
|
||||
it('should handle empty pending requests gracefully', () => {
|
||||
expect(() => dispatcher.markInputClosed()).not.toThrow();
|
||||
});
|
||||
|
||||
it('should be idempotent when called multiple times', () => {
|
||||
const requestId = 'idempotent-req';
|
||||
const resolve = vi.fn();
|
||||
const reject = vi.fn();
|
||||
const timeoutId = setTimeout(() => {}, 1000);
|
||||
|
||||
(
|
||||
dispatcher as unknown as {
|
||||
registerOutgoingRequest: (
|
||||
id: string,
|
||||
controller: string,
|
||||
resolve: (response: ControlResponse) => void,
|
||||
reject: (error: Error) => void,
|
||||
timeoutId: NodeJS.Timeout,
|
||||
) => void;
|
||||
}
|
||||
).registerOutgoingRequest(
|
||||
requestId,
|
||||
'SystemController',
|
||||
resolve,
|
||||
reject,
|
||||
timeoutId,
|
||||
);
|
||||
|
||||
dispatcher.markInputClosed();
|
||||
const firstRejectCount = vi.mocked(reject).mock.calls.length;
|
||||
|
||||
// Call again - should not reject again
|
||||
dispatcher.markInputClosed();
|
||||
const secondRejectCount = vi.mocked(reject).mock.calls.length;
|
||||
|
||||
expect(secondRejectCount).toBe(firstRejectCount);
|
||||
});
|
||||
|
||||
it('should log input closure in debug mode', () => {
|
||||
const context = createMockContext(true);
|
||||
const consoleSpy = vi
|
||||
.spyOn(console, 'error')
|
||||
.mockImplementation(() => {});
|
||||
|
||||
const dispatcherWithDebug = new ControlDispatcher(context);
|
||||
const requestId = 'reject-req-debug';
|
||||
const resolve = vi.fn();
|
||||
const reject = vi.fn();
|
||||
const timeoutId = setTimeout(() => {}, 1000);
|
||||
|
||||
(
|
||||
dispatcherWithDebug as unknown as {
|
||||
registerOutgoingRequest: (
|
||||
id: string,
|
||||
controller: string,
|
||||
resolve: (response: ControlResponse) => void,
|
||||
reject: (error: Error) => void,
|
||||
timeoutId: NodeJS.Timeout,
|
||||
) => void;
|
||||
}
|
||||
).registerOutgoingRequest(
|
||||
requestId,
|
||||
'SystemController',
|
||||
resolve,
|
||||
reject,
|
||||
timeoutId,
|
||||
);
|
||||
|
||||
dispatcherWithDebug.markInputClosed();
|
||||
|
||||
expect(consoleSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining(
|
||||
'[ControlDispatcher] Input closed, rejecting 1 pending outgoing requests',
|
||||
),
|
||||
);
|
||||
|
||||
consoleSpy.mockRestore();
|
||||
});
|
||||
});
|
||||
|
||||
describe('shutdown', () => {
|
||||
it('should cancel all pending incoming requests', () => {
|
||||
const requestId1 = 'shutdown-req-1';
|
||||
|
|
|
|||
|
|
@ -207,6 +207,36 @@ export class ControlDispatcher implements IPendingRequestRegistry {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks stdin as closed and rejects all pending outgoing requests.
|
||||
* After this is called, new outgoing requests will be rejected immediately.
|
||||
* This should be called when stdin closes to avoid waiting for responses.
|
||||
*/
|
||||
markInputClosed(): void {
|
||||
if (this.context.inputClosed) {
|
||||
return; // Already marked as closed
|
||||
}
|
||||
|
||||
this.context.inputClosed = true;
|
||||
|
||||
const requestIds = Array.from(this.pendingOutgoingRequests.keys());
|
||||
|
||||
if (this.context.debugMode) {
|
||||
console.error(
|
||||
`[ControlDispatcher] Input closed, rejecting ${requestIds.length} pending outgoing requests`,
|
||||
);
|
||||
}
|
||||
|
||||
// Reject all currently pending outgoing requests
|
||||
for (const id of requestIds) {
|
||||
const pending = this.pendingOutgoingRequests.get(id);
|
||||
if (pending) {
|
||||
this.deregisterOutgoingRequest(id);
|
||||
pending.reject(new Error('Input closed'));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops all pending requests and cleans up all controllers
|
||||
*/
|
||||
|
|
@ -243,7 +273,7 @@ export class ControlDispatcher implements IPendingRequestRegistry {
|
|||
}
|
||||
|
||||
/**
|
||||
* Registers an incoming request in the pending registry
|
||||
* Registers an incoming request in the pending registry.
|
||||
*/
|
||||
registerIncomingRequest(
|
||||
requestId: string,
|
||||
|
|
|
|||
|
|
@ -124,6 +124,11 @@ export abstract class BaseController {
|
|||
timeoutMs: number = DEFAULT_REQUEST_TIMEOUT_MS,
|
||||
signal?: AbortSignal,
|
||||
): Promise<ControlResponse> {
|
||||
// Check if stream is closed
|
||||
if (this.context.inputClosed) {
|
||||
throw new Error('Input closed');
|
||||
}
|
||||
|
||||
// Check if already aborted
|
||||
if (signal?.aborted) {
|
||||
throw new Error('Request aborted');
|
||||
|
|
|
|||
|
|
@ -469,21 +469,27 @@ export class PermissionController extends BaseController {
|
|||
error,
|
||||
);
|
||||
}
|
||||
// On error, use default cancel message
|
||||
|
||||
// Extract error message
|
||||
const errorMessage =
|
||||
error instanceof Error ? error.message : String(error);
|
||||
|
||||
// On error, pass error message as cancel message
|
||||
// Only pass payload for exec and mcp types that support it
|
||||
const confirmationType = toolCall.confirmationDetails.type;
|
||||
if (['edit', 'exec', 'mcp'].includes(confirmationType)) {
|
||||
const execOrMcpDetails = toolCall.confirmationDetails as
|
||||
| ToolExecuteConfirmationDetails
|
||||
| ToolMcpConfirmationDetails;
|
||||
await execOrMcpDetails.onConfirm(
|
||||
ToolConfirmationOutcome.Cancel,
|
||||
undefined,
|
||||
);
|
||||
await execOrMcpDetails.onConfirm(ToolConfirmationOutcome.Cancel, {
|
||||
cancelMessage: `Error: ${errorMessage}`,
|
||||
});
|
||||
} else {
|
||||
// For other types, don't pass payload (backward compatible)
|
||||
await toolCall.confirmationDetails.onConfirm(
|
||||
ToolConfirmationOutcome.Cancel,
|
||||
{
|
||||
cancelMessage: `Error: ${errorMessage}`,
|
||||
},
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
|
|
|
|||
|
|
@ -153,6 +153,7 @@ describe('runNonInteractiveStreamJson', () => {
|
|||
handleControlResponse: ReturnType<typeof vi.fn>;
|
||||
handleCancel: ReturnType<typeof vi.fn>;
|
||||
shutdown: ReturnType<typeof vi.fn>;
|
||||
markInputClosed: ReturnType<typeof vi.fn>;
|
||||
getPendingIncomingRequestCount: ReturnType<typeof vi.fn>;
|
||||
waitForPendingIncomingRequests: ReturnType<typeof vi.fn>;
|
||||
sdkMcpController: {
|
||||
|
|
@ -192,6 +193,7 @@ describe('runNonInteractiveStreamJson', () => {
|
|||
handleControlResponse: vi.fn(),
|
||||
handleCancel: vi.fn(),
|
||||
shutdown: vi.fn(),
|
||||
markInputClosed: vi.fn(),
|
||||
getPendingIncomingRequestCount: vi.fn().mockReturnValue(0),
|
||||
waitForPendingIncomingRequests: vi.fn().mockResolvedValue(undefined),
|
||||
sdkMcpController: {
|
||||
|
|
|
|||
|
|
@ -596,7 +596,14 @@ class Session {
|
|||
throw streamError;
|
||||
}
|
||||
|
||||
// Stream ended - wait for all pending work before shutdown
|
||||
// Stdin closed - mark input as closed in dispatcher
|
||||
// This will reject all current pending outgoing requests AND any future requests
|
||||
// that might be registered by async message handlers still running
|
||||
if (this.dispatcher) {
|
||||
this.dispatcher.markInputClosed();
|
||||
}
|
||||
|
||||
// Wait for all pending work before shutdown
|
||||
await this.waitForAllPendingWork();
|
||||
await this.shutdown();
|
||||
} catch (error) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue