diff --git a/packages/cli/src/serve/httpAcpBridge.ts b/packages/cli/src/serve/httpAcpBridge.ts index b977ec4ee..8a667f0a2 100644 --- a/packages/cli/src/serve/httpAcpBridge.ts +++ b/packages/cli/src/serve/httpAcpBridge.ts @@ -281,6 +281,13 @@ class BridgeClient implements Client { constructor( private readonly resolveEntry: () => SessionEntry | undefined, private readonly registerPending: (pending: PendingPermission) => void, + /** + * Roll back a `registerPending` call when the subsequent publish + * fails (closed bus). Resolves the pending promise as cancelled + * and removes it from the daemon-wide maps so a late + * `respondToPermission` for this id returns 404 cleanly. + */ + private readonly rollbackPending: (requestId: string) => void, ) {} async requestPermission( @@ -296,7 +303,22 @@ class BridgeClient implements Client { sessionId: entry.sessionId, resolve, }); - entry.events.publish({ + // `publish()` returns `undefined` on a closed bus — the + // shutdown path closes per-session buses BEFORE awaiting + // `channel.kill()`, leaving a small window where the agent + // can still issue `requestPermission`. If we registered the + // pending entry above but the publish fails, no SSE + // subscriber will ever see the request → no client can vote + // → the pending promise never resolves → agent's + // `requestPermission` hangs forever (a real bug, not a + // theoretical one — the daemon's shutdown.kill() loop awaits + // each child, and a child stuck waiting on permission would + // pin shutdown until the kill timer expires). + // + // Resolve as `cancelled` immediately if the bus rejected + // the publish. Mirrors the orphan-permission handling in + // `registerPending` itself for the entry-already-gone case. 
+ const published = entry.events.publish({ type: 'permission_request', data: { requestId, @@ -305,6 +327,10 @@ class BridgeClient implements Client { options: params.options, }, }); + if (!published) { + // Roll back the pending registration and resolve cancelled. + this.rollbackPending(requestId); + } }); } @@ -464,7 +490,14 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge { ): Promise { const channel = await channelFactory(workspaceKey); let entry: SessionEntry | undefined; - const client = new BridgeClient(() => entry, registerPending); + const client = new BridgeClient( + () => entry, + registerPending, + (rid) => + // Roll back a register-then-publish-failed pending so the agent + // doesn't hang waiting on a vote nobody can see. + resolvePending(rid, { outcome: { outcome: 'cancelled' } }), + ); const connection = new ClientSideConnection(() => client, channel.stream); try { @@ -1173,6 +1206,29 @@ function canonicalizeWorkspace(p: string): string { } } +/** + * Race `p` against a timeout. The timeout REJECTS the returned + * promise but does NOT abort the underlying operation — `p` keeps + * running to completion (or its own failure) and its eventual + * resolution is silently dropped. + * + * Stage 1 limitation: for `unstable_setSessionModel` the agent may + * complete the model switch AFTER we surfaced the timeout to the + * HTTP caller, leading to drift between caller's perceived model + * and agent's actual model. Subscribers also see contradictory + * SSE events (`model_switch_failed` from the timeout, then a late + * `model_switched` if the agent succeeds). Acceptable for Stage 1 + * because: + * 1. ACP's `unstable_setSessionModel` doesn't accept a cancel + * signal yet (the SDK's `prompt` does, hence `sendPrompt`'s + * explicit `cancel` notification on abort). + * 2. 
Model switches complete in milliseconds in practice; a + * timeout firing means the agent is genuinely wedged, not + * just slow, and would have been DOA anyway. + * Stage 2 will add abort plumbing once ACP exposes a cancel hook + * for `unstable_setSessionModel`. Tracked in the model-change + * concurrency notes in `applyModelServiceId`. + */ async function withTimeout( p: Promise, ms: number, @@ -1208,19 +1264,37 @@ async function withTimeout( export const defaultSpawnChannelFactory: ChannelFactory = async ( workspaceCwd, ) => { - const cliEntry = process.argv[1]; + // Resolution order: + // 1. `QWEN_CLI_ENTRY` env override — escape hatch for non-standard + // launch paths (bundled binaries, npx wrappers, `node -e`, + // `tsx ./src/...`, custom shims, container images that + // relocate the entry script). Anyone hitting "process.argv[1] + // is empty" or "process.argv[1] points at the wrong file" can + // set this without code changes. + // 2. `process.argv[1]` — works when launched via the `qwen` bin + // shim, which is the common path. + // Fail loudly with an actionable error if neither resolves. + const cliEntry = process.env['QWEN_CLI_ENTRY'] || process.argv[1]; if (!cliEntry) { throw new Error( - 'Cannot determine CLI entry path for spawning the ACP child (process.argv[1] is empty).', + 'Cannot determine CLI entry path for spawning the ACP child: ' + + 'process.argv[1] is empty and QWEN_CLI_ENTRY is unset. ' + + 'Set QWEN_CLI_ENTRY to the absolute path of the qwen entry ' + + 'script (e.g. `export QWEN_CLI_ENTRY=$(which qwen)`) to override.', ); } // Each session takes ~3 file descriptors (stdin/stdout/stderr) for the // child plus a few sockets. Operators running many concurrent sessions // should bump `ulimit -n` accordingly. Stage 1 doesn't pre-flight FD // headroom — Stage 2 in-process drops the per-session FD cost entirely. - // Child stderr is `inherit`ed so it lands in the daemon's stderr; this - // is interleaved across sessions and hard to debug. 
Stage 4+ remote - // sandboxes will isolate. + // Child stderr is piped (NOT `inherit`ed) so we can prefix each + // line with `[serve pid=… cwd=…]` before forwarding to the + // daemon's stderr — see the prefix-and-forward loop below the + // `spawn(...)` call. Sessions are still interleaved on the + // daemon's stderr stream but each line carries its own session + // identifier, so operators can `grep pid=12345` to pull one + // session's trace cleanly. Stage 4+ remote sandboxes will isolate + // stderr at the transport level. // // Note: spawning `process.execPath` only works when the entry script can // be loaded by raw Node. In dev (e.g. `npm run dev` via `tsx`) the entry @@ -1266,10 +1340,46 @@ export const defaultSpawnChannelFactory: ChannelFactory = async ( // reviewer. const child = spawn(process.execPath, [cliEntry, '--acp'], { cwd: workspaceCwd, - stdio: ['pipe', 'pipe', 'inherit'], + // Pipe stderr (was: 'inherit') so we can prefix each line with + // the spawn's pid + workspace, making per-session crash output + // attributable. Bare 'inherit' sends every child's stderr to + // the daemon's stderr verbatim and unprefixed — under any + // multi-session load the operator's log becomes a salad of + // unattributed traces. + stdio: ['pipe', 'pipe', 'pipe'], env: childEnv, }); + // Forward child stderr to the daemon's stderr line-by-line, with a + // `[serve pid=… cwd=…]` prefix on each line so operators can + // correlate stack traces back to the spawning request. Best-effort: + // a trailing partial line (no final newline) is flushed when the + // stream emits `end`; empty lines are dropped rather than forwarded. 
+ if (child.stderr) { + let buf = ''; + const prefix = `[serve pid=${child.pid} cwd=${workspaceCwd}] `; + const flush = (line: string) => { + if (line.length > 0) process.stderr.write(prefix + line + '\n'); + }; + child.stderr.setEncoding('utf8'); + child.stderr.on('data', (chunk: string) => { + buf += chunk; + let nl = buf.indexOf('\n'); + while (nl !== -1) { + flush(buf.slice(0, nl)); + buf = buf.slice(nl + 1); + nl = buf.indexOf('\n'); + } + }); + child.stderr.on('end', () => { + if (buf.length > 0) flush(buf); + }); + child.stderr.on('error', () => { + // Don't crash the daemon if the pipe breaks; the child is + // already gone or about to be. + }); + } + // Build the `exited` promise BEFORE checking stdin/stdout so the listener // is in place before any error event can fire. We treat both `exit` and // `error` as termination — without an `error` listener Node would treat