refactor(serve): extract createInMemoryChannel helper (#4156 A1) (#4160)

* refactor(serve): extract createInMemoryChannel helper from httpAcpBridge.test.ts (#4156 A1)

Sub-PR A1 of issue #4156 (Stage 1.5b Mode A daemon). Pure refactor with
zero behavior change.

Extracts the inline paired NDJSON channel construction (`new TransformStream`
× 2 + `ndJsonStream` × 2) that was duplicated across `httpAcpBridge.test.ts`
into a production helper `createInMemoryChannel()` at
`packages/cli/src/serve/inMemoryChannel.ts`. The helper is added to
`packages/cli/src/serve/index.ts`'s barrel export alongside the rest of
the serve module's public API.

The helper is intentionally bare — it returns only the stream pair, no
lifecycle / teardown surface. Two reasons:
  1. Consumer behavior diverges widely (stuck channel, crashable child
     simulation, no-op, real in-process termination); a one-size-fits-all
     `close()` would either pull test-fixture concerns into a production
     module or force a single shape on consumers that don't want it.
  2. The SDK's `ndJsonStream` outer wrapper does not reliably propagate
     close on `Stream.writable` to the opposite `Stream.readable`;
     consumers needing to simulate a child exit hold their own underlying
     `TransformStream` references and close those directly.

10 of 11 inline call sites in `httpAcpBridge.test.ts` migrate cleanly to
the new helper. The 11th (`makeChannel` at line 151) keeps the inline
4-line construction because its `kill()` closure needs the underlying
`ab` / `ba` writables to simulate child-process termination — a comment
above the function explains the asymmetry.

The helper is also a primitive for the future A2 PR's
`inProcessAcpBridge.ts`, which will use it to wrap an in-process
`QwenAgent` without spawning a `qwen --acp` child (see issue #4156 §3
decision 1 and §8).

Test plan:
- New `inMemoryChannel.test.ts`: 5 tests covering bidirectional round-trip,
  ordering preservation, and bidirectional direction isolation
- Existing `httpAcpBridge.test.ts`: 70 tests, identical count and behavior
  before vs after migration
- `vitest run packages/cli/src/serve/inMemoryChannel.test.ts
  packages/cli/src/serve/httpAcpBridge.test.ts` — 75/75 pass
- `tsc --noEmit -p packages/cli/tsconfig.json` — clean for changed files

🤖 Generated with [Qwen Code](https://github.com/QwenLM/qwen-code)

* refactor(serve): address Copilot review feedback on createInMemoryChannel

Two small follow-ups from #4160 review:

1. inMemoryChannel.test.ts:113,137 — handle the pending `reader.read()`
   that the isolation tests intentionally leave hanging when the timeout
   wins the race. `reader.releaseLock()` in `finally` rejects that
   pending read per Web Streams spec; without a rejection handler this
   could surface as an unhandled rejection / flaky test signal. Added a
   no-op rejection handler via the two-arg `.then(onResolve, onReject)`
   form so the cleanup-path rejection settles cleanly.

2. inMemoryChannel.ts:11 — the JSDoc said "two `TransformStream<...>`
   pairs" which reads ambiguously as "two pairs of TransformStream"
   (i.e., 4 streams). The implementation creates exactly two
   TransformStreams (one per direction). Reworded to "two
   `TransformStream<...>` instances (one per direction)" to disambiguate.

Tests still 5/5 pass.

🤖 Generated with [Qwen Code](https://github.com/QwenLM/qwen-code)

* refactor(serve): expose abort() teardown primitive on createInMemoryChannel + route test through barrel

Two follow-ups from #4160 review:

1. Expose `abort(reason?)` on the helper return value (per @wenshao
   critical comment). Reasoning: the helper previously returned only the
   `Stream` pair, leaving consumers no way to tear the channel down.
   `ndJsonStream`'s outer wrapper does not reliably propagate `close()`,
   but `abort()` on the underlying byte-level `TransformStream` is
   forceful-by-spec — pending reads on both sides settle immediately so
   GC can reclaim. This unblocks the future Stage 1.5b in-process bridge
   (#4156, sub-PR A2) which needs teardown on daemon shutdown.

   The settlement shape is documented honestly in JSDoc: at the inner
   byte-level layer pending reads reject with the supplied reason; at
   the outer SDK-wrapped `Stream` the wrapper translates that into a
   clean `{done: true}` signal. Either way, pending operations no
   longer hang — that's the teardown invariant we care about.

2. Route the test's import through the `serve/index.js` barrel rather
   than the source file (per @wenshao suggestion). Without a test that
   exercises the public API path, a typo or missing re-export in the
   barrel would go undetected in CI.

Tests: 8/8 helper tests pass (5 existing + 3 new abort tests covering
teardown invariant + idempotency + no-reason variant). 70/70 existing
httpAcpBridge tests still pass.

🤖 Generated with [Qwen Code](https://github.com/QwenLM/qwen-code)
This commit is contained in:
jinye 2026-05-15 14:43:06 +08:00 committed by GitHub
parent da1941c975
commit ff63da2652
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 315 additions and 40 deletions

View file

@ -43,6 +43,7 @@ import {
type ChannelFactory,
type HttpAcpBridge,
} from './httpAcpBridge.js';
import { createInMemoryChannel } from './inMemoryChannel.js';
import type { BridgeEvent } from './eventBus.js';
// Workspace fixtures must round-trip through `path.resolve` so the
@ -174,6 +175,12 @@ interface ChannelHandle {
* Create a paired in-memory NDJSON channel: bridge sees `clientChannel`,
* fake agent sees `agentStream`. Each `TransformStream` carries one
* direction.
*
* Not migrated to `createInMemoryChannel()` (used by the other 10 sites
* in this file): `kill()` below needs the underlying `ab` / `ba`
* writables to simulate child-process termination, which the bare
* helper deliberately does not expose. See `inMemoryChannel.ts` JSDoc
* for the rationale.
*/
function makeChannel(opts: FakeAgentOpts = {}): ChannelHandle {
const ab = new TransformStream<Uint8Array, Uint8Array>();
@ -1032,10 +1039,7 @@ describe('createHttpAcpBridge', () => {
let capturedConn: AgentSideConnection | undefined;
const handles: Array<{ killed: boolean }> = [];
const factory: ChannelFactory = async () => {
const ab = new TransformStream<Uint8Array, Uint8Array>();
const ba = new TransformStream<Uint8Array, Uint8Array>();
const clientStream = ndJsonStream(ab.writable, ba.readable);
const agentStream = ndJsonStream(ba.writable, ab.readable);
const { clientStream, agentStream } = createInMemoryChannel();
const fakeAgent = new FakeAgent();
// The agent side gets an AgentSideConnection; that exposes a
// ClientSideConnection-equivalent on its `agent` callback. We need
@ -1344,10 +1348,7 @@ describe('createHttpAcpBridge', () => {
// (it sees the cancelled outcome). Both sides settle.
let conn: AgentSideConnection | undefined;
const factory: ChannelFactory = async () => {
const ab = new TransformStream<Uint8Array, Uint8Array>();
const ba = new TransformStream<Uint8Array, Uint8Array>();
const clientStream = ndJsonStream(ab.writable, ba.readable);
const agentStream = ndJsonStream(ba.writable, ab.readable);
const { clientStream, agentStream } = createInMemoryChannel();
const fakeAgent = new FakeAgent({
promptImpl: async (p): Promise<PromptResponse> => {
// Issue the permission request from inside prompt() so
@ -1424,10 +1425,7 @@ describe('createHttpAcpBridge', () => {
function setup(opts: { setModelImpl?: () => Promise<unknown> } = {}) {
const setModelCalls: Array<{ sessionId: string; modelId: string }> = [];
const factory: ChannelFactory = async () => {
const ab = new TransformStream<Uint8Array, Uint8Array>();
const ba = new TransformStream<Uint8Array, Uint8Array>();
const clientStream = ndJsonStream(ab.writable, ba.readable);
const agentStream = ndJsonStream(ba.writable, ab.readable);
const { clientStream, agentStream } = createInMemoryChannel();
const fakeAgent = new FakeAgent();
const augmented = new Proxy(fakeAgent, {
get(target, prop) {
@ -1624,10 +1622,7 @@ describe('createHttpAcpBridge', () => {
it('publishes model_switch_failed and surfaces the error when the agent rejects', async () => {
let attempts = 0;
const factory: ChannelFactory = async () => {
const ab = new TransformStream<Uint8Array, Uint8Array>();
const ba = new TransformStream<Uint8Array, Uint8Array>();
const clientStream = ndJsonStream(ab.writable, ba.readable);
const agentStream = ndJsonStream(ba.writable, ab.readable);
const { clientStream, agentStream } = createInMemoryChannel();
const fakeAgent = new FakeAgent();
const augmented = new Proxy(fakeAgent, {
get(target, prop) {
@ -1702,10 +1697,7 @@ describe('createHttpAcpBridge', () => {
it('serializes concurrent model-change calls (FIFO)', async () => {
const callOrder: string[] = [];
const factory: ChannelFactory = async () => {
const ab = new TransformStream<Uint8Array, Uint8Array>();
const ba = new TransformStream<Uint8Array, Uint8Array>();
const clientStream = ndJsonStream(ab.writable, ba.readable);
const agentStream = ndJsonStream(ba.writable, ab.readable);
const { clientStream, agentStream } = createInMemoryChannel();
const fakeAgent = new FakeAgent();
const augmented = new Proxy(fakeAgent, {
get(target, prop) {
@ -1771,10 +1763,7 @@ describe('createHttpAcpBridge', () => {
function setupRecording() {
const setModelCalls: Array<{ sessionId: string; modelId: string }> = [];
const factory: ChannelFactory = async () => {
const ab = new TransformStream<Uint8Array, Uint8Array>();
const ba = new TransformStream<Uint8Array, Uint8Array>();
const clientStream = ndJsonStream(ab.writable, ba.readable);
const agentStream = ndJsonStream(ba.writable, ab.readable);
const { clientStream, agentStream } = createInMemoryChannel();
const fakeAgent = new FakeAgent();
const augmented = new Proxy(fakeAgent, {
get(target, prop) {
@ -1861,10 +1850,7 @@ describe('createHttpAcpBridge', () => {
resolveExited = () => r(undefined);
});
const factory: ChannelFactory = async () => {
const ab = new TransformStream<Uint8Array, Uint8Array>();
const ba = new TransformStream<Uint8Array, Uint8Array>();
const clientStream = ndJsonStream(ab.writable, ba.readable);
const agentStream = ndJsonStream(ba.writable, ab.readable);
const { clientStream, agentStream } = createInMemoryChannel();
// Fake agent's prompt() never replies — we want the bridge's
// race-against-exited to be the only resolution path.
const stuckAgent: Agent = {
@ -2015,10 +2001,7 @@ describe('createHttpAcpBridge', () => {
async function setupForFs() {
let capturedConn: AgentSideConnection | undefined;
const factory: ChannelFactory = async () => {
const ab = new TransformStream<Uint8Array, Uint8Array>();
const ba = new TransformStream<Uint8Array, Uint8Array>();
const clientStream = ndJsonStream(ab.writable, ba.readable);
const agentStream = ndJsonStream(ba.writable, ab.readable);
const { clientStream, agentStream } = createInMemoryChannel();
capturedConn = new AgentSideConnection(
() => new FakeAgent(),
agentStream,
@ -2412,10 +2395,7 @@ describe('createHttpAcpBridge', () => {
async function setup() {
const setModelCalls: Array<{ sessionId: string; modelId: string }> = [];
const factory: ChannelFactory = async () => {
const ab = new TransformStream<Uint8Array, Uint8Array>();
const ba = new TransformStream<Uint8Array, Uint8Array>();
const clientStream = ndJsonStream(ab.writable, ba.readable);
const agentStream = ndJsonStream(ba.writable, ab.readable);
const { clientStream, agentStream } = createInMemoryChannel();
const fakeAgent = new FakeAgent();
// Augment the agent with the unstable model setter via a proxy so we
// don't need to extend the FakeAgent class with optional methods.
@ -2515,10 +2495,7 @@ describe('createHttpAcpBridge', () => {
const factory: ChannelFactory = async () => {
// Build a channel pair where we capture the agent-side connection
// so we can drive sessionUpdate notifications from the test.
const ab = new TransformStream<Uint8Array, Uint8Array>();
const ba = new TransformStream<Uint8Array, Uint8Array>();
const clientStream = ndJsonStream(ab.writable, ba.readable);
const agentStream = ndJsonStream(ba.writable, ab.readable);
const { clientStream, agentStream } = createInMemoryChannel();
const fakeAgent = new FakeAgent();
capturedConn = new AgentSideConnection(() => fakeAgent, agentStream);
return {

View file

@ -0,0 +1,224 @@
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect } from 'vitest';
import type { AnyMessage } from '@agentclientprotocol/sdk';
// Import via the barrel rather than the source file so the public API
// surface (serve/index.ts) is exercised by CI — a typo or missing
// re-export would otherwise go undetected.
import { createInMemoryChannel } from './index.js';
/**
* Push one JSON-RPC notification onto a `Stream.writable`. The SDK's
* `ndJsonStream` encodes the message + appends `\n` so the matching
* decoder on the other side can frame it.
*/
async function send(
writable: WritableStream<AnyMessage>,
msg: AnyMessage,
): Promise<void> {
const writer = writable.getWriter();
try {
await writer.write(msg);
} finally {
writer.releaseLock();
}
}
/** Read the next single message off a `Stream.readable`. */
async function recvOne(
readable: ReadableStream<AnyMessage>,
): Promise<AnyMessage> {
const reader = readable.getReader();
try {
const { value, done } = await reader.read();
if (done || !value) {
throw new Error('stream closed before a frame arrived');
}
return value;
} finally {
reader.releaseLock();
}
}
describe('createInMemoryChannel', () => {
it('round-trips a frame from client to agent', async () => {
const { clientStream, agentStream } = createInMemoryChannel();
const sent: AnyMessage = {
jsonrpc: '2.0',
method: 'ping',
params: { n: 1 },
};
await send(clientStream.writable, sent);
const received = await recvOne(agentStream.readable);
expect(received).toEqual(sent);
});
it('round-trips a frame from agent to client', async () => {
const { clientStream, agentStream } = createInMemoryChannel();
const sent: AnyMessage = {
jsonrpc: '2.0',
method: 'pong',
params: { reply: true },
};
await send(agentStream.writable, sent);
const received = await recvOne(clientStream.readable);
expect(received).toEqual(sent);
});
it('preserves order across multiple frames in one direction', async () => {
const { clientStream, agentStream } = createInMemoryChannel();
const writer = clientStream.writable.getWriter();
for (let i = 1; i <= 3; i++) {
await writer.write({
jsonrpc: '2.0',
method: 'tick',
params: { i },
});
}
writer.releaseLock();
const reader = agentStream.readable.getReader();
const out: AnyMessage[] = [];
for (let i = 0; i < 3; i++) {
const { value, done } = await reader.read();
if (done || !value) throw new Error('unexpected close');
out.push(value);
}
reader.releaseLock();
expect(out.map((m) => (m as { params: { i: number } }).params.i)).toEqual([
1, 2, 3,
]);
});
it('isolates client→agent direction (client write does not echo to client.readable)', async () => {
// Sanity check that the channel is truly paired. A buggy
// implementation that aliased `ab` to both ends would still pass
// the round-trip tests above for one frame but echo the writer's
// own messages back on its own readable.
const { clientStream, agentStream } = createInMemoryChannel();
await send(clientStream.writable, {
jsonrpc: '2.0',
method: 'client-only',
});
const onAgent = await recvOne(agentStream.readable);
expect((onAgent as { method: string }).method).toBe('client-only');
// Client's own readable must NOT see it. Race a fresh read against
// a short timeout — if the channel is correctly paired the read
// never resolves on its own.
//
// The read promise stays pending while the timeout wins; releasing
// the reader's lock in `finally` then causes that pending read to
// reject per Web Streams spec. Attach a rejection handler so the
// post-`releaseLock` rejection doesn't surface as an unhandled
// rejection / flaky test signal — the rejection itself isn't a
// failure here, it's just the cleanup path settling.
const reader = clientStream.readable.getReader();
try {
const winner = await Promise.race([
reader.read().then(
() => 'leaked' as const,
() => null,
),
new Promise<'isolated'>((res) => setTimeout(() => res('isolated'), 50)),
]);
expect(winner).toBe('isolated');
} finally {
reader.releaseLock();
}
});
it('isolates agent→client direction (agent write does not echo to agent.readable)', async () => {
// Symmetric counterpart of the previous test — closes the obvious
// "wired one direction correctly but not the other" failure mode.
// Same `releaseLock`-causes-pending-read-to-reject handling as the
// previous test; see comment there.
const { clientStream, agentStream } = createInMemoryChannel();
await send(agentStream.writable, {
jsonrpc: '2.0',
method: 'agent-only',
});
const onClient = await recvOne(clientStream.readable);
expect((onClient as { method: string }).method).toBe('agent-only');
const reader = agentStream.readable.getReader();
try {
const winner = await Promise.race([
reader.read().then(
() => 'leaked' as const,
() => null,
),
new Promise<'isolated'>((res) => setTimeout(() => res('isolated'), 50)),
]);
expect(winner).toBe('isolated');
} finally {
reader.releaseLock();
}
});
it('abort() settles pending readers on both sides (teardown invariant)', async () => {
// Key teardown invariant: after abort(), pending read() calls do
// NOT hang. The exact settlement (resolve-with-{done:true} vs
// reject) depends on the SDK's ndJsonStream wrapper translation —
// see the helper's JSDoc. What matters is that consumers can
// GC-reclaim and shut down without leaking.
const { clientStream, agentStream, abort } = createInMemoryChannel();
const clientReader = clientStream.readable.getReader();
const agentReader = agentStream.readable.getReader();
// Race each read against a 100ms timeout; if abort works, both
// settle near-instantly. If abort doesn't propagate, the timeout
// wins and the test fails with `'hung'`.
const wrap = <T>(p: Promise<T>) =>
Promise.race<'settled' | 'hung'>([
p.then(
() => 'settled' as const,
() => 'settled' as const,
),
new Promise<'hung'>((res) => setTimeout(() => res('hung'), 100)),
]);
const clientReadP = wrap(clientReader.read());
const agentReadP = wrap(agentReader.read());
abort(new Error('shutdown'));
expect(await clientReadP).toBe('settled');
expect(await agentReadP).toBe('settled');
clientReader.releaseLock();
agentReader.releaseLock();
});
it('abort() is idempotent (calling twice is safe)', () => {
const { abort } = createInMemoryChannel();
expect(() => {
abort('first');
abort('second');
}).not.toThrow();
});
it('abort() with no reason still tears down', async () => {
const { agentStream, abort } = createInMemoryChannel();
const reader = agentStream.readable.getReader();
const readP = reader.read().then(
() => 'settled' as const,
() => 'settled' as const,
);
abort();
expect(
await Promise.race([
readP,
new Promise<'hung'>((res) => setTimeout(() => res('hung'), 100)),
]),
).toBe('settled');
reader.releaseLock();
});
});

View file

@ -0,0 +1,73 @@
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
import { ndJsonStream, type Stream } from '@agentclientprotocol/sdk';
/**
* Create a paired in-memory NDJSON channel: two `Stream`s connected
* back-to-back via two `TransformStream<Uint8Array, Uint8Array>` instances
* (one per direction).
* Whatever `clientStream.writable` writes appears on `agentStream.readable`,
* and vice versa. Each side is a full ACP `Stream` (via SDK `ndJsonStream`)
* so callers can hand them to `ClientSideConnection` / `AgentSideConnection`
* exactly as they would a real stdio pair.
*
* Used today by Stage 1 tests (replaces 10 sites of inline boilerplate
* in `httpAcpBridge.test.ts`). Will also be consumed by the Stage 1.5b
* in-process bridge (issue #4156) when that lands, to wrap an in-process
* `QwenAgent` without spawning a `qwen --acp` child.
*
* `abort(reason?)` is the universal teardown primitive. It calls
* `WritableStream.abort()` on both underlying byte-level
* `TransformStream`s, which immediately settles any pending `read()` /
* `write()` operations on both sides so the channel can be reclaimed.
* Use this to terminate the channel during shutdown / crash simulation
* / daemon teardown.
*
* **Settlement shape**: at the inner byte-level layer the pending read
* rejects with the supplied reason; at the outer SDK-wrapped `Stream`
* layer (what callers actually see) the SDK's `ndJsonStream` translates
* that error into a clean end-of-stream signal `read()` resolves with
* `{value: undefined, done: true}` rather than rejecting. The exact
* shape depends on how deep the consumer is in the wrapper chain, but
* the key invariant **pending operations no longer hang** holds
* either way. Consumers wanting to distinguish "graceful close" from
* "aborted" should track the call themselves.
*
* We expose `abort` rather than `close` because `close()` only reaches
* the opposite `ReadableStream` after pending writes flush, and in
* practice the SDK's `ndJsonStream` outer wrapper does not reliably
* propagate close at all. `abort` is forceful and synchronous-by-spec,
* so it is the safe primitive for lifecycle teardown across an
* `ndJsonStream`-wrapped pair.
*
* Consumers that don't need teardown (most test sites, which let the
* channel die with the test scope) can ignore `abort`. `abort` is a
* platform-level primitive (not a test-fixture concern), so exposing
* it does not pull fixture machinery into this production module.
*/
export function createInMemoryChannel(): {
clientStream: Stream;
agentStream: Stream;
abort(reason?: unknown): void;
} {
const ab = new TransformStream<Uint8Array, Uint8Array>();
const ba = new TransformStream<Uint8Array, Uint8Array>();
const clientStream = ndJsonStream(ab.writable, ba.readable);
const agentStream = ndJsonStream(ba.writable, ab.readable);
return {
clientStream,
agentStream,
abort(reason?: unknown) {
// Fire-and-forget; both `abort()` calls return promises that we
// intentionally do not await (callers want the synchronous
// "tear it down now" semantic) and which may reject if the
// stream is already in errored state — both are expected.
ab.writable.abort(reason).catch(() => {});
ba.writable.abort(reason).catch(() => {});
},
};
}

View file

@ -35,3 +35,4 @@ export {
type BridgeEvent,
type SubscribeOptions,
} from './eventBus.js';
export { createInMemoryChannel } from './inMemoryChannel.js';