diff --git a/packages/cli/src/acp-integration/session/Session.test.ts b/packages/cli/src/acp-integration/session/Session.test.ts index ea8a44dd6..9996980f3 100644 --- a/packages/cli/src/acp-integration/session/Session.test.ts +++ b/packages/cli/src/acp-integration/session/Session.test.ts @@ -1082,6 +1082,125 @@ describe('Session', () => { }); }); + describe('tool call concurrency', () => { + it('runs multiple Agent tool calls concurrently (issue #2516)', async () => { + // Each Agent call has two controllable async boundaries: + // - `called` — resolves *when* the test code reaches `execute()` + // - `result` — the promise `execute()` returns, resolved by the + // test after observing both `called` signals. + // + // Under the old sequential for-loop, call-b's `execute()` would + // only run after call-a's `execute()` promise resolved — so the + // `await Promise.all([called-a, called-b])` below deadlocks and + // the test hits vitest's default per-test timeout. Under the + // concurrent implementation both `called` signals fire before + // either `result` is resolved. + type Deferred = { + promise: Promise; + resolve: (v: T) => void; + }; + const makeDeferred = (): Deferred => { + let resolve!: (v: T) => void; + const promise = new Promise((r) => { + resolve = r; + }); + return { promise, resolve }; + }; + + const called: Record> = { + 'call-a': makeDeferred(), + 'call-b': makeDeferred(), + }; + const result: Record> = { + 'call-a': makeDeferred(), + 'call-b': makeDeferred(), + }; + + const agentTool = { + name: core.ToolNames.AGENT, + kind: core.Kind.Think, + build: vi.fn().mockImplementation((args: Record) => { + const id = args['_test_id'] as string; + return { + params: args, + eventEmitter: undefined, + getDefaultPermission: vi.fn().mockResolvedValue('allow'), + getDescription: vi.fn().mockReturnValue(`agent ${id}`), + toolLocations: vi.fn().mockReturnValue([]), + execute: vi.fn().mockImplementation(() => { + called[id].resolve(); + return result[id].promise; + }), + }; + }), + }; + + mockToolRegistry.getTool.mockImplementation((name: string) => + name === core.ToolNames.AGENT ? agentTool : undefined, + ); + mockConfig.getApprovalMode = vi + .fn() + .mockReturnValue(ApprovalMode.DEFAULT); + mockConfig.getPermissionManager = vi.fn().mockReturnValue(null); + + // Model returns two Agent calls, then an empty stream once results + // are fed back (to terminate the prompt loop). + const sendMessageStream = vi + .fn() + .mockResolvedValueOnce( + createStreamWithChunks([ + { + type: core.StreamEventType.CHUNK, + value: { + functionCalls: [ + { + id: 'call-a', + name: core.ToolNames.AGENT, + args: { _test_id: 'call-a', subagent_type: 'explore' }, + }, + { + id: 'call-b', + name: core.ToolNames.AGENT, + args: { _test_id: 'call-b', subagent_type: 'explore' }, + }, + ], + }, + }, + ]), + ) + .mockResolvedValueOnce(createEmptyStream()); + mockChat.sendMessageStream = sendMessageStream; + + const promptPromise = session.prompt({ + sessionId: 'test-session-id', + prompt: [{ type: 'text', text: 'spawn two agents' }], + }); + + // Wait until both `execute()` bodies have been entered. Sequential + // behaviour deadlocks here → vitest times out the test → failure. + await Promise.all([called['call-a'].promise, called['call-b'].promise]); + + // Resolve out of order to also verify that final part ordering + // follows the original functionCalls order, not resolution order. + result['call-b'].resolve({ llmContent: 'B-done', returnDisplay: 'B' }); + result['call-a'].resolve({ llmContent: 'A-done', returnDisplay: 'A' }); + + await promptPromise; + + // The second sendMessageStream invocation carries the tool responses + // that will be fed back to the model — assert their order matches + // the original function-call order (A before B). + expect(sendMessageStream).toHaveBeenCalledTimes(2); + const followUp = sendMessageStream.mock.calls[1][1] as { + message: Array<{ functionResponse?: { id?: string } }>; + }; + const ids = followUp.message + .filter((p) => p.functionResponse) + .map((p) => p.functionResponse?.id); + expect(ids).toEqual(['call-a', 'call-b']); + }); + }); + describe('system reminders', () => { // Captures the `message` parts fed into chat.sendMessageStream on the // first turn so individual tests can assert what the model saw. diff --git a/packages/cli/src/acp-integration/session/Session.ts b/packages/cli/src/acp-integration/session/Session.ts index 9c4d0f999..9e921434a 100644 --- a/packages/cli/src/acp-integration/session/Session.ts +++ b/packages/cli/src/acp-integration/session/Session.ts @@ -522,17 +522,11 @@ export class Session implements SessionContext { } if (functionCalls.length > 0) { - const toolResponseParts: Part[] = []; - - for (const fc of functionCalls) { - const response = await this.runTool( - pendingSend.signal, - promptId, - fc, - ); - toolResponseParts.push(...response); - } - + const toolResponseParts = await this.runToolCalls( + pendingSend.signal, + promptId, + functionCalls, + ); nextMessage = { role: 'user', parts: toolResponseParts }; } } @@ -763,17 +757,11 @@ export class Session implements SessionContext { // Process tool calls from the follow-up message if (functionCalls.length > 0) { - const toolResponseParts: Part[] = []; - - for (const fc of functionCalls) { - const toolResponse = await this.runTool( - pendingSend.signal, - promptId, - fc, - ); - toolResponseParts.push(...toolResponse); - } - + const toolResponseParts = await this.runToolCalls( + pendingSend.signal, + promptId, + functionCalls, + ); nextMessage = { role: 'user', parts: toolResponseParts }; } } @@ -956,11 +944,11 @@ export class Session implements SessionContext { } if (functionCalls.length > 0) { - const toolResponseParts: Part[] = []; - for (const fc of functionCalls) { - const response = await this.runTool(ac.signal, promptId, fc); - toolResponseParts.push(...response); - } + const toolResponseParts = await this.runToolCalls( + ac.signal, + promptId, + functionCalls, + ); nextMessage = { role: 'user', parts: toolResponseParts }; } } @@ -1103,6 +1091,79 @@ export class Session implements SessionContext { await this.sendUpdate(update); } + /** + * Execute a batch of model-returned tool calls, running Agent calls + * concurrently while keeping other tools sequential. + * + * Mirrors the partition logic in `coreToolScheduler.partitionToolCalls`: + * consecutive Agent calls form a parallel batch (they spawn independent + * sub-agents with no shared mutable state); any other tool forms its own + * sequential batch to preserve the implicit ordering the model may rely + * on. Response-part ordering matches the original `functionCalls` order. + */ + private async runToolCalls( + abortSignal: AbortSignal, + promptId: string, + functionCalls: FunctionCall[], + ): Promise { + type Batch = { concurrent: boolean; calls: FunctionCall[] }; + const batches: Batch[] = []; + for (const fc of functionCalls) { + const isAgent = fc.name === ToolNames.AGENT; + const last = batches[batches.length - 1]; + if (isAgent && last?.concurrent) { + last.calls.push(fc); + } else { + batches.push({ concurrent: isAgent, calls: [fc] }); + } + } + + // Bounded-concurrency runner: matches core's `runConcurrently` + // behaviour (`coreToolScheduler.ts:1506`), capped by + // `QWEN_CODE_MAX_TOOL_CONCURRENCY` (default 10). Results are returned + // in input order regardless of resolution order. + const runBounded = async (calls: FunctionCall[]): Promise => { + const parsed = parseInt( + process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY'] || '', + 10, + ); + const maxConcurrency = + Number.isFinite(parsed) && parsed >= 1 ? parsed : 10; + const results: Part[][] = new Array(calls.length); + const executing = new Set>(); + for (let i = 0; i < calls.length; i++) { + const idx = i; + const p = this.runTool(abortSignal, promptId, calls[idx]) + .then((r) => { + results[idx] = r; + }) + .finally(() => { + executing.delete(p); + }); + executing.add(p); + if (executing.size >= maxConcurrency) { + await Promise.race(executing); + } + } + await Promise.all(executing); + return results; + }; + + const parts: Part[] = []; + for (const batch of batches) { + if (batch.concurrent && batch.calls.length > 1) { + const results = await runBounded(batch.calls); + for (const r of results) parts.push(...r); + } else { + for (const fc of batch.calls) { + const r = await this.runTool(abortSignal, promptId, fc); + parts.push(...r); + } + } + } + return parts; + } + /** * Assemble the per-turn system reminders the model needs to see at the * start of a user query or cron fire. Mirrors the subagent/plan/arena @@ -1247,14 +1308,19 @@ export class Session implements SessionContext { try { const invocation = tool.build(args); - if (isAgentTool && 'eventEmitter' in invocation) { - // Access eventEmitter from AgentTool invocation - const taskEventEmitter = ( - invocation as { - eventEmitter: AgentEventEmitter; - } - ).eventEmitter; - + // Production AgentTool always initializes `eventEmitter` on its + // invocation (`agent.ts:392`). Be defensive about the `undefined` + // case too so an incomplete/custom AgentTool invocation degrades + // gracefully (no sub-agent event forwarding) instead of throwing + // inside SubAgentTracker.setup — the `'eventEmitter' in invocation` + // key-presence check passed for `{ eventEmitter: undefined }` and + // the ensuing `eventEmitter.on(...)` blew up. + const taskEventEmitter = ( + invocation as { + eventEmitter?: AgentEventEmitter; + } + ).eventEmitter; + if (isAgentTool && taskEventEmitter) { // Extract subagent metadata from AgentTool call const parentToolCallId = callId; const subagentType = (args['subagent_type'] as string) ?? '';