mirror of
https://github.com/QwenLM/qwen-code.git
synced 2026-05-20 09:24:03 +00:00
fix(serve): address tanzhenxin REQUEST_CHANGES (cold-spawn + streaming-test bind)
Two findings from the CHANGES_REQUESTED review on PR #4113.

- T1 (integration-tests/cli/qwen-serve-streaming.test.ts), high severity:
  the daemon spawn in `beforeAll` did not pass `--workspace REPO_ROOT`, so
  under §02 the daemon bound to whatever cwd the test runner was invoked
  from. Every later `createOrAttachSession({ workspaceCwd: REPO_ROOT })`
  then 400'd with `workspace_mismatch`, and the entire file (child-crash
  recovery, multi-client first-responder permission, Last-Event-ID resume)
  silently no-op'd once `SKIP_LLM_TESTS` was unset. The sibling
  `qwen-serve-routes.test.ts` got the same fix earlier in this PR; this
  file was missed in that pass. Added the flag with a comment pointing at
  the rationale so the omission can't recur.

- T2 (packages/cli/src/serve/httpAcpBridge.ts), medium severity:
  cold-spawn window orphans the agent child on double-Ctrl+C. The
  `qwen --acp` child exists from the moment `channelFactory` spawns it,
  but pre-fix the bridge only added the channel to `aliveChannels` AFTER
  `connection.initialize()` returned. During the up-to-`initTimeoutMs`
  (default 10s) handshake window `aliveChannels` was empty, and a
  double-Ctrl+C in that window played out as: first SIGINT entered
  `shutdown()` and awaited the in-flight spawn; second SIGINT called
  `killAllSync()` against an empty set; `process.exit(1)` orphaned the
  child. Same class of bug the BkUyD invariant set out to close: the
  post-init overwrite race was covered, the pre-init handshake window
  wasn't.

  Fix: move `info` creation + `aliveChannels.add(info)` + the
  `channel.exited` handler registration BEFORE the `initialize` await.
  Init-failure / late-shutdown / child-crash-during-handshake all converge
  on the same cleanup path: mark `isDying = true`, `await channel.kill()`,
  let the exited handler `aliveChannels.delete(info)` once the OS reaps
  the process. `channelInfo` (the attach target) is still assigned LAST so
  `ensureChannel`'s fast-path never returns a still-handshaking channel.

Regression test: `killAllSync force-kills the channel during the
initialize handshake` uses a bespoke factory whose agent's `initialize`
never resolves and asserts `killAllSync` fires killSync against the
channel during the handshake window. Pre-fix the test would observe an
empty `killSyncCalls` array.

bridge: 74/74 (was 73, +1 cold-spawn test); server: 80/80; tsc clean for
PR-touched files.
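The ordering described under T2 can be illustrated with a minimal sketch. This is not the code in httpAcpBridge.ts: `createMiniBridge`, `FakeChannel`, `aliveCount`, and `attachTarget` are hypothetical names introduced only for illustration, and the real bridge carries far more state. The sketch only shows why adding to the alive set before the `initialize` await makes a handshaking channel reachable from a synchronous kill.

```typescript
// Hypothetical, simplified model of the T2 fix. Names mirror the commit
// message where possible; none of this is the real bridge implementation.
interface FakeChannel {
  exited: Promise<void>;
  kill(): Promise<void>;
  killSync(): void;
}

interface Info {
  channel: FakeChannel;
  isDying: boolean;
}

function createMiniBridge(initialize: () => Promise<void>) {
  const aliveChannels = new Set<Info>();
  let channelInfo: Info | undefined;

  async function spawn(channel: FakeChannel): Promise<Info> {
    const info: Info = { channel, isDying: false };
    // 1. Track the child BEFORE awaiting the handshake, so a synchronous
    //    kill during the handshake window can still reach it.
    aliveChannels.add(info);
    // 2. Leave the alive set only once the process has actually exited.
    void channel.exited.then(() => aliveChannels.delete(info));
    try {
      await initialize();
    } catch (err) {
      // Failure path: mark dying + kill; the exited handler does the rest.
      info.isDying = true;
      await channel.kill().catch(() => {});
      throw err;
    }
    // 3. Publish as the attach target LAST, after the handshake succeeds.
    channelInfo = info;
    return info;
  }

  function killAllSync(): void {
    for (const info of aliveChannels) info.channel.killSync();
  }

  return {
    spawn,
    killAllSync,
    aliveCount: () => aliveChannels.size,
    attachTarget: () => channelInfo,
  };
}
```

Because an `async` function runs synchronously up to its first `await`, the channel is already in the set when `spawn` returns its pending promise. With the add placed after the await instead, `killAllSync` would iterate an empty set during the handshake.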
parent
19650b51d4
commit
3e544f384f
3 changed files with 203 additions and 25 deletions
@@ -71,6 +71,17 @@ beforeAll(async () => {
       TOKEN,
       '--hostname',
       '127.0.0.1',
+      // Per #3803 §02 (1 daemon = 1 workspace), pin the bound
+      // workspace so every `createOrAttachSession({ workspaceCwd:
+      // REPO_ROOT })` below matches. Without this the daemon inherits
+      // the test runner's cwd (CI / IDE-launcher / direct vitest
+      // invocations all differ) and every session create returns
+      // 400 workspace_mismatch — the SSE / permission / Last-Event-ID
+      // tests below would all silently 404 once `SKIP_LLM_TESTS` is
+      // unset. Same fix the sibling routes test received earlier in
+      // this PR — missed in this file in the original §02 pass.
+      '--workspace',
+      REPO_ROOT,
     ],
     { stdio: ['ignore', 'pipe', 'pipe'] },
   );
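The "1 daemon = 1 workspace" rule that this hunk works around can be pictured with a small sketch. The daemon's actual comparison logic is not part of this diff, so `checkWorkspace`, `AttachResult`, and the trailing-slash normalization below are assumptions made purely for illustration; only the `400` status and the `workspace_mismatch` code come from the commit message.

```typescript
// Hypothetical sketch of a bound-workspace check. The real daemon's
// comparison (path resolution, symlinks, case sensitivity) is not shown
// in this commit and may differ.
type AttachResult =
  | { status: 200 }
  | { status: 400; code: 'workspace_mismatch' };

// Assumed normalization: drop trailing slashes so "/repo/" equals "/repo".
function normalize(p: string): string {
  const trimmed = p.replace(/\/+$/, '');
  return trimmed === '' ? '/' : trimmed;
}

function checkWorkspace(boundWorkspace: string, workspaceCwd: string): AttachResult {
  return normalize(boundWorkspace) === normalize(workspaceCwd)
    ? { status: 200 }
    : { status: 400, code: 'workspace_mismatch' };
}
```

Under this model, a daemon started without `--workspace` binds to the runner's cwd, so a request naming `REPO_ROOT` only succeeds when the runner happened to start there.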
@@ -487,6 +487,131 @@ describe('createHttpAcpBridge', () => {
     void shutdownPromise;
   });
 
+  it('killAllSync force-kills the channel during the initialize handshake (tanzhenxin cold-spawn-window)', async () => {
+    // tanzhenxin cold-spawn-window finding: the agent child exists
+    // from the moment `channelFactory(boundWorkspace)` returns, but
+    // pre-fix `aliveChannels.add(info)` ran only AFTER the
+    // `initialize` handshake completed (up to `initTimeoutMs`,
+    // default 10s). A double-Ctrl+C in that handshake window played
+    // out as: first SIGINT entered `shutdown()` and awaited the
+    // in-flight spawn; second SIGINT called `killAllSync()` against
+    // an empty `aliveChannels` (the channel hadn't been added yet)
+    // and `process.exit(1)` orphaned the child. The fix moves the
+    // add + the `channel.exited` handler registration BEFORE the
+    // `initialize` await; this test pins that the channel is
+    // reachable via `killAllSync` during the handshake.
+    const killSyncCalls: string[] = [];
+    const factory: ChannelFactory = async () => {
+      // Bespoke agent whose `initialize` never resolves — that's the
+      // handshake-hanging window the finding is about. A real agent
+      // can spend up to `initTimeoutMs` ms here before the bridge's
+      // `withTimeout` aborts it.
+      const ab = new TransformStream<Uint8Array, Uint8Array>();
+      const ba = new TransformStream<Uint8Array, Uint8Array>();
+      const clientStream = ndJsonStream(ab.writable, ba.readable);
+      const agentStream = ndJsonStream(ba.writable, ab.readable);
+      let resolveExited:
+        | ((
+            info?:
+              | {
+                  exitCode: number | null;
+                  signalCode: NodeJS.Signals | null;
+                }
+              | undefined,
+          ) => void)
+        | undefined;
+      const exited = new Promise<
+        | { exitCode: number | null; signalCode: NodeJS.Signals | null }
+        | undefined
+      >((r) => {
+        resolveExited = r;
+      });
+      const stuckAgent: Agent = {
+        async initialize() {
+          // Hang forever — the bridge's `withTimeout` would normally
+          // bound this, but the test asserts behavior DURING the
+          // handshake, so we let it sit until killAllSync resolves
+          // `exited` and tears the channel down externally.
+          return new Promise<InitializeResponse>(() => {});
+        },
+        async newSession() {
+          throw new Error('newSession should not be reached');
+        },
+        async loadSession() {
+          throw new Error('loadSession should not be reached');
+        },
+        async authenticate() {
+          throw new Error('authenticate should not be reached');
+        },
+        async prompt() {
+          throw new Error('prompt should not be reached');
+        },
+        async cancel() {
+          /* no-op */
+        },
+        async setSessionMode() {
+          throw new Error('setSessionMode should not be reached');
+        },
+        async setSessionConfigOption() {
+          throw new Error('setSessionConfigOption should not be reached');
+        },
+      };
+      new AgentSideConnection(() => stuckAgent, agentStream);
+      return {
+        stream: clientStream,
+        exited,
+        kill: async () => {
+          resolveExited!(undefined);
+        },
+        killSync: () => {
+          killSyncCalls.push('called');
+          resolveExited!(undefined);
+        },
+      };
+    };
+    const bridge = makeBridge({
+      channelFactory: factory,
+      // Bump initializeTimeoutMs so it doesn't race with the
+      // killAllSync we fire below. We're NOT testing the timeout
+      // path — we're testing the cold-spawn window before it.
+      initializeTimeoutMs: 30_000,
+    });
+
+    // Kick off a spawn — `initialize` hangs forever in this fake,
+    // so the spawn promise never resolves naturally. Don't await
+    // (would block the test); `.catch` keeps the rejection from
+    // being unhandled when killAllSync eventually tears things down.
+    const spawnPromise = bridge
+      .spawnOrAttach({ workspaceCwd: WS_A })
+      .catch(() => undefined);
+
+    // Yield enough microtasks for `channelFactory` to return AND the
+    // bridge's `info` creation + `aliveChannels.add(info)` + the
+    // `channel.exited` handler registration to all run BEFORE the
+    // bridge enters `await initialize`. Pre-fix the alive-set add
+    // sat AFTER initialize, so any number of yields here would still
+    // find an empty set when killAllSync fires below.
+    for (let i = 0; i < 5; i++) {
+      await new Promise((r) => setImmediate(r));
+    }
+
+    // Operator double-Ctrl+C arrives during the handshake window.
+    bridge.killAllSync();
+
+    // Post-fix expectation: channel was added to `aliveChannels`
+    // BEFORE the `initialize` await, so killAllSync iterates a set
+    // containing it and fires killSync. Pre-fix this array would
+    // have been empty — and `process.exit(1)` after this would have
+    // orphaned the agent child.
+    expect(killSyncCalls).toEqual(['called']);
+
+    // Cleanup: spawnPromise resolves on its own once killSync's
+    // `resolveExited` propagates through the bridge's
+    // `channel.exited` handler and the IIFE's catch reaps the half-
+    // initialized channel.
+    void spawnPromise;
+  });
+
   it('killSession marks the channel dying so concurrent spawnOrAttach gets a fresh channel', async () => {
     // After the last session is killed, `channel.kill()` runs through
     // its SIGTERM grace window before SIGKILL — up to 10s in the real
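The test above hand-rolls a resolver (`resolveExited`) so that `kill` and `killSync` can settle the channel's `exited` promise from outside. One possible way to express the same test double more compactly is a small deferred helper. The names `deferred` and `makeFakeChannel` are hypothetical and do not appear in the repository, and `signalCode` is typed as `string | null` here only to keep the sketch free of Node type imports.

```typescript
// Hypothetical helper: a promise whose resolver is exposed to the caller.
interface ExitInfo {
  exitCode: number | null;
  signalCode: string | null; // the real test uses NodeJS.Signals | null
}

function deferred<T>() {
  // The Promise executor runs synchronously, so `resolve` is assigned
  // before `deferred` returns.
  let resolve!: (value: T) => void;
  const promise = new Promise<T>((r) => {
    resolve = r;
  });
  return { promise, resolve };
}

// Hypothetical fake channel mirroring the shape used in the test:
// both kill paths settle `exited`, and killSync records that it ran.
function makeFakeChannel(killSyncCalls: string[]) {
  const exit = deferred<ExitInfo | undefined>();
  return {
    exited: exit.promise,
    kill: async () => {
      exit.resolve(undefined);
    },
    killSync: () => {
      killSyncCalls.push('called');
      exit.resolve(undefined);
    },
  };
}
```

Resolving an already-settled promise is a no-op, so calling both `killSync` and `kill` on the same fake is harmless.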
@@ -1151,30 +1151,21 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge {
       );
       const connection = new ClientSideConnection(() => client, channel.stream);
 
-      try {
-        await withTimeout(
-          connection.initialize({
-            protocolVersion: PROTOCOL_VERSION,
-            clientCapabilities: {
-              fs: { readTextFile: true, writeTextFile: true },
-            },
-            clientInfo: { name: 'qwen-serve-bridge', version: '0' },
-          }),
-          initTimeoutMs,
-          'initialize',
-        );
-      } catch (err) {
-        await channel.kill().catch(() => {});
-        throw err;
-      }
-
-      // Late-shutdown re-check: if shutdown flipped during `initialize`,
-      // tear this channel down rather than leak past `process.exit(0)`.
-      if (shuttingDown) {
-        await channel.kill().catch(() => {});
-        throw new Error('HttpAcpBridge is shutting down');
-      }
-
+      // tanzhenxin cold-spawn-window finding: the agent child exists
+      // from the moment `channelFactory(boundWorkspace)` returns, but
+      // pre-fix `aliveChannels.add(info)` ran only AFTER the
+      // `initialize` handshake completed (up to `initTimeoutMs`, default
+      // 10s). A double-Ctrl+C in that handshake window played out as:
+      // first SIGINT entered `shutdown()` and awaited the in-flight
+      // spawn; second SIGINT called `killAllSync()` against an empty
+      // `aliveChannels` (the channel hadn't been added yet) and
+      // `process.exit(1)` orphaned the child. Add to `aliveChannels`
+      // BEFORE the handshake await, and register the `channel.exited`
+      // handler immediately afterward so init-failure / child-crash /
+      // late-shutdown all clean up through the standard exit path
+      // instead of needing bespoke catches. `channelInfo` (the attach
+      // target) stays set only AFTER initialize succeeds — see further
+      // down — so callers don't attach to a still-handshaking channel.
       const info: ChannelInfo = {
         channel,
         connection,
@@ -1182,7 +1173,6 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge {
         sessionIds: new Set(),
         isDying: false,
       };
-      channelInfo = info;
       aliveChannels.add(info);
 
       // One-time channel.exited cleanup. The child dying takes ALL
@@ -1191,6 +1181,14 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge {
       // iteration), publish `session_died` on each session's bus,
       // remove from byId / defaultEntry / pending tables.
       //
+      // Registered BEFORE the `initialize` await (tanzhenxin
+      // cold-spawn-window fix above) so init-failure / child-crash /
+      // late-shutdown all converge here. During handshake
+      // `sessionIds` is empty — the loop below no-ops, the stderr
+      // line still fires to tell operators "agent process gone
+      // during init", and `aliveChannels.delete(info)` clears the
+      // entry through the normal exit path.
+      //
       // tanzhenxin BkUyD: drop from `aliveChannels` ONLY when the OS
       // process is actually gone. Async kill paths (`killSession`
       // reap, `shutdown()` await, `doSpawn`'s newSession-failure
@@ -1242,6 +1240,50 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge {
         }
       });
 
+      // Initialize handshake. The channel is already in
+      // `aliveChannels` and the `channel.exited` handler above is
+      // registered, so failure paths (init throw, timeout, late
+      // shutdown) only need to mark dying + kill — the handler does
+      // the alive-set cleanup when the OS reaps the child.
+      try {
+        await withTimeout(
+          connection.initialize({
+            protocolVersion: PROTOCOL_VERSION,
+            clientCapabilities: {
+              fs: { readTextFile: true, writeTextFile: true },
+            },
+            clientInfo: { name: 'qwen-serve-bridge', version: '0' },
+          }),
+          initTimeoutMs,
+          'initialize',
+        );
+      } catch (err) {
+        // Mark dying so concurrent `ensureChannel` callers spawn
+        // fresh instead of awaiting this in-flight promise's
+        // `channelInfo` (which we never assigned). The kill below
+        // triggers `channel.exited` → handler runs `aliveChannels
+        // .delete(info)` → entry leaves the alive set after OS reap.
+        info.isDying = true;
+        await channel.kill().catch(() => {});
+        throw err;
+      }
+
+      // Late-shutdown re-check: if shutdown flipped during the
+      // handshake, tear this channel down rather than leak past
+      // `process.exit(0)`. Same cleanup pattern as the init-failure
+      // path: mark dying + kill, let the exited handler reap.
+      if (shuttingDown) {
+        info.isDying = true;
+        await channel.kill().catch(() => {});
+        throw new Error('HttpAcpBridge is shutting down');
+      }
+
+      // Handshake succeeded — now publish the channel as the
+      // attach-available slot. `channelInfo` is assigned LAST so
+      // `ensureChannel`'s fast-path (`if (channelInfo && !.isDying)`)
+      // never returns a still-handshaking channel to a concurrent
+      // caller.
+      channelInfo = info;
       return info;
     })();
 
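The handshake in the hunk above is bounded by `withTimeout(promise, initTimeoutMs, 'initialize')`, whose body is not part of this diff. A helper with that call shape could plausibly be written as below. This is a sketch under that assumption, not the repository's implementation; in particular the error message format is invented for illustration.

```typescript
// Hypothetical sketch of a `withTimeout(promise, ms, label)` helper.
// The repository's actual helper is not shown in this commit.
function withTimeout<T>(promise: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`${label} timed out after ${ms}ms`)),
      ms,
    );
  });
  // Whichever settles first wins; always clear the timer afterwards so a
  // fast success does not keep the event loop alive until `ms` elapses.
  return Promise.race([promise, timeout]).finally(() => {
    if (timer !== undefined) clearTimeout(timer);
  });
}
```

Note that a timeout of this kind only rejects the awaiting caller. It does not stop the child process, which is why the `catch` branch in the diff still marks `info.isDying` and calls `channel.kill()`.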