From d439e7d7381c433db60a1eb31da618b4103bfd91 Mon Sep 17 00:00:00 2001 From: Reid <61492567+reidliu41@users.noreply.github.com> Date: Thu, 16 Apr 2026 10:52:19 +0800 Subject: [PATCH] fix(sdk): avoid leaking process exit listeners in ProcessTransport (#3295) * fix(sdk): avoid leaking process exit listeners in ProcessTransport * Strengthen ProcessTransport cleanup during process exit. This updates the shared process-exit cleanup path to use a best-effort SIGTERM/SIGKILL sequence and adds coverage to verify the global exit handler terminates all active child processes. It keeps the listener leak fix in place while closing the remaining gaps found in review. --- .../src/transport/ProcessTransport.ts | 80 ++++++++++++++++--- .../test/unit/ProcessTransport.test.ts | 74 +++++++++++++++-- 2 files changed, 134 insertions(+), 20 deletions(-) diff --git a/packages/sdk-typescript/src/transport/ProcessTransport.ts b/packages/sdk-typescript/src/transport/ProcessTransport.ts index fa55d0327..1a63e96d5 100644 --- a/packages/sdk-typescript/src/transport/ProcessTransport.ts +++ b/packages/sdk-typescript/src/transport/ProcessTransport.ts @@ -11,6 +11,14 @@ import { SdkLogger } from '../utils/logger.js'; const logger = SdkLogger.createLogger('ProcessTransport'); export class ProcessTransport implements Transport { + private static activeTransports = new Set(); + private static hasProcessExitHandler = false; + private static readonly globalProcessExitHandler = (): void => { + for (const transport of ProcessTransport.activeTransports) { + transport.killChildProcessOnProcessExit(); + } + }; + private childProcess: ChildProcess | null = null; private childStdin: Writable | null = null; private childStdout: Readable | null = null; @@ -20,7 +28,6 @@ export class ProcessTransport implements Transport { private closed = false; private inputClosed = false; private abortController: AbortController; - private processExitHandler: (() => void) | null = null; private abortHandler: (() => void) | null = null; constructor(options: TransportOptions) { @@ -159,32 +166,81 @@ export class ProcessTransport implements Transport { }); } - const cleanup = (): void => { - if (this.childProcess && !this.childProcess.killed) { - this.childProcess.kill('SIGTERM'); - } + this.abortHandler = () => { + this.killChildProcess(); }; - - this.processExitHandler = cleanup; - this.abortHandler = cleanup; - process.on('exit', this.processExitHandler); this.abortController.signal.addEventListener('abort', this.abortHandler); + this.registerForProcessExit(); this.setupEventHandlers(); this.ready = true; logger.info('CLI process started successfully'); } catch (error) { + this.unregisterForProcessExit(); + if (this.abortHandler) { + this.abortController.signal.removeEventListener( + 'abort', + this.abortHandler, + ); + this.abortHandler = null; + } this.ready = false; logger.error('Failed to initialize CLI process:', error); throw error; } } + private registerForProcessExit(): void { + ProcessTransport.activeTransports.add(this); + if (!ProcessTransport.hasProcessExitHandler) { + process.on('exit', ProcessTransport.globalProcessExitHandler); + ProcessTransport.hasProcessExitHandler = true; + } + } + + private unregisterForProcessExit(): void { + ProcessTransport.activeTransports.delete(this); + if ( + ProcessTransport.hasProcessExitHandler && + ProcessTransport.activeTransports.size === 0 + ) { + process.off('exit', ProcessTransport.globalProcessExitHandler); + ProcessTransport.hasProcessExitHandler = false; + } + } + + private killChildProcess(): void { + if (this.childProcess && !this.childProcess.killed) { + this.childProcess.kill('SIGTERM'); + } + } + + private killChildProcessOnProcessExit(): void { + if (!this.childProcess || this.childProcess.exitCode !== null) { + return; + } + + try { + this.childProcess.kill('SIGTERM'); + } catch { + return; + } + + // Timers do not reliably run during process exit, so use a best-effort + // synchronous escalation to avoid leaving child processes behind. + try { + this.childProcess.kill('SIGKILL'); + } catch { + // Ignore failures during process teardown. + } + } + private setupEventHandlers(): void { if (!this.childProcess) return; this.childProcess.on('error', (error) => { + this.unregisterForProcessExit(); this.ready = false; if (this.abortController.signal.aborted) { this._exitError = new AbortError('CLI process aborted by user'); @@ -195,6 +251,7 @@ export class ProcessTransport implements Transport { }); this.childProcess.on('close', (code, signal) => { + this.unregisterForProcessExit(); this.ready = false; if (this.abortController.signal.aborted) { this._exitError = new AbortError('CLI process aborted by user'); @@ -287,10 +344,7 @@ export class ProcessTransport implements Transport { this.childStdin = null; } - if (this.processExitHandler) { - process.off('exit', this.processExitHandler); - this.processExitHandler = null; - } + this.unregisterForProcessExit(); if (this.abortHandler) { this.abortController.signal.removeEventListener( diff --git a/packages/sdk-typescript/test/unit/ProcessTransport.test.ts b/packages/sdk-typescript/test/unit/ProcessTransport.test.ts index b5e6c19c0..94bacb3f4 100644 --- a/packages/sdk-typescript/test/unit/ProcessTransport.test.ts +++ b/packages/sdk-typescript/test/unit/ProcessTransport.test.ts @@ -120,6 +120,32 @@ describe('ProcessTransport', () => { }); describe('Construction and Initialization', () => { + it('should not add one process exit listener per transport instance', async () => { + mockPrepareSpawnInfo.mockReturnValue({ + command: 'qwen', + args: [], + type: 'native', + originalInput: 'qwen', + }); + + const transports: ProcessTransport[] = []; + const initialExitListeners = process.listeners('exit').length; + + for (let i = 0; i < 12; i++) { + mockSpawn.mockReturnValue(createMockChildProcess()); + transports.push( + new ProcessTransport({ + pathToQwenExecutable: 'qwen', + }), + ); + } + + const finalExitListeners = process.listeners('exit').length; + expect(finalExitListeners - initialExitListeners).toBeLessThanOrEqual(1); + + await Promise.all(transports.map((transport) => transport.close())); + }); + it('should create transport with required options', () => { mockPrepareSpawnInfo.mockReturnValue({ command: 'qwen', @@ -972,17 +998,19 @@ describe('ProcessTransport', () => { }); mockSpawn.mockReturnValue(mockChildProcess); - const processOnSpy = vi.spyOn(process, 'on'); + const initialExitListeners = process.listeners('exit').length; const options: TransportOptions = { pathToQwenExecutable: 'qwen', }; - new ProcessTransport(options); + const transport = new ProcessTransport(options); - expect(processOnSpy).toHaveBeenCalledWith('exit', expect.any(Function)); + const finalExitListeners = process.listeners('exit').length; + expect(finalExitListeners).toBeGreaterThanOrEqual(initialExitListeners); + expect(finalExitListeners).toBeLessThanOrEqual(initialExitListeners + 1); - processOnSpy.mockRestore(); + void transport.close(); }); it('should remove event listeners on close', async () => { @@ -994,7 +1022,7 @@ describe('ProcessTransport', () => { }); mockSpawn.mockReturnValue(mockChildProcess); - const processOffSpy = vi.spyOn(process, 'off'); + const initialExitListeners = process.listeners('exit').length; const options: TransportOptions = { pathToQwenExecutable: 'qwen', @@ -1004,9 +1032,41 @@ describe('ProcessTransport', () => { await transport.close(); - expect(processOffSpy).toHaveBeenCalledWith('exit', expect.any(Function)); + expect(process.listeners('exit').length).toBe(initialExitListeners); + }); - processOffSpy.mockRestore(); + it('should terminate all active child processes from the global exit handler', async () => { + mockPrepareSpawnInfo.mockReturnValue({ + command: 'qwen', + args: [], + type: 'native', + originalInput: 'qwen', + }); + + const childA = createMockChildProcess(); + const childB = createMockChildProcess(); + mockSpawn.mockReturnValueOnce(childA).mockReturnValueOnce(childB); + + const transportA = new ProcessTransport({ + pathToQwenExecutable: 'qwen', + }); + const transportB = new ProcessTransport({ + pathToQwenExecutable: 'qwen', + }); + + ( + ProcessTransport as unknown as { + globalProcessExitHandler: () => void; + } + ).globalProcessExitHandler(); + + expect(childA.kill).toHaveBeenCalledWith('SIGTERM'); + expect(childA.kill).toHaveBeenCalledWith('SIGKILL'); + expect(childB.kill).toHaveBeenCalledWith('SIGTERM'); + expect(childB.kill).toHaveBeenCalledWith('SIGKILL'); + + await transportA.close(); + await transportB.close(); }); it('should register abort listener', () => {