feat(serve): close 3 chiga0 audit items — ringSize 4000, --max-sessions, /health?deep=1 (#3803)

Three "30-minute" items from chiga0's external architecture audit
(2026-05-11). All actionable within Stage 1 scope; remaining items
in chiga0's review (SaaS positioning, multi-token to Stage 1.5,
acp-bridge package extraction, reference orchestrator) are larger
scoping decisions deferred to Stage 1.5/2.

DEFAULT_RING_SIZE 1000 → 4000 (Risk 4):
- A single long turn can emit hundreds of frames (test plan reports
  13 for a SHORT turn, real workloads can be 10× that). 1000 was
  exhausted by a moderate turn before a 5s reconnect window
  finished. 4000 gives ~30× headroom over a typical busy turn at
  the cost of a few hundred KB RAM/session. Updated user + protocol
  docs and the daemon-client-quickstart example.

--max-sessions <n> (default 20) (Rec 3):
- New `ServeOptions.maxSessions` + matching `BridgeOptions`. Bridge
  throws `SessionLimitExceededError` when `byId.size +
  inFlightSpawns.size >= max` BEFORE issuing a fresh spawn. Attaches
  to existing sessions (single scope) bypass the cap so an idle
  daemon's reconnects keep working at capacity. `0` disables.
  Default of 20 sized below the design's N≈50 cliff (per-session
  ~30–50 MB RSS + FD pressure). HTTP route maps to 503 with
  `Retry-After: 5` and `code: session_limit_exceeded`. Tests cover:
  cap rejection under thread scope, attach-not-counted under single
  scope, `0` disables. Documented in CLI flags table + protocol
  Common-error section.

/health?deep=1 (Risk 3):
- Default `/health` stays cheap (no bridge access). With `?deep=1`
  the response includes `sessions` and `pendingPermissions` from
  the bridge — touches state so a wedged bridge surfaces as 503
  `{status: "degraded"}` instead of "200 ok" on a zombie daemon
  (the `k8s rolling deploy will see healthy` failure mode chiga0
  flagged). Loopback-vs-non-loopback bearer-exempt logic from the
  earlier A8dZT fix is preserved via a shared handler. Tests cover:
  cheap default, deep response shape, throwing-getter → 503.
This commit is contained in:
wenshao 2026-05-12 07:35:23 +08:00
parent 6f98dc6bef
commit 66ffd7cc66
11 changed files with 303 additions and 19 deletions

View file

@@ -113,7 +113,7 @@ for await (const event of client.subscribeEvents(session.sessionId, {
}
```
The daemon retains the last 1000 events per session in a ring buffer; gaps beyond that window won't be re-deliverable.
The daemon retains the last 4000 events per session in a ring buffer; gaps beyond that window won't be re-deliverable.
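The eviction behavior can be sketched as follows (illustrative only; the names are hypothetical, not the daemon's actual internals):

```typescript
// Illustrative sketch of a per-session replay ring, not the daemon's
// actual implementation. Once the ring is full, the oldest event is
// evicted, and its id becomes non-re-deliverable.
const RING_SIZE = 4000;

interface RingEvent {
  id: number; // monotonic per session, starting at 1
  type: string;
}

const ring: RingEvent[] = [];

function push(event: RingEvent): void {
  ring.push(event);
  if (ring.length > RING_SIZE) {
    ring.shift(); // evict the oldest; its id is now gone for replay
  }
}

// Replay for `Last-Event-ID: n` returns surviving events with id > n.
function replayAfter(n: number): RingEvent[] {
  return ring.filter((e) => e.id > n);
}
```

A late subscriber that reconnects with a `Last-Event-ID` older than the oldest surviving event simply gets the replay from the oldest event onward; the gap is the client's to detect.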
## Voting on permissions

View file

@@ -40,6 +40,18 @@ with status `400`.
with status `404`.
`POST /session` past the daemon's `--max-sessions` cap returns `503` with a `Retry-After: 5` header and:
```json
{
"error": "Session limit reached (20)",
"code": "session_limit_exceeded",
"limit": 20
}
```
Attaches to existing sessions are NOT counted toward the cap, so an idle daemon's reconnects keep working even when at capacity.
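A client can honor the hint with logic along these lines (a sketch; the error-body shape mirrors the documented response, but the helper itself is hypothetical, not part of any shipped client):

```typescript
// Sketch of client-side handling for the session-limit 503. The body
// shape matches the documented error above; `retryDelayMs` is a
// hypothetical helper for illustration.
interface SessionLimitBody {
  error: string;
  code: 'session_limit_exceeded';
  limit: number;
}

// Decide whether (and how long) to wait before re-POSTing /session.
// Returns null when the response is not a retryable session-limit 503.
function retryDelayMs(
  status: number,
  retryAfter: string | null,
  body?: Partial<SessionLimitBody>,
): number | null {
  if (status !== 503 || body?.code !== 'session_limit_exceeded') return null;
  const secs = Number(retryAfter ?? '5');
  // Malformed or missing Retry-After falls back to the documented 5s hint.
  return Number.isFinite(secs) && secs > 0 ? secs * 1000 : 5000;
}
```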
## Capabilities
Every Stage 1 daemon advertises 9 feature tags. Clients **must** gate UI off `features`, not off `mode` (per design §10).
@@ -60,7 +72,15 @@ Every Stage 1 daemon advertises 9 feature tags. Clients **must** gate UI off `fe
### `GET /health`
Liveness probe. Returns `200 {"status":"ok"}` if the listener is up.
Liveness probe. Default form returns `200 {"status":"ok"}` if the listener is up — cheap, no bridge access, suitable for high-frequency k8s/Compose liveness probes.
Pass `?deep=1` (also accepts `?deep=true` or bare `?deep`) for a probe that touches bridge state. The deep probe is what catches a "zombie daemon" — listener answering, bridge wedged:
```json
{ "status": "ok", "sessions": 3, "pendingPermissions": 1 }
```
A 503 with `{"status":"degraded"}` indicates the bridge state access threw — pull the daemon out of rotation and investigate.
**Auth:** required **only on non-loopback binds**. On loopback (`127.0.0.1`, `::1`, `[::1]`) `/health` is registered before the bearer middleware so k8s/Compose probes inside the pod don't need to carry the token. On non-loopback (`--hostname 0.0.0.0` etc.) the route is registered after the bearer middleware and returns 401 without a valid token — otherwise an unauthenticated caller could probe arbitrary addresses to confirm a `qwen serve` exists, a low-severity info leak that combines poorly with port scanning. CORS deny + Host allowlist still apply on the loopback exemption.
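For scripted probes, the decision rule boils down to something like this (a sketch; the types mirror the documented responses, and the function name is hypothetical):

```typescript
// Sketch of how a probe script might interpret /health?deep=1. The
// response shapes match the documentation above; the helper itself is
// illustrative, not part of the daemon.
interface DeepHealth {
  status: 'ok' | 'degraded';
  sessions?: number;
  pendingPermissions?: number;
}

function shouldStayInRotation(httpStatus: number, body: DeepHealth): boolean {
  // 503 {"status":"degraded"} means the bridge is wedged: pull the
  // daemon from rotation even though the listener still answers.
  return httpStatus === 200 && body.status === 'ok';
}
```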
@@ -237,7 +257,7 @@ The SSE-level `id:` / `event:` lines duplicate `envelope.id` / `envelope.type` f
Reconnect semantics:
- Send `Last-Event-ID: <n>` to replay events with `id > n` from the per-session ring (default depth 1000)
- Send `Last-Event-ID: <n>` to replay events with `id > n` from the per-session ring (default depth 4000)
- **Gap detection (client-side):** if `<n>` predates the oldest event still in the ring (e.g. you reconnect with `Last-Event-ID: 50` but the ring now holds 200–1199), the daemon replays from the oldest available event without raising. Compare the first replayed event's `id` against `n + 1`; any difference is the size of the lost window. Stage 2 will inject an explicit `stream_gap` synthetic frame on the daemon side; in Stage 1 detection is the client's responsibility.
- IDs are monotonic per session, starting at 1
- Synthetic terminal frames (`client_evicted`, `stream_error`) intentionally omit `id` so they don't burn a sequence slot for other subscribers
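The gap-detection arithmetic above can be sketched as a one-liner (illustrative; `lostWindow` is a hypothetical client-side helper, not a shipped API):

```typescript
// Client-side gap detection after an SSE reconnect, per the semantics
// above. `lastSeenId` is the value sent as `Last-Event-ID`;
// `firstReplayedId` is the `id` on the first event the daemon replays.
function lostWindow(lastSeenId: number, firstReplayedId: number): number {
  // 0 means a clean resume; any positive value is how many events fell
  // out of the ring before the client reconnected.
  return Math.max(0, firstReplayedId - (lastSeenId + 1));
}
```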

View file

@@ -57,7 +57,7 @@ curl -N http://127.0.0.1:4170/session/$SESSION_ID/events
The `data:` line is the **full event envelope**: `{id?, v, type, data, originatorClientId?}` — JSON-stringified on a single line. The ACP payload (the `sessionUpdate` block in this example) sits under `data` inside that envelope. The SSE-level `id:` / `event:` lines are convenience for EventSource clients; the same values appear inside the JSON envelope so raw-`fetch` consumers get them too.
Open this **before** sending the prompt — the SSE replay buffer holds the
last 1000 events so a late subscriber can catch up via `Last-Event-ID`,
last 4000 events so a late subscriber can catch up via `Last-Event-ID`,
but for the simple "watch a single prompt" case it's easiest to subscribe
first and let it stream live.
@@ -100,12 +100,13 @@ The token comparison is constant-time (SHA-256 + `crypto.timingSafeEqual`); 401
## CLI flags
| Flag | Default | Purpose |
| ------------------- | ----------- | ----------------------------------------------------------------------------------------------------------------------------------- |
| `--port <n>` | `4170` | TCP port. `0` = OS-assigned ephemeral port. |
| `--hostname <addr>` | `127.0.0.1` | Bind interface. Anything beyond loopback requires a token. |
| `--token <str>` | — | Bearer token. Falls back to `QWEN_SERVER_TOKEN` env var (with leading/trailing whitespace stripped — handy for `$(cat token.txt)`). |
| `--http-bridge` | `true` | Stage 1 mode: per-session `qwen --acp` child process. Stage 2 native in-process becomes available later. |
| Flag | Default | Purpose |
| -------------------- | ----------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `--port <n>` | `4170` | TCP port. `0` = OS-assigned ephemeral port. |
| `--hostname <addr>` | `127.0.0.1` | Bind interface. Anything beyond loopback requires a token. |
| `--token <str>` | — | Bearer token. Falls back to `QWEN_SERVER_TOKEN` env var (with leading/trailing whitespace stripped — handy for `$(cat token.txt)`). |
| `--max-sessions <n>` | `20` | Cap on concurrent live sessions. New `POST /session` requests that would spawn a fresh child return `503` (with `Retry-After: 5`) when the cap is hit; attaches to existing sessions are NOT counted. Set to `0` to disable. Sized for single-user / small-team usage; raise it if your deployment has the RAM/FD headroom (~30–50 MB per session). |
| `--http-bridge` | `true` | Stage 1 mode: per-session `qwen --acp` child process. Stage 2 native in-process becomes available later. |
## Default deployment threat model

View file

@@ -16,6 +16,7 @@ interface ServeArgs {
port: number;
hostname: string;
token?: string;
'max-sessions': number;
// Read from the kebab-case key only — the camelCase mirror that yargs
// synthesizes is convenient for handlers but type-confusing here. The
// handler reads `argv['http-bridge']` directly.
@@ -45,6 +46,13 @@ export const serveCommand: CommandModule<unknown, ServeArgs> = {
description:
'Bearer token required on every request. Falls back to the QWEN_SERVER_TOKEN env var.',
})
.option('max-sessions', {
type: 'number',
default: 20,
description:
'Cap on concurrent live sessions. New spawn requests beyond this return 503; ' +
'attach to existing sessions still works. Set to 0 to disable.',
})
.option('http-bridge', {
type: 'boolean',
default: true,
@@ -79,6 +87,7 @@ export const serveCommand: CommandModule<unknown, ServeArgs> = {
hostname: argv.hostname,
token: argv.token,
mode: 'http-bridge',
maxSessions: argv['max-sessions'],
});
} catch (err) {
writeStderrLine(

View file

@@ -61,7 +61,17 @@ export interface SubscribeOptions {
}
const DEFAULT_MAX_QUEUED = 256;
const DEFAULT_RING_SIZE = 1000;
/**
* Default replay-ring depth per session. Sized for a 5-second
 * reconnect window over a chatty turn: a single long-running prompt
* can emit hundreds of frames (test plan reports 13 for a short
* turn, real workloads can be 10× that or more once tool-call /
* thought streams pile up). 1000 was the original default and could
* be exhausted by a moderate turn before the client reconnected;
* 4000 gives ~30× headroom over a typical-but-busy turn at the cost
* of a few hundred KB of RAM per session.
*/
const DEFAULT_RING_SIZE = 4000;
/**
* Per-bus subscriber cap. With per-subscriber `maxQueued` defaulting to
* 256 frames, 64 concurrent subscribers caps the per-session subscriber
@@ -133,7 +143,7 @@ export class EventBus {
...input,
};
this.ring.push(event);
// Eviction-by-shift is O(n) once the ring is full. With ringSize=1000
// Eviction-by-shift is O(n) once the ring is full. With ringSize=4000
// and per-publish work measured in hundreds of microseconds even on
// chatty sessions, this isn't a real hotspot today. A circular-buffer
// refactor would push it to O(1) but adds index bookkeeping; deferred

View file

@@ -1785,4 +1785,87 @@ describe('createHttpAcpBridge', () => {
expect(events[0]?.type).toBe('session_died');
});
});
describe('maxSessions cap (chiga0 Rec 3)', () => {
it('refuses NEW spawns past the cap with SessionLimitExceededError', async () => {
const factory: ChannelFactory = async () => makeChannel().channel;
const bridge = createHttpAcpBridge({
channelFactory: factory,
maxSessions: 2,
// `thread` so each call is a fresh session, not an attach.
sessionScope: 'thread',
});
// First two spawns succeed.
await bridge.spawnOrAttach({ workspaceCwd: WS_A });
await bridge.spawnOrAttach({ workspaceCwd: WS_B });
expect(bridge.sessionCount).toBe(2);
// Third hits the cap.
await expect(
bridge.spawnOrAttach({ workspaceCwd: WS_A }),
).rejects.toMatchObject({
name: 'SessionLimitExceededError',
limit: 2,
});
// Cap rejection must NOT register a new session.
expect(bridge.sessionCount).toBe(2);
await bridge.shutdown();
});
it('attach to an existing session under single scope is NOT counted toward the cap', async () => {
const factory: ChannelFactory = async () => makeChannel().channel;
const bridge = createHttpAcpBridge({
channelFactory: factory,
maxSessions: 1,
sessionScope: 'single',
});
// First call spawns.
const a = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
expect(a.attached).toBe(false);
expect(bridge.sessionCount).toBe(1);
// Second call to the SAME workspace attaches — cap doesn't apply.
const b = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
expect(b.attached).toBe(true);
expect(b.sessionId).toBe(a.sessionId);
expect(bridge.sessionCount).toBe(1);
// But a NEW workspace (would need a fresh spawn) is rejected.
await expect(
bridge.spawnOrAttach({ workspaceCwd: WS_B }),
).rejects.toMatchObject({
name: 'SessionLimitExceededError',
});
await bridge.shutdown();
});
it('maxSessions: 0 disables the cap', async () => {
// Distinct sessionIdPrefix per spawn so each call gets a unique
// sessionId (otherwise they'd collide in `byId` and only the
// last would remain — making `sessionCount` stay at 1).
let n = 0;
const factory: ChannelFactory = async () =>
makeChannel({ sessionIdPrefix: `s${n++}` }).channel;
const bridge = createHttpAcpBridge({
channelFactory: factory,
maxSessions: 0,
sessionScope: 'thread',
});
// 5 spawns isn't anywhere near the would-be default of 20, but it's
// enough to confirm the cap is disabled: with `maxSessions: 0`
// explicit, a regression that re-enabled some lower default cap
// would still surface here, without the test needing to flood 21+
// spawns.
for (let i = 0; i < 5; i++) {
await bridge.spawnOrAttach({ workspaceCwd: WS_A });
}
expect(bridge.sessionCount).toBe(5);
await bridge.shutdown();
});
});
});

View file

@@ -177,6 +177,22 @@ export class SessionNotFoundError extends Error {
}
}
/**
* Thrown by `spawnOrAttach` when a fresh-spawn would push `sessionCount`
* past `BridgeOptions.maxSessions`. The HTTP route maps this to 503
* with a `Retry-After` hint. Attaches (same workspace under `single`
 * scope) never trip this; only NEW children do. Distinct error type so
* routes can branch without text-matching.
*/
export class SessionLimitExceededError extends Error {
readonly limit: number;
constructor(limit: number) {
super(`Session limit reached (${limit})`);
this.name = 'SessionLimitExceededError';
this.limit = limit;
}
}
/**
* One ACP NDJSON channel to a single agent. Tests inject a fake by replacing
* the channel factory; production uses `defaultSpawnChannelFactory`.
@@ -208,6 +224,14 @@ export interface BridgeOptions {
channelFactory?: ChannelFactory;
/** How long to wait for the child's `initialize` reply before giving up. */
initializeTimeoutMs?: number;
/**
* Cap on concurrent live sessions. `spawnOrAttach` calls that would
* cross this throw `SessionLimitExceededError`; attaches to an
* existing session (same workspace under `single` scope) are not
 * counted. `0` / `Infinity` disable the cap. Defaults to 20; see
* `ServeOptions.maxSessions` for the rationale.
*/
maxSessions?: number;
}
interface SessionEntry {
@@ -394,9 +418,21 @@ class BridgeClient implements Client {
}
const DEFAULT_INIT_TIMEOUT_MS = 10_000;
const DEFAULT_MAX_SESSIONS = 20;
export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
const sessionScope = opts.sessionScope ?? 'single';
// `undefined` → default 20 (intentionally tight per #3803 N≈50 cliff).
// `0` or non-finite (Infinity / NaN) → unlimited.
// Positive finite → use as-is.
let maxSessions: number;
if (opts.maxSessions === undefined) {
maxSessions = DEFAULT_MAX_SESSIONS;
} else if (opts.maxSessions <= 0 || !Number.isFinite(opts.maxSessions)) {
maxSessions = Infinity;
} else {
maxSessions = opts.maxSessions;
}
if (sessionScope !== 'single' && sessionScope !== 'thread') {
throw new TypeError(
`Invalid sessionScope: ${JSON.stringify(sessionScope)}. ` +
@@ -801,6 +837,14 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
}
}
// Cap check: count both registered sessions and in-flight spawns
// (a fresh spawn that's racing to register hasn't hit
// `byId` yet but should still count toward the limit). Attaches
// returned above bypass this — only NEW children are gated.
if (byId.size + inFlightSpawns.size >= maxSessions) {
throw new SessionLimitExceededError(maxSessions);
}
const promise = doSpawn(workspaceKey, req.modelServiceId);
// Track in-flight spawns regardless of scope. Under `single`
// this also serves the coalescing path above (a parallel

View file

@@ -74,7 +74,8 @@ export async function runQwenServe(
);
}
const bridge = deps.bridge ?? createHttpAcpBridge();
const bridge =
deps.bridge ?? createHttpAcpBridge({ maxSessions: opts.maxSessions });
let actualPort = opts.port;
const app = createServeApp(opts, () => actualPort, { bridge });

View file

@@ -17,6 +17,7 @@ import type {
SetSessionModelResponse,
} from '@agentclientprotocol/sdk';
import {
SessionLimitExceededError,
SessionNotFoundError,
type BridgeSession,
type BridgeSessionSummary,
@@ -758,6 +759,69 @@ describe('createServeApp', () => {
expect(bridge.calls).toHaveLength(0);
});
});
describe('GET /health?deep=1 (chiga0 Risk 3)', () => {
it('default /health stays cheap (no bridge touch)', async () => {
const bridge = fakeBridge();
const app = createServeApp(baseOpts, undefined, { bridge });
const res = await request(app)
.get('/health')
.set('Host', `127.0.0.1:${baseOpts.port}`);
expect(res.status).toBe(200);
expect(res.body).toEqual({ status: 'ok' });
});
it('deep=1 includes bridge state', async () => {
const bridge = fakeBridge();
const app = createServeApp(baseOpts, undefined, { bridge });
const res = await request(app)
.get('/health?deep=1')
.set('Host', `127.0.0.1:${baseOpts.port}`);
expect(res.status).toBe(200);
expect(res.body).toMatchObject({
status: 'ok',
sessions: 0,
pendingPermissions: 0,
});
});
it('deep=1 returns 503 when bridge state access throws', async () => {
// Simulate a wedged bridge by replacing the getter to throw.
const bridge = fakeBridge();
Object.defineProperty(bridge, 'sessionCount', {
get() {
throw new Error('bridge wedged');
},
});
const app = createServeApp(baseOpts, undefined, { bridge });
const res = await request(app)
.get('/health?deep=1')
.set('Host', `127.0.0.1:${baseOpts.port}`);
expect(res.status).toBe(503);
expect(res.body).toEqual({ status: 'degraded' });
});
});
describe('session limit (chiga0 Rec 3 — --max-sessions)', () => {
it('503 + Retry-After + structured error when bridge throws SessionLimitExceededError', async () => {
const bridge = fakeBridge({
spawnImpl: async () => {
throw new SessionLimitExceededError(20);
},
});
const app = createServeApp(baseOpts, undefined, { bridge });
const res = await request(app)
.post('/session')
.set('Host', `127.0.0.1:${baseOpts.port}`)
.send({ cwd: '/work/a' });
expect(res.status).toBe(503);
expect(res.headers['retry-after']).toBe('5');
expect(res.body).toMatchObject({
code: 'session_limit_exceeded',
limit: 20,
});
});
});
});
describe('runQwenServe', () => {

View file

@@ -12,6 +12,7 @@ import { bearerAuth, denyBrowserOriginCors, hostAllowlist } from './auth.js';
import { isLoopbackBind } from './loopbackBinds.js';
import {
createHttpAcpBridge,
SessionLimitExceededError,
SessionNotFoundError,
type HttpAcpBridge,
} from './httpAcpBridge.js';
@@ -75,11 +76,39 @@ export function createServeApp(
// gate `/health` behind their token alongside everything else.
// CORS deny + Host allowlist still apply to `/health` in both
// cases.
// Shared handler so loopback (pre-auth) and non-loopback (post-auth)
// routes return the same shape. `?deep=1` opts into a probe that
// touches bridge state — if the bridge is wedged the property
// accesses throw and we surface 503 so k8s probes can distinguish
// a zombie daemon from a healthy one. Default (no query) stays
// cheap so high-frequency liveness probes don't load the bridge.
const healthHandler = (
req: import('express').Request,
res: import('express').Response,
): void => {
const deepQuery = req.query['deep'];
const deep = deepQuery === '1' || deepQuery === 'true' || deepQuery === '';
if (!deep) {
res.status(200).json({ status: 'ok' });
return;
}
try {
res.status(200).json({
status: 'ok',
sessions: bridge.sessionCount,
pendingPermissions: bridge.pendingPermissionCount,
});
} catch (err) {
writeStderrLine(
`qwen serve: /health deep probe failed: ${err instanceof Error ? err.message : String(err)}`,
);
res.status(503).json({ status: 'degraded' });
}
};
const loopback = isLoopbackBind(opts.hostname);
if (loopback) {
app.get('/health', (_req, res) => {
res.status(200).json({ status: 'ok' });
});
app.get('/health', healthHandler);
}
app.use(bearerAuth(opts.token));
@@ -89,9 +118,7 @@
// Non-loopback: register `/health` AFTER `bearerAuth` so probes
// must carry the token. Otherwise unauthenticated callers can
// ping any reachable address:port to confirm a daemon exists.
app.get('/health', (_req, res) => {
res.status(200).json({ status: 'ok' });
});
app.get('/health', healthHandler);
}
app.get('/capabilities', (_req, res) => {
@@ -624,6 +651,20 @@ function sendBridgeError(res: import('express').Response, err: unknown): void {
res.status(404).json({ error: err.message, sessionId: err.sessionId });
return;
}
if (err instanceof SessionLimitExceededError) {
// 503 Service Unavailable + `Retry-After` is the canonical
// "we'd serve you, but we're full right now" shape. The hint
// is intentionally conservative (5s) because a session that
// finishes a prompt frees a slot quickly under normal load;
// a client that backs off too aggressively wastes capacity.
res.set('Retry-After', '5');
res.status(503).json({
error: err.message,
code: 'session_limit_exceeded',
limit: err.limit,
});
return;
}
// 5xx is the kind of error operators need to see in their daemon log
// — bridge ENOMEM, agent stack trace, unexpected throw, etc. Without
// logging here every 500 disappears once the caller consumes the

View file

@@ -25,6 +25,17 @@ export interface ServeOptions {
*/
token?: string;
mode: ServeMode;
/**
* Cap on concurrent live sessions. Once `bridge.sessionCount` reaches
* this, new `POST /session` requests that would spawn fresh sessions
* return 503. Attaching to an existing session (same workspace under
* `sessionScope: 'single'`) still works so an idle daemon doesn't
* block reconnects from existing users. Defaults to 20: comfortably
 * above single-user usage, well below the design's N≈50 cliff where
 * per-session RSS (~30–50 MB) and FD pressure start to bite. Set to
* `0` or `Infinity` to disable.
*/
maxSessions?: number;
}
/**