From ff63da26528663e1efb17bb51929ae5ed54ef0ee Mon Sep 17 00:00:00 2001 From: jinye Date: Fri, 15 May 2026 14:43:06 +0800 Subject: [PATCH] refactor(serve): extract createInMemoryChannel helper (#4156 A1) (#4160) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * refactor(serve): extract createInMemoryChannel helper from httpAcpBridge.test.ts (#4156 A1) Sub-PR A1 of issue #4156 (Stage 1.5b Mode A daemon). Pure refactor with zero behavior change. Extracts the inline paired NDJSON channel construction (`new TransformStream` × 2 + `ndJsonStream` × 2) that was duplicated across `httpAcpBridge.test.ts` into a production helper `createInMemoryChannel()` at `packages/cli/src/serve/inMemoryChannel.ts`. The helper is added to `packages/cli/src/serve/index.ts`'s barrel export alongside the rest of the serve module's public API. The helper is intentionally bare — it returns only the stream pair, no lifecycle / teardown surface. Two reasons: 1. Consumer behavior diverges widely (stuck channel, crashable child simulation, no-op, real in-process termination); a one-size-fits-all `close()` would either pull test-fixture concerns into a production module or force a single shape on consumers that don't want it. 2. The SDK's `ndJsonStream` outer wrapper does not reliably propagate close on `Stream.writable` to the opposite `Stream.readable`; consumers needing to simulate a child exit hold their own underlying `TransformStream` references and close those directly. 10 of 11 inline call sites in `httpAcpBridge.test.ts` migrate cleanly to the new helper. The 11th (`makeChannel` at line 151) keeps the inline 4-line construction because its `kill()` closure needs the underlying `ab` / `ba` writables to simulate child-process termination — a comment above the function explains the asymmetry. The helper is also a primitive for the future A2 PR's `inProcessAcpBridge.ts`, which will use it to wrap an in-process `QwenAgent` without spawning a `qwen --acp` child (see issue #4156 §3 decision 1 and §8). Test plan: - New `inMemoryChannel.test.ts`: 5 tests covering bidirectional round-trip, ordering preservation, and bidirectional direction isolation - Existing `httpAcpBridge.test.ts`: 70 tests, identical count and behavior before vs after migration - `vitest run packages/cli/src/serve/inMemoryChannel.test.ts packages/cli/src/serve/httpAcpBridge.test.ts` — 75/75 pass - `tsc --noEmit -p packages/cli/tsconfig.json` — clean for changed files 🤖 Generated with [Qwen Code](https://github.com/QwenLM/qwen-code) * refactor(serve): address Copilot review feedback on createInMemoryChannel Two small follow-ups from #4160 review: 1. inMemoryChannel.test.ts:113,137 — handle the pending `reader.read()` that the isolation tests intentionally leave hanging when the timeout wins the race. `reader.releaseLock()` in `finally` rejects that pending read per Web Streams spec; without a rejection handler this could surface as an unhandled rejection / flaky test signal. Added a no-op rejection handler via the two-arg `.then(onResolve, onReject)` form so the cleanup-path rejection settles cleanly. 2. inMemoryChannel.ts:11 — the JSDoc said "two `TransformStream<...>` pairs" which reads ambiguously as "two pairs of TransformStream" (i.e., 4 streams). The implementation creates exactly two TransformStreams (one per direction). Reworded to "two `TransformStream<...>` instances (one per direction)" to disambiguate. Tests still 5/5 pass. 🤖 Generated with [Qwen Code](https://github.com/QwenLM/qwen-code) * refactor(serve): expose abort() teardown primitive on createInMemoryChannel + route test through barrel Two follow-ups from #4160 review: 1. Expose `abort(reason?)` on the helper return value (per @wenshao critical comment). Reasoning: the helper previously returned only the `Stream` pair, leaving consumers no way to tear the channel down. `ndJsonStream`'s outer wrapper does not reliably propagate `close()`, but `abort()` on the underlying byte-level `TransformStream` is forceful-by-spec — pending reads on both sides settle immediately so GC can reclaim. This unblocks the future Stage 1.5b in-process bridge (#4156, sub-PR A2) which needs teardown on daemon shutdown. The settlement shape is documented honestly in JSDoc: at the inner byte-level layer pending reads reject with the supplied reason; at the outer SDK-wrapped `Stream` the wrapper translates that into a clean `{done: true}` signal. Either way, pending operations no longer hang — that's the teardown invariant we care about. 2. Route the test's import through the `serve/index.js` barrel rather than the source file (per @wenshao suggestion). Without a test that exercises the public API path, a typo or missing re-export in the barrel would go undetected in CI. Tests: 8/8 helper tests pass (5 existing + 3 new abort tests covering teardown invariant + idempotency + no-reason variant). 70/70 existing httpAcpBridge tests still pass. 🤖 Generated with [Qwen Code](https://github.com/QwenLM/qwen-code) --- packages/cli/src/serve/httpAcpBridge.test.ts | 57 ++--- .../cli/src/serve/inMemoryChannel.test.ts | 224 ++++++++++++++++++ packages/cli/src/serve/inMemoryChannel.ts | 73 ++++++ packages/cli/src/serve/index.ts | 1 + 4 files changed, 315 insertions(+), 40 deletions(-) create mode 100644 packages/cli/src/serve/inMemoryChannel.test.ts create mode 100644 packages/cli/src/serve/inMemoryChannel.ts diff --git a/packages/cli/src/serve/httpAcpBridge.test.ts b/packages/cli/src/serve/httpAcpBridge.test.ts index 603613756..974469093 100644 --- a/packages/cli/src/serve/httpAcpBridge.test.ts +++ b/packages/cli/src/serve/httpAcpBridge.test.ts @@ -43,6 +43,7 @@ import { type ChannelFactory, type HttpAcpBridge, } from './httpAcpBridge.js'; +import { createInMemoryChannel } from './inMemoryChannel.js'; import type { BridgeEvent } from './eventBus.js'; // Workspace fixtures must round-trip through `path.resolve` so the @@ -174,6 +175,12 @@ interface ChannelHandle { * Create a paired in-memory NDJSON channel: bridge sees `clientChannel`, * fake agent sees `agentStream`. Each `TransformStream` carries one * direction. + * + * Not migrated to `createInMemoryChannel()` (used by the other 10 sites + * in this file): `kill()` below needs the underlying `ab` / `ba` + * writables to simulate child-process termination, which the bare + * helper deliberately does not expose. See `inMemoryChannel.ts` JSDoc + * for the rationale. */ function makeChannel(opts: FakeAgentOpts = {}): ChannelHandle { const ab = new TransformStream(); @@ -1032,10 +1039,7 @@ describe('createHttpAcpBridge', () => { let capturedConn: AgentSideConnection | undefined; const handles: Array<{ killed: boolean }> = []; const factory: ChannelFactory = async () => { - const ab = new TransformStream(); - const ba = new TransformStream(); - const clientStream = ndJsonStream(ab.writable, ba.readable); - const agentStream = ndJsonStream(ba.writable, ab.readable); + const { clientStream, agentStream } = createInMemoryChannel(); const fakeAgent = new FakeAgent(); // The agent side gets an AgentSideConnection; that exposes a // ClientSideConnection-equivalent on its `agent` callback. We need @@ -1344,10 +1348,7 @@ describe('createHttpAcpBridge', () => { // (it sees the cancelled outcome). Both sides settle. let conn: AgentSideConnection | undefined; const factory: ChannelFactory = async () => { - const ab = new TransformStream(); - const ba = new TransformStream(); - const clientStream = ndJsonStream(ab.writable, ba.readable); - const agentStream = ndJsonStream(ba.writable, ab.readable); + const { clientStream, agentStream } = createInMemoryChannel(); const fakeAgent = new FakeAgent({ promptImpl: async (p): Promise => { // Issue the permission request from inside prompt() so @@ -1424,10 +1425,7 @@ describe('createHttpAcpBridge', () => { function setup(opts: { setModelImpl?: () => Promise } = {}) { const setModelCalls: Array<{ sessionId: string; modelId: string }> = []; const factory: ChannelFactory = async () => { - const ab = new TransformStream(); - const ba = new TransformStream(); - const clientStream = ndJsonStream(ab.writable, ba.readable); - const agentStream = ndJsonStream(ba.writable, ab.readable); + const { clientStream, agentStream } = createInMemoryChannel(); const fakeAgent = new FakeAgent(); const augmented = new Proxy(fakeAgent, { get(target, prop) { @@ -1624,10 +1622,7 @@ describe('createHttpAcpBridge', () => { it('publishes model_switch_failed and surfaces the error when the agent rejects', async () => { let attempts = 0; const factory: ChannelFactory = async () => { - const ab = new TransformStream(); - const ba = new TransformStream(); - const clientStream = ndJsonStream(ab.writable, ba.readable); - const agentStream = ndJsonStream(ba.writable, ab.readable); + const { clientStream, agentStream } = createInMemoryChannel(); const fakeAgent = new FakeAgent(); const augmented = new Proxy(fakeAgent, { get(target, prop) { @@ -1702,10 +1697,7 @@ describe('createHttpAcpBridge', () => { it('serializes concurrent model-change calls (FIFO)', async () => { const callOrder: string[] = []; const factory: ChannelFactory = async () => { - const ab = new TransformStream(); - const ba = new TransformStream(); - const clientStream = ndJsonStream(ab.writable, ba.readable); - const agentStream = ndJsonStream(ba.writable, ab.readable); + const { clientStream, agentStream } = createInMemoryChannel(); const fakeAgent = new FakeAgent(); const augmented = new Proxy(fakeAgent, { get(target, prop) { @@ -1771,10 +1763,7 @@ describe('createHttpAcpBridge', () => { function setupRecording() { const setModelCalls: Array<{ sessionId: string; modelId: string }> = []; const factory: ChannelFactory = async () => { - const ab = new TransformStream(); - const ba = new TransformStream(); - const clientStream = ndJsonStream(ab.writable, ba.readable); - const agentStream = ndJsonStream(ba.writable, ab.readable); + const { clientStream, agentStream } = createInMemoryChannel(); const fakeAgent = new FakeAgent(); const augmented = new Proxy(fakeAgent, { get(target, prop) { @@ -1861,10 +1850,7 @@ describe('createHttpAcpBridge', () => { resolveExited = () => r(undefined); }); const factory: ChannelFactory = async () => { - const ab = new TransformStream(); - const ba = new TransformStream(); - const clientStream = ndJsonStream(ab.writable, ba.readable); - const agentStream = ndJsonStream(ba.writable, ab.readable); + const { clientStream, agentStream } = createInMemoryChannel(); // Fake agent's prompt() never replies — we want the bridge's // race-against-exited to be the only resolution path. const stuckAgent: Agent = { @@ -2015,10 +2001,7 @@ describe('createHttpAcpBridge', () => { async function setupForFs() { let capturedConn: AgentSideConnection | undefined; const factory: ChannelFactory = async () => { - const ab = new TransformStream(); - const ba = new TransformStream(); - const clientStream = ndJsonStream(ab.writable, ba.readable); - const agentStream = ndJsonStream(ba.writable, ab.readable); + const { clientStream, agentStream } = createInMemoryChannel(); capturedConn = new AgentSideConnection( () => new FakeAgent(), agentStream, @@ -2412,10 +2395,7 @@ describe('createHttpAcpBridge', () => { async function setup() { const setModelCalls: Array<{ sessionId: string; modelId: string }> = []; const factory: ChannelFactory = async () => { - const ab = new TransformStream(); - const ba = new TransformStream(); - const clientStream = ndJsonStream(ab.writable, ba.readable); - const agentStream = ndJsonStream(ba.writable, ab.readable); + const { clientStream, agentStream } = createInMemoryChannel(); const fakeAgent = new FakeAgent(); // Augment the agent with the unstable model setter via a proxy so we // don't need to extend the FakeAgent class with optional methods. @@ -2515,10 +2495,7 @@ describe('createHttpAcpBridge', () => { const factory: ChannelFactory = async () => { // Build a channel pair where we capture the agent-side connection // so we can drive sessionUpdate notifications from the test. - const ab = new TransformStream(); - const ba = new TransformStream(); - const clientStream = ndJsonStream(ab.writable, ba.readable); - const agentStream = ndJsonStream(ba.writable, ab.readable); + const { clientStream, agentStream } = createInMemoryChannel(); const fakeAgent = new FakeAgent(); capturedConn = new AgentSideConnection(() => fakeAgent, agentStream); return { diff --git a/packages/cli/src/serve/inMemoryChannel.test.ts b/packages/cli/src/serve/inMemoryChannel.test.ts new file mode 100644 index 000000000..1f7c08148 --- /dev/null +++ b/packages/cli/src/serve/inMemoryChannel.test.ts @@ -0,0 +1,224 @@ +/** + * @license + * Copyright 2025 Qwen Team + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it, expect } from 'vitest'; +import type { AnyMessage } from '@agentclientprotocol/sdk'; +// Import via the barrel rather than the source file so the public API +// surface (serve/index.ts) is exercised by CI — a typo or missing +// re-export would otherwise go undetected. +import { createInMemoryChannel } from './index.js'; + +/** + * Push one JSON-RPC notification onto a `Stream.writable`. The SDK's + * `ndJsonStream` encodes the message + appends `\n` so the matching + * decoder on the other side can frame it. + */ +async function send( + writable: WritableStream, + msg: AnyMessage, +): Promise { + const writer = writable.getWriter(); + try { + await writer.write(msg); + } finally { + writer.releaseLock(); + } +} + +/** Read the next single message off a `Stream.readable`. */ +async function recvOne( + readable: ReadableStream, +): Promise { + const reader = readable.getReader(); + try { + const { value, done } = await reader.read(); + if (done || !value) { + throw new Error('stream closed before a frame arrived'); + } + return value; + } finally { + reader.releaseLock(); + } +} + +describe('createInMemoryChannel', () => { + it('round-trips a frame from client to agent', async () => { + const { clientStream, agentStream } = createInMemoryChannel(); + const sent: AnyMessage = { + jsonrpc: '2.0', + method: 'ping', + params: { n: 1 }, + }; + await send(clientStream.writable, sent); + const received = await recvOne(agentStream.readable); + expect(received).toEqual(sent); + }); + + it('round-trips a frame from agent to client', async () => { + const { clientStream, agentStream } = createInMemoryChannel(); + const sent: AnyMessage = { + jsonrpc: '2.0', + method: 'pong', + params: { reply: true }, + }; + await send(agentStream.writable, sent); + const received = await recvOne(clientStream.readable); + expect(received).toEqual(sent); + }); + + it('preserves order across multiple frames in one direction', async () => { + const { clientStream, agentStream } = createInMemoryChannel(); + const writer = clientStream.writable.getWriter(); + for (let i = 1; i <= 3; i++) { + await writer.write({ + jsonrpc: '2.0', + method: 'tick', + params: { i }, + }); + } + writer.releaseLock(); + + const reader = agentStream.readable.getReader(); + const out: AnyMessage[] = []; + for (let i = 0; i < 3; i++) { + const { value, done } = await reader.read(); + if (done || !value) throw new Error('unexpected close'); + out.push(value); + } + reader.releaseLock(); + + expect(out.map((m) => (m as { params: { i: number } }).params.i)).toEqual([ + 1, 2, 3, + ]); + }); + + it('isolates client→agent direction (client write does not echo to client.readable)', async () => { + // Sanity check that the channel is truly paired. A buggy + // implementation that aliased `ab` to both ends would still pass + // the round-trip tests above for one frame but echo the writer's + // own messages back on its own readable. + const { clientStream, agentStream } = createInMemoryChannel(); + await send(clientStream.writable, { + jsonrpc: '2.0', + method: 'client-only', + }); + + const onAgent = await recvOne(agentStream.readable); + expect((onAgent as { method: string }).method).toBe('client-only'); + + // Client's own readable must NOT see it. Race a fresh read against + // a short timeout — if the channel is correctly paired the read + // never resolves on its own. + // + // The read promise stays pending while the timeout wins; releasing + // the reader's lock in `finally` then causes that pending read to + // reject per Web Streams spec. Attach a rejection handler so the + // post-`releaseLock` rejection doesn't surface as an unhandled + // rejection / flaky test signal — the rejection itself isn't a + // failure here, it's just the cleanup path settling. + const reader = clientStream.readable.getReader(); + try { + const winner = await Promise.race([ + reader.read().then( + () => 'leaked' as const, + () => null, + ), + new Promise<'isolated'>((res) => setTimeout(() => res('isolated'), 50)), + ]); + expect(winner).toBe('isolated'); + } finally { + reader.releaseLock(); + } + }); + + it('isolates agent→client direction (agent write does not echo to agent.readable)', async () => { + // Symmetric counterpart of the previous test — closes the obvious + // "wired one direction correctly but not the other" failure mode. + // Same `releaseLock`-causes-pending-read-to-reject handling as the + // previous test; see comment there. + const { clientStream, agentStream } = createInMemoryChannel(); + await send(agentStream.writable, { + jsonrpc: '2.0', + method: 'agent-only', + }); + + const onClient = await recvOne(clientStream.readable); + expect((onClient as { method: string }).method).toBe('agent-only'); + + const reader = agentStream.readable.getReader(); + try { + const winner = await Promise.race([ + reader.read().then( + () => 'leaked' as const, + () => null, + ), + new Promise<'isolated'>((res) => setTimeout(() => res('isolated'), 50)), + ]); + expect(winner).toBe('isolated'); + } finally { + reader.releaseLock(); + } + }); + + it('abort() settles pending readers on both sides (teardown invariant)', async () => { + // Key teardown invariant: after abort(), pending read() calls do + // NOT hang. The exact settlement (resolve-with-{done:true} vs + // reject) depends on the SDK's ndJsonStream wrapper translation — + // see the helper's JSDoc. What matters is that consumers can + // GC-reclaim and shut down without leaking. + const { clientStream, agentStream, abort } = createInMemoryChannel(); + const clientReader = clientStream.readable.getReader(); + const agentReader = agentStream.readable.getReader(); + + // Race each read against a 100ms timeout; if abort works, both + // settle near-instantly. If abort doesn't propagate, the timeout + // wins and the test fails with `'hung'`. + const wrap = (p: Promise) => + Promise.race<'settled' | 'hung'>([ + p.then( + () => 'settled' as const, + () => 'settled' as const, + ), + new Promise<'hung'>((res) => setTimeout(() => res('hung'), 100)), + ]); + + const clientReadP = wrap(clientReader.read()); + const agentReadP = wrap(agentReader.read()); + + abort(new Error('shutdown')); + + expect(await clientReadP).toBe('settled'); + expect(await agentReadP).toBe('settled'); + + clientReader.releaseLock(); + agentReader.releaseLock(); + }); + + it('abort() is idempotent (calling twice is safe)', () => { + const { abort } = createInMemoryChannel(); + expect(() => { + abort('first'); + abort('second'); + }).not.toThrow(); + }); + + it('abort() with no reason still tears down', async () => { + const { agentStream, abort } = createInMemoryChannel(); + const reader = agentStream.readable.getReader(); + const readP = reader.read().then( + () => 'settled' as const, + () => 'settled' as const, + ); + abort(); + expect( + await Promise.race([ + readP, + new Promise<'hung'>((res) => setTimeout(() => res('hung'), 100)), + ]), + ).toBe('settled'); + reader.releaseLock(); + }); +}); diff --git a/packages/cli/src/serve/inMemoryChannel.ts b/packages/cli/src/serve/inMemoryChannel.ts new file mode 100644 index 000000000..e2478782e --- /dev/null +++ b/packages/cli/src/serve/inMemoryChannel.ts @@ -0,0 +1,73 @@ +/** + * @license + * Copyright 2025 Qwen Team + * SPDX-License-Identifier: Apache-2.0 + */ + +import { ndJsonStream, type Stream } from '@agentclientprotocol/sdk'; + +/** + * Create a paired in-memory NDJSON channel: two `Stream`s connected + * back-to-back via two `TransformStream` instances + * (one per direction). + * Whatever `clientStream.writable` writes appears on `agentStream.readable`, + * and vice versa. Each side is a full ACP `Stream` (via SDK `ndJsonStream`) + * so callers can hand them to `ClientSideConnection` / `AgentSideConnection` + * exactly as they would a real stdio pair. + * + * Used today by Stage 1 tests (replaces 10 sites of inline boilerplate + * in `httpAcpBridge.test.ts`). Will also be consumed by the Stage 1.5b + * in-process bridge (issue #4156) when that lands, to wrap an in-process + * `QwenAgent` without spawning a `qwen --acp` child. + * + * `abort(reason?)` is the universal teardown primitive. It calls + * `WritableStream.abort()` on both underlying byte-level + * `TransformStream`s, which immediately settles any pending `read()` / + * `write()` operations on both sides so the channel can be reclaimed. + * Use this to terminate the channel during shutdown / crash simulation + * / daemon teardown. + * + * **Settlement shape**: at the inner byte-level layer the pending read + * rejects with the supplied reason; at the outer SDK-wrapped `Stream` + * layer (what callers actually see) the SDK's `ndJsonStream` translates + * that error into a clean end-of-stream signal — `read()` resolves with + * `{value: undefined, done: true}` rather than rejecting. The exact + * shape depends on how deep the consumer is in the wrapper chain, but + * the key invariant — **pending operations no longer hang** — holds + * either way. Consumers wanting to distinguish "graceful close" from + * "aborted" should track the call themselves. + * + * We expose `abort` rather than `close` because `close()` only reaches + * the opposite `ReadableStream` after pending writes flush, and in + * practice the SDK's `ndJsonStream` outer wrapper does not reliably + * propagate close at all. `abort` is forceful and synchronous-by-spec, + * so it is the safe primitive for lifecycle teardown across an + * `ndJsonStream`-wrapped pair. + * + * Consumers that don't need teardown (most test sites, which let the + * channel die with the test scope) can ignore `abort`. `abort` is a + * platform-level primitive (not a test-fixture concern), so exposing + * it does not pull fixture machinery into this production module. + */ +export function createInMemoryChannel(): { + clientStream: Stream; + agentStream: Stream; + abort(reason?: unknown): void; +} { + const ab = new TransformStream(); + const ba = new TransformStream(); + const clientStream = ndJsonStream(ab.writable, ba.readable); + const agentStream = ndJsonStream(ba.writable, ab.readable); + return { + clientStream, + agentStream, + abort(reason?: unknown) { + // Fire-and-forget; both `abort()` calls return promises that we + // intentionally do not await (callers want the synchronous + // "tear it down now" semantic) and which may reject if the + // stream is already in errored state — both are expected. + ab.writable.abort(reason).catch(() => {}); + ba.writable.abort(reason).catch(() => {}); + }, + }; +} diff --git a/packages/cli/src/serve/index.ts b/packages/cli/src/serve/index.ts index c36c7fd0a..52288bbe2 100644 --- a/packages/cli/src/serve/index.ts +++ b/packages/cli/src/serve/index.ts @@ -35,3 +35,4 @@ export { type BridgeEvent, type SubscribeOptions, } from './eventBus.js'; +export { createInMemoryChannel } from './inMemoryChannel.js';