mirror of
https://github.com/QwenLM/qwen-code.git
synced 2026-05-19 07:54:38 +00:00
Address PR #3463 review: bound concurrency + robust test timing
The /review bot raised two issues:

1. The raw `Promise.all` fan-out bypassed the bounded-concurrency guard that coreToolScheduler applies via `QWEN_CODE_MAX_TOOL_CONCURRENCY`. It is replaced with an inline `runBounded` helper that mirrors core's `runConcurrently` (`Promise.race` on a bounded executing set, default cap 10) while keeping in-order result collection.

2. The concurrency test used a 10-iteration microtask yield loop before asserting that both `execute()` spies had been invoked. That is fragile: runTool's pre-execute path (build → getDefaultPermission → evaluatePermissionRules → permission branch → PreToolUseHook) has more await boundaries than 10 ticks can be relied on to cover, and the CI run reported call-a still at 0 invocations at the assertion point. The test now waits on an explicit `called` deferred that resolves *inside* the `execute()` mock body. Under sequential behaviour only one `called` would ever fire, so `Promise.all([called-a, called-b])` deadlocks and vitest's per-test timeout surfaces the regression. Under the fix, both `called` signals fire before either result resolves.
This commit is contained in:
parent
b704ae405c
commit
4519c5f9c1
2 changed files with 65 additions and 35 deletions
|
|
@ -1069,45 +1069,53 @@ describe('Session', () => {
|
|||
|
||||
describe('tool call concurrency', () => {
|
||||
it('runs multiple Agent tool calls concurrently (issue #2516)', async () => {
|
||||
// Build a controllable Agent invocation whose execute() blocks until
|
||||
// the test resolves it. If the ACP loop runs Agent calls sequentially
|
||||
// the second `execute` would never be invoked while the first is
|
||||
// still pending — the concurrent implementation starts both before
|
||||
// either resolves.
|
||||
type Deferred = {
|
||||
promise: Promise<core.ToolResult>;
|
||||
resolve: (v: core.ToolResult) => void;
|
||||
// Each Agent call has two controllable async boundaries:
|
||||
// - `called` — resolves *when* the test code reaches `execute()`
|
||||
// - `result` — the promise `execute()` returns, resolved by the
|
||||
// test after observing both `called` signals.
|
||||
//
|
||||
// Under the old sequential for-loop, call-b's `execute()` would
|
||||
// only run after call-a's `execute()` promise resolved — so the
|
||||
// `await Promise.all([called-a, called-b])` below deadlocks and
|
||||
// the test hits vitest's default per-test timeout. Under the
|
||||
// concurrent implementation both `called` signals fire before
|
||||
// either `result` is resolved.
|
||||
type Deferred<T> = {
|
||||
promise: Promise<T>;
|
||||
resolve: (v: T) => void;
|
||||
};
|
||||
const makeDeferred = (): Deferred => {
|
||||
let resolve!: (v: core.ToolResult) => void;
|
||||
const promise = new Promise<core.ToolResult>((r) => {
|
||||
const makeDeferred = <T>(): Deferred<T> => {
|
||||
let resolve!: (v: T) => void;
|
||||
const promise = new Promise<T>((r) => {
|
||||
resolve = r;
|
||||
});
|
||||
return { promise, resolve };
|
||||
};
|
||||
|
||||
const defs: Record<string, Deferred> = {
|
||||
'call-a': makeDeferred(),
|
||||
'call-b': makeDeferred(),
|
||||
const called: Record<string, Deferred<void>> = {
|
||||
'call-a': makeDeferred<void>(),
|
||||
'call-b': makeDeferred<void>(),
|
||||
};
|
||||
const result: Record<string, Deferred<core.ToolResult>> = {
|
||||
'call-a': makeDeferred<core.ToolResult>(),
|
||||
'call-b': makeDeferred<core.ToolResult>(),
|
||||
};
|
||||
const executeSpies: Record<string, ReturnType<typeof vi.fn>> = {};
|
||||
const invocationsBuilt: string[] = [];
|
||||
|
||||
const agentTool = {
|
||||
name: core.ToolNames.AGENT,
|
||||
kind: core.Kind.Think,
|
||||
build: vi.fn().mockImplementation((args: Record<string, unknown>) => {
|
||||
const id = args['_test_id'] as string;
|
||||
invocationsBuilt.push(id);
|
||||
const execute = vi.fn().mockImplementation(() => defs[id].promise);
|
||||
executeSpies[id] = execute;
|
||||
return {
|
||||
params: args,
|
||||
eventEmitter: undefined,
|
||||
getDefaultPermission: vi.fn().mockResolvedValue('allow'),
|
||||
getDescription: vi.fn().mockReturnValue(`agent ${id}`),
|
||||
toolLocations: vi.fn().mockReturnValue([]),
|
||||
execute,
|
||||
execute: vi.fn().mockImplementation(() => {
|
||||
called[id].resolve();
|
||||
return result[id].promise;
|
||||
}),
|
||||
};
|
||||
}),
|
||||
};
|
||||
|
|
@ -1153,21 +1161,14 @@ describe('Session', () => {
|
|||
prompt: [{ type: 'text', text: 'spawn two agents' }],
|
||||
});
|
||||
|
||||
// Yield to the microtask queue repeatedly so the session has a chance
|
||||
// to fan out both Agent calls before we resolve anything. 10 ticks is
|
||||
// generous for the `build → permission-eval → execute` path.
|
||||
for (let i = 0; i < 10; i++) await Promise.resolve();
|
||||
|
||||
// Proof of concurrency: both execute() were invoked before either
|
||||
// resolved. Under the old sequential for-loop this assertion fails
|
||||
// because call-b's execute would only run after call-a resolved.
|
||||
expect(executeSpies['call-a']).toHaveBeenCalledTimes(1);
|
||||
expect(executeSpies['call-b']).toHaveBeenCalledTimes(1);
|
||||
// Wait until both `execute()` bodies have been entered. Sequential
|
||||
// behaviour deadlocks here → vitest times out the test → failure.
|
||||
await Promise.all([called['call-a'].promise, called['call-b'].promise]);
|
||||
|
||||
// Resolve out of order to also verify that final part ordering
|
||||
// follows the original functionCalls order, not resolution order.
|
||||
defs['call-b'].resolve({ llmContent: 'B-done', returnDisplay: 'B' });
|
||||
defs['call-a'].resolve({ llmContent: 'A-done', returnDisplay: 'A' });
|
||||
result['call-b'].resolve({ llmContent: 'B-done', returnDisplay: 'B' });
|
||||
result['call-a'].resolve({ llmContent: 'A-done', returnDisplay: 'A' });
|
||||
|
||||
await promptPromise;
|
||||
|
||||
|
|
|
|||
|
|
@ -1105,12 +1105,41 @@ export class Session implements SessionContext {
|
|||
}
|
||||
}
|
||||
|
||||
// Bounded-concurrency runner: matches core's `runConcurrently`
|
||||
// behaviour (`coreToolScheduler.ts:1506`), capped by
|
||||
// `QWEN_CODE_MAX_TOOL_CONCURRENCY` (default 10). Results are returned
|
||||
// in input order regardless of resolution order.
|
||||
const runBounded = async (calls: FunctionCall[]): Promise<Part[][]> => {
|
||||
const parsed = parseInt(
|
||||
process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY'] || '',
|
||||
10,
|
||||
);
|
||||
const maxConcurrency =
|
||||
Number.isFinite(parsed) && parsed >= 1 ? parsed : 10;
|
||||
const results: Part[][] = new Array(calls.length);
|
||||
const executing = new Set<Promise<void>>();
|
||||
for (let i = 0; i < calls.length; i++) {
|
||||
const idx = i;
|
||||
const p = this.runTool(abortSignal, promptId, calls[idx])
|
||||
.then((r) => {
|
||||
results[idx] = r;
|
||||
})
|
||||
.finally(() => {
|
||||
executing.delete(p);
|
||||
});
|
||||
executing.add(p);
|
||||
if (executing.size >= maxConcurrency) {
|
||||
await Promise.race(executing);
|
||||
}
|
||||
}
|
||||
await Promise.all(executing);
|
||||
return results;
|
||||
};
|
||||
|
||||
const parts: Part[] = [];
|
||||
for (const batch of batches) {
|
||||
if (batch.concurrent && batch.calls.length > 1) {
|
||||
const results = await Promise.all(
|
||||
batch.calls.map((fc) => this.runTool(abortSignal, promptId, fc)),
|
||||
);
|
||||
const results = await runBounded(batch.calls);
|
||||
for (const r of results) parts.push(...r);
|
||||
} else {
|
||||
for (const fc of batch.calls) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue