docs(channels): add plugin developer guide and rename mock to plugin-example

- Add comprehensive developer guide for building channel plugins
- Add user-facing docs for installing/configuring custom channel plugins
- Replace custom-channels.md with new plugins.md
- Rename @qwen-code/channel-mock to @qwen-code/channel-plugin-example
- Add messageId field to Envelope type for response correlation

This provides clear documentation for developers building custom channel
adapters and renames the mock package to better reflect its purpose as
a reference implementation example.

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
tanzhenxin 2026-03-27 03:19:34 +00:00
parent 01c2e5a373
commit 987eebd1c4
17 changed files with 246 additions and 270 deletions

View file

@ -0,0 +1,109 @@
import { ChannelBase } from '@qwen-code/channel-base';
import type {
ChannelConfig,
ChannelBaseOptions,
Envelope,
AcpBridge,
} from '@qwen-code/channel-base';
import WebSocket from 'ws';
import type { InboundMessage, OutboundMessage } from './protocol.js';
export interface MockPluginConfig extends ChannelConfig {
serverWsUrl: string;
}
export class MockPluginChannel extends ChannelBase {
private ws: WebSocket | null = null;
private serverWsUrl: string;
private pendingMessageId: string | undefined;
constructor(
name: string,
config: MockPluginConfig & Record<string, unknown>,
bridge: AcpBridge,
options?: ChannelBaseOptions,
) {
super(name, config, bridge, options);
this.serverWsUrl = config.serverWsUrl;
}
async connect(): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.ws = new WebSocket(this.serverWsUrl);
this.ws.on('open', () => {
resolve();
});
this.ws.on('message', (data: Buffer) => {
try {
const msg = JSON.parse(data.toString()) as InboundMessage;
if (msg.type === 'inbound') {
this.onInboundMessage(msg);
}
} catch {
// ignore parse errors
}
});
this.ws.on('close', () => {
this.ws = null;
});
this.ws.on('error', (err: Error) => {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
reject(err);
}
});
});
}
private onInboundMessage(msg: InboundMessage): void {
const envelope: Envelope = {
channelName: this.name,
senderId: msg.senderId,
senderName: msg.senderName,
chatId: msg.chatId,
text: msg.text,
messageId: msg.messageId,
isGroup: false,
isMentioned: false,
isReplyToBot: false,
};
this.handleInbound(envelope).catch(() => {
// errors handled internally by ChannelBase
});
}
async sendMessage(chatId: string, text: string): Promise<void> {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
return;
}
const outbound: OutboundMessage = {
type: 'outbound',
messageId: this.pendingMessageId || 'unknown',
chatId,
text,
};
this.ws.send(JSON.stringify(outbound));
}
disconnect(): void {
if (this.ws) {
this.ws.close();
this.ws = null;
}
}
override async handleInbound(envelope: Envelope): Promise<void> {
this.pendingMessageId = envelope.messageId;
try {
await super.handleInbound(envelope);
} finally {
this.pendingMessageId = undefined;
}
}
}

View file

@ -0,0 +1,16 @@
import type { ChannelPlugin } from '@qwen-code/channel-base';
import { MockPluginChannel } from './MockPluginChannel.js';
export { MockPluginChannel } from './MockPluginChannel.js';
export type { MockPluginConfig } from './MockPluginChannel.js';
export { createMockServer } from './mock-server.js';
export type { MockServerHandle, MockServerOptions } from './mock-server.js';
export type { InboundMessage, OutboundMessage, WsMessage } from './protocol.js';
export const plugin: ChannelPlugin = {
channelType: 'plugin-example',
displayName: 'Plugin Example',
requiredConfigFields: ['serverWsUrl'],
createChannel: (name, config, bridge, options) =>
new MockPluginChannel(name, config as MockPluginConfig, bridge, options),
};

View file

