fix(acp): stream subagent text chunks (with thoughts)

Propagate `thought` metadata through SubAgent STREAM_TEXT events and render them as agent message/thought chunks in ACP sessions.
This commit is contained in:
tanzhenxin 2026-01-27 16:59:02 +08:00
parent 7177b41120
commit 8ce176389c
4 changed files with 209 additions and 1 deletions

View file

@ -14,6 +14,7 @@ import type {
SubAgentToolCallEvent,
SubAgentToolResultEvent,
SubAgentApprovalRequestEvent,
SubAgentStreamTextEvent,
ToolEditConfirmationDetails,
ToolInfoConfirmationDetails,
} from '@qwen-code/qwen-code-core';
@ -101,6 +102,18 @@ function createInfoConfirmation(
};
}
// Helper to create a mock SubAgentStreamTextEvent with required fields
function createStreamTextEvent(
overrides: Partial<SubAgentStreamTextEvent> & { text: string },
): SubAgentStreamTextEvent {
return {
subagentId: 'test-subagent',
round: 1,
timestamp: Date.now(),
...overrides,
};
}
describe('SubAgentTracker', () => {
let mockContext: SessionContext;
let mockClient: acp.Client;
@ -162,6 +175,10 @@ describe('SubAgentTracker', () => {
SubAgentEventType.TOOL_WAITING_APPROVAL,
expect.any(Function),
);
expect(onSpy).toHaveBeenCalledWith(
SubAgentEventType.STREAM_TEXT,
expect.any(Function),
);
});
it('should remove event listeners on cleanup', () => {
@ -182,6 +199,10 @@ describe('SubAgentTracker', () => {
SubAgentEventType.TOOL_WAITING_APPROVAL,
expect.any(Function),
);
expect(offSpy).toHaveBeenCalledWith(
SubAgentEventType.STREAM_TEXT,
expect.any(Function),
);
});
});
@ -522,4 +543,163 @@ describe('SubAgentTracker', () => {
);
});
});
describe('stream text handling', () => {
it('should emit agent_message_chunk on STREAM_TEXT event', async () => {
tracker.setup(eventEmitter, abortController.signal);
const event = createStreamTextEvent({
text: 'Hello, this is a response from the model.',
});
eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event);
await vi.waitFor(() => {
expect(sendUpdateSpy).toHaveBeenCalled();
});
expect(sendUpdateSpy).toHaveBeenCalledWith(
expect.objectContaining({
sessionUpdate: 'agent_message_chunk',
content: {
type: 'text',
text: 'Hello, this is a response from the model.',
},
}),
);
});
it('should emit multiple chunks for multiple STREAM_TEXT events', async () => {
tracker.setup(eventEmitter, abortController.signal);
eventEmitter.emit(
SubAgentEventType.STREAM_TEXT,
createStreamTextEvent({ text: 'First chunk ' }),
);
eventEmitter.emit(
SubAgentEventType.STREAM_TEXT,
createStreamTextEvent({ text: 'Second chunk ' }),
);
eventEmitter.emit(
SubAgentEventType.STREAM_TEXT,
createStreamTextEvent({ text: 'Third chunk' }),
);
await vi.waitFor(() => {
expect(sendUpdateSpy).toHaveBeenCalledTimes(3);
});
expect(sendUpdateSpy).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
sessionUpdate: 'agent_message_chunk',
content: { type: 'text', text: 'First chunk ' },
}),
);
expect(sendUpdateSpy).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
sessionUpdate: 'agent_message_chunk',
content: { type: 'text', text: 'Second chunk ' },
}),
);
expect(sendUpdateSpy).toHaveBeenNthCalledWith(
3,
expect.objectContaining({
sessionUpdate: 'agent_message_chunk',
content: { type: 'text', text: 'Third chunk' },
}),
);
});
it('should not emit when aborted', async () => {
tracker.setup(eventEmitter, abortController.signal);
abortController.abort();
const event = createStreamTextEvent({
text: 'This should not be emitted',
});
eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event);
await new Promise((resolve) => setTimeout(resolve, 10));
expect(sendUpdateSpy).not.toHaveBeenCalled();
});
it('should emit agent_thought_chunk when thought flag is true', async () => {
tracker.setup(eventEmitter, abortController.signal);
const event = createStreamTextEvent({
text: 'Let me think about this...',
thought: true,
});
eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event);
await vi.waitFor(() => {
expect(sendUpdateSpy).toHaveBeenCalled();
});
expect(sendUpdateSpy).toHaveBeenCalledWith(
expect.objectContaining({
sessionUpdate: 'agent_thought_chunk',
content: {
type: 'text',
text: 'Let me think about this...',
},
}),
);
});
it('should emit agent_message_chunk when thought flag is false', async () => {
tracker.setup(eventEmitter, abortController.signal);
const event = createStreamTextEvent({
text: 'Here is the answer.',
thought: false,
});
eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event);
await vi.waitFor(() => {
expect(sendUpdateSpy).toHaveBeenCalled();
});
expect(sendUpdateSpy).toHaveBeenCalledWith(
expect.objectContaining({
sessionUpdate: 'agent_message_chunk',
content: {
type: 'text',
text: 'Here is the answer.',
},
}),
);
});
it('should emit agent_message_chunk when thought flag is undefined', async () => {
tracker.setup(eventEmitter, abortController.signal);
// Event without thought flag (undefined)
const event = createStreamTextEvent({
text: 'Default behavior text.',
});
eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event);
await vi.waitFor(() => {
expect(sendUpdateSpy).toHaveBeenCalled();
});
expect(sendUpdateSpy).toHaveBeenCalledWith(
expect.objectContaining({
sessionUpdate: 'agent_message_chunk',
content: {
type: 'text',
text: 'Default behavior text.',
},
}),
);
});
});
});