diff --git a/packages/cli/src/serve/eventBus.test.ts b/packages/cli/src/serve/eventBus.test.ts new file mode 100644 index 000000000..68a738928 --- /dev/null +++ b/packages/cli/src/serve/eventBus.test.ts @@ -0,0 +1,172 @@ +/** + * @license + * Copyright 2025 Qwen Team + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it, expect } from 'vitest'; +import { + EventBus, + EVENT_SCHEMA_VERSION, + type BridgeEvent, +} from './eventBus.js'; + +async function collect( + iter: AsyncIterable, + count: number, +): Promise { + const out: BridgeEvent[] = []; + for await (const e of iter) { + out.push(e); + if (out.length >= count) break; + } + return out; +} + +describe('EventBus', () => { + it('assigns monotonic ids and the right schema version', () => { + const bus = new EventBus(); + const a = bus.publish({ type: 'foo', data: 1 }); + const b = bus.publish({ type: 'foo', data: 2 }); + expect(a.id).toBe(1); + expect(b.id).toBe(2); + expect(a.v).toBe(EVENT_SCHEMA_VERSION); + expect(bus.lastEventId).toBe(2); + }); + + it('delivers live publishes to a subscriber', async () => { + const bus = new EventBus(); + const abort = new AbortController(); + const iter = bus.subscribe({ signal: abort.signal }); + + // Need to start consuming before publishing so the subscriber is + // registered in the loop below. + setTimeout(() => { + bus.publish({ type: 'foo', data: 'a' }); + bus.publish({ type: 'foo', data: 'b' }); + }, 5); + + const events = await collect(iter, 2); + expect(events.map((e) => e.data)).toEqual(['a', 'b']); + abort.abort(); + }); + + it('replays events newer than lastEventId from the ring', async () => { + const bus = new EventBus(); + bus.publish({ type: 'foo', data: 'a' }); + bus.publish({ type: 'foo', data: 'b' }); + bus.publish({ type: 'foo', data: 'c' }); + + const abort = new AbortController(); + const iter = bus.subscribe({ lastEventId: 1, signal: abort.signal }); + const events = await collect(iter, 2); + expect(events.map((e) => e.id)).toEqual([2, 3]); + expect(events.map((e) => e.data)).toEqual(['b', 'c']); + abort.abort(); + }); + + it('replay + live: new events follow the replay tail', async () => { + const bus = new EventBus(); + bus.publish({ type: 'foo', data: 'a' }); + bus.publish({ type: 'foo', data: 'b' }); + + const abort = new AbortController(); + const iter = bus.subscribe({ lastEventId: 0, signal: abort.signal }); + + setTimeout(() => bus.publish({ type: 'foo', data: 'c' }), 5); + + const events = await collect(iter, 3); + expect(events.map((e) => e.data)).toEqual(['a', 'b', 'c']); + abort.abort(); + }); + + it('fan-outs to multiple subscribers in parallel', async () => { + const bus = new EventBus(); + const aborts = [new AbortController(), new AbortController()]; + const it1 = bus.subscribe({ signal: aborts[0].signal }); + const it2 = bus.subscribe({ signal: aborts[1].signal }); + + setTimeout(() => { + bus.publish({ type: 'foo', data: 1 }); + bus.publish({ type: 'foo', data: 2 }); + }, 5); + + const [a, b] = await Promise.all([collect(it1, 2), collect(it2, 2)]); + expect(a.map((e) => e.data)).toEqual([1, 2]); + expect(b.map((e) => e.data)).toEqual([1, 2]); + aborts.forEach((c) => c.abort()); + }); + + it('evicts a slow subscriber when its queue overflows', async () => { + const bus = new EventBus(); + const abort = new AbortController(); + const iter = bus.subscribe({ maxQueued: 2, signal: abort.signal }); + + // Publish 3 events without draining the iterator. Queue cap is 2; the + // 3rd should trip the eviction path and append a `client_evicted` + // terminal frame. + bus.publish({ type: 'foo', data: 1 }); + bus.publish({ type: 'foo', data: 2 }); + bus.publish({ type: 'foo', data: 3 }); + + const collected: BridgeEvent[] = []; + for await (const e of iter) { + collected.push(e); + } + expect(collected).toHaveLength(3); + expect(collected[0]?.data).toBe(1); + expect(collected[1]?.data).toBe(2); + expect(collected[2]?.type).toBe('client_evicted'); + expect(bus.subscriberCount).toBe(0); + abort.abort(); + }); + + it('unsubscribes when the abort signal fires', async () => { + const bus = new EventBus(); + const abort = new AbortController(); + const iter = bus.subscribe({ signal: abort.signal }); + + setTimeout(() => abort.abort(), 5); + + const events: BridgeEvent[] = []; + for await (const e of iter) { + events.push(e); + } + expect(events).toEqual([]); + expect(bus.subscriberCount).toBe(0); + }); + + it('closes all subscribers on bus.close()', async () => { + const bus = new EventBus(); + const abort = new AbortController(); + const iter = bus.subscribe({ signal: abort.signal }); + + setTimeout(() => bus.close(), 5); + + const events: BridgeEvent[] = []; + for await (const e of iter) { + events.push(e); + } + expect(events).toEqual([]); + expect(bus.subscriberCount).toBe(0); + }); + + it('drops the oldest events from the ring beyond ringSize', () => { + const bus = new EventBus(3); + for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i }); + // Internal: only the last 3 should be replayable. + // Subscribe with lastEventId=0 — only ids 3, 4, 5 should be queued. + const abort = new AbortController(); + const iter = bus.subscribe({ lastEventId: 0, signal: abort.signal }); + + void (async () => { + const out: BridgeEvent[] = []; + for await (const e of iter) { + out.push(e); + if (out.length === 3) break; + } + expect(out.map((e) => e.id)).toEqual([3, 4, 5]); + abort.abort(); + })(); + }); +}); diff --git a/packages/cli/src/serve/eventBus.ts b/packages/cli/src/serve/eventBus.ts new file mode 100644 index 000000000..e6798d733 --- /dev/null +++ b/packages/cli/src/serve/eventBus.ts @@ -0,0 +1,247 @@ +/** + * @license + * Copyright 2025 Qwen Team + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * Event-bus for the daemon's per-session NDJSON stream. + * + * Design notes (from issue #3803 §04 / threat-model): + * - Each event carries a monotonic `id` (per session) so the SSE + * `Last-Event-ID` reconnect protocol can pick up where the client left + * off. Backed by a bounded ring of recent events for replay. + * - Subscribers use bounded async queues. A slow subscriber that blows + * past its queue limit is sent a final `client_evicted` event and + * closed; this keeps a stuck client from holding the daemon hostage + * (per the resource-exhaustion entry in the threat-model summary). + * - The bus is push-based; consumers iterate the returned AsyncIterable. + * Aborting the supplied AbortSignal closes the iterator promptly. + */ + +export const EVENT_SCHEMA_VERSION = 1 as const; + +/** A single frame published on the bus. */ +export interface BridgeEvent { + /** Monotonic per-session id, starting at 1. */ + id: number; + /** Schema version; bumped on breaking frame changes. */ + v: typeof EVENT_SCHEMA_VERSION; + /** Frame type: `session_update`, `client_evicted`, or daemon-pushed events. */ + type: string; + /** Frame payload — opaque JSON. */ + data: unknown; + /** + * Identifier of the client that triggered the event, when known. Used by + * fan-out consumers to suppress echoes of their own actions. + */ + originatorClientId?: string; +} + +export interface SubscribeOptions { + /** + * Resume from after this event id. Events with `id <= lastEventId` are + * skipped (already delivered); newer events still buffered in the ring + * are replayed before live events flow. + */ + lastEventId?: number; + /** Aborts the subscription cleanly. */ + signal?: AbortSignal; + /** + * Per-subscriber backlog cap. When exceeded the subscriber is evicted + * with a final `client_evicted` event. Defaults to 256. + */ + maxQueued?: number; +} + +const DEFAULT_MAX_QUEUED = 256; +const DEFAULT_RING_SIZE = 1000; + +interface InternalSub { + queue: BoundedAsyncQueue; + evicted: boolean; +} + +export class EventBus { + private nextId = 1; + private readonly ring: BridgeEvent[] = []; + private readonly subs = new Set(); + private closed = false; + + constructor(private readonly ringSize: number = DEFAULT_RING_SIZE) {} + + /** Most recent id ever assigned by `publish`. 0 if no events published. */ + get lastEventId(): number { + return this.nextId - 1; + } + + /** Snapshot of the live subscriber count. */ + get subscriberCount(): number { + return this.subs.size; + } + + publish(input: Omit): BridgeEvent { + if (this.closed) { + throw new Error('EventBus is closed; cannot publish'); + } + const event: BridgeEvent = { + id: this.nextId++, + v: EVENT_SCHEMA_VERSION, + ...input, + }; + this.ring.push(event); + if (this.ring.length > this.ringSize) this.ring.shift(); + for (const sub of this.subs) { + if (sub.evicted) continue; + if (!sub.queue.push(event)) { + sub.evicted = true; + const evictionFrame: BridgeEvent = { + id: this.nextId++, + v: EVENT_SCHEMA_VERSION, + type: 'client_evicted', + data: { reason: 'queue_overflow', droppedAfter: event.id }, + }; + // Force-push the eviction frame; close immediately after so the + // consumer iterator unwinds with a final synthetic event. + sub.queue.forcePush(evictionFrame); + sub.queue.close(); + } + } + return event; + } + + /** + * Note: registration is synchronous — by the time `subscribe()` returns, + * the subscriber is already attached and will receive any subsequent + * `publish()` even if the consumer hasn't started iterating yet. (A + * generator-style implementation would defer registration to the first + * `next()` call, which races with publishes that happen before the + * consumer's first await.) + */ + subscribe(opts: SubscribeOptions = {}): AsyncIterable { + if (this.closed) { + return emptyAsyncIterable(); + } + const queue = new BoundedAsyncQueue( + opts.maxQueued ?? DEFAULT_MAX_QUEUED, + ); + + if (opts.lastEventId !== undefined) { + for (const e of this.ring) { + if (e.id > opts.lastEventId) queue.push(e); + } + } + + const sub: InternalSub = { queue, evicted: false }; + this.subs.add(sub); + + const onAbort = () => queue.close(); + if (opts.signal) { + if (opts.signal.aborted) { + queue.close(); + } else { + opts.signal.addEventListener('abort', onAbort, { once: true }); + } + } + + const dispose = () => { + this.subs.delete(sub); + opts.signal?.removeEventListener('abort', onAbort); + }; + + return { + [Symbol.asyncIterator]: (): AsyncIterator => ({ + async next(): Promise> { + const r = await queue.next(); + if (r.done) dispose(); + return r; + }, + async return(): Promise> { + queue.close(); + dispose(); + return { value: undefined as unknown as BridgeEvent, done: true }; + }, + }), + }; + } + + /** Close all live subscribers and prevent further `publish`/`subscribe`. */ + close(): void { + if (this.closed) return; + this.closed = true; + for (const sub of this.subs) sub.queue.close(); + this.subs.clear(); + } +} + +function emptyAsyncIterable(): AsyncIterable { + return { + [Symbol.asyncIterator]: (): AsyncIterator => ({ + async next(): Promise> { + return { value: undefined as unknown as T, done: true }; + }, + }), + }; +} + +/** + * Promise-based bounded queue. `push` returns false (instead of blocking or + * throwing) when full so callers can decide how to react — the EventBus uses + * that signal to evict slow subscribers. + */ +class BoundedAsyncQueue { + private readonly buf: T[] = []; + private readonly resolvers: Array<(v: IteratorResult) => void> = []; + private closed = false; + + constructor(private readonly maxSize: number) {} + + /** Returns true if accepted, false if dropped due to overflow. */ + push(value: T): boolean { + if (this.closed) return false; + const r = this.resolvers.shift(); + if (r) { + r({ value, done: false }); + return true; + } + if (this.buf.length >= this.maxSize) return false; + this.buf.push(value); + return true; + } + + /** Bypasses the size cap. Used for terminal eviction frames. */ + forcePush(value: T): void { + if (this.closed) return; + const r = this.resolvers.shift(); + if (r) { + r({ value, done: false }); + return; + } + this.buf.push(value); + } + + close(): void { + if (this.closed) return; + this.closed = true; + while (this.resolvers.length > 0) { + this.resolvers.shift()!({ + value: undefined as unknown as T, + done: true, + }); + } + } + + next(): Promise> { + const buffered = this.buf.shift(); + if (buffered !== undefined) { + return Promise.resolve({ value: buffered, done: false }); + } + if (this.closed) { + return Promise.resolve({ + value: undefined as unknown as T, + done: true, + }); + } + return new Promise((resolve) => this.resolvers.push(resolve)); + } +} diff --git a/packages/cli/src/serve/httpAcpBridge.test.ts b/packages/cli/src/serve/httpAcpBridge.test.ts index a9b5c8569..889ee8bba 100644 --- a/packages/cli/src/serve/httpAcpBridge.test.ts +++ b/packages/cli/src/serve/httpAcpBridge.test.ts @@ -508,4 +508,89 @@ describe('createHttpAcpBridge', () => { ); }); }); + + describe('subscribeEvents', () => { + it('throws SessionNotFoundError for unknown session ids', () => { + const bridge = createHttpAcpBridge({ + channelFactory: async () => { + throw new Error('factory should not be called'); + }, + }); + expect(() => bridge.subscribeEvents('unknown')).toThrow( + SessionNotFoundError, + ); + }); + + it('publishes session_update events to subscribers when the agent sends them', async () => { + let capturedConn: AgentSideConnection | undefined; + const factory: ChannelFactory = async () => { + // Build a channel pair where we capture the agent-side connection + // so we can drive sessionUpdate notifications from the test. + const ab = new TransformStream(); + const ba = new TransformStream(); + const clientStream = ndJsonStream(ab.writable, ba.readable); + const agentStream = ndJsonStream(ba.writable, ab.readable); + const fakeAgent = new FakeAgent(); + capturedConn = new AgentSideConnection(() => fakeAgent, agentStream); + return { + stream: clientStream, + kill: async () => {}, + }; + }; + const bridge = createHttpAcpBridge({ channelFactory: factory }); + const session = await bridge.spawnOrAttach({ workspaceCwd: '/work/a' }); + + const abort = new AbortController(); + const iter = bridge.subscribeEvents(session.sessionId, { + signal: abort.signal, + }); + + // Send a sessionUpdate from the agent side (fire-and-forget). + void capturedConn!.sessionUpdate({ + sessionId: session.sessionId, + update: { + sessionUpdate: 'agent_message_chunk', + content: { type: 'text', text: 'hi' }, + }, + }); + + const collected: Array<{ id: number; type: string; data: unknown }> = []; + for await (const e of iter) { + collected.push({ id: e.id, type: e.type, data: e.data }); + if (collected.length === 1) break; + } + expect(collected[0]?.type).toBe('session_update'); + expect(collected[0]?.id).toBe(1); + + abort.abort(); + await bridge.shutdown(); + }); + + it('shutdown closes live event subscriptions', async () => { + const factory: ChannelFactory = async () => makeChannel().channel; + const bridge = createHttpAcpBridge({ channelFactory: factory }); + const session = await bridge.spawnOrAttach({ workspaceCwd: '/work/a' }); + + const abort = new AbortController(); + const iter = bridge.subscribeEvents(session.sessionId, { + signal: abort.signal, + }); + + const drain = (async () => { + const events: unknown[] = []; + for await (const e of iter) { + events.push(e); + } + return events; + })(); + + // Give the subscriber a tick to register. + await new Promise((r) => setTimeout(r, 10)); + await bridge.shutdown(); + + // Subscriber must unwind to completion (no events ever published). + const events = await drain; + expect(events).toEqual([]); + }); + }); }); diff --git a/packages/cli/src/serve/httpAcpBridge.ts b/packages/cli/src/serve/httpAcpBridge.ts index 415554c78..4aa51fdf1 100644 --- a/packages/cli/src/serve/httpAcpBridge.ts +++ b/packages/cli/src/serve/httpAcpBridge.ts @@ -13,6 +13,11 @@ import { PROTOCOL_VERSION, ndJsonStream, } from '@agentclientprotocol/sdk'; +import { + EventBus, + type BridgeEvent, + type SubscribeOptions, +} from './eventBus.js'; import type { CancelNotification, Client, @@ -82,6 +87,17 @@ export interface HttpAcpBridge { */ cancelSession(sessionId: string, req?: CancelNotification): Promise; + /** + * Subscribe to the session's event stream. Returns an AsyncIterable that + * yields published events; supports `Last-Event-ID` reconnect through + * `opts.lastEventId`. Throws `SessionNotFoundError` when the id is + * unknown. + */ + subscribeEvents( + sessionId: string, + opts?: SubscribeOptions, + ): AsyncIterable; + /** Test/inspection hook: number of live sessions. */ readonly sessionCount: number; @@ -132,8 +148,8 @@ interface SessionEntry { workspaceCwd: string; channel: AcpChannel; connection: ClientSideConnection; - /** Stage 1 buffer; consumed by SSE wiring in the next PR. */ - notifications: SessionNotification[]; + /** Per-session event bus drives `GET /session/:id/events`. */ + events: EventBus; /** * Tail of the per-session prompt queue. Each new prompt chains off the * resolved (or rejected) state of this promise so prompts run one at a @@ -150,10 +166,10 @@ interface SessionEntry { * * Stage 1 behavior: * - `requestPermission` denies by default. The HTTP `/permission/:requestId` - * route in the next PR will let any attached client cast the deciding + * route in a follow-up will let any attached client cast the deciding * vote (first-responder wins). - * - `sessionUpdate` notifications are buffered on the session entry; the - * next PR drains the buffer through SSE. + * - `sessionUpdate` notifications publish onto the session's EventBus; SSE + * subscribers (`GET /session/:id/events`) drain it. * - File reads/writes proxy to local fs (daemon and agent share the host). */ class BridgeClient implements Client { @@ -167,7 +183,8 @@ class BridgeClient implements Client { async sessionUpdate(params: SessionNotification): Promise { const entry = this.resolveEntry(); - if (entry) entry.notifications.push(params); + if (!entry) return; + entry.events.publish({ type: 'session_update', data: params }); } async writeTextFile( @@ -257,7 +274,7 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge { workspaceCwd: workspaceKey, channel, connection, - notifications: [], + events: new EventBus(), promptQueue: Promise.resolve(), }; byWorkspace.set(workspaceKey, entry); @@ -305,10 +322,17 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge { await entry.connection.cancel(notif); }, + subscribeEvents(sessionId, subOpts) { + const entry = byId.get(sessionId); + if (!entry) throw new SessionNotFoundError(sessionId); + return entry.events.subscribe(subOpts); + }, + async shutdown() { const entries = Array.from(byId.values()); byWorkspace.clear(); byId.clear(); + for (const e of entries) e.events.close(); await Promise.all(entries.map((e) => e.channel.kill().catch(() => {}))); }, }; diff --git a/packages/cli/src/serve/index.ts b/packages/cli/src/serve/index.ts index 18c414b0d..537c2ea3d 100644 --- a/packages/cli/src/serve/index.ts +++ b/packages/cli/src/serve/index.ts @@ -20,6 +20,7 @@ export { export { createHttpAcpBridge, defaultSpawnChannelFactory, + SessionNotFoundError, type AcpChannel, type BridgeOptions, type BridgeSession, @@ -27,3 +28,9 @@ export { type ChannelFactory, type HttpAcpBridge, } from './httpAcpBridge.js'; +export { + EventBus, + EVENT_SCHEMA_VERSION, + type BridgeEvent, + type SubscribeOptions, +} from './eventBus.js'; diff --git a/packages/cli/src/serve/server.test.ts b/packages/cli/src/serve/server.test.ts index f3d6fd710..4dfebd5f3 100644 --- a/packages/cli/src/serve/server.test.ts +++ b/packages/cli/src/serve/server.test.ts @@ -19,6 +19,7 @@ import { type BridgeSpawnRequest, type HttpAcpBridge, } from './httpAcpBridge.js'; +import type { BridgeEvent, SubscribeOptions } from './eventBus.js'; import { CAPABILITIES_SCHEMA_VERSION, STAGE1_FEATURES, @@ -38,6 +39,10 @@ interface FakeBridgeOpts { req: PromptRequest, ) => Promise; cancelImpl?: (sessionId: string, req?: CancelNotification) => Promise; + subscribeImpl?: ( + sessionId: string, + opts?: SubscribeOptions, + ) => AsyncIterable; } interface FakeBridge extends HttpAcpBridge { @@ -85,6 +90,13 @@ function fakeBridge(opts: FakeBridgeOpts = {}): FakeBridge { cancelCalls.push({ sessionId, req }); return cancelImpl(sessionId, req); }, + subscribeEvents(sessionId, subOpts) { + if (opts.subscribeImpl) return opts.subscribeImpl(sessionId, subOpts); + // Default: empty stream + return (async function* () { + // empty + })(); + }, async shutdown() { shutdownCalls += 1; }, @@ -398,6 +410,164 @@ describe('runQwenServe', () => { }); }); +describe('GET /session/:id/events (SSE)', () => { + let handle: RunHandle | undefined; + + afterEach(async () => { + if (handle) { + await handle.close(); + handle = undefined; + } + }); + + async function readSseFrames( + body: ReadableStream, + minFrames: number, + ): Promise> { + const reader = body.getReader(); + const decoder = new TextDecoder(); + let buf = ''; + const frames: Array<{ id?: string; event?: string; data?: string }> = []; + while (frames.length < minFrames) { + const { value, done } = await reader.read(); + if (done) break; + buf += decoder.decode(value, { stream: true }); + let idx: number; + while ((idx = buf.indexOf('\n\n')) !== -1) { + const raw = buf.slice(0, idx); + buf = buf.slice(idx + 2); + if (!raw || raw.startsWith(':') || raw.startsWith('retry:')) continue; + const frame: { id?: string; event?: string; data?: string } = {}; + for (const line of raw.split('\n')) { + if (line.startsWith('id: ')) frame.id = line.slice(4); + else if (line.startsWith('event: ')) frame.event = line.slice(7); + else if (line.startsWith('data: ')) frame.data = line.slice(6); + } + frames.push(frame); + } + } + await reader.cancel(); + return frames; + } + + it('streams events from the bridge as SSE frames', async () => { + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, _opts) { + yield { + id: 1, + v: 1, + type: 'session_update', + data: { foo: 'bar' }, + }; + yield { id: 2, v: 1, type: 'session_update', data: { foo: 'baz' } }; + // No more events; the stream stays open until the caller aborts. + await new Promise(() => {}); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + + const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`); + expect(res.status).toBe(200); + expect(res.headers.get('content-type')).toContain('text/event-stream'); + + const frames = await readSseFrames(res.body!, 2); + + expect(frames).toHaveLength(2); + expect(frames[0]?.id).toBe('1'); + expect(frames[0]?.event).toBe('session_update'); + expect(JSON.parse(frames[0]!.data!)).toEqual({ + id: 1, + v: 1, + type: 'session_update', + data: { foo: 'bar' }, + }); + expect(frames[1]?.id).toBe('2'); + }); + + it('forwards Last-Event-ID to the bridge', async () => { + const seen: number[] = []; + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, opts) { + seen.push(opts?.lastEventId ?? -1); + yield { id: 42, v: 1, type: 'session_update', data: 'replay' }; + await new Promise(() => {}); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + + const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`, { + headers: { 'Last-Event-ID': '17' }, + }); + const frames = await readSseFrames(res.body!, 1); + + expect(seen).toEqual([17]); + expect(frames[0]?.id).toBe('42'); + }); + + it('returns 404 when the bridge reports unknown session', async () => { + const bridge = fakeBridge({ + subscribeImpl: (sessionId) => { + throw new SessionNotFoundError(sessionId); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + + const res = await fetch(`http://127.0.0.1:${port}/session/missing/events`); + expect(res.status).toBe(404); + const body = await res.json(); + expect(body.sessionId).toBe('missing'); + }); + + it('aborts the bridge subscription when the client disconnects', async () => { + const aborted = { value: false }; + const bridge = fakeBridge({ + async *subscribeImpl(_sessionId, opts) { + opts?.signal?.addEventListener( + 'abort', + () => { + aborted.value = true; + }, + { once: true }, + ); + yield { id: 1, v: 1, type: 'session_update', data: 'first' }; + await new Promise((resolve) => { + opts?.signal?.addEventListener('abort', () => resolve(), { + once: true, + }); + }); + }, + }); + handle = await runQwenServe( + { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' }, + { bridge }, + ); + const port = (handle.server.address() as { port: number }).port; + + const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`); + const frames = await readSseFrames(res.body!, 1); + expect(frames).toHaveLength(1); + // readSseFrames calls reader.cancel() once the requested frame count is + // reached, which severs the underlying connection — the daemon's + // `req.on('close')` handler then aborts the bridge subscription. + + // Wait briefly for the close handler to propagate to the bridge. + await new Promise((r) => setTimeout(r, 100)); + expect(aborted.value).toBe(true); + }); +}); + describe('runQwenServe SIGINT handler', () => { it('does not register signal handlers until the listener is up', () => { // Sanity: we register `once` so we don't leak across test runs. diff --git a/packages/cli/src/serve/server.ts b/packages/cli/src/serve/server.ts index 3663cc580..e78a9ccec 100644 --- a/packages/cli/src/serve/server.ts +++ b/packages/cli/src/serve/server.ts @@ -13,6 +13,7 @@ import { SessionNotFoundError, type HttpAcpBridge, } from './httpAcpBridge.js'; +import type { BridgeEvent } from './eventBus.js'; import { CAPABILITIES_SCHEMA_VERSION, STAGE1_FEATURES, @@ -101,11 +102,9 @@ export function createServeApp( : {}; const prompt = body['prompt']; if (!Array.isArray(prompt)) { - res - .status(400) - .json({ - error: '`prompt` is required and must be an array of content blocks', - }); + res.status(400).json({ + error: '`prompt` is required and must be an array of content blocks', + }); return; } try { @@ -137,9 +136,91 @@ export function createServeApp( } }); + app.get('/session/:id/events', (req, res) => { + const sessionId = req.params['id']; + const lastEventId = parseLastEventId(req.headers['last-event-id']); + + let iter: AsyncIterator | undefined; + const abort = new AbortController(); + try { + const iterable = bridge.subscribeEvents(sessionId, { + signal: abort.signal, + lastEventId, + }); + iter = iterable[Symbol.asyncIterator](); + } catch (err) { + sendBridgeError(res, err); + return; + } + + res.status(200); + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache, no-transform'); + res.setHeader('Connection', 'keep-alive'); + // Disable proxy buffering (nginx); event-stream content type alone + // doesn't always reach the client through every proxy. + res.setHeader('X-Accel-Buffering', 'no'); + res.flushHeaders?.(); + // Tell EventSource to retry after 3s on disconnect. + res.write('retry: 3000\n\n'); + + // Heartbeat keeps NAT/proxy connections alive and lets the server + // notice a dead client through write-back-pressure. Comment frame is + // ignored by EventSource. + const heartbeatTimer = setInterval(() => { + if (!res.writableEnded) res.write(': heartbeat\n\n'); + }, 15_000); + heartbeatTimer.unref?.(); + + const cleanup = () => { + clearInterval(heartbeatTimer); + abort.abort(); + }; + req.on('close', cleanup); + + void (async () => { + try { + while (true) { + const next = await iter!.next(); + if (next.done) break; + if (res.writableEnded) break; + res.write(formatSseFrame(next.value)); + } + } catch (err) { + if (!res.writableEnded) { + res.write( + formatSseFrame({ + id: 0, + v: 1, + type: 'stream_error', + data: { error: err instanceof Error ? err.message : String(err) }, + }), + ); + } + } finally { + cleanup(); + if (!res.writableEnded) res.end(); + } + })(); + }); + return app; } +function parseLastEventId(raw: unknown): number | undefined { + if (typeof raw !== 'string' || !raw) return undefined; + const n = Number.parseInt(raw, 10); + if (!Number.isFinite(n) || n < 0) return undefined; + return n; +} + +function formatSseFrame(event: BridgeEvent): string { + // SSE format: id (optional), event (optional), data, blank line. + // Splitting `data` on newlines lets browsers reassemble multi-line JSON. + const dataJson = JSON.stringify(event); + return `id: ${event.id}\nevent: ${event.type}\ndata: ${dataJson}\n\n`; +} + function sendBridgeError(res: import('express').Response, err: unknown): void { if (err instanceof SessionNotFoundError) { res.status(404).json({ error: err.message, sessionId: err.sessionId }); diff --git a/packages/cli/src/serve/types.ts b/packages/cli/src/serve/types.ts index 98eb510d2..a2fd4ec54 100644 --- a/packages/cli/src/serve/types.ts +++ b/packages/cli/src/serve/types.ts @@ -50,4 +50,8 @@ export const CAPABILITIES_SCHEMA_VERSION = 1 as const; export const STAGE1_FEATURES: readonly string[] = [ 'health', 'capabilities', + 'session_create', + 'session_prompt', + 'session_cancel', + 'session_events', ] as const;