mirror of
https://github.com/QwenLM/qwen-code.git
synced 2026-05-17 03:57:18 +00:00
fix(cli,sdk,docs): close 21 review threads — env regression + races + doc accuracy (#3803)
CRITICAL regression fix:
- Child env scrub flipped from allowlist back to denylist (just
QWEN_SERVER_TOKEN). The earlier allowlist was overzealous: it
dropped OPENAI_API_KEY / ANTHROPIC_API_KEY / GEMINI_API_KEY /
QWEN_* / DASHSCOPE_API_KEY / custom modelProviders[].envKey, all of
which the agent legitimately needs to authenticate to the LLM.
Daemon-mode users with env-only auth would start the daemon, attach
a session, then watch every prompt fail with auth errors. Threat-
model rationale documented at the call site: prompt-injected shell
tools can already read ~/.bashrc, ~/.aws/credentials, etc., so env
passthrough isn't the security boundary; the user-as-trust-root is.
QWEN_SERVER_TOKEN stays scrubbed to prevent agent → its own daemon
escalation.
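A minimal sketch of the denylist shape described above, with illustrative names (`SCRUBBED_KEYS`, `buildChildEnv` are not the project's actual exports):

```typescript
// Denylist scrub: pass the whole parent env through so provider keys
// (OPENAI_API_KEY, QWEN_*, custom envKey vars) survive, then delete
// only the daemon-internal secret.
const SCRUBBED_KEYS: ReadonlySet<string> = new Set(['QWEN_SERVER_TOKEN']);

function buildChildEnv(
  parentEnv: Record<string, string | undefined>,
): Record<string, string | undefined> {
  const childEnv = { ...parentEnv };
  for (const key of SCRUBBED_KEYS) delete childEnv[key];
  return childEnv;
}
```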
Other code fixes:
- doSpawn no longer tears down the session when create-time model
switch fails. The session is still operational on the agent's
default model; tearing it down left the caller with a 500 and no
sessionId to retry against. The model_switch_failed SSE event is
the visible signal; caller can retry via POST /session/:id/model
once they have the sessionId.
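A sketch of the recovery path, as a hypothetical request builder (the route is the documented `POST /session/:id/model`; the exact request shape is an assumption for illustration):

```typescript
// After observing a model_switch_failed SSE event, a caller can retry
// the switch against the documented route using the sessionId it kept.
function buildModelRetryRequest(
  baseUrl: string,
  sessionId: string,
  modelServiceId: string,
): { url: string; method: 'POST'; headers: Record<string, string>; body: string } {
  return {
    url: `${baseUrl}/session/${encodeURIComponent(sessionId)}/model`,
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ modelServiceId }),
  };
}
```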
- doSpawn now uses applyModelServiceId for the create-time model
switch (was raw conn.unstable_setSessionModel + withTimeout). The
helper races against transportClosedReject too, so a child crash
during model switch fails fast instead of consuming the full init
timeout.
- sendPrompt's abort handler now calls cancelPendingForSession
before the ACP cancel notification (matching cancelSession). A
client disconnecting mid-permission was leaving the agent stuck
waiting on a vote that no SSE subscriber would ever cast.
- shutdown() and killSession() now publish a terminal `session_died`
SSE event before closing the bus. Previously the channel.exited
handler's "byId.get(...) !== entry" guard short-circuited (entry
already removed), so SSE subscribers couldn't tell daemon shutdown
from a transient network error.
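The ordering fix can be sketched with a toy bus (`TinyEventBus` is a stand-in, not the daemon's EventBus): publish the terminal frame first, then close, because a closed bus rejects further publishes.

```typescript
class TinyEventBus {
  private closed = false;
  readonly frames: Array<{ type: string }> = [];
  publish(frame: { type: string }): void {
    if (this.closed) throw new Error('bus closed');
    this.frames.push(frame);
  }
  close(): void {
    this.closed = true;
  }
}

function shutdownBus(bus: TinyEventBus): void {
  try {
    bus.publish({ type: 'session_died' }); // terminal frame, pre-close
  } catch {
    /* bus already closed elsewhere */
  }
  bus.close();
}
```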
- Express error middleware now special-cases `status: 413`
(EntityTooLargeError from body-parser when a request exceeds the
10 MB JSON limit) and returns a JSON 413 instead of a misleading
500.
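A sketch of the mapping, assuming body-parser's convention of setting `status` (and `type: 'entity.too.large'`) on oversized-body errors; `errorToJsonResponse` is an illustrative name, not the actual middleware:

```typescript
interface BodyParserLikeError {
  status?: number;
  type?: string;
  message?: string;
}

function errorToJsonResponse(err: BodyParserLikeError): {
  status: number;
  body: { error: string };
} {
  if (err.status === 413) {
    // Surface the real cause instead of a misleading 500.
    return { status: 413, body: { error: 'request body exceeds the JSON size limit' } };
  }
  return { status: 500, body: { error: 'internal error' } };
}
```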
- /health is now registered BEFORE bearerAuth middleware, so
liveness probes work without credentials when the daemon was
started with --token. CORS deny + Host allowlist still apply.
- SSE writes serialize through a per-connection chain so the
heartbeat interval can no longer interleave with the main event-
write loop. Two concurrent res.write calls would otherwise bypass
the backpressure guard and could interleave bytes between SSE
frames on the wire.
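The serialization idea, as a minimal sketch (here `write` stands in for `res.write` plus backpressure handling): each write awaits the one before it, so a heartbeat timer and the main event loop cannot interleave bytes mid-frame.

```typescript
function makeSerializedWriter(
  write: (chunk: string) => Promise<void>,
): (chunk: string) => Promise<void> {
  let chain: Promise<void> = Promise.resolve();
  return (chunk: string) => {
    // Swallow upstream failures so one bad write doesn't poison the chain.
    chain = chain.then(() => write(chunk)).catch(() => {});
    return chain;
  };
}
```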
SDK:
- abortTimeout / composeAbortSignals exported for direct unit
testing. The existing test claimed to cover the polyfill paths via
subscribeEvents, but subscribeEvents calls _fetch directly (not
fetchWithTimeout), so composeAbortSignals never ran in the test.
New tests exercise the helpers directly across native + polyfill
runtimes.
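For context, an illustrative re-creation of what such a composer must do on runtimes without `AbortSignal.any` (this is not the SDK's exact export): the returned signal aborts as soon as any input does.

```typescript
function composeSignals(...signals: AbortSignal[]): AbortSignal {
  const controller = new AbortController();
  for (const signal of signals) {
    if (signal.aborted) {
      controller.abort(signal.reason);
      break;
    }
    signal.addEventListener('abort', () => controller.abort(signal.reason), {
      once: true,
    });
  }
  return controller.signal;
}
```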
Doc accuracy fixes:
- daemon-client-quickstart.md: createOrAttachSession({ cwd: ... })
→ ({ workspaceCwd: ... }) (SDK type), client.sendPrompt → prompt,
client.cancelSession → cancel. The example wouldn't typecheck.
- qwen-serve.md: "binds one workspace" claim removed — a single
daemon hosts sessions for any cwd the caller passes; the
per-instance constraint is per-user / scale, not per-workspace.
Auth verification example switched from /health to /capabilities
(since /health is now exempt from bearer auth).
- qwen-serve-protocol.md: env var was QWEN_E2E_LLM, real var is
SKIP_LLM_TESTS (inverted polarity). Streaming test count was 4,
actually 3. Added Stage 1 limitation notes for "no DELETE
/session" and "no permission timeout". Added client-side
ring-buffer gap detection guidance for Last-Event-ID reconnect.
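The client-side check that guidance describes, as a hypothetical helper: compare the first replayed event id against `lastSeenId + 1`. A result of 0 is a seamless resume; anything larger is the size of the lost window.

```typescript
function detectReplayGap(lastSeenId: number, firstReplayedId: number): number {
  return Math.max(0, firstReplayedId - (lastSeenId + 1));
}
```

For example, reconnecting with `Last-Event-ID: 50` while the ring holds events 200 and up yields a gap of 149 dropped events.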
Test updates:
- httpAcpBridge.test.ts: rewrote two tests for the new
doSpawn-on-model-switch-fail contract (publish event, keep
session). Updated shutdown-closes-subscriptions test to expect
the new terminal `session_died` frame.
- server.test.ts: switched bearer-auth rejection probes from
/health to /capabilities (since /health is now exempt). Added a
test that locks /health's exemption.
This commit is contained in:
parent
73203f33db
commit
e74aa99191
9 changed files with 324 additions and 200 deletions
@@ -34,7 +34,7 @@ console.log('Daemon features:', caps.features);
 
 // 2. Spawn-or-attach a session for the current workspace.
 const session = await client.createOrAttachSession({
-  cwd: process.cwd(),
+  workspaceCwd: process.cwd(),
 });
 console.log(`session=${session.sessionId} attached=${session.attached}`);
 
@@ -57,10 +57,10 @@ const subscription = (async () => {
 })();
 
 // 4. Send a prompt and wait for it to settle. (Order-of-operations
-// note: even if `sendPrompt` fires before the SSE handshake
+// note: even if `prompt()` fires before the SSE handshake
 // completes, step 3's `lastEventId: 0` guarantees every event
 // lands in the iterator.)
-const result = await client.sendPrompt(session.sessionId, {
+const result = await client.prompt(session.sessionId, {
   prompt: [{ type: 'text', text: 'Summarize src/main.ts in one sentence.' }],
 });
 console.log('stop reason:', result.stopReason);
@@ -143,11 +143,11 @@ Two clients pointed at the same daemon and `cwd` end up on the same session:
 
 ```ts
 // Client A (e.g. an IDE plugin)
-const a = await clientA.createOrAttachSession({ cwd: '/work/repo' });
+const a = await clientA.createOrAttachSession({ workspaceCwd: '/work/repo' });
 console.log(a.attached); // false — A spawned the agent
 
 // Client B (e.g. a web UI on the same machine)
-const b = await clientB.createOrAttachSession({ cwd: '/work/repo' });
+const b = await clientB.createOrAttachSession({ workspaceCwd: '/work/repo' });
 console.log(b.attached); // true — B joined A's session
 console.log(a.sessionId === b.sessionId); // true
 ```
@@ -186,7 +186,7 @@ try {
 If your user hits Esc:
 
 ```ts
-await client.cancelSession(session.sessionId);
+await client.cancel(session.sessionId);
 // In the event stream you'll see the prompt resolve with stopReason: "cancelled"
 ```
 
@@ -52,6 +52,12 @@ Every Stage 1 daemon advertises 9 feature tags. Clients **must** gate UI off `fe
 
 ## Routes
 
+> **Stage 1 limitation — no `DELETE /session/:id`.** Sessions live until
+> the agent child crashes (`session_died`), the daemon process exits, or
+> a server-side `killSession` (used internally by orphan-cleanup) fires.
+> HTTP clients have no explicit "close one session" route in Stage 1.
+> An explicit `DELETE /session/:id` is on the Stage 2 polish list.
+
 ### `GET /health`
 
 Liveness probe. Returns `200 {"status":"ok"}` if the listener is up. No auth required even when a token is configured (heartbeat-friendly).
@@ -216,6 +222,7 @@ data: {"reason":"queue_overflow","droppedAfter":42}
 Reconnect semantics:
 
 - Send `Last-Event-ID: <n>` to replay events with `id > n` from the per-session ring (default depth 1000)
+- **Gap detection (client-side):** if `<n>` predates the oldest event still in the ring (e.g. you reconnect with `Last-Event-ID: 50` but the ring now holds 200–1199), the daemon replays from the oldest available event without raising. Compare the first replayed event's `id` against `n + 1`; any difference is the size of the lost window. Stage 2 will inject an explicit `stream_gap` synthetic frame on the daemon side; in Stage 1 detection is the client's responsibility.
 - IDs are monotonic per session, starting at 1
 - Synthetic terminal frames (`client_evicted`, `stream_error`) intentionally omit `id` so they don't burn a sequence slot for other subscribers
@@ -228,6 +235,19 @@ Backpressure:
 
 Cast a vote on a pending `permission_request`. **First responder wins** — once one client answers, every other client trying to answer the same id gets `404`.
 
+> **Stage 1 limitation — no permission timeout.** A `permission_request`
+> stays pending until: (a) some client votes here, (b) `POST
+> /session/:id/cancel` fires, (c) the HTTP client driving the prompt
+> disconnects (mid-prompt cancel resolves outstanding permissions as
+> `cancelled`), (d) the session is killed, or (e) the daemon shuts
+> down. **In a fully-headless deployment with no SSE subscriber,
+> `requestPermission` blocks the agent indefinitely** — there's nothing
+> to time out the wait. Stage 2 will add a configurable
+> `permissionTimeoutMs`. Until then, headless callers should keep an
+> SSE subscription open, or wrap their prompt loop in their own
+> timeout plus `POST /session/:id/cancel`.
+
 Request:
 
 ```json
@@ -268,22 +288,22 @@ The connection then closes.
 
 ## Environment variables
 
-| Var                 | Purpose |
-| ------------------- | ------- |
-| `QWEN_SERVER_TOKEN` | Bearer token. Stripped of leading/trailing whitespace at boot. |
-| `QWEN_E2E_LLM`      | Set to `1` to enable LLM-required integration tests in `integration-tests/cli/qwen-serve-streaming.test.ts`. |
+| Var                 | Purpose |
+| ------------------- | ------- |
+| `QWEN_SERVER_TOKEN` | Bearer token. Stripped of leading/trailing whitespace at boot. |
+| `SKIP_LLM_TESTS`    | Set to `1` to **skip** LLM-required integration tests in `integration-tests/cli/qwen-serve-streaming.test.ts` (default-on for CI envs that lack provider API keys). |
 
 ## Source layout
 
-| Path | Purpose |
-| ---- | ------- |
-| `packages/cli/src/commands/serve.ts` | yargs command + flag schema |
-| `packages/cli/src/serve/runQwenServe.ts` | listener lifecycle + signal handling |
-| `packages/cli/src/serve/server.ts` | Express routes + middleware |
-| `packages/cli/src/serve/auth.ts` | bearer + Host allowlist + CORS deny |
-| `packages/cli/src/serve/httpAcpBridge.ts` | spawn-or-attach + per-session FIFO + permission registry |
-| `packages/cli/src/serve/eventBus.ts` | bounded async queue + replay ring |
-| `packages/sdk-typescript/src/daemon/DaemonClient.ts` | TS client |
-| `packages/sdk-typescript/src/daemon/sse.ts` | EventSource frame parser |
-| `integration-tests/cli/qwen-serve-routes.test.ts` | 18 cases, no LLM |
-| `integration-tests/cli/qwen-serve-streaming.test.ts` | 4 cases, real `qwen --acp` child (`QWEN_E2E_LLM=1`) |
+| Path | Purpose |
+| ---- | ------- |
+| `packages/cli/src/commands/serve.ts` | yargs command + flag schema |
+| `packages/cli/src/serve/runQwenServe.ts` | listener lifecycle + signal handling |
+| `packages/cli/src/serve/server.ts` | Express routes + middleware |
+| `packages/cli/src/serve/auth.ts` | bearer + Host allowlist + CORS deny |
+| `packages/cli/src/serve/httpAcpBridge.ts` | spawn-or-attach + per-session FIFO + permission registry |
+| `packages/cli/src/serve/eventBus.ts` | bounded async queue + replay ring |
+| `packages/sdk-typescript/src/daemon/DaemonClient.ts` | TS client |
+| `packages/sdk-typescript/src/daemon/sse.ts` | EventSource frame parser |
+| `integration-tests/cli/qwen-serve-routes.test.ts` | 18 cases, no LLM |
+| `integration-tests/cli/qwen-serve-streaming.test.ts` | 3 cases, real `qwen --acp` child (skipped when `SKIP_LLM_TESTS=1`) |
@@ -86,10 +86,12 @@ qwen serve --hostname 0.0.0.0 --port 4170
 # → boot refuses without QWEN_SERVER_TOKEN
 ```
 
-Clients then send `Authorization: Bearer $QWEN_SERVER_TOKEN` on every request.
+Clients then send `Authorization: Bearer $QWEN_SERVER_TOKEN` on every request **except `/health`**, which is intentionally exempt so liveness probes (k8s, Compose, monitoring) work without credentials. Use `/capabilities` to verify your token is correct end-to-end:
 
 ```bash
-curl -H "Authorization: Bearer $QWEN_SERVER_TOKEN" http://your-host:4170/health
+curl -H "Authorization: Bearer $QWEN_SERVER_TOKEN" http://your-host:4170/capabilities
+# → {"v":1,"mode":"http-bridge","features":[...],"modelServices":[]}
+# Wrong token → 401
 ```
 
 The token comparison is constant-time (SHA-256 + `crypto.timingSafeEqual`); 401 responses are uniform across "missing header", "wrong scheme", and "wrong token" so a side-channel can't distinguish.
@@ -116,7 +118,9 @@ The token comparison is constant-time (SHA-256 + `crypto.timingSafeEqual`); 401
 
 ## Multi-session & remote deployment
 
-Each `qwen serve` process binds **one workspace**. To handle multiple workspaces or multiple users at scale you spawn multiple daemon instances behind an external orchestrator. That orchestrator (multi-tenancy / OIDC / Quota / Audit / k8s) is **out of scope** for the qwen-code project — see issue [#3803](https://github.com/QwenLM/qwen-code/issues/3803) "External Reference Architecture" for the design pointers.
+A single `qwen serve` process can manage sessions for any workspace path passed via `cwd` on `POST /session` — under the default `sessionScope: 'single'` it keeps one ACP session per canonicalized workspace, sharing it across every client that posts the same `cwd`. So one daemon will happily host sessions for many workspaces at once.
+
+To handle multiple **users** (each with their own quota, audit log, sandbox) or to scale beyond one process's reach (cold-start budget, FD count, RSS), you spawn multiple daemon instances behind an external orchestrator. That orchestrator (multi-tenancy / OIDC / Quota / Audit / k8s) is **out of scope** for the qwen-code project — see issue [#3803](https://github.com/QwenLM/qwen-code/issues/3803) "External Reference Architecture" for the design pointers.
 
 ## What's next
 
@@ -837,66 +837,70 @@ describe('createHttpAcpBridge', () => {
     await bridge.shutdown();
   });
 
-  it('rejects spawnOrAttach when the agent rejects the requested model', async () => {
+  it('keeps the session alive on model-switch failure and publishes model_switch_failed', async () => {
+    // Contract (per #3889 review A05Ym): when the agent rejects the
+    // requested model at create-session time, the session is still
+    // operational on the agent's default model. The caller gets a
+    // sessionId they can retry the model switch against (via
+    // POST /session/:id/model) and observe via the SSE stream.
+    // Tearing the session down would force the caller into a 500
+    // with no way to recover.
     const { bridge } = setup({
       setModelImpl: async () => {
        throw new Error('unknown model');
      },
    });
-    await expect(
-      bridge.spawnOrAttach({
-        workspaceCwd: WS_A,
-        modelServiceId: 'definitely-not-a-real-model',
-      }),
-    ).rejects.toBeTruthy();
-    // Failed spawn must NOT leave the half-initialized session in the maps.
-    // (The entry's EventBus is also closed in the same teardown — it has
-    // no externally-reachable subscriber and the bus's internal `closed`
-    // flag prevents future GC-blocking publishes from the in-flight
-    // ClientSideConnection observer chain.)
-    expect(bridge.sessionCount).toBe(0);
-    await bridge.shutdown();
-  });
-
-  it('a retry after a model-rejection failure uses a fresh EventBus', async () => {
-    // Regression for the leak path: after a setSessionModel failure we
-    // tear down the half-initialized session. A subsequent retry must
-    // create a *new* entry (not silently reuse the old one), and old
-    // events must not bleed into the new subscriber.
-    let attempt = 0;
-    const { bridge } = setup({
-      setModelImpl: async () => {
-        attempt += 1;
-        if (attempt === 1) throw new Error('first attempt rejected');
-      },
-    });
-
-    await expect(
-      bridge.spawnOrAttach({
-        workspaceCwd: WS_A,
-        modelServiceId: 'try-1',
-      }),
-    ).rejects.toBeTruthy();
-    expect(bridge.sessionCount).toBe(0);
-
     const session = await bridge.spawnOrAttach({
       workspaceCwd: WS_A,
-      modelServiceId: 'try-2',
+      modelServiceId: 'definitely-not-a-real-model',
     });
     expect(session.attached).toBe(false);
     expect(bridge.sessionCount).toBe(1);
 
-    // Subscribe to the live session — should see no events from the
-    // failed attempt.
+    // The model_switch_failed event must be on the bus for any
+    // subscriber that subscribes with `lastEventId: 0` (replay).
     const abort = new AbortController();
     const iter = bridge.subscribeEvents(session.sessionId, {
       signal: abort.signal,
       lastEventId: 0,
     });
     const it = iter[Symbol.asyncIterator]();
-    // No events have been published yet; aborting closes the queue.
     const first = await it.next();
+    expect(first.value?.type).toBe('model_switch_failed');
+    expect(first.value?.data).toMatchObject({
+      sessionId: session.sessionId,
+      requestedModelId: 'definitely-not-a-real-model',
+    });
     abort.abort();
     const next = await it.next();
     expect(next.done).toBe(true);
     await bridge.shutdown();
   });
 
+  it('attaches to the existing session on retry after a model-switch failure', async () => {
+    // Per the same A05Ym contract: a follow-up `spawnOrAttach` for
+    // the same workspace finds the existing session (rather than
+    // re-spawning a fresh one), and a retry of the model switch
+    // through `POST /session/:id/model` is the documented recovery
+    // path. We exercise just the attach side here.
+    const { bridge } = setup({
+      setModelImpl: async () => {
+        throw new Error('first attempt rejected');
+      },
+    });
+
+    const first = await bridge.spawnOrAttach({
+      workspaceCwd: WS_A,
+      modelServiceId: 'try-1',
+    });
+    expect(first.attached).toBe(false);
+    expect(bridge.sessionCount).toBe(1);
+
+    // Second attach (no modelServiceId so we don't re-trigger the
+    // failing setModel) reuses the same session.
+    const second = await bridge.spawnOrAttach({
+      workspaceCwd: WS_A,
+    });
+    expect(second.attached).toBe(true);
+    expect(second.sessionId).toBe(first.sessionId);
+    expect(bridge.sessionCount).toBe(1);
+
+    await bridge.shutdown();
+  });
+
@@ -1681,9 +1685,13 @@ describe('createHttpAcpBridge', () => {
     await new Promise((r) => setTimeout(r, 10));
     await bridge.shutdown();
 
-    // Subscriber must unwind to completion (no events ever published).
-    const events = await drain;
-    expect(events).toEqual([]);
+    // Subscriber must unwind to completion. Per #3889 review A05Ys
+    // the bus now publishes a terminal `session_died` event before
+    // closing on shutdown, so SSE subscribers can distinguish
+    // daemon shutdown from a transient network error.
+    const events = (await drain) as Array<{ type: string }>;
+    expect(events).toHaveLength(1);
+    expect(events[0]?.type).toBe('session_died');
   });
  });
 });
@@ -539,39 +539,37 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
 
     // ACP `newSession` doesn't take a model id; honor the caller's
     // `modelServiceId` by issuing the unstable `setSessionModel` call
-    // immediately after the session is established. If the agent rejects
-    // the model id, surface it as a session-creation failure so the
-    // caller doesn't think they got the requested model.
+    // immediately after the session is established. Use the shared
+    // `applyModelServiceId` helper so the call:
+    //   - races against `transportClosedReject` (a wedged child
+    //     during model switch fails fast instead of consuming the
+    //     full init timeout — A09HD)
+    //   - publishes `model_switched` / `model_switch_failed` to the
+    //     bus so attached clients see the result
+    //   - serializes through `entry.modelChangeQueue` for ordering
+    //     against later attach-with-different-model calls
+    //
+    // On failure we DO NOT tear the session down. An earlier rev
+    // tore it down so the caller "didn't inherit silent drift",
+    // but that killed an already-functional session and left the
+    // caller with a 500 and no sessionId to retry against. The
+    // session is operational on the agent's default model — the
+    // `model_switch_failed` event tells subscribers exactly what
+    // happened, and the caller can retry the switch via
+    // `POST /session/:id/model` once they know the sessionId.
+    //
+    // Swallow the rejection inline so the outer try/catch (which
+    // would `channel.kill()`) doesn't fire. The publish inside
+    // `applyModelServiceId` is the visible signal; the caller
+    // gets a 200 and a sessionId regardless, and learns of the
+    // model issue via the SSE stream.
     if (modelServiceId) {
-      try {
-        const conn = entry.connection as unknown as {
-          unstable_setSessionModel(p: {
-            sessionId: string;
-            modelId: string;
-          }): Promise<unknown>;
-        };
-        await withTimeout(
-          conn.unstable_setSessionModel({
-            sessionId: entry.sessionId,
-            modelId: modelServiceId,
-          }),
-          initTimeoutMs,
-          'setSessionModel',
-        );
-      } catch (err) {
-        // The session is half-initialized — a known sessionId on a real
-        // child but pointing at the wrong model. Tear it down so the
-        // caller can retry cleanly instead of inheriting silent drift.
-        // Close the EventBus too: the agent may have published session_
-        // update frames during init that are now orphaned (no subscriber
-        // can ever reach them — the caller never received the sessionId
-        // they would need to subscribe). Without an explicit close the
-        // bus + ring buffer linger until the next GC cycle.
-        byWorkspace.delete(workspaceKey);
-        byId.delete(entry.sessionId);
-        entry.events.close();
-        throw err;
-      }
+      await applyModelServiceId(entry, modelServiceId, initTimeoutMs).catch(
+        () => {
+          // Already published `model_switch_failed`; don't take
+          // down the session.
+        },
+      );
     }
 
     return {
@@ -829,7 +827,17 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
     // req.on('close')), tell the agent to wind down. ACP cancel is a
     // notification — the active prompt resolves with
     // stopReason: 'cancelled', then the next queued prompt can run.
+    //
+    // Also resolve any pending permission requests as `cancelled`.
+    // ACP spec requires `cancel` to settle outstanding
+    // `requestPermission` calls — `cancelSession()` already does
+    // this; the abort path here was missing the call. Without it,
+    // a client disconnecting while the agent is inside
+    // `requestPermission` leaves the permission promise unresolved
+    // forever (the agent is stuck waiting on a vote that no SSE
+    // subscriber will ever cast).
     const onAbort = () => {
+      cancelPendingForSession(sessionId);
       entry.connection.cancel({ sessionId }).catch(() => {
         // Cancel is fire-and-forget; the agent may already be dead.
       });
@@ -1012,6 +1020,19 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
     for (const id of Array.from(entry.pendingPermissionIds)) {
       resolvePending(id, { outcome: { outcome: 'cancelled' } });
     }
+    // Publish `session_died` BEFORE closing the bus. After the eager
+    // `byId.delete` above, the channel.exited handler's
+    // `byId.get(...) !== entry` guard short-circuits, so the
+    // automatic publish at crash time wouldn't fire. SSE subscribers
+    // need this terminal frame to know the session is gone.
+    try {
+      entry.events.publish({
+        type: 'session_died',
+        data: { sessionId, reason: 'killed' },
+      });
+    } catch {
+      /* bus already closed */
+    }
     entry.events.close();
     await entry.channel.kill().catch(() => {
       // Best-effort kill — channel may already be dead.
@@ -1036,7 +1057,24 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
     byWorkspace.clear();
     byId.clear();
     pendingPermissions.clear();
-    for (const e of entries) e.events.close();
+    // Publish a terminal `session_died` BEFORE closing each bus so SSE
+    // subscribers can distinguish "daemon shut down" from a transient
+    // network error and don't sit indefinitely retrying. The
+    // channel.exited handler also publishes this on a child crash,
+    // but at shutdown time the entry has already been removed from
+    // `byId` (above), so the handler's `byId.get(...) !== entry`
+    // guard would short-circuit and the event would never fire.
+    for (const e of entries) {
+      try {
+        e.events.publish({
+          type: 'session_died',
+          data: { sessionId: e.sessionId, reason: 'daemon_shutdown' },
+        });
+      } catch {
+        /* bus already closed */
+      }
+      e.events.close();
+    }
     // Wait for in-flight spawns too. The snapshot above only sees
     // sessions already in `byId`; a `doSpawn` past `channelFactory()`
     // but still inside `connection.newSession` (~1s on cold start) is
@@ -1187,19 +1225,29 @@ export const defaultSpawnChannelFactory: ChannelFactory = async (
   // is a `.ts` file Node can't run; users should `npm run build` before
   // `qwen serve` or set `process.execPath` to a tsx-aware shim. Stage 1
   // accepts this — the daemon is meant for built deployments.
-  // Pass an *allowlisted* environment to the child (see
-  // ALLOWED_CHILD_ENV_KEYS at module scope for the rationale + the
-  // allowlist itself). Move-out keeps the Set allocated once, not
-  // per-spawn.
-  const childEnv: NodeJS.ProcessEnv = {};
-  for (const key of ALLOWED_CHILD_ENV_KEYS) {
-    const v = process.env[key];
-    if (typeof v === 'string') childEnv[key] = v;
-  }
-  // Defense-in-depth: the explicit allowlist is exhaustive but the
-  // delete keeps the security review trail readable — anyone grepping
-  // for `QWEN_SERVER_TOKEN` finds the scrub explicitly named.
-  delete childEnv['QWEN_SERVER_TOKEN'];
+  // Pass through the daemon's full environment to the child, scrubbing
+  // ONLY daemon-internal secrets (see SCRUBBED_CHILD_ENV_KEYS at module
+  // scope). An earlier version used an allowlist, but that broke the
+  // common deployment shape: users export `OPENAI_API_KEY` /
+  // `ANTHROPIC_API_KEY` / `QWEN_*` / `DASHSCOPE_API_KEY` / a custom
+  // `modelProviders[].envKey` to authenticate the agent's LLM calls,
+  // and core's model config resolves those from `process.env`. An
+  // exhaustive allowlist can't enumerate user-defined provider keys,
+  // so the agent ends up unable to authenticate.
+  //
+  // Threat-model rationale: the agent already runs as the same UID
+  // with shell-tool access — anything in `~/.bashrc`, `~/.npmrc`,
+  // `~/.aws/credentials`, etc. is reachable by prompt injection
+  // regardless of what we put in `env`. The env passthrough is not
+  // the security boundary; the user-as-trust-root is. The only thing
+  // we MUST scrub is `QWEN_SERVER_TOKEN` (daemon-only auth that
+  // would let a prompt-injected shell turn the agent into an
+  // authenticated client of its own daemon — escalation the agent
+  // doesn't otherwise have).
+  const childEnv: NodeJS.ProcessEnv = { ...process.env };
+  for (const key of SCRUBBED_CHILD_ENV_KEYS) {
+    delete childEnv[key];
+  }
   // CodeQL `js/path-injection` flags the `cwd: workspaceCwd` flow.
   // Stage 1 trust model accepts this — see the function-level comment
   // above for the design rationale. Defense-in-depth: the cwd is
@ -1257,54 +1305,20 @@ export const defaultSpawnChannelFactory: ChannelFactory = async (
|
|||
const KILL_HARD_DEADLINE_MS = 10_000;
|
||||
|
||||
/**
|
||||
* Environment variables the spawned `qwen --acp` child is allowed to
|
||||
* inherit from the daemon's environment. Anything else is dropped —
|
||||
* the agent runs user-supplied prompts with shell-tool access, so
|
||||
* everything in its env is reachable by prompt injection: API keys
|
||||
* (OPENAI/ANTHROPIC/GEMINI/DASHSCOPE/...), DB passwords, AWS/GCP
|
||||
* credentials, OAuth tokens, secrets your operator forgot they
|
||||
- * exported. A denylist requires perfect knowledge of every secret
- * anyone might set; an allowlist requires only that we know what
- * the agent legitimately needs.
+ * Environment variables stripped from the spawned `qwen --acp` child's
+ * environment. Everything else is passed through — see the
+ * threat-model rationale at the call site in `defaultSpawnChannelFactory`.
  *
- * What's needed: enough to launch Node + load the entry script and
- * resolve user-config files (HOME), enough to find binaries and
- * standard tools (PATH), enough to localize and identify the user
- * for any session-state lookups (LANG/LC_*, USER/LOGNAME). Anything
- * beyond this list should be passed to the agent through ACP, not
- * through the spawn env.
+ * Currently just `QWEN_SERVER_TOKEN`: the daemon's own bearer token,
+ * which the agent doesn't need (it speaks to the daemon over stdio,
+ * not HTTP). Leaving it in the child's env would let prompt injection
+ * turn the agent into an authenticated client of its own daemon — an
+ * escalation the agent doesn't otherwise have.
  *
- * Per-platform extras: NODE-specific knobs (NODE_OPTIONS, NODE_PATH)
- * and shell-driven temp dir overrides (TMPDIR/TEMP/TMP) are usually
- * benign but can carry instructions; include only the temp-dir ones
- * since the agent legitimately writes scratch files. Windows-only
- * SYSTEMROOT/USERPROFILE/APPDATA are required by Node itself on
- * Windows or the spawn fails.
- *
- * Defined at module scope so the Set is allocated once at load,
- * not rebuilt on every `defaultSpawnChannelFactory` call.
+ * Defined at module scope so the Set is allocated once at load.
  */
-const ALLOWED_CHILD_ENV_KEYS: ReadonlySet<string> = new Set([
-  'HOME',
-  'PATH',
-  'USER',
-  'LOGNAME',
-  'LANG',
-  'LC_ALL',
-  'LC_CTYPE',
-  'LC_COLLATE',
-  'LC_MESSAGES',
-  'TMPDIR',
-  'TEMP',
-  'TMP',
-  'NODE_PATH',
-  // Windows essentials — Node refuses to spawn without these on Win32.
-  'SYSTEMROOT',
-  'USERPROFILE',
-  'APPDATA',
-  'LOCALAPPDATA',
-  'COMSPEC',
-  'PATHEXT',
+const SCRUBBED_CHILD_ENV_KEYS: ReadonlySet<string> = new Set([
+  'QWEN_SERVER_TOKEN',
 ]);
 
 function killChild(child: ChildProcess): Promise<void> {
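The denylist pass-through described in the PR summary (copy everything, drop only the scrubbed keys) can be sketched as a small helper. This is an illustrative sketch, not the actual call-site code — `buildChildEnv` is a hypothetical name; only `SCRUBBED_CHILD_ENV_KEYS` and its single entry come from the diff:

```typescript
// Sketch of the denylist scrub: copy the parent env, drop only keys
// the child must never see, and pass everything else through so
// env-only auth (OPENAI_API_KEY, QWEN_*, etc.) keeps working.
const SCRUBBED_CHILD_ENV_KEYS: ReadonlySet<string> = new Set([
  'QWEN_SERVER_TOKEN',
]);

function buildChildEnv(
  parentEnv: Record<string, string | undefined>,
): Record<string, string> {
  const childEnv: Record<string, string> = {};
  for (const [key, value] of Object.entries(parentEnv)) {
    if (value !== undefined && !SCRUBBED_CHILD_ENV_KEYS.has(key)) {
      childEnv[key] = value;
    }
  }
  return childEnv;
}
```

With an allowlist this same loop would have to enumerate every auth variable any provider might use; the denylist only has to know the one secret the child must not inherit.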
@@ -679,10 +679,16 @@ describe('createServeApp', () => {
     expect(res.status).toBe(200);
   });
 
+  // Switched probe endpoint from `/health` to `/capabilities` for
+  // these auth-rejection tests because per #3889 review A8dZT
+  // `/health` is now intentionally registered BEFORE the bearer
+  // middleware so liveness probes work without credentials.
+  // `/capabilities` is the cheapest endpoint that still goes through
+  // the auth chain.
   it('rejects missing Authorization header when token is set', async () => {
     const app = createServeApp({ ...baseOpts, token: 'secret' });
     const res = await request(app)
-      .get('/health')
+      .get('/capabilities')
       .set('Host', `127.0.0.1:${baseOpts.port}`);
     expect(res.status).toBe(401);
   });
@@ -690,7 +696,7 @@ describe('createServeApp', () => {
   it('rejects wrong scheme', async () => {
     const app = createServeApp({ ...baseOpts, token: 'secret' });
     const res = await request(app)
-      .get('/health')
+      .get('/capabilities')
       .set('Host', `127.0.0.1:${baseOpts.port}`)
       .set('Authorization', 'Basic c2VjcmV0');
     expect(res.status).toBe(401);
@@ -699,7 +705,7 @@ describe('createServeApp', () => {
   it('rejects wrong token', async () => {
     const app = createServeApp({ ...baseOpts, token: 'secret' });
     const res = await request(app)
-      .get('/health')
+      .get('/capabilities')
       .set('Host', `127.0.0.1:${baseOpts.port}`)
       .set('Authorization', 'Bearer wrong');
     expect(res.status).toBe(401);
@@ -708,11 +714,27 @@ describe('createServeApp', () => {
   it('accepts the right token', async () => {
     const app = createServeApp({ ...baseOpts, token: 'secret' });
     const res = await request(app)
-      .get('/health')
+      .get('/capabilities')
       .set('Host', `127.0.0.1:${baseOpts.port}`)
       .set('Authorization', 'Bearer secret');
     expect(res.status).toBe(200);
   });
 
+  it('exempts /health from bearer auth so liveness probes work without credentials', async () => {
+    // Per #3889 review A8dZT — the registration order in
+    // `createServeApp` puts `/health` BEFORE `bearerAuth`, so a
+    // probe with no credentials still gets 200 even when the daemon
+    // was started with a token. CORS deny + Host allowlist still
+    // apply (both are registered before `/health`), so this is
+    // not a way to bypass DNS rebinding or browser-origin
+    // protection.
+    const app = createServeApp({ ...baseOpts, token: 'secret' });
+    const res = await request(app)
+      .get('/health')
+      .set('Host', `127.0.0.1:${baseOpts.port}`);
+    expect(res.status).toBe(200);
+    expect(res.body).toEqual({ status: 'ok' });
+  });
 });
 });
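The four rejection/acceptance cases above pin down the bearer middleware's contract. As a hedged sketch of that contract in isolation — `checkBearer`, `MiniReq`, and `MiniRes` are illustrative names, not the real middleware's API:

```typescript
// Minimal model of the bearer check the tests exercise: 401 on a
// missing header, a non-Bearer scheme, or a wrong token; pass-through
// when no token is configured.
type MiniReq = { headers: Record<string, string | undefined> };
type MiniRes = { statusCode?: number; body?: unknown };

function checkBearer(
  token: string | undefined,
  req: MiniReq,
  res: MiniRes,
): boolean {
  // No token configured: auth disabled, every request passes.
  if (!token) return true;
  // Exact "Bearer <token>" match — a missing header, a Basic scheme,
  // or a wrong token all fall through to 401.
  if (req.headers['authorization'] === `Bearer ${token}`) return true;
  res.statusCode = 401;
  res.body = { error: 'Unauthorized' };
  return false;
}
```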
@@ -61,13 +61,21 @@ export function createServeApp(
   // amplified CPU/memory cost from any wrong-token client.
   app.use(denyBrowserOriginCors);
   app.use(hostAllowlist(opts.hostname, getPort));
-  app.use(bearerAuth(opts.token));
-  app.use(express.json({ limit: '10mb' }));
+
+  // `/health` is registered BEFORE `bearerAuth` so liveness probes work
+  // without credentials even when the daemon was started with a
+  // `--token` (k8s/Compose probes typically don't carry the daemon's
+  // bearer; round-tripping a 401 just to know the listener is up is
+  // pure waste). CORS deny + Host allowlist still apply, so a browser
+  // or a wrong-Host probe is still rejected. Documented exemption in
+  // `docs/developers/qwen-serve-protocol.md`.
+  app.get('/health', (_req, res) => {
+    res.status(200).json({ status: 'ok' });
+  });
+
+  app.use(bearerAuth(opts.token));
+  app.use(express.json({ limit: '10mb' }));
 
   app.get('/capabilities', (_req, res) => {
     const envelope: CapabilitiesEnvelope = {
       v: CAPABILITIES_SCHEMA_VERSION,
@@ -309,7 +317,17 @@ export function createServeApp(
   // payload in user-space memory unboundedly — a slow consumer on a
   // chatty session can balloon daemon RSS. Wait for `drain` (or
   // close/error) before scheduling the next write.
-  const writeWithBackpressure = (chunk: string): Promise<void> =>
+  //
+  // Concurrency: serialize ALL writes through a per-connection chain
+  // so the heartbeat (fire-and-forget interval, see below) can't
+  // interleave with the main event-write loop. Without serialization,
+  // the heartbeat firing while the main loop is mid-`drain` await
+  // would issue a second `res.write()` that bypasses the
+  // backpressure guard — and could even interleave bytes between two
+  // SSE frames on the wire. The chain is single-flight: each call
+  // waits for the previous write to settle before scheduling its own.
+  let writeChain: Promise<void> = Promise.resolve();
+  const doWrite = (chunk: string): Promise<void> =>
     new Promise((resolve, reject) => {
       if (res.writableEnded) {
         resolve();
@@ -341,6 +359,15 @@ export function createServeApp(
       res.once('close', onClose);
       res.once('error', onError);
     });
+  const writeWithBackpressure = (chunk: string): Promise<void> => {
+    const next = writeChain.then(() => doWrite(chunk));
+    // Tail-swallow rejections on the chain itself so a single failed
+    // write doesn't poison every subsequent call. The CALLER's
+    // returned promise still rejects — chain-internal failures are
+    // someone else's problem, not blockers for queueing.
+    writeChain = next.catch(() => undefined);
+    return next;
+  };
 
   // Tell EventSource to retry after 3s on disconnect. Awaiting drain on
   // the very first write is overkill but cheap — `ok` is true the
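The single-flight chain in these two hunks is a general pattern worth seeing end to end. A self-contained sketch (the `doWrite` stand-in here simulates an async write that can fail; only the chaining shape mirrors the diff):

```typescript
// Single-flight write chain: every caller queues behind the previous
// write, and the chain tail swallows rejections so one failed write
// doesn't poison subsequent ones. The caller's own promise still
// rejects, preserving error visibility.
let chain: Promise<void> = Promise.resolve();
const order: string[] = [];

// Simulated async write — rejects for the chunk 'boom'.
function doWrite(chunk: string): Promise<void> {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (chunk === 'boom') {
        reject(new Error('write failed'));
      } else {
        order.push(chunk);
        resolve();
      }
    }, 5);
  });
}

function writeSerialized(chunk: string): Promise<void> {
  const next = chain.then(() => doWrite(chunk));
  chain = next.catch(() => undefined); // tail-swallow for the chain only
  return next;
}
```

Three concurrent calls complete strictly in call order, and a rejection in the middle does not block the write after it.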
@@ -446,6 +473,21 @@ export function createServeApp(
       res.status(400).json({ error: 'Invalid JSON in request body' });
       return;
     }
+    // body-parser raises a typed error with `status: 413` when a
+    // request body exceeds the `express.json({ limit: '10mb' })`
+    // ceiling. Without this branch it falls through to the 500 path
+    // and clients see a misleading "Internal server error" instead
+    // of a clear "payload too large" — which is the kind of error
+    // they can actually act on (chunk the request, raise the limit).
+    if (
+      err &&
+      typeof err === 'object' &&
+      'status' in err &&
+      (err as { status: number }).status === 413
+    ) {
+      res.status(413).json({ error: 'Request body too large (max 10 MB)' });
+      return;
+    }
     writeStderrLine(
       `qwen serve: unhandled error: ${err instanceof Error ? (err.stack ?? err.message) : String(err)}`,
     );
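The 413 branch above narrows `err` structurally rather than by class, so it can be exercised in isolation. A sketch of that narrowing as a standalone predicate (`isPayloadTooLarge` is an illustrative name, not a helper in the codebase):

```typescript
// Same structural check as the handler: any object carrying
// `status: 413` is treated as body-parser's "entity too large" error.
function isPayloadTooLarge(err: unknown): boolean {
  return (
    typeof err === 'object' &&
    err !== null &&
    'status' in err &&
    (err as { status: unknown }).status === 413
  );
}
```

The explicit `err !== null` matters because `typeof null === 'object'` in JavaScript; the diff's `err &&` guard covers the same hole.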
@@ -355,7 +355,11 @@ export class DaemonClient {
  * but exposed as `static` on `AbortSignal`, so cautious feature-detect plus
  * a polyfill keeps us honest if a runtime ships a stripped-down `AbortSignal`.
  */
-function abortTimeout(ms: number): AbortSignal {
+// Exported solely for direct unit testing — production callers go
+// through `fetchWithTimeout` above, but the polyfill paths only fire
+// on Node 18.0–20.2 (where `AbortSignal.any` is missing) and that
+// runtime can't easily be exercised from the public API surface.
+export function abortTimeout(ms: number): AbortSignal {
   const tFn = (
     AbortSignal as unknown as { timeout?: (ms: number) => AbortSignal }
   ).timeout;
@@ -394,7 +398,8 @@ function abortTimeout(ms: number): AbortSignal {
  * listeners after the first fire is best-effort), but for `fetch`-style
  * single-shot use the difference is invisible.
  */
-function composeAbortSignals(signals: AbortSignal[]): AbortSignal {
+// Exported solely for direct unit testing — see note on `abortTimeout`.
+export function composeAbortSignals(signals: AbortSignal[]): AbortSignal {
   const anyFn = (
     AbortSignal as unknown as { any?: (s: AbortSignal[]) => AbortSignal }
   ).any;
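The hunks above show only the feature-detect for `AbortSignal.any`; the fallback path isn't in this chunk. As an assumption-labeled sketch of what such a polyfill shape typically looks like (one fresh controller, aborted by whichever input fires first, forwarding the reason) — not the repo's actual implementation:

```typescript
// Illustrative fallback for composing abort signals on runtimes
// without AbortSignal.any (Node 18.0–20.2).
function composeAbortSignalsFallback(signals: AbortSignal[]): AbortSignal {
  const ctrl = new AbortController();
  for (const sig of signals) {
    if (sig.aborted) {
      // An already-aborted input settles the composite immediately.
      ctrl.abort(sig.reason);
      break;
    }
    // Abort events dispatch synchronously, so the composite follows
    // whichever input fires first; later fires are no-ops.
    sig.addEventListener('abort', () => ctrl.abort(sig.reason), {
      once: true,
    });
  }
  return ctrl.signal;
}
```

As the diff's doc comment notes, this sketch never detaches listeners from losing signals after the first fire — invisible for single-shot `fetch`-style use, but a leak if the inputs are long-lived.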
@@ -8,6 +8,8 @@ import { describe, it, expect, vi } from 'vitest';
 import {
   DaemonClient,
   DaemonHttpError,
+  abortTimeout,
+  composeAbortSignals,
 } from '../../src/daemon/DaemonClient.js';
 
 function jsonResponse(status: number, body: unknown): Response {
@@ -480,35 +482,42 @@ describe('DaemonClient', () => {
     expect(elapsed).toBeLessThan(2000);
   });
 
-  it('aborts when the caller-supplied signal fires (composeAbortSignals path)', async () => {
-    // Verifies that `composeAbortSignals` actually forwards the
-    // caller's abort to the underlying fetch — the polyfill path
-    // is the only one Node 18.0–20.2 takes for signal composition.
-    const fetch = vi.fn(
-      (_input: RequestInfo | URL, init?: RequestInit) =>
-        new Promise<Response>((_res, rej) => {
-          init?.signal?.addEventListener('abort', () =>
-            rej(new DOMException('aborted', 'AbortError')),
-          );
-        }),
-    ) as unknown as typeof globalThis.fetch;
-    const client = new DaemonClient({
-      baseUrl: 'http://daemon',
-      fetch,
-      fetchTimeoutMs: 60_000, // long, so the caller's abort is what fires
-    });
-    const ctrl = new AbortController();
-    setTimeout(() => ctrl.abort(), 30);
-    await expect(
-      // subscribeEvents forwards opts.signal through fetchWithTimeout
-      // — pick a method whose typed surface accepts a signal.
-      (async () => {
-        const iter = client.subscribeEvents('s-1', { signal: ctrl.signal });
-        for await (const _ of iter) {
-          void _;
-        }
-      })(),
-    ).rejects.toThrow();
-  });
+  it('composeAbortSignals forwards the first abort, with or without native AbortSignal.any', async () => {
+    // Direct-unit test on the helper — `subscribeEvents` bypasses
+    // `fetchWithTimeout` entirely (it calls `_fetch` directly with
+    // the caller's signal), so testing through subscribeEvents
+    // never exercises the polyfill. Calling `composeAbortSignals`
+    // here covers it on all Node versions: native (`>=20.3`) and
+    // polyfill (`18.0`–`20.2`) take the same input shape.
+    const a = new AbortController();
+    const b = new AbortController();
+    const composed = composeAbortSignals([a.signal, b.signal]);
+    expect(composed.aborted).toBe(false);
+    a.abort(new DOMException('first', 'AbortError'));
+    // The composed signal should follow whichever input fires first.
+    // Allow a microtask for native AbortSignal.any propagation.
+    await Promise.resolve();
+    expect(composed.aborted).toBe(true);
+  });
+
+  it('composeAbortSignals fires immediately if any input is already aborted', () => {
+    const a = new AbortController();
+    a.abort();
+    const b = new AbortController();
+    const composed = composeAbortSignals([a.signal, b.signal]);
+    expect(composed.aborted).toBe(true);
+  });
+
+  it('abortTimeout fires after the configured delay', async () => {
+    const t0 = Date.now();
+    const sig = abortTimeout(40);
+    await new Promise<void>((resolve) =>
+      sig.addEventListener('abort', () => resolve(), { once: true }),
+    );
+    const elapsed = Date.now() - t0;
+    // Generous tolerance — just checking the timer fires.
+    expect(elapsed).toBeGreaterThanOrEqual(30);
+    expect(elapsed).toBeLessThan(2000);
+  });
 });
 });