diff --git a/integration-tests/channel-plugin.test.ts b/integration-tests/channel-plugin.test.ts index 9712581ea..605df871c 100644 --- a/integration-tests/channel-plugin.test.ts +++ b/integration-tests/channel-plugin.test.ts @@ -5,26 +5,24 @@ */ /** - * Channel Plugin Integration Test — "Loopback Channel" + * Channel Plugin Integration Test — Real E2E with WebSocket * - * Creative approach: instead of the heavy 3-process architecture - * (mock server + channel service + mock client), we use an in-process - * "loopback channel" that acts as both sender and receiver. + * Tests the actual MockPluginChannel (from @qwen-code/channel-mock) connected + * to an in-process mock server via WebSocket. The full message flow is: * - * The LoopbackChannel extends ChannelBase and plugs directly into AcpBridge. - * When a message is sent, it flows through the REAL pipeline: + * server.sendMessage("What is 2+2?") + * → WebSocket push to MockPluginChannel + * → ChannelBase.handleInbound(envelope) + * → SenderGate (open policy) + * → SessionRouter (creates/reuses session) + * → AcpBridge.prompt(sessionId, text) + * → qwen-code --acp (REAL model request) + * → MockPluginChannel.sendMessage(chatId, response) + * → WebSocket response to mock server + * → server resolves promise with agent text * - * test.send("What is 2+2?") - * → LoopbackChannel.handleInbound(envelope) - * → SenderGate (open policy) - * → SessionRouter (creates/reuses session) - * → AcpBridge.prompt(sessionId, text) - * → qwen-code --acp (REAL model request) - * → LoopbackChannel.sendMessage(chatId, response) - * → test receives response via promise - * - * No WebSocket, no HTTP, no separate processes. Just the real - * channel pipeline with a real agent backend. + * This exercises the real WebSocket protocol, real message serialization, + * real ChannelPlugin interface, and real model backend — all in one test process. */ import { describe, it, expect, afterAll } from 'vitest'; @@ -32,172 +30,71 @@ import { join, dirname } from 'node:path'; import { fileURLToPath } from 'node:url'; import { mkdirSync } from 'node:fs'; -// Import channel-base directly from compiled dist +// Import from the monorepo channel packages import { AcpBridge, - ChannelBase, SessionRouter, } from '../packages/channels/base/dist/index.js'; -import type { - ChannelConfig, - Envelope, - ChannelBaseOptions, -} from '../packages/channels/base/dist/index.js'; -import type { AcpBridge as AcpBridgeType } from '../packages/channels/base/dist/index.js'; +import type { ChannelConfig } from '../packages/channels/base/dist/index.js'; +import { + MockPluginChannel, + createMockServer, +} from '../packages/channels/mock/src/index.js'; +import type { MockServerHandle } from '../packages/channels/mock/src/index.js'; const __dirname = dirname(fileURLToPath(import.meta.url)); const CLI_PATH = join(__dirname, '..', 'dist', 'cli.js'); const RESPONSE_TIMEOUT_MS = 120_000; -// --------------------------------------------------------------------------- -// Loopback Channel — the creative core -// --------------------------------------------------------------------------- - -/** - * A channel that lives entirely in the test process. - * - * - connect() is a no-op (nothing external to connect to) - * - sendMessage() resolves a pending promise so the test gets the response - * - send() pushes a message through handleInbound and returns the agent reply - * - * Think of it as a "promise pipe" that wraps the full ChannelBase pipeline. - */ -class LoopbackChannel extends ChannelBase { - /** Map of chatId → resolver for the next sendMessage call */ - private responseResolvers = new Map void>(); - private responseChunks = new Map(); - - constructor( - name: string, - config: ChannelConfig, - bridge: AcpBridgeType, - options?: ChannelBaseOptions, - ) { - super(name, config, bridge, options); - } - - async connect(): Promise { - // No external connection needed — we ARE the platform - } - - async sendMessage(chatId: string, text: string): Promise { - const resolver = this.responseResolvers.get(chatId); - if (resolver) { - resolver(text); - this.responseResolvers.delete(chatId); - } else { - // Buffer for cases where response arrives before await - const chunks = this.responseChunks.get(chatId) || []; - chunks.push(text); - this.responseChunks.set(chatId, chunks); - } - } - - disconnect(): void { - // Clean up any pending resolvers - for (const [, resolver] of this.responseResolvers) { - resolver('[channel disconnected]'); - } - this.responseResolvers.clear(); - } - - /** - * Send a message through the full channel pipeline and wait for the response. - * This is the test-facing API. - */ - async send( - text: string, - options?: { - senderId?: string; - senderName?: string; - chatId?: string; - timeoutMs?: number; - }, - ): Promise { - const chatId = options?.chatId || 'loopback-dm-1'; - const senderId = options?.senderId || 'test-user'; - const senderName = options?.senderName || 'Test User'; - const timeoutMs = options?.timeoutMs || RESPONSE_TIMEOUT_MS; - - // Create promise to capture the response from sendMessage - const responsePromise = new Promise((resolve, reject) => { - const timer = setTimeout(() => { - this.responseResolvers.delete(chatId); - reject(new Error(`Loopback timeout: no response after ${timeoutMs}ms`)); - }, timeoutMs); - - this.responseResolvers.set(chatId, (text: string) => { - clearTimeout(timer); - resolve(text); - }); - }); - - // Build envelope and push through the pipeline - const envelope: Envelope = { - channelName: this.name, - senderId, - senderName, - chatId, - text, - isGroup: false, - isMentioned: false, - isReplyToBot: false, - }; - - // handleInbound → gates → session → bridge.prompt → sendMessage - await this.handleInbound(envelope); - - return responsePromise; - } -} - -// --------------------------------------------------------------------------- -// Test helpers -// --------------------------------------------------------------------------- - -function createTestConfig(cwd: string): ChannelConfig { - return { - type: 'loopback', - token: '', - senderPolicy: 'open', - allowedUsers: [], - sessionScope: 'user', - cwd, - groupPolicy: 'disabled', - groups: {}, - } as ChannelConfig; -} - // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- -describe('Channel Plugin (Loopback)', () => { +describe('Channel Plugin (Mock WebSocket E2E)', () => { let bridge: InstanceType; - let channel: LoopbackChannel; + let channel: MockPluginChannel; + let server: MockServerHandle; let testDir: string; - // Set up once for all tests — reuse the bridge (expensive to start) const setup = async () => { const baseDir = process.env['INTEGRATION_TEST_FILE_DIR'] || join(__dirname, '..', '.integration-tests', `channel-${Date.now()}`); - testDir = join(baseDir, 'channel-plugin'); + testDir = join(baseDir, 'channel-mock-e2e'); mkdirSync(testDir, { recursive: true }); + // 1. Start mock server on random ports (no port conflicts) + server = await createMockServer({ httpPort: 0, wsPort: 0 }); + + // 2. Start AcpBridge (spawns real qwen-code --acp) bridge = new AcpBridge({ cliEntryPath: CLI_PATH, cwd: testDir, }); await bridge.start(); + // 3. Create and connect MockPluginChannel via WebSocket + const config: ChannelConfig & Record = { + type: 'mock-plugin', + token: '', + senderPolicy: 'open', + allowedUsers: [], + sessionScope: 'user', + cwd: testDir, + groupPolicy: 'disabled', + groups: {}, + serverWsUrl: server.wsUrl, + }; + const router = new SessionRouter(bridge, testDir, 'user'); - const config = createTestConfig(testDir); - channel = new LoopbackChannel('test-loopback', config, bridge, { router }); + channel = new MockPluginChannel('test-mock', config, bridge, { router }); await channel.connect(); + + // 4. Wait for the channel's WebSocket to be registered by the server + await server.waitForConnection(5_000); }; - afterAll(() => { + afterAll(async () => { try { channel?.disconnect(); } catch { @@ -208,67 +105,69 @@ describe('Channel Plugin (Loopback)', () => { } catch { // ignore } + try { + await server?.close(); + } catch { + // ignore + } }); it( - 'should receive a real agent response through the full channel pipeline', + 'should send a message through WebSocket and receive a real agent response', async () => { await setup(); - const response = await channel.send( + // This goes: server → WS → MockPluginChannel → ChannelBase → AcpBridge → agent → back + const response = await server.sendMessage( 'What is 2+2? Reply with ONLY the number, nothing else.', ); - // The real model should return something containing "4" expect(response).toBeTruthy(); expect(response).toContain('4'); - console.log(`[channel-plugin] Single turn response: "${response}"`); + console.log(`[mock-e2e] Single turn response: "${response}"`); }, RESPONSE_TIMEOUT_MS, ); it( - 'should maintain session state across multiple messages', + 'should maintain session state across multiple WebSocket messages', async () => { - // Use a dedicated chatId for this test's session - const chatId = 'session-test-dm'; + const chatId = 'ws-session-test'; + const opts = { chatId }; - const r1 = await channel.send( + const r1 = await server.sendMessage( 'My secret word is "pineapple". Remember it.', - { - chatId, - }, + opts, ); expect(r1).toBeTruthy(); - console.log(`[channel-plugin] Memory set response: "${r1}"`); + console.log(`[mock-e2e] Memory set response: "${r1}"`); - const r2 = await channel.send( + const r2 = await server.sendMessage( 'What is my secret word? Reply with ONLY the word, nothing else.', - { chatId }, + opts, ); expect(r2).toBeTruthy(); expect(r2.toLowerCase()).toContain('pineapple'); - console.log(`[channel-plugin] Memory recall response: "${r2}"`); + console.log(`[mock-e2e] Memory recall response: "${r2}"`); }, RESPONSE_TIMEOUT_MS * 2, ); it( - 'should handle a different sender through the same pipeline', + 'should handle a different sender through the same WebSocket pipeline', async () => { - // Use a different sender to verify per-sender session routing works - const response = await channel.send( + const response = await server.sendMessage( 'What is 10 * 5? Reply with ONLY the number, nothing else.', { - senderId: 'different-user', + senderId: 'another-user', senderName: 'Another User', - chatId: 'different-user-dm', + chatId: 'dm-another-user', }, ); expect(response).toBeTruthy(); expect(response).toContain('50'); - console.log(`[channel-plugin] Different sender response: "${response}"`); + console.log(`[mock-e2e] Different sender response: "${response}"`); }, RESPONSE_TIMEOUT_MS, ); diff --git a/package-lock.json b/package-lock.json index caf9f2156..aaba91193 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,7 +12,8 @@ "packages/channels/base", "packages/channels/telegram", "packages/channels/weixin", - "packages/channels/dingtalk" + "packages/channels/dingtalk", + "packages/channels/mock" ], "dependencies": { "@testing-library/dom": "^10.4.1", @@ -3002,6 +3003,10 @@ "resolved": "packages/channels/dingtalk", "link": true }, + "node_modules/@qwen-code/channel-mock": { + "resolved": "packages/channels/mock", + "link": true + }, "node_modules/@qwen-code/channel-telegram": { "resolved": "packages/channels/telegram", "link": true @@ -18985,6 +18990,17 @@ "typescript": "^5.0.0" } }, + "packages/channels/mock": { + "name": "@qwen-code/channel-mock", + "version": "0.1.0", + "dependencies": { + "@qwen-code/channel-base": "file:../base", + "ws": "^8.18.0" + }, + "devDependencies": { + "@types/ws": "^8.5.0" + } + }, "packages/channels/telegram": { "name": "@qwen-code/channel-telegram", "version": "0.1.0", diff --git a/package.json b/package.json index cf339100a..c565eaf91 100644 --- a/package.json +++ b/package.json @@ -10,7 +10,8 @@ "packages/channels/base", "packages/channels/telegram", "packages/channels/weixin", - "packages/channels/dingtalk" + "packages/channels/dingtalk", + "packages/channels/mock" ], "repository": { "type": "git", diff --git a/packages/channels/mock/package.json b/packages/channels/mock/package.json new file mode 100644 index 000000000..18e2d6738 --- /dev/null +++ b/packages/channels/mock/package.json @@ -0,0 +1,17 @@ +{ + "name": "@qwen-code/channel-mock", + "version": "0.1.0", + "type": "module", + "main": "src/index.ts", + "types": "src/index.ts", + "exports": { + ".": "./src/index.ts" + }, + "dependencies": { + "@qwen-code/channel-base": "file:../base", + "ws": "^8.18.0" + }, + "devDependencies": { + "@types/ws": "^8.5.0" + } +} diff --git a/packages/channels/mock/src/MockPluginChannel.ts b/packages/channels/mock/src/MockPluginChannel.ts new file mode 100644 index 000000000..f75c518fb --- /dev/null +++ b/packages/channels/mock/src/MockPluginChannel.ts @@ -0,0 +1,116 @@ +import { ChannelBase } from '@qwen-code/channel-base'; +import type { + ChannelConfig, + ChannelBaseOptions, + Envelope, + AcpBridge, +} from '@qwen-code/channel-base'; +import WebSocket from 'ws'; +import type { InboundMessage, OutboundMessage } from './protocol.js'; + +export interface MockPluginConfig extends ChannelConfig { + serverWsUrl: string; +} + +export class MockPluginChannel extends ChannelBase { + private ws: WebSocket | null = null; + private serverWsUrl: string; + private pendingMessageId: string | undefined; + + constructor( + name: string, + config: MockPluginConfig & Record, + bridge: AcpBridge, + options?: ChannelBaseOptions, + ) { + super(name, config, bridge, options); + this.serverWsUrl = config.serverWsUrl; + } + + async connect(): Promise { + return new Promise((resolve, reject) => { + this.ws = new WebSocket(this.serverWsUrl); + + this.ws.on('open', () => { + resolve(); + }); + + this.ws.on('message', (data: Buffer) => { + try { + const msg = JSON.parse(data.toString()) as InboundMessage; + if (msg.type === 'inbound') { + this.onInboundMessage(msg); + } + } catch { + // ignore parse errors + } + }); + + this.ws.on('close', () => { + this.ws = null; + }); + + this.ws.on('error', (err: Error) => { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + reject(err); + } + }); + }); + } + + private onInboundMessage(msg: InboundMessage): void { + const envelope: Envelope = { + channelName: this.name, + senderId: msg.senderId, + senderName: msg.senderName, + chatId: msg.chatId, + text: msg.text, + isGroup: false, + isMentioned: false, + isReplyToBot: false, + }; + + // Store messageId for response correlation + (envelope as unknown as Record)['_messageId'] = + msg.messageId; + + this.handleInbound(envelope).catch(() => { + // errors handled internally by ChannelBase + }); + } + + async sendMessage(chatId: string, text: string): Promise { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + return; + } + + const messageId = this.pendingMessageId || 'unknown'; + + const outbound: OutboundMessage = { + type: 'outbound', + messageId, + chatId, + text, + }; + + this.ws.send(JSON.stringify(outbound)); + } + + disconnect(): void { + if (this.ws) { + this.ws.close(); + this.ws = null; + } + } + + override async handleInbound(envelope: Envelope): Promise { + this.pendingMessageId = (envelope as unknown as Record)[ + '_messageId' + ] as string | undefined; + try { + await super.handleInbound(envelope); + } finally { + this.pendingMessageId = undefined; + } + } +} diff --git a/packages/channels/mock/src/index.ts b/packages/channels/mock/src/index.ts new file mode 100644 index 000000000..df9ddb184 --- /dev/null +++ b/packages/channels/mock/src/index.ts @@ -0,0 +1,16 @@ +import type { ChannelPlugin } from '@qwen-code/channel-base'; +import { MockPluginChannel } from './MockPluginChannel.js'; + +export { MockPluginChannel } from './MockPluginChannel.js'; +export type { MockPluginConfig } from './MockPluginChannel.js'; +export { createMockServer } from './mock-server.js'; +export type { MockServerHandle, MockServerOptions } from './mock-server.js'; +export type { InboundMessage, OutboundMessage, WsMessage } from './protocol.js'; + +export const plugin: ChannelPlugin = { + channelType: 'mock-plugin', + displayName: 'Mock Plugin', + requiredConfigFields: ['serverWsUrl'], + createChannel: (name, config, bridge, options) => + new MockPluginChannel(name, config as MockPluginConfig, bridge, options), +}; diff --git a/packages/channels/mock/src/mock-server.ts b/packages/channels/mock/src/mock-server.ts new file mode 100644 index 000000000..6b40290ed --- /dev/null +++ b/packages/channels/mock/src/mock-server.ts @@ -0,0 +1,254 @@ +/** + * Mock Platform Server — programmatic API for integration tests. + * + * Provides a createMockServer() function that starts HTTP + WebSocket servers + * and returns a handle for sending messages and cleaning up. + * + * Architecture: + * Test code calls server.sendMessage("Hello") + * → HTTP handler creates messageId, pushes via WebSocket to connected channel + * → Channel processes → responds via WebSocket + * → Server resolves the pending promise with agent response text + */ + +import http from 'node:http'; +import crypto from 'node:crypto'; +import { WebSocketServer, WebSocket } from 'ws'; + +export interface MockServerHandle { + /** Port the HTTP server is listening on */ + httpPort: number; + /** Port the WebSocket server is listening on */ + wsPort: number; + /** WebSocket URL for channels to connect to */ + wsUrl: string; + /** Send a message through the full pipeline and wait for the agent response */ + sendMessage( + text: string, + options?: { senderId?: string; senderName?: string; chatId?: string }, + ): Promise; + /** Wait for a plugin channel to connect */ + waitForConnection(timeoutMs?: number): Promise; + /** Shut down both servers and reject pending requests */ + close(): Promise; +} + +export interface MockServerOptions { + /** HTTP port (0 = random available port) */ + httpPort?: number; + /** WebSocket port (0 = random available port) */ + wsPort?: number; + /** Timeout for agent responses in ms (default: 120000) */ + responseTimeoutMs?: number; +} + +export function createMockServer( + options?: MockServerOptions, +): Promise { + const responseTimeoutMs = options?.responseTimeoutMs ?? 120_000; + + let pluginWs: WebSocket | null = null; + let connectionResolver: (() => void) | null = null; + + const pendingRequests = new Map< + string, + { + resolve: (text: string) => void; + reject: (err: Error) => void; + timer: ReturnType; + } + >(); + + // --- WebSocket server --- + const wss = new WebSocketServer({ port: options?.wsPort ?? 0 }); + + wss.on('connection', (ws) => { + pluginWs = ws; + + if (connectionResolver) { + connectionResolver(); + connectionResolver = null; + } + + ws.on('message', (data) => { + try { + const msg = JSON.parse(data.toString()); + if (msg.type === 'outbound' && msg.messageId) { + const pending = pendingRequests.get(msg.messageId); + if (pending) { + clearTimeout(pending.timer); + pendingRequests.delete(msg.messageId); + pending.resolve(msg.text); + } + } + } catch { + // ignore + } + }); + + ws.on('close', () => { + if (pluginWs === ws) pluginWs = null; + }); + }); + + // --- HTTP server --- + const httpServer = http.createServer((req, res) => { + if (req.method === 'GET' && req.url === '/health') { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end( + JSON.stringify({ + status: 'ok', + pluginConnected: + pluginWs !== null && pluginWs.readyState === WebSocket.OPEN, + }), + ); + return; + } + + if (req.method === 'POST' && req.url === '/message') { + let body = ''; + req.on('data', (chunk: Buffer) => { + body += chunk.toString(); + }); + req.on('end', () => { + try { + const { senderId, senderName, chatId, text } = JSON.parse(body); + if (!senderId || !text) { + res.writeHead(400, { 'Content-Type': 'application/json' }); + res.end( + JSON.stringify({ error: 'senderId and text are required' }), + ); + return; + } + if (!pluginWs || pluginWs.readyState !== WebSocket.OPEN) { + res.writeHead(503, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Plugin channel not connected' })); + return; + } + + const messageId = crypto.randomUUID(); + pluginWs.send( + JSON.stringify({ + type: 'inbound', + messageId, + senderId, + senderName: senderName || senderId, + chatId: chatId || `dm-${senderId}`, + text, + }), + ); + + const responsePromise = new Promise((resolve, reject) => { + const timer = setTimeout(() => { + pendingRequests.delete(messageId); + reject(new Error('Timeout waiting for agent response')); + }, responseTimeoutMs); + pendingRequests.set(messageId, { resolve, reject, timer }); + }); + + responsePromise + .then((responseText) => { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ messageId, text: responseText })); + }) + .catch((err: Error) => { + res.writeHead(504, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: err.message })); + }); + } catch { + res.writeHead(400, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Invalid JSON body' })); + } + }); + return; + } + + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Not found' })); + }); + + // Start both servers and return the handle + return new Promise((resolve, reject) => { + const wsAddress = wss.address(); + if (!wsAddress || typeof wsAddress === 'string') { + reject(new Error('WebSocket server failed to bind')); + return; + } + const wsPort = wsAddress.port; + + httpServer.listen(options?.httpPort ?? 0, () => { + const httpAddress = httpServer.address(); + if (!httpAddress || typeof httpAddress === 'string') { + reject(new Error('HTTP server failed to bind')); + return; + } + const httpPort = httpAddress.port; + + const handle: MockServerHandle = { + httpPort, + wsPort, + wsUrl: `ws://localhost:${wsPort}`, + + async sendMessage(text, opts) { + const senderId = opts?.senderId || 'test-user'; + const senderName = opts?.senderName || 'Test User'; + const chatId = opts?.chatId || `dm-${senderId}`; + + if (!pluginWs || pluginWs.readyState !== WebSocket.OPEN) { + throw new Error('Plugin channel not connected'); + } + + const messageId = crypto.randomUUID(); + pluginWs.send( + JSON.stringify({ + type: 'inbound', + messageId, + senderId, + senderName, + chatId, + text, + }), + ); + + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + pendingRequests.delete(messageId); + reject(new Error('Timeout waiting for agent response')); + }, responseTimeoutMs); + pendingRequests.set(messageId, { resolve, reject, timer }); + }); + }, + + async waitForConnection(timeoutMs = 10_000) { + if (pluginWs && pluginWs.readyState === WebSocket.OPEN) return; + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + reject(new Error('Timeout waiting for channel connection')); + }, timeoutMs); + connectionResolver = () => { + clearTimeout(timer); + resolve(); + }; + }); + }, + + async close() { + for (const [, pending] of pendingRequests) { + clearTimeout(pending.timer); + pending.reject(new Error('Server shutting down')); + } + pendingRequests.clear(); + + await new Promise((r) => { + wss.close(() => r()); + }); + await new Promise((r) => { + httpServer.close(() => r()); + }); + }, + }; + + resolve(handle); + }); + }); +} diff --git a/packages/channels/mock/src/protocol.ts b/packages/channels/mock/src/protocol.ts new file mode 100644 index 000000000..49c2e0fba --- /dev/null +++ b/packages/channels/mock/src/protocol.ts @@ -0,0 +1,23 @@ +/** + * Shared protocol types for mock channel WebSocket communication. + */ + +/** Server → Plugin Channel (WebSocket) */ +export interface InboundMessage { + type: 'inbound'; + messageId: string; + senderId: string; + senderName: string; + chatId: string; + text: string; +} + +/** Plugin Channel → Server (WebSocket) */ +export interface OutboundMessage { + type: 'outbound'; + messageId: string; + chatId: string; + text: string; +} + +export type WsMessage = InboundMessage | OutboundMessage; diff --git a/packages/channels/mock/tsconfig.json b/packages/channels/mock/tsconfig.json new file mode 100644 index 000000000..8daf59408 --- /dev/null +++ b/packages/channels/mock/tsconfig.json @@ -0,0 +1,10 @@ +{ + "extends": "../../../tsconfig.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src/**/*.ts"], + "exclude": ["node_modules", "dist"], + "references": [{ "path": "../base" }] +}