mirror of
https://github.com/QwenLM/qwen-code.git
synced 2026-05-17 03:57:18 +00:00
feat(cli): wire SSE streaming for qwen serve events (#3803)
Stage 1 follow-up that turns prompt into a real streaming experience.
Replaces the in-memory `notifications: SessionNotification[]` buffer
on each session with a per-session EventBus and exposes it through
`GET /session/:id/events` as an `text/event-stream` SSE feed.
EventBus (`packages/cli/src/serve/eventBus.ts`):
- Monotonic per-session ids (`v: 1` schema). Each `publish` chains an
id, returning the materialized BridgeEvent.
- Bounded ring (default 1000) backs `Last-Event-ID` reconnect — a
consumer that drops can resume from `lastEventId` and replay any
still-buffered events before live events flow.
- Per-subscriber bounded queue (default 256). When a slow consumer
overruns its queue, the bus appends a synthetic `client_evicted`
terminal frame and closes that subscription so it can't hold the
daemon hostage. Other subscribers are unaffected.
- `subscribe()` returns an AsyncIterable — registration is synchronous
so events `publish`ed immediately after the subscribe land in the
queue (a generator-style implementation deferred registration to
first `next()` and raced with publishes).
- AbortSignal-aware: aborting the signal closes the iterator promptly.
Bridge (`httpAcpBridge.ts`):
- `BridgeClient.sessionUpdate` now publishes onto the session's
EventBus instead of pushing to a plain array — every ACP
notification the agent emits becomes a stream event automatically.
- New `subscribeEvents(sessionId, opts?)` returns the bus's
AsyncIterable; throws `SessionNotFoundError` for unknown ids.
- Shutdown closes every live event bus before killing channels so
pending consumers unwind cleanly.
Route (`server.ts`):
- `GET /session/:id/events` sets the SSE content type, advertises a
3s reconnect hint, and writes a 15s heartbeat comment frame to
keep proxy/NAT connections alive.
- Forwards the `Last-Event-ID` header to the bus.
- `req.on('close')` triggers an AbortController that propagates into
the bridge subscription so disconnects don't leak subscribers.
- 404 when the bridge can't find the session.
Capabilities envelope: `STAGE1_FEATURES` now advertises
`session_create`, `session_prompt`, `session_cancel`, `session_events`
in addition to `health`/`capabilities` so clients can light up UI for
the routes that have actually shipped.
Tests (16 new, 0 regressions in the 4995 baseline):
- 9 EventBus unit cases — id sequencing, live delivery, replay,
replay+live splice, fan-out to N subscribers, eviction on
overflow, abort-signal unsubscribe, bus.close() drains
subscribers, ring-size eviction.
- 4 bridge subscribe cases — 404, sessionUpdate→event publishing
via real ACP fake-agent, shutdown closes live subscriptions.
- 4 SSE route cases against a live HTTP listener — frame format,
Last-Event-ID forwarding, 404, abort propagation on disconnect.
Verified end-to-end against a real `qwen --acp` child:
- Subscribed to `/session/$SID/events`, fired `POST /session/$SID/prompt`
with text content. Captured 13 distinct `event: session_update`
SSE frames in real time during the model's response — `available_
commands_update` metadata, 9 `agent_thought_chunk` frames carrying
the model's chain-of-thought, 3 `agent_message_chunk` frames with
the actual reply, and a final usage frame with token totals.
- Frames carry monotonic ids 1..13, the daemon-side counter, and
are valid SSE per the EventSource spec.
This commit is contained in:
parent
ca996ecb54
commit
41aa950947
8 changed files with 802 additions and 12 deletions
172
packages/cli/src/serve/eventBus.test.ts
Normal file
172
packages/cli/src/serve/eventBus.test.ts
Normal file
|
|
@ -0,0 +1,172 @@
|
|||
/**
|
||||
* @license
|
||||
* Copyright 2025 Qwen Team
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import {
|
||||
EventBus,
|
||||
EVENT_SCHEMA_VERSION,
|
||||
type BridgeEvent,
|
||||
} from './eventBus.js';
|
||||
|
||||
async function collect(
|
||||
iter: AsyncIterable<BridgeEvent>,
|
||||
count: number,
|
||||
): Promise<BridgeEvent[]> {
|
||||
const out: BridgeEvent[] = [];
|
||||
for await (const e of iter) {
|
||||
out.push(e);
|
||||
if (out.length >= count) break;
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
describe('EventBus', () => {
|
||||
it('assigns monotonic ids and the right schema version', () => {
|
||||
const bus = new EventBus();
|
||||
const a = bus.publish({ type: 'foo', data: 1 });
|
||||
const b = bus.publish({ type: 'foo', data: 2 });
|
||||
expect(a.id).toBe(1);
|
||||
expect(b.id).toBe(2);
|
||||
expect(a.v).toBe(EVENT_SCHEMA_VERSION);
|
||||
expect(bus.lastEventId).toBe(2);
|
||||
});
|
||||
|
||||
it('delivers live publishes to a subscriber', async () => {
|
||||
const bus = new EventBus();
|
||||
const abort = new AbortController();
|
||||
const iter = bus.subscribe({ signal: abort.signal });
|
||||
|
||||
// Need to start consuming before publishing so the subscriber is
|
||||
// registered in the loop below.
|
||||
setTimeout(() => {
|
||||
bus.publish({ type: 'foo', data: 'a' });
|
||||
bus.publish({ type: 'foo', data: 'b' });
|
||||
}, 5);
|
||||
|
||||
const events = await collect(iter, 2);
|
||||
expect(events.map((e) => e.data)).toEqual(['a', 'b']);
|
||||
abort.abort();
|
||||
});
|
||||
|
||||
it('replays events newer than lastEventId from the ring', async () => {
|
||||
const bus = new EventBus();
|
||||
bus.publish({ type: 'foo', data: 'a' });
|
||||
bus.publish({ type: 'foo', data: 'b' });
|
||||
bus.publish({ type: 'foo', data: 'c' });
|
||||
|
||||
const abort = new AbortController();
|
||||
const iter = bus.subscribe({ lastEventId: 1, signal: abort.signal });
|
||||
const events = await collect(iter, 2);
|
||||
expect(events.map((e) => e.id)).toEqual([2, 3]);
|
||||
expect(events.map((e) => e.data)).toEqual(['b', 'c']);
|
||||
abort.abort();
|
||||
});
|
||||
|
||||
it('replay + live: new events follow the replay tail', async () => {
|
||||
const bus = new EventBus();
|
||||
bus.publish({ type: 'foo', data: 'a' });
|
||||
bus.publish({ type: 'foo', data: 'b' });
|
||||
|
||||
const abort = new AbortController();
|
||||
const iter = bus.subscribe({ lastEventId: 0, signal: abort.signal });
|
||||
|
||||
setTimeout(() => bus.publish({ type: 'foo', data: 'c' }), 5);
|
||||
|
||||
const events = await collect(iter, 3);
|
||||
expect(events.map((e) => e.data)).toEqual(['a', 'b', 'c']);
|
||||
abort.abort();
|
||||
});
|
||||
|
||||
it('fan-outs to multiple subscribers in parallel', async () => {
|
||||
const bus = new EventBus();
|
||||
const aborts = [new AbortController(), new AbortController()];
|
||||
const it1 = bus.subscribe({ signal: aborts[0].signal });
|
||||
const it2 = bus.subscribe({ signal: aborts[1].signal });
|
||||
|
||||
setTimeout(() => {
|
||||
bus.publish({ type: 'foo', data: 1 });
|
||||
bus.publish({ type: 'foo', data: 2 });
|
||||
}, 5);
|
||||
|
||||
const [a, b] = await Promise.all([collect(it1, 2), collect(it2, 2)]);
|
||||
expect(a.map((e) => e.data)).toEqual([1, 2]);
|
||||
expect(b.map((e) => e.data)).toEqual([1, 2]);
|
||||
aborts.forEach((c) => c.abort());
|
||||
});
|
||||
|
||||
it('evicts a slow subscriber when its queue overflows', async () => {
|
||||
const bus = new EventBus();
|
||||
const abort = new AbortController();
|
||||
const iter = bus.subscribe({ maxQueued: 2, signal: abort.signal });
|
||||
|
||||
// Publish 3 events without draining the iterator. Queue cap is 2; the
|
||||
// 3rd should trip the eviction path and append a `client_evicted`
|
||||
// terminal frame.
|
||||
bus.publish({ type: 'foo', data: 1 });
|
||||
bus.publish({ type: 'foo', data: 2 });
|
||||
bus.publish({ type: 'foo', data: 3 });
|
||||
|
||||
const collected: BridgeEvent[] = [];
|
||||
for await (const e of iter) {
|
||||
collected.push(e);
|
||||
}
|
||||
expect(collected).toHaveLength(3);
|
||||
expect(collected[0]?.data).toBe(1);
|
||||
expect(collected[1]?.data).toBe(2);
|
||||
expect(collected[2]?.type).toBe('client_evicted');
|
||||
expect(bus.subscriberCount).toBe(0);
|
||||
abort.abort();
|
||||
});
|
||||
|
||||
it('unsubscribes when the abort signal fires', async () => {
|
||||
const bus = new EventBus();
|
||||
const abort = new AbortController();
|
||||
const iter = bus.subscribe({ signal: abort.signal });
|
||||
|
||||
setTimeout(() => abort.abort(), 5);
|
||||
|
||||
const events: BridgeEvent[] = [];
|
||||
for await (const e of iter) {
|
||||
events.push(e);
|
||||
}
|
||||
expect(events).toEqual([]);
|
||||
expect(bus.subscriberCount).toBe(0);
|
||||
});
|
||||
|
||||
it('closes all subscribers on bus.close()', async () => {
|
||||
const bus = new EventBus();
|
||||
const abort = new AbortController();
|
||||
const iter = bus.subscribe({ signal: abort.signal });
|
||||
|
||||
setTimeout(() => bus.close(), 5);
|
||||
|
||||
const events: BridgeEvent[] = [];
|
||||
for await (const e of iter) {
|
||||
events.push(e);
|
||||
}
|
||||
expect(events).toEqual([]);
|
||||
expect(bus.subscriberCount).toBe(0);
|
||||
});
|
||||
|
||||
it('drops the oldest events from the ring beyond ringSize', () => {
|
||||
const bus = new EventBus(3);
|
||||
for (let i = 1; i <= 5; i++) bus.publish({ type: 'foo', data: i });
|
||||
// Internal: only the last 3 should be replayable.
|
||||
// Subscribe with lastEventId=0 — only ids 3, 4, 5 should be queued.
|
||||
const abort = new AbortController();
|
||||
const iter = bus.subscribe({ lastEventId: 0, signal: abort.signal });
|
||||
|
||||
void (async () => {
|
||||
const out: BridgeEvent[] = [];
|
||||
for await (const e of iter) {
|
||||
out.push(e);
|
||||
if (out.length === 3) break;
|
||||
}
|
||||
expect(out.map((e) => e.id)).toEqual([3, 4, 5]);
|
||||
abort.abort();
|
||||
})();
|
||||
});
|
||||
});
|
||||
247
packages/cli/src/serve/eventBus.ts
Normal file
247
packages/cli/src/serve/eventBus.ts
Normal file
|
|
@ -0,0 +1,247 @@
|
|||
/**
|
||||
* @license
|
||||
* Copyright 2025 Qwen Team
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
/**
|
||||
* Event-bus for the daemon's per-session NDJSON stream.
|
||||
*
|
||||
* Design notes (from issue #3803 §04 / threat-model):
|
||||
* - Each event carries a monotonic `id` (per session) so the SSE
|
||||
* `Last-Event-ID` reconnect protocol can pick up where the client left
|
||||
* off. Backed by a bounded ring of recent events for replay.
|
||||
* - Subscribers use bounded async queues. A slow subscriber that blows
|
||||
* past its queue limit is sent a final `client_evicted` event and
|
||||
* closed; this keeps a stuck client from holding the daemon hostage
|
||||
* (per the resource-exhaustion entry in the threat-model summary).
|
||||
* - The bus is push-based; consumers iterate the returned AsyncIterable.
|
||||
* Aborting the supplied AbortSignal closes the iterator promptly.
|
||||
*/
|
||||
|
||||
export const EVENT_SCHEMA_VERSION = 1 as const;
|
||||
|
||||
/** A single frame published on the bus. */
|
||||
export interface BridgeEvent {
|
||||
/** Monotonic per-session id, starting at 1. */
|
||||
id: number;
|
||||
/** Schema version; bumped on breaking frame changes. */
|
||||
v: typeof EVENT_SCHEMA_VERSION;
|
||||
/** Frame type: `session_update`, `client_evicted`, or daemon-pushed events. */
|
||||
type: string;
|
||||
/** Frame payload — opaque JSON. */
|
||||
data: unknown;
|
||||
/**
|
||||
* Identifier of the client that triggered the event, when known. Used by
|
||||
* fan-out consumers to suppress echoes of their own actions.
|
||||
*/
|
||||
originatorClientId?: string;
|
||||
}
|
||||
|
||||
export interface SubscribeOptions {
|
||||
/**
|
||||
* Resume from after this event id. Events with `id <= lastEventId` are
|
||||
* skipped (already delivered); newer events still buffered in the ring
|
||||
* are replayed before live events flow.
|
||||
*/
|
||||
lastEventId?: number;
|
||||
/** Aborts the subscription cleanly. */
|
||||
signal?: AbortSignal;
|
||||
/**
|
||||
* Per-subscriber backlog cap. When exceeded the subscriber is evicted
|
||||
* with a final `client_evicted` event. Defaults to 256.
|
||||
*/
|
||||
maxQueued?: number;
|
||||
}
|
||||
|
||||
const DEFAULT_MAX_QUEUED = 256;
|
||||
const DEFAULT_RING_SIZE = 1000;
|
||||
|
||||
interface InternalSub {
|
||||
queue: BoundedAsyncQueue<BridgeEvent>;
|
||||
evicted: boolean;
|
||||
}
|
||||
|
||||
export class EventBus {
|
||||
private nextId = 1;
|
||||
private readonly ring: BridgeEvent[] = [];
|
||||
private readonly subs = new Set<InternalSub>();
|
||||
private closed = false;
|
||||
|
||||
constructor(private readonly ringSize: number = DEFAULT_RING_SIZE) {}
|
||||
|
||||
/** Most recent id ever assigned by `publish`. 0 if no events published. */
|
||||
get lastEventId(): number {
|
||||
return this.nextId - 1;
|
||||
}
|
||||
|
||||
/** Snapshot of the live subscriber count. */
|
||||
get subscriberCount(): number {
|
||||
return this.subs.size;
|
||||
}
|
||||
|
||||
publish(input: Omit<BridgeEvent, 'id' | 'v'>): BridgeEvent {
|
||||
if (this.closed) {
|
||||
throw new Error('EventBus is closed; cannot publish');
|
||||
}
|
||||
const event: BridgeEvent = {
|
||||
id: this.nextId++,
|
||||
v: EVENT_SCHEMA_VERSION,
|
||||
...input,
|
||||
};
|
||||
this.ring.push(event);
|
||||
if (this.ring.length > this.ringSize) this.ring.shift();
|
||||
for (const sub of this.subs) {
|
||||
if (sub.evicted) continue;
|
||||
if (!sub.queue.push(event)) {
|
||||
sub.evicted = true;
|
||||
const evictionFrame: BridgeEvent = {
|
||||
id: this.nextId++,
|
||||
v: EVENT_SCHEMA_VERSION,
|
||||
type: 'client_evicted',
|
||||
data: { reason: 'queue_overflow', droppedAfter: event.id },
|
||||
};
|
||||
// Force-push the eviction frame; close immediately after so the
|
||||
// consumer iterator unwinds with a final synthetic event.
|
||||
sub.queue.forcePush(evictionFrame);
|
||||
sub.queue.close();
|
||||
}
|
||||
}
|
||||
return event;
|
||||
}
|
||||
|
||||
/**
|
||||
* Note: registration is synchronous — by the time `subscribe()` returns,
|
||||
* the subscriber is already attached and will receive any subsequent
|
||||
* `publish()` even if the consumer hasn't started iterating yet. (A
|
||||
* generator-style implementation would defer registration to the first
|
||||
* `next()` call, which races with publishes that happen before the
|
||||
* consumer's first await.)
|
||||
*/
|
||||
subscribe(opts: SubscribeOptions = {}): AsyncIterable<BridgeEvent> {
|
||||
if (this.closed) {
|
||||
return emptyAsyncIterable<BridgeEvent>();
|
||||
}
|
||||
const queue = new BoundedAsyncQueue<BridgeEvent>(
|
||||
opts.maxQueued ?? DEFAULT_MAX_QUEUED,
|
||||
);
|
||||
|
||||
if (opts.lastEventId !== undefined) {
|
||||
for (const e of this.ring) {
|
||||
if (e.id > opts.lastEventId) queue.push(e);
|
||||
}
|
||||
}
|
||||
|
||||
const sub: InternalSub = { queue, evicted: false };
|
||||
this.subs.add(sub);
|
||||
|
||||
const onAbort = () => queue.close();
|
||||
if (opts.signal) {
|
||||
if (opts.signal.aborted) {
|
||||
queue.close();
|
||||
} else {
|
||||
opts.signal.addEventListener('abort', onAbort, { once: true });
|
||||
}
|
||||
}
|
||||
|
||||
const dispose = () => {
|
||||
this.subs.delete(sub);
|
||||
opts.signal?.removeEventListener('abort', onAbort);
|
||||
};
|
||||
|
||||
return {
|
||||
[Symbol.asyncIterator]: (): AsyncIterator<BridgeEvent> => ({
|
||||
async next(): Promise<IteratorResult<BridgeEvent>> {
|
||||
const r = await queue.next();
|
||||
if (r.done) dispose();
|
||||
return r;
|
||||
},
|
||||
async return(): Promise<IteratorResult<BridgeEvent>> {
|
||||
queue.close();
|
||||
dispose();
|
||||
return { value: undefined as unknown as BridgeEvent, done: true };
|
||||
},
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
/** Close all live subscribers and prevent further `publish`/`subscribe`. */
|
||||
close(): void {
|
||||
if (this.closed) return;
|
||||
this.closed = true;
|
||||
for (const sub of this.subs) sub.queue.close();
|
||||
this.subs.clear();
|
||||
}
|
||||
}
|
||||
|
||||
function emptyAsyncIterable<T>(): AsyncIterable<T> {
|
||||
return {
|
||||
[Symbol.asyncIterator]: (): AsyncIterator<T> => ({
|
||||
async next(): Promise<IteratorResult<T>> {
|
||||
return { value: undefined as unknown as T, done: true };
|
||||
},
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Promise-based bounded queue. `push` returns false (instead of blocking or
|
||||
* throwing) when full so callers can decide how to react — the EventBus uses
|
||||
* that signal to evict slow subscribers.
|
||||
*/
|
||||
class BoundedAsyncQueue<T> {
|
||||
private readonly buf: T[] = [];
|
||||
private readonly resolvers: Array<(v: IteratorResult<T>) => void> = [];
|
||||
private closed = false;
|
||||
|
||||
constructor(private readonly maxSize: number) {}
|
||||
|
||||
/** Returns true if accepted, false if dropped due to overflow. */
|
||||
push(value: T): boolean {
|
||||
if (this.closed) return false;
|
||||
const r = this.resolvers.shift();
|
||||
if (r) {
|
||||
r({ value, done: false });
|
||||
return true;
|
||||
}
|
||||
if (this.buf.length >= this.maxSize) return false;
|
||||
this.buf.push(value);
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Bypasses the size cap. Used for terminal eviction frames. */
|
||||
forcePush(value: T): void {
|
||||
if (this.closed) return;
|
||||
const r = this.resolvers.shift();
|
||||
if (r) {
|
||||
r({ value, done: false });
|
||||
return;
|
||||
}
|
||||
this.buf.push(value);
|
||||
}
|
||||
|
||||
close(): void {
|
||||
if (this.closed) return;
|
||||
this.closed = true;
|
||||
while (this.resolvers.length > 0) {
|
||||
this.resolvers.shift()!({
|
||||
value: undefined as unknown as T,
|
||||
done: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
next(): Promise<IteratorResult<T>> {
|
||||
const buffered = this.buf.shift();
|
||||
if (buffered !== undefined) {
|
||||
return Promise.resolve({ value: buffered, done: false });
|
||||
}
|
||||
if (this.closed) {
|
||||
return Promise.resolve({
|
||||
value: undefined as unknown as T,
|
||||
done: true,
|
||||
});
|
||||
}
|
||||
return new Promise((resolve) => this.resolvers.push(resolve));
|
||||
}
|
||||
}
|
||||
|
|
@ -508,4 +508,89 @@ describe('createHttpAcpBridge', () => {
|
|||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('subscribeEvents', () => {
|
||||
it('throws SessionNotFoundError for unknown session ids', () => {
|
||||
const bridge = createHttpAcpBridge({
|
||||
channelFactory: async () => {
|
||||
throw new Error('factory should not be called');
|
||||
},
|
||||
});
|
||||
expect(() => bridge.subscribeEvents('unknown')).toThrow(
|
||||
SessionNotFoundError,
|
||||
);
|
||||
});
|
||||
|
||||
it('publishes session_update events to subscribers when the agent sends them', async () => {
|
||||
let capturedConn: AgentSideConnection | undefined;
|
||||
const factory: ChannelFactory = async () => {
|
||||
// Build a channel pair where we capture the agent-side connection
|
||||
// so we can drive sessionUpdate notifications from the test.
|
||||
const ab = new TransformStream<Uint8Array, Uint8Array>();
|
||||
const ba = new TransformStream<Uint8Array, Uint8Array>();
|
||||
const clientStream = ndJsonStream(ab.writable, ba.readable);
|
||||
const agentStream = ndJsonStream(ba.writable, ab.readable);
|
||||
const fakeAgent = new FakeAgent();
|
||||
capturedConn = new AgentSideConnection(() => fakeAgent, agentStream);
|
||||
return {
|
||||
stream: clientStream,
|
||||
kill: async () => {},
|
||||
};
|
||||
};
|
||||
const bridge = createHttpAcpBridge({ channelFactory: factory });
|
||||
const session = await bridge.spawnOrAttach({ workspaceCwd: '/work/a' });
|
||||
|
||||
const abort = new AbortController();
|
||||
const iter = bridge.subscribeEvents(session.sessionId, {
|
||||
signal: abort.signal,
|
||||
});
|
||||
|
||||
// Send a sessionUpdate from the agent side (fire-and-forget).
|
||||
void capturedConn!.sessionUpdate({
|
||||
sessionId: session.sessionId,
|
||||
update: {
|
||||
sessionUpdate: 'agent_message_chunk',
|
||||
content: { type: 'text', text: 'hi' },
|
||||
},
|
||||
});
|
||||
|
||||
const collected: Array<{ id: number; type: string; data: unknown }> = [];
|
||||
for await (const e of iter) {
|
||||
collected.push({ id: e.id, type: e.type, data: e.data });
|
||||
if (collected.length === 1) break;
|
||||
}
|
||||
expect(collected[0]?.type).toBe('session_update');
|
||||
expect(collected[0]?.id).toBe(1);
|
||||
|
||||
abort.abort();
|
||||
await bridge.shutdown();
|
||||
});
|
||||
|
||||
it('shutdown closes live event subscriptions', async () => {
|
||||
const factory: ChannelFactory = async () => makeChannel().channel;
|
||||
const bridge = createHttpAcpBridge({ channelFactory: factory });
|
||||
const session = await bridge.spawnOrAttach({ workspaceCwd: '/work/a' });
|
||||
|
||||
const abort = new AbortController();
|
||||
const iter = bridge.subscribeEvents(session.sessionId, {
|
||||
signal: abort.signal,
|
||||
});
|
||||
|
||||
const drain = (async () => {
|
||||
const events: unknown[] = [];
|
||||
for await (const e of iter) {
|
||||
events.push(e);
|
||||
}
|
||||
return events;
|
||||
})();
|
||||
|
||||
// Give the subscriber a tick to register.
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
await bridge.shutdown();
|
||||
|
||||
// Subscriber must unwind to completion (no events ever published).
|
||||
const events = await drain;
|
||||
expect(events).toEqual([]);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -13,6 +13,11 @@ import {
|
|||
PROTOCOL_VERSION,
|
||||
ndJsonStream,
|
||||
} from '@agentclientprotocol/sdk';
|
||||
import {
|
||||
EventBus,
|
||||
type BridgeEvent,
|
||||
type SubscribeOptions,
|
||||
} from './eventBus.js';
|
||||
import type {
|
||||
CancelNotification,
|
||||
Client,
|
||||
|
|
@ -82,6 +87,17 @@ export interface HttpAcpBridge {
|
|||
*/
|
||||
cancelSession(sessionId: string, req?: CancelNotification): Promise<void>;
|
||||
|
||||
/**
|
||||
* Subscribe to the session's event stream. Returns an AsyncIterable that
|
||||
* yields published events; supports `Last-Event-ID` reconnect through
|
||||
* `opts.lastEventId`. Throws `SessionNotFoundError` when the id is
|
||||
* unknown.
|
||||
*/
|
||||
subscribeEvents(
|
||||
sessionId: string,
|
||||
opts?: SubscribeOptions,
|
||||
): AsyncIterable<BridgeEvent>;
|
||||
|
||||
/** Test/inspection hook: number of live sessions. */
|
||||
readonly sessionCount: number;
|
||||
|
||||
|
|
@ -132,8 +148,8 @@ interface SessionEntry {
|
|||
workspaceCwd: string;
|
||||
channel: AcpChannel;
|
||||
connection: ClientSideConnection;
|
||||
/** Stage 1 buffer; consumed by SSE wiring in the next PR. */
|
||||
notifications: SessionNotification[];
|
||||
/** Per-session event bus drives `GET /session/:id/events`. */
|
||||
events: EventBus;
|
||||
/**
|
||||
* Tail of the per-session prompt queue. Each new prompt chains off the
|
||||
* resolved (or rejected) state of this promise so prompts run one at a
|
||||
|
|
@ -150,10 +166,10 @@ interface SessionEntry {
|
|||
*
|
||||
* Stage 1 behavior:
|
||||
* - `requestPermission` denies by default. The HTTP `/permission/:requestId`
|
||||
* route in the next PR will let any attached client cast the deciding
|
||||
* route in a follow-up will let any attached client cast the deciding
|
||||
* vote (first-responder wins).
|
||||
* - `sessionUpdate` notifications are buffered on the session entry; the
|
||||
* next PR drains the buffer through SSE.
|
||||
* - `sessionUpdate` notifications publish onto the session's EventBus; SSE
|
||||
* subscribers (`GET /session/:id/events`) drain it.
|
||||
* - File reads/writes proxy to local fs (daemon and agent share the host).
|
||||
*/
|
||||
class BridgeClient implements Client {
|
||||
|
|
@ -167,7 +183,8 @@ class BridgeClient implements Client {
|
|||
|
||||
async sessionUpdate(params: SessionNotification): Promise<void> {
|
||||
const entry = this.resolveEntry();
|
||||
if (entry) entry.notifications.push(params);
|
||||
if (!entry) return;
|
||||
entry.events.publish({ type: 'session_update', data: params });
|
||||
}
|
||||
|
||||
async writeTextFile(
|
||||
|
|
@ -257,7 +274,7 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
|
|||
workspaceCwd: workspaceKey,
|
||||
channel,
|
||||
connection,
|
||||
notifications: [],
|
||||
events: new EventBus(),
|
||||
promptQueue: Promise.resolve(),
|
||||
};
|
||||
byWorkspace.set(workspaceKey, entry);
|
||||
|
|
@ -305,10 +322,17 @@ export function createHttpAcpBridge(opts: BridgeOptions = {}): HttpAcpBridge {
|
|||
await entry.connection.cancel(notif);
|
||||
},
|
||||
|
||||
subscribeEvents(sessionId, subOpts) {
|
||||
const entry = byId.get(sessionId);
|
||||
if (!entry) throw new SessionNotFoundError(sessionId);
|
||||
return entry.events.subscribe(subOpts);
|
||||
},
|
||||
|
||||
async shutdown() {
|
||||
const entries = Array.from(byId.values());
|
||||
byWorkspace.clear();
|
||||
byId.clear();
|
||||
for (const e of entries) e.events.close();
|
||||
await Promise.all(entries.map((e) => e.channel.kill().catch(() => {})));
|
||||
},
|
||||
};
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ export {
|
|||
export {
|
||||
createHttpAcpBridge,
|
||||
defaultSpawnChannelFactory,
|
||||
SessionNotFoundError,
|
||||
type AcpChannel,
|
||||
type BridgeOptions,
|
||||
type BridgeSession,
|
||||
|
|
@ -27,3 +28,9 @@ export {
|
|||
type ChannelFactory,
|
||||
type HttpAcpBridge,
|
||||
} from './httpAcpBridge.js';
|
||||
export {
|
||||
EventBus,
|
||||
EVENT_SCHEMA_VERSION,
|
||||
type BridgeEvent,
|
||||
type SubscribeOptions,
|
||||
} from './eventBus.js';
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import {
|
|||
type BridgeSpawnRequest,
|
||||
type HttpAcpBridge,
|
||||
} from './httpAcpBridge.js';
|
||||
import type { BridgeEvent, SubscribeOptions } from './eventBus.js';
|
||||
import {
|
||||
CAPABILITIES_SCHEMA_VERSION,
|
||||
STAGE1_FEATURES,
|
||||
|
|
@ -38,6 +39,10 @@ interface FakeBridgeOpts {
|
|||
req: PromptRequest,
|
||||
) => Promise<PromptResponse>;
|
||||
cancelImpl?: (sessionId: string, req?: CancelNotification) => Promise<void>;
|
||||
subscribeImpl?: (
|
||||
sessionId: string,
|
||||
opts?: SubscribeOptions,
|
||||
) => AsyncIterable<BridgeEvent>;
|
||||
}
|
||||
|
||||
interface FakeBridge extends HttpAcpBridge {
|
||||
|
|
@ -85,6 +90,13 @@ function fakeBridge(opts: FakeBridgeOpts = {}): FakeBridge {
|
|||
cancelCalls.push({ sessionId, req });
|
||||
return cancelImpl(sessionId, req);
|
||||
},
|
||||
subscribeEvents(sessionId, subOpts) {
|
||||
if (opts.subscribeImpl) return opts.subscribeImpl(sessionId, subOpts);
|
||||
// Default: empty stream
|
||||
return (async function* () {
|
||||
// empty
|
||||
})();
|
||||
},
|
||||
async shutdown() {
|
||||
shutdownCalls += 1;
|
||||
},
|
||||
|
|
@ -398,6 +410,164 @@ describe('runQwenServe', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('GET /session/:id/events (SSE)', () => {
|
||||
let handle: RunHandle | undefined;
|
||||
|
||||
afterEach(async () => {
|
||||
if (handle) {
|
||||
await handle.close();
|
||||
handle = undefined;
|
||||
}
|
||||
});
|
||||
|
||||
async function readSseFrames(
|
||||
body: ReadableStream<Uint8Array>,
|
||||
minFrames: number,
|
||||
): Promise<Array<{ id?: string; event?: string; data?: string }>> {
|
||||
const reader = body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buf = '';
|
||||
const frames: Array<{ id?: string; event?: string; data?: string }> = [];
|
||||
while (frames.length < minFrames) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
buf += decoder.decode(value, { stream: true });
|
||||
let idx: number;
|
||||
while ((idx = buf.indexOf('\n\n')) !== -1) {
|
||||
const raw = buf.slice(0, idx);
|
||||
buf = buf.slice(idx + 2);
|
||||
if (!raw || raw.startsWith(':') || raw.startsWith('retry:')) continue;
|
||||
const frame: { id?: string; event?: string; data?: string } = {};
|
||||
for (const line of raw.split('\n')) {
|
||||
if (line.startsWith('id: ')) frame.id = line.slice(4);
|
||||
else if (line.startsWith('event: ')) frame.event = line.slice(7);
|
||||
else if (line.startsWith('data: ')) frame.data = line.slice(6);
|
||||
}
|
||||
frames.push(frame);
|
||||
}
|
||||
}
|
||||
await reader.cancel();
|
||||
return frames;
|
||||
}
|
||||
|
||||
it('streams events from the bridge as SSE frames', async () => {
|
||||
const bridge = fakeBridge({
|
||||
async *subscribeImpl(_sessionId, _opts) {
|
||||
yield {
|
||||
id: 1,
|
||||
v: 1,
|
||||
type: 'session_update',
|
||||
data: { foo: 'bar' },
|
||||
};
|
||||
yield { id: 2, v: 1, type: 'session_update', data: { foo: 'baz' } };
|
||||
// No more events; the stream stays open until the caller aborts.
|
||||
await new Promise(() => {});
|
||||
},
|
||||
});
|
||||
handle = await runQwenServe(
|
||||
{ hostname: '127.0.0.1', port: 0, mode: 'http-bridge' },
|
||||
{ bridge },
|
||||
);
|
||||
const port = (handle.server.address() as { port: number }).port;
|
||||
|
||||
const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`);
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.headers.get('content-type')).toContain('text/event-stream');
|
||||
|
||||
const frames = await readSseFrames(res.body!, 2);
|
||||
|
||||
expect(frames).toHaveLength(2);
|
||||
expect(frames[0]?.id).toBe('1');
|
||||
expect(frames[0]?.event).toBe('session_update');
|
||||
expect(JSON.parse(frames[0]!.data!)).toEqual({
|
||||
id: 1,
|
||||
v: 1,
|
||||
type: 'session_update',
|
||||
data: { foo: 'bar' },
|
||||
});
|
||||
expect(frames[1]?.id).toBe('2');
|
||||
});
|
||||
|
||||
it('forwards Last-Event-ID to the bridge', async () => {
|
||||
const seen: number[] = [];
|
||||
const bridge = fakeBridge({
|
||||
async *subscribeImpl(_sessionId, opts) {
|
||||
seen.push(opts?.lastEventId ?? -1);
|
||||
yield { id: 42, v: 1, type: 'session_update', data: 'replay' };
|
||||
await new Promise(() => {});
|
||||
},
|
||||
});
|
||||
handle = await runQwenServe(
|
||||
{ hostname: '127.0.0.1', port: 0, mode: 'http-bridge' },
|
||||
{ bridge },
|
||||
);
|
||||
const port = (handle.server.address() as { port: number }).port;
|
||||
|
||||
const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`, {
|
||||
headers: { 'Last-Event-ID': '17' },
|
||||
});
|
||||
const frames = await readSseFrames(res.body!, 1);
|
||||
|
||||
expect(seen).toEqual([17]);
|
||||
expect(frames[0]?.id).toBe('42');
|
||||
});
|
||||
|
||||
it('returns 404 when the bridge reports unknown session', async () => {
|
||||
const bridge = fakeBridge({
|
||||
subscribeImpl: (sessionId) => {
|
||||
throw new SessionNotFoundError(sessionId);
|
||||
},
|
||||
});
|
||||
handle = await runQwenServe(
|
||||
{ hostname: '127.0.0.1', port: 0, mode: 'http-bridge' },
|
||||
{ bridge },
|
||||
);
|
||||
const port = (handle.server.address() as { port: number }).port;
|
||||
|
||||
const res = await fetch(`http://127.0.0.1:${port}/session/missing/events`);
|
||||
expect(res.status).toBe(404);
|
||||
const body = await res.json();
|
||||
expect(body.sessionId).toBe('missing');
|
||||
});
|
||||
|
||||
it('aborts the bridge subscription when the client disconnects', async () => {
|
||||
const aborted = { value: false };
|
||||
const bridge = fakeBridge({
|
||||
async *subscribeImpl(_sessionId, opts) {
|
||||
opts?.signal?.addEventListener(
|
||||
'abort',
|
||||
() => {
|
||||
aborted.value = true;
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
yield { id: 1, v: 1, type: 'session_update', data: 'first' };
|
||||
await new Promise<void>((resolve) => {
|
||||
opts?.signal?.addEventListener('abort', () => resolve(), {
|
||||
once: true,
|
||||
});
|
||||
});
|
||||
},
|
||||
});
|
||||
handle = await runQwenServe(
|
||||
{ hostname: '127.0.0.1', port: 0, mode: 'http-bridge' },
|
||||
{ bridge },
|
||||
);
|
||||
const port = (handle.server.address() as { port: number }).port;
|
||||
|
||||
const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`);
|
||||
const frames = await readSseFrames(res.body!, 1);
|
||||
expect(frames).toHaveLength(1);
|
||||
// readSseFrames calls reader.cancel() once the requested frame count is
|
||||
// reached, which severs the underlying connection — the daemon's
|
||||
// `req.on('close')` handler then aborts the bridge subscription.
|
||||
|
||||
// Wait briefly for the close handler to propagate to the bridge.
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
expect(aborted.value).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe('runQwenServe SIGINT handler', () => {
|
||||
it('does not register signal handlers until the listener is up', () => {
|
||||
// Sanity: we register `once` so we don't leak across test runs.
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import {
|
|||
SessionNotFoundError,
|
||||
type HttpAcpBridge,
|
||||
} from './httpAcpBridge.js';
|
||||
import type { BridgeEvent } from './eventBus.js';
|
||||
import {
|
||||
CAPABILITIES_SCHEMA_VERSION,
|
||||
STAGE1_FEATURES,
|
||||
|
|
@ -101,11 +102,9 @@ export function createServeApp(
|
|||
: {};
|
||||
const prompt = body['prompt'];
|
||||
if (!Array.isArray(prompt)) {
|
||||
res
|
||||
.status(400)
|
||||
.json({
|
||||
error: '`prompt` is required and must be an array of content blocks',
|
||||
});
|
||||
res.status(400).json({
|
||||
error: '`prompt` is required and must be an array of content blocks',
|
||||
});
|
||||
return;
|
||||
}
|
||||
try {
|
||||
|
|
@ -137,9 +136,91 @@ export function createServeApp(
|
|||
}
|
||||
});
|
||||
|
||||
app.get('/session/:id/events', (req, res) => {
|
||||
const sessionId = req.params['id'];
|
||||
const lastEventId = parseLastEventId(req.headers['last-event-id']);
|
||||
|
||||
let iter: AsyncIterator<BridgeEvent> | undefined;
|
||||
const abort = new AbortController();
|
||||
try {
|
||||
const iterable = bridge.subscribeEvents(sessionId, {
|
||||
signal: abort.signal,
|
||||
lastEventId,
|
||||
});
|
||||
iter = iterable[Symbol.asyncIterator]();
|
||||
} catch (err) {
|
||||
sendBridgeError(res, err);
|
||||
return;
|
||||
}
|
||||
|
||||
res.status(200);
|
||||
res.setHeader('Content-Type', 'text/event-stream');
|
||||
res.setHeader('Cache-Control', 'no-cache, no-transform');
|
||||
res.setHeader('Connection', 'keep-alive');
|
||||
// Disable proxy buffering (nginx); event-stream content type alone
|
||||
// doesn't always reach the client through every proxy.
|
||||
res.setHeader('X-Accel-Buffering', 'no');
|
||||
res.flushHeaders?.();
|
||||
// Tell EventSource to retry after 3s on disconnect.
|
||||
res.write('retry: 3000\n\n');
|
||||
|
||||
// Heartbeat keeps NAT/proxy connections alive and lets the server
|
||||
// notice a dead client through write-back-pressure. Comment frame is
|
||||
// ignored by EventSource.
|
||||
const heartbeatTimer = setInterval(() => {
|
||||
if (!res.writableEnded) res.write(': heartbeat\n\n');
|
||||
}, 15_000);
|
||||
heartbeatTimer.unref?.();
|
||||
|
||||
const cleanup = () => {
|
||||
clearInterval(heartbeatTimer);
|
||||
abort.abort();
|
||||
};
|
||||
req.on('close', cleanup);
|
||||
|
||||
void (async () => {
|
||||
try {
|
||||
while (true) {
|
||||
const next = await iter!.next();
|
||||
if (next.done) break;
|
||||
if (res.writableEnded) break;
|
||||
res.write(formatSseFrame(next.value));
|
||||
}
|
||||
} catch (err) {
|
||||
if (!res.writableEnded) {
|
||||
res.write(
|
||||
formatSseFrame({
|
||||
id: 0,
|
||||
v: 1,
|
||||
type: 'stream_error',
|
||||
data: { error: err instanceof Error ? err.message : String(err) },
|
||||
}),
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
cleanup();
|
||||
if (!res.writableEnded) res.end();
|
||||
}
|
||||
})();
|
||||
});
|
||||
|
||||
return app;
|
||||
}
|
||||
|
||||
function parseLastEventId(raw: unknown): number | undefined {
|
||||
if (typeof raw !== 'string' || !raw) return undefined;
|
||||
const n = Number.parseInt(raw, 10);
|
||||
if (!Number.isFinite(n) || n < 0) return undefined;
|
||||
return n;
|
||||
}
|
||||
|
||||
function formatSseFrame(event: BridgeEvent): string {
|
||||
// SSE format: id (optional), event (optional), data, blank line.
|
||||
// Splitting `data` on newlines lets browsers reassemble multi-line JSON.
|
||||
const dataJson = JSON.stringify(event);
|
||||
return `id: ${event.id}\nevent: ${event.type}\ndata: ${dataJson}\n\n`;
|
||||
}
|
||||
|
||||
function sendBridgeError(res: import('express').Response, err: unknown): void {
|
||||
if (err instanceof SessionNotFoundError) {
|
||||
res.status(404).json({ error: err.message, sessionId: err.sessionId });
|
||||
|
|
|
|||
|
|
@ -50,4 +50,8 @@ export const CAPABILITIES_SCHEMA_VERSION = 1 as const;
|
|||
export const STAGE1_FEATURES: readonly string[] = [
|
||||
'health',
|
||||
'capabilities',
|
||||
'session_create',
|
||||
'session_prompt',
|
||||
'session_cancel',
|
||||
'session_events',
|
||||
] as const;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue