mirror of
https://github.com/QwenLM/qwen-code.git
synced 2026-05-19 07:54:38 +00:00
Stage 1 follow-up that turns prompt into a real streaming experience.
Replaces the in-memory `notifications: SessionNotification[]` buffer
on each session with a per-session EventBus and exposes it through
`GET /session/:id/events` as a `text/event-stream` SSE feed.
EventBus (`packages/cli/src/serve/eventBus.ts`):
- Monotonic per-session ids (`v: 1` schema). Each `publish` chains an
id, returning the materialized BridgeEvent.
- Bounded ring (default 1000) backs `Last-Event-ID` reconnect — a
consumer that drops can resume from `lastEventId` and replay any
still-buffered events before live events flow.
- Per-subscriber bounded queue (default 256). When a slow consumer
overruns its queue, the bus appends a synthetic `client_evicted`
terminal frame and closes that subscription so it can't hold the
daemon hostage. Other subscribers are unaffected.
- `subscribe()` returns an AsyncIterable — registration is synchronous
so events `publish`ed immediately after the subscribe land in the
queue (a generator-style implementation deferred registration to
first `next()` and raced with publishes).
- AbortSignal-aware: aborting the signal closes the iterator promptly.
Bridge (`httpAcpBridge.ts`):
- `BridgeClient.sessionUpdate` now publishes onto the session's
EventBus instead of pushing to a plain array — every ACP
notification the agent emits becomes a stream event automatically.
- New `subscribeEvents(sessionId, opts?)` returns the bus's
AsyncIterable; throws `SessionNotFoundError` for unknown ids.
- Shutdown closes every live event bus before killing channels so
pending consumers unwind cleanly.
Route (`server.ts`):
- `GET /session/:id/events` sets the SSE content type, advertises a
3s reconnect hint, and writes a 15s heartbeat comment frame to
keep proxy/NAT connections alive.
- Forwards the `Last-Event-ID` header to the bus.
- `req.on('close')` triggers an AbortController that propagates into
the bridge subscription so disconnects don't leak subscribers.
- 404 when the bridge can't find the session.
Capabilities envelope: `STAGE1_FEATURES` now advertises
`session_create`, `session_prompt`, `session_cancel`, `session_events`
in addition to `health`/`capabilities` so clients can light up UI for
the routes that have actually shipped.
Tests (16 new, 0 regressions in the 4995 baseline):
- 9 EventBus unit cases — id sequencing, live delivery, replay,
replay+live splice, fan-out to N subscribers, eviction on
overflow, abort-signal unsubscribe, bus.close() drains
subscribers, ring-size eviction.
- 4 bridge subscribe cases — 404, sessionUpdate→event publishing
via real ACP fake-agent, shutdown closes live subscriptions.
- 4 SSE route cases against a live HTTP listener — frame format,
Last-Event-ID forwarding, 404, abort propagation on disconnect.
Verified end-to-end against a real `qwen --acp` child:
- Subscribed to `/session/$SID/events`, fired `POST /session/$SID/prompt`
with text content. Captured 13 distinct `event: session_update`
SSE frames in real time during the model's response —
`available_commands_update` metadata, 9 `agent_thought_chunk` frames carrying
the model's chain-of-thought, 3 `agent_message_chunk` frames with
the actual reply, and a final usage frame with token totals.
- Frames carry monotonic ids 1..13, the daemon-side counter, and
are valid SSE per the EventSource spec.
579 lines
19 KiB
TypeScript
579 lines
19 KiB
TypeScript
/**
|
|
* @license
|
|
* Copyright 2025 Qwen Team
|
|
* SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
import { describe, it, expect, afterEach, vi } from 'vitest';
|
|
import request from 'supertest';
|
|
import { createServeApp } from './server.js';
|
|
import { runQwenServe, type RunHandle } from './runQwenServe.js';
|
|
import type {
|
|
CancelNotification,
|
|
PromptRequest,
|
|
PromptResponse,
|
|
} from '@agentclientprotocol/sdk';
|
|
import {
|
|
SessionNotFoundError,
|
|
type BridgeSession,
|
|
type BridgeSpawnRequest,
|
|
type HttpAcpBridge,
|
|
} from './httpAcpBridge.js';
|
|
import type { BridgeEvent, SubscribeOptions } from './eventBus.js';
|
|
import {
|
|
CAPABILITIES_SCHEMA_VERSION,
|
|
STAGE1_FEATURES,
|
|
type ServeOptions,
|
|
} from './types.js';
|
|
|
|
/**
 * Serve options shared by the `createServeApp` cases: a loopback hostname,
 * a fixed port (also used to build the `Host` header the tests send), and
 * the `http-bridge` mode.
 */
