From e133627e8a83fd42809f8eec7552fd69334ec7b6 Mon Sep 17 00:00:00 2001 From: tanzhenxin Date: Tue, 17 Mar 2026 15:45:17 +0800 Subject: [PATCH] feat(core): execute task tools concurrently for improved performance Task tools spawn independent sub-agents with no shared mutable state, making them safe to run in parallel. This change executes all task tools concurrently while keeping other tools sequential to preserve any implicit ordering the model may rely on. Co-authored-by: Qwen-Coder --- .../core/src/core/coreToolScheduler.test.ts | 226 ++++++++++++++++++ packages/core/src/core/coreToolScheduler.ts | 25 +- 2 files changed, 248 insertions(+), 3 deletions(-) diff --git a/packages/core/src/core/coreToolScheduler.test.ts b/packages/core/src/core/coreToolScheduler.test.ts index 3411fff50..918ade81c 100644 --- a/packages/core/src/core/coreToolScheduler.test.ts +++ b/packages/core/src/core/coreToolScheduler.test.ts @@ -2583,3 +2583,229 @@ describe('CoreToolScheduler plan mode with ask_user_question', () => { expect(completedCalls[0].status).toBe('cancelled'); }); }); + +describe('Concurrent task tool execution', () => { + function createScheduler( + tools: Map, + onAllToolCallsComplete: Mock, + onToolCallsUpdate: Mock, + ) { + const mockToolRegistry = { + getTool: (name: string) => tools.get(name), + getFunctionDeclarations: () => [], + tools, + discovery: {}, + registerTool: () => {}, + getToolByName: (name: string) => tools.get(name), + getToolByDisplayName: () => undefined, + getTools: () => [...tools.values()], + discoverTools: async () => {}, + getAllTools: () => [...tools.values()], + getToolsByServer: () => [], + } as unknown as ToolRegistry; + + const mockConfig = { + getSessionId: () => 'test-session-id', + getUsageStatisticsEnabled: () => true, + getDebugMode: () => false, + getApprovalMode: () => ApprovalMode.AUTO_EDIT, + getAllowedTools: () => [], + getContentGeneratorConfig: () => ({ + model: 'test-model', + authType: 'gemini', + }), + getShellExecutionConfig: () => ({ + terminalWidth: 90, + terminalHeight: 30, + }), + storage: { + getProjectTempDir: () => '/tmp', + }, + getTruncateToolOutputThreshold: () => + DEFAULT_TRUNCATE_TOOL_OUTPUT_THRESHOLD, + getTruncateToolOutputLines: () => DEFAULT_TRUNCATE_TOOL_OUTPUT_LINES, + getToolRegistry: () => mockToolRegistry, + getUseModelRouter: () => false, + getGeminiClient: () => null, + getChatRecordingService: () => undefined, + } as unknown as Config; + + return new CoreToolScheduler({ + config: mockConfig, + onAllToolCallsComplete, + onToolCallsUpdate, + getPreferredEditor: () => 'vscode', + onEditorClose: vi.fn(), + }); + } + + it('should execute multiple task tools concurrently', async () => { + const executionLog: string[] = []; + + const taskTool = new MockTool({ + name: 'task', + execute: async (params) => { + const id = (params as { id: string }).id; + executionLog.push(`start:${id}`); + // Simulate async work — concurrent tasks will interleave here + await new Promise((r) => setTimeout(r, 50)); + executionLog.push(`end:${id}`); + return { + llmContent: `Task ${id} done`, + returnDisplay: `Task ${id} done`, + }; + }, + }); + + const tools = new Map([['task', taskTool]]); + const onAllToolCallsComplete = vi.fn(); + const onToolCallsUpdate = vi.fn(); + const scheduler = createScheduler( + tools, + onAllToolCallsComplete, + onToolCallsUpdate, + ); + + const abortController = new AbortController(); + const requests = [ + { + callId: '1', + name: 'task', + args: { id: 'A' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + { + callId: '2', + name: 'task', + args: { id: 'B' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + { + callId: '3', + name: 'task', + args: { id: 'C' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + ]; + + await scheduler.schedule(requests, abortController.signal); + + // All tasks should have completed + expect(onAllToolCallsComplete).toHaveBeenCalled(); + const completedCalls = onAllToolCallsComplete.mock + .calls[0][0] as ToolCall[]; + expect(completedCalls).toHaveLength(3); + expect(completedCalls.every((c) => c.status === 'success')).toBe(true); + + // Verify concurrency: all tasks should start before any finishes + // With sequential execution, the log would be [start:A, end:A, start:B, end:B, ...] + // With concurrent execution, all starts happen before any end + const startIndices = executionLog + .filter((e) => e.startsWith('start:')) + .map((e) => executionLog.indexOf(e)); + const firstEnd = executionLog.findIndex((e) => e.startsWith('end:')); + expect(startIndices.every((i) => i < firstEnd)).toBe(true); + }); + + it('should run task tools concurrently while other tools run sequentially', async () => { + const executionLog: string[] = []; + + const taskTool = new MockTool({ + name: 'task', + execute: async (params) => { + const id = (params as { id: string }).id; + executionLog.push(`task:start:${id}`); + await new Promise((r) => setTimeout(r, 50)); + executionLog.push(`task:end:${id}`); + return { + llmContent: `Task ${id} done`, + returnDisplay: `Task ${id} done`, + }; + }, + }); + + const readTool = new MockTool({ + name: 'read_file', + execute: async (params) => { + const id = (params as { id: string }).id; + executionLog.push(`read:start:${id}`); + await new Promise((r) => setTimeout(r, 20)); + executionLog.push(`read:end:${id}`); + return { + llmContent: `Read ${id} done`, + returnDisplay: `Read ${id} done`, + }; + }, + }); + + const tools = new Map([ + ['task', taskTool], + ['read_file', readTool], + ]); + const onAllToolCallsComplete = vi.fn(); + const onToolCallsUpdate = vi.fn(); + const scheduler = createScheduler( + tools, + onAllToolCallsComplete, + onToolCallsUpdate, + ); + + const abortController = new AbortController(); + const requests = [ + { + callId: '1', + name: 'read_file', + args: { id: '1' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + { + callId: '2', + name: 'task', + args: { id: 'A' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + { + callId: '3', + name: 'read_file', + args: { id: '2' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + { + callId: '4', + name: 'task', + args: { id: 'B' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + ]; + + await scheduler.schedule(requests, abortController.signal); + + expect(onAllToolCallsComplete).toHaveBeenCalled(); + const completedCalls = onAllToolCallsComplete.mock + .calls[0][0] as ToolCall[]; + expect(completedCalls).toHaveLength(4); + expect(completedCalls.every((c) => c.status === 'success')).toBe(true); + + // Non-task tools should execute sequentially: read:1 finishes before read:2 starts + const read1End = executionLog.indexOf('read:end:1'); + const read2Start = executionLog.indexOf('read:start:2'); + expect(read1End).toBeLessThan(read2Start); + + // Task tools should execute concurrently: both start before either ends + const taskAStart = executionLog.indexOf('task:start:A'); + const taskBStart = executionLog.indexOf('task:start:B'); + const firstTaskEnd = Math.min( + executionLog.indexOf('task:end:A'), + executionLog.indexOf('task:end:B'), + ); + expect(taskAStart).toBeLessThan(firstTaskEnd); + expect(taskBStart).toBeLessThan(firstTaskEnd); + }); +}); diff --git a/packages/core/src/core/coreToolScheduler.ts b/packages/core/src/core/coreToolScheduler.ts index 7a8ab2895..20e60bd4d 100644 --- a/packages/core/src/core/coreToolScheduler.ts +++ b/packages/core/src/core/coreToolScheduler.ts @@ -1081,9 +1081,28 @@ export class CoreToolScheduler { (call) => call.status === 'scheduled', ); - for (const toolCall of callsToExecute) { - await this.executeSingleToolCall(toolCall, signal); - } + // Task tools are safe to run concurrently — they spawn independent + // sub-agents with no shared mutable state. All other tools run + // sequentially in their original order to preserve any implicit + // ordering the model may rely on. + const taskCalls = callsToExecute.filter( + (call) => call.request.name === ToolNames.TASK, + ); + const otherCalls = callsToExecute.filter( + (call) => call.request.name !== ToolNames.TASK, + ); + + const taskPromise = Promise.all( + taskCalls.map((tc) => this.executeSingleToolCall(tc, signal)), + ); + + const othersPromise = (async () => { + for (const toolCall of otherCalls) { + await this.executeSingleToolCall(toolCall, signal); + } + })(); + + await Promise.all([taskPromise, othersPromise]); } }