diff --git a/docs/users/qwen-serve.md b/docs/users/qwen-serve.md index 02a1640f0..66c1b8acc 100644 --- a/docs/users/qwen-serve.md +++ b/docs/users/qwen-serve.md @@ -4,7 +4,7 @@ Run Qwen Code as a local HTTP daemon so multiple clients (IDE plugins, web UIs, > **Status:** Stage 1 (experimental). The protocol surface is locked at the §04 routes table from issue [#3803](https://github.com/QwenLM/qwen-code/issues/3803). Stage 1.5 (`qwen --serve` flag — TUI co-hosts the same HTTP server) and Stage 2 (in-process refactor + `mDNS`/OpenAPI/WebSocket/Prometheus polish) are immediately downstream. > -> **Scope honesty:** Stage 1 is sized for **developers prototyping clients against the protocol surface** and for **local single-user / small-team collaboration**. Production-grade multi-client / long-running / network-flaky workloads (mobile companions, IM bots reaching 1000+ chats) need Stage 1.5+ guarantees that aren't in this release. Stage 1's bridge also spawns one `qwen --acp` child per session, so opening many parallel sessions on the same workspace pays N× memory until the Stage 1.5 bridge refactor lands (see [§ N parallel sessions](#n-parallel-sessions-stage-1-cost-vs-stage-15-fix) below — qwen-code's ACP agent already supports multi-session in one process; the bridge just doesn't leverage it yet). See [Stage 1.5+ runtime guarantees](#stage-15-runtime-guarantees) for the full gap list and #3803 for the convergence roadmap. +> **Scope honesty:** Stage 1 is sized for **developers prototyping clients against the protocol surface** and for **local single-user / small-team collaboration**. Production-grade multi-client / long-running / network-flaky workloads (mobile companions, IM bots reaching 1000+ chats) need Stage 1.5+ guarantees that aren't in this release. See [Stage 1.5+ runtime guarantees](#stage-15-runtime-guarantees) for the full gap list and #3803 for the convergence roadmap. ## What it gives you @@ -183,7 +183,7 @@ Stage 1's contract is sized for prototyping. 
Per [#3889](https://github.com/QwenLM/qwen-code/pull/3889) chiga0 downstream-consumer feedback: **Blockers for serious downstream use:** 1. **Per-request `sessionScope` override** on `POST /session` — today the daemon-wide default is the only setting; a VSCode extension can't say "I want a private session for this window" against a daemon configured for shared sessions. -2. **`loadSession` / `unstable_resumeSession` over HTTP** — without this, no integration can survive a child crash or daemon restart, and the "1 daemon = 1 session, just spawn another" model is internally inconsistent (orchestrators can't recover state either). +2. **`loadSession` / `unstable_resumeSession` over HTTP** — without this, no integration can survive a child crash or daemon restart, and any orchestrator coordinating the daemon can't recover state either. 3. **Persistent client identity (pair tokens + per-client revocation)** — Stage 1 uses one shared bearer; revoking a leaked token revokes every client, and `originatorClientId` is client-self-declared rather than daemon-stamped from authenticated identity. **Reliability baseline:** @@ -238,36 +238,28 @@ In this mode, TUI is a **"super-client"** — it observes the same agent convers (B) is the more ambitious answer but locks Stage 1.5 into a substantially larger wire surface that must also pass cleanly through the planned in-process refactor. We'd rather walk the smaller scope honestly. The session-state-event taxonomy work — enumerating which TUI flows are local-only by design vs. could plausibly graduate to wire under a future opt-in (B)-flavor extension — moves to [#3803](https://github.com/QwenLM/qwen-code/issues/3803), not Stage 1.5 code. -### N parallel sessions: Stage 1 cost vs.
Stage 1.5 fix +### N parallel sessions share one `qwen --acp` child -**Correction to earlier framing** (per [LaZzyMan's comment](https://github.com/QwenLM/qwen-code/pull/3889#issuecomment-4428498510) and verified against `packages/cli/src/acp-integration/acpAgent.ts:194`): `qwen --acp` natively supports multi-session in a single child process (`private sessions: Map` keyed by `sessionId`). The yiliang114 VSCode plugin already uses this — one `qwen --acp` process, many sessions over its stdio. **Multi-session resource sharing is an ACP / qwen-code capability today, not a future roadmap item.** +Multiple sessions on the same workspace **share one `qwen --acp` child process** via the agent's native multi-session support (`packages/cli/src/acp-integration/acpAgent.ts:194: private sessions: Map`). The bridge calls `connection.newSession({cwd, mcpServers})` for each session — the agent stores them in its sessions map and demultiplexes per-call sessionId. -The Stage 1 bridge in this PR doesn't yet leverage it — see `httpAcpBridge.ts:707`: +Concrete cost at N=5 sessions on the same workspace: -```ts -const channel = await channelFactory(workspaceKey); // spawns a new qwen --acp child -... 
-const newSessionResp = await connection.newSession({…}); // single newSession per child -``` +| Resource | Per session | At N=5 | +| ------------------------------------ | ----------- | ---------------------------- | +| Daemon Node process | one | **30–50 MB** (one daemon) | +| `qwen --acp` child | shared | **60–100 MB** (one child) | +| MCP server children | per-session | 3×N if configs differ | +| `FileReadCache` (in-child heap) | shared | parsed once | +| `CLAUDE.md` / hierarchy memory parse | shared | parsed once | +| OAuth refresh-token state | shared | **one refresh path** | +| Auto-memory learned facts | shared | one knowledge base per child | +| Cold start | first only | <200 ms after first session | -Every fresh session spawns its own child for simplicity (one channel state per child = easier debugging, no cross-session interference during Stage 1 stabilization). The cost-per-session table below describes what THIS bridge pays today, not what's architecturally possible. +The bridge keeps **one channel per workspace** (cross-workspace sharing is intentionally not done — different workspaces have different settings/auth scope, and `acpAgent.ts:601` reloads settings per newSession `cwd`, which would interfere). The channel stays alive while at least one session is live; the last `killSession` (or a channel-level crash) kills the child. 
-| Resource | Per session (Stage 1 bridge) | At N=5 (Stage 1) | At N=5 (Stage 1.5 bridge) | -| ------------------------------------ | ---------------------------- | ---------------- | ----------------------------- | -| Daemon Node process | ~30–50 MB RSS | **150–250 MB** | **30–50 MB** (one daemon) | -| `qwen --acp` child + sandbox | ~60–100 MB RSS | **300–500 MB** | **60–100 MB** (one child) | -| MCP server children | 3 per session | **15 processes** | 3 if config shared, else 3×N | -| `FileReadCache` (in-child heap) | independent | same files 5× | shared (one child) | -| `CLAUDE.md` / hierarchy memory parse | independent | 5× parse | parsed once per workspace cwd | -| OAuth refresh-token state | independent | 5 parallel | **shared (one process)** | -| Auto-memory learned facts | independent | fragments | shared | -| Cold start | 1–3 s per new session | each window pays | <200 ms after first session | +**MCP server children** are still per-session today — each session's config can specify different servers, so they're independently spawned. Stage 1.5 follow-up: refcount MCP server children by `(workspace, config-hash)` so identical configs share. Not in scope for this PR. -**Stage 1 (this PR) — accept the N× cost as a temporary design choice.** If your workflow needs many parallel sessions on the same workspace today, prefer in-session topic switching, separate workspaces, or accept the cost. - -**Stage 1.5 — bridge refactor to leverage ACP multi-session.** The natural fix: keep one `qwen --acp` child per workspace (or daemon-wide), and call `connection.newSession({ cwd, mcpServers })` multiple times on it. Sessions share the child's process / OAuth / file cache / hierarchy-memory parse; MCP children remain per-session unless the config is identical (configurable). 
This is a **bridge-side change**, NOT the `#3803 §21 Path A/B/C` intra-daemon multi-session workstream — qwen-code already does intra-process multi-session at the agent layer; the bridge just needs to plug into it. Marked inline in `httpAcpBridge.ts` (`FIXME(stage-1.5)` on `channelFactory`). - -**Peer-agent comparison context.** Cursor / Continue / Claude Code / OpenCode / Gemini CLI all do single-process multi-session at their agent layer. **qwen-code does too** — the Stage 1 bridge is the artifact that currently spawns a child per session, and Stage 1.5 closes that gap. +**Peer agents (Cursor / Continue / Claude Code / OpenCode / Gemini CLI) all do single-process multi-session.** qwen-code matches them at the agent layer; the Stage 1 bridge in this PR makes the same architecture visible over HTTP. ## What's next diff --git a/packages/cli/src/serve/httpAcpBridge.test.ts b/packages/cli/src/serve/httpAcpBridge.test.ts index a33d3ffcd..c9af28836 100644 --- a/packages/cli/src/serve/httpAcpBridge.test.ts +++ b/packages/cli/src/serve/httpAcpBridge.test.ts @@ -38,6 +38,7 @@ import { type AcpChannel, type ChannelFactory, } from './httpAcpBridge.js'; +import type { BridgeEvent } from './eventBus.js'; // Workspace fixtures must round-trip through `path.resolve` so the // expected values match what the bridge canonicalizes internally on @@ -88,7 +89,13 @@ class FakeAgent implements Agent { async newSession(p: NewSessionRequest): Promise { this.newSessionCalls.push(p); const prefix = this.opts.sessionIdPrefix ?? 'sess'; - return { sessionId: `${prefix}:${p.cwd}` }; + // Stage 1.5 multi-session: one FakeAgent can host multiple + // sessions (same as the real ACP agent), so each newSession call + // returns a fresh id. Suffix by call-count so tests that issue + // multiple newSession on the same channel get distinct ids. + const count = this.newSessionCalls.length; + const suffix = count === 1 ? 
'' : `#${count}`; + return { sessionId: `${prefix}:${p.cwd}${suffix}` }; } async loadSession(_p: LoadSessionRequest): Promise { @@ -252,7 +259,7 @@ describe('createHttpAcpBridge', () => { await bridge.shutdown(); }); - it('spawns fresh per call under sessionScope:thread', async () => { + it('creates fresh session per call under sessionScope:thread (Stage 1.5 multi-session: shares channel)', async () => { const handles: ChannelHandle[] = []; const factory: ChannelFactory = async () => { const h = makeChannel({ sessionIdPrefix: `s${handles.length}` }); @@ -267,10 +274,14 @@ describe('createHttpAcpBridge', () => { const first = await bridge.spawnOrAttach({ workspaceCwd: WS_A }); const second = await bridge.spawnOrAttach({ workspaceCwd: WS_A }); + // Distinct sessions, both freshly created (neither is an attach). expect(first.sessionId).not.toBe(second.sessionId); expect(first.attached).toBe(false); expect(second.attached).toBe(false); - expect(handles).toHaveLength(2); + // Stage 1.5 multi-session: the two thread-scope calls SHARE the + // workspace's `qwen --acp` child. Only one `channelFactory` call. + // Each `newSession()` call to the agent produces a distinct id. + expect(handles).toHaveLength(1); expect(bridge.sessionCount).toBe(2); await bridge.shutdown(); @@ -2229,5 +2240,119 @@ describe('createHttpAcpBridge', () => { expect(bridge.sessionCount).toBe(5); await bridge.shutdown(); }); + + it('Stage 1.5 multi-session: N sessions on same workspace share ONE channel', async () => { + // The headline of the Stage 1.5 refactor — multiple thread-scope + // sessions on one workspace pay for one `qwen --acp` child, not + // N children. LaZzyMan + tanzhenxin pushed for this; the agent + // already supports it via `acpAgent.ts:194 sessions: + // Map`. 
+ let factoryCalls = 0; + const factory: ChannelFactory = async () => { + factoryCalls++; + return makeChannel({ sessionIdPrefix: `s${factoryCalls}` }).channel; + }; + const bridge = createHttpAcpBridge({ + channelFactory: factory, + maxSessions: 0, + sessionScope: 'thread', + }); + // Spin up 5 sessions on the same workspace. + const sessions = await Promise.all( + Array.from({ length: 5 }, () => + bridge.spawnOrAttach({ workspaceCwd: WS_A }), + ), + ); + // 5 distinct sessions... + expect(new Set(sessions.map((s) => s.sessionId)).size).toBe(5); + expect(bridge.sessionCount).toBe(5); + // ...but only ONE channelFactory call (= one child process). + expect(factoryCalls).toBe(1); + await bridge.shutdown(); + }); + + it('Stage 1.5: killSession on one of N sessions does NOT kill the shared channel', async () => { + // Counterpart guarantee: tearing down one session must not take + // its siblings with it. The channel stays alive while + // `channelInfo.sessionIds.size > 0`. + const handles: ChannelHandle[] = []; + const factory: ChannelFactory = async () => { + const h = makeChannel({ sessionIdPrefix: `s${handles.length}` }); + handles.push(h); + return h.channel; + }; + const bridge = createHttpAcpBridge({ + channelFactory: factory, + sessionScope: 'thread', + }); + const a = await bridge.spawnOrAttach({ workspaceCwd: WS_A }); + const b = await bridge.spawnOrAttach({ workspaceCwd: WS_A }); + const c = await bridge.spawnOrAttach({ workspaceCwd: WS_A }); + expect(handles).toHaveLength(1); + // Kill one — the other two stay. + await bridge.killSession(b.sessionId); + expect(bridge.sessionCount).toBe(2); + expect(handles[0]?.killed).toBe(false); + // Kill the second — last one alive. + await bridge.killSession(a.sessionId); + expect(bridge.sessionCount).toBe(1); + expect(handles[0]?.killed).toBe(false); + // Kill the last — NOW the channel is killed. 
+ await bridge.killSession(c.sessionId); + expect(bridge.sessionCount).toBe(0); + expect(handles[0]?.killed).toBe(true); + await bridge.shutdown(); + }); + + it('Stage 1.5: channel.exited tears down ALL multiplexed sessions', async () => { + // When the shared child dies (crash, kill, network gone), all + // sessions on it die together — they're truly co-fated. Each + // session's bus gets its own `session_died` event so each SSE + // subscriber learns the bad news on their own stream. + const handles: ChannelHandle[] = []; + const factory: ChannelFactory = async () => { + const h = makeChannel({ sessionIdPrefix: `s${handles.length}` }); + handles.push(h); + return h.channel; + }; + const bridge = createHttpAcpBridge({ + channelFactory: factory, + sessionScope: 'thread', + }); + const a = await bridge.spawnOrAttach({ workspaceCwd: WS_A }); + const b = await bridge.spawnOrAttach({ workspaceCwd: WS_A }); + const c = await bridge.spawnOrAttach({ workspaceCwd: WS_A }); + expect(bridge.sessionCount).toBe(3); + + // Subscribe so we can observe each session_died. + const eventsByA: BridgeEvent[] = []; + const eventsByB: BridgeEvent[] = []; + const eventsByC: BridgeEvent[] = []; + const drainA = (async () => { + for await (const ev of bridge.subscribeEvents(a.sessionId)) + eventsByA.push(ev); + })(); + const drainB = (async () => { + for await (const ev of bridge.subscribeEvents(b.sessionId)) + eventsByB.push(ev); + })(); + const drainC = (async () => { + for await (const ev of bridge.subscribeEvents(c.sessionId)) + eventsByC.push(ev); + })(); + // Let the subscriptions register before crashing. + await new Promise((r) => setImmediate(r)); + + // Simulate channel-level crash (child exited). 
+ handles[0]?.crash(); + await Promise.all([drainA, drainB, drainC]); + + expect(eventsByA[eventsByA.length - 1]?.type).toBe('session_died'); + expect(eventsByB[eventsByB.length - 1]?.type).toBe('session_died'); + expect(eventsByC[eventsByC.length - 1]?.type).toBe('session_died'); + expect(bridge.sessionCount).toBe(0); + + await bridge.shutdown(); + }); }); }); diff --git a/packages/cli/src/serve/httpAcpBridge.ts b/packages/cli/src/serve/httpAcpBridge.ts index 4316d04cc..a6afbf349 100644 --- a/packages/cli/src/serve/httpAcpBridge.ts +++ b/packages/cli/src/serve/httpAcpBridge.ts @@ -304,6 +304,40 @@ export interface BridgeOptions { maxSessions?: number; } +/** + * One `qwen --acp` child + the ACP connection on top of it, shared by + * all SessionEntries whose workspace maps to this channel. Stage 1.5 + * multi-session work (per LaZzyMan / tanzhenxin reviews) leverages + * the agent's native `sessions: Map` (see + * `acp-integration/acpAgent.ts:194`) so multiple `newSession()` calls + * on one channel get separate session ids while sharing the child's + * process / OAuth / file-cache / hierarchy-memory parse. + * + * Lifetime: created on first `spawnOrAttach` for a workspace, kept + * alive while `sessionIds.size > 0`, and killed by `killSession` when + * the last entry leaves OR by `channel.exited` when the child dies. + * Cross-workspace channel sharing is intentionally NOT done in this + * bridge — `acpAgent.ts:601 (this.settings = loadSettings(cwd))` + * replaces the cached settings on each newSession call, so different + * workspaces in one child would step on each other's settings. One + * channel per workspace is the safe scope for Stage 1.5. + */ +interface ChannelInfo { + channel: AcpChannel; + connection: ClientSideConnection; + /** Shared BridgeClient — its methods route ACP params by sessionId. */ + client: BridgeClient; + workspaceCwd: string; + /** + * Live session ids multiplexed on this channel. 
Updated when + * `doSpawn` registers a new session and when `killSession` / + * `channel.exited` removes one. When the set drops to empty AND no + * session is mid-attach, the channel is killed and removed from + * `byWorkspaceChannel`. + */ + sessionIds: Set<string>; +} + interface SessionEntry { sessionId: string; workspaceCwd: string; @@ -387,7 +421,20 @@ interface PendingPermission { */ class BridgeClient implements Client { constructor( - private readonly resolveEntry: () => SessionEntry | undefined, + /** + * Look up the `SessionEntry` for an ACP call. Stage 1.5 multi- + * session on one channel means `BridgeClient` is shared across + * many sessions, so we can't bind the entry in a closure — we + * dispatch by the `sessionId` ACP includes in every per-session + * notification / request. `undefined` sessionId is the fallback + * for ACP calls that don't carry one (none expected on the + * client surface as of this writing) and resolves to no entry, + * so the call is dropped safely (a permission request resolves + * as cancelled) rather than misrouted to an arbitrary session. + * Kept defensive in case ACP grows a no-sessionId call.
+ */ + private readonly resolveEntry: ( + sessionId?: string, + ) => SessionEntry | undefined, private readonly registerPending: (pending: PendingPermission) => void, /** * Roll back a `registerPending` call when the subsequent publish @@ -412,7 +459,7 @@ class BridgeClient implements Client { async requestPermission( params: RequestPermissionRequest, ): Promise { - const entry = this.resolveEntry(); + const entry = this.resolveEntry(params.sessionId); if (!entry) return { outcome: { outcome: 'cancelled' } }; const requestId = randomUUID(); @@ -454,7 +501,7 @@ class BridgeClient implements Client { } async sessionUpdate(params: SessionNotification): Promise { - const entry = this.resolveEntry(); + const entry = this.resolveEntry(params.sessionId); if (!entry) return; entry.events.publish({ type: 'session_update', data: params }); } @@ -627,15 +674,27 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge { ); } - // Single-scope reuse keyed by canonical workspace path. - // Cleanup-on-crash: `channel.exited` (registered at line ~469) removes - // the entry from both maps + cancels pending permissions + publishes - // `session_died` + closes the EventBus when the child process dies - // between requests. Stage 2's in-process bridge eliminates the - // spawned-child failure mode entirely — but for Stage 1, the next - // prompt against a crashed session sees `SessionNotFoundError` - // (cleanup raced ahead of the request), not a hung channel write. + // Single-scope reuse keyed by canonical workspace path. Tracks the + // SessionEntry that a same-workspace attach should re-use. With + // Stage 1.5 multi-session per channel, this points at the FIRST + // session created for the workspace under `single` scope; under + // `thread` scope additional sessions on the same workspace don't + // overwrite this entry. 
const byWorkspace = new Map<string, SessionEntry>(); + // Stage 1.5 multi-session: one channel per workspace, N sessions + // multiplex on it via `connection.newSession({cwd, mcpServers})`. + // `byWorkspaceChannel.get(workspaceKey)` returns the shared channel + // for spawn-vs-reuse decisions in `doSpawn`. Channel is kept alive + // while `sessionIds.size > 0`; the last `killSession` (or the + // `channel.exited` cleanup) drops the entry from this map. + const byWorkspaceChannel = new Map<string, ChannelInfo>(); + // Coalesces concurrent channel-spawn requests for the same workspace + // (regardless of sessionScope). Without this, two parallel callers + // would both `channelFactory(workspaceKey)` and one of the + // spawned children would never make it into `byWorkspaceChannel`, + // becoming a permanent orphan. Cleared in the `finally` of the + // creator regardless of outcome. + const inFlightChannelSpawns = new Map<string, Promise<ChannelInfo>>(); const byId = new Map<string, SessionEntry>(); // Daemon-wide pending permission table; requestIds are UUIDs so collisions // across sessions are infeasible in practice. @@ -700,177 +759,196 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge { return true; }; - async function doSpawn( + /** + * Get-or-create the shared `qwen --acp` channel for a workspace. + * Stage 1.5 multi-session: one channel hosts N sessions via + * `connection.newSession()`. Concurrent callers coalesce through + * `inFlightChannelSpawns` so we never spawn two children for one + * workspace. The returned `ChannelInfo` is shared — caller adds + * their session id to `sessionIds` and uses `info.connection.newSession()`. + * + * Wires up the one-and-only `channel.exited` cleanup on first + * creation so the late-arriving event tears down ALL sessions on + * the channel (vs. the previous 1-session-per-channel design where + * each entry registered its own listener).
+ */ + async function getOrCreateChannel( workspaceKey: string, - modelServiceId?: string, - ): Promise { + ): Promise<ChannelInfo> { const existing = byWorkspaceChannel.get(workspaceKey); + if (existing) return existing; + const inFlight = inFlightChannelSpawns.get(workspaceKey); + if (inFlight) return await inFlight; - const channel = await channelFactory(workspaceKey); - let entry: SessionEntry | undefined; - const client = new BridgeClient( - () => entry, - registerPending, - (rid) => - // Roll back a register-then-publish-failed pending so the agent - // doesn't hang waiting on a vote nobody can see.
- resolvePending(rid, { outcome: { outcome: 'cancelled' } }), - ); - const connection = new ClientSideConnection(() => client, channel.stream); + ): Promise { + const existing = byWorkspaceChannel.get(workspaceKey); + if (existing) return existing; + const inFlight = inFlightChannelSpawns.get(workspaceKey); + if (inFlight) return await inFlight; - try { - await withTimeout( - connection.initialize({ - protocolVersion: PROTOCOL_VERSION, - clientCapabilities: { - fs: { readTextFile: true, writeTextFile: true }, - }, - clientInfo: { name: 'qwen-serve-bridge', version: '0' }, - }), - initTimeoutMs, - 'initialize', - ); - const newSessionResp = await withTimeout( - connection.newSession({ - cwd: workspaceKey, - mcpServers: [], - }), - initTimeoutMs, - 'newSession', + const promise = (async () => { + const channel = await channelFactory(workspaceKey); + const client = new BridgeClient( + (sessionId) => (sessionId ? byId.get(sessionId) : undefined), + registerPending, + (rid) => + // Roll back a register-then-publish-failed pending so the agent + // doesn't hang waiting on a vote nobody can see. + resolvePending(rid, { outcome: { outcome: 'cancelled' } }), ); + const connection = new ClientSideConnection(() => client, channel.stream); - // Late-shutdown re-check (BUy4U): shutdown() may have flipped - // while we were in `connection.initialize` or - // `connection.newSession` (the ACP handshake takes ~1s on cold - // start). If we commit to the maps now, the snapshot in - // shutdown() already missed this entry — child would leak past - // `process.exit(0)`. Tear down what we have and surface to the - // caller. - // - // This re-check is the LOAD-BEARING correctness contract, not - // a band-aid: `shutdown()` deliberately starts tearing down - // already-registered sessions in parallel with awaiting - // `inFlightSpawns` (faster fan-out), and relies on this - // re-check to catch any spawn whose `newSession` returns AFTER - // shutdown flipped the flag. 
The alternative — await all - // in-flight spawns to settle BEFORE snapshotting byId — is - // cleaner to reason about but serializes shutdown by - // up-to-`initTimeoutMs` (10s default) before any already-live - // session starts tearing down. We chose parallel + re-check. - // Both A and B coalesced callers see the same rejection - // because `doSpawn`'s promise (cached in `inFlightSpawns`) is - // what rejects. + try { + await withTimeout( + connection.initialize({ + protocolVersion: PROTOCOL_VERSION, + clientCapabilities: { + fs: { readTextFile: true, writeTextFile: true }, + }, + clientInfo: { name: 'qwen-serve-bridge', version: '0' }, + }), + initTimeoutMs, + 'initialize', + ); + } catch (err) { + await channel.kill().catch(() => {}); + throw err; + } + + // Late-shutdown re-check: if shutdown flipped during `initialize`, + // tear this channel down rather than leak past `process.exit(0)`. if (shuttingDown) { await channel.kill().catch(() => {}); throw new Error('HttpAcpBridge is shutting down'); } - entry = { - sessionId: newSessionResp.sessionId, - workspaceCwd: workspaceKey, + + const info: ChannelInfo = { channel, connection, - events: new EventBus(), - promptQueue: Promise.resolve(), - modelChangeQueue: Promise.resolve(), - pendingPermissionIds: new Set(), - attachCount: 0, + client, + workspaceCwd: workspaceKey, + sessionIds: new Set(), }; - byWorkspace.set(workspaceKey, entry); - byId.set(entry.sessionId, entry); + byWorkspaceChannel.set(workspaceKey, info); - // Cleanup if the child terminates between requests. `channel.exited` - // resolves both for planned shutdown (we already removed the entry - // before calling kill, so the `byId.get(...) === entry` check is - // false and this is a no-op) AND for unplanned crashes (entry is - // still in the maps → cancel pending permissions, publish a - // `session_died` event so live SSE subscribers learn the session is - // gone, close the bus, drop from maps). 
- const liveEntry = entry; - void channel.exited.then((info) => { - if (byId.get(liveEntry.sessionId) !== liveEntry) return; - cancelPendingForSession(liveEntry.sessionId); - try { - liveEntry.events.publish({ - type: 'session_died', - data: { - sessionId: liveEntry.sessionId, - reason: 'channel_closed', - // BX9_P: thread `exitCode` / `signalCode` from the - // spawn factory through so operators triaging a crash - // can read the cause from the SSE frame instead of - // grepping stderr for the pid. - exitCode: info?.exitCode ?? null, - signalCode: info?.signalCode ?? null, - }, - }); - } catch { - /* bus already closed */ + // One-time channel.exited cleanup. The child dying takes ALL + // multiplexed sessions with it — iterate `sessionIds` (snapshot + // first to be safe against concurrent killSession during + // iteration), publish `session_died` on each session's bus, + // remove from byId / byWorkspace / pending tables. + void channel.exited.then((exitInfo) => { + const stillOurs = byWorkspaceChannel.get(workspaceKey) === info; + if (stillOurs) byWorkspaceChannel.delete(workspaceKey); + const sessions = Array.from(info.sessionIds); + info.sessionIds.clear(); + for (const sid of sessions) { + const sessEntry = byId.get(sid); + if (!sessEntry) continue; + cancelPendingForSession(sid); + try { + sessEntry.events.publish({ + type: 'session_died', + data: { + sessionId: sid, + reason: 'channel_closed', + // BX9_P: thread exitCode/signalCode through. + exitCode: exitInfo?.exitCode ?? null, + signalCode: exitInfo?.signalCode ?? 
null, + }, + }); + } catch { + /* bus already closed */ + } + byId.delete(sid); + if (byWorkspace.get(sessEntry.workspaceCwd) === sessEntry) { + byWorkspace.delete(sessEntry.workspaceCwd); + } + sessEntry.events.close(); } - byWorkspace.delete(liveEntry.workspaceCwd); - byId.delete(liveEntry.sessionId); - liveEntry.events.close(); }); - // ACP `newSession` doesn't take a model id; honor the caller's - // `modelServiceId` by issuing the unstable `setSessionModel` call - // immediately after the session is established. Use the shared - // `applyModelServiceId` helper so the call: - // - races against `transportClosedReject` (a wedged child - // during model switch fails fast instead of consuming the - // full init timeout — A09HD) - // - publishes `model_switched` / `model_switch_failed` to the - // bus so attached clients see the result - // - serializes through `entry.modelChangeQueue` for ordering - // against later attach-with-different-model calls - // - // On failure we DO NOT tear the session down. An earlier rev - // tore it down so the caller "didn't inherit silent drift", - // but that killed an already-functional session and left the - // caller with a 500 and no sessionId to retry against. The - // session is operational on the agent's default model — the - // `model_switch_failed` event tells subscribers exactly what - // happened, and the caller can retry the switch via - // `POST /session/:id/model` once they know the sessionId. - // - // Swallow the rejection inline so the outer try/catch (which - // would `channel.kill()`) doesn't fire. The publish inside - // `applyModelServiceId` is the visible signal; the caller - // gets a 200 and a sessionId regardless, and learns of the - // model issue via the SSE stream. - if (modelServiceId) { - await applyModelServiceId(entry, modelServiceId, initTimeoutMs).catch( - () => { - // Already published `model_switch_failed`; don't take - // down the session. 
- }, - ); - } + return info; + })(); - return { - sessionId: entry.sessionId, - workspaceCwd: entry.workspaceCwd, - attached: false, - }; - } catch (err) { - await channel.kill().catch(() => {}); - throw err; + inFlightChannelSpawns.set(workspaceKey, promise); + try { + return await promise; + } finally { + inFlightChannelSpawns.delete(workspaceKey); } } + async function doSpawn( + workspaceKey: string, + modelServiceId?: string, + ): Promise { + // Stage 1.5 multi-session: get-or-create the channel for this + // workspace, then call `connection.newSession()` on it. Sessions + // share the child's process / OAuth / file-cache / hierarchy- + // memory parse via the agent's `sessions: Map` + // (see `acp-integration/acpAgent.ts:194`). + // newSession on an established channel can fail (auth, config, + // etc.) without the channel dying. We DON'T kill the channel on + // newSession failure: other sessions on it may still be live — + // they'd lose their work for a problem orthogonal to them. The + // edge case of "newSession fails on a freshly-created channel + // with no live sessions" leaks an empty channel until something + // else (channel.exited / shutdown) cleans it up; that's much + // better than killing other live sessions, and the + // empty-sessionIds set lets the NEXT caller retry through that + // same channel if it's still healthy. + const channelInfo = await getOrCreateChannel(workspaceKey); + const newSessionResp = await withTimeout( + channelInfo.connection.newSession({ + cwd: workspaceKey, + mcpServers: [], + }), + initTimeoutMs, + 'newSession', + ); + + // Late-shutdown re-check (BUy4U): shutdown() may have flipped + // while we were in `connection.newSession` (~1s on cold start). + if (shuttingDown) { + // Don't kill the channel — see comment above. Just throw. 
+      throw new Error('HttpAcpBridge is shutting down');
+    }
+
+    const entry: SessionEntry = {
+      sessionId: newSessionResp.sessionId,
+      workspaceCwd: workspaceKey,
+      channel: channelInfo.channel,
+      connection: channelInfo.connection,
+      events: new EventBus(),
+      promptQueue: Promise.resolve(),
+      modelChangeQueue: Promise.resolve(),
+      pendingPermissionIds: new Set(),
+      attachCount: 0,
+    };
+    channelInfo.sessionIds.add(entry.sessionId);
+    byId.set(entry.sessionId, entry);
+    // `byWorkspace` is the single-scope attach lookup — only the
+    // FIRST session for a workspace wins this slot. Subsequent
+    // thread-scope sessions don't overwrite it.
+    if (!byWorkspace.has(workspaceKey)) {
+      byWorkspace.set(workspaceKey, entry);
+    }
+
+    // ACP `newSession` doesn't take a model id; honor the caller's
+    // `modelServiceId` via `unstable_setSessionModel`. See
+    // `applyModelServiceId` for rationale (race against
+    // transportClosedReject, publish model_switched on success,
+    // model_switch_failed on failure, don't tear down the session).
+    if (modelServiceId) {
+      await applyModelServiceId(entry, modelServiceId, initTimeoutMs).catch(
+        () => {
+          // Already published `model_switch_failed`; session stays
+          // operational on the agent's default model.
+        },
+      );
+    }
+
+    return {
+      sessionId: entry.sessionId,
+      workspaceCwd: entry.workspaceCwd,
+      attached: false,
+    };
+  }
+
+  /**
+   * Send `unstable_setSessionModel` and broadcast a `model_switched`
+   * event. Used at create-session time (via doSpawn) AND on attach when
@@ -1413,11 +1491,18 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
       // through.
       if (opts?.requireZeroAttaches && entry.attachCount > 0) return;
       // Remove from the maps eagerly so concurrent `spawnOrAttach`
-      // can't reattach to a session we're tearing down. The
-      // `channel.exited` cleanup at line 461-484 also removes from
-      // these maps, but eager removal narrows the race window.
-      byWorkspace.delete(entry.workspaceCwd);
+      // can't reattach to a session we're tearing down.
+      if (byWorkspace.get(entry.workspaceCwd) === entry) {
+        byWorkspace.delete(entry.workspaceCwd);
+      }
       byId.delete(sessionId);
+      // Stage 1.5 multi-session: detach from the channel. The channel
+      // dies only when its LAST session leaves — other sessions on
+      // the same channel keep running.
+      const channelInfo = byWorkspaceChannel.get(entry.workspaceCwd);
+      if (channelInfo && channelInfo.channel === entry.channel) {
+        channelInfo.sessionIds.delete(sessionId);
+      }
       // Resolve any still-pending permission as cancelled (matches the
       // shutdown path) so callers awaiting requestPermission unwind.
       for (const id of Array.from(entry.pendingPermissionIds)) {
@@ -1425,9 +1510,9 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
       }
       // Publish `session_died` BEFORE closing the bus. After the eager
       // `byId.delete` above, the channel.exited handler's
-      // `byId.get(...) !== entry` guard short-circuits, so the
-      // automatic publish at crash time wouldn't fire. SSE subscribers
-      // need this terminal frame to know the session is gone.
+      // `byId.get(...)` returns undefined so the automatic publish
+      // at crash time wouldn't fire. SSE subscribers need this
+      // terminal frame to know the session is gone.
       try {
         entry.events.publish({
           type: 'session_died',
@@ -1437,9 +1522,20 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
         /* bus already closed */
       }
       entry.events.close();
-      await entry.channel.kill().catch(() => {
-        // Best-effort kill — channel may already be dead.
-      });
+      // Only kill the channel when no other sessions remain. ACP
+      // doesn't expose a per-session "close" call on the agent side,
+      // so the agent's `sessions: Map` grows by one
+      // until the channel dies — bounded by `maxSessions` (default
+      // 20) so memory is capped.
+      // FIXME(stage-1.5): if ACP grows a
+      // `closeSession` notification, send it here so the agent can
+      // drop the entry from its map immediately rather than at
+      // channel exit.
+      if (channelInfo && channelInfo.sessionIds.size === 0) {
+        byWorkspaceChannel.delete(entry.workspaceCwd);
+        await channelInfo.channel.kill().catch(() => {
+          // Best-effort kill — channel may already be dead.
+        });
+      }
     },

     async detachClient(sessionId) {
@@ -1479,6 +1575,11 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
       // spawning a child this teardown won't see.
       shuttingDown = true;
       const entries = Array.from(byId.values());
+      // Snapshot channels too — Stage 1.5 multi-session means N
+      // sessions may share one channel; we tear down channels
+      // (which transitively takes all their sessions), not entries
+      // one-by-one.
+      const channelInfos = Array.from(byWorkspaceChannel.values());
       // Resolve every still-pending permission as cancelled before clearing
       // the maps so callers awaiting `requestPermission` unwind cleanly.
       for (const e of entries) {
@@ -1489,14 +1590,15 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
       }
       byWorkspace.clear();
       byId.clear();
+      byWorkspaceChannel.clear();
       pendingPermissions.clear();
       // Publish a terminal `session_died` BEFORE closing each bus so SSE
       // subscribers can distinguish "daemon shut down" from a transient
       // network error and don't sit indefinitely retrying. The
       // channel.exited handler also publishes this on a child crash,
       // but at shutdown time the entry has already been removed from
-      // `byId` (above), so the handler's `byId.get(...) !== entry`
-      // guard would short-circuit and the event would never fire.
+      // `byId` (above), so the handler's `byId.get(...)` is undefined
+      // and the automatic publish wouldn't fire.
       for (const e of entries) {
         try {
           e.events.publish({
@@ -1508,16 +1610,26 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
         }
         e.events.close();
       }
-      // Wait for in-flight spawns too. The snapshot above only sees
-      // sessions already in `byId`; a `doSpawn` past `channelFactory()`
-      // but still inside `connection.newSession` (~1s on cold start) is
-      // not yet registered. Without the await, `bridge.shutdown()`
-      // resolves before the late re-check at doSpawn:472 tears down its
-      // half-built channel — the orphan's stderr error fires AFTER the
-      // daemon claimed graceful shutdown (log-confusing). Settle on
-      // each, ignoring rejections so a single failure doesn't poison
-      // the others.
-      const inFlightAwaits = Array.from(inFlightSpawns.values()).map(
+      // Wait for in-flight channel spawns + session spawns. The
+      // snapshots above only see what's already registered; a doSpawn
+      // past `newSession()` but pre-`byId.set` is missed, as is a
+      // `getOrCreateChannel` past `channelFactory()` but pre-
+      // `byWorkspaceChannel.set`. The late-shutdown re-checks at
+      // doSpawn/getOrCreateChannel catch both — but without these
+      // awaits, `bridge.shutdown()` would resolve before they
+      // finish, and the orphan stderr error from a half-built
+      // child would fire AFTER the daemon claimed graceful
+      // shutdown (log-confusing).
+      const inFlightSessionAwaits = Array.from(inFlightSpawns.values()).map(
+        (p): Promise<void> =>
+          p.then(
+            () => undefined,
+            () => undefined,
+          ),
+      );
+      const inFlightChannelAwaits = Array.from(
+        inFlightChannelSpawns.values(),
+      ).map(
         (p): Promise<void> =>
           p.then(
             () => undefined,
@@ -1525,8 +1637,12 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
           ),
       );
       await Promise.all([
-        ...entries.map((e) => e.channel.kill().catch(() => {})),
-        ...inFlightAwaits,
+        // Kill each unique channel once.
+        // With multi-session per
+        // channel, the same channel object can be referenced by
+        // multiple entries; `channelInfos` is the deduplicated set.
+        ...channelInfos.map((ci) => ci.channel.kill().catch(() => {})),
+        ...inFlightSessionAwaits,
+        ...inFlightChannelAwaits,
       ]);
     },
   };
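Reviewer note: the `inFlightChannelSpawns` shape in the diff — set the promise, `try { return await } finally { delete }` — is the keyed single-flight pattern. A standalone sketch of just that mechanic, with hypothetical `getOrSpawn` / `spawnChannel` names (not the bridge's real API):

```typescript
// Keyed single-flight: concurrent callers for the same key await one
// shared in-flight promise, so only one child spawn happens per key.
// The map entry is removed in `finally`, so the next call AFTER
// settlement (success or failure) starts a fresh attempt.
const inFlight = new Map<string, Promise<string>>();
let spawnCount = 0;

async function spawnChannel(key: string): Promise<string> {
  spawnCount += 1;
  await new Promise((resolve) => setTimeout(resolve, 10)); // fake slow child start
  return `channel-for-${key}`;
}

async function getOrSpawn(key: string): Promise<string> {
  const existing = inFlight.get(key);
  if (existing) return existing; // join the in-flight spawn
  const promise = spawnChannel(key);
  inFlight.set(key, promise);
  try {
    return await promise;
  } finally {
    inFlight.delete(key); // allow a retry once settled
  }
}

// Three concurrent callers share one spawn.
const results = await Promise.all([
  getOrSpawn('/ws'),
  getOrSpawn('/ws'),
  getOrSpawn('/ws'),
]);
console.log(spawnCount, results[0]); // prints: 1 channel-for-/ws
```

Deleting in `finally` rather than on success is what lets a failed `channelFactory()` be retried by the next caller instead of poisoning the key forever.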
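The "channel dies only when its LAST session leaves" invariant is manual reference counting over a set of session ids. A reduced sketch under illustrative names — the real `closeSession` path also drains pending permissions, publishes `session_died`, and guards against channel-identity races:

```typescript
// One channel per workspace; N sessions share it. The kill fires only
// when the sessionIds set drains to empty.
interface ChannelInfo {
  killed: boolean;
  sessionIds: Set<string>;
}

const byWorkspaceChannel = new Map<string, ChannelInfo>();

function openSession(workspace: string, sessionId: string): ChannelInfo {
  let info = byWorkspaceChannel.get(workspace);
  if (!info) {
    // First session for this workspace: "spawn" the shared channel.
    info = { killed: false, sessionIds: new Set() };
    byWorkspaceChannel.set(workspace, info);
  }
  info.sessionIds.add(sessionId);
  return info;
}

function closeSession(workspace: string, sessionId: string): void {
  const info = byWorkspaceChannel.get(workspace);
  if (!info) return;
  info.sessionIds.delete(sessionId);
  // Last one out kills the shared child process.
  if (info.sessionIds.size === 0) {
    byWorkspaceChannel.delete(workspace);
    info.killed = true;
  }
}

const channel = openSession('/ws', 'a');
openSession('/ws', 'b'); // same workspace → same channel, no new spawn
closeSession('/ws', 'a');
const survivedFirstClose = !channel.killed; // 'b' is still live
closeSession('/ws', 'b'); // last session out → channel killed
console.log(survivedFirstClose, channel.killed); // prints: true true
```

This is also why the diff's eager-delete paths check `byWorkspaceChannel.get(...) === entry.channel` before mutating: the counter must only ever be decremented against the channel the session actually belongs to.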
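The `.then(() => undefined, () => undefined)` mapping in the shutdown path is "settle, don't propagate" — the same guarantee `Promise.allSettled` gives directly. A minimal sketch of why the raw in-flight promises can't go straight into `Promise.all` (spawn results simulated, names illustrative):

```typescript
// Shutdown must WAIT for every in-flight spawn but must not let one
// rejection poison the teardown of the others. Plain Promise.all over
// the raw promises would reject at the first failed spawn.
const inFlightSpawns = new Map<string, Promise<string>>([
  ['/ok', Promise.resolve('session-1')],
  ['/bad', Promise.reject(new Error('child wedged during newSession'))],
]);

const settled = Array.from(inFlightSpawns.values()).map(
  (p): Promise<void> =>
    p.then(
      () => undefined,
      () => undefined, // swallow: shutdown only cares that it finished
    ),
);

let shutdownCompleted = false;
await Promise.all(settled); // resolves despite the rejected spawn
shutdownCompleted = true;
console.log('graceful shutdown'); // prints: graceful shutdown
```

Attaching the handlers synchronously, as above, also keeps the rejected spawn from surfacing as an unhandled rejection while the shutdown is still in flight.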