const baseOpts: ServeOptions = {
  mode: 'http-bridge',
  hostname: '127.0.0.1',
  port: 4170,
};
|
|
|
|
/**
 * Optional behaviour overrides for `fakeBridge`. Each hook replaces the
 * fake's built-in default for the corresponding bridge method; hooks that
 * are left unset fall back to that default.
 */
interface FakeBridgeOpts {
  /** Produces the session returned by `spawnOrAttach`. */
  spawnImpl?: (req: BridgeSpawnRequest) => Promise<BridgeSession>;

  /** Produces the response returned by `sendPrompt`. */
  promptImpl?: (
    sessionId: string,
    req: PromptRequest,
  ) => Promise<PromptResponse>;

  /** Runs when `cancelSession` is invoked. */
  cancelImpl?: (sessionId: string, req?: CancelNotification) => Promise<void>;

  /** Produces the event stream returned by `subscribeEvents`. */
  subscribeImpl?: (
    sessionId: string,
    opts?: SubscribeOptions,
  ) => AsyncIterable<BridgeEvent>;
}
|
|
|
|
/**
 * An `HttpAcpBridge` test double that additionally exposes what it was
 * asked to do, so tests can assert on the calls the routes forwarded.
 */
interface FakeBridge extends HttpAcpBridge {
  /** Spawn requests recorded by `spawnOrAttach`. */
  calls: BridgeSpawnRequest[];
  /** Every `sendPrompt` invocation, in call order. */
  promptCalls: Array<{ sessionId: string; req: PromptRequest }>;
  /** Every `cancelSession` invocation, in call order. */
  cancelCalls: Array<{ sessionId: string; req?: CancelNotification }>;
  /** Number of times `shutdown` has been called. */
  shutdownCalls: number;
}
|
|
|
|
/**
 * Builds an in-memory `FakeBridge`.
 *
 * Defaults when no override is supplied:
 * - `spawnOrAttach` resolves to a session whose id is `fake-<n>`, where `n`
 *   is the number of spawns recorded so far, echoing the requested cwd.
 * - `sendPrompt` resolves to `{ stopReason: 'end_turn' }`.
 * - `cancelSession` resolves to nothing.
 * - `subscribeEvents` returns a stream that ends without yielding.
 *
 * @param opts Optional per-method overrides.
 * @returns The fake bridge together with its call-recording arrays.
 */
