feat(serve): Stage 1 bridge now multiplexes sessions on one qwen --acp child per workspace

Per the LaZzyMan / tanzhenxin reviews and maintainer feedback, verified
against `packages/cli/src/acp-integration/acpAgent.ts:194` (the
agent's `private sessions: Map<string, Session>`): qwen-code's ACP
agent natively supports multi-session in one child process. The
Stage 1 bridge previously spawned one child per session for
simplicity, paying N× memory / OAuth / file-cache cost. Now refactored
to leverage the agent's existing multi-session capability — one
`qwen --acp` child per workspace, N sessions share it via
`connection.newSession({cwd, mcpServers})`.
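
In bridge terms (condensed from the diff below; timeouts and error
handling elided):

```ts
// One channel per workspace: spawn + initialize happen at most once.
const info = await getOrCreateChannel(workspaceKey);
// Per-session: a cheap newSession on the shared connection.
const { sessionId } = await info.connection.newSession({
  cwd: workspaceKey,
  mcpServers: [],
});
info.sessionIds.add(sessionId); // refcount: the last killSession kills the child
```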

Cost at N=5 sessions on the same workspace:
- Before: 300-500 MB RSS (5 children), 5× OAuth refresh, 5× file
  cache, 5× CLAUDE.md parse, 5× cold start
- After: 60-100 MB RSS (one child), one OAuth path, shared
  FileReadCache, parsed once, <200ms cold start after first session

Architecture changes:

- New `ChannelInfo` type holds the shared channel + connection +
  BridgeClient + the set of session ids multiplexing on it.
- New `byWorkspaceChannel: Map<workspace, ChannelInfo>` + new
  `inFlightChannelSpawns` coalesce-map for concurrent channel
  creation.
- New `getOrCreateChannel(workspaceKey)` helper: reuse existing
  channel or spawn one (with `initialize` happening exactly once
  per channel, not once per session). Coalesced via
  `inFlightChannelSpawns` so two parallel callers don't both spawn.
- `doSpawn` now calls `getOrCreateChannel` + `connection.newSession`
  separately (was: spawn+initialize+newSession together per session).
- `BridgeClient` updated: `resolveEntry(sessionId?)` dispatches by
  the sessionId ACP carries in each request — one BridgeClient now
  serves all sessions on its channel. `sessionUpdate`,
  `requestPermission`, etc. all pass `params.sessionId`.
- `channel.exited` cleanup moved into `getOrCreateChannel` and now
  tears down ALL sessions on the channel (not one). Each session
  gets its own `session_died` event so SSE subscribers learn the
  bad news on their own stream.
- `killSession` now removes session from `channelInfo.sessionIds`
  and kills the channel ONLY when its sessionIds set drops to zero.
  Other sessions on the same channel keep running (see the sketch
  after this list).
- `shutdown` tears down channels (the deduplicated set) and awaits
  both inFlightSpawns and inFlightChannelSpawns.
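
The teardown half, condensed from the `killSession` hunk below:

```ts
// Detach this session from its shared channel...
channelInfo.sessionIds.delete(sessionId);
// ...and kill the child only when no sibling session remains.
if (channelInfo.sessionIds.size === 0) {
  byWorkspaceChannel.delete(entry.workspaceCwd);
  await channelInfo.channel.kill().catch(() => {
    // Best-effort kill — channel may already be dead.
  });
}
```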

Cross-workspace channel sharing is intentionally NOT done — `acpAgent.ts:
601 (this.settings = loadSettings(cwd))` reloads settings on each
newSession call with a different cwd, so different workspaces in
one child would step on each other. One channel per workspace is
the safe scope.

MCP server children stay per-session for now (each session can have
different mcpServers config). Stage 1.5 follow-up: refcount MCP
children by (workspace, config-hash) so identical configs share.
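
A possible shape for that follow-up (hypothetical sketch only;
`McpChild`, `acquireMcpServers`, and `releaseMcpServers` are
illustrative names, not code in this diff):

```ts
import { createHash } from 'node:crypto';

interface McpChild {
  kill(): Promise<void>;
}

// Pool keyed by (workspace, config-hash). Storing the spawn PROMISE
// synchronously mirrors the inFlightChannelSpawns coalescing: two
// concurrent acquires for the same key can't both spawn.
const mcpPool = new Map<
  string,
  { children: Promise<McpChild[]>; refs: number }
>();

function mcpKey(workspace: string, mcpServers: unknown): string {
  // NOTE: JSON.stringify is key-order-sensitive; a real implementation
  // would canonicalize the config before hashing.
  return createHash('sha256')
    .update(workspace)
    .update(JSON.stringify(mcpServers))
    .digest('hex');
}

function acquireMcpServers(
  workspace: string,
  mcpServers: unknown,
  spawn: () => Promise<McpChild[]>,
): { key: string; children: Promise<McpChild[]> } {
  const key = mcpKey(workspace, mcpServers);
  let pooled = mcpPool.get(key);
  if (pooled) {
    pooled.refs++; // identical config: share the existing children
  } else {
    pooled = { children: spawn(), refs: 1 };
    mcpPool.set(key, pooled);
  }
  return { key, children: pooled.children };
}

async function releaseMcpServers(key: string): Promise<void> {
  const pooled = mcpPool.get(key);
  if (!pooled || --pooled.refs > 0) return;
  mcpPool.delete(key); // last session out: kill the shared children
  const children = await pooled.children.catch(() => [] as McpChild[]);
  await Promise.all(children.map((c) => c.kill().catch(() => {})));
}
```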

Tests:
- Updated `spawns fresh per call under sessionScope:thread` → now
  expects `handles.length === 1` (channel reused) but
  `sessionCount === 2` (distinct sessions).
- New: `Stage 1.5 multi-session: N sessions on same workspace share
  ONE channel` (5 sessions, 1 factoryCalls).
- New: `Stage 1.5: killSession on one of N sessions does NOT kill
  the shared channel` (kill 2 of 3, channel still alive; kill 3rd,
  channel killed).
- New: `Stage 1.5: channel.exited tears down ALL multiplexed
  sessions` (each gets its own session_died).
- FakeAgent.newSession suffixes ids with the call count so multiple newSession
  calls on the same channel return distinct ids (matches real
  ACP behavior).

Docs:
- `docs/users/qwen-serve.md` N:1 section rewritten — no longer
  "Stage 1 pays N×, Stage 1.5 fixes". Cost table reflects current
  shared-channel architecture; MCP refcount called out as the one
  remaining Stage 1.5 follow-up; "1 daemon = 1 session" framing
  removed from related sections.
wenshao 2026-05-12 22:24:46 +08:00
parent f29353a255
commit 6a170ef817
3 changed files with 449 additions and 216 deletions

docs/users/qwen-serve.md

@@ -4,7 +4,7 @@ Run Qwen Code as a local HTTP daemon so multiple clients (IDE plugins, web UIs,
> **Status:** Stage 1 (experimental). The protocol surface is locked at the §04 routes table from issue [#3803](https://github.com/QwenLM/qwen-code/issues/3803). Stage 1.5 (`qwen --serve` flag — TUI co-hosts the same HTTP server) and Stage 2 (in-process refactor + `mDNS`/OpenAPI/WebSocket/Prometheus polish) are immediately downstream.
>
> **Scope honesty:** Stage 1 is sized for **developers prototyping clients against the protocol surface** and for **local single-user / small-team collaboration**. Production-grade multi-client / long-running / network-flaky workloads (mobile companions, IM bots reaching 1000+ chats) need Stage 1.5+ guarantees that aren't in this release. Stage 1's bridge also spawns one `qwen --acp` child per session, so opening many parallel sessions on the same workspace pays N× memory until the Stage 1.5 bridge refactor lands (see [§ N parallel sessions](#n-parallel-sessions-stage-1-cost-vs-stage-15-fix) below — qwen-code's ACP agent already supports multi-session in one process; the bridge just doesn't leverage it yet). See [Stage 1.5+ runtime guarantees](#stage-15-runtime-guarantees) for the full gap list and #3803 for the convergence roadmap.
> **Scope honesty:** Stage 1 is sized for **developers prototyping clients against the protocol surface** and for **local single-user / small-team collaboration**. Production-grade multi-client / long-running / network-flaky workloads (mobile companions, IM bots reaching 1000+ chats) need Stage 1.5+ guarantees that aren't in this release. See [Stage 1.5+ runtime guarantees](#stage-15-runtime-guarantees) for the full gap list and #3803 for the convergence roadmap.
## What it gives you
@@ -183,7 +183,7 @@ Stage 1's contract is sized for prototyping. Per [#3889 chiga0 downstream-consum
**Blockers for serious downstream use:**
1. **Per-request `sessionScope` override** on `POST /session` — today the daemon-wide default is the only setting; a VSCode extension can't say "I want a private session for this window" against a daemon configured for shared sessions.
2. **`loadSession` / `unstable_resumeSession` over HTTP** — without this, no integration can survive a child crash or daemon restart, and the "1 daemon = 1 session, just spawn another" model is internally inconsistent (orchestrators can't recover state either).
2. **`loadSession` / `unstable_resumeSession` over HTTP** — without this, no integration can survive a child crash or daemon restart, and any orchestrator coordinating the daemon can't recover state either.
3. **Persistent client identity (pair tokens + per-client revocation)** — Stage 1 uses one shared bearer; a leaked token revokes everyone, and `originatorClientId` is client-self-declared rather than daemon-stamped from authenticated identity.
**Reliability baseline:**
@@ -238,36 +238,28 @@ In this mode, TUI is a **"super-client"** — it observes the same agent convers
(B) is the more ambitious answer but locks Stage 1.5 into a substantially larger wire surface that must also pass cleanly through the planned in-process refactor. We'd rather walk the smaller scope honestly. The session-state-event taxonomy work — enumerating which TUI flows are local-only by design vs. could plausibly graduate to wire under a future opt-in (B)-flavor extension — moves to [#3803](https://github.com/QwenLM/qwen-code/issues/3803), not Stage 1.5 code.
### N parallel sessions: Stage 1 cost vs. Stage 1.5 fix
### N parallel sessions share one `qwen --acp` child
**Correction to earlier framing** (per [LaZzyMan's comment](https://github.com/QwenLM/qwen-code/pull/3889#issuecomment-4428498510) and verified against `packages/cli/src/acp-integration/acpAgent.ts:194`): `qwen --acp` natively supports multi-session in a single child process (`private sessions: Map<string, Session>` keyed by `sessionId`). The yiliang114 VSCode plugin already uses this — one `qwen --acp` process, many sessions over its stdio. **Multi-session resource sharing is an ACP / qwen-code capability today, not a future roadmap item.**
Multiple sessions on the same workspace **share one `qwen --acp` child process** via the agent's native multi-session support (`packages/cli/src/acp-integration/acpAgent.ts:194: private sessions: Map<string, Session>`). The bridge calls `connection.newSession({cwd, mcpServers})` for each session — the agent stores them in its sessions map and demultiplexes per-call sessionId.
The Stage 1 bridge in this PR doesn't yet leverage it — see `httpAcpBridge.ts:707`:
Concrete cost at N=5 sessions on the same workspace:
```ts
const channel = await channelFactory(workspaceKey); // spawns a new qwen --acp child
...
const newSessionResp = await connection.newSession({…}); // single newSession per child
```
| Resource | Per session | At N=5 |
| ------------------------------------ | ----------- | ---------------------------- |
| Daemon Node process | one | **30-50 MB** (one daemon) |
| `qwen --acp` child | shared | **60-100 MB** (one child) |
| MCP server children | per-session | 3×N if configs differ |
| `FileReadCache` (in-child heap) | shared | parsed once |
| `CLAUDE.md` / hierarchy memory parse | shared | parsed once |
| OAuth refresh-token state | shared | **one refresh path** |
| Auto-memory learned facts | shared | one knowledge base per child |
| Cold start | first only | <200 ms after first session |
Every fresh session spawns its own child for simplicity (one channel state per child = easier debugging, no cross-session interference during Stage 1 stabilization). The cost-per-session table below describes what THIS bridge pays today, not what's architecturally possible.
The bridge keeps **one channel per workspace** (cross-workspace sharing is intentionally not done — different workspaces have different settings/auth scope, and `acpAgent.ts:601` reloads settings per newSession `cwd`, which would interfere). The channel stays alive while at least one session is live; the last `killSession` (or a channel-level crash) kills the child.
| Resource | Per session (Stage 1 bridge) | At N=5 (Stage 1) | At N=5 (Stage 1.5 bridge) |
| ------------------------------------ | ---------------------------- | ---------------- | ----------------------------- |
| Daemon Node process | ~30-50 MB RSS | **150-250 MB** | **30-50 MB** (one daemon) |
| `qwen --acp` child + sandbox | ~60-100 MB RSS | **300-500 MB** | **60-100 MB** (one child) |
| MCP server children | 3 per session | **15 processes** | 3 if config shared, else 3×N |
| `FileReadCache` (in-child heap) | independent | same files 5× | shared (one child) |
| `CLAUDE.md` / hierarchy memory parse | independent | 5× parse | parsed once per workspace cwd |
| OAuth refresh-token state | independent | 5 parallel | **shared (one process)** |
| Auto-memory learned facts | independent | fragments | shared |
| Cold start | 1-3 s per new session | each window pays | <200 ms after first session |
**MCP server children** are still per-session today — each session's config can specify different servers, so they're independently spawned. Stage 1.5 follow-up: refcount MCP server children by `(workspace, config-hash)` so identical configs share. Not in scope for this PR.
**Stage 1 (this PR) — accept the N× cost as a temporary design choice.** If your workflow needs many parallel sessions on the same workspace today, prefer in-session topic switching, separate workspaces, or accept the cost.
**Stage 1.5 — bridge refactor to leverage ACP multi-session.** The natural fix: keep one `qwen --acp` child per workspace (or daemon-wide), and call `connection.newSession({ cwd, mcpServers })` multiple times on it. Sessions share the child's process / OAuth / file cache / hierarchy-memory parse; MCP children remain per-session unless the config is identical (configurable). This is a **bridge-side change**, NOT the `#3803 §21 Path A/B/C` intra-daemon multi-session workstream — qwen-code already does intra-process multi-session at the agent layer; the bridge just needs to plug into it. Marked inline in `httpAcpBridge.ts` (`FIXME(stage-1.5)` on `channelFactory`).
**Peer-agent comparison context.** Cursor / Continue / Claude Code / OpenCode / Gemini CLI all do single-process multi-session at their agent layer. **qwen-code does too** — the Stage 1 bridge is the artifact that currently spawns a child per session, and Stage 1.5 closes that gap.
**Peer agents (Cursor / Continue / Claude Code / OpenCode / Gemini CLI) all do single-process multi-session.** qwen-code matches them at the agent layer; the Stage 1 bridge in this PR makes the same architecture visible over HTTP.
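
To see the sharing from a client, open two sessions on the same workspace and check the process list. (Sketch only: `POST /session` is the route named above; the port, token variable, and body field names here are illustrative, not the locked wire shape; consult the §04 routes table.)

```ts
// Stage 1 auth is one shared bearer token.
const headers = {
  'content-type': 'application/json',
  authorization: `Bearer ${process.env.QWEN_SERVE_TOKEN}`,
};

const open = async (): Promise<{ sessionId: string }> => {
  const res = await fetch('http://127.0.0.1:8976/session', {
    method: 'POST',
    headers,
    body: JSON.stringify({ workspaceCwd: '/home/me/project' }),
  });
  return (await res.json()) as { sessionId: string };
};

// Two distinct sessions...
const [a, b] = await Promise.all([open(), open()]);
console.log(a.sessionId !== b.sessionId); // true
// ...but the process list shows ONE `qwen --acp` child for the workspace.
```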
## What's next


@@ -38,6 +38,7 @@ import {
type AcpChannel,
type ChannelFactory,
} from './httpAcpBridge.js';
import type { BridgeEvent } from './eventBus.js';
// Workspace fixtures must round-trip through `path.resolve` so the
// expected values match what the bridge canonicalizes internally on
@@ -88,7 +89,13 @@ class FakeAgent implements Agent {
async newSession(p: NewSessionRequest): Promise<NewSessionResponse> {
this.newSessionCalls.push(p);
const prefix = this.opts.sessionIdPrefix ?? 'sess';
return { sessionId: `${prefix}:${p.cwd}` };
// Stage 1.5 multi-session: one FakeAgent can host multiple
// sessions (same as the real ACP agent), so each newSession call
// returns a fresh id. Suffix by call-count so tests that issue
// multiple newSession on the same channel get distinct ids.
const count = this.newSessionCalls.length;
const suffix = count === 1 ? '' : `#${count}`;
return { sessionId: `${prefix}:${p.cwd}${suffix}` };
}
async loadSession(_p: LoadSessionRequest): Promise<LoadSessionResponse> {
@@ -252,7 +259,7 @@ describe('createHttpAcpBridge', () => {
await bridge.shutdown();
});
it('spawns fresh per call under sessionScope:thread', async () => {
it('creates fresh session per call under sessionScope:thread (Stage 1.5 multi-session: shares channel)', async () => {
const handles: ChannelHandle[] = [];
const factory: ChannelFactory = async () => {
const h = makeChannel({ sessionIdPrefix: `s${handles.length}` });
@@ -267,10 +274,14 @@ describe('createHttpAcpBridge', () => {
const first = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
const second = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
// Distinct sessions, both freshly created (neither is an attach).
expect(first.sessionId).not.toBe(second.sessionId);
expect(first.attached).toBe(false);
expect(second.attached).toBe(false);
expect(handles).toHaveLength(2);
// Stage 1.5 multi-session: the two thread-scope calls SHARE the
// workspace's `qwen --acp` child. Only one `channelFactory` call.
// Each `newSession()` call to the agent produces a distinct id.
expect(handles).toHaveLength(1);
expect(bridge.sessionCount).toBe(2);
await bridge.shutdown();
@@ -2229,5 +2240,119 @@ describe('createHttpAcpBridge', () => {
expect(bridge.sessionCount).toBe(5);
await bridge.shutdown();
});
it('Stage 1.5 multi-session: N sessions on same workspace share ONE channel', async () => {
// The headline of the Stage 1.5 refactor — multiple thread-scope
// sessions on one workspace pay for one `qwen --acp` child, not
// N children. LaZzyMan + tanzhenxin pushed for this; the agent
// already supports it via `acpAgent.ts:194 sessions:
// Map<string, Session>`.
let factoryCalls = 0;
const factory: ChannelFactory = async () => {
factoryCalls++;
return makeChannel({ sessionIdPrefix: `s${factoryCalls}` }).channel;
};
const bridge = createHttpAcpBridge({
channelFactory: factory,
maxSessions: 0,
sessionScope: 'thread',
});
// Spin up 5 sessions on the same workspace.
const sessions = await Promise.all(
Array.from({ length: 5 }, () =>
bridge.spawnOrAttach({ workspaceCwd: WS_A }),
),
);
// 5 distinct sessions...
expect(new Set(sessions.map((s) => s.sessionId)).size).toBe(5);
expect(bridge.sessionCount).toBe(5);
// ...but only ONE channelFactory call (= one child process).
expect(factoryCalls).toBe(1);
await bridge.shutdown();
});
it('Stage 1.5: killSession on one of N sessions does NOT kill the shared channel', async () => {
// Counterpart guarantee: tearing down one session must not take
// its siblings with it. The channel stays alive while
// `channelInfo.sessionIds.size > 0`.
const handles: ChannelHandle[] = [];
const factory: ChannelFactory = async () => {
const h = makeChannel({ sessionIdPrefix: `s${handles.length}` });
handles.push(h);
return h.channel;
};
const bridge = createHttpAcpBridge({
channelFactory: factory,
sessionScope: 'thread',
});
const a = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
const b = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
const c = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
expect(handles).toHaveLength(1);
// Kill one — the other two stay.
await bridge.killSession(b.sessionId);
expect(bridge.sessionCount).toBe(2);
expect(handles[0]?.killed).toBe(false);
// Kill the second — last one alive.
await bridge.killSession(a.sessionId);
expect(bridge.sessionCount).toBe(1);
expect(handles[0]?.killed).toBe(false);
// Kill the last — NOW the channel is killed.
await bridge.killSession(c.sessionId);
expect(bridge.sessionCount).toBe(0);
expect(handles[0]?.killed).toBe(true);
await bridge.shutdown();
});
it('Stage 1.5: channel.exited tears down ALL multiplexed sessions', async () => {
// When the shared child dies (crash, kill, network gone), all
// sessions on it die together — they're truly co-fated. Each
// session's bus gets its own `session_died` event so each SSE
// subscriber learns the bad news on their own stream.
const handles: ChannelHandle[] = [];
const factory: ChannelFactory = async () => {
const h = makeChannel({ sessionIdPrefix: `s${handles.length}` });
handles.push(h);
return h.channel;
};
const bridge = createHttpAcpBridge({
channelFactory: factory,
sessionScope: 'thread',
});
const a = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
const b = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
const c = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
expect(bridge.sessionCount).toBe(3);
// Subscribe so we can observe each session_died.
const eventsByA: BridgeEvent[] = [];
const eventsByB: BridgeEvent[] = [];
const eventsByC: BridgeEvent[] = [];
const drainA = (async () => {
for await (const ev of bridge.subscribeEvents(a.sessionId))
eventsByA.push(ev);
})();
const drainB = (async () => {
for await (const ev of bridge.subscribeEvents(b.sessionId))
eventsByB.push(ev);
})();
const drainC = (async () => {
for await (const ev of bridge.subscribeEvents(c.sessionId))
eventsByC.push(ev);
})();
// Let the subscriptions register before crashing.
await new Promise((r) => setImmediate(r));
// Simulate channel-level crash (child exited).
handles[0]?.crash();
await Promise.all([drainA, drainB, drainC]);
expect(eventsByA[eventsByA.length - 1]?.type).toBe('session_died');
expect(eventsByB[eventsByB.length - 1]?.type).toBe('session_died');
expect(eventsByC[eventsByC.length - 1]?.type).toBe('session_died');
expect(bridge.sessionCount).toBe(0);
await bridge.shutdown();
});
});
});

httpAcpBridge.ts

@@ -304,6 +304,40 @@ export interface BridgeOptions {
maxSessions?: number;
}
/**
* One `qwen --acp` child + the ACP connection on top of it, shared by
* all SessionEntries whose workspace maps to this channel. Stage 1.5
* multi-session work (per LaZzyMan / tanzhenxin reviews) leverages
* the agent's native `sessions: Map<string, Session>` (see
* `acp-integration/acpAgent.ts:194`) so multiple `newSession()` calls
* on one channel get separate session ids while sharing the child's
* process / OAuth / file-cache / hierarchy-memory parse.
*
* Lifetime: created on first `spawnOrAttach` for a workspace, kept
* alive while `sessionIds.size > 0`, and killed by `killSession` when
* the last entry leaves OR by `channel.exited` when the child dies.
* Cross-workspace channel sharing is intentionally NOT done in this
* bridge: `acpAgent.ts:601 (this.settings = loadSettings(cwd))`
* replaces the cached settings on each newSession call, so different
* workspaces in one child would step on each other's settings. One
* channel per workspace is the safe scope for Stage 1.5.
*/
interface ChannelInfo {
channel: AcpChannel;
connection: ClientSideConnection;
/** Shared BridgeClient — its methods route ACP params by sessionId. */
client: BridgeClient;
workspaceCwd: string;
/**
* Live session ids multiplexed on this channel. Updated when
* `doSpawn` registers a new session and when `killSession` /
* `channel.exited` removes one. When the set drops to empty AND no
* session is mid-attach, the channel is killed and removed from
* `byWorkspaceChannel`.
*/
sessionIds: Set<string>;
}
interface SessionEntry {
sessionId: string;
workspaceCwd: string;
@@ -387,7 +421,20 @@ interface PendingPermission {
*/
class BridgeClient implements Client {
constructor(
private readonly resolveEntry: () => SessionEntry | undefined,
/**
* Look up the `SessionEntry` for an ACP call. Stage 1.5 multi-
* session on one channel means `BridgeClient` is shared across
* many sessions, so we can't bind the entry in a closure; we
* dispatch by the `sessionId` ACP includes in every per-session
* notification / request. A missing sessionId (none expected on
* the client surface as of this writing) resolves to `undefined`,
* so the call is dropped rather than misrouted to an arbitrary
* session; revisit here if ACP ever grows a no-sessionId call.
*/
private readonly resolveEntry: (
sessionId?: string,
) => SessionEntry | undefined,
private readonly registerPending: (pending: PendingPermission) => void,
/**
* Roll back a `registerPending` call when the subsequent publish
@@ -412,7 +459,7 @@ class BridgeClient implements Client {
async requestPermission(
params: RequestPermissionRequest,
): Promise<RequestPermissionResponse> {
const entry = this.resolveEntry();
const entry = this.resolveEntry(params.sessionId);
if (!entry) return { outcome: { outcome: 'cancelled' } };
const requestId = randomUUID();
@@ -454,7 +501,7 @@ }
}
async sessionUpdate(params: SessionNotification): Promise<void> {
const entry = this.resolveEntry();
const entry = this.resolveEntry(params.sessionId);
if (!entry) return;
entry.events.publish({ type: 'session_update', data: params });
}
@@ -627,15 +674,27 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
);
}
// Single-scope reuse keyed by canonical workspace path.
// Cleanup-on-crash: `channel.exited` (registered at line ~469) removes
// the entry from both maps + cancels pending permissions + publishes
// `session_died` + closes the EventBus when the child process dies
// between requests. Stage 2's in-process bridge eliminates the
// spawned-child failure mode entirely — but for Stage 1, the next
// prompt against a crashed session sees `SessionNotFoundError`
// (cleanup raced ahead of the request), not a hung channel write.
// Single-scope reuse keyed by canonical workspace path. Tracks the
// SessionEntry that a same-workspace attach should re-use. With
// Stage 1.5 multi-session per channel, this points at the FIRST
// session created for the workspace under `single` scope; under
// `thread` scope additional sessions on the same workspace don't
// overwrite this entry.
const byWorkspace = new Map<string, SessionEntry>();
// Stage 1.5 multi-session: one channel per workspace, N sessions
// multiplex on it via `connection.newSession({cwd, mcpServers})`.
// `byWorkspaceChannel.get(workspaceKey)` returns the shared channel
// for spawn-vs-reuse decisions in `doSpawn`. Channel is kept alive
// while `sessionIds.size > 0`; the last `killSession` (or the
// `channel.exited` cleanup) drops the entry from this map.
const byWorkspaceChannel = new Map<string, ChannelInfo>();
// Coalesces concurrent channel-spawn requests for the same workspace
// (regardless of sessionScope). Without this, two parallel callers
// would both `channelFactory(workspaceKey)` and one of the
// spawned children would never make it into `byWorkspaceChannel`,
// becoming a permanent orphan. Cleared in the `finally` of the
// creator regardless of outcome.
const inFlightChannelSpawns = new Map<string, Promise<ChannelInfo>>();
const byId = new Map<string, SessionEntry>();
// Daemon-wide pending permission table; requestIds are UUIDs so collisions
// across sessions are infeasible in practice.
@@ -700,177 +759,196 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
return true;
};
async function doSpawn(
/**
* Get-or-create the shared `qwen --acp` channel for a workspace.
* Stage 1.5 multi-session: one channel hosts N sessions via
* `connection.newSession()`. Concurrent callers coalesce through
* `inFlightChannelSpawns` so we never spawn two children for one
* workspace. The returned `ChannelInfo` is shared; each caller
* adds its session id to `sessionIds` and calls
* `info.connection.newSession()` to create its own session.
*
* Wires up the one-and-only `channel.exited` cleanup on first
* creation so the late-arriving event tears down ALL sessions on
* the channel (vs. the previous 1-session-per-channel design where
* each entry registered its own listener).
*/
async function getOrCreateChannel(
workspaceKey: string,
modelServiceId?: string,
): Promise<BridgeSession> {
// FIXME(stage-1.5): the bridge spawns one `qwen --acp` child per
// session today. But the ACP agent in `acp-integration/acpAgent.ts`
// (line ~194: `private sessions: Map<string, Session>`) natively
// supports multiple concurrent sessions inside one child process
// — yiliang114's VSCode plugin already uses this pattern. Stage
// 1.5 should refactor here to keep one child per workspace (or
// daemon-wide) and call `connection.newSession({ cwd, mcpServers })`
// multiple times on the same channel. Sessions then share the
// child's OAuth state / FileReadCache / CLAUDE.md parse / process
// overhead. This is a bridge-side change; it does NOT require the
// #3803 §21 Path A/B/C intra-daemon multi-session workstream
// (qwen-code already does that at the agent layer; the bridge
// just doesn't plug into it). Pairs naturally with the
// `@qwen-code/acp-bridge` package lift (chiga0 finding 1) —
// expose a `newSession()` method on the `AcpChannel` interface
// distinct from channel creation.
const channel = await channelFactory(workspaceKey);
let entry: SessionEntry | undefined;
const client = new BridgeClient(
() => entry,
registerPending,
(rid) =>
// Roll back a register-then-publish-failed pending so the agent
// doesn't hang waiting on a vote nobody can see.
resolvePending(rid, { outcome: { outcome: 'cancelled' } }),
);
const connection = new ClientSideConnection(() => client, channel.stream);
): Promise<ChannelInfo> {
const existing = byWorkspaceChannel.get(workspaceKey);
if (existing) return existing;
const inFlight = inFlightChannelSpawns.get(workspaceKey);
if (inFlight) return await inFlight;
try {
await withTimeout(
connection.initialize({
protocolVersion: PROTOCOL_VERSION,
clientCapabilities: {
fs: { readTextFile: true, writeTextFile: true },
},
clientInfo: { name: 'qwen-serve-bridge', version: '0' },
}),
initTimeoutMs,
'initialize',
);
const newSessionResp = await withTimeout(
connection.newSession({
cwd: workspaceKey,
mcpServers: [],
}),
initTimeoutMs,
'newSession',
const promise = (async () => {
const channel = await channelFactory(workspaceKey);
const client = new BridgeClient(
(sessionId) => (sessionId ? byId.get(sessionId) : undefined),
registerPending,
(rid) =>
// Roll back a register-then-publish-failed pending so the agent
// doesn't hang waiting on a vote nobody can see.
resolvePending(rid, { outcome: { outcome: 'cancelled' } }),
);
const connection = new ClientSideConnection(() => client, channel.stream);
// Late-shutdown re-check (BUy4U): shutdown() may have flipped
// while we were in `connection.initialize` (the ACP handshake
// takes ~1s on cold start). If we commit to the maps now, the
// snapshot in shutdown() already missed this channel — the
// child would leak past `process.exit(0)`. Tear down what we
// have and surface to the caller.
//
// This re-check is the LOAD-BEARING correctness contract, not
// a band-aid: `shutdown()` deliberately starts tearing down
// already-registered channels and sessions in parallel with
// awaiting `inFlightChannelSpawns` / `inFlightSpawns` (faster
// fan-out), and relies on this re-check to catch any spawn
// whose `initialize` returns AFTER shutdown flipped the flag
// (doSpawn has the matching re-check for `newSession`). The
// alternative — await all in-flight spawns to settle BEFORE
// snapshotting the maps — is cleaner to reason about but
// serializes shutdown by up-to-`initTimeoutMs` (10s default)
// before any already-live session starts tearing down. We chose
// parallel + re-check. All coalesced callers see the same
// rejection because this promise (cached in
// `inFlightChannelSpawns`) is what rejects.
try {
await withTimeout(
connection.initialize({
protocolVersion: PROTOCOL_VERSION,
clientCapabilities: {
fs: { readTextFile: true, writeTextFile: true },
},
clientInfo: { name: 'qwen-serve-bridge', version: '0' },
}),
initTimeoutMs,
'initialize',
);
} catch (err) {
await channel.kill().catch(() => {});
throw err;
}
// Late-shutdown re-check: if shutdown flipped during `initialize`,
// tear this channel down rather than leak past `process.exit(0)`.
if (shuttingDown) {
await channel.kill().catch(() => {});
throw new Error('HttpAcpBridge is shutting down');
}
entry = {
sessionId: newSessionResp.sessionId,
workspaceCwd: workspaceKey,
const info: ChannelInfo = {
channel,
connection,
events: new EventBus(),
promptQueue: Promise.resolve(),
modelChangeQueue: Promise.resolve(),
pendingPermissionIds: new Set(),
attachCount: 0,
client,
workspaceCwd: workspaceKey,
sessionIds: new Set(),
};
byWorkspace.set(workspaceKey, entry);
byId.set(entry.sessionId, entry);
byWorkspaceChannel.set(workspaceKey, info);
// Cleanup if the child terminates between requests. `channel.exited`
// resolves both for planned shutdown (we already removed the entry
// before calling kill, so the `byId.get(...) === entry` check is
// false and this is a no-op) AND for unplanned crashes (entry is
// still in the maps → cancel pending permissions, publish a
// `session_died` event so live SSE subscribers learn the session is
// gone, close the bus, drop from maps).
const liveEntry = entry;
void channel.exited.then((info) => {
if (byId.get(liveEntry.sessionId) !== liveEntry) return;
cancelPendingForSession(liveEntry.sessionId);
try {
liveEntry.events.publish({
type: 'session_died',
data: {
sessionId: liveEntry.sessionId,
reason: 'channel_closed',
// BX9_P: thread `exitCode` / `signalCode` from the
// spawn factory through so operators triaging a crash
// can read the cause from the SSE frame instead of
// grepping stderr for the pid.
exitCode: info?.exitCode ?? null,
signalCode: info?.signalCode ?? null,
},
});
} catch {
/* bus already closed */
// One-time channel.exited cleanup. The child dying takes ALL
// multiplexed sessions with it — iterate `sessionIds` (snapshot
// first to be safe against concurrent killSession during
// iteration), publish `session_died` on each session's bus,
// remove from byId / byWorkspace / pending tables.
void channel.exited.then((exitInfo) => {
const stillOurs = byWorkspaceChannel.get(workspaceKey) === info;
if (stillOurs) byWorkspaceChannel.delete(workspaceKey);
const sessions = Array.from(info.sessionIds);
info.sessionIds.clear();
for (const sid of sessions) {
const sessEntry = byId.get(sid);
if (!sessEntry) continue;
cancelPendingForSession(sid);
try {
sessEntry.events.publish({
type: 'session_died',
data: {
sessionId: sid,
reason: 'channel_closed',
// BX9_P: thread exitCode/signalCode through.
exitCode: exitInfo?.exitCode ?? null,
signalCode: exitInfo?.signalCode ?? null,
},
});
} catch {
/* bus already closed */
}
byId.delete(sid);
if (byWorkspace.get(sessEntry.workspaceCwd) === sessEntry) {
byWorkspace.delete(sessEntry.workspaceCwd);
}
sessEntry.events.close();
}
byWorkspace.delete(liveEntry.workspaceCwd);
byId.delete(liveEntry.sessionId);
liveEntry.events.close();
});
// ACP `newSession` doesn't take a model id; honor the caller's
// `modelServiceId` by issuing the unstable `setSessionModel` call
// immediately after the session is established. Use the shared
// `applyModelServiceId` helper so the call:
// - races against `transportClosedReject` (a wedged child
// during model switch fails fast instead of consuming the
// full init timeout — A09HD)
// - publishes `model_switched` / `model_switch_failed` to the
// bus so attached clients see the result
// - serializes through `entry.modelChangeQueue` for ordering
// against later attach-with-different-model calls
//
// On failure we DO NOT tear the session down. An earlier rev
// tore it down so the caller "didn't inherit silent drift",
// but that killed an already-functional session and left the
// caller with a 500 and no sessionId to retry against. The
// session is operational on the agent's default model — the
// `model_switch_failed` event tells subscribers exactly what
// happened, and the caller can retry the switch via
// `POST /session/:id/model` once they know the sessionId.
//
// Swallow the rejection inline so the outer try/catch (which
// would `channel.kill()`) doesn't fire. The publish inside
// `applyModelServiceId` is the visible signal; the caller
// gets a 200 and a sessionId regardless, and learns of the
// model issue via the SSE stream.
if (modelServiceId) {
await applyModelServiceId(entry, modelServiceId, initTimeoutMs).catch(
() => {
// Already published `model_switch_failed`; don't take
// down the session.
},
);
}
return info;
})();
return {
sessionId: entry.sessionId,
workspaceCwd: entry.workspaceCwd,
attached: false,
};
} catch (err) {
await channel.kill().catch(() => {});
throw err;
inFlightChannelSpawns.set(workspaceKey, promise);
try {
return await promise;
} finally {
inFlightChannelSpawns.delete(workspaceKey);
}
}
async function doSpawn(
workspaceKey: string,
modelServiceId?: string,
): Promise<BridgeSession> {
// Stage 1.5 multi-session: get-or-create the channel for this
// workspace, then call `connection.newSession()` on it. Sessions
// share the child's process / OAuth / file-cache / hierarchy-
// memory parse via the agent's `sessions: Map<string, Session>`
// (see `acp-integration/acpAgent.ts:194`).
// newSession on an established channel can fail (auth, config,
// etc.) without the channel dying. We DON'T kill the channel on
// newSession failure: other sessions on it may still be live —
// they'd lose their work for a problem orthogonal to them. The
// edge case of "newSession fails on a freshly-created channel
// with no live sessions" leaks an empty channel until something
// else (channel.exited / shutdown) cleans it up; that's much
// better than killing other live sessions, and the
// empty-sessionIds set lets the NEXT caller retry through that
// same channel if it's still healthy.
const channelInfo = await getOrCreateChannel(workspaceKey);
const newSessionResp = await withTimeout(
channelInfo.connection.newSession({
cwd: workspaceKey,
mcpServers: [],
}),
initTimeoutMs,
'newSession',
);
// Late-shutdown re-check (BUy4U): shutdown() may have flipped
// while we were in `connection.newSession` (~1s on cold start).
if (shuttingDown) {
// Don't kill the channel — see comment above. Just throw.
throw new Error('HttpAcpBridge is shutting down');
}
const entry: SessionEntry = {
sessionId: newSessionResp.sessionId,
workspaceCwd: workspaceKey,
channel: channelInfo.channel,
connection: channelInfo.connection,
events: new EventBus(),
promptQueue: Promise.resolve(),
modelChangeQueue: Promise.resolve(),
pendingPermissionIds: new Set(),
attachCount: 0,
};
channelInfo.sessionIds.add(entry.sessionId);
byId.set(entry.sessionId, entry);
// `byWorkspace` is the single-scope attach lookup — only the
// FIRST session for a workspace wins this slot. Subsequent
// thread-scope sessions don't overwrite it.
if (!byWorkspace.has(workspaceKey)) {
byWorkspace.set(workspaceKey, entry);
}
// ACP `newSession` doesn't take a model id; honor the caller's
// `modelServiceId` via `unstable_setSessionModel`. See
// `applyModelServiceId` for rationale (race against
// transportClosedReject, publish model_switched on success,
// model_switch_failed on failure, don't tear down the session).
if (modelServiceId) {
await applyModelServiceId(entry, modelServiceId, initTimeoutMs).catch(
() => {
// Already published `model_switch_failed`; session stays
// operational on the agent's default model.
},
);
}
return {
sessionId: entry.sessionId,
workspaceCwd: entry.workspaceCwd,
attached: false,
};
}
/**
* Send `unstable_setSessionModel` and broadcast a `model_switched`
* event. Used at create-session time (via doSpawn) AND on attach when
@@ -1413,11 +1491,18 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
// through.
if (opts?.requireZeroAttaches && entry.attachCount > 0) return;
// Remove from the maps eagerly so concurrent `spawnOrAttach`
// can't reattach to a session we're tearing down. The
// `channel.exited` cleanup at line 461-484 also removes from
// these maps, but eager removal narrows the race window.
byWorkspace.delete(entry.workspaceCwd);
// can't reattach to a session we're tearing down.
if (byWorkspace.get(entry.workspaceCwd) === entry) {
byWorkspace.delete(entry.workspaceCwd);
}
byId.delete(sessionId);
// Stage 1.5 multi-session: detach from the channel. The channel
// dies only when its LAST session leaves — other sessions on
// the same channel keep running.
const channelInfo = byWorkspaceChannel.get(entry.workspaceCwd);
if (channelInfo && channelInfo.channel === entry.channel) {
channelInfo.sessionIds.delete(sessionId);
}
// Resolve any still-pending permission as cancelled (matches the
// shutdown path) so callers awaiting requestPermission unwind.
for (const id of Array.from(entry.pendingPermissionIds)) {
@@ -1425,9 +1510,9 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
}
// Publish `session_died` BEFORE closing the bus. After the eager
// `byId.delete` above, the channel.exited handler's
// `byId.get(...) !== entry` guard short-circuits, so the
// automatic publish at crash time wouldn't fire. SSE subscribers
// need this terminal frame to know the session is gone.
// `byId.get(...)` returns undefined so the automatic publish
// at crash time wouldn't fire. SSE subscribers need this
// terminal frame to know the session is gone.
try {
entry.events.publish({
type: 'session_died',
@@ -1437,9 +1522,20 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
/* bus already closed */
}
entry.events.close();
await entry.channel.kill().catch(() => {
// Best-effort kill — channel may already be dead.
});
// Only kill the channel when no other sessions remain. ACP
// doesn't expose a per-session "close" call on the agent side,
// so each killed session's entry lingers in the agent's
// `sessions: Map<string, Session>` until the channel dies —
// bounded by `maxSessions` (default
// 20) so memory is capped. FIXME(stage-1.5): if ACP grows a
// `closeSession` notification, send it here so the agent can
// drop the entry from its map immediately rather than at
// channel exit.
if (channelInfo && channelInfo.sessionIds.size === 0) {
byWorkspaceChannel.delete(entry.workspaceCwd);
await channelInfo.channel.kill().catch(() => {
// Best-effort kill — channel may already be dead.
});
}
},
async detachClient(sessionId) {
@@ -1479,6 +1575,11 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
// spawning a child this teardown won't see.
shuttingDown = true;
const entries = Array.from(byId.values());
// Snapshot channels too — Stage 1.5 multi-session means N
// sessions may share one channel; we tear down channels
// (which transitively takes all their sessions), not entries
// one-by-one.
const channelInfos = Array.from(byWorkspaceChannel.values());
// Resolve every still-pending permission as cancelled before clearing
// the maps so callers awaiting `requestPermission` unwind cleanly.
for (const e of entries) {
@@ -1489,14 +1590,15 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
}
byWorkspace.clear();
byId.clear();
byWorkspaceChannel.clear();
pendingPermissions.clear();
// Publish a terminal `session_died` BEFORE closing each bus so SSE
// subscribers can distinguish "daemon shut down" from a transient
// network error and don't sit indefinitely retrying. The
// channel.exited handler also publishes this on a child crash,
// but at shutdown time the entry has already been removed from
// `byId` (above), so the handler's `byId.get(...) !== entry`
// guard would short-circuit and the event would never fire.
// `byId` (above), so the handler's `byId.get(...)` is undefined
// and the automatic publish wouldn't fire.
for (const e of entries) {
try {
e.events.publish({
@@ -1508,16 +1610,26 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
}
e.events.close();
}
// Wait for in-flight spawns too. The snapshot above only sees
// sessions already in `byId`; a `doSpawn` past `channelFactory()`
// but still inside `connection.newSession` (~1s on cold start) is
// not yet registered. Without the await, `bridge.shutdown()`
// resolves before the late re-check at doSpawn:472 tears down its
// half-built channel — the orphan's stderr error fires AFTER the
// daemon claimed graceful shutdown (log-confusing). Settle on
// each, ignoring rejections so a single failure doesn't poison
// the others.
const inFlightAwaits = Array.from(inFlightSpawns.values()).map(
// Wait for in-flight channel spawns + session spawns. The
// snapshots above only see what's already registered; a doSpawn
// past `newSession()` but pre-`byId.set` is missed, as is a
// `getOrCreateChannel` past `channelFactory()` but pre-
// `byWorkspaceChannel.set`. The late-shutdown re-checks at
// doSpawn/getOrCreateChannel catch both — but without these
// awaits, `bridge.shutdown()` would resolve before they
// finish, and the orphan stderr error from a half-built
// child would fire AFTER the daemon claimed graceful
// shutdown (log-confusing).
const inFlightSessionAwaits = Array.from(inFlightSpawns.values()).map(
(p): Promise<void> =>
p.then(
() => undefined,
() => undefined,
),
);
const inFlightChannelAwaits = Array.from(
inFlightChannelSpawns.values(),
).map(
(p): Promise<void> =>
p.then(
() => undefined,
@@ -1525,8 +1637,12 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
),
);
await Promise.all([
...entries.map((e) => e.channel.kill().catch(() => {})),
...inFlightAwaits,
// Kill each unique channel once. With multi-session per
// channel, the same channel object can be referenced by
// multiple entries; `channelInfos` is the deduplicated set.
...channelInfos.map((ci) => ci.channel.kill().catch(() => {})),
...inFlightSessionAwaits,
...inFlightChannelAwaits,
]);
},
};