diff --git a/packages/cli/src/acp-integration/session/SubAgentTracker.test.ts b/packages/cli/src/acp-integration/session/SubAgentTracker.test.ts index b21cf6bc6..96b8bd998 100644 --- a/packages/cli/src/acp-integration/session/SubAgentTracker.test.ts +++ b/packages/cli/src/acp-integration/session/SubAgentTracker.test.ts @@ -14,6 +14,7 @@ import type { SubAgentToolCallEvent, SubAgentToolResultEvent, SubAgentApprovalRequestEvent, + SubAgentStreamTextEvent, ToolEditConfirmationDetails, ToolInfoConfirmationDetails, } from '@qwen-code/qwen-code-core'; @@ -101,6 +102,18 @@ function createInfoConfirmation( }; } +// Helper to create a mock SubAgentStreamTextEvent with required fields +function createStreamTextEvent( + overrides: Partial & { text: string }, +): SubAgentStreamTextEvent { + return { + subagentId: 'test-subagent', + round: 1, + timestamp: Date.now(), + ...overrides, + }; +} + describe('SubAgentTracker', () => { let mockContext: SessionContext; let mockClient: acp.Client; @@ -167,6 +180,10 @@ describe('SubAgentTracker', () => { SubAgentEventType.TOOL_WAITING_APPROVAL, expect.any(Function), ); + expect(onSpy).toHaveBeenCalledWith( + SubAgentEventType.STREAM_TEXT, + expect.any(Function), + ); }); it('should remove event listeners on cleanup', () => { @@ -187,6 +204,10 @@ describe('SubAgentTracker', () => { SubAgentEventType.TOOL_WAITING_APPROVAL, expect.any(Function), ); + expect(offSpy).toHaveBeenCalledWith( + SubAgentEventType.STREAM_TEXT, + expect.any(Function), + ); }); }); @@ -542,4 +563,163 @@ describe('SubAgentTracker', () => { ); }); }); + + describe('stream text handling', () => { + it('should emit agent_message_chunk on STREAM_TEXT event', async () => { + tracker.setup(eventEmitter, abortController.signal); + + const event = createStreamTextEvent({ + text: 'Hello, this is a response from the model.', + }); + + eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event); + + await vi.waitFor(() => { + expect(sendUpdateSpy).toHaveBeenCalled(); + }); + + expect(sendUpdateSpy).toHaveBeenCalledWith( + expect.objectContaining({ + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: 'Hello, this is a response from the model.', + }, + }), + ); + }); + + it('should emit multiple chunks for multiple STREAM_TEXT events', async () => { + tracker.setup(eventEmitter, abortController.signal); + + eventEmitter.emit( + SubAgentEventType.STREAM_TEXT, + createStreamTextEvent({ text: 'First chunk ' }), + ); + eventEmitter.emit( + SubAgentEventType.STREAM_TEXT, + createStreamTextEvent({ text: 'Second chunk ' }), + ); + eventEmitter.emit( + SubAgentEventType.STREAM_TEXT, + createStreamTextEvent({ text: 'Third chunk' }), + ); + + await vi.waitFor(() => { + expect(sendUpdateSpy).toHaveBeenCalledTimes(3); + }); + + expect(sendUpdateSpy).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + sessionUpdate: 'agent_message_chunk', + content: { type: 'text', text: 'First chunk ' }, + }), + ); + expect(sendUpdateSpy).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + sessionUpdate: 'agent_message_chunk', + content: { type: 'text', text: 'Second chunk ' }, + }), + ); + expect(sendUpdateSpy).toHaveBeenNthCalledWith( + 3, + expect.objectContaining({ + sessionUpdate: 'agent_message_chunk', + content: { type: 'text', text: 'Third chunk' }, + }), + ); + }); + + it('should not emit when aborted', async () => { + tracker.setup(eventEmitter, abortController.signal); + abortController.abort(); + + const event = createStreamTextEvent({ + text: 'This should not be emitted', + }); + + eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(sendUpdateSpy).not.toHaveBeenCalled(); + }); + + it('should emit agent_thought_chunk when thought flag is true', async () => { + tracker.setup(eventEmitter, abortController.signal); + + const event = createStreamTextEvent({ + text: 'Let me think about this...', + thought: true, + }); + + eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event); + + await vi.waitFor(() => { + expect(sendUpdateSpy).toHaveBeenCalled(); + }); + + expect(sendUpdateSpy).toHaveBeenCalledWith( + expect.objectContaining({ + sessionUpdate: 'agent_thought_chunk', + content: { + type: 'text', + text: 'Let me think about this...', + }, + }), + ); + }); + + it('should emit agent_message_chunk when thought flag is false', async () => { + tracker.setup(eventEmitter, abortController.signal); + + const event = createStreamTextEvent({ + text: 'Here is the answer.', + thought: false, + }); + + eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event); + + await vi.waitFor(() => { + expect(sendUpdateSpy).toHaveBeenCalled(); + }); + + expect(sendUpdateSpy).toHaveBeenCalledWith( + expect.objectContaining({ + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: 'Here is the answer.', + }, + }), + ); + }); + + it('should emit agent_message_chunk when thought flag is undefined', async () => { + tracker.setup(eventEmitter, abortController.signal); + + // Event without thought flag (undefined) + const event = createStreamTextEvent({ + text: 'Default behavior text.', + }); + + eventEmitter.emit(SubAgentEventType.STREAM_TEXT, event); + + await vi.waitFor(() => { + expect(sendUpdateSpy).toHaveBeenCalled(); + }); + + expect(sendUpdateSpy).toHaveBeenCalledWith( + expect.objectContaining({ + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: 'Default behavior text.', + }, + }), + ); + }); + }); }); diff --git a/packages/cli/src/acp-integration/session/SubAgentTracker.ts b/packages/cli/src/acp-integration/session/SubAgentTracker.ts index e174590cc..4643fe776 100644 --- a/packages/cli/src/acp-integration/session/SubAgentTracker.ts +++ b/packages/cli/src/acp-integration/session/SubAgentTracker.ts @@ -10,6 +10,7 @@ import type { SubAgentToolResultEvent, SubAgentApprovalRequestEvent, SubAgentUsageEvent, + SubAgentStreamTextEvent, ToolCallConfirmationDetails, AnyDeclarativeTool, AnyToolInvocation, @@ -109,11 +110,13 @@ export class SubAgentTracker { const onToolResult = this.createToolResultHandler(abortSignal); const onApproval = this.createApprovalHandler(abortSignal); const onUsageMetadata = this.createUsageMetadataHandler(abortSignal); + const onStreamText = this.createStreamTextHandler(abortSignal); eventEmitter.on(SubAgentEventType.TOOL_CALL, onToolCall); eventEmitter.on(SubAgentEventType.TOOL_RESULT, onToolResult); eventEmitter.on(SubAgentEventType.TOOL_WAITING_APPROVAL, onApproval); eventEmitter.on(SubAgentEventType.USAGE_METADATA, onUsageMetadata); + eventEmitter.on(SubAgentEventType.STREAM_TEXT, onStreamText); return [ () => { @@ -121,6 +124,7 @@ export class SubAgentTracker { eventEmitter.off(SubAgentEventType.TOOL_RESULT, onToolResult); eventEmitter.off(SubAgentEventType.TOOL_WAITING_APPROVAL, onApproval); eventEmitter.off(SubAgentEventType.USAGE_METADATA, onUsageMetadata); + eventEmitter.off(SubAgentEventType.STREAM_TEXT, onStreamText); // Clean up any remaining states this.toolStates.clear(); }, @@ -292,6 +296,26 @@ export class SubAgentTracker { }; } + /** + * Creates a handler for stream text events. + * Emits agent message or thought chunks for text content from subagent model responses. + */ + private createStreamTextHandler( + abortSignal: AbortSignal, + ): (...args: unknown[]) => void { + return (...args: unknown[]) => { + const event = args[0] as SubAgentStreamTextEvent; + if (abortSignal.aborted) return; + + // Emit streamed text as agent message or thought based on the flag + void this.messageEmitter.emitMessage( + event.text, + 'assistant', + event.thought ?? false, + ); + }; + } + /** * Converts confirmation details to permission options for the client. */ diff --git a/packages/core/src/subagents/subagent-events.ts b/packages/core/src/subagents/subagent-events.ts index 1f7933087..5de09a3c2 100644 --- a/packages/core/src/subagents/subagent-events.ts +++ b/packages/core/src/subagents/subagent-events.ts @@ -56,6 +56,8 @@ export interface SubAgentStreamTextEvent { subagentId: string; round: number; text: string; + /** Whether this text is reasoning/thinking content (as opposed to regular output) */ + thought?: boolean; timestamp: number; } diff --git a/packages/core/src/subagents/subagent.test.ts b/packages/core/src/subagents/subagent.test.ts index ed34a511d..67b9f375e 100644 --- a/packages/core/src/subagents/subagent.test.ts +++ b/packages/core/src/subagents/subagent.test.ts @@ -34,6 +34,11 @@ import { executeToolCall } from '../core/nonInteractiveToolExecutor.js'; import type { ToolRegistry } from '../tools/tool-registry.js'; import { type AnyDeclarativeTool } from '../tools/tools.js'; import { ContextState, SubAgentScope } from './subagent.js'; +import { + SubAgentEventEmitter, + SubAgentEventType, + type SubAgentStreamTextEvent, +} from './subagent-events.js'; import type { ModelConfig, PromptConfig, @@ -774,5 +779,156 @@ describe('subagent.ts', () => { expect(scope.getTerminateMode()).toBe(SubagentTerminateMode.ERROR); }); }); + + describe('runNonInteractive - Streaming and Thought Handling', () => { + const promptConfig: PromptConfig = { systemPrompt: 'Execute task.' }; + + // Helper to create a mock stream that yields specific parts + const createMockStreamWithParts = (parts: Part[]) => vi.fn().mockImplementation(async () => (async function* () { + yield { + type: 'chunk', + value: { + candidates: [ + { + content: { parts }, + }, + ], + }, + }; + })()); + + it('should emit STREAM_TEXT events with thought flag', async () => { + const { config } = await createMockConfig(); + + mockSendMessageStream = createMockStreamWithParts([ + { text: 'Let me think...' as string, thought: true }, + { text: 'Here is the answer.' as string }, + ]); + vi.mocked(GeminiChat).mockImplementation( + () => + ({ + sendMessageStream: mockSendMessageStream, + }) as unknown as GeminiChat, + ); + + const eventEmitter = new SubAgentEventEmitter(); + const events: SubAgentStreamTextEvent[] = []; + eventEmitter.on(SubAgentEventType.STREAM_TEXT, (...args: unknown[]) => { + events.push(args[0] as SubAgentStreamTextEvent); + }); + + const scope = await SubAgentScope.create( + 'test-agent', + config, + promptConfig, + defaultModelConfig, + defaultRunConfig, + undefined, + eventEmitter, + ); + + await scope.runNonInteractive(new ContextState()); + + expect(events).toHaveLength(2); + expect(events[0]!.text).toBe('Let me think...'); + expect(events[0]!.thought).toBe(true); + expect(events[1]!.text).toBe('Here is the answer.'); + expect(events[1]!.thought).toBe(false); + }); + + it('should exclude thought text from finalText', async () => { + const { config } = await createMockConfig(); + + mockSendMessageStream = createMockStreamWithParts([ + { text: 'Internal reasoning here.' as string, thought: true }, + { text: 'The final answer.' as string }, + ]); + vi.mocked(GeminiChat).mockImplementation( + () => + ({ + sendMessageStream: mockSendMessageStream, + }) as unknown as GeminiChat, + ); + + const scope = await SubAgentScope.create( + 'test-agent', + config, + promptConfig, + defaultModelConfig, + defaultRunConfig, + ); + + await scope.runNonInteractive(new ContextState()); + + expect(scope.getTerminateMode()).toBe(SubagentTerminateMode.GOAL); + expect(scope.getFinalText()).toBe('The final answer.'); + }); + + it('should not set finalText from thought-only response', async () => { + const { config } = await createMockConfig(); + + // First call: only thought text (no regular text → nudge) + // Second call: regular text response + let callIndex = 0; + mockSendMessageStream = vi.fn().mockImplementation(async () => { + const idx = callIndex++; + return (async function* () { + if (idx === 0) { + yield { + type: 'chunk', + value: { + candidates: [ + { + content: { + parts: [ + { + text: 'Just thinking...' as string, + thought: true, + }, + ], + }, + }, + ], + }, + }; + } else { + yield { + type: 'chunk', + value: { + candidates: [ + { + content: { + parts: [{ text: 'Actual output.' as string }], + }, + }, + ], + }, + }; + } + })(); + }); + vi.mocked(GeminiChat).mockImplementation( + () => + ({ + sendMessageStream: mockSendMessageStream, + }) as unknown as GeminiChat, + ); + + const scope = await SubAgentScope.create( + 'test-agent', + config, + promptConfig, + defaultModelConfig, + defaultRunConfig, + ); + + await scope.runNonInteractive(new ContextState()); + + expect(scope.getTerminateMode()).toBe(SubagentTerminateMode.GOAL); + expect(scope.getFinalText()).toBe('Actual output.'); + // Should have been called twice: first with thought-only, then nudged + expect(mockSendMessageStream).toHaveBeenCalledTimes(2); + }); + }); }); }); diff --git a/packages/core/src/subagents/subagent.ts b/packages/core/src/subagents/subagent.ts index 39e43e54f..7f3146e98 100644 --- a/packages/core/src/subagents/subagent.ts +++ b/packages/core/src/subagents/subagent.ts @@ -39,7 +39,6 @@ import type { SubAgentStartEvent, SubAgentToolCallEvent, SubAgentToolResultEvent, - SubAgentStreamTextEvent, SubAgentErrorEvent, SubAgentUsageEvent, } from './subagent-events.js'; @@ -412,15 +411,17 @@ export class SubAgentScope { const content = resp.candidates?.[0]?.content; const parts = content?.parts || []; for (const p of parts) { - const txt = (p as Part & { text?: string }).text; - if (txt) roundText += txt; + const txt = p.text; + const isThought = p.thought ?? false; + if (txt && !isThought) roundText += txt; if (txt) this.eventEmitter?.emit(SubAgentEventType.STREAM_TEXT, { subagentId: this.subagentId, round: turnCounter, text: txt, + thought: isThought, timestamp: Date.now(), - } as SubAgentStreamTextEvent); + }); } if (resp.usageMetadata) lastUsage = resp.usageMetadata; }