diff --git a/packages/cli/src/acp-integration/session/SubAgentTracker.test.ts b/packages/cli/src/acp-integration/session/SubAgentTracker.test.ts index f2bb7cc50..f86571c63 100644 --- a/packages/cli/src/acp-integration/session/SubAgentTracker.test.ts +++ b/packages/cli/src/acp-integration/session/SubAgentTracker.test.ts @@ -14,6 +14,7 @@ import type { SubAgentToolCallEvent, SubAgentToolResultEvent, SubAgentApprovalRequestEvent, + SubAgentStreamTextEvent, ToolEditConfirmationDetails, ToolInfoConfirmationDetails, } from '@qwen-code/qwen-code-core'; @@ -101,6 +102,18 @@ function createInfoConfirmation( }; } +// Helper to create a mock SubAgentStreamTextEvent with required fields +function createStreamTextEvent( + overrides: Partial & { text: string }, +): SubAgentStreamTextEvent { + return { + subagentId: 'test-subagent', + round: 1, + timestamp: Date.now(), + ...overrides, + }; +} + describe('SubAgentTracker', () => { let mockContext: SessionContext; let mockClient: acp.Client; @@ -162,6 +175,10 @@ describe('SubAgentTracker', () => { SubAgentEventType.TOOL_WAITING_APPROVAL, expect.any(Function), ); + expect(onSpy).toHaveBeenCalledWith( + SubAgentEventType.STREAM_TEXT, + expect.any(Function), + ); }); it('should remove event listeners on cleanup', () => { @@ -182,6 +199,10 @@ describe('SubAgentTracker', () => { SubAgentEventType.TOOL_WAITING_APPROVAL, expect.any(Function), ); + expect(offSpy).toHaveBeenCalledWith( + SubAgentEventType.STREAM_TEXT, + expect.any(Function), + ); }); }); @@ -522,4 +543,163 @@ describe('SubAgentTracker', () => { ); }); }); + + describe('stream text handling', () => { + it('should emit agent_message_chunk on STREAM_TEXT event', async () => { + tracker.setup(eventEmitter, abortController.signal); + + const event = createStreamTextEvent({ + text: 'Hello, this is a response from the model.', + }); + + eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event); + + await vi.waitFor(() => { + expect(sendUpdateSpy).toHaveBeenCalled(); + }); + + expect(sendUpdateSpy).toHaveBeenCalledWith( + expect.objectContaining({ + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: 'Hello, this is a response from the model.', + }, + }), + ); + }); + + it('should emit multiple chunks for multiple STREAM_TEXT events', async () => { + tracker.setup(eventEmitter, abortController.signal); + + eventEmitter.emit( + SubAgentEventType.STREAM_TEXT, + createStreamTextEvent({ text: 'First chunk ' }), + ); + eventEmitter.emit( + SubAgentEventType.STREAM_TEXT, + createStreamTextEvent({ text: 'Second chunk ' }), + ); + eventEmitter.emit( + SubAgentEventType.STREAM_TEXT, + createStreamTextEvent({ text: 'Third chunk' }), + ); + + await vi.waitFor(() => { + expect(sendUpdateSpy).toHaveBeenCalledTimes(3); + }); + + expect(sendUpdateSpy).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + sessionUpdate: 'agent_message_chunk', + content: { type: 'text', text: 'First chunk ' }, + }), + ); + expect(sendUpdateSpy).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + sessionUpdate: 'agent_message_chunk', + content: { type: 'text', text: 'Second chunk ' }, + }), + ); + expect(sendUpdateSpy).toHaveBeenNthCalledWith( + 3, + expect.objectContaining({ + sessionUpdate: 'agent_message_chunk', + content: { type: 'text', text: 'Third chunk' }, + }), + ); + }); + + it('should not emit when aborted', async () => { + tracker.setup(eventEmitter, abortController.signal); + abortController.abort(); + + const event = createStreamTextEvent({ + text: 'This should not be emitted', + }); + + eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(sendUpdateSpy).not.toHaveBeenCalled(); + }); + + it('should emit agent_thought_chunk when thought flag is true', async () => { + tracker.setup(eventEmitter, abortController.signal); + + const event = createStreamTextEvent({ + text: 'Let me think about this...', + thought: true, + }); + + eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event); + + await vi.waitFor(() => { + expect(sendUpdateSpy).toHaveBeenCalled(); + }); + + expect(sendUpdateSpy).toHaveBeenCalledWith( + expect.objectContaining({ + sessionUpdate: 'agent_thought_chunk', + content: { + type: 'text', + text: 'Let me think about this...', + }, + }), + ); + }); + + it('should emit agent_message_chunk when thought flag is false', async () => { + tracker.setup(eventEmitter, abortController.signal); + + const event = createStreamTextEvent({ + text: 'Here is the answer.', + thought: false, + }); + + eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event); + + await vi.waitFor(() => { + expect(sendUpdateSpy).toHaveBeenCalled(); + }); + + expect(sendUpdateSpy).toHaveBeenCalledWith( + expect.objectContaining({ + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: 'Here is the answer.', + }, + }), + ); + }); + + it('should emit agent_message_chunk when thought flag is undefined', async () => { + tracker.setup(eventEmitter, abortController.signal); + + // Event without thought flag (undefined) + const event = createStreamTextEvent({ + text: 'Default behavior text.', + }); + + eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event); + + await vi.waitFor(() => { + expect(sendUpdateSpy).toHaveBeenCalled(); + }); + + expect(sendUpdateSpy).toHaveBeenCalledWith( + expect.objectContaining({ + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: 'Default behavior text.', + }, + }), + ); + }); + }); }); diff --git a/packages/cli/src/acp-integration/session/SubAgentTracker.ts b/packages/cli/src/acp-integration/session/SubAgentTracker.ts index 1e745b925..fe711b250 100644 --- a/packages/cli/src/acp-integration/session/SubAgentTracker.ts +++ b/packages/cli/src/acp-integration/session/SubAgentTracker.ts @@ -10,6 +10,7 @@ import type { SubAgentToolResultEvent, SubAgentApprovalRequestEvent, SubAgentUsageEvent, + SubAgentStreamTextEvent, ToolCallConfirmationDetails, AnyDeclarativeTool, AnyToolInvocation, @@ -97,11 +98,13 @@ export class SubAgentTracker { const onToolResult = this.createToolResultHandler(abortSignal); const onApproval = this.createApprovalHandler(abortSignal); const onUsageMetadata = this.createUsageMetadataHandler(abortSignal); + const onStreamText = this.createStreamTextHandler(abortSignal); eventEmitter.on(SubAgentEventType.TOOL_CALL, onToolCall); eventEmitter.on(SubAgentEventType.TOOL_RESULT, onToolResult); eventEmitter.on(SubAgentEventType.TOOL_WAITING_APPROVAL, onApproval); eventEmitter.on(SubAgentEventType.USAGE_METADATA, onUsageMetadata); + eventEmitter.on(SubAgentEventType.STREAM_TEXT, onStreamText); return [ () => { @@ -109,6 +112,7 @@ export class SubAgentTracker { eventEmitter.off(SubAgentEventType.TOOL_RESULT, onToolResult); eventEmitter.off(SubAgentEventType.TOOL_WAITING_APPROVAL, onApproval); eventEmitter.off(SubAgentEventType.USAGE_METADATA, onUsageMetadata); + eventEmitter.off(SubAgentEventType.STREAM_TEXT, onStreamText); // Clean up any remaining states this.toolStates.clear(); }, @@ -273,6 +277,26 @@ export class SubAgentTracker { }; } + /** + * Creates a handler for stream text events. + * Emits agent message or thought chunks for text content from subagent model responses. + */ + private createStreamTextHandler( + abortSignal: AbortSignal, + ): (...args: unknown[]) => void { + return (...args: unknown[]) => { + const event = args[0] as SubAgentStreamTextEvent; + if (abortSignal.aborted) return; + + // Emit streamed text as agent message or thought based on the flag + void this.messageEmitter.emitMessage( + event.text, + 'assistant', + event.thought ?? false, + ); + }; + } + /** * Converts confirmation details to permission options for the client. */ diff --git a/packages/core/src/subagents/subagent-events.ts b/packages/core/src/subagents/subagent-events.ts index 1f7933087..5de09a3c2 100644 --- a/packages/core/src/subagents/subagent-events.ts +++ b/packages/core/src/subagents/subagent-events.ts @@ -56,6 +56,8 @@ export interface SubAgentStreamTextEvent { subagentId: string; round: number; text: string; + /** Whether this text is reasoning/thinking content (as opposed to regular output) */ + thought?: boolean; timestamp: number; } diff --git a/packages/core/src/subagents/subagent.ts b/packages/core/src/subagents/subagent.ts index 39e43e54f..a8dbda7cc 100644 --- a/packages/core/src/subagents/subagent.ts +++ b/packages/core/src/subagents/subagent.ts @@ -412,13 +412,15 @@ export class SubAgentScope { const content = resp.candidates?.[0]?.content; const parts = content?.parts || []; for (const p of parts) { - const txt = (p as Part & { text?: string }).text; + const txt = p.text; + const isThought = p.thought ?? false; if (txt) roundText += txt; if (txt) this.eventEmitter?.emit(SubAgentEventType.STREAM_TEXT, { subagentId: this.subagentId, round: turnCounter, text: txt, + thought: isThought, timestamp: Date.now(), } as SubAgentStreamTextEvent); }