fix(sdk): avoid leaking process exit listeners in ProcessTransport (#3295)

* fix(sdk): avoid leaking process exit listeners in ProcessTransport

* Strengthen ProcessTransport cleanup during process exit.

  This updates the shared process-exit cleanup path to use a best-effort
  SIGTERM/SIGKILL sequence and adds coverage to verify the global exit
  handler terminates all active child processes.

  It keeps the listener leak fix in place while closing the remaining gaps
  found in review.
This commit is contained in:
Reid 2026-04-16 10:52:19 +08:00 committed by GitHub
parent b5115e731e
commit d439e7d738
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 134 additions and 20 deletions

View file

@ -11,6 +11,14 @@ import { SdkLogger } from '../utils/logger.js';
const logger = SdkLogger.createLogger('ProcessTransport'); const logger = SdkLogger.createLogger('ProcessTransport');
export class ProcessTransport implements Transport { export class ProcessTransport implements Transport {
private static activeTransports = new Set<ProcessTransport>();
private static hasProcessExitHandler = false;
private static readonly globalProcessExitHandler = (): void => {
for (const transport of ProcessTransport.activeTransports) {
transport.killChildProcessOnProcessExit();
}
};
private childProcess: ChildProcess | null = null; private childProcess: ChildProcess | null = null;
private childStdin: Writable | null = null; private childStdin: Writable | null = null;
private childStdout: Readable | null = null; private childStdout: Readable | null = null;
@ -20,7 +28,6 @@ export class ProcessTransport implements Transport {
private closed = false; private closed = false;
private inputClosed = false; private inputClosed = false;
private abortController: AbortController; private abortController: AbortController;
private processExitHandler: (() => void) | null = null;
private abortHandler: (() => void) | null = null; private abortHandler: (() => void) | null = null;
constructor(options: TransportOptions) { constructor(options: TransportOptions) {
@ -159,32 +166,81 @@ export class ProcessTransport implements Transport {
}); });
} }
const cleanup = (): void => { this.abortHandler = () => {
if (this.childProcess && !this.childProcess.killed) { this.killChildProcess();
this.childProcess.kill('SIGTERM');
}
}; };
this.processExitHandler = cleanup;
this.abortHandler = cleanup;
process.on('exit', this.processExitHandler);
this.abortController.signal.addEventListener('abort', this.abortHandler); this.abortController.signal.addEventListener('abort', this.abortHandler);
this.registerForProcessExit();
this.setupEventHandlers(); this.setupEventHandlers();
this.ready = true; this.ready = true;
logger.info('CLI process started successfully'); logger.info('CLI process started successfully');
} catch (error) { } catch (error) {
this.unregisterForProcessExit();
if (this.abortHandler) {
this.abortController.signal.removeEventListener(
'abort',
this.abortHandler,
);
this.abortHandler = null;
}
this.ready = false; this.ready = false;
logger.error('Failed to initialize CLI process:', error); logger.error('Failed to initialize CLI process:', error);
throw error; throw error;
} }
} }
private registerForProcessExit(): void {
ProcessTransport.activeTransports.add(this);
if (!ProcessTransport.hasProcessExitHandler) {
process.on('exit', ProcessTransport.globalProcessExitHandler);
ProcessTransport.hasProcessExitHandler = true;
}
}
private unregisterForProcessExit(): void {
ProcessTransport.activeTransports.delete(this);
if (
ProcessTransport.hasProcessExitHandler &&
ProcessTransport.activeTransports.size === 0
) {
process.off('exit', ProcessTransport.globalProcessExitHandler);
ProcessTransport.hasProcessExitHandler = false;
}
}
private killChildProcess(): void {
if (this.childProcess && !this.childProcess.killed) {
this.childProcess.kill('SIGTERM');
}
}
private killChildProcessOnProcessExit(): void {
if (!this.childProcess || this.childProcess.exitCode !== null) {
return;
}
try {
this.childProcess.kill('SIGTERM');
} catch {
return;
}
// Timers do not reliably run during process exit, so use a best-effort
// synchronous escalation to avoid leaving child processes behind.
try {
this.childProcess.kill('SIGKILL');
} catch {
// Ignore failures during process teardown.
}
}
private setupEventHandlers(): void { private setupEventHandlers(): void {
if (!this.childProcess) return; if (!this.childProcess) return;
this.childProcess.on('error', (error) => { this.childProcess.on('error', (error) => {
this.unregisterForProcessExit();
this.ready = false; this.ready = false;
if (this.abortController.signal.aborted) { if (this.abortController.signal.aborted) {
this._exitError = new AbortError('CLI process aborted by user'); this._exitError = new AbortError('CLI process aborted by user');
@ -195,6 +251,7 @@ export class ProcessTransport implements Transport {
}); });
this.childProcess.on('close', (code, signal) => { this.childProcess.on('close', (code, signal) => {
this.unregisterForProcessExit();
this.ready = false; this.ready = false;
if (this.abortController.signal.aborted) { if (this.abortController.signal.aborted) {
this._exitError = new AbortError('CLI process aborted by user'); this._exitError = new AbortError('CLI process aborted by user');
@ -287,10 +344,7 @@ export class ProcessTransport implements Transport {
this.childStdin = null; this.childStdin = null;
} }
if (this.processExitHandler) { this.unregisterForProcessExit();
process.off('exit', this.processExitHandler);
this.processExitHandler = null;
}
if (this.abortHandler) { if (this.abortHandler) {
this.abortController.signal.removeEventListener( this.abortController.signal.removeEventListener(

View file

@ -120,6 +120,32 @@ describe('ProcessTransport', () => {
}); });
describe('Construction and Initialization', () => { describe('Construction and Initialization', () => {
it('should not add one process exit listener per transport instance', async () => {
mockPrepareSpawnInfo.mockReturnValue({
command: 'qwen',
args: [],
type: 'native',
originalInput: 'qwen',
});
const transports: ProcessTransport[] = [];
const initialExitListeners = process.listeners('exit').length;
for (let i = 0; i < 12; i++) {
mockSpawn.mockReturnValue(createMockChildProcess());
transports.push(
new ProcessTransport({
pathToQwenExecutable: 'qwen',
}),
);
}
const finalExitListeners = process.listeners('exit').length;
expect(finalExitListeners - initialExitListeners).toBeLessThanOrEqual(1);
await Promise.all(transports.map((transport) => transport.close()));
});
it('should create transport with required options', () => { it('should create transport with required options', () => {
mockPrepareSpawnInfo.mockReturnValue({ mockPrepareSpawnInfo.mockReturnValue({
command: 'qwen', command: 'qwen',
@ -972,17 +998,19 @@ describe('ProcessTransport', () => {
}); });
mockSpawn.mockReturnValue(mockChildProcess); mockSpawn.mockReturnValue(mockChildProcess);
const processOnSpy = vi.spyOn(process, 'on'); const initialExitListeners = process.listeners('exit').length;
const options: TransportOptions = { const options: TransportOptions = {
pathToQwenExecutable: 'qwen', pathToQwenExecutable: 'qwen',
}; };
new ProcessTransport(options); const transport = new ProcessTransport(options);
expect(processOnSpy).toHaveBeenCalledWith('exit', expect.any(Function)); const finalExitListeners = process.listeners('exit').length;
expect(finalExitListeners).toBeGreaterThanOrEqual(initialExitListeners);
expect(finalExitListeners).toBeLessThanOrEqual(initialExitListeners + 1);
processOnSpy.mockRestore(); void transport.close();
}); });
it('should remove event listeners on close', async () => { it('should remove event listeners on close', async () => {
@ -994,7 +1022,7 @@ describe('ProcessTransport', () => {
}); });
mockSpawn.mockReturnValue(mockChildProcess); mockSpawn.mockReturnValue(mockChildProcess);
const processOffSpy = vi.spyOn(process, 'off'); const initialExitListeners = process.listeners('exit').length;
const options: TransportOptions = { const options: TransportOptions = {
pathToQwenExecutable: 'qwen', pathToQwenExecutable: 'qwen',
@ -1004,9 +1032,41 @@ describe('ProcessTransport', () => {
await transport.close(); await transport.close();
expect(processOffSpy).toHaveBeenCalledWith('exit', expect.any(Function)); expect(process.listeners('exit').length).toBe(initialExitListeners);
});
processOffSpy.mockRestore(); it('should terminate all active child processes from the global exit handler', async () => {
mockPrepareSpawnInfo.mockReturnValue({
command: 'qwen',
args: [],
type: 'native',
originalInput: 'qwen',
});
const childA = createMockChildProcess();
const childB = createMockChildProcess();
mockSpawn.mockReturnValueOnce(childA).mockReturnValueOnce(childB);
const transportA = new ProcessTransport({
pathToQwenExecutable: 'qwen',
});
const transportB = new ProcessTransport({
pathToQwenExecutable: 'qwen',
});
(
ProcessTransport as unknown as {
globalProcessExitHandler: () => void;
}
).globalProcessExitHandler();
expect(childA.kill).toHaveBeenCalledWith('SIGTERM');
expect(childA.kill).toHaveBeenCalledWith('SIGKILL');
expect(childB.kill).toHaveBeenCalledWith('SIGTERM');
expect(childB.kill).toHaveBeenCalledWith('SIGKILL');
await transportA.close();
await transportB.close();
}); });
it('should register abort listener', () => { it('should register abort listener', () => {