feat(channels): add Telegram channel integration with ACP bridge

Implements the channels infrastructure for connecting external messaging
platforms to Qwen Code via ACP. Phase 1 supports plain text round-trip:
Telegram user sends message -> AcpBridge -> qwen-code --acp -> response
back to Telegram.

New packages:
- @qwen-code/channel-base: AcpBridge, SessionRouter, SenderGate, ChannelBase
- @qwen-code/channel-telegram: TelegramAdapter using telegraf

CLI: `qwen channel start <name>` reads from settings.json channels config,
spawns ACP agent, connects to Telegram via polling.
This commit is contained in:
tanzhenxin 2026-03-24 04:49:01 +00:00
parent aebe889b31
commit 3eedc43238
18 changed files with 736 additions and 4 deletions

View file

@ -0,0 +1,17 @@
{
"name": "@qwen-code/channel-base",
"version": "0.1.0",
"description": "Base channel infrastructure for Qwen Code",
"type": "module",
"main": "src/index.ts",
"types": "src/index.ts",
"exports": {
".": "./src/index.ts"
},
"dependencies": {
"@agentclientprotocol/sdk": "^0.14.1"
},
"devDependencies": {
"typescript": "^5.0.0"
}
}

View file

@ -0,0 +1,188 @@
import { spawn } from 'node:child_process';
import type { ChildProcess } from 'node:child_process';
import { Readable, Writable } from 'node:stream';
import { EventEmitter } from 'node:events';
import {
ClientSideConnection,
ndJsonStream,
PROTOCOL_VERSION,
} from '@agentclientprotocol/sdk';
import type {
Client,
SessionNotification,
RequestPermissionRequest,
RequestPermissionResponse,
} from '@agentclientprotocol/sdk';
export interface AcpBridgeOptions {
cliEntryPath: string;
cwd: string;
}
export class AcpBridge extends EventEmitter {
private child: ChildProcess | null = null;
private connection: ClientSideConnection | null = null;
private options: AcpBridgeOptions;
constructor(options: AcpBridgeOptions) {
super();
this.options = options;
}
async start(): Promise<void> {
const { cliEntryPath, cwd } = this.options;
this.child = spawn(process.execPath, [cliEntryPath, '--acp'], {
cwd,
stdio: ['pipe', 'pipe', 'pipe'],
env: { ...process.env },
shell: false,
});
this.child.stderr?.on('data', (data: Buffer) => {
const msg = data.toString().trim();
if (msg) {
console.error('[AcpBridge]', msg);
}
});
this.child.on('exit', (code, signal) => {
console.error(
`[AcpBridge] Process exited (code=${code}, signal=${signal})`,
);
this.connection = null;
this.child = null;
this.emit('disconnected', code, signal);
});
// Give the process a moment to start
await new Promise((resolve) => setTimeout(resolve, 1000));
if (!this.child || this.child.killed) {
throw new Error('ACP process failed to start');
}
const stdout = Readable.toWeb(
this.child.stdout!,
) as ReadableStream<Uint8Array>;
const stdin = Writable.toWeb(this.child.stdin!) as WritableStream;
const stream = ndJsonStream(stdin, stdout);
this.connection = new ClientSideConnection(
(): Client => ({
sessionUpdate: (params: SessionNotification): Promise<void> => {
const update = (params as unknown as Record<string, unknown>)
.update as Record<string, unknown> | undefined;
console.log(
'[AcpBridge] sessionUpdate:',
update?.sessionUpdate,
update?.content
? JSON.stringify(update.content).substring(0, 200)
: '',
);
this.emit('sessionUpdate', params);
return Promise.resolve();
},
requestPermission: async (
params: RequestPermissionRequest,
): Promise<RequestPermissionResponse> => {
// Phase 1: auto-approve everything so plain text works
const options = Array.isArray(params.options) ? params.options : [];
const optionId =
options.find((o) => o.optionId === 'proceed_once')?.optionId ||
options[0]?.optionId ||
'proceed_once';
console.log(
'[AcpBridge] Permission request auto-approved:',
optionId,
params.toolCall?.name,
);
return { outcome: { outcome: 'selected', optionId } };
},
extNotification: async (): Promise<void> => {},
}),
stream,
);
await this.connection.initialize({
protocolVersion: PROTOCOL_VERSION,
clientCapabilities: {},
});
console.log('[AcpBridge] Connected and initialized');
}
async newSession(cwd: string): Promise<string> {
const conn = this.ensureConnection();
const response = await conn.newSession({ cwd, mcpServers: [] });
const sessionId = response.sessionId;
console.log('[AcpBridge] New session:', sessionId);
return sessionId;
}
async prompt(sessionId: string, text: string): Promise<string> {
const conn = this.ensureConnection();
// Collect text from sessionUpdate events during this prompt
// SessionNotification shape: { sessionId, update: { sessionUpdate, content: { type, text } } }
const chunks: string[] = [];
const onUpdate = (params: SessionNotification) => {
if (params.sessionId !== sessionId) return;
const update = (params as unknown as Record<string, unknown>).update as
| Record<string, unknown>
| undefined;
if (!update) return;
if (update.sessionUpdate !== 'agent_message_chunk') return;
const content = update.content as
| { type?: string; text?: string }
| undefined;
if (content?.type === 'text' && content.text) {
chunks.push(content.text);
}
};
this.on('sessionUpdate', onUpdate);
try {
console.log('[AcpBridge] Sending prompt...');
const result = await conn.prompt({
sessionId,
prompt: [{ type: 'text', text }],
});
console.log(
'[AcpBridge] Prompt resolved, stopReason:',
result?.stopReason,
);
} finally {
this.off('sessionUpdate', onUpdate);
}
const response = chunks.join('');
console.log(
`[AcpBridge] Collected ${chunks.length} chunks, ${response.length} chars`,
);
return response;
}
stop(): void {
if (this.child) {
this.child.kill();
this.child = null;
}
this.connection = null;
}
get isConnected(): boolean {
return (
this.child !== null && !this.child.killed && this.child.exitCode === null
);
}
private ensureConnection(): ClientSideConnection {
if (!this.connection || !this.isConnected) {
throw new Error('Not connected to ACP agent');
}
return this.connection;
}
}