@ -0,0 +1,254 @@
/**
* Mock Platform Server programmatic API for integration tests.
*
* Provides a createMockServer() function that starts HTTP + WebSocket servers
* and returns a handle for sending messages and cleaning up.
*
* Architecture:
* Test code calls server.sendMessage("Hello")
* HTTP handler creates messageId, pushes via WebSocket to connected channel
* Channel processes responds via WebSocket
* Server resolves the pending promise with agent response text
*/
import http from 'node:http';
import crypto from 'node:crypto';
import { WebSocketServer, WebSocket } from 'ws';
export interface MockServerHandle {
/** Port the HTTP server is listening on */
httpPort: number;
/** Port the WebSocket server is listening on */
wsPort: number;
/** WebSocket URL for channels to connect to */
wsUrl: string;
/** Send a message through the full pipeline and wait for the agent response */
sendMessage(
text: string,
options?: { senderId?: string; senderName?: string; chatId?: string },
): Promise<string>;
/** Wait for a plugin channel to connect */
waitForConnection(timeoutMs?: number): Promise<void>;
/** Shut down both servers and reject pending requests */
close(): Promise<void>;
}
export interface MockServerOptions {
/** HTTP port (0 = random available port) */
httpPort?: number;
/** WebSocket port (0 = random available port) */
wsPort?: number;
/** Timeout for agent responses in ms (default: 120000) */
responseTimeoutMs?: number;
}
export function createMockServer(
options?: MockServerOptions,
): Promise<MockServerHandle> {
const responseTimeoutMs = options?.responseTimeoutMs ?? 120_000;
let pluginWs: WebSocket | null = null;
let connectionResolver: (() => void) | null = null;
const pendingRequests = new Map<
string,
{
resolve: (text: string) => void;
reject: (err: Error) => void;
timer: ReturnType<typeof setTimeout>;
}
>();
// --- WebSocket server ---
const wss = new WebSocketServer({ port: options?.wsPort ?? 0 });
wss.on('connection', (ws) => {
pluginWs = ws;
if (connectionResolver) {
connectionResolver();
connectionResolver = null;
}
ws.on('message', (data) => {
try {
const msg = JSON.parse(data.toString());
if (msg.type === 'outbound' && msg.messageId) {
const pending = pendingRequests.get(msg.messageId);
if (pending) {
clearTimeout(pending.timer);
pendingRequests.delete(msg.messageId);
pending.resolve(msg.text);
}
}
} catch {
// ignore
}
});
ws.on('close', () => {
if (pluginWs === ws) pluginWs = null;
});
});
// --- HTTP server ---
const httpServer = http.createServer((req, res) => {
if (req.method === 'GET' && req.url === '/health') {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(
JSON.stringify({
status: 'ok',
pluginConnected:
pluginWs !== null && pluginWs.readyState === WebSocket.OPEN,
}),
);
return;
}
if (req.method === 'POST' && req.url === '/message') {
let body = '';
req.on('data', (chunk: Buffer) => {
body += chunk.toString();
});
req.on('end', () => {
try {
const { senderId, senderName, chatId, text } = JSON.parse(body);
if (!senderId || !text) {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(
JSON.stringify({ error: 'senderId and text are required' }),
);
return;
}
if (!pluginWs || pluginWs.readyState !== WebSocket.OPEN) {
res.writeHead(503, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Plugin channel not connected' }));
return;
}
const messageId = crypto.randomUUID();
pluginWs.send(
JSON.stringify({
type: 'inbound',
messageId,
senderId,
senderName: senderName || senderId,
chatId: chatId || `dm-${senderId}`,
text,
}),
);
const responsePromise = new Promise<string>((resolve, reject) => {
const timer = setTimeout(() => {
pendingRequests.delete(messageId);
reject(new Error('Timeout waiting for agent response'));
}, responseTimeoutMs);
pendingRequests.set(messageId, { resolve, reject, timer });
});
responsePromise
.then((responseText) => {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ messageId, text: responseText }));
})
.catch((err: Error) => {
res.writeHead(504, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: err.message }));
});
} catch {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid JSON body' }));
}
});
return;
}
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Not found' }));
});
// Start both servers and return the handle
return new Promise<MockServerHandle>((resolve, reject) => {
const wsAddress = wss.address();
if (!wsAddress || typeof wsAddress === 'string') {
reject(new Error('WebSocket server failed to bind'));
return;
}
const wsPort = wsAddress.port;
httpServer.listen(options?.httpPort ?? 0, () => {
const httpAddress = httpServer.address();
if (!httpAddress || typeof httpAddress === 'string') {
reject(new Error('HTTP server failed to bind'));
return;
}
const httpPort = httpAddress.port;
const handle: MockServerHandle = {
httpPort,
wsPort,
wsUrl: `ws://localhost:${wsPort}`,
async sendMessage(text, opts) {
const senderId = opts?.senderId || 'test-user';
const senderName = opts?.senderName || 'Test User';
const chatId = opts?.chatId || `dm-${senderId}`;
if (!pluginWs || pluginWs.readyState !== WebSocket.OPEN) {
throw new Error('Plugin channel not connected');
}
const messageId = crypto.randomUUID();
pluginWs.send(
JSON.stringify({
type: 'inbound',
messageId,
senderId,
senderName,
chatId,
text,
}),
);
return new Promise<string>((resolve, reject) => {
const timer = setTimeout(() => {
pendingRequests.delete(messageId);
reject(new Error('Timeout waiting for agent response'));
}, responseTimeoutMs);
pendingRequests.set(messageId, { resolve, reject, timer });
});
},
async waitForConnection(timeoutMs = 10_000) {
if (pluginWs && pluginWs.readyState === WebSocket.OPEN) return;
return new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => {
reject(new Error('Timeout waiting for channel connection'));
}, timeoutMs);
connectionResolver = () => {
clearTimeout(timer);
resolve();
};
});
},
async close() {
for (const [, pending] of pendingRequests) {
clearTimeout(pending.timer);
pending.reject(new Error('Server shutting down'));
}
pendingRequests.clear();
await new Promise<void>((r) => {
wss.close(() => r());
});
await new Promise<void>((r) => {
httpServer.close(() => r());
});
},
};
resolve(handle);
});
});
}

View file

@ -0,0 +1,23 @@
/**
* Shared protocol types for mock channel WebSocket communication.
*/
/** Server → Plugin Channel (WebSocket) */
export interface InboundMessage {
type: 'inbound';
messageId: string;
senderId: string;
senderName: string;
chatId: string;
text: string;
}
/** Plugin Channel → Server (WebSocket) */
export interface OutboundMessage {
type: 'outbound';
messageId: string;
chatId: string;
text: string;
}
export type WsMessage = InboundMessage | OutboundMessage;