Address PR #3463 review: bound concurrency + robust test timing

Two issues raised by the /review bot:

1. The raw Promise.all fan-out bypassed the bounded-concurrency guard
   that coreToolScheduler applies via QWEN_CODE_MAX_TOOL_CONCURRENCY.
   Replaced with an inline runBounded helper that mirrors core's
   runConcurrently (Promise.race on a bounded executing set, default
   cap 10), keeping in-order result collection.

2. The concurrency test used a 10-iteration microtask yield loop before
   asserting both execute() spies had been invoked. That's fragile —
   runTool's pre-execute path (build → getDefaultPermission →
   evaluatePermissionRules → permission branch → PreToolUseHook) has
   more await boundaries than 10 ticks guarantees, and the CI run
   reported call-a still at 0 invocations at the assertion point.

   Reworked the test to wait on an explicit `called` deferred that
   resolves *inside* the execute() mock body. Under sequential
   behaviour only one `called` would ever fire → `Promise.all([called-a,
   called-b])` deadlocks → vitest's per-test timeout surfaces the
   regression. Under the fix both fire before either result resolves.
This commit is contained in:
沐目 2026-04-20 23:06:41 +08:00
parent b704ae405c
commit 4519c5f9c1
2 changed files with 65 additions and 35 deletions

View file

@ -1069,45 +1069,53 @@ describe('Session', () => {
describe('tool call concurrency', () => {
it('runs multiple Agent tool calls concurrently (issue #2516)', async () => {
// Build a controllable Agent invocation whose execute() blocks until
// the test resolves it. If the ACP loop runs Agent calls sequentially
// the second `execute` would never be invoked while the first is
// still pending — the concurrent implementation starts both before
// either resolves.
type Deferred = {
promise: Promise<core.ToolResult>;
resolve: (v: core.ToolResult) => void;
// Each Agent call has two controllable async boundaries:
// - `called` — resolves *when* the test code reaches `execute()`
// - `result` — the promise `execute()` returns, resolved by the
// test after observing both `called` signals.
//
// Under the old sequential for-loop, call-b's `execute()` would
// only run after call-a's `execute()` promise resolved — so the
// `await Promise.all([called-a, called-b])` below deadlocks and
// the test hits vitest's default per-test timeout. Under the
// concurrent implementation both `called` signals fire before
// either `result` is resolved.
type Deferred<T> = {
promise: Promise<T>;
resolve: (v: T) => void;
};
const makeDeferred = (): Deferred => {
let resolve!: (v: core.ToolResult) => void;
const promise = new Promise<core.ToolResult>((r) => {
const makeDeferred = <T>(): Deferred<T> => {
let resolve!: (v: T) => void;
const promise = new Promise<T>((r) => {
resolve = r;
});
return { promise, resolve };
};
const defs: Record<string, Deferred> = {
'call-a': makeDeferred(),
'call-b': makeDeferred(),
const called: Record<string, Deferred<void>> = {
'call-a': makeDeferred<void>(),
'call-b': makeDeferred<void>(),
};
const result: Record<string, Deferred<core.ToolResult>> = {
'call-a': makeDeferred<core.ToolResult>(),
'call-b': makeDeferred<core.ToolResult>(),
};
const executeSpies: Record<string, ReturnType<typeof vi.fn>> = {};
const invocationsBuilt: string[] = [];
const agentTool = {
name: core.ToolNames.AGENT,
kind: core.Kind.Think,
build: vi.fn().mockImplementation((args: Record<string, unknown>) => {
const id = args['_test_id'] as string;
invocationsBuilt.push(id);
const execute = vi.fn().mockImplementation(() => defs[id].promise);
executeSpies[id] = execute;
return {
params: args,
eventEmitter: undefined,
getDefaultPermission: vi.fn().mockResolvedValue('allow'),
getDescription: vi.fn().mockReturnValue(`agent ${id}`),
toolLocations: vi.fn().mockReturnValue([]),
execute,
execute: vi.fn().mockImplementation(() => {
called[id].resolve();
return result[id].promise;
}),
};
}),
};
@ -1153,21 +1161,14 @@ describe('Session', () => {
prompt: [{ type: 'text', text: 'spawn two agents' }],
});
// Yield to the microtask queue repeatedly so the session has a chance
// to fan out both Agent calls before we resolve anything. 10 ticks is
// generous for the `build → permission-eval → execute` path.
for (let i = 0; i < 10; i++) await Promise.resolve();
// Proof of concurrency: both execute() were invoked before either
// resolved. Under the old sequential for-loop this assertion fails
// because call-b's execute would only run after call-a resolved.
expect(executeSpies['call-a']).toHaveBeenCalledTimes(1);
expect(executeSpies['call-b']).toHaveBeenCalledTimes(1);
// Wait until both `execute()` bodies have been entered. Sequential
// behaviour deadlocks here → vitest times out the test → failure.
await Promise.all([called['call-a'].promise, called['call-b'].promise]);
// Resolve out of order to also verify that final part ordering
// follows the original functionCalls order, not resolution order.
defs['call-b'].resolve({ llmContent: 'B-done', returnDisplay: 'B' });
defs['call-a'].resolve({ llmContent: 'A-done', returnDisplay: 'A' });
result['call-b'].resolve({ llmContent: 'B-done', returnDisplay: 'B' });
result['call-a'].resolve({ llmContent: 'A-done', returnDisplay: 'A' });
await promptPromise;

View file

@ -1105,12 +1105,41 @@ export class Session implements SessionContext {
}
}
// Bounded-concurrency runner: matches core's `runConcurrently`
// behaviour (`coreToolScheduler.ts:1506`), capped by
// `QWEN_CODE_MAX_TOOL_CONCURRENCY` (default 10). Results are returned
// in input order regardless of resolution order.
const runBounded = async (calls: FunctionCall[]): Promise<Part[][]> => {
const parsed = parseInt(
process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY'] || '',
10,
);
const maxConcurrency =
Number.isFinite(parsed) && parsed >= 1 ? parsed : 10;
const results: Part[][] = new Array(calls.length);
const executing = new Set<Promise<void>>();
for (let i = 0; i < calls.length; i++) {
const idx = i;
const p = this.runTool(abortSignal, promptId, calls[idx])
.then((r) => {
results[idx] = r;
})
.finally(() => {
executing.delete(p);
});
executing.add(p);
if (executing.size >= maxConcurrency) {
await Promise.race(executing);
}
}
await Promise.all(executing);
return results;
};
const parts: Part[] = [];
for (const batch of batches) {
if (batch.concurrent && batch.calls.length > 1) {
const results = await Promise.all(
batch.calls.map((fc) => this.runTool(abortSignal, promptId, fc)),
);
const results = await runBounded(batch.calls);
for (const r of results) parts.push(...r);
} else {
for (const fc of batch.calls) {