View file

@ -0,0 +1,57 @@
import type { ChannelConfig, Envelope } from './types.js';
import { SenderGate } from './SenderGate.js';
import { SessionRouter } from './SessionRouter.js';
import type { AcpBridge } from './AcpBridge.js';
export abstract class ChannelBase {
protected config: ChannelConfig;
protected bridge: AcpBridge;
protected gate: SenderGate;
protected router: SessionRouter;
protected name: string;
constructor(name: string, config: ChannelConfig, bridge: AcpBridge) {
this.name = name;
this.config = config;
this.bridge = bridge;
this.gate = new SenderGate(config.senderPolicy, config.allowedUsers);
this.router = new SessionRouter(bridge, config.cwd);
}
abstract connect(): Promise<void>;
abstract sendMessage(chatId: string, text: string): Promise<void>;
abstract disconnect(): void;
async handleInbound(envelope: Envelope): Promise<void> {
if (!this.gate.check(envelope.senderId)) {
console.log(
`[Channel:${this.name}] Sender ${envelope.senderId} denied by gate`,
);
return;
}
const sessionId = await this.router.resolve(
this.name,
envelope.senderId,
envelope.chatId,
envelope.threadId,
);
console.log(
`[Channel:${this.name}] Prompting session ${sessionId}: "${envelope.text.substring(0, 80)}"`,
);
console.log(`[Channel:${this.name}] Waiting for prompt response...`);
const response = await this.bridge.prompt(sessionId, envelope.text);
console.log(
`[Channel:${this.name}] Got response (${response.length} chars): "${response.substring(0, 100)}"`,
);
if (response) {
await this.sendMessage(envelope.chatId, response);
console.log(
`[Channel:${this.name}] Message sent to chat ${envelope.chatId}`,
);
}
}
}

View file

@ -0,0 +1,23 @@
import type { SenderPolicy } from './types.js';
export class SenderGate {
private policy: SenderPolicy;
private allowedUsers: Set<string>;
constructor(policy: SenderPolicy, allowedUsers: string[] = []) {
this.policy = policy;
this.allowedUsers = new Set(allowedUsers);
}
check(senderId: string): boolean {
switch (this.policy) {
case 'open':
return true;
case 'allowlist':
return this.allowedUsers.has(senderId);
case 'pairing':
// Pairing will be implemented later; for now, treat as allowlist
return this.allowedUsers.has(senderId);
}
}
}

View file

@ -0,0 +1,37 @@
import type { SessionTarget } from './types.js';
import type { AcpBridge } from './AcpBridge.js';
export class SessionRouter {
private toSession: Map<string, string> = new Map(); // routing key → session ID
private toTarget: Map<string, SessionTarget> = new Map(); // session ID → target
private bridge: AcpBridge;
private cwd: string;
constructor(bridge: AcpBridge, cwd: string) {
this.bridge = bridge;
this.cwd = cwd;
}
async resolve(
channelName: string,
senderId: string,
chatId: string,
threadId?: string,
): Promise<string> {
const key = `${channelName}:${senderId}`;
const existing = this.toSession.get(key);
if (existing) {
return existing;
}
const sessionId = await this.bridge.newSession(this.cwd);
this.toSession.set(key, sessionId);
this.toTarget.set(sessionId, { channelName, senderId, chatId, threadId });
return sessionId;
}
getTarget(sessionId: string): SessionTarget | undefined {
return this.toTarget.get(sessionId);
}
}

View file

@ -0,0 +1,13 @@
export { AcpBridge } from './AcpBridge.js';
export type { AcpBridgeOptions } from './AcpBridge.js';
export { ChannelBase } from './ChannelBase.js';
export { SenderGate } from './SenderGate.js';
export { SessionRouter } from './SessionRouter.js';
export type {
ChannelConfig,
ChannelType,
Envelope,
SenderPolicy,
SessionScope,
SessionTarget,
} from './types.js';

View file

@ -0,0 +1,30 @@
export type SenderPolicy = 'allowlist' | 'pairing' | 'open';
export type SessionScope = 'user' | 'thread' | 'single';
export type ChannelType = 'telegram' | 'discord' | 'webhook';
export interface ChannelConfig {
type: ChannelType;
token: string;
senderPolicy: SenderPolicy;
allowedUsers: string[];
sessionScope: SessionScope;
cwd: string;
approvalMode?: string;
instructions?: string;
}
export interface Envelope {
channelName: string;
senderId: string;
senderName: string;
chatId: string;
text: string;
threadId?: string;
}
export interface SessionTarget {
channelName: string;
senderId: string;
chatId: string;
threadId?: string;
}