Fix SDK message event pairing and improve content block handling

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
mingholy.lmh 2026-03-18 18:08:56 +08:00
parent eea92fc8db
commit 79083ffd50
8 changed files with 545 additions and 102 deletions

View file

@ -282,12 +282,12 @@ export abstract class BaseJsonOutputAdapter {
return;
}
if (lastBlock.type === 'text') {
const index = state.blocks.length - 1;
this.onBlockClosed(state, index, actualParentToolUseId);
this.closeBlock(state, index);
} else if (lastBlock.type === 'thinking') {
const index = state.blocks.length - 1;
const index = state.blocks.length - 1;
if (!state.openBlocks.has(index)) {
return;
}
if (lastBlock.type === 'text' || lastBlock.type === 'thinking') {
this.onBlockClosed(state, index, actualParentToolUseId);
this.closeBlock(state, index);
}
@ -392,7 +392,9 @@ export abstract class BaseJsonOutputAdapter {
}
const message = this.buildMessage(parentToolUseId);
this.emitMessageImpl(message);
if (state.messageStarted) {
this.emitMessageImpl(message);
}
return message;
}
@ -656,12 +658,7 @@ export abstract class BaseJsonOutputAdapter {
parentToolUseId: string,
): CLIAssistantMessage {
const state = this.getMessageState(parentToolUseId);
const message = this.finalizeAssistantMessageInternal(
state,
parentToolUseId,
);
this.updateLastAssistantMessage(message);
return message;
return this.finalizeAssistantMessageInternal(state, parentToolUseId);
}
/**

View file

@ -52,12 +52,10 @@ export class JsonOutputAdapter
}
finalizeAssistantMessage(): CLIAssistantMessage {
const message = this.finalizeAssistantMessageInternal(
return this.finalizeAssistantMessageInternal(
this.mainAgentMessageState,
null,
);
this.updateLastAssistantMessage(message);
return message;
}
emitResult(options: ResultOptions): void {

View file

@ -654,6 +654,24 @@ describe('StreamJsonOutputAdapter', () => {
'Message not started',
);
});
it('should not emit empty assistant message when started but no content processed', () => {
stdoutWriteSpy.mockClear();
adapter.finalizeAssistantMessage();
const assistantCalls = stdoutWriteSpy.mock.calls.filter(
(call: unknown[]) => {
try {
const parsed = JSON.parse(call[0] as string);
return parsed.type === 'assistant';
} catch {
return false;
}
},
);
expect(assistantCalls).toHaveLength(0);
});
});
describe('emitResult', () => {
@ -1007,56 +1025,68 @@ describe('StreamJsonOutputAdapter', () => {
});
});
describe('message_id in stream events', () => {
describe('content_block event identification', () => {
beforeEach(() => {
adapter = new StreamJsonOutputAdapter(mockConfig, true);
adapter.startAssistantMessage();
});
it('should include message_id in stream events after message starts', () => {
it('should not include message_id in content_block events', () => {
adapter.processEvent({
type: GeminiEventType.Content,
value: 'Text',
});
// Process another event to ensure messageStarted is true
adapter.processEvent({
type: GeminiEventType.Content,
value: 'More',
});
const calls = stdoutWriteSpy.mock.calls;
// Find all delta events
const deltaCalls = calls.filter((call: unknown[]) => {
const contentBlockCalls = calls.filter((call: unknown[]) => {
try {
const parsed = JSON.parse(call[0] as string);
return (
parsed.type === 'stream_event' &&
parsed.event.type === 'content_block_delta'
(parsed.event.type === 'content_block_start' ||
parsed.event.type === 'content_block_delta' ||
parsed.event.type === 'content_block_stop')
);
} catch {
return false;
}
});
expect(deltaCalls.length).toBeGreaterThan(0);
// The second delta event should have message_id (after messageStarted becomes true)
// message_id is added to the event object, so check parsed.event.message_id
if (deltaCalls.length > 1) {
const secondDelta = JSON.parse(
(deltaCalls[1] as unknown[])[0] as string,
);
// message_id is on the enriched event object
expect(
secondDelta.event.message_id || secondDelta.message_id,
).toBeTruthy();
} else {
// If only one delta, check if message_id exists
const delta = JSON.parse((deltaCalls[0] as unknown[])[0] as string);
// message_id is added when messageStarted is true
// First event may or may not have it, but subsequent ones should
expect(delta.event.message_id || delta.message_id).toBeTruthy();
expect(contentBlockCalls.length).toBeGreaterThan(0);
for (const call of contentBlockCalls) {
const parsed = JSON.parse((call as unknown[])[0] as string);
expect(parsed.event.message_id).toBeUndefined();
}
});
it('should identify content_block events by session_id and index', () => {
adapter.processEvent({
type: GeminiEventType.Content,
value: 'Text',
});
const calls = stdoutWriteSpy.mock.calls;
const blockStartCall = calls.find((call: unknown[]) => {
try {
const parsed = JSON.parse(call[0] as string);
return (
parsed.type === 'stream_event' &&
parsed.event.type === 'content_block_start'
);
} catch {
return false;
}
});
expect(blockStartCall).toBeDefined();
const parsed = JSON.parse((blockStartCall as unknown[])[0] as string);
expect(parsed.session_id).toBe('test-session-id');
expect(typeof parsed.event.index).toBe('number');
});
});
describe('multiple text blocks', () => {

View file

@ -36,6 +36,8 @@ export class StreamJsonOutputAdapter
extends BaseJsonOutputAdapter
implements JsonOutputAdapterInterface
{
private mainTurnMessageStartEmitted = false;
constructor(
config: Config,
private readonly includePartialMessages: boolean,
@ -68,47 +70,27 @@ export class StreamJsonOutputAdapter
return this.includePartialMessages;
}
override startAssistantMessage(): void {
this.mainTurnMessageStartEmitted = false;
super.startAssistantMessage();
}
finalizeAssistantMessage(): CLIAssistantMessage {
return this.finalizeAssistantMessageInternal(
const message = this.finalizeAssistantMessageInternal(
this.mainAgentMessageState,
null,
);
}
/**
* Overrides base class to emit message_stop event when message is finalized.
* This ensures message_start and message_stop are always paired.
*/
protected override finalizeAssistantMessageInternal(
state: MessageState,
parentToolUseId: string | null,
): CLIAssistantMessage {
if (state.finalized) {
return this.buildMessage(parentToolUseId);
if (this.mainTurnMessageStartEmitted && this.includePartialMessages) {
const partial: CLIPartialAssistantMessage = {
type: 'stream_event',
uuid: randomUUID(),
session_id: this.getSessionId(),
parent_tool_use_id: null,
event: { type: 'message_stop' },
};
this.emitMessageImpl(partial);
}
state.finalized = true;
this.finalizePendingBlocks(state, parentToolUseId);
const orderedOpenBlocks = Array.from(state.openBlocks).sort(
(a, b) => a - b,
);
for (const index of orderedOpenBlocks) {
this.onBlockClosed(state, index, parentToolUseId);
this.closeBlock(state, index);
}
// Emit message_stop for main agent when message was started and partial messages are enabled
if (
state.messageStarted &&
this.includePartialMessages &&
parentToolUseId === null
) {
this.emitStreamEventIfEnabled({ type: 'message_stop' }, null);
}
const message = this.buildMessage(parentToolUseId);
this.updateLastAssistantMessage(message);
this.emitMessageImpl(message);
this.mainTurnMessageStartEmitted = false;
return message;
}
@ -267,14 +249,15 @@ export class StreamJsonOutputAdapter
/**
* Overrides base class hook to emit message_start event when message is started.
* Only emits for main agent, not for subagents.
* Only emits once per turn for the main agent (guarded by mainTurnMessageStartEmitted),
* so block-type transitions inside a single turn do not produce spurious message_start events.
*/
protected override onEnsureMessageStarted(
state: MessageState,
parentToolUseId: string | null,
): void {
// Only emit message_start for main agent, not for subagents
if (parentToolUseId === null) {
if (parentToolUseId === null && !this.mainTurnMessageStartEmitted) {
this.mainTurnMessageStartEmitted = true;
this.emitStreamEventIfEnabled(
{
type: 'message_start',
@ -282,6 +265,7 @@ export class StreamJsonOutputAdapter
id: state.messageId!,
role: 'assistant',
model: this.config.getModel(),
content: [],
},
},
null,
@ -329,19 +313,12 @@ export class StreamJsonOutputAdapter
return;
}
const state = this.getMessageState(parentToolUseId);
const enrichedEvent = state.messageStarted
? ({ ...event, message_id: state.messageId } as StreamEvent & {
message_id: string;
})
: event;
const partial: CLIPartialAssistantMessage = {
type: 'stream_event',
uuid: randomUUID(),
session_id: this.getSessionId(),
parent_tool_use_id: parentToolUseId,
event: enrichedEvent,
event,
};
this.emitMessageImpl(partial);
}

View file

@ -201,6 +201,7 @@ export interface MessageStartStreamEvent {
id: string;
role: 'assistant';
model: string;
content: [];
};
}