fix(cli): run ACP Agent tool calls concurrently (#2516) (#3463)

* fix(cli): run ACP Agent tool calls concurrently (#2516)

When the model returns multiple Agent tool calls in a single turn, the
ACP Session previously executed them sequentially in a plain for-loop,
multiplying latency by the number of sub-agents spawned.

Mirror the partition logic in coreToolScheduler.partitionToolCalls:
consecutive Agent calls form a parallel batch (safe because sub-agents
have no shared mutable state); any other tool forms its own sequential
batch so the model's implicit ordering is preserved. Response-part
ordering still matches the original functionCalls order.
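
The batching rule described above can be sketched in isolation. This is an illustrative sketch: the `Call` type and the `'agent'` name stand in for the real `FunctionCall` type and `ToolNames.AGENT` constant.

```typescript
type Call = { id: string; name: string };
type Batch = { concurrent: boolean; calls: Call[] };

// Consecutive Agent calls merge into the current concurrent batch;
// any other tool always opens its own single-call sequential batch,
// so the model's implicit ordering across batches is preserved.
function partitionToolCalls(calls: Call[], agentName = 'agent'): Batch[] {
  const batches: Batch[] = [];
  for (const call of calls) {
    const isAgent = call.name === agentName;
    const last = batches[batches.length - 1];
    if (isAgent && last?.concurrent) {
      last.calls.push(call);
    } else {
      batches.push({ concurrent: isAgent, calls: [call] });
    }
  }
  return batches;
}
```

For example, `[agent, agent, read_file, agent]` yields three batches: a two-call concurrent batch, a one-call sequential batch, and a one-call concurrent batch.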

Add a focused test that uses controllable deferred executes to prove
both Agent calls start before either resolves, and that the fed-back
functionResponse ordering is stable regardless of resolution order.
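
The deferred-execute idea reduces to a few lines. This is a sketch of the pattern only, not the actual vitest setup; `demo()` and its names are illustrative.

```typescript
type Deferred<T> = { promise: Promise<T>; resolve: (v: T) => void };

function deferred<T>(): Deferred<T> {
  let resolve!: (v: T) => void;
  const promise = new Promise<T>((r) => {
    resolve = r;
  });
  return { promise, resolve };
}

// Each fake execute() signals `called` the moment it is entered and
// then blocks on `result` until the caller releases it.
async function demo(): Promise<string[]> {
  const called = [deferred<void>(), deferred<void>()];
  const result = [deferred<string>(), deferred<string>()];
  const execute = (i: number): Promise<string> => {
    called[i].resolve();
    return result[i].promise;
  };

  // Concurrent start: both execute() bodies run before either result.
  const outputs = Promise.all([execute(0), execute(1)]);

  // A sequential for-loop would deadlock here: called[1] could only
  // fire after result[0] resolved, which only happens below.
  await Promise.all([called[0].promise, called[1].promise]);

  // Resolve out of order; Promise.all still returns input order.
  result[1].resolve('B');
  result[0].resolve('A');
  return outputs;
}
```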

* Address PR #3463 review: bound concurrency + robust test timing

Two issues raised by the /review bot:

1. The raw Promise.all fan-out bypassed the bounded-concurrency guard
   that coreToolScheduler applies via QWEN_CODE_MAX_TOOL_CONCURRENCY.
   Replaced with an inline runBounded helper that mirrors core's
   runConcurrently (Promise.race on a bounded executing set, default
   cap 10), keeping in-order result collection.

2. The concurrency test used a 10-iteration microtask yield loop before
   asserting both execute() spies had been invoked. That's fragile —
   runTool's pre-execute path (build → getDefaultPermission →
   evaluatePermissionRules → permission branch → PreToolUseHook) crosses
   more await boundaries than 10 ticks can cover, and the CI run
   reported call-a still at 0 invocations at the assertion point.

   Reworked the test to wait on an explicit `called` deferred that
   resolves *inside* the execute() mock body. Under sequential
   behaviour only one `called` would ever fire → `Promise.all([called-a,
   called-b])` deadlocks → vitest's per-test timeout surfaces the
   regression. Under the fix both fire before either result resolves.
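
The bounded fan-out shape (Promise.race over a bounded executing set, with results kept in input order) looks like this in isolation. Helper and parameter names here are illustrative; the real code reads the cap from `QWEN_CODE_MAX_TOOL_CONCURRENCY` and calls `runTool`.

```typescript
// Run tasks with at most `limit` promises in flight at once; results
// are stored at each task's input index regardless of finish order.
async function runBounded<T>(
  tasks: Array<() => Promise<T>>,
  limit = 10,
): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  const executing = new Set<Promise<void>>();
  for (let i = 0; i < tasks.length; i++) {
    const idx = i;
    const p = tasks[idx]()
      .then((value) => {
        results[idx] = value;
      })
      .finally(() => {
        executing.delete(p);
      });
    executing.add(p);
    if (executing.size >= limit) {
      // At capacity: wait for any in-flight task before starting more.
      await Promise.race(executing);
    }
  }
  await Promise.all(executing);
  return results;
}
```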

* fix(acp): degrade gracefully when AgentTool invocation has no eventEmitter

The concurrency test for #2516 timed out on CI with "Test timed out in
5000ms" after the `await Promise.all([called-a, called-b])` rewrite in
the previous review-fix commit. The 5000ms wait was the symptom; the
root cause was that neither `execute()` was ever called.

runTool's AgentTool branch was guarded with `'eventEmitter' in invocation`,
which is a *key-presence* check. The test mock provides
`{ eventEmitter: undefined, ... }` — the key exists (value undefined),
the branch is entered, and `SubAgentTracker.setup` immediately throws
inside `eventEmitter.on(...)`. The try/catch in runTool swallows the
throw and returns an error response, so `invocation.execute()` never
runs, `called[id].resolve()` never fires, and the test deadlocks.
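
The distinction is plain JavaScript semantics and is easy to reproduce:

```typescript
// `in` tests key presence, not value: a key explicitly set to
// `undefined` still satisfies the check.
const invocation: { eventEmitter?: { on?: unknown } } = {
  eventEmitter: undefined,
};

const keyPresent = 'eventEmitter' in invocation; // true: the key exists
const truthy = Boolean(invocation.eventEmitter); // false: value is undefined

// The old guard entered the branch on `keyPresent` and then crashed on
// `invocation.eventEmitter.on(...)`; a truthy guard skips it cleanly.
```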

The earlier review commit (4519c5f9c) interpreted the CI symptom as
"10 microtask yields aren't enough" and rewrote the assertion around a
deferred `Promise.all`. But the old test's `toHaveBeenCalledTimes(1)`
failure with 0 invocations was already the same bug — execute was never
called. The new formulation just converted the visible failure from an
assertion mismatch into a timeout.

Switch the guard to a truthy check against `invocation.eventEmitter`.
Semantics for real AgentTool are unchanged — `agent.ts:392` declares
`readonly eventEmitter: AgentEventEmitter = new AgentEventEmitter()`,
so production always enters the branch. The only new behavior is that
incomplete invocations (or test mocks) skip SubAgentTracker setup
cleanly instead of crashing. `subAgentCleanupFunctions` stays `[]`,
so the cleanup forEach at the success/error paths is a no-op.
zhangxy-zju 2026-04-24 15:22:45 +08:00 committed by GitHub
parent 97926a07fe
commit d75c13aae0
2 changed files with 220 additions and 35 deletions


@@ -1082,6 +1082,125 @@ describe('Session', () => {
    });
  });
  describe('tool call concurrency', () => {
    it('runs multiple Agent tool calls concurrently (issue #2516)', async () => {
      // Each Agent call has two controllable async boundaries:
      // - `called` — resolves *when* the test code reaches `execute()`
      // - `result` — the promise `execute()` returns, resolved by the
      //   test after observing both `called` signals.
      //
      // Under the old sequential for-loop, call-b's `execute()` would
      // only run after call-a's `execute()` promise resolved — so the
      // `await Promise.all([called-a, called-b])` below deadlocks and
      // the test hits vitest's default per-test timeout. Under the
      // concurrent implementation both `called` signals fire before
      // either `result` is resolved.
      type Deferred<T> = {
        promise: Promise<T>;
        resolve: (v: T) => void;
      };
      const makeDeferred = <T>(): Deferred<T> => {
        let resolve!: (v: T) => void;
        const promise = new Promise<T>((r) => {
          resolve = r;
        });
        return { promise, resolve };
      };
      const called: Record<string, Deferred<void>> = {
        'call-a': makeDeferred<void>(),
        'call-b': makeDeferred<void>(),
      };
      const result: Record<string, Deferred<core.ToolResult>> = {
        'call-a': makeDeferred<core.ToolResult>(),
        'call-b': makeDeferred<core.ToolResult>(),
      };
      const agentTool = {
        name: core.ToolNames.AGENT,
        kind: core.Kind.Think,
        build: vi.fn().mockImplementation((args: Record<string, unknown>) => {
          const id = args['_test_id'] as string;
          return {
            params: args,
            eventEmitter: undefined,
            getDefaultPermission: vi.fn().mockResolvedValue('allow'),
            getDescription: vi.fn().mockReturnValue(`agent ${id}`),
            toolLocations: vi.fn().mockReturnValue([]),
            execute: vi.fn().mockImplementation(() => {
              called[id].resolve();
              return result[id].promise;
            }),
          };
        }),
      };
      mockToolRegistry.getTool.mockImplementation((name: string) =>
        name === core.ToolNames.AGENT ? agentTool : undefined,
      );
      mockConfig.getApprovalMode = vi
        .fn()
        .mockReturnValue(ApprovalMode.DEFAULT);
      mockConfig.getPermissionManager = vi.fn().mockReturnValue(null);
      // Model returns two Agent calls, then an empty stream once results
      // are fed back (to terminate the prompt loop).
      const sendMessageStream = vi
        .fn()
        .mockResolvedValueOnce(
          createStreamWithChunks([
            {
              type: core.StreamEventType.CHUNK,
              value: {
                functionCalls: [
                  {
                    id: 'call-a',
                    name: core.ToolNames.AGENT,
                    args: { _test_id: 'call-a', subagent_type: 'explore' },
                  },
                  {
                    id: 'call-b',
                    name: core.ToolNames.AGENT,
                    args: { _test_id: 'call-b', subagent_type: 'explore' },
                  },
                ],
              },
            },
          ]),
        )
        .mockResolvedValueOnce(createEmptyStream());
      mockChat.sendMessageStream = sendMessageStream;
      const promptPromise = session.prompt({
        sessionId: 'test-session-id',
        prompt: [{ type: 'text', text: 'spawn two agents' }],
      });
      // Wait until both `execute()` bodies have been entered. Sequential
      // behaviour deadlocks here → vitest times out the test → failure.
      await Promise.all([called['call-a'].promise, called['call-b'].promise]);
      // Resolve out of order to also verify that final part ordering
      // follows the original functionCalls order, not resolution order.
      result['call-b'].resolve({ llmContent: 'B-done', returnDisplay: 'B' });
      result['call-a'].resolve({ llmContent: 'A-done', returnDisplay: 'A' });
      await promptPromise;
      // The second sendMessageStream invocation carries the tool responses
      // that will be fed back to the model — assert their order matches
      // the original function-call order (A before B).
      expect(sendMessageStream).toHaveBeenCalledTimes(2);
      const followUp = sendMessageStream.mock.calls[1][1] as {
        message: Array<{ functionResponse?: { id?: string } }>;
      };
      const ids = followUp.message
        .filter((p) => p.functionResponse)
        .map((p) => p.functionResponse?.id);
      expect(ids).toEqual(['call-a', 'call-b']);
    });
  });
  describe('system reminders', () => {
    // Captures the `message` parts fed into chat.sendMessageStream on the
    // first turn so individual tests can assert what the model saw.


@@ -522,17 +522,11 @@ export class Session implements SessionContext {
       }
       if (functionCalls.length > 0) {
-        const toolResponseParts: Part[] = [];
-        for (const fc of functionCalls) {
-          const response = await this.runTool(
-            pendingSend.signal,
-            promptId,
-            fc,
-          );
-          toolResponseParts.push(...response);
-        }
+        const toolResponseParts = await this.runToolCalls(
+          pendingSend.signal,
+          promptId,
+          functionCalls,
+        );
         nextMessage = { role: 'user', parts: toolResponseParts };
       }
     }
@@ -763,17 +757,11 @@ export class Session implements SessionContext {
       // Process tool calls from the follow-up message
       if (functionCalls.length > 0) {
-        const toolResponseParts: Part[] = [];
-        for (const fc of functionCalls) {
-          const toolResponse = await this.runTool(
-            pendingSend.signal,
-            promptId,
-            fc,
-          );
-          toolResponseParts.push(...toolResponse);
-        }
+        const toolResponseParts = await this.runToolCalls(
+          pendingSend.signal,
+          promptId,
+          functionCalls,
+        );
         nextMessage = { role: 'user', parts: toolResponseParts };
       }
     }
@@ -956,11 +944,11 @@ export class Session implements SessionContext {
       }
       if (functionCalls.length > 0) {
-        const toolResponseParts: Part[] = [];
-        for (const fc of functionCalls) {
-          const response = await this.runTool(ac.signal, promptId, fc);
-          toolResponseParts.push(...response);
-        }
+        const toolResponseParts = await this.runToolCalls(
+          ac.signal,
+          promptId,
+          functionCalls,
+        );
         nextMessage = { role: 'user', parts: toolResponseParts };
       }
     }
@@ -1103,6 +1091,79 @@ export class Session implements SessionContext {
    await this.sendUpdate(update);
  }
  /**
   * Execute a batch of model-returned tool calls, running Agent calls
   * concurrently while keeping other tools sequential.
   *
   * Mirrors the partition logic in `coreToolScheduler.partitionToolCalls`:
   * consecutive Agent calls form a parallel batch (they spawn independent
   * sub-agents with no shared mutable state); any other tool forms its own
   * sequential batch to preserve the implicit ordering the model may rely
   * on. Response-part ordering matches the original `functionCalls` order.
   */
  private async runToolCalls(
    abortSignal: AbortSignal,
    promptId: string,
    functionCalls: FunctionCall[],
  ): Promise<Part[]> {
    type Batch = { concurrent: boolean; calls: FunctionCall[] };
    const batches: Batch[] = [];
    for (const fc of functionCalls) {
      const isAgent = fc.name === ToolNames.AGENT;
      const last = batches[batches.length - 1];
      if (isAgent && last?.concurrent) {
        last.calls.push(fc);
      } else {
        batches.push({ concurrent: isAgent, calls: [fc] });
      }
    }
    // Bounded-concurrency runner: matches core's `runConcurrently`
    // behaviour (`coreToolScheduler.ts:1506`), capped by
    // `QWEN_CODE_MAX_TOOL_CONCURRENCY` (default 10). Results are returned
    // in input order regardless of resolution order.
    const runBounded = async (calls: FunctionCall[]): Promise<Part[][]> => {
      const parsed = parseInt(
        process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY'] || '',
        10,
      );
      const maxConcurrency =
        Number.isFinite(parsed) && parsed >= 1 ? parsed : 10;
      const results: Part[][] = new Array(calls.length);
      const executing = new Set<Promise<void>>();
      for (let i = 0; i < calls.length; i++) {
        const idx = i;
        const p = this.runTool(abortSignal, promptId, calls[idx])
          .then((r) => {
            results[idx] = r;
          })
          .finally(() => {
            executing.delete(p);
          });
        executing.add(p);
        if (executing.size >= maxConcurrency) {
          await Promise.race(executing);
        }
      }
      await Promise.all(executing);
      return results;
    };
    const parts: Part[] = [];
    for (const batch of batches) {
      if (batch.concurrent && batch.calls.length > 1) {
        const results = await runBounded(batch.calls);
        for (const r of results) parts.push(...r);
      } else {
        for (const fc of batch.calls) {
          const r = await this.runTool(abortSignal, promptId, fc);
          parts.push(...r);
        }
      }
    }
    return parts;
  }
  /**
   * Assemble the per-turn system reminders the model needs to see at the
   * start of a user query or cron fire. Mirrors the subagent/plan/arena
@@ -1247,14 +1308,19 @@ export class Session implements SessionContext {
     try {
       const invocation = tool.build(args);
-      if (isAgentTool && 'eventEmitter' in invocation) {
-        // Access eventEmitter from AgentTool invocation
-        const taskEventEmitter = (
-          invocation as {
-            eventEmitter: AgentEventEmitter;
-          }
-        ).eventEmitter;
+      // Production AgentTool always initializes `eventEmitter` on its
+      // invocation (`agent.ts:392`). Be defensive about the `undefined`
+      // case too so an incomplete/custom AgentTool invocation degrades
+      // gracefully (no sub-agent event forwarding) instead of throwing
+      // inside SubAgentTracker.setup — the `'eventEmitter' in invocation`
+      // key-presence check passed for `{ eventEmitter: undefined }` and
+      // the ensuing `eventEmitter.on(...)` blew up.
+      const taskEventEmitter = (
+        invocation as {
+          eventEmitter?: AgentEventEmitter;
+        }
+      ).eventEmitter;
+      if (isAgentTool && taskEventEmitter) {
         // Extract subagent metadata from AgentTool call
         const parentToolCallId = callId;
         const subagentType = (args['subagent_type'] as string) ?? '';