feat(channels): add streaming response hooks to ChannelBase

- Add onResponseChunk hook for progressive text display during streaming
- Add onResponseComplete hook for customizing response delivery
- Update mock plugin channel to support streaming chunks

This enables channels to display AI responses progressively as they stream,
improving user experience with real-time feedback.

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
tanzhenxin 2026-03-27 14:08:09 +00:00
parent 0ca8cf86f6
commit f7979aa902
4 changed files with 117 additions and 17 deletions

View file

@ -6,7 +6,11 @@ import type {
AcpBridge,
} from '@qwen-code/channel-base';
import WebSocket from 'ws';
import type { InboundMessage, OutboundMessage } from './protocol.js';
import type {
InboundMessage,
OutboundMessage,
ChunkMessage,
} from './protocol.js';
export interface MockPluginConfig extends ChannelConfig {
serverWsUrl: string;
@ -76,6 +80,30 @@ export class MockPluginChannel extends ChannelBase {
});
}
protected override onResponseChunk(
chatId: string,
chunk: string,
_sessionId: string,
): void {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
const msg: ChunkMessage = {
type: 'chunk',
messageId: this.pendingMessageId || 'unknown',
chatId,
text: chunk,
};
this.ws.send(JSON.stringify(msg));
}
protected override async onResponseComplete(
chatId: string,
fullText: string,
_sessionId: string,
): Promise<void> {
await this.sendMessage(chatId, fullText);
}
async sendMessage(chatId: string, text: string): Promise<void> {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
return;

View file

@ -53,9 +53,10 @@ export function createMockServer(
const pendingRequests = new Map<
string,
{
resolve: (text: string) => void;
resolve: (result: { text: string; chunks: string[] }) => void;
reject: (err: Error) => void;
timer: ReturnType<typeof setTimeout>;
chunks: string[];
}
>();
@ -73,12 +74,17 @@ export function createMockServer(
ws.on('message', (data) => {
try {
const msg = JSON.parse(data.toString());
if (msg.type === 'outbound' && msg.messageId) {
if (msg.type === 'chunk' && msg.messageId) {
const pending = pendingRequests.get(msg.messageId);
if (pending) {
pending.chunks.push(msg.text);
}
} else if (msg.type === 'outbound' && msg.messageId) {
const pending = pendingRequests.get(msg.messageId);
if (pending) {
clearTimeout(pending.timer);
pendingRequests.delete(msg.messageId);
pending.resolve(msg.text);
pending.resolve({ text: msg.text, chunks: pending.chunks });
}
}
} catch {
@ -138,18 +144,35 @@ export function createMockServer(
}),
);
const responsePromise = new Promise<string>((resolve, reject) => {
const responsePromise = new Promise<{
text: string;
chunks: string[];
}>((resolve, reject) => {
const timer = setTimeout(() => {
pendingRequests.delete(messageId);
reject(new Error('Timeout waiting for agent response'));
}, responseTimeoutMs);
pendingRequests.set(messageId, { resolve, reject, timer });
pendingRequests.set(messageId, {
resolve,
reject,
timer,
chunks: [],
});
});
responsePromise
.then((responseText) => {
.then((result) => {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ messageId, text: responseText }));
res.end(
JSON.stringify({
messageId,
text: result.text,
streaming: {
chunks: result.chunks.length,
bytes: result.chunks.reduce((n, c) => n + c.length, 0),
},
}),
);
})
.catch((err: Error) => {
res.writeHead(504, { 'Content-Type': 'application/json' });
@ -215,7 +238,12 @@ export function createMockServer(
pendingRequests.delete(messageId);
reject(new Error('Timeout waiting for agent response'));
}, responseTimeoutMs);
pendingRequests.set(messageId, { resolve, reject, timer });
pendingRequests.set(messageId, {
resolve: (result) => resolve(result.text),
reject,
timer,
chunks: [],
});
});
},

View file

@ -12,7 +12,15 @@ export interface InboundMessage {
text: string;
}
/** Plugin Channel → Server (WebSocket) */
/** Plugin Channel → Server (WebSocket) — streaming chunk */
export interface ChunkMessage {
type: 'chunk';
messageId: string;
chatId: string;
text: string;
}
/** Plugin Channel → Server (WebSocket) — final response */
export interface OutboundMessage {
type: 'outbound';
messageId: string;
@ -20,4 +28,4 @@ export interface OutboundMessage {
text: string;
}
export type WsMessage = InboundMessage | OutboundMessage;
export type WsMessage = InboundMessage | ChunkMessage | OutboundMessage;