mirror of
https://github.com/QwenLM/qwen-code.git
synced 2026-05-17 03:57:18 +00:00
fix(serve): close 4 deepseek review threads — closed-bus race + per-session stderr + entry override (#3803)
BBb9H (correctness): `BridgeClient.requestPermission` could orphan a pending permission if the bus closed between `registerPending` and `entry.events.publish` (the shutdown path closes per-session buses BEFORE awaiting `channel.kill()`, so the agent can still issue `requestPermission` in that window). Pending was registered in the daemon-wide map but `publish()` returned `undefined` (closed bus) → no SSE subscriber ever saw the request → no client voted → agent's `requestPermission` hung forever, blocking the daemon's `Promise.all` over child kills. Now: check publish's return; if `undefined`, roll back the pending via a new `rollbackPending` callback that resolves it as `cancelled`. BBb8e (Critical observability): child stderr was `'inherit'` — all sessions' stderr interleaved on the daemon's stderr stream unattributed. Switched to `'pipe'` and forward each line with a `[serve pid=<n> cwd=<dir>]` prefix; operators can now `grep pid=12345` to pull one session's trace cleanly. Updated the now-stale doc comment that claimed inherit was current. BBb8- (deployability): `process.argv[1]` is brittle — fails on non-`qwen` launchers (bundled binaries, npx wrappers, `node -e`, `tsx`, container images that relocate the script). Added `QWEN_CLI_ENTRY` env override as the higher-priority resolution path. Improved the failure message to suggest the env var as the actionable fix. BBb82 (documented limitation): `withTimeout` REJECTS but doesn't ABORT the underlying ACP op. For `unstable_setSessionModel` this means a timed-out caller perceives failure while the agent may eventually complete the switch — drift between caller's perceived model and agent's actual model + contradictory SSE events. 
Documented as a Stage 1 limitation in the `withTimeout` JSDoc; acceptable because (1) ACP doesn't expose a cancel signal for `unstable_setSessionModel` yet so we couldn't abort even if we wanted to, (2) model switches complete in milliseconds in practice — a timeout means genuinely wedged, not just slow. Stage 2 will add abort plumbing once ACP exposes the hook.
This commit is contained in:
parent
716145934c
commit
f8509dde5a
1 changed files with 118 additions and 8 deletions
|
|
@ -281,6 +281,13 @@ class BridgeClient implements Client {
|
|||
constructor(
|
||||
private readonly resolveEntry: () => SessionEntry | undefined,
|
||||
private readonly registerPending: (pending: PendingPermission) => void,
|
||||
/**
|
||||
* Roll back a `registerPending` call when the subsequent publish
|
||||
* fails (closed bus). Resolves the pending promise as cancelled
|
||||
* and removes it from the daemon-wide maps so a late
|
||||
* `respondToPermission` for this id returns 404 cleanly.
|
||||
*/
|
||||
private readonly rollbackPending: (requestId: string) => void,
|
||||
) {}
|
||||
|
||||
async requestPermission(
|
||||
|
|
@ -296,7 +303,22 @@ class BridgeClient implements Client {
|
|||
sessionId: entry.sessionId,
|
||||
resolve,
|
||||
});
|
||||
entry.events.publish({
|
||||
// `publish()` returns `undefined` on a closed bus — the
|
||||
// shutdown path closes per-session buses BEFORE awaiting
|
||||
// `channel.kill()`, leaving a small window where the agent
|
||||
// can still issue `requestPermission`. If we registered the
|
||||
// pending entry above but the publish fails, no SSE
|
||||
// subscriber will ever see the request → no client can vote
|
||||
// → the pending promise never resolves → agent's
|
||||
// `requestPermission` hangs until the child is killed (a real bug, not a
|
||||
// theoretical one — the daemon's shutdown.kill() loop awaits
|
||||
// each child, and a child stuck waiting on permission would
|
||||
// pin shutdown until the kill timer expires).
|
||||
//
|
||||
// Resolve as `cancelled` immediately if the bus rejected
|
||||
// the publish. Mirrors the orphan-permission handling in
|
||||
// `registerPending` itself for the entry-already-gone case.
|
||||
const published = entry.events.publish({
|
||||
type: 'permission_request',
|
||||
data: {
|
||||
requestId,
|
||||
|
|
@ -305,6 +327,10 @@ class BridgeClient implements Client {
|
|||
options: params.options,
|
||||
},
|
||||
});
|
||||
if (!published) {
|
||||
// Roll back the pending registration and resolve cancelled.
|
||||
this.rollbackPending(requestId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -464,7 +490,14 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
|
|||
): Promise<BridgeSession> {
|
||||
const channel = await channelFactory(workspaceKey);
|
||||
let entry: SessionEntry | undefined;
|
||||
const client = new BridgeClient(() => entry, registerPending);
|
||||
const client = new BridgeClient(
|
||||
() => entry,
|
||||
registerPending,
|
||||
(rid) =>
|
||||
// Roll back a register-then-publish-failed pending so the agent
|
||||
// doesn't hang waiting on a vote nobody can see.
|
||||
resolvePending(rid, { outcome: { outcome: 'cancelled' } }),
|
||||
);
|
||||
const connection = new ClientSideConnection(() => client, channel.stream);
|
||||
|
||||
try {
|
||||
|
|
@ -1173,6 +1206,29 @@ function canonicalizeWorkspace(p: string): string {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Race `p` against a timeout. The timeout REJECTS the returned
|
||||
* promise but does NOT abort the underlying operation — `p` keeps
|
||||
* running to completion (or its own failure) and its eventual
|
||||
* resolution is silently dropped.
|
||||
*
|
||||
* Stage 1 limitation: for `unstable_setSessionModel` the agent may
|
||||
* complete the model switch AFTER we surfaced the timeout to the
|
||||
* HTTP caller, leading to drift between caller's perceived model
|
||||
* and agent's actual model. Subscribers also see contradictory
|
||||
* SSE events (`model_switch_failed` from the timeout, then a late
|
||||
* `model_switched` if the agent succeeds). Acceptable for Stage 1
|
||||
* because:
|
||||
* 1. ACP's `unstable_setSessionModel` doesn't accept a cancel
|
||||
* signal yet (the SDK's `prompt` does, hence `sendPrompt`'s
|
||||
* explicit `cancel` notification on abort).
|
||||
* 2. Model switches complete in milliseconds in practice; a
|
||||
* timeout firing means the agent is genuinely wedged, not
|
||||
* just slow, and would have been DOA anyway.
|
||||
* Stage 2 will add abort plumbing once ACP exposes a cancel hook
|
||||
* for `unstable_setSessionModel`. Tracked in the model-change
|
||||
* concurrency notes in `applyModelServiceId`.
|
||||
*/
|
||||
async function withTimeout<T>(
|
||||
p: Promise<T>,
|
||||
ms: number,
|
||||
|
|
@ -1208,19 +1264,37 @@ async function withTimeout<T>(
|
|||
export const defaultSpawnChannelFactory: ChannelFactory = async (
|
||||
workspaceCwd,
|
||||
) => {
|
||||
const cliEntry = process.argv[1];
|
||||
// Resolution order:
|
||||
// 1. `QWEN_CLI_ENTRY` env override — escape hatch for non-standard
|
||||
// launch paths (bundled binaries, npx wrappers, `node -e`,
|
||||
// `tsx ./src/...`, custom shims, container images that
|
||||
// relocate the entry script). Anyone hitting "process.argv[1]
|
||||
// is empty" or "process.argv[1] points at the wrong file" can
|
||||
// set this without code changes.
|
||||
// 2. `process.argv[1]` — works when launched via the `qwen` bin
|
||||
// shim, which is the common path.
|
||||
// Fail loudly with an actionable error if neither resolves.
|
||||
const cliEntry = process.env['QWEN_CLI_ENTRY'] || process.argv[1];
|
||||
if (!cliEntry) {
|
||||
throw new Error(
|
||||
'Cannot determine CLI entry path for spawning the ACP child (process.argv[1] is empty).',
|
||||
'Cannot determine CLI entry path for spawning the ACP child: ' +
|
||||
'process.argv[1] is empty and QWEN_CLI_ENTRY is unset. ' +
|
||||
'Set QWEN_CLI_ENTRY to the absolute path of the qwen entry ' +
|
||||
'script (e.g. `export QWEN_CLI_ENTRY=$(which qwen)`) to override.',
|
||||
);
|
||||
}
|
||||
// Each session takes ~3 file descriptors (stdin/stdout/stderr) for the
|
||||
// child plus a few sockets. Operators running many concurrent sessions
|
||||
// should bump `ulimit -n` accordingly. Stage 1 doesn't pre-flight FD
|
||||
// headroom — Stage 2 in-process drops the per-session FD cost entirely.
|
||||
// Child stderr is `inherit`ed so it lands in the daemon's stderr; this
|
||||
// is interleaved across sessions and hard to debug. Stage 4+ remote
|
||||
// sandboxes will isolate.
|
||||
// Child stderr is piped (NOT `inherit`ed) so we can prefix each
|
||||
// line with `[serve pid=… cwd=…]` before forwarding to the
|
||||
// daemon's stderr — see the prefix-and-forward loop below the
|
||||
// `spawn(...)` call. Sessions are still interleaved on the
|
||||
// daemon's stderr stream but each line carries its own session
|
||||
// identifier, so operators can `grep pid=12345` to pull one
|
||||
// session's trace cleanly. Stage 4+ remote sandboxes will isolate
|
||||
// stderr at the transport level.
|
||||
//
|
||||
// Note: spawning `process.execPath` only works when the entry script can
|
||||
// be loaded by raw Node. In dev (e.g. `npm run dev` via `tsx`) the entry
|
||||
|
|
@ -1266,10 +1340,46 @@ export const defaultSpawnChannelFactory: ChannelFactory = async (
|
|||
// reviewer.
|
||||
const child = spawn(process.execPath, [cliEntry, '--acp'], {
|
||||
cwd: workspaceCwd,
|
||||
stdio: ['pipe', 'pipe', 'inherit'],
|
||||
// Pipe stderr (was: 'inherit') so we can prefix each line with
|
||||
// the spawn's pid + workspace, making per-session crash output
|
||||
// attributable. Bare 'inherit' sends every child's stderr to
|
||||
// the daemon's stderr verbatim and unprefixed — under any
|
||||
// multi-session load the operator's log becomes a salad of
|
||||
// unattributed traces.
|
||||
stdio: ['pipe', 'pipe', 'pipe'],
|
||||
env: childEnv,
|
||||
});
|
||||
|
||||
// Forward child stderr to the daemon's stderr line-by-line, with a
|
||||
// `[serve pid=… cwd=…]` prefix on each line so operators can
|
||||
// correlate stack traces back to the spawning request. Best-effort:
|
||||
// a trailing partial line (written without a final newline) is
|
||||
// flushed when the stream emits `end`; empty lines are dropped.
|
||||
if (child.stderr) {
|
||||
let buf = '';
|
||||
const prefix = `[serve pid=${child.pid} cwd=${workspaceCwd}] `;
|
||||
const flush = (line: string) => {
|
||||
if (line.length > 0) process.stderr.write(prefix + line + '\n');
|
||||
};
|
||||
child.stderr.setEncoding('utf8');
|
||||
child.stderr.on('data', (chunk: string) => {
|
||||
buf += chunk;
|
||||
let nl = buf.indexOf('\n');
|
||||
while (nl !== -1) {
|
||||
flush(buf.slice(0, nl));
|
||||
buf = buf.slice(nl + 1);
|
||||
nl = buf.indexOf('\n');
|
||||
}
|
||||
});
|
||||
child.stderr.on('end', () => {
|
||||
if (buf.length > 0) flush(buf);
|
||||
});
|
||||
child.stderr.on('error', () => {
|
||||
// Don't crash the daemon if the pipe breaks; the child is
|
||||
// already gone or about to be.
|
||||
});
|
||||
}
|
||||
|
||||
// Build the `exited` promise BEFORE checking stdin/stdout so the listener
|
||||
// is in place before any error event can fire. We treat both `exit` and
|
||||
// `error` as termination — without an `error` listener Node would treat
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue