mirror of https://github.com/QwenLM/qwen-code.git, synced 2026-05-20 01:01:53 +00:00
fix(core): detach service listeners on background-promote (resolve review)
Addresses 4 Critical + 2 Suggestion findings on PR-1 of #3831:

- **childProcess listener detach** (review line 555 + 573): Anonymous arrow listeners on stdout/stderr/error/exit could not be off()'d. After background-promote, post-promote bytes would re-enter handleOutput, which then calls decoder.decode() on a now-finalized text decoder (cleanup() already called .decode() without stream:true) → TypeError crash. Even without the crash, the old onOutputEvent would fire for new data → ownership contract violation + duplication. Fix: extract named handler refs (stdoutHandler / stderrHandler / errorHandler / exitHandler) and call off() on all four in the background-promote branch via a detachServiceListeners() helper.
- **PTY listener detach** (review line 967 + 990): node-pty's onData / onExit return IDisposable handles; the abort handler now captures dataDisposable / exitDisposable and calls .dispose() in the background-promote branch. ptyProcess.on('error') is EventEmitter-style (not IDisposable), so a named ptyErrorHandler ref is extracted and off()'d. Without these, a post-promote PTY error throws → Node.js crash; post-promote data continues writing to headlessTerminal and calling the old onOutputEvent → ownership violation.
- **PTY in-flight chain item ownership** (related to review line 990): processingChain may have already-enqueued callbacks past the early listenersDetached check. Refactored from "early-return short-circuit" to "guard each onOutputEvent emit individually" so in-flight writes still LAND in headlessTerminal (the snapshot reflects them) but no events leak to the foreground onOutputEvent. Also clear renderTimeout in the abort handler so a pending throttled render doesn't fire post-promote.
- **PTY snapshot freshness** (review line 972, suggestion): The original abort handler called serializeTerminalToText immediately. Now we await Promise.race([processingChain drain, SIGKILL_TIMEOUT_MS]) first (mirrors the onExit finalize pattern at ~line 970) so in-flight headlessTerminal.write callbacks land before serialization. Skipped render(true) intentionally because it would emit final onOutputEvent data (renderFn calls onOutputEvent), violating the "no emit post-promote" invariant; added a comment explaining why direct serialize is correct.
- **Handoff-boundary tests** (review line 1257, suggestion): Added 4 new tests pinning the ownership contract: 2 for child_process (post-promote stdout/stderr does NOT route to onOutputEvent; child exit does NOT re-resolve result), 2 for PTY (data/exit disposables ARE called; result shape stays promoted: true even if post-promote events fire).

Also: test setup now stubs mockPtyProcess.onData / .onExit to return { dispose: vi.fn() } so the background-promote path's dispose() calls don't crash on undefined (the stub's mock.results[0].value is then inspected by the new handoff tests).

58 / 58 tests pass (50 baseline + 4 first-pass + 4 handoff). Total +235 / -35 on top of the prior commit.
parent 6cbab376d7
commit 8e8e18ca7f
2 changed files with 235 additions and 35 deletions
@@ -228,8 +228,12 @@ describe('ShellExecutionService', () => {
     };
     mockPtyProcess.pid = 12345;
     mockPtyProcess.kill = vi.fn();
-    mockPtyProcess.onData = vi.fn();
-    mockPtyProcess.onExit = vi.fn();
+    // node-pty's onData/onExit return IDisposable; the production
+    // background-promote path calls .dispose() on those handles to detach
+    // its listeners cleanly. Mock them to return a disposable stub so the
+    // promote path doesn't crash on `undefined.dispose()`.
+    mockPtyProcess.onData = vi.fn().mockReturnValue({ dispose: vi.fn() });
+    mockPtyProcess.onExit = vi.fn().mockReturnValue({ dispose: vi.fn() });
     mockPtyProcess.write = vi.fn();
     mockPtyProcess.resize = vi.fn();
 
@@ -664,6 +668,60 @@ describe('ShellExecutionService', () => {
         'SIGKILL',
       );
     });
+
+    it('post-promotion: PTY data is no longer routed to onOutputEvent (handoff boundary)', async () => {
+      // Pin the ownership contract: after background-promote, PTY data
+      // arriving on the still-running child must NOT surface through the
+      // foreground execute()'s onOutputEvent (the caller has its own
+      // listeners now). Without dataDisposable.dispose() in the abort
+      // handler, the listener-retention bug would let post-promote bytes
+      // leak into the foreground consumer.
+      const { result } = await simulateExecution(
+        'tail -f /tmp/never.log',
+        (pty, abortController) => {
+          // Data BEFORE promote — fed via the live onData listener so it
+          // reaches the foreground onOutputEvent normally.
+          pty.onData.mock.calls[0][0]('pre-promote-data\n');
+          abortController.abort({
+            kind: 'background',
+            shellId: 'bg_test123',
+          } satisfies ShellAbortReason);
+        },
+      );
+      expect(result.promoted).toBe(true);
+
+      // The disposable returned by mockPtyProcess.onData was disposed by
+      // the abort handler — verify by calling .dispose's mock.
+      const dataDisposableStub = mockPtyProcess.onData.mock.results[0]
+        .value as { dispose: Mock };
+      expect(dataDisposableStub.dispose).toHaveBeenCalled();
+      const exitDisposableStub = mockPtyProcess.onExit.mock.results[0]
+        .value as { dispose: Mock };
+      expect(exitDisposableStub.dispose).toHaveBeenCalled();
+    });
+
+    it('post-promotion: PTY exit does NOT re-resolve the result (already resolved with promoted)', async () => {
+      // Pin: even if the still-running child later exits naturally and the
+      // caller's own exit listener fires, our foreground result Promise
+      // must NOT be re-resolved with a different shape (Promise can only
+      // resolve once). The exit disposable being disposed prevents our
+      // own onExit from firing at all in the first place — but verify the
+      // final resolved shape stays `promoted: true` regardless.
+      const { result } = await simulateExecution(
+        'tail -f /tmp/never.log',
+        (_pty, abortController) => {
+          abortController.abort({
+            kind: 'background',
+            shellId: 'bg_test123',
+          } satisfies ShellAbortReason);
+        },
+      );
+
+      // Resolved as promoted, with no exit info from a post-promote exit.
+      expect(result.promoted).toBe(true);
+      expect(result.exitCode).toBeNull();
+      expect(result.signal).toBeNull();
+    });
   });
 
   describe('Binary Output', () => {
@@ -1256,6 +1314,68 @@ describe('ShellExecutionService child_process fallback', () => {
       expect(mockChildProcess.kill).not.toHaveBeenCalled();
     });
 
+    it('post-promotion: stdout / stderr data is no longer routed to onOutputEvent (handoff boundary)', async () => {
+      mockPlatform.mockReturnValue('linux');
+      // Pin the ownership contract: after background-promote, stdout/stderr
+      // arriving on the still-running child must NOT surface through the
+      // foreground execute()'s onOutputEvent. Without off()'ing the
+      // stdoutHandler / stderrHandler in the abort handler, post-promote
+      // bytes would re-enter handleOutput, which then calls
+      // decoder.decode() on a now-finalized decoder (cleanup() called
+      // .decode() without stream:true) → TypeError crash, OR routes to
+      // onOutputEvent → ownership leak / duplicated emit.
+      const { result } = await simulateExecution(
+        'tail -f /tmp/never.log',
+        (cp, abortController) => {
+          cp.stdout?.emit('data', Buffer.from('pre-promote\n'));
+          abortController.abort({
+            kind: 'background',
+            shellId: 'bg_test123',
+          } satisfies ShellAbortReason);
+          // Capture call count at the moment of promote, then emit more
+          // data on the still-live child stream and assert onOutputEvent
+          // was NOT called again. (Also verifies no TypeError from
+          // decoding through the finalized decoder.)
+          const eventCountAtPromote = onOutputEventMock.mock.calls.length;
+          cp.stdout?.emit('data', Buffer.from('post-promote-stdout\n'));
+          cp.stderr?.emit('data', Buffer.from('post-promote-stderr\n'));
+          expect(onOutputEventMock.mock.calls.length).toBe(eventCountAtPromote);
+        },
+      );
+
+      expect(result.promoted).toBe(true);
+      // Pre-promote data made it into the snapshot; post-promote did not.
+      expect(result.output).toContain('pre-promote');
+      expect(result.output).not.toContain('post-promote-stdout');
+      expect(result.output).not.toContain('post-promote-stderr');
+    });
+
+    it('post-promotion: child exit does NOT re-resolve the result with a non-promoted shape', async () => {
+      mockPlatform.mockReturnValue('linux');
+      // Pin: even if the still-running child later exits naturally and the
+      // caller's own exit listener fires, our foreground result Promise
+      // must NOT be re-resolved (Promise can only resolve once). The
+      // detached exit handler prevents our own handler from firing.
+      const { result } = await simulateExecution(
+        'tail -f /tmp/never.log',
+        (cp, abortController) => {
+          abortController.abort({
+            kind: 'background',
+            shellId: 'bg_test123',
+          } satisfies ShellAbortReason);
+          // Simulate the still-running child exiting later; this should
+          // NOT route through our handleExit because the exit listener
+          // was off()'d in the background-promote branch.
+          cp.emit('exit', 42, null);
+          cp.emit('close', 42, null);
+        },
+      );
+
+      expect(result.promoted).toBe(true);
+      expect(result.exitCode).toBeNull();
+      expect(result.signal).toBeNull();
+    });
+
     it('should gracefully attempt SIGKILL on linux if SIGTERM fails', async () => {
       mockPlatform.mockReturnValue('linux');
       vi.useFakeTimers();
@@ -1452,8 +1572,12 @@ describe('ShellExecutionService execution method selection', () => {
     };
     mockPtyProcess.pid = 12345;
     mockPtyProcess.kill = vi.fn();
-    mockPtyProcess.onData = vi.fn();
-    mockPtyProcess.onExit = vi.fn();
+    // node-pty's onData/onExit return IDisposable; the production
+    // background-promote path calls .dispose() on those handles to detach
+    // its listeners cleanly. Mock them to return a disposable stub so the
+    // promote path doesn't crash on `undefined.dispose()`.
+    mockPtyProcess.onData = vi.fn().mockReturnValue({ dispose: vi.fn() });
+    mockPtyProcess.onExit = vi.fn().mockReturnValue({ dispose: vi.fn() });
     mockPtyProcess.write = vi.fn();
     mockPtyProcess.resize = vi.fn();
 
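The stubs in the test setup above mirror the disposable-handle convention that the comments describe: registering a listener returns an object whose `dispose()` detaches it. A minimal sketch of that convention, assuming a hand-rolled `TinyEmitter` class and a `DisposableHandle` interface (both hypothetical, not node-pty's implementation) so it runs without the native module:

```typescript
// Shape of the handle returned on registration (illustrative name).
interface DisposableHandle {
  dispose(): void;
}

// Hypothetical emitter used only for illustration.
class TinyEmitter<T> {
  private listeners = new Set<(value: T) => void>();

  // Registering returns a handle instead of requiring off(fn).
  on(listener: (value: T) => void): DisposableHandle {
    this.listeners.add(listener);
    return {
      dispose: () => {
        this.listeners.delete(listener);
      },
    };
  }

  fire(value: T): void {
    for (const listener of [...this.listeners]) listener(value);
  }
}

const onData = new TinyEmitter<string>();
const seen: string[] = [];
const dataDisposable = onData.on((chunk) => {
  seen.push(chunk);
});

onData.fire('pre-promote');
dataDisposable.dispose(); // hand-off: stop listening
onData.fire('post-promote'); // no longer observed

console.log(seen);
```

Only the chunk fired before `dispose()` is recorded, which is the property the handoff tests check indirectly by asserting that the stubbed `dispose` mocks were called.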
@@ -535,22 +535,50 @@ export class ShellExecutionService {
           });
         };
 
-        child.stdout.on('data', (data) => handleOutput(data, 'stdout'));
-        child.stderr.on('data', (data) => handleOutput(data, 'stderr'));
-        child.on('error', (err) => {
+        // Named handler refs so the background-promote branch below can
+        // detach them all and hand ownership of the child cleanly to the
+        // caller. Anonymous arrows here would leak: the still-running child
+        // would keep firing into our handlers (using a finalized decoder →
+        // TypeError, or duplicating events the caller now also receives).
+        const stdoutHandler = (data: Buffer) => handleOutput(data, 'stdout');
+        const stderrHandler = (data: Buffer) => handleOutput(data, 'stderr');
+        const errorHandler = (err: Error) => {
           error = err;
           handleExit(1, null);
-        });
+        };
+        const exitHandler = (
+          code: number | null,
+          signal: NodeJS.Signals | null,
+        ) => {
+          if (child.pid) {
+            this.activeChildProcesses.delete(child.pid);
+          }
+          handleExit(code, signal);
+        };
+
+        child.stdout.on('data', stdoutHandler);
+        child.stderr.on('data', stderrHandler);
+        child.on('error', errorHandler);
+
+        const detachServiceListeners = () => {
+          child.stdout?.off('data', stdoutHandler);
+          child.stderr?.off('data', stderrHandler);
+          child.off('error', errorHandler);
+          child.off('exit', exitHandler);
+        };
 
         const abortHandler = async () => {
-          // Background-promote takeover: skip kill, drop the child from our
-          // active set (so cleanup() won't kill it later), flush our text
-          // buffers into a snapshot, and resolve immediately with
-          // `promoted: true` so the awaiting caller unblocks. The caller has
-          // attached its own listeners to the live child by this point.
+          // Background-promote takeover: skip kill, detach our listeners (so
+          // post-promote output doesn't leak into the foreground onOutputEvent
+          // or the now-finalized text decoder), drop the child from our active
+          // set (so cleanup() won't kill it later), flush our text buffers
+          // into a snapshot, and resolve immediately with `promoted: true` so
+          // the awaiting caller unblocks. The caller has attached its own
+          // listeners to the live child by this point and now owns the child.
           const reason = abortSignal.reason as ShellAbortReason | undefined;
           if (reason?.kind === 'background' && child.pid && !exited) {
             this.activeChildProcesses.delete(child.pid);
+            detachServiceListeners();
             const {
               stdout: snapStdout,
               stderr: snapStderr,
@@ -597,12 +625,7 @@ export class ShellExecutionService {
           this.activeChildProcesses.add(child.pid);
         }
 
-        child.on('exit', (code, signal) => {
-          if (child.pid) {
-            this.activeChildProcesses.delete(child.pid);
-          }
-          handleExit(code, signal);
-        });
+        child.on('exit', exitHandler);
 
         function cleanup() {
           exited = true;
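The child_process change above relies on a Node.js `EventEmitter` detail: `off()` removes a listener only when it receives the same function reference that was registered. A minimal sketch, assuming a bare `EventEmitter` standing in for the child's stdout stream (`stream`, `received`, and this `stdoutHandler` are illustrative, not the service's code):

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical stand-in for child.stdout.
const stream = new EventEmitter();
const received: string[] = [];

// Anonymous arrow: no reference is kept, so off() can never target it.
stream.on('data', (chunk: string) => {
  received.push(`anon:${chunk}`);
});

// Named reference: the same function object can be passed to off().
const stdoutHandler = (chunk: string): void => {
  received.push(`named:${chunk}`);
};
stream.on('data', stdoutHandler);

stream.emit('data', 'pre');
stream.off('data', stdoutHandler); // detaches only the named handler
stream.emit('data', 'post');

console.log(received);
```

After the `off()` call the anonymous listener is still attached and still sees `post`, which is the leak the named-handler refactor is meant to close.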
@@ -717,11 +740,18 @@ export class ShellExecutionService {
         let isWriting = false;
         let hasStartedOutput = false;
         let renderTimeout: NodeJS.Timeout | null = null;
+        // Set to true by the background-promote branch so any in-flight
+        // processingChain callback or pending render short-circuits instead
+        // of emitting onOutputEvent / writing to the (now caller-owned)
+        // headlessTerminal. The PTY data disposable is also disposed in the
+        // same branch so no NEW work is enqueued — this guard handles the
+        // already-scheduled chain items.
+        let listenersDetached = false;
 
         const RENDER_THROTTLE_MS = 100;
 
         const renderFn = () => {
-          if (!isStreamingRawContent) {
+          if (!isStreamingRawContent || listenersDetached) {
             return;
           }
 
@@ -842,30 +872,46 @@ export class ShellExecutionService {
 
                 if (isBinary(sniffBuffer)) {
                   isStreamingRawContent = false;
-                  onOutputEvent({ type: 'binary_detected' });
+                  if (!listenersDetached) {
+                    onOutputEvent({ type: 'binary_detected' });
+                  }
                 }
               }
 
               if (isStreamingRawContent) {
                 const decodedChunk = decoder!.decode(data, { stream: true });
                 isWriting = true;
+                // Allow in-flight writes to LAND in the headlessTerminal
+                // even after a background promote — the snapshot we'll
+                // serialize next reads from this buffer. The render()
+                // callback (and renderFn) is already guarded by
+                // listenersDetached, so no onOutputEvent fires.
                 headlessTerminal.write(decodedChunk, () => {
                   render();
                   isWriting = false;
                   resolve();
                 });
               } else {
-                onOutputEvent({
-                  type: 'binary_progress',
-                  bytesReceived,
-                });
+                if (!listenersDetached) {
+                  onOutputEvent({
+                    type: 'binary_progress',
+                    bytesReceived,
+                  });
+                }
                 resolve();
               }
             }),
           );
         };
 
-        ptyProcess.onData((data: string) => {
+        // Capture the IDisposables that node-pty returns so the
+        // background-promote branch below can hand the live PTY to the
+        // caller cleanly. Without dispose(), post-promote PTY data would
+        // continue calling our handleOutput → render → onOutputEvent (the
+        // foreground caller's downstream consumer that no longer owns this
+        // child) and post-promote PTY errors would `throw err` → process
+        // crash.
+        const dataDisposable = ptyProcess.onData((data: string) => {
           const bufferData = Buffer.from(data, 'utf-8');
           handleOutput(bufferData);
         });
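The hunk above keeps already-queued writes flowing into the terminal buffer while suppressing only the event emits. A rough sketch of that "guard each emit" idea, assuming a plain array in place of `headlessTerminal` and a simplified promise chain (every name below is illustrative):

```typescript
const terminalBuffer: string[] = []; // stands in for headlessTerminal
const emitted: string[] = []; // stands in for the foreground onOutputEvent
let detached = false; // plays the role of listenersDetached
let chain: Promise<void> = Promise.resolve();

function enqueue(chunk: string): void {
  chain = chain.then(() => {
    terminalBuffer.push(chunk); // the write always lands
    if (!detached) {
      emitted.push(chunk); // each emit is guarded individually
    }
  });
}

enqueue('before');
const settled = chain.then(() => {
  enqueue('in-flight'); // queued before the hand-off...
  detached = true; // ...but it runs after the guard flips
  return chain;
});

settled.then(() => console.log({ terminalBuffer, emitted }));
```

With an early return at the top of the callback instead, the `in-flight` chunk would be missing from the buffer and therefore from any snapshot taken afterwards.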
@@ -874,7 +920,7 @@ export class ShellExecutionService {
         // due to race conditions between the exit event and read operations.
         // This is a normal behavior on macOS/Linux and should not crash the app.
         // See: https://github.com/microsoft/node-pty/issues/178
-        ptyProcess.on('error', (err: NodeJS.ErrnoException) => {
+        const ptyErrorHandler = (err: NodeJS.ErrnoException) => {
           if (isExpectedPtyReadExitError(err)) {
             // EIO is expected when the PTY process exits - ignore it
             return;
@@ -882,9 +928,10 @@ export class ShellExecutionService {
 
           // Surface unexpected PTY errors to preserve existing crash behavior.
           throw err;
-        });
+        };
+        ptyProcess.on('error', ptyErrorHandler);
 
-        ptyProcess.onExit(
+        const exitDisposable = ptyProcess.onExit(
           ({ exitCode, signal }: { exitCode: number; signal?: number }) => {
             exited = true;
             abortSignal.removeEventListener('abort', abortHandler);
@@ -954,17 +1001,46 @@ export class ShellExecutionService {
         );
 
         const abortHandler = async () => {
-          // Background-promote takeover: skip kill, drop the PTY from the
-          // active set (so cleanup() won't kill it later), flush a snapshot
-          // of the output buffer captured so far, and resolve immediately
-          // with `promoted: true` so the awaiting caller unblocks. The
-          // caller has attached its own listeners to the live PTY by this
-          // point and owns its lifecycle from here on.
+          // Background-promote takeover: skip kill, dispose all our
+          // listeners on the live PTY (so post-promote data/exit/error don't
+          // leak into our foreground onOutputEvent or crash via the error
+          // handler's `throw err`), set the listenersDetached guard so any
+          // already-enqueued processingChain callback's onOutputEvent emits
+          // are suppressed (in-flight writes still LAND in headlessTerminal
+          // so the snapshot below reflects them), drain pending chain work,
+          // drop the PTY from the active set (so cleanup() won't kill it
+          // later), serialize the terminal as the snapshot, and resolve
+          // immediately with `promoted: true` so the awaiting caller
+          // unblocks. The caller has attached its own listeners to the live
+          // PTY by this point and owns its lifecycle from here on.
           const reason = abortSignal.reason as ShellAbortReason | undefined;
           if (reason?.kind === 'background' && ptyProcess.pid && !exited) {
             exited = true;
+            listenersDetached = true;
             abortSignal.removeEventListener('abort', abortHandler);
+            dataDisposable.dispose();
+            exitDisposable.dispose();
+            ptyProcess.off('error', ptyErrorHandler);
+            if (renderTimeout) {
+              clearTimeout(renderTimeout);
+              renderTimeout = null;
+            }
             this.activePtys.delete(ptyProcess.pid);
+
+            // Drain in-flight chain work (already-enqueued
+            // headlessTerminal.write callbacks) so the snapshot reflects
+            // the last batch of bytes the PTY emitted before promote.
+            // Bounded by SIGKILL_TIMEOUT_MS so the caller's await never
+            // blocks indefinitely if a write callback is stuck.
+            const drain = () =>
+              new Promise<void>((res) => setImmediate(res)).then(
+                () => processingChain,
+              );
+            await Promise.race([
+              processingChain.then(drain).then(drain),
+              new Promise<void>((res) => setTimeout(res, SIGKILL_TIMEOUT_MS)),
+            ]);
+
             const finalBuffer = Buffer.concat(outputChunks);
             let snapshot = '';
             try {
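The drain step above races pending chain work against a timeout so the snapshot never waits forever. A minimal sketch of that bounded-wait pattern, assuming a hypothetical `withTimeout` helper and an arbitrary 50 ms budget in place of `SIGKILL_TIMEOUT_MS` (whose value is not shown in this diff):

```typescript
// Resolves with 'drained' if the work finishes first, otherwise 'timed-out'.
function withTimeout(
  work: Promise<unknown>,
  ms: number,
): Promise<'drained' | 'timed-out'> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<'timed-out'>((res) => {
    timer = setTimeout(() => res('timed-out'), ms);
  });
  const drained = work.then(() => 'drained' as const);
  // Clearing the timer is an addition in this sketch; the diff leaves it.
  return Promise.race([drained, timeout]).finally(() => clearTimeout(timer));
}

const quickWork = Promise.resolve();
const stuckWork = new Promise<void>(() => {}); // never settles

const outcomes = Promise.all([
  withTimeout(quickWork, 50),
  withTimeout(stuckWork, 50),
]);

outcomes.then((results) => console.log(results));
```

Returning a tagged outcome, rather than `void` as the diff does, is a choice made here only so the two cases are distinguishable in the example.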