fix(serve): close 4 deepseek review threads — closed-bus race + per-session stderr + entry override (#3803)

BBb9H (correctness): `BridgeClient.requestPermission` could orphan
a pending permission if the bus closed between `registerPending`
and `entry.events.publish` (the shutdown path closes per-session
buses BEFORE awaiting `channel.kill()`, so the agent can still
issue `requestPermission` in that window). Pending was registered
in the daemon-wide map but `publish()` returned `undefined`
(closed bus) → no SSE subscriber ever saw the request → no client
voted → agent's `requestPermission` hung forever, blocking the
daemon's `Promise.all` over child kills. Now: check publish's
return; if `undefined`, roll back the pending via a new
`rollbackPending` callback that resolves it as `cancelled`.

BBb8e (Critical observability): child stderr was `'inherit'` —
all sessions' stderr interleaved on the daemon's stderr stream
unattributed. Switched to `'pipe'` and forward each line with a
`[serve pid=<n> cwd=<dir>]` prefix; operators can now
`grep pid=12345` to pull one session's trace cleanly. Updated
the now-stale doc comment that claimed inherit was current.

BBb8- (deployability): `process.argv[1]` is brittle — fails on
non-`qwen` launchers (bundled binaries, npx wrappers, `node -e`,
`tsx`, container images that relocate the script). Added
`QWEN_CLI_ENTRY` env override as the higher-priority resolution
path. Improved the failure message to suggest the env var as
the actionable fix.

BBb82 (documented limitation): `withTimeout` REJECTS but doesn't
ABORT the underlying ACP op. For `unstable_setSessionModel` this
means a timed-out caller perceives failure while the agent may
eventually complete the switch — drift between caller's perceived
model and agent's actual model + contradictory SSE events.
Documented as a Stage 1 limitation in the `withTimeout` JSDoc;
acceptable because (1) ACP doesn't expose a cancel signal for
`unstable_setSessionModel` yet so we couldn't abort even if we
wanted to, (2) model switches complete in milliseconds in
practice — a timeout means genuinely wedged, not just slow.
Stage 2 will add abort plumbing once ACP exposes the hook.
This commit is contained in:
wenshao 2026-05-11 17:42:41 +08:00
parent 716145934c
commit f8509dde5a

View file

@ -281,6 +281,13 @@ class BridgeClient implements Client {
constructor(
/**
* Returns the session entry this client is bound to, or `undefined`
* when no entry is available. Called lazily rather than captured
* up front, so the entry may be assigned after construction.
*/
private readonly resolveEntry: () => SessionEntry | undefined,
/**
* Records a pending permission request so that it can be resolved
* later. NOTE(review): presumably stores it in the daemon-wide
* pending map — confirm against the bridge factory.
*/
private readonly registerPending: (pending: PendingPermission) => void,
/**
* Roll back a `registerPending` call when the subsequent publish
* fails (closed bus). Resolves the pending promise as cancelled
* and removes it from the daemon-wide maps so a late
* `respondToPermission` for this id returns 404 cleanly.
*/
private readonly rollbackPending: (requestId: string) => void,
) {}
async requestPermission(
@ -296,7 +303,22 @@ class BridgeClient implements Client {
sessionId: entry.sessionId,
resolve,
});
entry.events.publish({
// `publish()` returns `undefined` on a closed bus — the
// shutdown path closes per-session buses BEFORE awaiting
// `channel.kill()`, leaving a small window where the agent
// can still issue `requestPermission`. If we registered the
// pending entry above but the publish fails, no SSE
// subscriber will ever see the request → no client can vote
// → the pending promise never resolves → agent's
// `requestPermission` hangs forever (a real bug, not a
// theoretical one — the daemon's shutdown.kill() loop awaits
// each child, and a child stuck waiting on permission would
// pin shutdown until the kill timer expires).
//
// Resolve as `cancelled` immediately if the bus rejected
// the publish. Mirrors the orphan-permission handling in
// `registerPending` itself for the entry-already-gone case.
const published = entry.events.publish({
type: 'permission_request',
data: {
requestId,
@ -305,6 +327,10 @@ class BridgeClient implements Client {
options: params.options,
},
});
if (!published) {
// Roll back the pending registration and resolve cancelled.
this.rollbackPending(requestId);
}
});
}
@ -464,7 +490,14 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
): Promise<BridgeSession> {
const channel = await channelFactory(workspaceKey);
let entry: SessionEntry | undefined;
const client = new BridgeClient(() => entry, registerPending);
const client = new BridgeClient(
() => entry,
registerPending,
(rid) =>
// Roll back a register-then-publish-failed pending so the agent
// doesn't hang waiting on a vote nobody can see.
resolvePending(rid, { outcome: { outcome: 'cancelled' } }),
);
const connection = new ClientSideConnection(() => client, channel.stream);
try {
@ -1173,6 +1206,29 @@ function canonicalizeWorkspace(p: string): string {
}
}
/**
* Race `p` against a timeout. The timeout REJECTS the returned
* promise but does NOT abort the underlying operation — `p` keeps
* running to completion (or its own failure) and its eventual
* resolution is silently dropped.
*
* Stage 1 limitation: for `unstable_setSessionModel` the agent may
* complete the model switch AFTER we surfaced the timeout to the
* HTTP caller, leading to drift between caller's perceived model
* and agent's actual model. Subscribers also see contradictory
* SSE events (`model_switch_failed` from the timeout, then a late
* `model_switched` if the agent succeeds). Acceptable for Stage 1
* because:
* 1. ACP's `unstable_setSessionModel` doesn't accept a cancel
* signal yet (the SDK's `prompt` does, hence `sendPrompt`'s
* explicit `cancel` notification on abort).
* 2. Model switches complete in milliseconds in practice; a
* timeout firing means the agent is genuinely wedged, not
* just slow, and would have been DOA anyway.
* Stage 2 will add abort plumbing once ACP exposes a cancel hook
* for `unstable_setSessionModel`. Tracked in the model-change
* concurrency notes in `applyModelServiceId`.
*/
async function withTimeout<T>(
p: Promise<T>,
ms: number,
@ -1208,19 +1264,37 @@ async function withTimeout<T>(
export const defaultSpawnChannelFactory: ChannelFactory = async (
workspaceCwd,
) => {
const cliEntry = process.argv[1];
// Resolution order:
// 1. `QWEN_CLI_ENTRY` env override — escape hatch for non-standard
// launch paths (bundled binaries, npx wrappers, `node -e`,
// `tsx ./src/...`, custom shims, container images that
// relocate the entry script). Anyone hitting "process.argv[1]
// is empty" or "process.argv[1] points at the wrong file" can
// set this without code changes.
// 2. `process.argv[1]` — works when launched via the `qwen` bin
// shim, which is the common path.
// Fail loudly with an actionable error if neither resolves.
const cliEntry = process.env['QWEN_CLI_ENTRY'] || process.argv[1];
if (!cliEntry) {
throw new Error(
'Cannot determine CLI entry path for spawning the ACP child (process.argv[1] is empty).',
'Cannot determine CLI entry path for spawning the ACP child: ' +
'process.argv[1] is empty and QWEN_CLI_ENTRY is unset. ' +
'Set QWEN_CLI_ENTRY to the absolute path of the qwen entry ' +
'script (e.g. `export QWEN_CLI_ENTRY=$(which qwen)`) to override.',
);
}
// Each session takes ~3 file descriptors (stdin/stdout/stderr) for the
// child plus a few sockets. Operators running many concurrent sessions
// should bump `ulimit -n` accordingly. Stage 1 doesn't pre-flight FD
// headroom — Stage 2 in-process drops the per-session FD cost entirely.
// Child stderr is `inherit`ed so it lands in the daemon's stderr; this
// is interleaved across sessions and hard to debug. Stage 4+ remote
// sandboxes will isolate.
// Child stderr is piped (NOT `inherit`ed) so we can prefix each
// line with `[serve pid=… cwd=…]` before forwarding to the
// daemon's stderr — see the prefix-and-forward loop below the
// `spawn(...)` call. Sessions are still interleaved on the
// daemon's stderr stream but each line carries its own session
// identifier, so operators can `grep pid=12345` to pull one
// session's trace cleanly. Stage 4+ remote sandboxes will isolate
// stderr at the transport level.
//
// Note: spawning `process.execPath` only works when the entry script can
// be loaded by raw Node. In dev (e.g. `npm run dev` via `tsx`) the entry
@ -1266,10 +1340,46 @@ export const defaultSpawnChannelFactory: ChannelFactory = async (
// reviewer.
const child = spawn(process.execPath, [cliEntry, '--acp'], {
cwd: workspaceCwd,
stdio: ['pipe', 'pipe', 'inherit'],
// Pipe stderr (was: 'inherit') so we can prefix each line with
// the spawn's pid + workspace, making per-session crash output
// attributable. Bare 'inherit' sends every child's stderr to
// the daemon's stderr verbatim and unprefixed — under any
// multi-session load the operator's log becomes a salad of
// unattributed traces.
stdio: ['pipe', 'pipe', 'pipe'],
env: childEnv,
});
// Forward child stderr to the daemon's stderr line-by-line, with a
// `[serve pid=… cwd=…]` prefix on each line so operators can
// correlate stack traces back to the spawning request. Best-effort:
// a child that prints partial lines without a trailing newline is
// flushed when the stream emits `end`.
if (child.stderr) {
let buf = '';
const prefix = `[serve pid=${child.pid} cwd=${workspaceCwd}] `;
const flush = (line: string) => {
if (line.length > 0) process.stderr.write(prefix + line + '\n');
};
child.stderr.setEncoding('utf8');
child.stderr.on('data', (chunk: string) => {
buf += chunk;
let nl = buf.indexOf('\n');
while (nl !== -1) {
flush(buf.slice(0, nl));
buf = buf.slice(nl + 1);
nl = buf.indexOf('\n');
}
});
child.stderr.on('end', () => {
if (buf.length > 0) flush(buf);
});
child.stderr.on('error', () => {
// Don't crash the daemon if the pipe breaks; the child is
// already gone or about to be.
});
}
// Build the `exited` promise BEFORE checking stdin/stdout so the listener
// is in place before any error event can fire. We treat both `exit` and
// `error` as termination — without an `error` listener Node would treat