diff --git a/packages/cli/src/acp-integration/session/Session.test.ts b/packages/cli/src/acp-integration/session/Session.test.ts index 67213c446..8e7a5cd6c 100644 --- a/packages/cli/src/acp-integration/session/Session.test.ts +++ b/packages/cli/src/acp-integration/session/Session.test.ts @@ -1069,45 +1069,53 @@ describe('Session', () => { describe('tool call concurrency', () => { it('runs multiple Agent tool calls concurrently (issue #2516)', async () => { - // Build a controllable Agent invocation whose execute() blocks until - // the test resolves it. If the ACP loop runs Agent calls sequentially - // the second `execute` would never be invoked while the first is - // still pending — the concurrent implementation starts both before - // either resolves. - type Deferred = { - promise: Promise; - resolve: (v: core.ToolResult) => void; + // Each Agent call has two controllable async boundaries: + // - `called` — resolves *when* the test code reaches `execute()` + // - `result` — the promise `execute()` returns, resolved by the + // test after observing both `called` signals. + // + // Under the old sequential for-loop, call-b's `execute()` would + // only run after call-a's `execute()` promise resolved — so the + // `await Promise.all([called-a, called-b])` below deadlocks and + // the test hits vitest's default per-test timeout. Under the + // concurrent implementation both `called` signals fire before + // either `result` is resolved. + type Deferred = { + promise: Promise; + resolve: (v: T) => void; }; - const makeDeferred = (): Deferred => { - let resolve!: (v: core.ToolResult) => void; - const promise = new Promise((r) => { + const makeDeferred = (): Deferred => { + let resolve!: (v: T) => void; + const promise = new Promise((r) => { resolve = r; }); return { promise, resolve }; }; - const defs: Record = { - 'call-a': makeDeferred(), - 'call-b': makeDeferred(), + const called: Record> = { + 'call-a': makeDeferred(), + 'call-b': makeDeferred(), + }; + const result: Record> = { + 'call-a': makeDeferred(), + 'call-b': makeDeferred(), }; - const executeSpies: Record> = {}; - const invocationsBuilt: string[] = []; const agentTool = { name: core.ToolNames.AGENT, kind: core.Kind.Think, build: vi.fn().mockImplementation((args: Record) => { const id = args['_test_id'] as string; - invocationsBuilt.push(id); - const execute = vi.fn().mockImplementation(() => defs[id].promise); - executeSpies[id] = execute; return { params: args, eventEmitter: undefined, getDefaultPermission: vi.fn().mockResolvedValue('allow'), getDescription: vi.fn().mockReturnValue(`agent ${id}`), toolLocations: vi.fn().mockReturnValue([]), - execute, + execute: vi.fn().mockImplementation(() => { + called[id].resolve(); + return result[id].promise; + }), }; }), }; @@ -1153,21 +1161,14 @@ describe('Session', () => { prompt: [{ type: 'text', text: 'spawn two agents' }], }); - // Yield to the microtask queue repeatedly so the session has a chance - // to fan out both Agent calls before we resolve anything. 10 ticks is - // generous for the `build → permission-eval → execute` path. - for (let i = 0; i < 10; i++) await Promise.resolve(); - - // Proof of concurrency: both execute() were invoked before either - // resolved. Under the old sequential for-loop this assertion fails - // because call-b's execute would only run after call-a resolved. - expect(executeSpies['call-a']).toHaveBeenCalledTimes(1); - expect(executeSpies['call-b']).toHaveBeenCalledTimes(1); + // Wait until both `execute()` bodies have been entered. Sequential + // behaviour deadlocks here → vitest times out the test → failure. + await Promise.all([called['call-a'].promise, called['call-b'].promise]); // Resolve out of order to also verify that final part ordering // follows the original functionCalls order, not resolution order. - defs['call-b'].resolve({ llmContent: 'B-done', returnDisplay: 'B' }); - defs['call-a'].resolve({ llmContent: 'A-done', returnDisplay: 'A' }); + result['call-b'].resolve({ llmContent: 'B-done', returnDisplay: 'B' }); + result['call-a'].resolve({ llmContent: 'A-done', returnDisplay: 'A' }); await promptPromise; diff --git a/packages/cli/src/acp-integration/session/Session.ts b/packages/cli/src/acp-integration/session/Session.ts index ceb9a741e..1a0fd66b2 100644 --- a/packages/cli/src/acp-integration/session/Session.ts +++ b/packages/cli/src/acp-integration/session/Session.ts @@ -1105,12 +1105,41 @@ export class Session implements SessionContext { } } + // Bounded-concurrency runner: matches core's `runConcurrently` + // behaviour (`coreToolScheduler.ts:1506`), capped by + // `QWEN_CODE_MAX_TOOL_CONCURRENCY` (default 10). Results are returned + // in input order regardless of resolution order. + const runBounded = async (calls: FunctionCall[]): Promise => { + const parsed = parseInt( + process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY'] || '', + 10, + ); + const maxConcurrency = + Number.isFinite(parsed) && parsed >= 1 ? parsed : 10; + const results: Part[][] = new Array(calls.length); + const executing = new Set>(); + for (let i = 0; i < calls.length; i++) { + const idx = i; + const p = this.runTool(abortSignal, promptId, calls[idx]) + .then((r) => { + results[idx] = r; + }) + .finally(() => { + executing.delete(p); + }); + executing.add(p); + if (executing.size >= maxConcurrency) { + await Promise.race(executing); + } + } + await Promise.all(executing); + return results; + }; + const parts: Part[] = []; for (const batch of batches) { if (batch.concurrent && batch.calls.length > 1) { - const results = await Promise.all( - batch.calls.map((fc) => this.runTool(abortSignal, promptId, fc)), - ); + const results = await runBounded(batch.calls); for (const r of results) parts.push(...r); } else { for (const fc of batch.calls) {