function fakeBridge(opts: FakeBridgeOpts = {}): FakeBridge {
  const spawnLog: BridgeSpawnRequest[] = [];
  const promptLog: FakeBridge['promptCalls'] = [];
  const cancelLog: FakeBridge['cancelCalls'] = [];
  let shutdownCount = 0;

  const doSpawn =
    opts.spawnImpl ??
    (async (req) => ({
      sessionId: `fake-${spawnLog.length}`,
      workspaceCwd: req.workspaceCwd,
      attached: false,
    }));
  const doPrompt =
    opts.promptImpl ?? (async () => ({ stopReason: 'end_turn' }));
  const doCancel = opts.cancelImpl ?? (async () => {});

  // Stream used when no `subscribeImpl` override is present: ends immediately.
  const emptyEvents = () =>
    (async function* () {
      // empty
    })();

  return {
    calls: spawnLog,
    promptCalls: promptLog,
    cancelCalls: cancelLog,

    get shutdownCalls() {
      return shutdownCount;
    },

    get sessionCount() {
      return spawnLog.length;
    },

    async spawnOrAttach(req) {
      // The request is recorded only after the spawn hook resolves, so a
      // throwing hook leaves `calls` (and `sessionCount`) untouched.
      const session = await doSpawn(req);
      spawnLog.push(req);
      return session;
    },

    async sendPrompt(sessionId, req) {
      promptLog.push({ sessionId, req });
      return doPrompt(sessionId, req);
    },

    async cancelSession(sessionId, req) {
      cancelLog.push({ sessionId, req });
      return doCancel(sessionId, req);
    },

    subscribeEvents(sessionId, subOpts) {
      // The override is looked up at call time, not captured at construction.
      return opts.subscribeImpl
        ? opts.subscribeImpl(sessionId, subOpts)
        : emptyEvents();
    },

    async shutdown() {
      shutdownCount += 1;
    },
  };
}
|
|
|
|
describe('createServeApp', () => {
|
|
// Liveness probe: an allowed Host header gets a 200 with a fixed JSON body.
describe('GET /health', () => {
  it('returns 200 ok', async () => {
    const hostHeader = `127.0.0.1:${baseOpts.port}`;

    const response = await request(createServeApp(baseOpts))
      .get('/health')
      .set('Host', hostHeader);

    expect(response.status).toBe(200);
    expect(response.body).toEqual({ status: 'ok' });
  });
});
|
|
|
|
// Capabilities envelope: schema version, mode, feature list, model services.
describe('GET /capabilities', () => {
  it('returns the v1 envelope', async () => {
    const { status, body } = await request(createServeApp(baseOpts))
      .get('/capabilities')
      .set('Host', `127.0.0.1:${baseOpts.port}`);

    expect(status).toBe(200);
    expect(body.v).toBe(CAPABILITIES_SCHEMA_VERSION);
    expect(body.mode).toBe('http-bridge');
    expect(body.features).toEqual([...STAGE1_FEATURES]);
    expect(body.modelServices).toEqual([]);
  });
});
|
|
|
|
// Host-header filtering when the app is configured for a loopback bind.
describe('host allowlist (loopback bind)', () => {
  /** Issues GET /health against a fresh app using the given Host header. */
  const healthWithHost = (hostHeader: string) =>
    request(createServeApp(baseOpts)).get('/health').set('Host', hostHeader);

  it('rejects requests with an unrelated Host header', async () => {
    const response = await healthWithHost('evil.example.com');
    expect(response.status).toBe(403);
  });

  it('accepts host.docker.internal so containers can reach the host daemon', async () => {
    const response = await healthWithHost(
      `host.docker.internal:${baseOpts.port}`,
    );
    expect(response.status).toBe(200);
  });
});
|
|
|
|
// Session creation: request validation, success shape, and error mapping.
describe('POST /session', () => {
  /**
   * POSTs `payload` to /session on a fresh app wired to `bridge`.
   * Returns the supertest request itself so callers simply `await` it.
   */
  const postSession = (bridge: FakeBridge, payload: object) =>
    request(createServeApp(baseOpts, undefined, { bridge }))
      .post('/session')
      .set('Host', `127.0.0.1:${baseOpts.port}`)
      .send(payload);

  it('400 when cwd is missing', async () => {
    const bridge = fakeBridge();

    const response = await postSession(bridge, {});

    expect(response.status).toBe(400);
    // Validation failed, so the bridge must not have been asked to spawn.
    expect(bridge.calls).toHaveLength(0);
  });

  it('400 when cwd is relative', async () => {
    const bridge = fakeBridge();

    const response = await postSession(bridge, { cwd: 'relative/path' });

    expect(response.status).toBe(400);
    expect(bridge.calls).toHaveLength(0);
  });

  it('200 with the BridgeSession shape on success', async () => {
    const bridge = fakeBridge();

    const response = await postSession(bridge, {
      cwd: '/work/a',
      modelServiceId: 'qwen-prod',
    });

    expect(response.status).toBe(200);
    expect(response.body).toEqual({
      sessionId: 'fake-0',
      workspaceCwd: '/work/a',
      attached: false,
    });
    // The route hands the bridge `workspaceCwd`, not the wire-level `cwd`.
    expect(bridge.calls).toEqual([
      { workspaceCwd: '/work/a', modelServiceId: 'qwen-prod' },
    ]);
  });

  it('500 when bridge throws', async () => {
    const bridge = fakeBridge({
      spawnImpl: async () => {
        throw new Error('boom');
      },
    });

    const response = await postSession(bridge, { cwd: '/work/a' });

    expect(response.status).toBe(500);
    expect(response.body).toEqual({ error: 'boom' });
  });
});
|
|
|
|
// Prompt route: id routing, body validation, and bridge error mapping.
describe('POST /session/:id/prompt', () => {
  /** Fresh single-text-block prompt content for each request. */
  const hiPrompt = () => [{ type: 'text', text: 'hi' }];

  /**
   * POSTs `payload` to /session/<sessionId>/prompt on a fresh app wired to
   * `bridge`. Returns the supertest request itself so callers `await` it.
   */
  const postPrompt = (bridge: FakeBridge, sessionId: string, payload: object) =>
    request(createServeApp(baseOpts, undefined, { bridge }))
      .post(`/session/${sessionId}/prompt`)
      .set('Host', `127.0.0.1:${baseOpts.port}`)
      .send(payload);

  it('200 with PromptResponse on success; route :id wins over body sessionId', async () => {
    const bridge = fakeBridge({
      promptImpl: async () => ({ stopReason: 'end_turn' }),
    });

    const response = await postPrompt(bridge, 'session-A', {
      sessionId: 'spoofed-session-B',
      prompt: hiPrompt(),
    });

    expect(response.status).toBe(200);
    expect(response.body).toEqual({ stopReason: 'end_turn' });
    expect(bridge.promptCalls).toHaveLength(1);
    // Both the routing argument and the forwarded request carry the URL id,
    // not the id supplied in the body.
    const [forwarded] = bridge.promptCalls;
    expect(forwarded?.sessionId).toBe('session-A');
    expect(forwarded?.req.sessionId).toBe('session-A');
  });

  it('400 when prompt body is missing', async () => {
    const bridge = fakeBridge();

    const response = await postPrompt(bridge, 'session-A', {});

    expect(response.status).toBe(400);
    expect(bridge.promptCalls).toHaveLength(0);
  });

  it('404 when bridge reports unknown session', async () => {
    const bridge = fakeBridge({
      promptImpl: async (sessionId) => {
        throw new SessionNotFoundError(sessionId);
      },
    });

    const response = await postPrompt(bridge, 'missing', {
      prompt: hiPrompt(),
    });

    expect(response.status).toBe(404);
    expect(response.body.sessionId).toBe('missing');
  });

  it('500 on generic bridge errors', async () => {
    const bridge = fakeBridge({
      promptImpl: async () => {
        throw new Error('agent crashed');
      },
    });

    const response = await postPrompt(bridge, 'session-A', {
      prompt: hiPrompt(),
    });

    expect(response.status).toBe(500);
    expect(response.body).toEqual({ error: 'agent crashed' });
  });
});
|
|
|
|
// Cancel route: id routing, tolerance of an absent body, and 404 mapping.
describe('POST /session/:id/cancel', () => {
  /**
   * Starts a POST to /session/<sessionId>/cancel on a fresh app wired to
   * `bridge`. No body is attached here; callers may chain `.send(...)` or
   * `await` the request as-is.
   */
  const postCancel = (bridge: FakeBridge, sessionId: string) =>
    request(createServeApp(baseOpts, undefined, { bridge }))
      .post(`/session/${sessionId}/cancel`)
      .set('Host', `127.0.0.1:${baseOpts.port}`);

  it('204 on success and forwards routing id', async () => {
    const bridge = fakeBridge();

    const response = await postCancel(bridge, 'session-A').send({
      sessionId: 'spoofed-B',
    });

    expect(response.status).toBe(204);
    expect(response.body).toEqual({});
    expect(bridge.cancelCalls).toHaveLength(1);
    // The URL id is what reaches the bridge, not the id from the body.
    const [forwarded] = bridge.cancelCalls;
    expect(forwarded?.sessionId).toBe('session-A');
    expect(forwarded?.req?.sessionId).toBe('session-A');
  });

  it('204 with empty body', async () => {
    const bridge = fakeBridge();

    const response = await postCancel(bridge, 'session-A');

    expect(response.status).toBe(204);
    expect(bridge.cancelCalls).toHaveLength(1);
  });

  it('404 on unknown session', async () => {
    const bridge = fakeBridge({
      cancelImpl: async (sessionId) => {
        throw new SessionNotFoundError(sessionId);
      },
    });

    const response = await postCancel(bridge, 'missing');

    expect(response.status).toBe(404);
    expect(response.body.sessionId).toBe('missing');
  });
});
|
|
|
|
// Bearer-token gate: open when no token is configured, strict when one is.
describe('bearer auth', () => {
  /** App configured with the token `secret`; built fresh on every call. */
  const securedApp = () => createServeApp({ ...baseOpts, token: 'secret' });

  /** Starts GET /health on `app` with an allowed loopback Host header. */
  const getHealth = (app: ReturnType<typeof createServeApp>) =>
    request(app).get('/health').set('Host', `127.0.0.1:${baseOpts.port}`);

  it('is open by default (loopback developer convenience)', async () => {
    const response = await getHealth(createServeApp(baseOpts));
    expect(response.status).toBe(200);
  });

  it('rejects missing Authorization header when token is set', async () => {
    const response = await getHealth(securedApp());
    expect(response.status).toBe(401);
  });

  it('rejects wrong scheme', async () => {
    const response = await getHealth(securedApp()).set(
      'Authorization',
      'Basic c2VjcmV0',
    );
    expect(response.status).toBe(401);
  });

  it('rejects wrong token', async () => {
    const response = await getHealth(securedApp()).set(
      'Authorization',
      'Bearer wrong',
    );
    expect(response.status).toBe(401);
  });

  it('accepts the right token', async () => {
    const response = await getHealth(securedApp()).set(
      'Authorization',
      'Bearer secret',
    );
    expect(response.status).toBe(200);
  });
});
|
|
});
|
|
|
|
// Lifecycle tests for `runQwenServe`: token policy for non-loopback binds,
// loopback boot, and shutdown ordering.
describe('runQwenServe', () => {
  const TOKEN_ENV = 'QWEN_SERVER_TOKEN';
  let handle: RunHandle | undefined;

  afterEach(async () => {
    try {
      if (handle) {
        await handle.close();
      }
    } finally {
      // Always reset, even if close() rejects, so one failing test cannot
      // leak a stale handle or the token variable into the next one.
      handle = undefined;
      delete process.env[TOKEN_ENV];
    }
  });

  it('refuses to bind 0.0.0.0 without a token', async () => {
    // The env var is an accepted token source (see the next test), so an
    // ambient value from the developer's shell or CI would make this bind
    // succeed. Clear it up front instead of relying on a previous test's
    // afterEach having run.
    delete process.env[TOKEN_ENV];

    await expect(
      runQwenServe({
        hostname: '0.0.0.0',
        port: 0,
        mode: 'http-bridge',
      }),
    ).rejects.toThrow(/Refusing to bind/);
  });

  it('accepts QWEN_SERVER_TOKEN from the env when binding non-loopback', async () => {
    process.env[TOKEN_ENV] = 'env-secret';

    handle = await runQwenServe({
      hostname: '0.0.0.0',
      port: 0,
      mode: 'http-bridge',
    });

    expect(handle.url).toMatch(/^http:\/\/0\.0\.0\.0:\d+$/);
  });

  it('starts on a loopback ephemeral port without a token', async () => {
    // This case is about the token-less path, so make sure no ambient token
    // is picked up when the test runs first or in isolation.
    delete process.env[TOKEN_ENV];

    handle = await runQwenServe({
      hostname: '127.0.0.1',
      port: 0,
      mode: 'http-bridge',
    });

    // port: 0 requests an ephemeral port; read back the one actually bound.
    const port = (handle.server.address() as { port: number }).port;
    expect(port).toBeGreaterThan(0);

    const res = await fetch(`http://127.0.0.1:${port}/health`);
    expect(res.status).toBe(200);
    expect(await res.json()).toEqual({ status: 'ok' });
  });

  it('drains the bridge before closing the listener', async () => {
    const bridge = fakeBridge();

    handle = await runQwenServe(
      { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' },
      { bridge },
    );
    expect(bridge.shutdownCalls).toBe(0);

    await handle.close();
    // Already closed here; clear it so afterEach does not close it again.
    handle = undefined;

    expect(bridge.shutdownCalls).toBe(1);
  });
});
|
|
|
|
describe('GET /session/:id/events (SSE)', () => {
|
|
// Server started by the current test, if any; closed again after each case.
let handle: RunHandle | undefined;

afterEach(async () => {
  if (handle === undefined) return;
  await handle.close();
  handle = undefined;
});
|
|
|
|
/**
 * Reads a `text/event-stream` body until at least `minFrames` event frames
 * have been parsed or the stream ends, then cancels the reader.
 *
 * Frames are delimited by a blank line (`\n\n`). Within a frame:
 * - lines starting with `:` are comments and are ignored;
 * - `id`, `event` and `data` fields are captured, with one optional space
 *   after the colon stripped;
 * - repeated `data` lines are joined with `\n`;
 * - any other field (for example `retry`) is ignored.
 *
 * A frame that carries none of `id` / `event` / `data` (a heartbeat comment
 * or a bare `retry:` hint) is not reported.
 *
 * Because whole chunks are parsed at once, the result may contain more than
 * `minFrames` entries when one chunk holds several frames. CRLF line endings
 * are not handled.
 *
 * @param body Response body stream to consume.
 * @param minFrames Stop reading once this many frames have been collected.
 * @returns The frames parsed so far, in arrival order.
 */
async function readSseFrames(
  body: ReadableStream<Uint8Array>,
  minFrames: number,
): Promise<Array<{ id?: string; event?: string; data?: string }>> {
  type SseFrame = { id?: string; event?: string; data?: string };

  /** Parses one blank-line-delimited block; undefined when it has no event fields. */
  const parseFrame = (raw: string): SseFrame | undefined => {
    const frame: SseFrame = {};
    let hasEventField = false;
    for (const line of raw.split('\n')) {
      if (line === '' || line.startsWith(':')) continue;
      const colon = line.indexOf(':');
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? '' : line.slice(colon + 1);
      if (value.startsWith(' ')) value = value.slice(1);
      if (field === 'id') {
        frame.id = value;
        hasEventField = true;
      } else if (field === 'event') {
        frame.event = value;
        hasEventField = true;
      } else if (field === 'data') {
        frame.data =
          frame.data === undefined ? value : `${frame.data}\n${value}`;
        hasEventField = true;
      }
    }
    return hasEventField ? frame : undefined;
  };

  const reader = body.getReader();
  const decoder = new TextDecoder();
  const frames: SseFrame[] = [];
  let buf = '';

  try {
    while (frames.length < minFrames) {
      const { value, done } = await reader.read();
      if (done) break;
      // stream: true keeps multi-byte characters split across chunks intact.
      buf += decoder.decode(value, { stream: true });

      let idx: number;
      while ((idx = buf.indexOf('\n\n')) !== -1) {
        const frame = parseFrame(buf.slice(0, idx));
        buf = buf.slice(idx + 2);
        if (frame) frames.push(frame);
      }
    }
  } finally {
    // Release the stream on every exit path, including a failed read().
    await reader.cancel();
  }
  return frames;
}
|
|
|
|
it('streams events from the bridge as SSE frames', async () => {
  const firstEvent: BridgeEvent = {
    id: 1,
    v: 1,
    type: 'session_update',
    data: { foo: 'bar' },
  };
  const secondEvent: BridgeEvent = {
    id: 2,
    v: 1,
    type: 'session_update',
    data: { foo: 'baz' },
  };
  const bridge = fakeBridge({
    async *subscribeImpl(_sessionId, _opts) {
      yield firstEvent;
      yield secondEvent;
      // Nothing further to emit: park on a promise that never settles so
      // the stream stays open after the two events.
      await new Promise(() => {});
    },
  });
  handle = await runQwenServe(
    { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' },
    { bridge },
  );
  const { port } = handle.server.address() as { port: number };

  const response = await fetch(
    `http://127.0.0.1:${port}/session/sess-A/events`,
  );
  expect(response.status).toBe(200);
  expect(response.headers.get('content-type')).toContain('text/event-stream');

  const frames = await readSseFrames(response.body!, 2);

  expect(frames).toHaveLength(2);
  const [head, tail] = frames;
  expect(head?.id).toBe('1');
  expect(head?.event).toBe('session_update');
  // The data payload is the whole event serialised as JSON.
  expect(JSON.parse(head!.data!)).toEqual({
    id: 1,
    v: 1,
    type: 'session_update',
    data: { foo: 'bar' },
  });
  expect(tail?.id).toBe('2');
});
|
|
|
|
it('forwards Last-Event-ID to the bridge', async () => {
  // Records the lastEventId each subscription receives (-1 when absent).
  const receivedIds: number[] = [];
  const bridge = fakeBridge({
    async *subscribeImpl(_sessionId, opts) {
      receivedIds.push(opts?.lastEventId ?? -1);
      yield { id: 42, v: 1, type: 'session_update', data: 'replay' };
      // Keep the stream open after the single event.
      await new Promise(() => {});
    },
  });
  handle = await runQwenServe(
    { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' },
    { bridge },
  );
  const { port } = handle.server.address() as { port: number };
  const eventsUrl = `http://127.0.0.1:${port}/session/sess-A/events`;

  const response = await fetch(eventsUrl, {
    headers: { 'Last-Event-ID': '17' },
  });
  const frames = await readSseFrames(response.body!, 1);

  // The header string reaches the bridge as the number 17.
  expect(receivedIds).toEqual([17]);
  expect(frames[0]?.id).toBe('42');
});
|
|
|
|
it('returns 404 when the bridge reports unknown session', async () => {
  const bridge = fakeBridge({
    // Throws synchronously from subscribeEvents rather than from iteration.
    subscribeImpl: (sessionId) => {
      throw new SessionNotFoundError(sessionId);
    },
  });
  handle = await runQwenServe(
    { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' },
    { bridge },
  );
  const { port } = handle.server.address() as { port: number };

  const response = await fetch(
    `http://127.0.0.1:${port}/session/missing/events`,
  );

  expect(response.status).toBe(404);
  const payload = await response.json();
  expect(payload.sessionId).toBe('missing');
});
|
|
|
|
it('aborts the bridge subscription when the client disconnects', async () => {
  // Resolved by the subscription's abort listener. Waiting on this promise
  // replaces a fixed sleep, so the test finishes as soon as the abort lands
  // and does not flake when propagation is slow.
  let notifyAborted: () => void = () => {};
  const abortObserved = new Promise<void>((resolve) => {
    notifyAborted = resolve;
  });

  const bridge = fakeBridge({
    async *subscribeImpl(_sessionId, opts) {
      // One listener, registered before the first yield, both reports the
      // abort to the test and releases the generator, so an abort arriving
      // right after the yield cannot be missed.
      const untilAborted = new Promise<void>((resolve) => {
        opts?.signal?.addEventListener(
          'abort',
          () => {
            notifyAborted();
            resolve();
          },
          { once: true },
        );
      });
      yield { id: 1, v: 1, type: 'session_update', data: 'first' };
      await untilAborted;
    },
  });
  handle = await runQwenServe(
    { hostname: '127.0.0.1', port: 0, mode: 'http-bridge' },
    { bridge },
  );
  const port = (handle.server.address() as { port: number }).port;

  const res = await fetch(`http://127.0.0.1:${port}/session/sess-A/events`);
  const frames = await readSseFrames(res.body!, 1);
  expect(frames).toHaveLength(1);

  // readSseFrames cancels its reader once the requested frame count is
  // reached, which ends the client side of the connection; the signal
  // passed to the subscription is expected to abort as a result.
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timedOut = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), 2000);
  });
  try {
    const outcome = await Promise.race([
      abortObserved.then(() => 'aborted' as const),
      timedOut,
    ]);
    expect(outcome).toBe('aborted');
  } finally {
    clearTimeout(timer);
  }
});
|
|
});
|
|
|
|
// Placeholder suite: it does not exercise signal handling. It only checks
// that the module loaded and exported `runQwenServe` as a function.
describe('runQwenServe SIGINT handler', () => {
  it('does not register signal handlers until the listener is up', () => {
    // Sanity: we register `once` so we don't leak across test runs.
    // No assertion beyond "module loads without throwing"; full lifecycle
    // is covered indirectly by the loopback boot test above.
    const exportKind = typeof runQwenServe;
    expect(exportKind).toBe('function');

    void vi.fn(); // silence unused-import lint if vitest tree-shakes
  });
});
|