fix(cli): wait for dual output stream shutdown (#3416)

Make DualOutputBridge.shutdown() await the underlying write stream close
  event instead of returning immediately after stream.end(). This removes
  the Windows temp directory cleanup race in DualOutputBridge tests and
  makes interactive cleanup reliably flush session_end.
This commit is contained in:
Reid 2026-04-18 18:11:19 +08:00 committed by GitHub
parent 7eba1c4635
commit 9f7f061bcc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 45 additions and 30 deletions

View file

@ -44,10 +44,8 @@ describe('DualOutputBridge', () => {
});
afterEach(async () => {
bridge?.shutdown();
await bridge?.shutdown();
bridge = null;
// Give the stream a tick to flush before removing the directory
await new Promise((r) => setTimeout(r, 10));
fs.rmSync(tmpDir, { recursive: true, force: true });
});
@ -69,8 +67,7 @@ describe('DualOutputBridge', () => {
const newFile = path.join(tmpDir, 'does-not-exist.jsonl');
// newFile is NOT pre-created — tests the ENOENT fallback path
bridge = new DualOutputBridge(config, { filePath: newFile });
bridge.shutdown();
await new Promise((r) => setTimeout(r, 10));
await bridge.shutdown();
const lines = readJsonl(newFile);
expect(lines.length).toBeGreaterThan(0);
@ -82,8 +79,7 @@ describe('DualOutputBridge', () => {
it('emits a session_start event immediately on construction', async () => {
bridge = new DualOutputBridge(config, { filePath: target });
bridge.shutdown();
await new Promise((r) => setTimeout(r, 10));
await bridge.shutdown();
const lines = readJsonl(target);
expect(lines.length).toBeGreaterThan(0);
@ -100,8 +96,7 @@ describe('DualOutputBridge', () => {
{ filePath: target },
{ version: '1.2.3' },
);
bridge.shutdown();
await new Promise((r) => setTimeout(r, 10));
await bridge.shutdown();
const lines = readJsonl(target);
const start = lines.find(
@ -116,8 +111,7 @@ describe('DualOutputBridge', () => {
it('emits session_end on shutdown for a clean termination signal', async () => {
bridge = new DualOutputBridge(config, { filePath: target });
bridge.shutdown();
await new Promise((r) => setTimeout(r, 10));
await bridge.shutdown();
const lines = readJsonl(target);
const end = lines.find(
@ -132,9 +126,8 @@ describe('DualOutputBridge', () => {
it('shutdown is idempotent — calling it twice emits session_end only once', async () => {
bridge = new DualOutputBridge(config, { filePath: target });
bridge.shutdown();
bridge.shutdown();
await new Promise((r) => setTimeout(r, 10));
await bridge.shutdown();
await bridge.shutdown();
const lines = readJsonl(target);
const endEvents = lines.filter(
@ -146,8 +139,7 @@ describe('DualOutputBridge', () => {
it('emitControlError routes through the adapter as a control_response error', async () => {
bridge = new DualOutputBridge(config, { filePath: target });
bridge.emitControlError('req-missing', 'unknown request_id');
bridge.shutdown();
await new Promise((r) => setTimeout(r, 10));
await bridge.shutdown();
const lines = readJsonl(target);
const errorResponse = lines.find(
@ -169,8 +161,7 @@ describe('DualOutputBridge', () => {
bridge = new DualOutputBridge(config, { filePath: target });
bridge.emitPermissionRequest('req-1', 'shell', 'tu-1', { cmd: 'ls' });
bridge.emitControlResponse('req-1', false);
bridge.shutdown();
await new Promise((r) => setTimeout(r, 10));
await bridge.shutdown();
const lines = readJsonl(target);
const request = lines.find((l) => l['type'] === 'control_request');
@ -198,7 +189,7 @@ describe('DualOutputBridge', () => {
it('reports isConnected=false after shutdown and silently drops further events', async () => {
bridge = new DualOutputBridge(config, { filePath: target });
bridge.shutdown();
await bridge.shutdown();
expect(bridge.isConnected).toBe(false);
// Should not throw

View file

@ -77,7 +77,7 @@ export class DualOutputBridge {
private readonly stream: WriteStream;
private readonly sessionId: string;
private active = true;
private shutdownCalled = false;
private shutdownPromise: Promise<void> | null = null;
constructor(
config: Config,
@ -288,9 +288,8 @@ export class DualOutputBridge {
}
}
shutdown(): void {
if (this.shutdownCalled) return;
this.shutdownCalled = true;
shutdown(): Promise<void> {
if (this.shutdownPromise) return this.shutdownPromise;
// Try to emit session_end before tearing the stream down so consumers
// get a definitive termination signal rather than inferring it from
// EPIPE. Failures here are swallowed — the stream may already be in an
@ -305,10 +304,35 @@ export class DualOutputBridge {
}
}
this.active = false;
try {
this.stream.end();
} catch (err) {
debugLogger.debug('DualOutput: stream end error during shutdown:', err);
}
this.shutdownPromise = new Promise((resolve) => {
if (this.stream.closed) {
resolve();
return;
}
const cleanup = () => {
this.stream.off('close', onClose);
this.stream.off('error', onError);
};
const onClose = () => {
cleanup();
resolve();
};
const onError = (err: Error) => {
debugLogger.debug('DualOutput: stream error during shutdown:', err);
};
this.stream.once('close', onClose);
this.stream.once('error', onError);
try {
this.stream.end();
} catch (err) {
cleanup();
debugLogger.debug('DualOutput: stream end error during shutdown:', err);
resolve();
}
});
return this.shutdownPromise;
}
}

View file

@ -269,9 +269,9 @@ export async function startInteractiveUI(
});
}
registerCleanup(() => {
registerCleanup(async () => {
remoteInputWatcher?.shutdown();
dualOutputBridge?.shutdown();
await dualOutputBridge?.shutdown();
instance.unmount();
restoreTerminalRedrawOptimizer();
});