diff --git a/packages/sdk-typescript/src/transport/ProcessTransport.ts b/packages/sdk-typescript/src/transport/ProcessTransport.ts index fa55d0327..1a63e96d5 100644 --- a/packages/sdk-typescript/src/transport/ProcessTransport.ts +++ b/packages/sdk-typescript/src/transport/ProcessTransport.ts @@ -11,6 +11,14 @@ import { SdkLogger } from '../utils/logger.js'; const logger = SdkLogger.createLogger('ProcessTransport'); export class ProcessTransport implements Transport { + private static activeTransports = new Set(); + private static hasProcessExitHandler = false; + private static readonly globalProcessExitHandler = (): void => { + for (const transport of ProcessTransport.activeTransports) { + transport.killChildProcessOnProcessExit(); + } + }; + private childProcess: ChildProcess | null = null; private childStdin: Writable | null = null; private childStdout: Readable | null = null; @@ -20,7 +28,6 @@ export class ProcessTransport implements Transport { private closed = false; private inputClosed = false; private abortController: AbortController; - private processExitHandler: (() => void) | null = null; private abortHandler: (() => void) | null = null; constructor(options: TransportOptions) { @@ -159,32 +166,81 @@ export class ProcessTransport implements Transport { }); } - const cleanup = (): void => { - if (this.childProcess && !this.childProcess.killed) { - this.childProcess.kill('SIGTERM'); - } + this.abortHandler = () => { + this.killChildProcess(); }; - - this.processExitHandler = cleanup; - this.abortHandler = cleanup; - process.on('exit', this.processExitHandler); this.abortController.signal.addEventListener('abort', this.abortHandler); + this.registerForProcessExit(); this.setupEventHandlers(); this.ready = true; logger.info('CLI process started successfully'); } catch (error) { + this.unregisterForProcessExit(); + if (this.abortHandler) { + this.abortController.signal.removeEventListener( + 'abort', + this.abortHandler, + ); + this.abortHandler = null; + } this.ready = false; logger.error('Failed to initialize CLI process:', error); throw error; } } + private registerForProcessExit(): void { + ProcessTransport.activeTransports.add(this); + if (!ProcessTransport.hasProcessExitHandler) { + process.on('exit', ProcessTransport.globalProcessExitHandler); + ProcessTransport.hasProcessExitHandler = true; + } + } + + private unregisterForProcessExit(): void { + ProcessTransport.activeTransports.delete(this); + if ( + ProcessTransport.hasProcessExitHandler && + ProcessTransport.activeTransports.size === 0 + ) { + process.off('exit', ProcessTransport.globalProcessExitHandler); + ProcessTransport.hasProcessExitHandler = false; + } + } + + private killChildProcess(): void { + if (this.childProcess && !this.childProcess.killed) { + this.childProcess.kill('SIGTERM'); + } + } + + private killChildProcessOnProcessExit(): void { + if (!this.childProcess || this.childProcess.exitCode !== null) { + return; + } + + try { + this.childProcess.kill('SIGTERM'); + } catch { + return; + } + + // Timers do not reliably run during process exit, so use a best-effort + // synchronous escalation to avoid leaving child processes behind. + try { + this.childProcess.kill('SIGKILL'); + } catch { + // Ignore failures during process teardown. + } + } + private setupEventHandlers(): void { if (!this.childProcess) return; this.childProcess.on('error', (error) => { + this.unregisterForProcessExit(); this.ready = false; if (this.abortController.signal.aborted) { this._exitError = new AbortError('CLI process aborted by user'); @@ -195,6 +251,7 @@ export class ProcessTransport implements Transport { }); this.childProcess.on('close', (code, signal) => { + this.unregisterForProcessExit(); this.ready = false; if (this.abortController.signal.aborted) { this._exitError = new AbortError('CLI process aborted by user'); @@ -287,10 +344,7 @@ export class ProcessTransport implements Transport { this.childStdin = null; } - if (this.processExitHandler) { - process.off('exit', this.processExitHandler); - this.processExitHandler = null; - } + this.unregisterForProcessExit(); if (this.abortHandler) { this.abortController.signal.removeEventListener( diff --git a/packages/sdk-typescript/test/unit/ProcessTransport.test.ts b/packages/sdk-typescript/test/unit/ProcessTransport.test.ts index b5e6c19c0..94bacb3f4 100644 --- a/packages/sdk-typescript/test/unit/ProcessTransport.test.ts +++ b/packages/sdk-typescript/test/unit/ProcessTransport.test.ts @@ -120,6 +120,32 @@ describe('ProcessTransport', () => { }); describe('Construction and Initialization', () => { + it('should not add one process exit listener per transport instance', async () => { + mockPrepareSpawnInfo.mockReturnValue({ + command: 'qwen', + args: [], + type: 'native', + originalInput: 'qwen', + }); + + const transports: ProcessTransport[] = []; + const initialExitListeners = process.listeners('exit').length; + + for (let i = 0; i < 12; i++) { + mockSpawn.mockReturnValue(createMockChildProcess()); + transports.push( + new ProcessTransport({ + pathToQwenExecutable: 'qwen', + }), + ); + } + + const finalExitListeners = process.listeners('exit').length; + expect(finalExitListeners - initialExitListeners).toBeLessThanOrEqual(1); + + await Promise.all(transports.map((transport) => transport.close())); + }); + it('should create transport with required options', () => { mockPrepareSpawnInfo.mockReturnValue({ command: 'qwen', @@ -972,17 +998,19 @@ describe('ProcessTransport', () => { }); mockSpawn.mockReturnValue(mockChildProcess); - const processOnSpy = vi.spyOn(process, 'on'); + const initialExitListeners = process.listeners('exit').length; const options: TransportOptions = { pathToQwenExecutable: 'qwen', }; - new ProcessTransport(options); + const transport = new ProcessTransport(options); - expect(processOnSpy).toHaveBeenCalledWith('exit', expect.any(Function)); + const finalExitListeners = process.listeners('exit').length; + expect(finalExitListeners).toBeGreaterThanOrEqual(initialExitListeners); + expect(finalExitListeners).toBeLessThanOrEqual(initialExitListeners + 1); - processOnSpy.mockRestore(); + void transport.close(); }); it('should remove event listeners on close', async () => { @@ -994,7 +1022,7 @@ describe('ProcessTransport', () => { }); mockSpawn.mockReturnValue(mockChildProcess); - const processOffSpy = vi.spyOn(process, 'off'); + const initialExitListeners = process.listeners('exit').length; const options: TransportOptions = { pathToQwenExecutable: 'qwen', @@ -1004,9 +1032,41 @@ describe('ProcessTransport', () => { await transport.close(); - expect(processOffSpy).toHaveBeenCalledWith('exit', expect.any(Function)); + expect(process.listeners('exit').length).toBe(initialExitListeners); + }); - processOffSpy.mockRestore(); + it('should terminate all active child processes from the global exit handler', async () => { + mockPrepareSpawnInfo.mockReturnValue({ + command: 'qwen', + args: [], + type: 'native', + originalInput: 'qwen', + }); + + const childA = createMockChildProcess(); + const childB = createMockChildProcess(); + mockSpawn.mockReturnValueOnce(childA).mockReturnValueOnce(childB); + + const transportA = new ProcessTransport({ + pathToQwenExecutable: 'qwen', + }); + const transportB = new ProcessTransport({ + pathToQwenExecutable: 'qwen', + }); + + ( + ProcessTransport as unknown as { + globalProcessExitHandler: () => void; + } + ).globalProcessExitHandler(); + + expect(childA.kill).toHaveBeenCalledWith('SIGTERM'); + expect(childA.kill).toHaveBeenCalledWith('SIGKILL'); + expect(childB.kill).toHaveBeenCalledWith('SIGTERM'); + expect(childB.kill).toHaveBeenCalledWith('SIGKILL'); + + await transportA.close(); + await transportB.close(); }); it('should register abort listener', () => {