diff --git a/packages/channels/base/src/ChannelBase.ts b/packages/channels/base/src/ChannelBase.ts index 12b9b9ff9..d4c3814a7 100644 --- a/packages/channels/base/src/ChannelBase.ts +++ b/packages/channels/base/src/ChannelBase.ts @@ -72,6 +72,30 @@ export abstract class ChannelBase { onToolCall(_chatId: string, _event: ToolCallEvent): void {} + /** + * Called for each text chunk as the agent streams its response. + * Override to implement progressive display (e.g., updating an AI card in-place). + * Default: no-op (chunks are collected internally and delivered via onResponseComplete). + */ + protected onResponseChunk( + _chatId: string, + _chunk: string, + _sessionId: string, + ): void {} + + /** + * Called when the agent's full response is ready. + * Override to customize delivery (e.g., finalize an AI card). + * Default: calls sendMessage() with the full response text. + */ + protected async onResponseComplete( + chatId: string, + fullText: string, + _sessionId: string, + ): Promise { + await this.sendMessage(chatId, fullText); + } + /** * Register a slash command handler. Subclasses can call this to add * platform-specific commands (e.g., /start for Telegram). @@ -223,13 +247,25 @@ export abstract class ChannelBase { // pollution when concurrent messages hit the same session. const prev = this.sessionQueues.get(sessionId) ?? Promise.resolve(); const current = prev.then(async () => { - const response = await this.bridge.prompt(sessionId, promptText, { - imageBase64: envelope.imageBase64, - imageMimeType: envelope.imageMimeType, - }); + // Forward streaming chunks to the subclass hook + const onChunk = (sid: string, chunk: string) => { + if (sid === sessionId) { + this.onResponseChunk(envelope.chatId, chunk, sessionId); + } + }; + this.bridge.on('textChunk', onChunk); - if (response) { - await this.sendMessage(envelope.chatId, response); + try { + const response = await this.bridge.prompt(sessionId, promptText, { + imageBase64: envelope.imageBase64, + imageMimeType: envelope.imageMimeType, + }); + + if (response) { + await this.onResponseComplete(envelope.chatId, response, sessionId); + } + } finally { + this.bridge.off('textChunk', onChunk); } }); this.sessionQueues.set( diff --git a/packages/channels/plugin-example/src/MockPluginChannel.ts b/packages/channels/plugin-example/src/MockPluginChannel.ts index 481c9e97f..a4f856350 100644 --- a/packages/channels/plugin-example/src/MockPluginChannel.ts +++ b/packages/channels/plugin-example/src/MockPluginChannel.ts @@ -6,7 +6,11 @@ import type { AcpBridge, } from '@qwen-code/channel-base'; import WebSocket from 'ws'; -import type { InboundMessage, OutboundMessage } from './protocol.js'; +import type { + InboundMessage, + OutboundMessage, + ChunkMessage, +} from './protocol.js'; export interface MockPluginConfig extends ChannelConfig { serverWsUrl: string; @@ -76,6 +80,30 @@ export class MockPluginChannel extends ChannelBase { }); } + protected override onResponseChunk( + chatId: string, + chunk: string, + _sessionId: string, + ): void { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; + + const msg: ChunkMessage = { + type: 'chunk', + messageId: this.pendingMessageId || 'unknown', + chatId, + text: chunk, + }; + this.ws.send(JSON.stringify(msg)); + } + + protected override async onResponseComplete( + chatId: string, + fullText: string, + _sessionId: string, + ): Promise { + await this.sendMessage(chatId, fullText); + } + async sendMessage(chatId: string, text: string): Promise { if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { return; diff --git a/packages/channels/plugin-example/src/mock-server.ts b/packages/channels/plugin-example/src/mock-server.ts index 6b40290ed..450ded322 100644 --- a/packages/channels/plugin-example/src/mock-server.ts +++ b/packages/channels/plugin-example/src/mock-server.ts @@ -53,9 +53,10 @@ export function createMockServer( const pendingRequests = new Map< string, { - resolve: (text: string) => void; + resolve: (result: { text: string; chunks: string[] }) => void; reject: (err: Error) => void; timer: ReturnType; + chunks: string[]; } >(); @@ -73,12 +74,17 @@ export function createMockServer( ws.on('message', (data) => { try { const msg = JSON.parse(data.toString()); - if (msg.type === 'outbound' && msg.messageId) { + if (msg.type === 'chunk' && msg.messageId) { + const pending = pendingRequests.get(msg.messageId); + if (pending) { + pending.chunks.push(msg.text); + } + } else if (msg.type === 'outbound' && msg.messageId) { const pending = pendingRequests.get(msg.messageId); if (pending) { clearTimeout(pending.timer); pendingRequests.delete(msg.messageId); - pending.resolve(msg.text); + pending.resolve({ text: msg.text, chunks: pending.chunks }); } } } catch { @@ -138,18 +144,35 @@ export function createMockServer( }), ); - const responsePromise = new Promise((resolve, reject) => { + const responsePromise = new Promise<{ + text: string; + chunks: string[]; + }>((resolve, reject) => { const timer = setTimeout(() => { pendingRequests.delete(messageId); reject(new Error('Timeout waiting for agent response')); }, responseTimeoutMs); - pendingRequests.set(messageId, { resolve, reject, timer }); + pendingRequests.set(messageId, { + resolve, + reject, + timer, + chunks: [], + }); }); responsePromise - .then((responseText) => { + .then((result) => { res.writeHead(200, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ messageId, text: responseText })); + res.end( + JSON.stringify({ + messageId, + text: result.text, + streaming: { + chunks: result.chunks.length, + bytes: result.chunks.reduce((n, c) => n + c.length, 0), + }, + }), + ); }) .catch((err: Error) => { res.writeHead(504, { 'Content-Type': 'application/json' }); @@ -215,7 +238,12 @@ export function createMockServer( pendingRequests.delete(messageId); reject(new Error('Timeout waiting for agent response')); }, responseTimeoutMs); - pendingRequests.set(messageId, { resolve, reject, timer }); + pendingRequests.set(messageId, { + resolve: (result) => resolve(result.text), + reject, + timer, + chunks: [], + }); }); }, diff --git a/packages/channels/plugin-example/src/protocol.ts b/packages/channels/plugin-example/src/protocol.ts index 49c2e0fba..25eee9034 100644 --- a/packages/channels/plugin-example/src/protocol.ts +++ b/packages/channels/plugin-example/src/protocol.ts @@ -12,7 +12,15 @@ export interface InboundMessage { text: string; } -/** Plugin Channel → Server (WebSocket) */ +/** Plugin Channel → Server (WebSocket) — streaming chunk */ +export interface ChunkMessage { + type: 'chunk'; + messageId: string; + chatId: string; + text: string; +} + +/** Plugin Channel → Server (WebSocket) — final response */ export interface OutboundMessage { type: 'outbound'; messageId: string; @@ -20,4 +28,4 @@ export interface OutboundMessage { text: string; } -export type WsMessage = InboundMessage | OutboundMessage; +export type WsMessage = InboundMessage | ChunkMessage | OutboundMessage;