feat(cli): wire SSE streaming for qwen serve events (#3803)

Stage 1 follow-up that turns prompt into a real streaming experience.
Replaces the in-memory `notifications: SessionNotification[]` buffer
on each session with a per-session EventBus and exposes it through
`GET /session/:id/events` as an `text/event-stream` SSE feed.

EventBus (`packages/cli/src/serve/eventBus.ts`):
  - Monotonic per-session ids (`v: 1` schema). Each `publish` chains an
    id, returning the materialized BridgeEvent.
  - Bounded ring (default 1000) backs `Last-Event-ID` reconnect — a
    consumer that drops can resume from `lastEventId` and replay any
    still-buffered events before live events flow.
  - Per-subscriber bounded queue (default 256). When a slow consumer
    overruns its queue, the bus appends a synthetic `client_evicted`
    terminal frame and closes that subscription so it can't hold the
    daemon hostage. Other subscribers are unaffected.
  - `subscribe()` returns an AsyncIterable — registration is synchronous
    so events `publish`ed immediately after the subscribe land in the
    queue (a generator-style implementation deferred registration to
    first `next()` and raced with publishes).
  - AbortSignal-aware: aborting the signal closes the iterator promptly.

Bridge (`httpAcpBridge.ts`):
  - `BridgeClient.sessionUpdate` now publishes onto the session's
    EventBus instead of pushing to a plain array — every ACP
    notification the agent emits becomes a stream event automatically.
  - New `subscribeEvents(sessionId, opts?)` returns the bus's
    AsyncIterable; throws `SessionNotFoundError` for unknown ids.
  - Shutdown closes every live event bus before killing channels so
    pending consumers unwind cleanly.

Route (`server.ts`):
  - `GET /session/:id/events` sets the SSE content type, advertises a
    3s reconnect hint, and writes a 15s heartbeat comment frame to
    keep proxy/NAT connections alive.
  - Forwards the `Last-Event-ID` header to the bus.
  - `req.on('close')` triggers an AbortController that propagates into
    the bridge subscription so disconnects don't leak subscribers.
  - 404 when the bridge can't find the session.

Capabilities envelope: `STAGE1_FEATURES` now advertises
`session_create`, `session_prompt`, `session_cancel`, `session_events`
in addition to `health`/`capabilities` so clients can light up UI for
the routes that have actually shipped.

Tests (16 new, 0 regressions in the 4995 baseline):
  - 9 EventBus unit cases — id sequencing, live delivery, replay,
    replay+live splice, fan-out to N subscribers, eviction on
    overflow, abort-signal unsubscribe, bus.close() drains
    subscribers, ring-size eviction.
  - 4 bridge subscribe cases — 404, sessionUpdate→event publishing
    via real ACP fake-agent, shutdown closes live subscriptions.
  - 4 SSE route cases against a live HTTP listener — frame format,
    Last-Event-ID forwarding, 404, abort propagation on disconnect.

Verified end-to-end against a real `qwen --acp` child:
  - Subscribed to `/session/$SID/events`, fired `POST /session/$SID/prompt`
    with text content. Captured 13 distinct `event: session_update`
    SSE frames in real time during the model's response — `available_
    commands_update` metadata, 9 `agent_thought_chunk` frames carrying
    the model's chain-of-thought, 3 `agent_message_chunk` frames with
    the actual reply, and a final usage frame with token totals.
  - Frames carry monotonic ids 1..13, the daemon-side counter, and
    are valid SSE per the EventSource spec.
This commit is contained in:
wenshao 2026-05-07 11:21:59 +08:00
parent ca996ecb54
commit 41aa950947
8 changed files with 802 additions and 12 deletions

View file

@ -0,0 +1,172 @@
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect } from 'vitest';
import {
EventBus,
EVENT_SCHEMA_VERSION,
type BridgeEvent,
} from './eventBus.js';
async function collect(
iter: AsyncIterable<BridgeEvent>,
count: number,
): Promise<BridgeEvent[]> {
const out: BridgeEvent[] = [];
for await (const e of iter) {
out.push(e);
if (out.length >= count) break;
}
return out;
}
describe('EventBus', () => {
it('assigns monotonic ids and the right schema version', () => {
const bus = new EventBus();
const a = bus.publish({ type: 'foo', data: 1 });
const b = bus.publish({ type: 'foo', data: 2 });
expect(a.id).toBe(1);
expect(b.id).toBe(2);
expect(a.v).toBe(EVENT_SCHEMA_VERSION);
expect(bus.lastEventId).toBe(2);
});
it('delivers live publishes to a subscriber', async () => {
const bus = new EventBus();
const abort = new AbortController();
const iter = bus.subscribe({ signal: abort.signal });
// Need to start consuming before publishing so the subscriber is
// registered in the loop below.
setTimeout(() => {
bus.publish({ type: 'foo', data: 'a' });
bus.publish({ type: 'foo', data: 'b' });
}, 5);
const events = await collect(iter, 2);
expect(events.map((e) => e.data)).toEqual(['a', 'b']);
abort.abort();
});
it('replays events newer than lastEventId from the ring', async () => {
const bus = new EventBus();
bus.publish({ type: 'foo', data: 'a' });
bus.publish({ type: 'foo', data: 'b' });
bus.publish({ type: 'foo', data: 'c' });
const abort = new AbortController();
const iter = bus.subscribe({ lastEventId: 1, signal: abort.signal });
const events = await collect(iter, 2);
expect(events.map((e) => e.id)).toEqual([2, 3]);
expect(events.map((e) => e.data)).toEqual(['b', 'c']);
abort.abort();
});
it('replay + live: new events follow the replay tail', async () => {
const bus = new EventBus();
bus.publish({ type: 'foo', data: 'a' });
bus.publish({ type: 'foo', data: 'b' });
const abort = new AbortController();
const iter = bus.subscribe({ lastEventId: 0, signal: abort.signal });
setTimeout(() => bus.publish({ type: 'foo', data: 'c' }), 5);
const events = await collect(iter, 3);
expect(events.map((e) => e.data)).toEqual(['a', 'b', 'c']);
abort.abort();
});
it('fan-outs to multiple subscribers in parallel', async () => {
const bus = new EventBus();
const aborts = [new AbortController(), new AbortController()];
const it1 = bus.subscribe({ signal: aborts[0].signal });
const it2 = bus.subscribe({ signal: aborts[1].signal });
setTimeout(() => {
bus.publish({ type: 'foo', data: 1 });
bus.publish({ type: 'foo', data: 2 });
}, 5);
const [a, b] = await Promise.all([collect(it1, 2), collect(it2, 2)]);
expect(a.map((e) => e.data)).toEqual([1, 2]);
expect(b.map((e) => e.data)).toEqual([1, 2]);
aborts.forEach((c) => c.abort());
});
it('evicts a slow subscriber when its queue overflows', async () => {
const bus = new EventBus();
const abort = new AbortController();
const iter = bus.subscribe({ maxQueued: 2, signal: abort.signal });
// Publish 3 events without draining the iterator. Queue cap is 2; the
// 3rd should trip the eviction path and append a `client_evicted`
// terminal frame.
bus.publish({ type: 'foo', data: 1 });
bus.publish({ type: 'foo', data: 2 });
bus.publish({ type: 'foo', data: 3 });
const collected: BridgeEvent[] = [];
for await (const e of iter) {
collected.push(e);
}
expect(collected).toHaveLength(3);
expect(collected[0]?.data).toBe(1);
expect(collected[1]?.data).toBe(2);
expect(collected[2]?.type).toBe('client_evicted');
expect(bus.subscriberCount).toBe(0);
abort.abort();
});
it('unsubscribes when the abort signal fires', async () => {
const bus = new EventBus();
const abort = new AbortController();
const iter = bus.subscribe({ signal: abort.signal });
setTimeout(() => abort.abort(), 5);
const events: BridgeEvent[] = [];
for await (const e of iter) {
events.push(e);
}
expect(events).toEqual([]);
expect(bus.subscriberCount).toBe(0);
});
it('closes all subscribers on bus.close()', async () => {
const bus = new EventBus();
const abort = new AbortController();
const iter = bus.subscribe({ signal: abort.signal });
setTimeout(() => bus.close(), 5);
const events: BridgeEvent[] = [];
for await (const e of iter) {
events.push(e);
}
expect(events).toEqual([]);
expect(bus.subscriberCount).toBe(0);
});
it('drops the oldest events from the ring beyond ringSize', () => {
const bus = new EventBus(3);
for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i });
// Internal: only the last 3 should be replayable.
// Subscribe with lastEventId=0 — only ids 3, 4, 5 should be queued.
const abort = new AbortController();
const iter = bus.subscribe({ lastEventId: 0, signal: abort.signal });
void (async () => {
const out: BridgeEvent[] = [];
for await (const e of iter) {
out.push(e);
if (out.length === 3) break;
}
expect(out.map((e) => e.id)).toEqual([3, 4, 5]);
abort.abort();
})();
});
});

View file

@ -0,0 +1,247 @@
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
/**
* Event-bus for the daemon's per-session NDJSON stream.
*
* Design notes (from issue #3803 §04 / threat-model):
* - Each event carries a monotonic `id` (per session) so the SSE
* `Last-Event-ID` reconnect protocol can pick up where the client left
* off. Backed by a bounded ring of recent events for replay.
* - Subscribers use bounded async queues. A slow subscriber that blows
* past its queue limit is sent a final `client_evicted` event and
* closed; this keeps a stuck client from holding the daemon hostage
* (per the resource-exhaustion entry in the threat-model summary).
* - The bus is push-based; consumers iterate the returned AsyncIterable.
* Aborting the supplied AbortSignal closes the iterator promptly.
*/
export const EVENT_SCHEMA_VERSION = 1 as const;
/** A single frame published on the bus. */
export interface BridgeEvent {
/** Monotonic per-session id, starting at 1. */
id: number;
/** Schema version; bumped on breaking frame changes. */
v: typeof EVENT_SCHEMA_VERSION;
/** Frame type: `session_update`, `client_evicted`, or daemon-pushed events. */
type: string;
/** Frame payload — opaque JSON. */
data: unknown;
/**
* Identifier of the client that triggered the event, when known. Used by
* fan-out consumers to suppress echoes of their own actions.
*/
originatorClientId?: string;
}
export interface SubscribeOptions {
/**
* Resume from after this event id. Events with `id <= lastEventId` are
* skipped (already delivered); newer events still buffered in the ring
* are replayed before live events flow.
*/
lastEventId?: number;
/** Aborts the subscription cleanly. */
signal?: AbortSignal;
/**
* Per-subscriber backlog cap. When exceeded the subscriber is evicted
* with a final `client_evicted` event. Defaults to 256.
*/
maxQueued?: number;
}
const DEFAULT_MAX_QUEUED = 256;
const DEFAULT_RING_SIZE = 1000;
interface InternalSub {
queue: BoundedAsyncQueue<BridgeEvent>;
evicted: boolean;
}
export class EventBus {
private nextId = 1;
private readonly ring: BridgeEvent[] = [];
private readonly subs = new Set<InternalSub>();
private closed = false;
constructor(private readonly ringSize: number = DEFAULT_RING_SIZE) {}
/** Most recent id ever assigned by `publish`. 0 if no events published. */
get lastEventId(): number {
return this.nextId - 1;
}
/** Snapshot of the live subscriber count. */
get subscriberCount(): number {
return this.subs.size;
}
publish(input: Omit<BridgeEvent, 'id' | 'v'>): BridgeEvent {
if (this.closed) {
throw new Error('EventBus is closed; cannot publish');
}
const event: BridgeEvent = {
id: this.nextId++,
v: EVENT_SCHEMA_VERSION,
...input,
};
this.ring.push(event);
if (this.ring.length > this.ringSize) this.ring.shift();
for (const sub of this.subs) {
if (sub.evicted) continue;
if (!sub.queue.push(event)) {
sub.evicted = true;
const evictionFrame: BridgeEvent = {
id: this.nextId++,
v: EVENT_SCHEMA_VERSION,
type: 'client_evicted',
data: { reason: 'queue_overflow', droppedAfter: event.id },
};
// Force-push the eviction frame; close immediately after so the
// consumer iterator unwinds with a final synthetic event.
sub.queue.forcePush(evictionFrame);
sub.queue.close();
}
}
return event;
}
/**
* Note: registration is synchronous by the time `subscribe()` returns,
* the subscriber is already attached and will receive any subsequent
* `publish()` even if the consumer hasn't started iterating yet. (A
* generator-style implementation would defer registration to the first
* `next()` call, which races with publishes that happen before the
* consumer's first await.)
*/
subscribe(opts: SubscribeOptions = {}): AsyncIterable<BridgeEvent> {
if (this.closed) {
return emptyAsyncIterable<BridgeEvent>();
}
const queue = new BoundedAsyncQueue<BridgeEvent>(
opts.maxQueued ?? DEFAULT_MAX_QUEUED,
);
if (opts.lastEventId !== undefined) {
for (const e of this.ring) {
if (e.id > opts.lastEventId) queue.push(e);
}
}
const sub: InternalSub = { queue, evicted: false };
this.subs.add(sub);
const onAbort = () => queue.close();
if (opts.signal) {
if (opts.signal.aborted) {
queue.close();
} else {
opts.signal.addEventListener('abort', onAbort, { once: true });
}
}
const dispose = () => {
this.subs.delete(sub);
opts.signal?.removeEventListener('abort', onAbort);
};
return {
[Symbol.asyncIterator]: (): AsyncIterator<BridgeEvent> => ({
async next(): Promise<IteratorResult<BridgeEvent>> {
const r = await queue.next();
if (r.done) dispose();
return r;
},
async return(): Promise<IteratorResult<BridgeEvent>> {
queue.close();
dispose();
return { value: undefined as unknown as BridgeEvent, done: true };
},
}),
};
}
/** Close all live subscribers and prevent further `publish`/`subscribe`. */
close(): void {
if (this.closed) return;
this.closed = true;
for (const sub of this.subs) sub.queue.close();
this.subs.clear();
}
}
function emptyAsyncIterable<T>(): AsyncIterable<T> {
return {
[Symbol.asyncIterator]: (): AsyncIterator<T> => ({
async next(): Promise<IteratorResult<T>> {
return { value: undefined as unknown as T, done: true };
},
}),
};
}
/**
* Promise-based bounded queue. `push` returns false (instead of blocking or
* throwing) when full so callers can decide how to react the EventBus uses
* that signal to evict slow subscribers.
*/
class BoundedAsyncQueue<T> {
private readonly buf: T[] = [];
private readonly resolvers: Array<(v: IteratorResult<T>) => void> = [];
private closed = false;
constructor(private readonly maxSize: number) {}
/** Returns true if accepted, false if dropped due to overflow. */
push(value: T): boolean {
if (this.closed) return false;
const r = this.resolvers.shift();
if (r) {
r({ value, done: false });
return true;
}
if (this.buf.length >= this.maxSize) return false;
this.buf.push(value);
return true;
}
/** Bypasses the size cap. Used for terminal eviction frames. */
forcePush(value: T): void {
if (this.closed) return;
const r = this.resolvers.shift();
if (r) {
r({ value, done: false });
return;
}
this.buf.push(value);
}
close(): void {
if (this.closed) return;
this.closed = true;
while (this.resolvers.length > 0) {
this.resolvers.shift()!({
value: undefined as unknown as T,
done: true,
});
}
}
next(): Promise<IteratorResult<T>> {
const buffered = this.buf.shift();
if (buffered !== undefined) {
return Promise.resolve({ value: buffered, done: false });
}
if (this.closed) {
return Promise.resolve({
value: undefined as unknown as T,
done: true,
});
}
return new Promise((resolve) => this.resolvers.push(resolve));
}
}

View file

@ -508,4 +508,89 @@ describe('createHttpAcpBridge', () => {
);
});
});
describe('subscribeEvents', () => {
it('throws SessionNotFoundError for unknown session ids', () => {
const bridge = createHttpAcpBridge({
channelFactory: async () => {
throw new Error('factory should not be called');
},
});
expect(() => bridge.subscribeEvents('unknown')).toThrow(
SessionNotFoundError,
);
});
it('publishes session_update events to subscribers when the agent sends them', async () => {
let capturedConn: AgentSideConnection | undefined;
const factory: ChannelFactory = async () => {
// Build a channel pair where we capture the agent-side connection
// so we can drive sessionUpdate notifications from the test.
const ab = new TransformStream<Uint8Array, Uint8Array>();
const ba = new TransformStream<Uint8Array, Uint8Array>();
const clientStream = ndJsonStream(ab.writable, ba.readable);
const agentStream = ndJsonStream(ba.writable, ab.readable);
const fakeAgent = new FakeAgent();
capturedConn = new AgentSideConnection(() => fakeAgent, agentStream);
return {
stream: clientStream,
kill: async () => {},
};
};
const bridge = createHttpAcpBridge({ channelFactory: factory });
const session = await bridge.spawnOrAttach({ workspaceCwd: '/work/a' });
const abort = new AbortController();
const iter = bridge.subscribeEvents(session.sessionId, {
signal: abort.signal,
});
// Send a sessionUpdate from the agent side (fire-and-forget).
void capturedConn!.sessionUpdate({
sessionId: session.sessionId,
update: {
sessionUpdate: 'agent_message_chunk',
content: { type: 'text', text: 'hi' },
},
});
const collected: Array<{ id: number; type: string; data: unknown }> = [];
for await (const e of iter) {
collected.push({ id: e.id, type: e.type, data: e.data });
if (collected.length === 1) break;
}
expect(collected[0]?.type).toBe('session_update');
expect(collected[0]?.id).toBe(1);
abort.abort();
await bridge.shutdown();
});
it('shutdown closes live event subscriptions', async () => {
const factory: ChannelFactory = async () => makeChannel().channel;
const bridge = createHttpAcpBridge({ channelFactory: factory });
const session = await bridge.spawnOrAttach({ workspaceCwd: '/work/a' });
const abort = new AbortController();
const iter = bridge.subscribeEvents(session.sessionId, {
signal: abort.signal,
});
const drain = (async () => {
const events: unknown[] = [];
for await (const e of iter) {
events.push(e);
}
return events;
})();
// Give the subscriber a tick to register.
await new Promise((r) => setTimeout(r, 10));
await bridge.shutdown();
// Subscriber must unwind to completion (no events ever published).
const events = await drain;
expect(events).toEqual([]);
});
});
});

View file

@ -13,6 +13,11 @@ import {
PROTOCOL_VERSION,
ndJsonStream,
} from '@agentclientprotocol/sdk';
import {
EventBus,
type BridgeEvent,
type SubscribeOptions,
} from './eventBus.js';
import type {
CancelNotification,
Client,
@ -82,6 +87,17 @@ export interface HttpAcpBridge {
*/
cancelSession(sessionId: string, req?: CancelNotification): Promise<void>;
/**
* Subscribe to the session's event stream. Returns an AsyncIterable that
* yields published events; supports `Last-Event-ID` reconnect through
* `opts.lastEventId`. Throws `SessionNotFoundError` when the id is
* unknown.
*/
subscribeEvents(
sessionId: string,
opts?: SubscribeOptions,
): AsyncIterable<BridgeEvent>;
/** Test/inspection hook: number of live sessions. */
readonly sessionCount: number;
@ -132,8 +148,8 @@ interface SessionEntry {
workspaceCwd: string;
channel: AcpChannel;
connection: ClientSideConnection;
/** Stage 1 buffer; consumed by SSE wiring in the next PR. */
notifications: SessionNotification[];
/** Per-session event bus drives `GET /session/:id/events`. */
events: EventBus;
/**
* Tail of the per-session prompt queue. Each new prompt chains off the
* resolved (or rejected) state of this promise so prompts run one at a
@ -150,10 +166,10 @@ interface SessionEntry {
*
* Stage 1 behavior:
* - `requestPermission` denies by default. The HTTP `/permission/:requestId`
* route in the next PR will let any attached client cast the deciding
* route in a follow-up will let any attached client cast the deciding
* vote (first-responder wins).
* - `sessionUpdate` notifications are buffered on the session entry; the
* next PR drains the buffer through SSE.
* - `sessionUpdate` notifications publish onto the session's EventBus; SSE
* subscribers (`GET /session/:id/events`) drain it.
* - File reads/writes proxy to local fs (daemon and agent share the host).
*/
class BridgeClient implements Client {
@ -167,7 +183,8 @@ class BridgeClient implements Client {
async sessionUpdate(params: SessionNotification): Promise<void> {
const entry = this.resolveEntry();
if (entry) entry.notifications.push(params);
if (!entry) return;
entry.events.publish({ type: 'session_update', data: params });
}
async writeTextFile(
@ -257,7 +274,7 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
workspaceCwd: workspaceKey,
channel,
connection,
notifications: [],
events: new EventBus(),
promptQueue: Promise.resolve(),
};
byWorkspace.set(workspaceKey, entry);
@ -305,10 +322,17 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
await entry.connection.cancel(notif);
},
subscribeEvents(sessionId, subOpts) {
const entry = byId.get(sessionId);
if (!entry) throw new SessionNotFoundError(sessionId);
return entry.events.subscribe(subOpts);
},
async shutdown() {
const entries = Array.from(byId.values());
byWorkspace.clear();
byId.clear();
for (const e of entries) e.events.close();
await Promise.all(entries.map((e) => e.channel.kill().catch(() => {})));
},
};

View file

@ -20,6 +20,7 @@ export {
export {
createHttpAcpBridge,
defaultSpawnChannelFactory,
SessionNotFoundError,
type AcpChannel,
type BridgeOptions,
type BridgeSession,
@ -27,3 +28,9 @@ export {
type ChannelFactory,
type HttpAcpBridge,
} from './httpAcpBridge.js';
export {
EventBus,
EVENT_SCHEMA_VERSION,
type BridgeEvent,
type SubscribeOptions,
} from './eventBus.js';

View file

@ -19,6 +19,7 @@ import {
type BridgeSpawnRequest,
type HttpAcpBridge,
} from './httpAcpBridge.js';
import type { BridgeEvent, SubscribeOptions } from './eventBus.js';
import {
CAPABILITIES_SCHEMA_VERSION,
STAGE1_FEATURES,
@ -38,6 +39,10 @@ interface FakeBridgeOpts {
req: PromptRequest,
) => Promise<PromptResponse>;
cancelImpl?: (sessionId: string, req?: CancelNotification) => Promise<void>;
subscribeImpl?: (
sessionId: string,
opts?: SubscribeOptions,
) => AsyncIterable<BridgeEvent>;
}
interface FakeBridge extends HttpAcpBridge {
@ -85,6 +90,13 @@ function fakeBridge(opts: FakeBridgeOpts = {}): FakeBridge {
cancelCalls.push({ sessionId, req });
return cancelImpl(sessionId, req);
},
subscribeEvents(sessionId, subOpts) {
if (opts.subscribeImpl) return opts.subscribeImpl(sessionId, subOpts);
// Default: empty stream
return (async function* () {
// empty
})();
},
async shutdown() {
shutdownCalls += 1;
},
@ -398,6 +410,164 @@ describe('runQwenServe', () => {
});
});
describe('GET /session/:id/events (SSE)', () => {
let handle: RunHandle | undefined;
afterEach(async () => {
if (handle) {
await handle.close();
handle = undefined;
}
});
async function readSseFrames(
body: ReadableStream<Uint8Array>,
minFrames: number,
): Promise<Array<{ id?: string; event?: string; data?: string }>> {
const reader = body.getReader();
const decoder = new TextDecoder();
let buf = '';
const frames: Array<{ id?: string; event?: string; data?: string }> = [];
while (frames.length < minFrames) {
const { value, done } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
let idx: number;
while ((idx = buf.indexOf('\n\n')) !== -1) {
const raw = buf.slice(0, idx);
buf = buf.slice(idx + 2);
if (!raw || raw.startsWith(':') || raw.startsWith('retry:')) continue;
const frame: { id?: string; event?: string; data?: string } = {};
for (const line of raw.split('\n')) {
if (line.startsWith('id: ')) frame.id = line.slice(4);
else if (line.startsWith('event: ')) frame.event = line.slice(7);
else if (line.startsWith('data: ')) frame.data = line.slice(6);
}
frames.push(frame);
}
}
await reader.cancel();
return frames;
}
it('streams events from the bridge as SSE frames', async () => {
const bridge = fakeBridge({
async *subscribeImpl(_sessionId, _opts) {
yield {
id: 1,
v: 1,
type: 'session_update',
data: { foo: 'bar' },
};
yield { id: 2, v: 1, type: 'session_update', data: { foo: 'baz' } };
// No more events; the stream stays open until the caller aborts.
await new Promise(() => {});
},
});
handle = await runQwenServe(
{ hostname: '127.0.0.1', port: 0, mode: 'http-bridge' },
{ bridge },
);
const port = (handle.server.address() as { port: number }).port;
const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`);
expect(res.status).toBe(200);
expect(res.headers.get('content-type')).toContain('text/event-stream');
const frames = await readSseFrames(res.body!, 2);
expect(frames).toHaveLength(2);
expect(frames[0]?.id).toBe('1');
expect(frames[0]?.event).toBe('session_update');
expect(JSON.parse(frames[0]!.data!)).toEqual({
id: 1,
v: 1,
type: 'session_update',
data: { foo: 'bar' },
});
expect(frames[1]?.id).toBe('2');
});
it('forwards Last-Event-ID to the bridge', async () => {
const seen: number[] = [];
const bridge = fakeBridge({
async *subscribeImpl(_sessionId, opts) {
seen.push(opts?.lastEventId ?? -1);
yield { id: 42, v: 1, type: 'session_update', data: 'replay' };
await new Promise(() => {});
},
});
handle = await runQwenServe(
{ hostname: '127.0.0.1', port: 0, mode: 'http-bridge' },
{ bridge },
);
const port = (handle.server.address() as { port: number }).port;
const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`, {
headers: { 'Last-Event-ID': '17' },
});
const frames = await readSseFrames(res.body!, 1);
expect(seen).toEqual([17]);
expect(frames[0]?.id).toBe('42');
});
it('returns 404 when the bridge reports unknown session', async () => {
const bridge = fakeBridge({
subscribeImpl: (sessionId) => {
throw new SessionNotFoundError(sessionId);
},
});
handle = await runQwenServe(
{ hostname: '127.0.0.1', port: 0, mode: 'http-bridge' },
{ bridge },
);
const port = (handle.server.address() as { port: number }).port;
const res = await fetch(`http://127.0.0.1:${port}/session/missing/events`);
expect(res.status).toBe(404);
const body = await res.json();
expect(body.sessionId).toBe('missing');
});
it('aborts the bridge subscription when the client disconnects', async () => {
const aborted = { value: false };
const bridge = fakeBridge({
async *subscribeImpl(_sessionId, opts) {
opts?.signal?.addEventListener(
'abort',
() => {
aborted.value = true;
},
{ once: true },
);
yield { id: 1, v: 1, type: 'session_update', data: 'first' };
await new Promise<void>((resolve) => {
opts?.signal?.addEventListener('abort', () => resolve(), {
once: true,
});
});
},
});
handle = await runQwenServe(
{ hostname: '127.0.0.1', port: 0, mode: 'http-bridge' },
{ bridge },
);
const port = (handle.server.address() as { port: number }).port;
const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`);
const frames = await readSseFrames(res.body!, 1);
expect(frames).toHaveLength(1);
// readSseFrames calls reader.cancel() once the requested frame count is
// reached, which severs the underlying connection — the daemon's
// `req.on('close')` handler then aborts the bridge subscription.
// Wait briefly for the close handler to propagate to the bridge.
await new Promise((r) => setTimeout(r, 100));
expect(aborted.value).toBe(true);
});
});
describe('runQwenServe SIGINT handler', () => {
it('does not register signal handlers until the listener is up', () => {
// Sanity: we register `once` so we don't leak across test runs.

View file

@ -13,6 +13,7 @@ import {
SessionNotFoundError,
type HttpAcpBridge,
} from './httpAcpBridge.js';
import type { BridgeEvent } from './eventBus.js';
import {
CAPABILITIES_SCHEMA_VERSION,
STAGE1_FEATURES,
@ -101,11 +102,9 @@ export function createServeApp(
: {};
const prompt = body['prompt'];
if (!Array.isArray(prompt)) {
res
.status(400)
.json({
error: '`prompt` is required and must be an array of content blocks',
});
res.status(400).json({
error: '`prompt` is required and must be an array of content blocks',
});
return;
}
try {
@ -137,9 +136,91 @@ export function createServeApp(
}
});
app.get('/session/:id/events', (req, res) => {
const sessionId = req.params['id'];
const lastEventId = parseLastEventId(req.headers['last-event-id']);
let iter: AsyncIterator<BridgeEvent> | undefined;
const abort = new AbortController();
try {
const iterable = bridge.subscribeEvents(sessionId, {
signal: abort.signal,
lastEventId,
});
iter = iterable[Symbol.asyncIterator]();
} catch (err) {
sendBridgeError(res, err);
return;
}
res.status(200);
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache, no-transform');
res.setHeader('Connection', 'keep-alive');
// Disable proxy buffering (nginx); event-stream content type alone
// doesn't always reach the client through every proxy.
res.setHeader('X-Accel-Buffering', 'no');
res.flushHeaders?.();
// Tell EventSource to retry after 3s on disconnect.
res.write('retry: 3000\n\n');
// Heartbeat keeps NAT/proxy connections alive and lets the server
// notice a dead client through write-back-pressure. Comment frame is
// ignored by EventSource.
const heartbeatTimer = setInterval(() => {
if (!res.writableEnded) res.write(': heartbeat\n\n');
}, 15_000);
heartbeatTimer.unref?.();
const cleanup = () => {
clearInterval(heartbeatTimer);
abort.abort();
};
req.on('close', cleanup);
void (async () => {
try {
while (true) {
const next = await iter!.next();
if (next.done) break;
if (res.writableEnded) break;
res.write(formatSseFrame(next.value));
}
} catch (err) {
if (!res.writableEnded) {
res.write(
formatSseFrame({
id: 0,
v: 1,
type: 'stream_error',
data: { error: err instanceof Error ? err.message : String(err) },
}),
);
}
} finally {
cleanup();
if (!res.writableEnded) res.end();
}
})();
});
return app;
}
function parseLastEventId(raw: unknown): number | undefined {
if (typeof raw !== 'string' || !raw) return undefined;
const n = Number.parseInt(raw, 10);
if (!Number.isFinite(n) || n < 0) return undefined;
return n;
}
function formatSseFrame(event: BridgeEvent): string {
// SSE format: id (optional), event (optional), data, blank line.
// Splitting `data` on newlines lets browsers reassemble multi-line JSON.
const dataJson = JSON.stringify(event);
return `id: ${event.id}\nevent: ${event.type}\ndata: ${dataJson}\n\n`;
}
function sendBridgeError(res: import('express').Response, err: unknown): void {
if (err instanceof SessionNotFoundError) {
res.status(404).json({ error: err.message, sessionId: err.sessionId });

View file

@ -50,4 +50,8 @@ export const CAPABILITIES_SCHEMA_VERSION = 1 as const;
export const STAGE1_FEATURES: readonly string[] = [
'health',
'capabilities',
'session_create',
'session_prompt',
'session_cancel',
'session_events',
] as const;