diff --git a/docs/developers/examples/daemon-client-quickstart.md b/docs/developers/examples/daemon-client-quickstart.md index 04c69980f..3e1696069 100644 --- a/docs/developers/examples/daemon-client-quickstart.md +++ b/docs/developers/examples/daemon-client-quickstart.md @@ -34,7 +34,7 @@ console.log('Daemon features:', caps.features); // 2. Spawn-or-attach a session for the current workspace. const session = await client.createOrAttachSession({ - cwd: process.cwd(), + workspaceCwd: process.cwd(), }); console.log(`session=${session.sessionId} attached=${session.attached}`); @@ -57,10 +57,10 @@ const subscription = (async () => { })(); // 4. Send a prompt and wait for it to settle. (Order-of-operations -// note: even if `sendPrompt` fires before the SSE handshake +// note: even if `prompt()` fires before the SSE handshake // completes, step 3's `lastEventId: 0` guarantees every event // lands in the iterator.) -const result = await client.sendPrompt(session.sessionId, { +const result = await client.prompt(session.sessionId, { prompt: [{ type: 'text', text: 'Summarize src/main.ts in one sentence.' }], }); console.log('stop reason:', result.stopReason); @@ -143,11 +143,11 @@ Two clients pointed at the same daemon and `cwd` end up on the same session: ```ts // Client A (e.g. an IDE plugin) -const a = await clientA.createOrAttachSession({ cwd: '/work/repo' }); +const a = await clientA.createOrAttachSession({ workspaceCwd: '/work/repo' }); console.log(a.attached); // false — A spawned the agent // Client B (e.g. 
a web UI on the same machine) -const b = await clientB.createOrAttachSession({ cwd: '/work/repo' }); +const b = await clientB.createOrAttachSession({ workspaceCwd: '/work/repo' }); console.log(b.attached); // true — B joined A's session console.log(a.sessionId === b.sessionId); // true ``` @@ -186,7 +186,7 @@ try { If your user hits Esc: ```ts -await client.cancelSession(session.sessionId); +await client.cancel(session.sessionId); // In the event stream you'll see the prompt resolve with stopReason: "cancelled" ``` diff --git a/docs/developers/qwen-serve-protocol.md b/docs/developers/qwen-serve-protocol.md index 4be936e2b..5891f8ed5 100644 --- a/docs/developers/qwen-serve-protocol.md +++ b/docs/developers/qwen-serve-protocol.md @@ -52,6 +52,12 @@ Every Stage 1 daemon advertises 9 feature tags. Clients **must** gate UI off `fe ## Routes +> **Stage 1 limitation — no `DELETE /session/:id`.** Sessions live until +> the agent child crashes (`session_died`), the daemon process exits, or +> a server-side `killSession` (used internally by orphan-cleanup) fires. +> HTTP clients have no explicit "close one session" route in Stage 1. +> An explicit `DELETE /session/:id` is on the Stage 2 polish list. + ### `GET /health` Liveness probe. Returns `200 {"status":"ok"}` if the listener is up. No auth required even when a token is configured (heartbeat-friendly). @@ -216,6 +222,7 @@ data: {"reason":"queue_overflow","droppedAfter":42} Reconnect semantics: - Send `Last-Event-ID: <n>` to replay events with `id > n` from the per-session ring (default depth 1000) +- **Gap detection (client-side):** if `<n>` predates the oldest event still in the ring (e.g. you reconnect with `Last-Event-ID: 50` but the ring now holds 200–1199), the daemon replays from the oldest available event without raising. Compare the first replayed event's `id` against `n + 1`; any difference is the size of the lost window. 
Stage 2 will inject an explicit `stream_gap` synthetic frame on the daemon side; in Stage 1 detection is the client's responsibility. - IDs are monotonic per session, starting at 1 - Synthetic terminal frames (`client_evicted`, `stream_error`) intentionally omit `id` so they don't burn a sequence slot for other subscribers @@ -228,6 +235,19 @@ Backpressure: Cast a vote on a pending `permission_request`. **First responder wins** — once one client answers, every other client trying to answer the same id gets `404`. +> **Stage 1 limitation — no permission timeout.** A `permission_request` +> stays pending until: (a) some client votes here, (b) `POST +> /session/:id/cancel` fires, (c) the HTTP client driving the prompt +> disconnects (mid-prompt cancel resolves outstanding permissions as +> `cancelled`), (d) the session is killed, or (e) the daemon shuts +> down. **In a fully-headless deployment with no SSE subscriber, +> `requestPermission` blocks the agent indefinitely** — there's nothing +> to time out the wait. Stage 2 will add a configurable +> `permissionTimeoutMs`. Until then, headless callers should keep an +> SSE subscription open or wrap their prompt loop in their own timeout +> plus a follow-up +> `POST /session/:id/cancel`. + Request: ```json @@ -268,22 +288,22 @@ The connection then closes. ## Environment variables -| Var | Purpose | -| ------------------- | ------------------------------------------------------------------------------------------------------------ | -| `QWEN_SERVER_TOKEN` | Bearer token. Stripped of leading/trailing whitespace at boot. | -| `QWEN_E2E_LLM` | Set to `1` to enable LLM-required integration tests in `integration-tests/cli/qwen-serve-streaming.test.ts`. | +| Var | Purpose | +| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `QWEN_SERVER_TOKEN` | Bearer token. 
Stripped of leading/trailing whitespace at boot. | +| `SKIP_LLM_TESTS` | Set to `1` to **skip** LLM-required integration tests in `integration-tests/cli/qwen-serve-streaming.test.ts` (default-on for CI envs that lack provider API keys). | ## Source layout -| Path | Purpose | -| ---------------------------------------------------- | -------------------------------------------------------- | -| `packages/cli/src/commands/serve.ts` | yargs command + flag schema | -| `packages/cli/src/serve/runQwenServe.ts` | listener lifecycle + signal handling | -| `packages/cli/src/serve/server.ts` | Express routes + middleware | -| `packages/cli/src/serve/auth.ts` | bearer + Host allowlist + CORS deny | -| `packages/cli/src/serve/httpAcpBridge.ts` | spawn-or-attach + per-session FIFO + permission registry | -| `packages/cli/src/serve/eventBus.ts` | bounded async queue + replay ring | -| `packages/sdk-typescript/src/daemon/DaemonClient.ts` | TS client | -| `packages/sdk-typescript/src/daemon/sse.ts` | EventSource frame parser | -| `integration-tests/cli/qwen-serve-routes.test.ts` | 18 cases, no LLM | -| `integration-tests/cli/qwen-serve-streaming.test.ts` | 4 cases, real `qwen --acp` child (`QWEN_E2E_LLM=1`) | +| Path | Purpose | +| ---------------------------------------------------- | ------------------------------------------------------------------ | +| `packages/cli/src/commands/serve.ts` | yargs command + flag schema | +| `packages/cli/src/serve/runQwenServe.ts` | listener lifecycle + signal handling | +| `packages/cli/src/serve/server.ts` | Express routes + middleware | +| `packages/cli/src/serve/auth.ts` | bearer + Host allowlist + CORS deny | +| `packages/cli/src/serve/httpAcpBridge.ts` | spawn-or-attach + per-session FIFO + permission registry | +| `packages/cli/src/serve/eventBus.ts` | bounded async queue + replay ring | +| `packages/sdk-typescript/src/daemon/DaemonClient.ts` | TS client | +| `packages/sdk-typescript/src/daemon/sse.ts` | EventSource frame parser | +| 
`integration-tests/cli/qwen-serve-routes.test.ts` | 18 cases, no LLM | +| `integration-tests/cli/qwen-serve-streaming.test.ts` | 3 cases, real `qwen --acp` child (skipped when `SKIP_LLM_TESTS=1`) | diff --git a/docs/users/qwen-serve.md b/docs/users/qwen-serve.md index 698e7a860..7b996ee98 100644 --- a/docs/users/qwen-serve.md +++ b/docs/users/qwen-serve.md @@ -86,10 +86,12 @@ qwen serve --hostname 0.0.0.0 --port 4170 # → boot refuses without QWEN_SERVER_TOKEN ``` -Clients then send `Authorization: Bearer $QWEN_SERVER_TOKEN` on every request. +Clients then send `Authorization: Bearer $QWEN_SERVER_TOKEN` on every request **except `/health`**, which is intentionally exempt so liveness probes (k8s, Compose, monitoring) work without credentials. Use `/capabilities` to verify your token is correct end-to-end: ```bash -curl -H "Authorization: Bearer $QWEN_SERVER_TOKEN" http://your-host:4170/health +curl -H "Authorization: Bearer $QWEN_SERVER_TOKEN" http://your-host:4170/capabilities +# → {"v":1,"mode":"http-bridge","features":[...],"modelServices":[]} +# Wrong token → 401 ``` The token comparison is constant-time (SHA-256 + `crypto.timingSafeEqual`); 401 responses are uniform across "missing header", "wrong scheme", and "wrong token" so a side-channel can't distinguish. @@ -116,7 +118,9 @@ The token comparison is constant-time (SHA-256 + `crypto.timingSafeEqual`); 401 ## Multi-session & remote deployment -Each `qwen serve` process binds **one workspace**. To handle multiple workspaces or multiple users at scale you spawn multiple daemon instances behind an external orchestrator. That orchestrator (multi-tenancy / OIDC / Quota / Audit / k8s) is **out of scope** for the qwen-code project — see issue [#3803](https://github.com/QwenLM/qwen-code/issues/3803) "External Reference Architecture" for the design pointers. 
+A single `qwen serve` process can manage sessions for any workspace path passed via `cwd` on `POST /session` — under the default `sessionScope: 'single'` it keeps one ACP session per canonicalized workspace, sharing it across every client that posts the same `cwd`. So one daemon will happily host sessions for many workspaces at once. + +To handle multiple **users** (each with their own quota, audit log, sandbox) or to scale beyond one process's reach (cold-start budget, FD count, RSS), you spawn multiple daemon instances behind an external orchestrator. That orchestrator (multi-tenancy / OIDC / Quota / Audit / k8s) is **out of scope** for the qwen-code project — see issue [#3803](https://github.com/QwenLM/qwen-code/issues/3803) "External Reference Architecture" for the design pointers. ## What's next diff --git a/packages/cli/src/serve/httpAcpBridge.test.ts b/packages/cli/src/serve/httpAcpBridge.test.ts index 36bad45b2..6fb75f9ac 100644 --- a/packages/cli/src/serve/httpAcpBridge.test.ts +++ b/packages/cli/src/serve/httpAcpBridge.test.ts @@ -837,66 +837,70 @@ describe('createHttpAcpBridge', () => { await bridge.shutdown(); }); - it('rejects spawnOrAttach when the agent rejects the requested model', async () => { + it('keeps the session alive on model-switch failure and publishes model_switch_failed', async () => { + // Contract (per #3889 review A05Ym): when the agent rejects the + // requested model at create-session time, the session is still + // operational on the agent's default model. The caller gets a + // sessionId they can retry the model switch against (via + // POST /session/:id/model) and observe via the SSE stream. + // Tearing the session down would force the caller into a 500 + // with no way to recover. 
const { bridge } = setup({ setModelImpl: async () => { throw new Error('unknown model'); }, }); - await expect( - bridge.spawnOrAttach({ - workspaceCwd: WS_A, - modelServiceId: 'definitely-not-a-real-model', - }), - ).rejects.toBeTruthy(); - // Failed spawn must NOT leave the half-initialized session in the maps. - // (The entry's EventBus is also closed in the same teardown — it has - // no externally-reachable subscriber and the bus's internal `closed` - // flag prevents future GC-blocking publishes from the in-flight - // ClientSideConnection observer chain.) - expect(bridge.sessionCount).toBe(0); - await bridge.shutdown(); - }); - - it('a retry after a model-rejection failure uses a fresh EventBus', async () => { - // Regression for the leak path: after a setSessionModel failure we - // tear down the half-initialized session. A subsequent retry must - // create a *new* entry (not silently reuse the old one), and old - // events must not bleed into the new subscriber. - let attempt = 0; - const { bridge } = setup({ - setModelImpl: async () => { - attempt += 1; - if (attempt === 1) throw new Error('first attempt rejected'); - }, - }); - - await expect( - bridge.spawnOrAttach({ - workspaceCwd: WS_A, - modelServiceId: 'try-1', - }), - ).rejects.toBeTruthy(); - expect(bridge.sessionCount).toBe(0); - const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A, - modelServiceId: 'try-2', + modelServiceId: 'definitely-not-a-real-model', }); expect(session.attached).toBe(false); expect(bridge.sessionCount).toBe(1); - - // Subscribe to the live session — should see no events from the - // failed attempt. + // The model_switch_failed event must be on the bus for any + // subscriber that subscribes with `lastEventId: 0` (replay). 
const abort = new AbortController(); const iter = bridge.subscribeEvents(session.sessionId, { signal: abort.signal, + lastEventId: 0, }); const it = iter[Symbol.asyncIterator](); - // No events have been published yet; aborting closes the queue. + const first = await it.next(); + expect(first.value?.type).toBe('model_switch_failed'); + expect(first.value?.data).toMatchObject({ + sessionId: session.sessionId, + requestedModelId: 'definitely-not-a-real-model', + }); abort.abort(); - const next = await it.next(); - expect(next.done).toBe(true); + await bridge.shutdown(); + }); + + it('attaches to the existing session on retry after a model-switch failure', async () => { + // Per the same A05Ym contract: a follow-up `spawnOrAttach` for + // the same workspace finds the existing session (rather than + // re-spawning a fresh one), and a retry of the model switch + // through `POST /session/:id/model` is the documented recovery + // path. We exercise just the attach side here. + const { bridge } = setup({ + setModelImpl: async () => { + throw new Error('first attempt rejected'); + }, + }); + + const first = await bridge.spawnOrAttach({ + workspaceCwd: WS_A, + modelServiceId: 'try-1', + }); + expect(first.attached).toBe(false); + expect(bridge.sessionCount).toBe(1); + + // Second attach (no modelServiceId so we don't re-trigger the + // failing setModel) reuses the same session. + const second = await bridge.spawnOrAttach({ + workspaceCwd: WS_A, + }); + expect(second.attached).toBe(true); + expect(second.sessionId).toBe(first.sessionId); + expect(bridge.sessionCount).toBe(1); await bridge.shutdown(); }); @@ -1681,9 +1685,13 @@ describe('createHttpAcpBridge', () => { await new Promise((r) => setTimeout(r, 10)); await bridge.shutdown(); - // Subscriber must unwind to completion (no events ever published). - const events = await drain; - expect(events).toEqual([]); + // Subscriber must unwind to completion. 
Per #3889 review A05Ys + // the bus now publishes a terminal `session_died` event before + // closing on shutdown, so SSE subscribers can distinguish + // daemon shutdown from a transient network error. + const events = (await drain) as Array<{ type: string }>; + expect(events).toHaveLength(1); + expect(events[0]?.type).toBe('session_died'); }); }); }); diff --git a/packages/cli/src/serve/httpAcpBridge.ts b/packages/cli/src/serve/httpAcpBridge.ts index f7cd2f7f8..b349b329f 100644 --- a/packages/cli/src/serve/httpAcpBridge.ts +++ b/packages/cli/src/serve/httpAcpBridge.ts @@ -539,39 +539,37 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge { // ACP `newSession` doesn't take a model id; honor the caller's // `modelServiceId` by issuing the unstable `setSessionModel` call - // immediately after the session is established. If the agent rejects - // the model id, surface it as a session-creation failure so the - // caller doesn't think they got the requested model. + // immediately after the session is established. Use the shared + // `applyModelServiceId` helper so the call: + // - races against `transportClosedReject` (a wedged child + // during model switch fails fast instead of consuming the + // full init timeout — A09HD) + // - publishes `model_switched` / `model_switch_failed` to the + // bus so attached clients see the result + // - serializes through `entry.modelChangeQueue` for ordering + // against later attach-with-different-model calls + // + // On failure we DO NOT tear the session down. An earlier rev + // tore it down so the caller "didn't inherit silent drift", + // but that killed an already-functional session and left the + // caller with a 500 and no sessionId to retry against. The + // session is operational on the agent's default model — the + // `model_switch_failed` event tells subscribers exactly what + // happened, and the caller can retry the switch via + // `POST /session/:id/model` once they know the sessionId. 
+ // + // Swallow the rejection inline so the outer try/catch (which + // would `channel.kill()`) doesn't fire. The publish inside + // `applyModelServiceId` is the visible signal; the caller + // gets a 200 and a sessionId regardless, and learns of the + // model issue via the SSE stream. if (modelServiceId) { - try { - const conn = entry.connection as unknown as { - unstable_setSessionModel(p: { - sessionId: string; - modelId: string; - }): Promise; - }; - await withTimeout( - conn.unstable_setSessionModel({ - sessionId: entry.sessionId, - modelId: modelServiceId, - }), - initTimeoutMs, - 'setSessionModel', - ); - } catch (err) { - // The session is half-initialized — a known sessionId on a real - // child but pointing at the wrong model. Tear it down so the - // caller can retry cleanly instead of inheriting silent drift. - // Close the EventBus too: the agent may have published session_ - // update frames during init that are now orphaned (no subscriber - // can ever reach them — the caller never received the sessionId - // they would need to subscribe). Without an explicit close the - // bus + ring buffer linger until the next GC cycle. - byWorkspace.delete(workspaceKey); - byId.delete(entry.sessionId); - entry.events.close(); - throw err; - } + await applyModelServiceId(entry, modelServiceId, initTimeoutMs).catch( + () => { + // Already published `model_switch_failed`; don't take + // down the session. + }, + ); } return { @@ -829,7 +827,17 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge { // req.on('close')), tell the agent to wind down. ACP cancel is a // notification — the active prompt resolves with // stopReason: 'cancelled', then the next queued prompt can run. + // + // Also resolve any pending permission requests as `cancelled`. + // ACP spec requires `cancel` to settle outstanding + // `requestPermission` calls — `cancelSession()` already does + // this; the abort path here was missing the call. 
Without it, + // a client disconnecting while the agent is inside + // `requestPermission` leaves the permission promise unresolved + // forever (the agent is stuck waiting on a vote that no SSE + // subscriber will ever cast). const onAbort = () => { + cancelPendingForSession(sessionId); entry.connection.cancel({ sessionId }).catch(() => { // Cancel is fire-and-forget; the agent may already be dead. }); @@ -1012,6 +1020,19 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge { for (const id of Array.from(entry.pendingPermissionIds)) { resolvePending(id, { outcome: { outcome: 'cancelled' } }); } + // Publish `session_died` BEFORE closing the bus. After the eager + // `byId.delete` above, the channel.exited handler's + // `byId.get(...) !== entry` guard short-circuits, so the + // automatic publish at crash time wouldn't fire. SSE subscribers + // need this terminal frame to know the session is gone. + try { + entry.events.publish({ + type: 'session_died', + data: { sessionId, reason: 'killed' }, + }); + } catch { + /* bus already closed */ + } entry.events.close(); await entry.channel.kill().catch(() => { // Best-effort kill — channel may already be dead. @@ -1036,7 +1057,24 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge { byWorkspace.clear(); byId.clear(); pendingPermissions.clear(); - for (const e of entries) e.events.close(); + // Publish a terminal `session_died` BEFORE closing each bus so SSE + // subscribers can distinguish "daemon shut down" from a transient + // network error and don't sit indefinitely retrying. The + // channel.exited handler also publishes this on a child crash, + // but at shutdown time the entry has already been removed from + // `byId` (above), so the handler's `byId.get(...) !== entry` + // guard would short-circuit and the event would never fire. 
+ for (const e of entries) { + try { + e.events.publish({ + type: 'session_died', + data: { sessionId: e.sessionId, reason: 'daemon_shutdown' }, + }); + } catch { + /* bus already closed */ + } + e.events.close(); + } // Wait for in-flight spawns too. The snapshot above only sees // sessions already in `byId`; a `doSpawn` past `channelFactory()` // but still inside `connection.newSession` (~1s on cold start) is @@ -1187,19 +1225,29 @@ export const defaultSpawnChannelFactory: ChannelFactory = async ( // is a `.ts` file Node can't run; users should `npm run build` before // `qwen serve` or set `process.execPath` to a tsx-aware shim. Stage 1 // accepts this — the daemon is meant for built deployments. - // Pass an *allowlisted* environment to the child (see - // ALLOWED_CHILD_ENV_KEYS at module scope for the rationale + the - // allowlist itself). Move-out keeps the Set allocated once, not - // per-spawn. - const childEnv: NodeJS.ProcessEnv = {}; - for (const key of ALLOWED_CHILD_ENV_KEYS) { - const v = process.env[key]; - if (typeof v === 'string') childEnv[key] = v; + // Pass through the daemon's full environment to the child, scrubbing + // ONLY daemon-internal secrets (see SCRUBBED_CHILD_ENV_KEYS at module + // scope). An earlier version used an allowlist, but that broke the + // common deployment shape: users export `OPENAI_API_KEY` / + // `ANTHROPIC_API_KEY` / `QWEN_*` / `DASHSCOPE_API_KEY` / a custom + // `modelProviders[].envKey` to authenticate the agent's LLM calls, + // and core's model config resolves those from `process.env`. An + // exhaustive allowlist can't enumerate user-defined provider keys, + // so the agent ends up unable to authenticate. + // + // Threat-model rationale: the agent already runs as the same UID + // with shell-tool access — anything in `~/.bashrc`, `~/.npmrc`, + // `~/.aws/credentials`, etc. is reachable by prompt injection + // regardless of what we put in `env`. 
The env passthrough is not + // the security boundary; the user-as-trust-root is. The only thing + // we MUST scrub is `QWEN_SERVER_TOKEN` (daemon-only auth that + // would let a prompt-injected shell turn the agent into an + // authenticated client of its own daemon — escalation the agent + // doesn't otherwise have). + const childEnv: NodeJS.ProcessEnv = { ...process.env }; + for (const key of SCRUBBED_CHILD_ENV_KEYS) { + delete childEnv[key]; } - // Defense-in-depth: the explicit allowlist is exhaustive but the - // delete keeps the security review trail readable — anyone grepping - // for `QWEN_SERVER_TOKEN` finds the scrub explicitly named. - delete childEnv['QWEN_SERVER_TOKEN']; // CodeQL `js/path-injection` flags the `cwd: workspaceCwd` flow. // Stage 1 trust model accepts this — see the function-level comment // above for the design rationale. Defense-in-depth: the cwd is @@ -1257,54 +1305,20 @@ export const defaultSpawnChannelFactory: ChannelFactory = async ( const KILL_HARD_DEADLINE_MS = 10_000; /** - * Environment variables the spawned `qwen --acp` child is allowed to - * inherit from the daemon's environment. Anything else is dropped — - * the agent runs user-supplied prompts with shell-tool access, so - * everything in its env is reachable by prompt injection: API keys - * (OPENAI/ANTHROPIC/GEMINI/DASHSCOPE/...), DB passwords, AWS/GCP - * credentials, OAuth tokens, secrets your operator forgot they - * exported. A denylist requires perfect knowledge of every secret - * anyone might set; an allowlist requires only that we know what - * the agent legitimately needs. + * Environment variables stripped from the spawned `qwen --acp` child's + * environment. Everything else is passed through — see the + * threat-model rationale at the call site in `defaultSpawnChannelFactory`. 
* - * What's needed: enough to launch Node + load the entry script and - * resolve user-config files (HOME), enough to find binaries and - * standard tools (PATH), enough to localize and identify the user - * for any session-state lookups (LANG/LC_*, USER/LOGNAME). Anything - * beyond this list should be passed to the agent through ACP, not - * through the spawn env. + * Currently just `QWEN_SERVER_TOKEN`: the daemon's own bearer token, + * which the agent doesn't need (it speaks to the daemon over stdio, + * not HTTP). Leaving it in the child's env would let prompt injection + * turn the agent into an authenticated client of its own daemon — an + * escalation the agent doesn't otherwise have. * - * Per-platform extras: NODE-specific knobs (NODE_OPTIONS, NODE_PATH) - * and shell-driven temp dir overrides (TMPDIR/TEMP/TMP) are usually - * benign but can carry instructions; include only the temp-dir ones - * since the agent legitimately writes scratch files. Windows-only - * SYSTEMROOT/USERPROFILE/APPDATA are required by Node itself on - * Windows or the spawn fails. - * - * Defined at module scope so the Set is allocated once at load, - * not rebuilt on every `defaultSpawnChannelFactory` call. + * Defined at module scope so the Set is allocated once at load. */ -const ALLOWED_CHILD_ENV_KEYS: ReadonlySet = new Set([ - 'HOME', - 'PATH', - 'USER', - 'LOGNAME', - 'LANG', - 'LC_ALL', - 'LC_CTYPE', - 'LC_COLLATE', - 'LC_MESSAGES', - 'TMPDIR', - 'TEMP', - 'TMP', - 'NODE_PATH', - // Windows essentials — Node refuses to spawn without these on Win32. 
- 'SYSTEMROOT', - 'USERPROFILE', - 'APPDATA', - 'LOCALAPPDATA', - 'COMSPEC', - 'PATHEXT', +const SCRUBBED_CHILD_ENV_KEYS: ReadonlySet = new Set([ + 'QWEN_SERVER_TOKEN', ]); function killChild(child: ChildProcess): Promise { diff --git a/packages/cli/src/serve/server.test.ts b/packages/cli/src/serve/server.test.ts index d4e3e9000..e50cb8dbf 100644 --- a/packages/cli/src/serve/server.test.ts +++ b/packages/cli/src/serve/server.test.ts @@ -679,10 +679,16 @@ describe('createServeApp', () => { expect(res.status).toBe(200); }); + // Switched probe endpoint from `/health` to `/capabilities` for + // these auth-rejection tests because per #3889 review A8dZT + // `/health` is now intentionally registered BEFORE the bearer + // middleware so liveness probes work without credentials. + // `/capabilities` is the cheapest endpoint that still goes through + // the auth chain. it('rejects missing Authorization header when token is set', async () => { const app = createServeApp({ ...baseOpts, token: 'secret' }); const res = await request(app) - .get('/health') + .get('/capabilities') .set('Host', `127.0.0.1:${baseOpts.port}`); expect(res.status).toBe(401); }); @@ -690,7 +696,7 @@ describe('createServeApp', () => { it('rejects wrong scheme', async () => { const app = createServeApp({ ...baseOpts, token: 'secret' }); const res = await request(app) - .get('/health') + .get('/capabilities') .set('Host', `127.0.0.1:${baseOpts.port}`) .set('Authorization', 'Basic c2VjcmV0'); expect(res.status).toBe(401); @@ -699,7 +705,7 @@ describe('createServeApp', () => { it('rejects wrong token', async () => { const app = createServeApp({ ...baseOpts, token: 'secret' }); const res = await request(app) - .get('/health') + .get('/capabilities') .set('Host', `127.0.0.1:${baseOpts.port}`) .set('Authorization', 'Bearer wrong'); expect(res.status).toBe(401); @@ -708,11 +714,27 @@ describe('createServeApp', () => { it('accepts the right token', async () => { const app = createServeApp({ ...baseOpts, token: 
'secret' }); const res = await request(app) - .get('/health') + .get('/capabilities') .set('Host', `127.0.0.1:${baseOpts.port}`) .set('Authorization', 'Bearer secret'); expect(res.status).toBe(200); }); + + it('exempts /health from bearer auth so liveness probes work without credentials', async () => { + // Per #3889 review A8dZT — the registration order in + // `createServeApp` puts `/health` BEFORE `bearerAuth`, so a + // probe with no credentials still gets 200 even when the daemon + // was started with a token. CORS deny + Host allowlist still + // apply to `/health` (registered before /health), so this is + // not a way to bypass DNS rebinding or browser-origin + // protection. + const app = createServeApp({ ...baseOpts, token: 'secret' }); + const res = await request(app) + .get('/health') + .set('Host', `127.0.0.1:${baseOpts.port}`); + expect(res.status).toBe(200); + expect(res.body).toEqual({ status: 'ok' }); + }); }); }); diff --git a/packages/cli/src/serve/server.ts b/packages/cli/src/serve/server.ts index ba9f124d4..77324a101 100644 --- a/packages/cli/src/serve/server.ts +++ b/packages/cli/src/serve/server.ts @@ -61,13 +61,21 @@ export function createServeApp( // amplified CPU/memory cost from any wrong-token client. app.use(denyBrowserOriginCors); app.use(hostAllowlist(opts.hostname, getPort)); - app.use(bearerAuth(opts.token)); - app.use(express.json({ limit: '10mb' })); + // `/health` is registered BEFORE `bearerAuth` so liveness probes work + // without credentials even when the daemon was started with a + // `--token` (k8s/Compose probes typically don't carry the daemon's + // bearer; round-tripping a 401 just to know the listener is up is + // pure waste). CORS deny + Host allowlist still apply, so a browser + // or a wrong-Host probe is still rejected. Documented exemption in + // `docs/developers/qwen-serve-protocol.md`. 
app.get('/health', (_req, res) => { res.status(200).json({ status: 'ok' }); }); + app.use(bearerAuth(opts.token)); + app.use(express.json({ limit: '10mb' })); + app.get('/capabilities', (_req, res) => { const envelope: CapabilitiesEnvelope = { v: CAPABILITIES_SCHEMA_VERSION, @@ -309,7 +317,17 @@ export function createServeApp( // payload in user-space memory unboundedly — a slow consumer on a // chatty session can balloon daemon RSS. Wait for `drain` (or // close/error) before scheduling the next write. - const writeWithBackpressure = (chunk: string): Promise => + // + // Concurrency: serialize ALL writes through a per-connection chain + // so the heartbeat (fire-and-forget interval, see below) can't + // interleave with the main event-write loop. Without serialization, + // the heartbeat firing while the main loop is mid-`drain` await + // would issue a second `res.write()` that bypasses the + // backpressure guard — and could even interleave bytes between two + // SSE frames on the wire. The chain is single-flight: each call + // waits for the previous write to settle before scheduling its own. + let writeChain: Promise = Promise.resolve(); + const doWrite = (chunk: string): Promise => new Promise((resolve, reject) => { if (res.writableEnded) { resolve(); @@ -341,6 +359,15 @@ export function createServeApp( res.once('close', onClose); res.once('error', onError); }); + const writeWithBackpressure = (chunk: string): Promise => { + const next = writeChain.then(() => doWrite(chunk)); + // Tail-swallow rejections on the chain itself so a single failed + // write doesn't poison every subsequent call. The CALLER's + // returned promise still rejects — chain-internal failures are + // someone else's problem, not blockers for queueing. + writeChain = next.catch(() => undefined); + return next; + }; // Tell EventSource to retry after 3s on disconnect. 
Awaiting drain on // the very first write is overkill but cheap — `ok` is true the @@ -446,6 +473,21 @@ export function createServeApp( res.status(400).json({ error: 'Invalid JSON in request body' }); return; } + // body-parser raises a typed error with `status: 413` when a + // request body exceeds the `express.json({ limit: '10mb' })` + // ceiling. Without this branch it falls through to the 500 path + // and clients see a misleading "Internal server error" instead + // of a clear "payload too large" — which is the kind of error + // they can actually act on (chunk the request, raise the limit). + if ( + err && + typeof err === 'object' && + 'status' in err && + (err as { status: number }).status === 413 + ) { + res.status(413).json({ error: 'Request body too large (max 10 MB)' }); + return; + } writeStderrLine( `qwen serve: unhandled error: ${err instanceof Error ? (err.stack ?? err.message) : String(err)}`, ); diff --git a/packages/sdk-typescript/src/daemon/DaemonClient.ts b/packages/sdk-typescript/src/daemon/DaemonClient.ts index 5ca7aac9f..6d9ff7a2d 100644 --- a/packages/sdk-typescript/src/daemon/DaemonClient.ts +++ b/packages/sdk-typescript/src/daemon/DaemonClient.ts @@ -355,7 +355,11 @@ export class DaemonClient { * but exposed as `static` on `AbortSignal`, so cautious feature-detect plus * a polyfill keeps us honest if a runtime ships a stripped-down `AbortSignal`. */ -function abortTimeout(ms: number): AbortSignal { +// Exported solely for direct unit testing — production callers go +// through `fetchWithTimeout` above, but the polyfill paths only fire +// on Node 18.0–20.2 (where `AbortSignal.any` is missing) and that +// runtime can't easily be exercised from the public API surface. 
+export function abortTimeout(ms: number): AbortSignal { const tFn = ( AbortSignal as unknown as { timeout?: (ms: number) => AbortSignal } ).timeout; @@ -394,7 +398,8 @@ function abortTimeout(ms: number): AbortSignal { * listeners after the first fire is best-effort), but for `fetch`-style * single-shot use the difference is invisible. */ -function composeAbortSignals(signals: AbortSignal[]): AbortSignal { +// Exported solely for direct unit testing — see note on `abortTimeout`. +export function composeAbortSignals(signals: AbortSignal[]): AbortSignal { const anyFn = ( AbortSignal as unknown as { any?: (s: AbortSignal[]) => AbortSignal } ).any; diff --git a/packages/sdk-typescript/test/unit/DaemonClient.test.ts b/packages/sdk-typescript/test/unit/DaemonClient.test.ts index 1a3dbfcee..b08720cad 100644 --- a/packages/sdk-typescript/test/unit/DaemonClient.test.ts +++ b/packages/sdk-typescript/test/unit/DaemonClient.test.ts @@ -8,6 +8,8 @@ import { describe, it, expect, vi } from 'vitest'; import { DaemonClient, DaemonHttpError, + abortTimeout, + composeAbortSignals, } from '../../src/daemon/DaemonClient.js'; function jsonResponse(status: number, body: unknown): Response { @@ -480,35 +482,42 @@ describe('DaemonClient', () => { expect(elapsed).toBeLessThan(2000); }); - it('aborts when the caller-supplied signal fires (composeAbortSignals path)', async () => { - // Verifies that `composeAbortSignals` actually forwards the - // caller's abort to the underlying fetch — the polyfill path - // is the only one Node 18.0–20.2 takes for signal composition. 
- const fetch = vi.fn( - (_input: RequestInfo | URL, init?: RequestInit) => - new Promise((_res, rej) => { - init?.signal?.addEventListener('abort', () => - rej(new DOMException('aborted', 'AbortError')), - ); - }), - ) as unknown as typeof globalThis.fetch; - const client = new DaemonClient({ - baseUrl: 'http://daemon', - fetch, - fetchTimeoutMs: 60_000, // long, so the caller's abort is what fires - }); - const ctrl = new AbortController(); - setTimeout(() => ctrl.abort(), 30); - await expect( - // subscribeEvents forwards opts.signal through fetchWithTimeout - // — pick a method whose typed surface accepts a signal. - (async () => { - const iter = client.subscribeEvents('s-1', { signal: ctrl.signal }); - for await (const _ of iter) { - void _; - } - })(), - ).rejects.toThrow(); + it('composeAbortSignals forwards the first abort, with or without native AbortSignal.any', async () => { + // Direct-unit test on the helper — `subscribeEvents` bypasses + // `fetchWithTimeout` entirely (it calls `_fetch` directly with + // the caller's signal), so testing through subscribeEvents + // never exercises the polyfill. Calling `composeAbortSignals` + // here covers it on all Node versions: native (`>=20.3`) and + // polyfill (`18.0`–`20.2`) take the same input shape. + const a = new AbortController(); + const b = new AbortController(); + const composed = composeAbortSignals([a.signal, b.signal]); + expect(composed.aborted).toBe(false); + a.abort(new DOMException('first', 'AbortError')); + // The composed signal should follow whichever input fires first. + // Allow a microtask for native AbortSignal.any propagation. 
+ await Promise.resolve(); + expect(composed.aborted).toBe(true); + }); + + it('composeAbortSignals fires immediately if any input is already aborted', () => { + const a = new AbortController(); + a.abort(); + const b = new AbortController(); + const composed = composeAbortSignals([a.signal, b.signal]); + expect(composed.aborted).toBe(true); + }); + + it('abortTimeout fires after the configured delay', async () => { + const t0 = Date.now(); + const sig = abortTimeout(40); + await new Promise<void>((resolve) => + sig.addEventListener('abort', () => resolve(), { once: true }), + ); + const elapsed = Date.now() - t0; + // Generous tolerance — just checking the timer fires. + expect(elapsed).toBeGreaterThanOrEqual(30); + expect(elapsed).toBeLessThan(2000); }); }); });