mirror of
https://github.com/QwenLM/qwen-code.git
synced 2026-05-17 03:57:18 +00:00
feat(serve): Stage 1 bridge now multiplexes sessions on one qwen --acp child per workspace
Per LaZzyMan / tanzhenxin reviews and maintainer feedback, verified
against `packages/cli/src/acp-integration/acpAgent.ts:194` (the
agent's `private sessions: Map<string, Session>`): qwen-code's ACP
agent natively supports multiple sessions in one child process. The
Stage 1 bridge previously spawned one child per session for
simplicity, paying N× memory / OAuth / file-cache cost. It is now
refactored to leverage the agent's existing multi-session capability:
one `qwen --acp` child per workspace, with N sessions sharing it via
`connection.newSession({cwd, mcpServers})`.
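In sketch form (hypothetical, heavily simplified types — `SessionEntry`
here is just an id plus an event list standing in for the bridge's real
per-session state), the demultiplexing rule is: one shared client routes
each per-session notification to the entry whose key matches the
`sessionId` the call carries.

```typescript
// Simplified stand-in for the bridge's per-session state.
interface SessionEntry {
  sessionId: string;
  events: string[]; // stand-in for the real per-session event bus
}

// One shared client serves every session on a channel by dispatching
// on the sessionId each call carries, not on a closed-over entry.
class SharedClient {
  constructor(private readonly byId: Map<string, SessionEntry>) {}

  sessionUpdate(params: { sessionId: string; text: string }): void {
    const entry = this.byId.get(params.sessionId);
    if (!entry) return; // unknown or dead session: drop, don't throw
    entry.events.push(params.text);
  }
}

const byId = new Map<string, SessionEntry>();
byId.set('s1', { sessionId: 's1', events: [] });
byId.set('s2', { sessionId: 's2', events: [] });

const client = new SharedClient(byId);
client.sessionUpdate({ sessionId: 's1', text: 'hello' });
client.sessionUpdate({ sessionId: 's2', text: 'world' });
```

Routing by the id carried in each call is what lets a single client
object serve every session multiplexed on the channel.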
Cost at N=5 sessions on same workspace:
- Before: 300-500 MB RSS (5 children), 5× OAuth refresh, 5× file
cache, 5× CLAUDE.md parse, 5× cold start
- After: 60-100 MB RSS (one child), one OAuth path, shared
FileReadCache, parsed once, <200ms cold start after first session
Architecture changes:
- New `ChannelInfo` type holds the shared channel + connection +
BridgeClient + the set of session ids multiplexing on it.
- New `byWorkspaceChannel: Map<workspace, ChannelInfo>` + new
`inFlightChannelSpawns` coalesce-map for concurrent channel
creation.
- New `getOrCreateChannel(workspaceKey)` helper: reuse existing
channel or spawn one (with `initialize` happening exactly once
per channel, not once per session). Coalesced via
`inFlightChannelSpawns` so two parallel callers don't both spawn.
- `doSpawn` now calls `getOrCreateChannel` + `connection.newSession`
separately (was: spawn+initialize+newSession together per session).
- `BridgeClient` updated: `resolveEntry(sessionId?)` dispatches by
the sessionId ACP carries in each request — one BridgeClient now
serves all sessions on its channel. `sessionUpdate`,
`requestPermission`, etc. all pass `params.sessionId`.
- `channel.exited` cleanup moved into `getOrCreateChannel` and now
tears down ALL sessions on the channel (not one). Each session
gets its own `session_died` event so SSE subscribers learn the
bad news on their own stream.
- `killSession` now removes session from `channelInfo.sessionIds`
and kills the channel ONLY when its sessionIds set drops to zero.
Other sessions on the same channel keep running.
- `shutdown` tears down channels (the deduplicated set) and awaits
both inFlightSpawns and inFlightChannelSpawns.
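The two bookkeeping rules above (coalesced channel creation, refcounted
teardown) can be sketched like this; names and types are simplified
stand-ins for the real bridge code, with `spawnChild` playing the role
of the channel factory plus the one-time `initialize`:

```typescript
// Simplified stand-in for the real ChannelInfo (which also carries
// the ACP connection and shared BridgeClient).
interface ChannelInfo {
  child: string; // stand-in for the real AcpChannel
  sessionIds: Set<string>;
}

const byWorkspaceChannel = new Map<string, ChannelInfo>();
const inFlightChannelSpawns = new Map<string, Promise<ChannelInfo>>();
let spawnCount = 0;

async function spawnChild(ws: string): Promise<string> {
  spawnCount++; // one increment per actual child spawned
  return `child-for-${ws}`;
}

async function getOrCreateChannel(ws: string): Promise<ChannelInfo> {
  const existing = byWorkspaceChannel.get(ws);
  if (existing) return existing;
  const inFlight = inFlightChannelSpawns.get(ws);
  if (inFlight) return inFlight; // coalesce: await the first caller's spawn

  const promise = (async () => {
    const child = await spawnChild(ws); // spawn + initialize once per channel
    const info: ChannelInfo = { child, sessionIds: new Set() };
    byWorkspaceChannel.set(ws, info);
    return info;
  })();
  inFlightChannelSpawns.set(ws, promise);
  try {
    return await promise;
  } finally {
    inFlightChannelSpawns.delete(ws); // cleared regardless of outcome
  }
}

// Returns true only when the channel itself was torn down.
function killSession(ws: string, sessionId: string): boolean {
  const info = byWorkspaceChannel.get(ws);
  if (!info) return false;
  info.sessionIds.delete(sessionId);
  if (info.sessionIds.size === 0) {
    byWorkspaceChannel.delete(ws); // last session gone: kill the child
    return true;
  }
  return false; // siblings keep running
}
```

Two parallel `getOrCreateChannel` calls for one workspace resolve to the
same `ChannelInfo` with exactly one spawn, and the channel outlives every
`killSession` except the last.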
Cross-workspace channel sharing intentionally NOT done — `acpAgent.ts:
601 (this.settings = loadSettings(cwd))` reloads settings on each
newSession call with a different cwd, so different workspaces in
one child would step on each other. One channel per workspace is
the safe scope.
MCP server children stay per-session for now (each session can have
different mcpServers config). Stage 1.5 follow-up: refcount MCP
children by (workspace, config-hash) so identical configs share.
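A rough sketch of that follow-up (hypothetical: `acquireMcp` /
`releaseMcp` and the hashing scheme are illustrative only, nothing in
this PR): refcount each MCP child under a `(workspace, config-hash)`
key and reap it when the last session using that exact config leaves.

```typescript
import { createHash } from 'node:crypto';

interface McpRef {
  child: string; // stand-in for a real MCP child-process handle
  refs: number;
}

const mcpChildren = new Map<string, McpRef>();

// Stable enough for byte-identical JSON; a real implementation would
// canonicalize key order before hashing.
function hashConfig(config: unknown): string {
  return createHash('sha256').update(JSON.stringify(config)).digest('hex');
}

function acquireMcp(workspace: string, config: unknown): string {
  const key = `${workspace}\u0000${hashConfig(config)}`;
  let ref = mcpChildren.get(key);
  if (!ref) {
    // First session with this (workspace, config): spawn happens here.
    ref = { child: `mcp-${mcpChildren.size}`, refs: 0 };
    mcpChildren.set(key, ref);
  }
  ref.refs++;
  return ref.child;
}

function releaseMcp(workspace: string, config: unknown): void {
  const key = `${workspace}\u0000${hashConfig(config)}`;
  const ref = mcpChildren.get(key);
  if (!ref) return;
  if (--ref.refs === 0) mcpChildren.delete(key); // reap on last release
}
```

Identical configs on one workspace share a child; a differing config
still gets its own, preserving today's per-session isolation semantics.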
Tests:
- Updated `spawns fresh per call under sessionScope:thread` → now
expects `handles.length === 1` (channel reused) but
`sessionCount === 2` (distinct sessions).
- New: `Stage 1.5 multi-session: N sessions on same workspace share
ONE channel` (5 sessions, 1 factoryCalls).
- New: `Stage 1.5: killSession on one of N sessions does NOT kill
the shared channel` (kill 2 of 3, channel still alive; kill 3rd,
channel killed).
- New: `Stage 1.5: channel.exited tears down ALL multiplexed
sessions` (each gets its own session_died).
- FakeAgent.newSession suffixes call-count so multiple newSession
calls on the same channel return distinct ids (matches real
ACP behavior).
Docs:
- `docs/users/qwen-serve.md` N:1 section rewritten — no longer
"Stage 1 pays N×, Stage 1.5 fixes". Cost table reflects current
shared-channel architecture; MCP refcount called out as the one
remaining Stage 1.5 follow-up; "1 daemon = 1 session" framing
removed from related sections.
This commit is contained in:
parent
f29353a255
commit
6a170ef817
3 changed files with 449 additions and 216 deletions
@@ -4,7 +4,7 @@ Run Qwen Code as a local HTTP daemon so multiple clients (IDE plugins, web UIs,

> **Status:** Stage 1 (experimental). The protocol surface is locked at the §04 routes table from issue [#3803](https://github.com/QwenLM/qwen-code/issues/3803). Stage 1.5 (`qwen --serve` flag — TUI co-hosts the same HTTP server) and Stage 2 (in-process refactor + `mDNS`/OpenAPI/WebSocket/Prometheus polish) are immediately downstream.
>
-> **Scope honesty:** Stage 1 is sized for **developers prototyping clients against the protocol surface** and for **local single-user / small-team collaboration**. Production-grade multi-client / long-running / network-flaky workloads (mobile companions, IM bots reaching 1000+ chats) need Stage 1.5+ guarantees that aren't in this release. Stage 1's bridge also spawns one `qwen --acp` child per session, so opening many parallel sessions on the same workspace pays N× memory until the Stage 1.5 bridge refactor lands (see [§ N parallel sessions](#n-parallel-sessions-stage-1-cost-vs-stage-15-fix) below — qwen-code's ACP agent already supports multi-session in one process; the bridge just doesn't leverage it yet). See [Stage 1.5+ runtime guarantees](#stage-15-runtime-guarantees) for the full gap list and #3803 for the convergence roadmap.
+> **Scope honesty:** Stage 1 is sized for **developers prototyping clients against the protocol surface** and for **local single-user / small-team collaboration**. Production-grade multi-client / long-running / network-flaky workloads (mobile companions, IM bots reaching 1000+ chats) need Stage 1.5+ guarantees that aren't in this release. See [Stage 1.5+ runtime guarantees](#stage-15-runtime-guarantees) for the full gap list and #3803 for the convergence roadmap.

## What it gives you

@@ -183,7 +183,7 @@ Stage 1's contract is sized for prototyping. Per [#3889 chiga0 downstream-consum

**Blockers for serious downstream use:**

1. **Per-request `sessionScope` override** on `POST /session` — today the daemon-wide default is the only setting; a VSCode extension can't say "I want a private session for this window" against a daemon configured for shared sessions.
-2. **`loadSession` / `unstable_resumeSession` over HTTP** — without this, no integration can survive a child crash or daemon restart, and the "1 daemon = 1 session, just spawn another" model is internally inconsistent (orchestrators can't recover state either).
+2. **`loadSession` / `unstable_resumeSession` over HTTP** — without this, no integration can survive a child crash or daemon restart, and any orchestrator coordinating the daemon can't recover state either.
3. **Persistent client identity (pair tokens + per-client revocation)** — Stage 1 uses one shared bearer; a leaked token revokes everyone, and `originatorClientId` is client-self-declared rather than daemon-stamped from authenticated identity.

**Reliability baseline:**

@@ -238,36 +238,28 @@ In this mode, TUI is a **"super-client"** — it observes the same agent convers

(B) is the more ambitious answer but locks Stage 1.5 into a substantially larger wire surface that must also pass cleanly through the planned in-process refactor. We'd rather walk the smaller scope honestly. The session-state-event taxonomy work — enumerating which TUI flows are local-only by design vs. could plausibly graduate to wire under a future opt-in (B)-flavor extension — moves to [#3803](https://github.com/QwenLM/qwen-code/issues/3803), not Stage 1.5 code.

-### N parallel sessions: Stage 1 cost vs. Stage 1.5 fix
+### N parallel sessions share one `qwen --acp` child

-**Correction to earlier framing** (per [LaZzyMan's comment](https://github.com/QwenLM/qwen-code/pull/3889#issuecomment-4428498510) and verified against `packages/cli/src/acp-integration/acpAgent.ts:194`): `qwen --acp` natively supports multi-session in a single child process (`private sessions: Map<string, Session>` keyed by `sessionId`). The yiliang114 VSCode plugin already uses this — one `qwen --acp` process, many sessions over its stdio. **Multi-session resource sharing is an ACP / qwen-code capability today, not a future roadmap item.**
+Multiple sessions on the same workspace **share one `qwen --acp` child process** via the agent's native multi-session support (`packages/cli/src/acp-integration/acpAgent.ts:194: private sessions: Map<string, Session>`). The bridge calls `connection.newSession({cwd, mcpServers})` for each session — the agent stores them in its sessions map and demultiplexes per-call sessionId.

-The Stage 1 bridge in this PR doesn't yet leverage it — see `httpAcpBridge.ts:707`:
+Concrete cost at N=5 sessions on the same workspace:

-```ts
-const channel = await channelFactory(workspaceKey); // spawns a new qwen --acp child
-...
-const newSessionResp = await connection.newSession({…}); // single newSession per child
-```
+| Resource                             | Per session | At N=5                       |
+| ------------------------------------ | ----------- | ---------------------------- |
+| Daemon Node process                  | one         | **30–50 MB** (one daemon)    |
+| `qwen --acp` child                   | shared      | **60–100 MB** (one child)    |
+| MCP server children                  | per-session | 3×N if configs differ        |
+| `FileReadCache` (in-child heap)      | shared      | parsed once                  |
+| `CLAUDE.md` / hierarchy memory parse | shared      | parsed once                  |
+| OAuth refresh-token state            | shared      | **one refresh path**         |
+| Auto-memory learned facts            | shared      | one knowledge base per child |
+| Cold start                           | first only  | <200 ms after first session  |

-Every fresh session spawns its own child for simplicity (one channel state per child = easier debugging, no cross-session interference during Stage 1 stabilization). The cost-per-session table below describes what THIS bridge pays today, not what's architecturally possible.
+The bridge keeps **one channel per workspace** (cross-workspace sharing is intentionally not done — different workspaces have different settings/auth scope, and `acpAgent.ts:601` reloads settings per newSession `cwd`, which would interfere). The channel stays alive while at least one session is live; the last `killSession` (or a channel-level crash) kills the child.

-| Resource                             | Per session (Stage 1 bridge) | At N=5 (Stage 1) | At N=5 (Stage 1.5 bridge)     |
-| ------------------------------------ | ---------------------------- | ---------------- | ----------------------------- |
-| Daemon Node process                  | ~30–50 MB RSS                | **150–250 MB**   | **30–50 MB** (one daemon)     |
-| `qwen --acp` child + sandbox         | ~60–100 MB RSS               | **300–500 MB**   | **60–100 MB** (one child)     |
-| MCP server children                  | 3 per session                | **15 processes** | 3 if config shared, else 3×N  |
-| `FileReadCache` (in-child heap)      | independent                  | same files 5×    | shared (one child)            |
-| `CLAUDE.md` / hierarchy memory parse | independent                  | 5× parse         | parsed once per workspace cwd |
-| OAuth refresh-token state            | independent                  | 5 parallel       | **shared (one process)**      |
-| Auto-memory learned facts            | independent                  | fragments        | shared                        |
-| Cold start                           | 1–3 s per new session        | each window pays | <200 ms after first session   |
+**MCP server children** are still per-session today — each session's config can specify different servers, so they're independently spawned. Stage 1.5 follow-up: refcount MCP server children by `(workspace, config-hash)` so identical configs share. Not in scope for this PR.

-**Stage 1 (this PR) — accept the N× cost as a temporary design choice.** If your workflow needs many parallel sessions on the same workspace today, prefer in-session topic switching, separate workspaces, or accept the cost.

-**Stage 1.5 — bridge refactor to leverage ACP multi-session.** The natural fix: keep one `qwen --acp` child per workspace (or daemon-wide), and call `connection.newSession({ cwd, mcpServers })` multiple times on it. Sessions share the child's process / OAuth / file cache / hierarchy-memory parse; MCP children remain per-session unless the config is identical (configurable). This is a **bridge-side change**, NOT the `#3803 §21 Path A/B/C` intra-daemon multi-session workstream — qwen-code already does intra-process multi-session at the agent layer; the bridge just needs to plug into it. Marked inline in `httpAcpBridge.ts` (`FIXME(stage-1.5)` on `channelFactory`).

-**Peer-agent comparison context.** Cursor / Continue / Claude Code / OpenCode / Gemini CLI all do single-process multi-session at their agent layer. **qwen-code does too** — the Stage 1 bridge is the artifact that currently spawns a child per session, and Stage 1.5 closes that gap.
+**Peer agents (Cursor / Continue / Claude Code / OpenCode / Gemini CLI) all do single-process multi-session.** qwen-code matches them at the agent layer; the Stage 1 bridge in this PR makes the same architecture visible over HTTP.

## What's next

@@ -38,6 +38,7 @@ import {
  type AcpChannel,
  type ChannelFactory,
} from './httpAcpBridge.js';
+import type { BridgeEvent } from './eventBus.js';

// Workspace fixtures must round-trip through `path.resolve` so the
// expected values match what the bridge canonicalizes internally on
@@ -88,7 +89,13 @@ class FakeAgent implements Agent {
  async newSession(p: NewSessionRequest): Promise<NewSessionResponse> {
    this.newSessionCalls.push(p);
    const prefix = this.opts.sessionIdPrefix ?? 'sess';
-    return { sessionId: `${prefix}:${p.cwd}` };
+    // Stage 1.5 multi-session: one FakeAgent can host multiple
+    // sessions (same as the real ACP agent), so each newSession call
+    // returns a fresh id. Suffix by call-count so tests that issue
+    // multiple newSession on the same channel get distinct ids.
+    const count = this.newSessionCalls.length;
+    const suffix = count === 1 ? '' : `#${count}`;
+    return { sessionId: `${prefix}:${p.cwd}${suffix}` };
  }

  async loadSession(_p: LoadSessionRequest): Promise<LoadSessionResponse> {
@@ -252,7 +259,7 @@ describe('createHttpAcpBridge', () => {
    await bridge.shutdown();
  });

-  it('spawns fresh per call under sessionScope:thread', async () => {
+  it('creates fresh session per call under sessionScope:thread (Stage 1.5 multi-session: shares channel)', async () => {
    const handles: ChannelHandle[] = [];
    const factory: ChannelFactory = async () => {
      const h = makeChannel({ sessionIdPrefix: `s${handles.length}` });
@@ -267,10 +274,14 @@ describe('createHttpAcpBridge', () => {
    const first = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
    const second = await bridge.spawnOrAttach({ workspaceCwd: WS_A });

    // Distinct sessions, both freshly created (neither is an attach).
    expect(first.sessionId).not.toBe(second.sessionId);
    expect(first.attached).toBe(false);
    expect(second.attached).toBe(false);
-    expect(handles).toHaveLength(2);
+    // Stage 1.5 multi-session: the two thread-scope calls SHARE the
+    // workspace's `qwen --acp` child. Only one `channelFactory` call.
+    // Each `newSession()` call to the agent produces a distinct id.
+    expect(handles).toHaveLength(1);
+    expect(bridge.sessionCount).toBe(2);

    await bridge.shutdown();
@@ -2229,5 +2240,119 @@ describe('createHttpAcpBridge', () => {
    expect(bridge.sessionCount).toBe(5);
    await bridge.shutdown();
  });

+  it('Stage 1.5 multi-session: N sessions on same workspace share ONE channel', async () => {
+    // The headline of the Stage 1.5 refactor — multiple thread-scope
+    // sessions on one workspace pay for one `qwen --acp` child, not
+    // N children. LaZzyMan + tanzhenxin pushed for this; the agent
+    // already supports it via `acpAgent.ts:194 sessions:
+    // Map<string, Session>`.
+    let factoryCalls = 0;
+    const factory: ChannelFactory = async () => {
+      factoryCalls++;
+      return makeChannel({ sessionIdPrefix: `s${factoryCalls}` }).channel;
+    };
+    const bridge = createHttpAcpBridge({
+      channelFactory: factory,
+      maxSessions: 0,
+      sessionScope: 'thread',
+    });
+    // Spin up 5 sessions on the same workspace.
+    const sessions = await Promise.all(
+      Array.from({ length: 5 }, () =>
+        bridge.spawnOrAttach({ workspaceCwd: WS_A }),
+      ),
+    );
+    // 5 distinct sessions...
+    expect(new Set(sessions.map((s) => s.sessionId)).size).toBe(5);
+    expect(bridge.sessionCount).toBe(5);
+    // ...but only ONE channelFactory call (= one child process).
+    expect(factoryCalls).toBe(1);
+    await bridge.shutdown();
+  });
+
+  it('Stage 1.5: killSession on one of N sessions does NOT kill the shared channel', async () => {
+    // Counterpart guarantee: tearing down one session must not take
+    // its siblings with it. The channel stays alive while
+    // `channelInfo.sessionIds.size > 0`.
+    const handles: ChannelHandle[] = [];
+    const factory: ChannelFactory = async () => {
+      const h = makeChannel({ sessionIdPrefix: `s${handles.length}` });
+      handles.push(h);
+      return h.channel;
+    };
+    const bridge = createHttpAcpBridge({
+      channelFactory: factory,
+      sessionScope: 'thread',
+    });
+    const a = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
+    const b = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
+    const c = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
+    expect(handles).toHaveLength(1);
+    // Kill one — the other two stay.
+    await bridge.killSession(b.sessionId);
+    expect(bridge.sessionCount).toBe(2);
+    expect(handles[0]?.killed).toBe(false);
+    // Kill the second — last one alive.
+    await bridge.killSession(a.sessionId);
+    expect(bridge.sessionCount).toBe(1);
+    expect(handles[0]?.killed).toBe(false);
+    // Kill the last — NOW the channel is killed.
+    await bridge.killSession(c.sessionId);
+    expect(bridge.sessionCount).toBe(0);
+    expect(handles[0]?.killed).toBe(true);
+    await bridge.shutdown();
+  });
+
+  it('Stage 1.5: channel.exited tears down ALL multiplexed sessions', async () => {
+    // When the shared child dies (crash, kill, network gone), all
+    // sessions on it die together — they're truly co-fated. Each
+    // session's bus gets its own `session_died` event so each SSE
+    // subscriber learns the bad news on their own stream.
+    const handles: ChannelHandle[] = [];
+    const factory: ChannelFactory = async () => {
+      const h = makeChannel({ sessionIdPrefix: `s${handles.length}` });
+      handles.push(h);
+      return h.channel;
+    };
+    const bridge = createHttpAcpBridge({
+      channelFactory: factory,
+      sessionScope: 'thread',
+    });
+    const a = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
+    const b = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
+    const c = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
+    expect(bridge.sessionCount).toBe(3);
+
+    // Subscribe so we can observe each session_died.
+    const eventsByA: BridgeEvent[] = [];
+    const eventsByB: BridgeEvent[] = [];
+    const eventsByC: BridgeEvent[] = [];
+    const drainA = (async () => {
+      for await (const ev of bridge.subscribeEvents(a.sessionId))
+        eventsByA.push(ev);
+    })();
+    const drainB = (async () => {
+      for await (const ev of bridge.subscribeEvents(b.sessionId))
+        eventsByB.push(ev);
+    })();
+    const drainC = (async () => {
+      for await (const ev of bridge.subscribeEvents(c.sessionId))
+        eventsByC.push(ev);
+    })();
+    // Let the subscriptions register before crashing.
+    await new Promise((r) => setImmediate(r));
+
+    // Simulate channel-level crash (child exited).
+    handles[0]?.crash();
+    await Promise.all([drainA, drainB, drainC]);
+
+    expect(eventsByA[eventsByA.length - 1]?.type).toBe('session_died');
+    expect(eventsByB[eventsByB.length - 1]?.type).toBe('session_died');
+    expect(eventsByC[eventsByC.length - 1]?.type).toBe('session_died');
+    expect(bridge.sessionCount).toBe(0);
+
+    await bridge.shutdown();
+  });
});
});

@@ -304,6 +304,40 @@ export interface BridgeOptions {
  maxSessions?: number;
}

+/**
+ * One `qwen --acp` child + the ACP connection on top of it, shared by
+ * all SessionEntries whose workspace maps to this channel. Stage 1.5
+ * multi-session work (per LaZzyMan / tanzhenxin reviews) leverages
+ * the agent's native `sessions: Map<string, Session>` (see
+ * `acp-integration/acpAgent.ts:194`) so multiple `newSession()` calls
+ * on one channel get separate session ids while sharing the child's
+ * process / OAuth / file-cache / hierarchy-memory parse.
+ *
+ * Lifetime: created on first `spawnOrAttach` for a workspace, kept
+ * alive while `sessionIds.size > 0`, and killed by `killSession` when
+ * the last entry leaves OR by `channel.exited` when the child dies.
+ * Cross-workspace channel sharing is intentionally NOT done in this
+ * bridge — `acpAgent.ts:601 (this.settings = loadSettings(cwd))`
+ * replaces the cached settings on each newSession call, so different
+ * workspaces in one child would step on each other's settings. One
+ * channel per workspace is the safe scope for Stage 1.5.
+ */
+interface ChannelInfo {
+  channel: AcpChannel;
+  connection: ClientSideConnection;
+  /** Shared BridgeClient — its methods route ACP params by sessionId. */
+  client: BridgeClient;
+  workspaceCwd: string;
+  /**
+   * Live session ids multiplexed on this channel. Updated when
+   * `doSpawn` registers a new session and when `killSession` /
+   * `channel.exited` removes one. When the set drops to empty AND no
+   * session is mid-attach, the channel is killed and removed from
+   * `byWorkspaceChannel`.
+   */
+  sessionIds: Set<string>;
+}

interface SessionEntry {
  sessionId: string;
  workspaceCwd: string;
@@ -387,7 +421,20 @@
 */
class BridgeClient implements Client {
  constructor(
-    private readonly resolveEntry: () => SessionEntry | undefined,
+    /**
+     * Look up the `SessionEntry` for an ACP call. Stage 1.5 multi-
+     * session on one channel means `BridgeClient` is shared across
+     * many sessions, so we can't bind the entry in a closure — we
+     * dispatch by the `sessionId` ACP includes in every per-session
+     * notification / request. `undefined` sessionId is the fallback
+     * for ACP calls that don't carry one (none expected on the
+     * client surface as of this writing) and resolves to whatever
+     * the channel's most-recent entry is — kept defensive to avoid
+     * silent drops if ACP grows a no-sessionId call.
+     */
+    private readonly resolveEntry: (
+      sessionId?: string,
+    ) => SessionEntry | undefined,
    private readonly registerPending: (pending: PendingPermission) => void,
    /**
     * Roll back a `registerPending` call when the subsequent publish
@@ -412,7 +459,7 @@
  async requestPermission(
    params: RequestPermissionRequest,
  ): Promise<RequestPermissionResponse> {
-    const entry = this.resolveEntry();
+    const entry = this.resolveEntry(params.sessionId);
    if (!entry) return { outcome: { outcome: 'cancelled' } };

    const requestId = randomUUID();
@@ -454,7 +501,7 @@
  }

  async sessionUpdate(params: SessionNotification): Promise<void> {
-    const entry = this.resolveEntry();
+    const entry = this.resolveEntry(params.sessionId);
    if (!entry) return;
    entry.events.publish({ type: 'session_update', data: params });
  }
@@ -627,15 +674,27 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
    );
  }

-  // Single-scope reuse keyed by canonical workspace path.
-  // Cleanup-on-crash: `channel.exited` (registered at line ~469) removes
-  // the entry from both maps + cancels pending permissions + publishes
-  // `session_died` + closes the EventBus when the child process dies
-  // between requests. Stage 2's in-process bridge eliminates the
-  // spawned-child failure mode entirely — but for Stage 1, the next
-  // prompt against a crashed session sees `SessionNotFoundError`
-  // (cleanup raced ahead of the request), not a hung channel write.
+  // Single-scope reuse keyed by canonical workspace path. Tracks the
+  // SessionEntry that a same-workspace attach should re-use. With
+  // Stage 1.5 multi-session per channel, this points at the FIRST
+  // session created for the workspace under `single` scope; under
+  // `thread` scope additional sessions on the same workspace don't
+  // overwrite this entry.
  const byWorkspace = new Map<string, SessionEntry>();
+  // Stage 1.5 multi-session: one channel per workspace, N sessions
+  // multiplex on it via `connection.newSession({cwd, mcpServers})`.
+  // `byWorkspaceChannel.get(workspaceKey)` returns the shared channel
+  // for spawn-vs-reuse decisions in `doSpawn`. Channel is kept alive
+  // while `sessionIds.size > 0`; the last `killSession` (or the
+  // `channel.exited` cleanup) drops the entry from this map.
+  const byWorkspaceChannel = new Map<string, ChannelInfo>();
+  // Coalesces concurrent channel-spawn requests for the same workspace
+  // (regardless of sessionScope). Without this, two parallel callers
+  // would both `channelFactory(workspaceKey)` and one of the
+  // spawned children would never make it into `byWorkspaceChannel`,
+  // becoming a permanent orphan. Cleared in the `finally` of the
+  // creator regardless of outcome.
+  const inFlightChannelSpawns = new Map<string, Promise<ChannelInfo>>();
  const byId = new Map<string, SessionEntry>();
  // Daemon-wide pending permission table; requestIds are UUIDs so collisions
  // across sessions are infeasible in practice.
@ -700,177 +759,196 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
|
|||
return true;
|
||||
};
|
||||
|
||||
async function doSpawn(
|
||||
/**
|
||||
* Get-or-create the shared `qwen --acp` channel for a workspace.
|
||||
* Stage 1.5 multi-session: one channel hosts N sessions via
|
||||
* `connection.newSession()`. Concurrent callers coalesce through
|
||||
* `inFlightChannelSpawns` so we never spawn two children for one
|
||||
* workspace. The returned `ChannelInfo` is shared — caller adds
|
||||
* their session id to `sessionIds` and uses `info.connection.newSession()`.
|
||||
*
|
||||
* Wires up the one-and-only `channel.exited` cleanup on first
|
||||
* creation so the late-arriving event tears down ALL sessions on
|
||||
* the channel (vs. the previous 1-session-per-channel design where
|
||||
* each entry registered its own listener).
|
||||
*/
|
||||
async function getOrCreateChannel(
|
||||
workspaceKey: string,
|
||||
modelServiceId?: string,
|
||||
): Promise<BridgeSession> {
|
||||
// FIXME(stage-1.5): the bridge spawns one `qwen --acp` child per
|
||||
// session today. But the ACP agent in `acp-integration/acpAgent.ts`
|
||||
// (line ~194: `private sessions: Map<string, Session>`) natively
|
||||
// supports multiple concurrent sessions inside one child process
|
||||
// — yiliang114's VSCode plugin already uses this pattern. Stage
|
||||
// 1.5 should refactor here to keep one child per workspace (or
|
||||
// daemon-wide) and call `connection.newSession({ cwd, mcpServers })`
|
||||
// multiple times on the same channel. Sessions then share the
|
||||
// child's OAuth state / FileReadCache / CLAUDE.md parse / process
|
||||
// overhead. This is a bridge-side change; it does NOT require the
|
||||
// #3803 §21 Path A/B/C intra-daemon multi-session workstream
|
||||
// (qwen-code already does that at the agent layer; the bridge
|
||||
// just doesn't plug into it). Pairs naturally with the
|
||||
// `@qwen-code/acp-bridge` package lift (chiga0 finding 1) —
|
||||
// expose a `newSession()` method on the `AcpChannel` interface
|
||||
// distinct from channel creation.
|
||||
const channel = await channelFactory(workspaceKey);
|
||||
let entry: SessionEntry | undefined;
|
||||
const client = new BridgeClient(
|
||||
() => entry,
|
||||
registerPending,
|
||||
(rid) =>
|
||||
// Roll back a register-then-publish-failed pending so the agent
|
||||
// doesn't hang waiting on a vote nobody can see.
|
||||
resolvePending(rid, { outcome: { outcome: 'cancelled' } }),
|
||||
);
|
||||
const connection = new ClientSideConnection(() => client, channel.stream);
|
||||
): Promise<ChannelInfo> {
|
||||
const existing = byWorkspaceChannel.get(workspaceKey);
|
||||
if (existing) return existing;
|
||||
const inFlight = inFlightChannelSpawns.get(workspaceKey);
|
||||
if (inFlight) return await inFlight;
|
||||
|
||||
try {
|
||||
await withTimeout(
|
||||
connection.initialize({
|
||||
protocolVersion: PROTOCOL_VERSION,
|
||||
clientCapabilities: {
|
||||
fs: { readTextFile: true, writeTextFile: true },
|
||||
},
|
||||
clientInfo: { name: 'qwen-serve-bridge', version: '0' },
|
||||
}),
|
||||
initTimeoutMs,
|
||||
'initialize',
|
||||
);
|
||||
const newSessionResp = await withTimeout(
|
||||
connection.newSession({
|
||||
cwd: workspaceKey,
|
||||
mcpServers: [],
|
||||
}),
|
||||
initTimeoutMs,
|
||||
'newSession',
|
  const promise = (async () => {
    const channel = await channelFactory(workspaceKey);
    const client = new BridgeClient(
      (sessionId) => (sessionId ? byId.get(sessionId) : undefined),
      registerPending,
      (rid) =>
        // Roll back a register-then-publish-failed pending so the agent
        // doesn't hang waiting on a vote nobody can see.
        resolvePending(rid, { outcome: { outcome: 'cancelled' } }),
    );
    const connection = new ClientSideConnection(() => client, channel.stream);

    // Late-shutdown re-check (BUy4U): shutdown() may have flipped
    // while we were in `connection.initialize` or
    // `connection.newSession` (the ACP handshake takes ~1s on cold
    // start). If we commit to the maps now, the snapshot in
    // shutdown() already missed this entry — child would leak past
    // `process.exit(0)`. Tear down what we have and surface to the
    // caller.
    //
    // This re-check is the LOAD-BEARING correctness contract, not
    // a band-aid: `shutdown()` deliberately starts tearing down
    // already-registered sessions in parallel with awaiting
    // `inFlightSpawns` (faster fan-out), and relies on this
    // re-check to catch any spawn whose `newSession` returns AFTER
    // shutdown flipped the flag. The alternative — await all
    // in-flight spawns to settle BEFORE snapshotting byId — is
    // cleaner to reason about but serializes shutdown by up to
    // `initTimeoutMs` (10s default) before any already-live
    // session starts tearing down. We chose parallel + re-check.
    // Both A and B coalesced callers see the same rejection
    // because `doSpawn`'s promise (cached in `inFlightSpawns`) is
    // what rejects.
    try {
      await withTimeout(
        connection.initialize({
          protocolVersion: PROTOCOL_VERSION,
          clientCapabilities: {
            fs: { readTextFile: true, writeTextFile: true },
          },
          clientInfo: { name: 'qwen-serve-bridge', version: '0' },
        }),
        initTimeoutMs,
        'initialize',
      );
    } catch (err) {
      await channel.kill().catch(() => {});
      throw err;
    }

    // Late-shutdown re-check: if shutdown flipped during `initialize`,
    // tear this channel down rather than leak past `process.exit(0)`.
    if (shuttingDown) {
      await channel.kill().catch(() => {});
      throw new Error('HttpAcpBridge is shutting down');
    }
    entry = {
      sessionId: newSessionResp.sessionId,
      workspaceCwd: workspaceKey,

    const info: ChannelInfo = {
      channel,
      connection,
      events: new EventBus(),
      promptQueue: Promise.resolve(),
      modelChangeQueue: Promise.resolve(),
      pendingPermissionIds: new Set(),
      attachCount: 0,
      client,
      workspaceCwd: workspaceKey,
      sessionIds: new Set(),
    };
    byWorkspace.set(workspaceKey, entry);
    byId.set(entry.sessionId, entry);
    byWorkspaceChannel.set(workspaceKey, info);

    // Cleanup if the child terminates between requests. `channel.exited`
    // resolves both for planned shutdown (we already removed the entry
    // before calling kill, so the `byId.get(...) === entry` check is
    // false and this is a no-op) AND for unplanned crashes (entry is
    // still in the maps → cancel pending permissions, publish a
    // `session_died` event so live SSE subscribers learn the session is
    // gone, close the bus, drop from maps).
    const liveEntry = entry;
    void channel.exited.then((info) => {
      if (byId.get(liveEntry.sessionId) !== liveEntry) return;
      cancelPendingForSession(liveEntry.sessionId);
      try {
        liveEntry.events.publish({
          type: 'session_died',
          data: {
            sessionId: liveEntry.sessionId,
            reason: 'channel_closed',
            // BX9_P: thread `exitCode` / `signalCode` from the
            // spawn factory through so operators triaging a crash
            // can read the cause from the SSE frame instead of
            // grepping stderr for the pid.
            exitCode: info?.exitCode ?? null,
            signalCode: info?.signalCode ?? null,
          },
        });
      } catch {
        /* bus already closed */
    // One-time channel.exited cleanup. The child dying takes ALL
    // multiplexed sessions with it — iterate `sessionIds` (snapshot
    // first to be safe against concurrent killSession during
    // iteration), publish `session_died` on each session's bus,
    // remove from byId / byWorkspace / pending tables.
    void channel.exited.then((exitInfo) => {
      const stillOurs = byWorkspaceChannel.get(workspaceKey) === info;
      if (stillOurs) byWorkspaceChannel.delete(workspaceKey);
      const sessions = Array.from(info.sessionIds);
      info.sessionIds.clear();
      for (const sid of sessions) {
        const sessEntry = byId.get(sid);
        if (!sessEntry) continue;
        cancelPendingForSession(sid);
        try {
          sessEntry.events.publish({
            type: 'session_died',
            data: {
              sessionId: sid,
              reason: 'channel_closed',
              // BX9_P: thread exitCode/signalCode through.
              exitCode: exitInfo?.exitCode ?? null,
              signalCode: exitInfo?.signalCode ?? null,
            },
          });
        } catch {
          /* bus already closed */
        }
        byId.delete(sid);
        if (byWorkspace.get(sessEntry.workspaceCwd) === sessEntry) {
          byWorkspace.delete(sessEntry.workspaceCwd);
        }
        sessEntry.events.close();
      }
      byWorkspace.delete(liveEntry.workspaceCwd);
      byId.delete(liveEntry.sessionId);
      liveEntry.events.close();
    });

    // ACP `newSession` doesn't take a model id; honor the caller's
    // `modelServiceId` by issuing the unstable `setSessionModel` call
    // immediately after the session is established. Use the shared
    // `applyModelServiceId` helper so the call:
    //   - races against `transportClosedReject` (a wedged child
    //     during model switch fails fast instead of consuming the
    //     full init timeout — A09HD)
    //   - publishes `model_switched` / `model_switch_failed` to the
    //     bus so attached clients see the result
    //   - serializes through `entry.modelChangeQueue` for ordering
    //     against later attach-with-different-model calls
    //
    // On failure we DO NOT tear the session down. An earlier rev
    // tore it down so the caller "didn't inherit silent drift",
    // but that killed an already-functional session and left the
    // caller with a 500 and no sessionId to retry against. The
    // session is operational on the agent's default model — the
    // `model_switch_failed` event tells subscribers exactly what
    // happened, and the caller can retry the switch via
    // `POST /session/:id/model` once they know the sessionId.
    //
    // Swallow the rejection inline so the outer try/catch (which
    // would `channel.kill()`) doesn't fire. The publish inside
    // `applyModelServiceId` is the visible signal; the caller
    // gets a 200 and a sessionId regardless, and learns of the
    // model issue via the SSE stream.
    if (modelServiceId) {
      await applyModelServiceId(entry, modelServiceId, initTimeoutMs).catch(
        () => {
          // Already published `model_switch_failed`; don't take
          // down the session.
        },
      );
    }
    return info;
  })();

  return {
    sessionId: entry.sessionId,
    workspaceCwd: entry.workspaceCwd,
    attached: false,
  };
} catch (err) {
  await channel.kill().catch(() => {});
  throw err;
  inFlightChannelSpawns.set(workspaceKey, promise);
  try {
    return await promise;
  } finally {
    inFlightChannelSpawns.delete(workspaceKey);
  }
}
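The coalescing that `getOrCreateChannel` performs via `inFlightChannelSpawns` can be distilled into a standalone sketch. Every name below (`getOrCreate`, the string payload, the timer delay) is illustrative, not the bridge's real API; the shape is what matters: a second concurrent caller for the same key awaits the first caller's in-flight promise instead of running the factory again, and the in-flight slot is cleared in `finally` so a failed spawn does not poison later retries.

```typescript
// Hypothetical stand-ins for the bridge's maps; only the pattern matches.
const live = new Map<string, string>();
const inFlight = new Map<string, Promise<string>>();
let factoryRuns = 0;

async function getOrCreate(key: string): Promise<string> {
  const existing = live.get(key);
  if (existing !== undefined) return existing; // reuse a live channel
  const pending = inFlight.get(key);
  if (pending !== undefined) return pending; // coalesce concurrent spawns

  const promise = (async () => {
    factoryRuns += 1;
    // Stand-in for channelFactory + initialize (the expensive part).
    await new Promise((resolve) => setTimeout(resolve, 10));
    const value = `channel:${key}`;
    live.set(key, value);
    return value;
  })();
  inFlight.set(key, promise);
  try {
    return await promise;
  } finally {
    // Clear on success AND failure so the next caller can retry.
    inFlight.delete(key);
  }
}
```

Two parallel `getOrCreate('ws')` calls resolve to the same value with the factory running once, mirroring how two concurrent session requests for one workspace spawn a single `qwen --acp` child.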

async function doSpawn(
  workspaceKey: string,
  modelServiceId?: string,
): Promise<BridgeSession> {
  // Stage 1.5 multi-session: get-or-create the channel for this
  // workspace, then call `connection.newSession()` on it. Sessions
  // share the child's process / OAuth / file-cache / hierarchy-
  // memory parse via the agent's `sessions: Map<string, Session>`
  // (see `acp-integration/acpAgent.ts:194`).
  // newSession on an established channel can fail (auth, config,
  // etc.) without the channel dying. We DON'T kill the channel on
  // newSession failure: other sessions on it may still be live —
  // they'd lose their work for a problem orthogonal to them. The
  // edge case of "newSession fails on a freshly-created channel
  // with no live sessions" leaks an empty channel until something
  // else (channel.exited / shutdown) cleans it up; that's much
  // better than killing other live sessions, and the
  // empty-sessionIds set lets the NEXT caller retry through that
  // same channel if it's still healthy.
  const channelInfo = await getOrCreateChannel(workspaceKey);
  const newSessionResp = await withTimeout(
    channelInfo.connection.newSession({
      cwd: workspaceKey,
      mcpServers: [],
    }),
    initTimeoutMs,
    'newSession',
  );

  // Late-shutdown re-check (BUy4U): shutdown() may have flipped
  // while we were in `connection.newSession` (~1s on cold start).
  if (shuttingDown) {
    // Don't kill the channel — see comment above. Just throw.
    throw new Error('HttpAcpBridge is shutting down');
  }

  const entry: SessionEntry = {
    sessionId: newSessionResp.sessionId,
    workspaceCwd: workspaceKey,
    channel: channelInfo.channel,
    connection: channelInfo.connection,
    events: new EventBus(),
    promptQueue: Promise.resolve(),
    modelChangeQueue: Promise.resolve(),
    pendingPermissionIds: new Set(),
    attachCount: 0,
  };
  channelInfo.sessionIds.add(entry.sessionId);
  byId.set(entry.sessionId, entry);
  // `byWorkspace` is the single-scope attach lookup — only the
  // FIRST session for a workspace wins this slot. Subsequent
  // thread-scope sessions don't overwrite it.
  if (!byWorkspace.has(workspaceKey)) {
    byWorkspace.set(workspaceKey, entry);
  }

  // ACP `newSession` doesn't take a model id; honor the caller's
  // `modelServiceId` via `unstable_setSessionModel`. See
  // `applyModelServiceId` for rationale (race against
  // transportClosedReject, publish model_switched on success,
  // model_switch_failed on failure, don't tear down the session).
  if (modelServiceId) {
    await applyModelServiceId(entry, modelServiceId, initTimeoutMs).catch(
      () => {
        // Already published `model_switch_failed`; session stays
        // operational on the agent's default model.
      },
    );
  }

  return {
    sessionId: entry.sessionId,
    workspaceCwd: entry.workspaceCwd,
    attached: false,
  };
}
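`withTimeout` is used throughout this diff but defined outside the hunks shown. A minimal version consistent with its call sites (`withTimeout(promise, initTimeoutMs, 'newSession')`) might look like the following; this is a sketch under that assumption, not the file's actual implementation.

```typescript
// Minimal sketch: race the operation against a labeled deadline, and
// always clear the timer so a fast resolution doesn't hold the event
// loop open for the full timeout.
function withTimeout<T>(p: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`${label} timed out after ${ms}ms`)),
      ms,
    );
  });
  return Promise.race([p, deadline]).finally(() => clearTimeout(timer));
}
```

The label (`'initialize'`, `'newSession'`) is what turns a generic hang into an actionable error message in the daemon's logs.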

/**
 * Send `unstable_setSessionModel` and broadcast a `model_switched`
 * event. Used at create-session time (via doSpawn) AND on attach when

@@ -1413,11 +1491,18 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
      // through.
      if (opts?.requireZeroAttaches && entry.attachCount > 0) return;
      // Remove from the maps eagerly so concurrent `spawnOrAttach`
      // can't reattach to a session we're tearing down. The
      // `channel.exited` cleanup at line 461-484 also removes from
      // these maps, but eager removal narrows the race window.
      byWorkspace.delete(entry.workspaceCwd);
      // can't reattach to a session we're tearing down.
      if (byWorkspace.get(entry.workspaceCwd) === entry) {
        byWorkspace.delete(entry.workspaceCwd);
      }
      byId.delete(sessionId);
      // Stage 1.5 multi-session: detach from the channel. The channel
      // dies only when its LAST session leaves — other sessions on
      // the same channel keep running.
      const channelInfo = byWorkspaceChannel.get(entry.workspaceCwd);
      if (channelInfo && channelInfo.channel === entry.channel) {
        channelInfo.sessionIds.delete(sessionId);
      }
      // Resolve any still-pending permission as cancelled (matches the
      // shutdown path) so callers awaiting requestPermission unwind.
      for (const id of Array.from(entry.pendingPermissionIds)) {

@@ -1425,9 +1510,9 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
      }
      // Publish `session_died` BEFORE closing the bus. After the eager
      // `byId.delete` above, the channel.exited handler's
      // `byId.get(...) !== entry` guard short-circuits, so the
      // automatic publish at crash time wouldn't fire. SSE subscribers
      // need this terminal frame to know the session is gone.
      // `byId.get(...)` returns undefined so the automatic publish
      // at crash time wouldn't fire. SSE subscribers need this
      // terminal frame to know the session is gone.
      try {
        entry.events.publish({
          type: 'session_died',

@@ -1437,9 +1522,20 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
        /* bus already closed */
      }
      entry.events.close();
      await entry.channel.kill().catch(() => {
        // Best-effort kill — channel may already be dead.
      });
      // Only kill the channel when no other sessions remain. ACP
      // doesn't expose a per-session "close" call on the agent side,
      // so the agent's `sessions: Map<string, Session>` grows by one
      // until the channel dies — bounded by `maxSessions` (default
      // 20) so memory is capped. FIXME(stage-1.5): if ACP grows a
      // `closeSession` notification, send it here so the agent can
      // drop the entry from its map immediately rather than at
      // channel exit.
      if (channelInfo && channelInfo.sessionIds.size === 0) {
        byWorkspaceChannel.delete(entry.workspaceCwd);
        await channelInfo.channel.kill().catch(() => {
          // Best-effort kill — channel may already be dead.
        });
      }
    },
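The kill-on-last-session rule in `killSession` reduces to a small refcount-style invariant. A sketch with made-up types (the real `ChannelInfo` carries the connection, queues, and more): detaching a session removes its id from the shared channel's set, and only an empty set triggers the child kill.

```typescript
// Illustrative shape only: a shared channel stays alive while any
// session id remains in its set; the last detach tears it down.
interface FakeChannel {
  sessionIds: Set<string>;
  killed: boolean;
}

function detachSession(chan: FakeChannel, sessionId: string): void {
  chan.sessionIds.delete(sessionId);
  if (chan.sessionIds.size === 0) {
    chan.killed = true; // stand-in for `await channelInfo.channel.kill()`
  }
}
```

Killing one of two sessions leaves the channel (and every other session on it) running; killing the last one kills the child.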

    async detachClient(sessionId) {

@@ -1479,6 +1575,11 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
      // spawning a child this teardown won't see.
      shuttingDown = true;
      const entries = Array.from(byId.values());
      // Snapshot channels too — Stage 1.5 multi-session means N
      // sessions may share one channel; we tear down channels
      // (which transitively takes all their sessions), not entries
      // one-by-one.
      const channelInfos = Array.from(byWorkspaceChannel.values());
      // Resolve every still-pending permission as cancelled before clearing
      // the maps so callers awaiting `requestPermission` unwind cleanly.
      for (const e of entries) {

@@ -1489,14 +1590,15 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
      }
      byWorkspace.clear();
      byId.clear();
      byWorkspaceChannel.clear();
      pendingPermissions.clear();
      // Publish a terminal `session_died` BEFORE closing each bus so SSE
      // subscribers can distinguish "daemon shut down" from a transient
      // network error and don't sit indefinitely retrying. The
      // channel.exited handler also publishes this on a child crash,
      // but at shutdown time the entry has already been removed from
      // `byId` (above), so the handler's `byId.get(...) !== entry`
      // guard would short-circuit and the event would never fire.
      // `byId` (above), so the handler's `byId.get(...)` is undefined
      // and the automatic publish wouldn't fire.
      for (const e of entries) {
        try {
          e.events.publish({

@@ -1508,16 +1610,26 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
        }
        e.events.close();
      }
      // Wait for in-flight spawns too. The snapshot above only sees
      // sessions already in `byId`; a `doSpawn` past `channelFactory()`
      // but still inside `connection.newSession` (~1s on cold start) is
      // not yet registered. Without the await, `bridge.shutdown()`
      // resolves before the late re-check at doSpawn:472 tears down its
      // half-built channel — the orphan's stderr error fires AFTER the
      // daemon claimed graceful shutdown (log-confusing). Settle on
      // each, ignoring rejections so a single failure doesn't poison
      // the others.
      const inFlightAwaits = Array.from(inFlightSpawns.values()).map(
      // Wait for in-flight channel spawns + session spawns. The
      // snapshots above only see what's already registered; a doSpawn
      // past `newSession()` but pre-`byId.set` is missed, as is a
      // `getOrCreateChannel` past `channelFactory()` but pre-
      // `byWorkspaceChannel.set`. The late-shutdown re-checks at
      // doSpawn/getOrCreateChannel catch both — but without these
      // awaits, `bridge.shutdown()` would resolve before they
      // finish, and the orphan stderr error from a half-built
      // child would fire AFTER the daemon claimed graceful
      // shutdown (log-confusing).
      const inFlightSessionAwaits = Array.from(inFlightSpawns.values()).map(
        (p): Promise<void> =>
          p.then(
            () => undefined,
            () => undefined,
          ),
      );
      const inFlightChannelAwaits = Array.from(
        inFlightChannelSpawns.values(),
      ).map(
        (p): Promise<void> =>
          p.then(
            () => undefined,

@@ -1525,8 +1637,12 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
          ),
      );
      await Promise.all([
        ...entries.map((e) => e.channel.kill().catch(() => {})),
        ...inFlightAwaits,
        // Kill each unique channel once. With multi-session per
        // channel, the same channel object can be referenced by
        // multiple entries; `channelInfos` is the deduplicated set.
        ...channelInfos.map((ci) => ci.channel.kill().catch(() => {})),
        ...inFlightSessionAwaits,
        ...inFlightChannelAwaits,
      ]);
    },
  };
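The `(p): Promise<void> => p.then(() => undefined, () => undefined)` mapping used for both in-flight sets is worth naming: it converts each promise into one that always fulfills, so a single failed spawn cannot reject the surrounding `Promise.all` and skip killing the remaining channels. A generic sketch (the helper name is invented; the diff inlines the mapping at both call sites):

```typescript
// Each promise settles to undefined whether it fulfilled or rejected,
// making the aggregate Promise.all rejection-proof.
function settleAll(promises: Iterable<Promise<unknown>>): Promise<void[]> {
  return Promise.all(
    Array.from(promises, (p): Promise<void> =>
      p.then(
        () => undefined,
        () => undefined,
      ),
    ),
  );
}
```

`Promise.allSettled` would achieve the same tolerance; the sketch keeps the diff's inline `then(onFulfilled, onRejected)` form, which also yields the plain `Promise<void>` the shutdown path wants.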