feat: enhance session ID handling and error propagation

This commit is contained in:
mingholy.lmh 2026-02-13 21:37:51 +08:00
parent 51760fe3a6
commit 82dc79629c
6 changed files with 168 additions and 9 deletions

View file

@ -28,7 +28,8 @@ describe('Session ID Support (E2E)', () => {
beforeEach(async () => {
helper = new SDKTestHelper();
testDir = await helper.setup('session-id');
// Enable chat recording for session-id tests to allow duplicate session detection
testDir = await helper.setup('session-id', { chatRecording: true });
});
afterEach(async () => {
@ -374,6 +375,117 @@ describe('Session ID Support (E2E)', () => {
});
});
describe('Session ID Duplicate Detection', () => {
it('should reject duplicate sessionId with error', async () => {
// Valid UUID v4
const customSessionId = 'dddddddd-eeee-4fff-aaaa-bbbbbbbbbbbb';
// First query: create a session with the custom session ID
const q1 = query({
prompt: 'Say hello',
options: {
...SHARED_TEST_OPTIONS,
cwd: testDir,
sessionId: customSessionId,
debug: false,
},
});
// Consume the first query to completion and close it
try {
for await (const _msg of q1) {
// consume
}
} finally {
await q1.close();
}
// Second query: try to use the same session ID
// This should fail because the session ID is already in use
// CLI will exit with code 1 when detecting duplicate session ID
const q2 = query({
prompt: 'Say hello again',
options: {
...SHARED_TEST_OPTIONS,
cwd: testDir,
sessionId: customSessionId,
debug: false,
},
});
// The error should be propagated and the iteration should throw
// When iterating over messages, if CLI exits with code 1 (duplicate session ID),
// the error should be thrown during iteration
await expect(async () => {
for await (const _msg of q2) {
// consume
}
}).rejects.toThrow(/CLI process exited with code 1/);
await q2.close();
});
it('should throw error when CLI exits with non-zero code', async () => {
// Valid UUID v4
const customSessionId = 'eeeeeeee-ffff-4aaa-bbbb-cccccccccccc';
// First query: create a session and properly close it after completion
const q1 = query({
prompt: 'Say hello',
options: {
...SHARED_TEST_OPTIONS,
cwd: testDir,
sessionId: customSessionId,
debug: false,
},
});
try {
for await (const _msg of q1) {
// consume
}
} finally {
await q1.close();
}
// Second query with same session ID
// When using the same session ID, CLI will detect the duplicate and exit with code 1
const q2 = query({
prompt: 'Say hello again',
options: {
...SHARED_TEST_OPTIONS,
cwd: testDir,
sessionId: customSessionId,
debug: false,
},
});
let errorCaught = false;
let errorMessage = '';
try {
// Iterate over messages - the error should be thrown during iteration
// because CLI exits with code 1 when detecting duplicate session ID
for await (const _msg of q2) {
// consume
}
} catch (error) {
errorCaught = true;
// CLI errors are written directly to console (stderr inherit mode)
// SDK only reports the exit status, not the error message
expect(error instanceof Error).toBe(true);
errorMessage = error instanceof Error ? error.message : String(error);
// Verify the error message contains the expected exit code
expect(errorMessage).toContain('CLI process exited with code 1');
} finally {
await q2.close();
}
// Verify that an error was actually caught during message iteration
expect(errorCaught).toBe(true);
});
});
describe('Session ID Consistency', () => {
it('should expose same sessionId via getSessionId() and messages', async () => {
// Valid UUID v4: 4 in position 14, 8/9/a/b in position 19

View file

@ -41,6 +41,13 @@ export interface SDKTestHelperOptions {
* Whether to create .qwen/settings.json
*/
createQwenConfig?: boolean;
/**
* Whether to enable chat recording for this test.
* - Set to `true` to enable recording (needed for session-id duplicate detection tests)
* - Set to `false` or leave undefined to disable recording (default for most tests)
* This sets chatRecording in general settings.
*/
chatRecording?: boolean;
}
/**
@ -91,7 +98,8 @@ export class SDKTestHelper {
},
general: {
...generalSettings,
chatRecording: false, // SDK tests don't need chat recording
// Default to disabling chat recording unless explicitly enabled
...(options.chatRecording !== true ? { chatRecording: false } : {}),
},
};

View file

@ -932,6 +932,14 @@ export async function loadCliConfig(
}
} else if (argv['session-id']) {
// Use provided session ID without session resumption
// Check if session ID is already in use
const sessionService = new SessionService(cwd);
const exists = await sessionService.sessionExists(argv['session-id']);
if (exists) {
const message = `Error: Session Id ${argv['session-id']} is already in use.`;
writeStderrLine(message);
process.exit(1);
}
sessionId = argv['session-id'];
}

View file

@ -137,7 +137,22 @@ export class Query implements AsyncIterable<SDKMessage> {
}
this.initialized = this.initialize();
this.initialized.catch(() => {});
this.initialized.catch((error) => {
// Propagate initialization errors to inputStream so users can catch them
const errorMessage =
error instanceof Error ? error.message : String(error);
if (
errorMessage.includes('Query is closed') &&
this.transport.exitError
) {
// If query was closed due to transport error, propagate the transport error
this.inputStream.error(this.transport.exitError);
} else {
this.inputStream.error(
error instanceof Error ? error : new Error(errorMessage),
);
}
});
this.startMessageRouter();
}
@ -630,6 +645,11 @@ export class Query implements AsyncIterable<SDKMessage> {
return Promise.reject(new Error('Query is closed'));
}
// Check if transport has already exited with an error
if (this.transport.exitError) {
return Promise.reject(this.transport.exitError);
}
if (subtype !== ControlRequestType.INITIALIZE) {
// Ensure all other control requests get processed after initialization
await this.initialized;
@ -731,16 +751,20 @@ export class Query implements AsyncIterable<SDKMessage> {
this.abortHandler = null;
}
// Use transport's exit error if available, otherwise use generic error
const transportError = this.transport.exitError;
const rejectionError = transportError ?? new Error('Query is closed');
for (const pending of this.pendingControlRequests.values()) {
pending.abortController.abort();
clearTimeout(pending.timeout);
pending.reject(new Error('Query is closed'));
pending.reject(rejectionError);
}
this.pendingControlRequests.clear();
// Clean up pending MCP responses
for (const pending of this.pendingMcpResponses.values()) {
pending.reject(new Error('Query is closed'));
pending.reject(rejectionError);
}
this.pendingMcpResponses.clear();

View file

@ -89,9 +89,16 @@ export function query({
(async () => {
try {
await queryInstance.initialized;
// Skip writing if transport has already exited with an error
if (transport.exitError) {
return;
}
transport.write(serializeJsonLine(message));
} catch (err) {
logger.error('Error sending single-turn prompt:', err);
// Only log error if it's not due to transport already being closed
if (!transport.exitError) {
logger.error('Error sending single-turn prompt:', err);
}
}
})();
} else {

View file

@ -26,12 +26,12 @@ export class Stream<T> implements AsyncIterable<T> {
value: this.queue.shift()!,
});
}
if (this.isDone) {
return Promise.resolve({ done: true, value: undefined });
}
if (this.hasError) {
return Promise.reject(this.hasError);
}
if (this.isDone) {
return Promise.resolve({ done: true, value: undefined });
}
return new Promise<IteratorResult<T>>((resolve, reject) => {
this.readResolve = resolve;
this.readReject = reject;