From 3e544f384ffc273490eafeffcd9dafc30c8612db Mon Sep 17 00:00:00 2001 From: Shaojin Wen Date: Thu, 14 May 2026 20:31:18 +0800 Subject: [PATCH] fix(serve): address tanzhenxin REQUEST_CHANGES (cold-spawn + streaming-test bind) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two findings from the CHANGES_REQUESTED review on PR #4113. - T1 (integration-tests/cli/qwen-serve-streaming.test.ts) — high severity: the daemon spawn in `beforeAll` did not pass `--workspace REPO_ROOT`, so under §02 the daemon bound to whatever cwd the test runner was invoked from. Every later `createOrAttachSession({ workspaceCwd: REPO_ROOT })` then 400'd with `workspace_mismatch`, and the entire file — child-crash recovery, multi-client first-responder permission, Last-Event-ID resume — silently no-op'd once `SKIP_LLM_TESTS` was unset. The sibling `qwen-serve-routes.test.ts` got the same fix earlier in this PR; this file was missed in that pass. Added the flag with a comment pointing at the rationale so the omission can't recur. - T2 (packages/cli/src/serve/httpAcpBridge.ts) — medium severity: cold-spawn window orphans the agent child on double-Ctrl+C. The `qwen --acp` child exists from the moment `channelFactory` spawns it, but pre-fix the bridge only added the channel to `aliveChannels` AFTER `connection.initialize()` returned. During the up-to-`initTimeoutMs` (default 10s) handshake window `aliveChannels` was empty, and a double-Ctrl+C in that window played out as: first SIGINT entered `shutdown()` and awaited the in-flight spawn; second SIGINT called `killAllSync()` against an empty set; `process.exit(1)` orphaned the child. Same class of bug the BkUyD invariant set out to close — the post-init overwrite race was covered, the pre-init handshake window wasn't. Fix: move `info` creation + `aliveChannels.add(info)` + the `channel.exited` handler registration BEFORE the `initialize` await. 
Init-failure / late-shutdown / child-crash-during-handshake all converge on the same cleanup path: mark `isDying = true`, `await channel.kill()`, let the exited handler `aliveChannels.delete(info)` once the OS reaps the process. `channelInfo` (the attach target) is still assigned LAST so `ensureChannel`'s fast-path never returns a still-handshaking channel. Regression test: `killAllSync force-kills the channel during the initialize handshake` uses a bespoke factory whose agent's `initialize` never resolves and asserts `killAllSync` fires killSync against the channel during the handshake window. Pre-fix the test would observe an empty `killSyncCalls` array. bridge: 74/74 (was 73, +1 cold-spawn test); server: 80/80; tsc clean for PR-touched files. --- .../cli/qwen-serve-streaming.test.ts | 11 ++ packages/cli/src/serve/httpAcpBridge.test.ts | 125 ++++++++++++++++++ packages/cli/src/serve/httpAcpBridge.ts | 92 +++++++++---- 3 files changed, 203 insertions(+), 25 deletions(-) diff --git a/integration-tests/cli/qwen-serve-streaming.test.ts b/integration-tests/cli/qwen-serve-streaming.test.ts index 08369bc23..afc25f6d9 100644 --- a/integration-tests/cli/qwen-serve-streaming.test.ts +++ b/integration-tests/cli/qwen-serve-streaming.test.ts @@ -71,6 +71,17 @@ beforeAll(async () => { TOKEN, '--hostname', '127.0.0.1', + // Per #3803 §02 (1 daemon = 1 workspace), pin the bound + // workspace so every `createOrAttachSession({ workspaceCwd: + // REPO_ROOT })` below matches. Without this the daemon inherits + // the test runner's cwd (CI / IDE-launcher / direct vitest + // invocations all differ) and every session create returns + // 400 workspace_mismatch — the SSE / permission / Last-Event-ID + // tests below would all silently 404 once `SKIP_LLM_TESTS` is + // unset. Same fix the sibling routes test received earlier in + // this PR — missed in this file in the original §02 pass. 
+ '--workspace', + REPO_ROOT, ], { stdio: ['ignore', 'pipe', 'pipe'] }, ); diff --git a/packages/cli/src/serve/httpAcpBridge.test.ts b/packages/cli/src/serve/httpAcpBridge.test.ts index ac6161d35..48b72a0d9 100644 --- a/packages/cli/src/serve/httpAcpBridge.test.ts +++ b/packages/cli/src/serve/httpAcpBridge.test.ts @@ -487,6 +487,131 @@ describe('createHttpAcpBridge', () => { void shutdownPromise; }); + it('killAllSync force-kills the channel during the initialize handshake (tanzhenxin cold-spawn-window)', async () => { + // tanzhenxin cold-spawn-window finding: the agent child exists + // from the moment `channelFactory(boundWorkspace)` returns, but + // pre-fix `aliveChannels.add(info)` ran only AFTER the + // `initialize` handshake completed (up to `initTimeoutMs`, + // default 10s). A double-Ctrl+C in that handshake window played + // out as: first SIGINT entered `shutdown()` and awaited the + // in-flight spawn; second SIGINT called `killAllSync()` against + // an empty `aliveChannels` (the channel hadn't been added yet) + // and `process.exit(1)` orphaned the child. The fix moves the + // add + the `channel.exited` handler registration BEFORE the + // `initialize` await; this test pins that the channel is + // reachable via `killAllSync` during the handshake. + const killSyncCalls: string[] = []; + const factory: ChannelFactory = async () => { + // Bespoke agent whose `initialize` never resolves — that's the + // handshake-hanging window the finding is about. A real agent + // can spend up to `initTimeoutMs` ms here before the bridge's + // `withTimeout` aborts it. 
+ const ab = new TransformStream(); + const ba = new TransformStream(); + const clientStream = ndJsonStream(ab.writable, ba.readable); + const agentStream = ndJsonStream(ba.writable, ab.readable); + let resolveExited: + | (( + info?: + | { + exitCode: number | null; + signalCode: NodeJS.Signals | null; + } + | undefined, + ) => void) + | undefined; + const exited = new Promise< + | { exitCode: number | null; signalCode: NodeJS.Signals | null } + | undefined + >((r) => { + resolveExited = r; + }); + const stuckAgent: Agent = { + async initialize() { + // Hang forever — the bridge's `withTimeout` would normally + // bound this, but the test asserts behavior DURING the + // handshake, so we let it sit until killAllSync resolves + // `exited` and tears the channel down externally. + return new Promise(() => {}); + }, + async newSession() { + throw new Error('newSession should not be reached'); + }, + async loadSession() { + throw new Error('loadSession should not be reached'); + }, + async authenticate() { + throw new Error('authenticate should not be reached'); + }, + async prompt() { + throw new Error('prompt should not be reached'); + }, + async cancel() { + /* no-op */ + }, + async setSessionMode() { + throw new Error('setSessionMode should not be reached'); + }, + async setSessionConfigOption() { + throw new Error('setSessionConfigOption should not be reached'); + }, + }; + new AgentSideConnection(() => stuckAgent, agentStream); + return { + stream: clientStream, + exited, + kill: async () => { + resolveExited!(undefined); + }, + killSync: () => { + killSyncCalls.push('called'); + resolveExited!(undefined); + }, + }; + }; + const bridge = makeBridge({ + channelFactory: factory, + // Bump initializeTimeoutMs so it doesn't race with the + // killAllSync we fire below. We're NOT testing the timeout + // path — we're testing the cold-spawn window before it. 
+ initializeTimeoutMs: 30_000, + }); + + // Kick off a spawn — `initialize` hangs forever in this fake, + // so the spawn promise never resolves naturally. Don't await + // (would block the test); `.catch` keeps the rejection from + // being unhandled when killAllSync eventually tears things down. + const spawnPromise = bridge + .spawnOrAttach({ workspaceCwd: WS_A }) + .catch(() => undefined); + + // Yield enough microtasks for `channelFactory` to return AND the + // bridge's `info` creation + `aliveChannels.add(info)` + the + // `channel.exited` handler registration to all run BEFORE the + // bridge enters `await initialize`. Pre-fix the alive-set add + // sat AFTER initialize, so any number of yields here would still + // find an empty set when killAllSync fires below. + for (let i = 0; i < 5; i++) { + await new Promise((r) => setImmediate(r)); + } + + // Operator double-Ctrl+C arrives during the handshake window. + bridge.killAllSync(); + + // Post-fix expectation: channel was added to `aliveChannels` + // BEFORE the `initialize` await, so killAllSync iterates a set + // containing it and fires killSync. Pre-fix this array would + // have been empty — and `process.exit(1)` after this would have + // orphaned the agent child. + expect(killSyncCalls).toEqual(['called']); + + // Cleanup: spawnPromise resolves on its own once killSync's + // `resolveExited` propagates through the bridge's + // `channel.exited` handler and the IIFE's catch reaps the half- + // initialized channel. 
+ void spawnPromise; + }); + it('killSession marks the channel dying so concurrent spawnOrAttach gets a fresh channel', async () => { // After the last session is killed, `channel.kill()` runs through // its SIGTERM grace window before SIGKILL — up to 10s in the real diff --git a/packages/cli/src/serve/httpAcpBridge.ts b/packages/cli/src/serve/httpAcpBridge.ts index 6b8f98470..265182edd 100644 --- a/packages/cli/src/serve/httpAcpBridge.ts +++ b/packages/cli/src/serve/httpAcpBridge.ts @@ -1151,30 +1151,21 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge { ); const connection = new ClientSideConnection(() => client, channel.stream); - try { - await withTimeout( - connection.initialize({ - protocolVersion: PROTOCOL_VERSION, - clientCapabilities: { - fs: { readTextFile: true, writeTextFile: true }, - }, - clientInfo: { name: 'qwen-serve-bridge', version: '0' }, - }), - initTimeoutMs, - 'initialize', - ); - } catch (err) { - await channel.kill().catch(() => {}); - throw err; - } - - // Late-shutdown re-check: if shutdown flipped during `initialize`, - // tear this channel down rather than leak past `process.exit(0)`. - if (shuttingDown) { - await channel.kill().catch(() => {}); - throw new Error('HttpAcpBridge is shutting down'); - } - + // tanzhenxin cold-spawn-window finding: the agent child exists + // from the moment `channelFactory(boundWorkspace)` returns, but + // pre-fix `aliveChannels.add(info)` ran only AFTER the + // `initialize` handshake completed (up to `initTimeoutMs`, default + // 10s). A double-Ctrl+C in that handshake window played out as: + // first SIGINT entered `shutdown()` and awaited the in-flight + // spawn; second SIGINT called `killAllSync()` against an empty + // `aliveChannels` (the channel hadn't been added yet) and + // `process.exit(1)` orphaned the child. 
Add to `aliveChannels` + // BEFORE the handshake await, and register the `channel.exited` + // handler immediately afterward so init-failure / child-crash / + // late-shutdown all clean up through the standard exit path + // instead of needing bespoke catches. `channelInfo` (the attach + // target) stays set only AFTER initialize succeeds — see further + // down — so callers don't attach to a still-handshaking channel. const info: ChannelInfo = { channel, connection, @@ -1182,7 +1173,6 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge { sessionIds: new Set(), isDying: false, }; - channelInfo = info; aliveChannels.add(info); // One-time channel.exited cleanup. The child dying takes ALL @@ -1191,6 +1181,14 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge { // iteration), publish `session_died` on each session's bus, // remove from byId / defaultEntry / pending tables. // + // Registered BEFORE the `initialize` await (tanzhenxin + // cold-spawn-window fix above) so init-failure / child-crash / + // late-shutdown all converge here. During handshake + // `sessionIds` is empty — the loop below no-ops, the stderr + // line still fires to tell operators "agent process gone + // during init", and `aliveChannels.delete(info)` clears the + // entry through the normal exit path. + // // tanzhenxin BkUyD: drop from `aliveChannels` ONLY when the OS // process is actually gone. Async kill paths (`killSession` // reap, `shutdown()` await, `doSpawn`'s newSession-failure @@ -1242,6 +1240,50 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge { } }); + // Initialize handshake. The channel is already in + // `aliveChannels` and the `channel.exited` handler above is + // registered, so failure paths (init throw, timeout, late + // shutdown) only need to mark dying + kill — the handler does + // the alive-set cleanup when the OS reaps the child. 
+ try { + await withTimeout( + connection.initialize({ + protocolVersion: PROTOCOL_VERSION, + clientCapabilities: { + fs: { readTextFile: true, writeTextFile: true }, + }, + clientInfo: { name: 'qwen-serve-bridge', version: '0' }, + }), + initTimeoutMs, + 'initialize', + ); + } catch (err) { + // Mark dying so concurrent `ensureChannel` callers spawn + // fresh instead of awaiting this in-flight promise's + // `channelInfo` (which we never assigned). The kill below + // triggers `channel.exited` → handler runs `aliveChannels + // .delete(info)` → entry leaves the alive set after OS reap. + info.isDying = true; + await channel.kill().catch(() => {}); + throw err; + } + + // Late-shutdown re-check: if shutdown flipped during the + // handshake, tear this channel down rather than leak past + // `process.exit(0)`. Same cleanup pattern as the init-failure + // path: mark dying + kill, let the exited handler reap. + if (shuttingDown) { + info.isDying = true; + await channel.kill().catch(() => {}); + throw new Error('HttpAcpBridge is shutting down'); + } + + // Handshake succeeded — now publish the channel as the + // attach-available slot. `channelInfo` is assigned LAST so + // `ensureChannel`'s fast-path (`if (channelInfo && !.isDying)`) + // never returns a still-handshaking channel to a concurrent + // caller. + channelInfo = info; return info; })();