fix(cli,sdk,docs): close 21 review threads — env regression + races + doc accuracy (#3803)

CRITICAL regression fix:
- Child env scrub flipped from allowlist back to denylist (just
  QWEN_SERVER_TOKEN). The earlier allowlist was overzealous: it
  dropped OPENAI_API_KEY / ANTHROPIC_API_KEY / GEMINI_API_KEY /
  QWEN_* / DASHSCOPE_API_KEY / custom modelProviders[].envKey, all of
  which the agent legitimately needs to authenticate to the LLM.
  Daemon-mode users with env-only auth would start the daemon, attach
  a session, then watch every prompt fail with auth errors. Threat-
  model rationale documented at the call site: prompt-injected shell
  tools can already read ~/.bashrc, ~/.aws/credentials, etc., so env
  passthrough isn't the security boundary; the user-as-trust-root is.
  QWEN_SERVER_TOKEN stays scrubbed to prevent agent → its own daemon
  escalation.
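The denylist shape described above can be sketched in a few lines (a minimal illustration — `buildChildEnv` is a hypothetical helper name; the real scrub lives inline in `httpAcpBridge.ts`):

```typescript
// Hypothetical helper illustrating the denylist: copy the daemon's full
// environment, then delete only daemon-internal secrets. Provider keys
// (OPENAI_API_KEY, DASHSCOPE_API_KEY, custom envKey vars, ...) pass
// through untouched.
const SCRUBBED_CHILD_ENV_KEYS: ReadonlySet<string> = new Set([
  'QWEN_SERVER_TOKEN',
]);

function buildChildEnv(
  parentEnv: Record<string, string | undefined>,
): Record<string, string | undefined> {
  const childEnv = { ...parentEnv };
  for (const key of SCRUBBED_CHILD_ENV_KEYS) {
    delete childEnv[key];
  }
  return childEnv;
}
```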

Other code fixes:
- doSpawn no longer tears down the session when create-time model
  switch fails. The session is still operational on the agent's
  default model; tearing it down left the caller with a 500 and no
  sessionId to retry against. The model_switch_failed SSE event is
  the visible signal; caller can retry via POST /session/:id/model
  once they have the sessionId.
- doSpawn now uses applyModelServiceId for the create-time model
  switch (was raw conn.unstable_setSessionModel + withTimeout). The
  helper races against transportClosedReject too, so a child crash
  during model switch fails fast instead of consuming the full init
  timeout.
- sendPrompt's abort handler now calls cancelPendingForSession
  before the ACP cancel notification (matching cancelSession). A
  client disconnecting mid-permission was leaving the agent stuck
  waiting on a vote that no SSE subscriber would ever cast.
- shutdown() and killSession() now publish a terminal `session_died`
  SSE event before closing the bus. Previously the channel.exited
  handler's "byId.get(...) !== entry" guard short-circuited (entry
  already removed), so SSE subscribers couldn't tell daemon shutdown
  from a transient network error.
- Express error middleware now special-cases `status: 413`
  (EntityTooLargeError from body-parser when a request exceeds the
  10 MB JSON limit) and returns a JSON 413 instead of a misleading
  500.
- /health is now registered BEFORE bearerAuth middleware, so
  liveness probes work without credentials when the daemon was
  started with --token. CORS deny + Host allowlist still apply.
- SSE writes serialize through a per-connection chain so the
  heartbeat interval can no longer interleave with the main event-
  write loop. Two concurrent res.write calls would otherwise bypass
  the backpressure guard and could interleave bytes between SSE
  frames on the wire.
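The serialization in the last bullet reduces to a small single-flight chain; a minimal sketch (helper names are illustrative, not the server's actual identifiers):

```typescript
// Minimal sketch of the single-flight write chain: each caller waits
// for the previous write to settle before issuing its own, so a
// fire-and-forget heartbeat can never interleave with an in-flight
// event write.
type Writer = (chunk: string) => Promise<void>;

function makeSerializedWriter(doWrite: Writer): Writer {
  let chain: Promise<void> = Promise.resolve();
  return (chunk: string): Promise<void> => {
    const next = chain.then(() => doWrite(chunk));
    // Swallow rejections on the chain itself so one failed write doesn't
    // poison every later call; the caller's returned promise still
    // rejects normally.
    chain = next.catch(() => undefined);
    return next;
  };
}
```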

SDK:
- abortTimeout / composeAbortSignals exported for direct unit
  testing. The existing test claimed to cover the polyfill paths via
  subscribeEvents, but subscribeEvents calls _fetch directly (not
  fetchWithTimeout), so composeAbortSignals never ran in the test.
  New tests exercise the helpers directly across native + polyfill
  runtimes.
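For reference, the polyfill path of a signal combinator looks roughly like this (an assumption-level sketch — the SDK's exported `composeAbortSignals` may differ in shape):

```typescript
// Produce one signal that aborts as soon as any input does. Native
// runtimes can use AbortSignal.any(); this is the manual-listener
// fallback a polyfill path would take.
function composeAbortSignalsSketch(
  ...signals: Array<AbortSignal | undefined>
): AbortSignal {
  const controller = new AbortController();
  for (const signal of signals) {
    if (!signal) continue;
    if (signal.aborted) {
      // An already-aborted input short-circuits immediately.
      controller.abort(signal.reason);
      break;
    }
    signal.addEventListener('abort', () => controller.abort(signal.reason), {
      once: true,
    });
  }
  return controller.signal;
}
```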

Doc accuracy fixes:
- daemon-client-quickstart.md: createOrAttachSession({ cwd: ... })
  → ({ workspaceCwd: ... }) (SDK type), client.sendPrompt → prompt,
  client.cancelSession → cancel. The example wouldn't typecheck.
- qwen-serve.md: "binds one workspace" claim removed — a single
  daemon hosts sessions for any cwd the caller passes; the
  per-instance constraint is per-user / scale, not per-workspace.
  Auth verification example switched from /health to /capabilities
  (since /health is now exempt from bearer auth).
- qwen-serve-protocol.md: env var was QWEN_E2E_LLM, real var is
  SKIP_LLM_TESTS (inverted polarity). Streaming test count was 4,
  actually 3. Added Stage 1 limitation notes for "no DELETE
  /session" and "no permission timeout". Added client-side
  ring-buffer gap detection guidance for Last-Event-ID reconnect.
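The gap-detection guidance amounts to one comparison on the client (function name is hypothetical, not SDK API):

```typescript
// After reconnecting with `Last-Event-ID: n`, compare the first
// replayed event's id against n + 1. Zero means a seamless resume;
// a positive value is the number of events that aged out of the
// ring while disconnected.
function detectReplayGap(lastEventId: number, firstReplayedId: number): number {
  return Math.max(0, firstReplayedId - (lastEventId + 1));
}
```

E.g. reconnecting with `Last-Event-ID: 50` and seeing id 200 first means a 149-event window was lost.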

Test updates:
- httpAcpBridge.test.ts: rewrote two tests for the new
  doSpawn-on-model-switch-fail contract (publish event, keep
  session). Updated shutdown-closes-subscriptions test to expect
  the new terminal `session_died` frame.
- server.test.ts: switched bearer-auth rejection probes from
  /health to /capabilities (since /health is now exempt). Added a
  test that locks /health's exemption.
This commit is contained in:
wenshao 2026-05-11 09:27:22 +08:00
parent 73203f33db
commit e74aa99191
9 changed files with 324 additions and 200 deletions

View file

@ -34,7 +34,7 @@ console.log('Daemon features:', caps.features);
// 2. Spawn-or-attach a session for the current workspace.
const session = await client.createOrAttachSession({
cwd: process.cwd(),
workspaceCwd: process.cwd(),
});
console.log(`session=${session.sessionId} attached=${session.attached}`);
@ -57,10 +57,10 @@ const subscription = (async () => {
})();
// 4. Send a prompt and wait for it to settle. (Order-of-operations
// note: even if `sendPrompt` fires before the SSE handshake
// note: even if `prompt()` fires before the SSE handshake
// completes, step 3's `lastEventId: 0` guarantees every event
// lands in the iterator.)
const result = await client.sendPrompt(session.sessionId, {
const result = await client.prompt(session.sessionId, {
prompt: [{ type: 'text', text: 'Summarize src/main.ts in one sentence.' }],
});
console.log('stop reason:', result.stopReason);
@ -143,11 +143,11 @@ Two clients pointed at the same daemon and `cwd` end up on the same session:
```ts
// Client A (e.g. an IDE plugin)
const a = await clientA.createOrAttachSession({ cwd: '/work/repo' });
const a = await clientA.createOrAttachSession({ workspaceCwd: '/work/repo' });
console.log(a.attached); // false — A spawned the agent
// Client B (e.g. a web UI on the same machine)
const b = await clientB.createOrAttachSession({ cwd: '/work/repo' });
const b = await clientB.createOrAttachSession({ workspaceCwd: '/work/repo' });
console.log(b.attached); // true — B joined A's session
console.log(a.sessionId === b.sessionId); // true
```
@ -186,7 +186,7 @@ try {
If your user hits Esc:
```ts
await client.cancelSession(session.sessionId);
await client.cancel(session.sessionId);
// In the event stream you'll see the prompt resolve with stopReason: "cancelled"
```

View file

@ -52,6 +52,12 @@ Every Stage 1 daemon advertises 9 feature tags. Clients **must** gate UI off `fe
## Routes
> **Stage 1 limitation — no `DELETE /session/:id`.** Sessions live until
> the agent child crashes (`session_died`), the daemon process exits, or
> a server-side `killSession` (used internally by orphan-cleanup) fires.
> HTTP clients have no explicit "close one session" route in Stage 1.
> An explicit `DELETE /session/:id` is on the Stage 2 polish list.
### `GET /health`
Liveness probe. Returns `200 {"status":"ok"}` if the listener is up. No auth required even when a token is configured (heartbeat-friendly).
@ -216,6 +222,7 @@ data: {"reason":"queue_overflow","droppedAfter":42}
Reconnect semantics:
- Send `Last-Event-ID: <n>` to replay events with `id > n` from the per-session ring (default depth 1000)
- **Gap detection (client-side):** if `<n>` predates the oldest event still in the ring (e.g. you reconnect with `Last-Event-ID: 50` but the ring now holds 200-1199), the daemon replays from the oldest available event without raising. Compare the first replayed event's `id` against `n + 1`; any difference is the size of the lost window. Stage 2 will inject an explicit `stream_gap` synthetic frame on the daemon side; in Stage 1 detection is the client's responsibility.
- IDs are monotonic per session, starting at 1
- Synthetic terminal frames (`client_evicted`, `stream_error`) intentionally omit `id` so they don't burn a sequence slot for other subscribers
@ -228,6 +235,19 @@ Backpressure:
Cast a vote on a pending `permission_request`. **First responder wins** — once one client answers, every other client trying to answer the same id gets `404`.
> **Stage 1 limitation — no permission timeout.** A `permission_request`
> stays pending until: (a) some client votes here, (b) `POST
> /session/:id/cancel` fires, (c) the HTTP client driving the prompt
> disconnects (mid-prompt cancel resolves outstanding permissions as
> `cancelled`), (d) the session is killed, or (e) the daemon shuts
> down. **In a fully-headless deployment with no SSE subscriber,
> `requestPermission` blocks the agent indefinitely** — there's nothing
> to time out the wait. Stage 2 will add a configurable
> `permissionTimeoutMs`. Until then, headless callers should keep an
> SSE subscription open or wrap their prompt loop in their own
> timeout that fires `POST /session/:id/cancel`.
Request:
```json
@ -268,22 +288,22 @@ The connection then closes.
## Environment variables
| Var | Purpose |
| ------------------- | ------------------------------------------------------------------------------------------------------------ |
| `QWEN_SERVER_TOKEN` | Bearer token. Stripped of leading/trailing whitespace at boot. |
| `QWEN_E2E_LLM` | Set to `1` to enable LLM-required integration tests in `integration-tests/cli/qwen-serve-streaming.test.ts`. |
| Var | Purpose |
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `QWEN_SERVER_TOKEN` | Bearer token. Stripped of leading/trailing whitespace at boot. |
| `SKIP_LLM_TESTS` | Set to `1` to **skip** LLM-required integration tests in `integration-tests/cli/qwen-serve-streaming.test.ts` (default-on for CI envs that lack provider API keys). |
## Source layout
| Path | Purpose |
| ---------------------------------------------------- | -------------------------------------------------------- |
| `packages/cli/src/commands/serve.ts` | yargs command + flag schema |
| `packages/cli/src/serve/runQwenServe.ts` | listener lifecycle + signal handling |
| `packages/cli/src/serve/server.ts` | Express routes + middleware |
| `packages/cli/src/serve/auth.ts` | bearer + Host allowlist + CORS deny |
| `packages/cli/src/serve/httpAcpBridge.ts` | spawn-or-attach + per-session FIFO + permission registry |
| `packages/cli/src/serve/eventBus.ts` | bounded async queue + replay ring |
| `packages/sdk-typescript/src/daemon/DaemonClient.ts` | TS client |
| `packages/sdk-typescript/src/daemon/sse.ts` | EventSource frame parser |
| `integration-tests/cli/qwen-serve-routes.test.ts` | 18 cases, no LLM |
| `integration-tests/cli/qwen-serve-streaming.test.ts` | 4 cases, real `qwen --acp` child (`QWEN_E2E_LLM=1`) |
| Path | Purpose |
| ---------------------------------------------------- | ------------------------------------------------------------------ |
| `packages/cli/src/commands/serve.ts` | yargs command + flag schema |
| `packages/cli/src/serve/runQwenServe.ts` | listener lifecycle + signal handling |
| `packages/cli/src/serve/server.ts` | Express routes + middleware |
| `packages/cli/src/serve/auth.ts` | bearer + Host allowlist + CORS deny |
| `packages/cli/src/serve/httpAcpBridge.ts` | spawn-or-attach + per-session FIFO + permission registry |
| `packages/cli/src/serve/eventBus.ts` | bounded async queue + replay ring |
| `packages/sdk-typescript/src/daemon/DaemonClient.ts` | TS client |
| `packages/sdk-typescript/src/daemon/sse.ts` | EventSource frame parser |
| `integration-tests/cli/qwen-serve-routes.test.ts` | 18 cases, no LLM |
| `integration-tests/cli/qwen-serve-streaming.test.ts` | 3 cases, real `qwen --acp` child (skipped when `SKIP_LLM_TESTS=1`) |

View file

@ -86,10 +86,12 @@ qwen serve --hostname 0.0.0.0 --port 4170
# → boot refuses without QWEN_SERVER_TOKEN
```
Clients then send `Authorization: Bearer $QWEN_SERVER_TOKEN` on every request.
Clients then send `Authorization: Bearer $QWEN_SERVER_TOKEN` on every request **except `/health`**, which is intentionally exempt so liveness probes (k8s, Compose, monitoring) work without credentials. Use `/capabilities` to verify your token is correct end-to-end:
```bash
curl -H "Authorization: Bearer $QWEN_SERVER_TOKEN" http://your-host:4170/health
curl -H "Authorization: Bearer $QWEN_SERVER_TOKEN" http://your-host:4170/capabilities
# → {"v":1,"mode":"http-bridge","features":[...],"modelServices":[]}
# Wrong token → 401
```
The token comparison is constant-time (SHA-256 + `crypto.timingSafeEqual`); 401 responses are uniform across "missing header", "wrong scheme", and "wrong token" so a side-channel can't distinguish.
@ -116,7 +118,9 @@ The token comparison is constant-time (SHA-256 + `crypto.timingSafeEqual`); 401
## Multi-session & remote deployment
Each `qwen serve` process binds **one workspace**. To handle multiple workspaces or multiple users at scale you spawn multiple daemon instances behind an external orchestrator. That orchestrator (multi-tenancy / OIDC / Quota / Audit / k8s) is **out of scope** for the qwen-code project — see issue [#3803](https://github.com/QwenLM/qwen-code/issues/3803) "External Reference Architecture" for the design pointers.
A single `qwen serve` process can manage sessions for any workspace path passed via `cwd` on `POST /session` — under the default `sessionScope: 'single'` it keeps one ACP session per canonicalized workspace, sharing it across every client that posts the same `cwd`. So one daemon will happily host sessions for many workspaces at once.
To handle multiple **users** (each with their own quota, audit log, sandbox) or to scale beyond one process's reach (cold-start budget, FD count, RSS), you spawn multiple daemon instances behind an external orchestrator. That orchestrator (multi-tenancy / OIDC / Quota / Audit / k8s) is **out of scope** for the qwen-code project — see issue [#3803](https://github.com/QwenLM/qwen-code/issues/3803) "External Reference Architecture" for the design pointers.
## What's next

View file

@ -837,66 +837,70 @@ describe('createHttpAcpBridge', () => {
await bridge.shutdown();
});
it('rejects spawnOrAttach when the agent rejects the requested model', async () => {
it('keeps the session alive on model-switch failure and publishes model_switch_failed', async () => {
// Contract (per #3889 review A05Ym): when the agent rejects the
// requested model at create-session time, the session is still
// operational on the agent's default model. The caller gets a
// sessionId they can retry the model switch against (via
// POST /session/:id/model) and observe via the SSE stream.
// Tearing the session down would force the caller into a 500
// with no way to recover.
const { bridge } = setup({
setModelImpl: async () => {
throw new Error('unknown model');
},
});
await expect(
bridge.spawnOrAttach({
workspaceCwd: WS_A,
modelServiceId: 'definitely-not-a-real-model',
}),
).rejects.toBeTruthy();
// Failed spawn must NOT leave the half-initialized session in the maps.
// (The entry's EventBus is also closed in the same teardown — it has
// no externally-reachable subscriber and the bus's internal `closed`
// flag prevents future GC-blocking publishes from the in-flight
// ClientSideConnection observer chain.)
expect(bridge.sessionCount).toBe(0);
await bridge.shutdown();
});
it('a retry after a model-rejection failure uses a fresh EventBus', async () => {
// Regression for the leak path: after a setSessionModel failure we
// tear down the half-initialized session. A subsequent retry must
// create a *new* entry (not silently reuse the old one), and old
// events must not bleed into the new subscriber.
let attempt = 0;
const { bridge } = setup({
setModelImpl: async () => {
attempt += 1;
if (attempt === 1) throw new Error('first attempt rejected');
},
});
await expect(
bridge.spawnOrAttach({
workspaceCwd: WS_A,
modelServiceId: 'try-1',
}),
).rejects.toBeTruthy();
expect(bridge.sessionCount).toBe(0);
const session = await bridge.spawnOrAttach({
workspaceCwd: WS_A,
modelServiceId: 'try-2',
modelServiceId: 'definitely-not-a-real-model',
});
expect(session.attached).toBe(false);
expect(bridge.sessionCount).toBe(1);
// Subscribe to the live session — should see no events from the
// failed attempt.
// The model_switch_failed event must be on the bus for any
// subscriber that subscribes with `lastEventId: 0` (replay).
const abort = new AbortController();
const iter = bridge.subscribeEvents(session.sessionId, {
signal: abort.signal,
lastEventId: 0,
});
const it = iter[Symbol.asyncIterator]();
// No events have been published yet; aborting closes the queue.
const first = await it.next();
expect(first.value?.type).toBe('model_switch_failed');
expect(first.value?.data).toMatchObject({
sessionId: session.sessionId,
requestedModelId: 'definitely-not-a-real-model',
});
abort.abort();
const next = await it.next();
expect(next.done).toBe(true);
await bridge.shutdown();
});
it('attaches to the existing session on retry after a model-switch failure', async () => {
// Per the same A05Ym contract: a follow-up `spawnOrAttach` for
// the same workspace finds the existing session (rather than
// re-spawning a fresh one), and a retry of the model switch
// through `POST /session/:id/model` is the documented recovery
// path. We exercise just the attach side here.
const { bridge } = setup({
setModelImpl: async () => {
throw new Error('first attempt rejected');
},
});
const first = await bridge.spawnOrAttach({
workspaceCwd: WS_A,
modelServiceId: 'try-1',
});
expect(first.attached).toBe(false);
expect(bridge.sessionCount).toBe(1);
// Second attach (no modelServiceId so we don't re-trigger the
// failing setModel) reuses the same session.
const second = await bridge.spawnOrAttach({
workspaceCwd: WS_A,
});
expect(second.attached).toBe(true);
expect(second.sessionId).toBe(first.sessionId);
expect(bridge.sessionCount).toBe(1);
await bridge.shutdown();
});
@ -1681,9 +1685,13 @@ describe('createHttpAcpBridge', () => {
await new Promise((r) => setTimeout(r, 10));
await bridge.shutdown();
// Subscriber must unwind to completion (no events ever published).
const events = await drain;
expect(events).toEqual([]);
// Subscriber must unwind to completion. Per #3889 review A05Ys
// the bus now publishes a terminal `session_died` event before
// closing on shutdown, so SSE subscribers can distinguish
// daemon shutdown from a transient network error.
const events = (await drain) as Array<{ type: string }>;
expect(events).toHaveLength(1);
expect(events[0]?.type).toBe('session_died');
});
});
});

View file

@ -539,39 +539,37 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
// ACP `newSession` doesn't take a model id; honor the caller's
// `modelServiceId` by issuing the unstable `setSessionModel` call
// immediately after the session is established. If the agent rejects
// the model id, surface it as a session-creation failure so the
// caller doesn't think they got the requested model.
// immediately after the session is established. Use the shared
// `applyModelServiceId` helper so the call:
// - races against `transportClosedReject` (a wedged child
// during model switch fails fast instead of consuming the
// full init timeout — A09HD)
// - publishes `model_switched` / `model_switch_failed` to the
// bus so attached clients see the result
// - serializes through `entry.modelChangeQueue` for ordering
// against later attach-with-different-model calls
//
// On failure we DO NOT tear the session down. An earlier rev
// tore it down so the caller "didn't inherit silent drift",
// but that killed an already-functional session and left the
// caller with a 500 and no sessionId to retry against. The
// session is operational on the agent's default model — the
// `model_switch_failed` event tells subscribers exactly what
// happened, and the caller can retry the switch via
// `POST /session/:id/model` once they know the sessionId.
//
// Swallow the rejection inline so the outer try/catch (which
// would `channel.kill()`) doesn't fire. The publish inside
// `applyModelServiceId` is the visible signal; the caller
// gets a 200 and a sessionId regardless, and learns of the
// model issue via the SSE stream.
if (modelServiceId) {
try {
const conn = entry.connection as unknown as {
unstable_setSessionModel(p: {
sessionId: string;
modelId: string;
}): Promise<unknown>;
};
await withTimeout(
conn.unstable_setSessionModel({
sessionId: entry.sessionId,
modelId: modelServiceId,
}),
initTimeoutMs,
'setSessionModel',
);
} catch (err) {
// The session is half-initialized — a known sessionId on a real
// child but pointing at the wrong model. Tear it down so the
// caller can retry cleanly instead of inheriting silent drift.
// Close the EventBus too: the agent may have published session_
// update frames during init that are now orphaned (no subscriber
// can ever reach them — the caller never received the sessionId
// they would need to subscribe). Without an explicit close the
// bus + ring buffer linger until the next GC cycle.
byWorkspace.delete(workspaceKey);
byId.delete(entry.sessionId);
entry.events.close();
throw err;
}
await applyModelServiceId(entry, modelServiceId, initTimeoutMs).catch(
() => {
// Already published `model_switch_failed`; don't take
// down the session.
},
);
}
return {
@ -829,7 +827,17 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
// req.on('close')), tell the agent to wind down. ACP cancel is a
// notification — the active prompt resolves with
// stopReason: 'cancelled', then the next queued prompt can run.
//
// Also resolve any pending permission requests as `cancelled`.
// ACP spec requires `cancel` to settle outstanding
// `requestPermission` calls — `cancelSession()` already does
// this; the abort path here was missing the call. Without it,
// a client disconnecting while the agent is inside
// `requestPermission` leaves the permission promise unresolved
// forever (the agent is stuck waiting on a vote that no SSE
// subscriber will ever cast).
const onAbort = () => {
cancelPendingForSession(sessionId);
entry.connection.cancel({ sessionId }).catch(() => {
// Cancel is fire-and-forget; the agent may already be dead.
});
@ -1012,6 +1020,19 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
for (const id of Array.from(entry.pendingPermissionIds)) {
resolvePending(id, { outcome: { outcome: 'cancelled' } });
}
// Publish `session_died` BEFORE closing the bus. After the eager
// `byId.delete` above, the channel.exited handler's
// `byId.get(...) !== entry` guard short-circuits, so the
// automatic publish at crash time wouldn't fire. SSE subscribers
// need this terminal frame to know the session is gone.
try {
entry.events.publish({
type: 'session_died',
data: { sessionId, reason: 'killed' },
});
} catch {
/* bus already closed */
}
entry.events.close();
await entry.channel.kill().catch(() => {
// Best-effort kill — channel may already be dead.
@ -1036,7 +1057,24 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
byWorkspace.clear();
byId.clear();
pendingPermissions.clear();
for (const e of entries) e.events.close();
// Publish a terminal `session_died` BEFORE closing each bus so SSE
// subscribers can distinguish "daemon shut down" from a transient
// network error and don't sit indefinitely retrying. The
// channel.exited handler also publishes this on a child crash,
// but at shutdown time the entry has already been removed from
// `byId` (above), so the handler's `byId.get(...) !== entry`
// guard would short-circuit and the event would never fire.
for (const e of entries) {
try {
e.events.publish({
type: 'session_died',
data: { sessionId: e.sessionId, reason: 'daemon_shutdown' },
});
} catch {
/* bus already closed */
}
e.events.close();
}
// Wait for in-flight spawns too. The snapshot above only sees
// sessions already in `byId`; a `doSpawn` past `channelFactory()`
// but still inside `connection.newSession` (~1s on cold start) is
@ -1187,19 +1225,29 @@ export const defaultSpawnChannelFactory: ChannelFactory = async (
// is a `.ts` file Node can't run; users should `npm run build` before
// `qwen serve` or set `process.execPath` to a tsx-aware shim. Stage 1
// accepts this — the daemon is meant for built deployments.
// Pass an *allowlisted* environment to the child (see
// ALLOWED_CHILD_ENV_KEYS at module scope for the rationale + the
// allowlist itself). Move-out keeps the Set allocated once, not
// per-spawn.
const childEnv: NodeJS.ProcessEnv = {};
for (const key of ALLOWED_CHILD_ENV_KEYS) {
const v = process.env[key];
if (typeof v === 'string') childEnv[key] = v;
// Pass through the daemon's full environment to the child, scrubbing
// ONLY daemon-internal secrets (see SCRUBBED_CHILD_ENV_KEYS at module
// scope). An earlier version used an allowlist, but that broke the
// common deployment shape: users export `OPENAI_API_KEY` /
// `ANTHROPIC_API_KEY` / `QWEN_*` / `DASHSCOPE_API_KEY` / a custom
// `modelProviders[].envKey` to authenticate the agent's LLM calls,
// and core's model config resolves those from `process.env`. An
// exhaustive allowlist can't enumerate user-defined provider keys,
// so the agent ends up unable to authenticate.
//
// Threat-model rationale: the agent already runs as the same UID
// with shell-tool access — anything in `~/.bashrc`, `~/.npmrc`,
// `~/.aws/credentials`, etc. is reachable by prompt injection
// regardless of what we put in `env`. The env passthrough is not
// the security boundary; the user-as-trust-root is. The only thing
// we MUST scrub is `QWEN_SERVER_TOKEN` (daemon-only auth that
// would let a prompt-injected shell turn the agent into an
// authenticated client of its own daemon — escalation the agent
// doesn't otherwise have).
const childEnv: NodeJS.ProcessEnv = { ...process.env };
for (const key of SCRUBBED_CHILD_ENV_KEYS) {
delete childEnv[key];
}
// Defense-in-depth: the explicit allowlist is exhaustive but the
// delete keeps the security review trail readable — anyone grepping
// for `QWEN_SERVER_TOKEN` finds the scrub explicitly named.
delete childEnv['QWEN_SERVER_TOKEN'];
// CodeQL `js/path-injection` flags the `cwd: workspaceCwd` flow.
// Stage 1 trust model accepts this — see the function-level comment
// above for the design rationale. Defense-in-depth: the cwd is
@ -1257,54 +1305,20 @@ export const defaultSpawnChannelFactory: ChannelFactory = async (
const KILL_HARD_DEADLINE_MS = 10_000;
/**
* Environment variables the spawned `qwen --acp` child is allowed to
* inherit from the daemon's environment. Anything else is dropped:
* the agent runs user-supplied prompts with shell-tool access, so
* everything in its env is reachable by prompt injection: API keys
* (OPENAI/ANTHROPIC/GEMINI/DASHSCOPE/...), DB passwords, AWS/GCP
* credentials, OAuth tokens, secrets your operator forgot they
* exported. A denylist requires perfect knowledge of every secret
* anyone might set; an allowlist requires only that we know what
* the agent legitimately needs.
* Environment variables stripped from the spawned `qwen --acp` child's
* environment. Everything else is passed through; see the
* threat-model rationale at the call site in `defaultSpawnChannelFactory`.
*
* What's needed: enough to launch Node + load the entry script and
* resolve user-config files (HOME), enough to find binaries and
* standard tools (PATH), enough to localize and identify the user
* for any session-state lookups (LANG/LC_*, USER/LOGNAME). Anything
* beyond this list should be passed to the agent through ACP, not
* through the spawn env.
* Currently just `QWEN_SERVER_TOKEN`: the daemon's own bearer token,
* which the agent doesn't need (it speaks to the daemon over stdio,
* not HTTP). Leaving it in the child's env would let prompt injection
* turn the agent into an authenticated client of its own daemon, an
* escalation the agent doesn't otherwise have.
*
* Per-platform extras: NODE-specific knobs (NODE_OPTIONS, NODE_PATH)
* and shell-driven temp dir overrides (TMPDIR/TEMP/TMP) are usually
* benign but can carry instructions; include only the temp-dir ones
* since the agent legitimately writes scratch files. Windows-only
* SYSTEMROOT/USERPROFILE/APPDATA are required by Node itself on
* Windows or the spawn fails.
*
* Defined at module scope so the Set is allocated once at load,
* not rebuilt on every `defaultSpawnChannelFactory` call.
* Defined at module scope so the Set is allocated once at load.
*/
const ALLOWED_CHILD_ENV_KEYS: ReadonlySet<string> = new Set([
'HOME',
'PATH',
'USER',
'LOGNAME',
'LANG',
'LC_ALL',
'LC_CTYPE',
'LC_COLLATE',
'LC_MESSAGES',
'TMPDIR',
'TEMP',
'TMP',
'NODE_PATH',
// Windows essentials — Node refuses to spawn without these on Win32.
'SYSTEMROOT',
'USERPROFILE',
'APPDATA',
'LOCALAPPDATA',
'COMSPEC',
'PATHEXT',
const SCRUBBED_CHILD_ENV_KEYS: ReadonlySet<string> = new Set([
'QWEN_SERVER_TOKEN',
]);
function killChild(child: ChildProcess): Promise<void> {

View file

@ -679,10 +679,16 @@ describe('createServeApp', () => {
expect(res.status).toBe(200);
});
// Switched probe endpoint from `/health` to `/capabilities` for
// these auth-rejection tests because per #3889 review A8dZT
// `/health` is now intentionally registered BEFORE the bearer
// middleware so liveness probes work without credentials.
// `/capabilities` is the cheapest endpoint that still goes through
// the auth chain.
it('rejects missing Authorization header when token is set', async () => {
const app = createServeApp({ ...baseOpts, token: 'secret' });
const res = await request(app)
.get('/health')
.get('/capabilities')
.set('Host', `127.0.0.1:${baseOpts.port}`);
expect(res.status).toBe(401);
});
@ -690,7 +696,7 @@ describe('createServeApp', () => {
it('rejects wrong scheme', async () => {
const app = createServeApp({ ...baseOpts, token: 'secret' });
const res = await request(app)
.get('/health')
.get('/capabilities')
.set('Host', `127.0.0.1:${baseOpts.port}`)
.set('Authorization', 'Basic c2VjcmV0');
expect(res.status).toBe(401);
@ -699,7 +705,7 @@ describe('createServeApp', () => {
it('rejects wrong token', async () => {
const app = createServeApp({ ...baseOpts, token: 'secret' });
const res = await request(app)
.get('/health')
.get('/capabilities')
.set('Host', `127.0.0.1:${baseOpts.port}`)
.set('Authorization', 'Bearer wrong');
expect(res.status).toBe(401);
@ -708,11 +714,27 @@ describe('createServeApp', () => {
it('accepts the right token', async () => {
const app = createServeApp({ ...baseOpts, token: 'secret' });
const res = await request(app)
.get('/health')
.get('/capabilities')
.set('Host', `127.0.0.1:${baseOpts.port}`)
.set('Authorization', 'Bearer secret');
expect(res.status).toBe(200);
});
it('exempts /health from bearer auth so liveness probes work without credentials', async () => {
// Per #3889 review A8dZT — the registration order in
// `createServeApp` puts `/health` BEFORE `bearerAuth`, so a
// probe with no credentials still gets 200 even when the daemon
// was started with a token. CORS deny + Host allowlist still
// apply to `/health` (they run earlier in the middleware chain), so
// this is
// not a way to bypass DNS rebinding or browser-origin
// protection.
const app = createServeApp({ ...baseOpts, token: 'secret' });
const res = await request(app)
.get('/health')
.set('Host', `127.0.0.1:${baseOpts.port}`);
expect(res.status).toBe(200);
expect(res.body).toEqual({ status: 'ok' });
});
});
});

View file

@ -61,13 +61,21 @@ export function createServeApp(
// amplified CPU/memory cost from any wrong-token client.
app.use(denyBrowserOriginCors);
app.use(hostAllowlist(opts.hostname, getPort));
app.use(bearerAuth(opts.token));
app.use(express.json({ limit: '10mb' }));
// `/health` is registered BEFORE `bearerAuth` so liveness probes work
// without credentials even when the daemon was started with a
// `--token` (k8s/Compose probes typically don't carry the daemon's
// bearer; round-tripping a 401 just to know the listener is up is
// pure waste). CORS deny + Host allowlist still apply, so a browser
// or a wrong-Host probe is still rejected. Documented exemption in
// `docs/developers/qwen-serve-protocol.md`.
app.get('/health', (_req, res) => {
res.status(200).json({ status: 'ok' });
});
app.use(bearerAuth(opts.token));
app.use(express.json({ limit: '10mb' }));
app.get('/capabilities', (_req, res) => {
const envelope: CapabilitiesEnvelope = {
v: CAPABILITIES_SCHEMA_VERSION,
@@ -309,7 +317,17 @@ export function createServeApp(
// payload in user-space memory unboundedly — a slow consumer on a
// chatty session can balloon daemon RSS. Wait for `drain` (or
// close/error) before scheduling the next write.
//
// Concurrency: serialize ALL writes through a per-connection chain
// so the heartbeat (fire-and-forget interval, see below) can't
// interleave with the main event-write loop. Without serialization,
// the heartbeat firing while the main loop is mid-`drain` await
// would issue a second `res.write()` that bypasses the
// backpressure guard — and could even interleave bytes between two
// SSE frames on the wire. The chain is single-flight: each call
// waits for the previous write to settle before scheduling its own.
let writeChain: Promise<void> = Promise.resolve();
const doWrite = (chunk: string): Promise<void> =>
new Promise((resolve, reject) => {
if (res.writableEnded) {
resolve();
@@ -341,6 +359,15 @@ export function createServeApp(
res.once('close', onClose);
res.once('error', onError);
});
const writeWithBackpressure = (chunk: string): Promise<void> => {
const next = writeChain.then(() => doWrite(chunk));
// Tail-swallow rejections on the chain itself so a single failed
// write doesn't poison every subsequent call. The CALLER's
// returned promise still rejects — chain-internal failures are
// someone else's problem, not blockers for queueing.
writeChain = next.catch(() => undefined);
return next;
};
// Tell EventSource to retry after 3s on disconnect. Awaiting drain on
// the very first write is overkill but cheap — `ok` is true the
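The single-flight chain added in this hunk can be sketched standalone. `makeSerializedWriter` and its `doWrite` parameter are illustrative names for this sketch, not the daemon's real identifiers:

```typescript
// Single-flight write chain: each call waits for the previous write
// to settle before issuing its own, so a fire-and-forget heartbeat
// can never interleave with the main event-write loop. The caller's
// returned promise still rejects on failure; the chain itself
// swallows the rejection so one bad write doesn't poison later ones.
type Writer = (chunk: string) => Promise<void>;

function makeSerializedWriter(doWrite: Writer): Writer {
  let chain: Promise<void> = Promise.resolve();
  return (chunk) => {
    const next = chain.then(() => doWrite(chunk));
    chain = next.catch(() => undefined); // tail-swallow, as above
    return next;
  };
}

// Demo: 'a' is slower than 'b', but serialization preserves call order.
const order: string[] = [];
const write = makeSerializedWriter(async (chunk) => {
  await new Promise((r) => setTimeout(r, chunk === 'a' ? 20 : 0));
  order.push(chunk);
});
await Promise.all([write('a'), write('b')]);
// order is ['a', 'b'], not ['b', 'a']
```

Without the chain, both writes would race and `'b'` would land first; the chain makes ordering a property of call order, not of write latency.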
@@ -446,6 +473,21 @@ export function createServeApp(
res.status(400).json({ error: 'Invalid JSON in request body' });
return;
}
// body-parser raises a typed error with `status: 413` when a
// request body exceeds the `express.json({ limit: '10mb' })`
// ceiling. Without this branch it falls through to the 500 path
// and clients see a misleading "Internal server error" instead
// of a clear "payload too large" — which is the kind of error
// they can actually act on (chunk the request, raise the limit).
if (
err &&
typeof err === 'object' &&
'status' in err &&
(err as { status: number }).status === 413
) {
res.status(413).json({ error: 'Request body too large (max 10 MB)' });
return;
}
writeStderrLine(
`qwen serve: unhandled error: ${err instanceof Error ? (err.stack ?? err.message) : String(err)}`,
);
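The narrowing guard in this hunk can be lifted into a standalone predicate and exercised without Express; `isPayloadTooLarge` is an illustrative name for this sketch:

```typescript
// body-parser attaches `status: 413` to the error it raises when a
// body exceeds the express.json limit; narrowing on that field lets
// the error middleware answer with a clear 413 instead of a 500.
function isPayloadTooLarge(err: unknown): boolean {
  return (
    typeof err === 'object' &&
    err !== null &&
    'status' in err &&
    (err as { status: unknown }).status === 413
  );
}

// Shape-only checks — no Express needed to exercise the narrowing.
console.log(isPayloadTooLarge({ status: 413 })); // true
console.log(isPayloadTooLarge(new Error('boom'))); // false
console.log(isPayloadTooLarge(null)); // false (typeof null is 'object', hence the null check)
```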


@@ -355,7 +355,11 @@ export class DaemonClient {
* but exposed as `static` on `AbortSignal`, so cautious feature-detect plus
* a polyfill keeps us honest if a runtime ships a stripped-down `AbortSignal`.
*/
// Exported solely for direct unit testing — production callers go
// through `fetchWithTimeout` above, but the polyfill paths only fire
// on Node 18.0–20.2 (where `AbortSignal.any` is missing) and that
// runtime can't easily be exercised from the public API surface.
export function abortTimeout(ms: number): AbortSignal {
const tFn = (
AbortSignal as unknown as { timeout?: (ms: number) => AbortSignal }
).timeout;
@@ -394,7 +398,8 @@ function abortTimeout(ms: number): AbortSignal {
* listeners after the first fire is best-effort), but for `fetch`-style
* single-shot use the difference is invisible.
*/
// Exported solely for direct unit testing — see note on `abortTimeout`.
export function composeAbortSignals(signals: AbortSignal[]): AbortSignal {
const anyFn = (
AbortSignal as unknown as { any?: (s: AbortSignal[]) => AbortSignal }
).any;
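Under the feature-detect shown here, the fallback composition can be sketched as a standalone helper (illustrative version, not this file's exact body):

```typescript
// Compose several AbortSignals into one that aborts when the first
// input aborts. Uses native AbortSignal.any when present (Node >=
// 20.3); otherwise a small polyfill. As the source comment notes,
// removing listeners after the first fire is best-effort only.
function composeSignals(signals: AbortSignal[]): AbortSignal {
  const anyFn = (
    AbortSignal as unknown as { any?: (s: AbortSignal[]) => AbortSignal }
  ).any;
  if (typeof anyFn === 'function') return anyFn.call(AbortSignal, signals);
  const ctrl = new AbortController();
  for (const sig of signals) {
    if (sig.aborted) {
      ctrl.abort(sig.reason); // already-aborted input wins immediately
      break;
    }
    sig.addEventListener('abort', () => ctrl.abort(sig.reason), {
      once: true,
    });
  }
  return ctrl.signal;
}
```

Both branches accept the same input shape, which is what lets the new direct-unit tests below cover native and polyfill runtimes alike.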


@@ -8,6 +8,8 @@ import { describe, it, expect, vi } from 'vitest';
import {
DaemonClient,
DaemonHttpError,
abortTimeout,
composeAbortSignals,
} from '../../src/daemon/DaemonClient.js';
function jsonResponse(status: number, body: unknown): Response {
@@ -480,35 +482,42 @@ describe('DaemonClient', () => {
expect(elapsed).toBeLessThan(2000);
});
it('composeAbortSignals forwards the first abort, with or without native AbortSignal.any', async () => {
// Direct-unit test on the helper — `subscribeEvents` bypasses
// `fetchWithTimeout` entirely (it calls `_fetch` directly with
// the caller's signal), so testing through subscribeEvents
// never exercises the polyfill. Calling `composeAbortSignals`
// here covers it on all Node versions: native (`>=20.3`) and
// polyfill (`18.0`–`20.2`) take the same input shape.
const a = new AbortController();
const b = new AbortController();
const composed = composeAbortSignals([a.signal, b.signal]);
expect(composed.aborted).toBe(false);
a.abort(new DOMException('first', 'AbortError'));
// The composed signal should follow whichever input fires first.
// Allow a microtask for native AbortSignal.any propagation.
await Promise.resolve();
expect(composed.aborted).toBe(true);
});
it('composeAbortSignals fires immediately if any input is already aborted', () => {
const a = new AbortController();
a.abort();
const b = new AbortController();
const composed = composeAbortSignals([a.signal, b.signal]);
expect(composed.aborted).toBe(true);
});
it('abortTimeout fires after the configured delay', async () => {
const t0 = Date.now();
const sig = abortTimeout(40);
await new Promise<void>((resolve) =>
sig.addEventListener('abort', () => resolve(), { once: true }),
);
const elapsed = Date.now() - t0;
// Generous tolerance — just checking the timer fires.
expect(elapsed).toBeGreaterThanOrEqual(30);
expect(elapsed).toBeLessThan(2000);
});
});
});