From 3eedc43238b49a677c18779bfa4e00763847bb52 Mon Sep 17 00:00:00 2001 From: tanzhenxin Date: Tue, 24 Mar 2026 04:49:01 +0000 Subject: [PATCH] feat(channels): add Telegram channel integration with ACP bridge Implements the channels infrastructure for connecting external messaging platforms to Qwen Code via ACP. Phase 1 supports plain text round-trip: Telegram user sends message -> AcpBridge -> qwen-code --acp -> response back to Telegram. New packages: - @qwen-code/channel-base: AcpBridge, SessionRouter, SenderGate, ChannelBase - @qwen-code/channel-telegram: TelegramAdapter using telegraf CLI: `qwen channel start ` reads from settings.json channels config, spawns ACP agent, connects to Telegram via polling. --- esbuild.config.js | 4 + package-lock.json | 121 ++++++++++- package.json | 4 +- packages/channels/base/package.json | 17 ++ packages/channels/base/src/AcpBridge.ts | 188 ++++++++++++++++++ packages/channels/base/src/ChannelBase.ts | 57 ++++++ packages/channels/base/src/SenderGate.ts | 23 +++ packages/channels/base/src/SessionRouter.ts | 37 ++++ packages/channels/base/src/index.ts | 13 ++ packages/channels/base/src/types.ts | 30 +++ packages/channels/telegram/package.json | 18 ++ .../channels/telegram/src/TelegramAdapter.ts | 85 ++++++++ packages/channels/telegram/src/index.ts | 1 + packages/cli/package.json | 2 + packages/cli/src/commands/channel.ts | 13 ++ packages/cli/src/commands/channel/start.ts | 107 ++++++++++ packages/cli/src/config/config.ts | 8 +- packages/cli/src/config/settingsSchema.ts | 12 ++ 18 files changed, 736 insertions(+), 4 deletions(-) create mode 100644 packages/channels/base/package.json create mode 100644 packages/channels/base/src/AcpBridge.ts create mode 100644 packages/channels/base/src/ChannelBase.ts create mode 100644 packages/channels/base/src/SenderGate.ts create mode 100644 packages/channels/base/src/SessionRouter.ts create mode 100644 packages/channels/base/src/index.ts create mode 100644 packages/channels/base/src/types.ts create mode 100644 packages/channels/telegram/package.json create mode 100644 packages/channels/telegram/src/TelegramAdapter.ts create mode 100644 packages/channels/telegram/src/index.ts create mode 100644 packages/cli/src/commands/channel.ts create mode 100644 packages/cli/src/commands/channel/start.ts diff --git a/esbuild.config.js b/esbuild.config.js index 2b532b44e..c49eba358 100644 --- a/esbuild.config.js +++ b/esbuild.config.js @@ -62,6 +62,10 @@ esbuild __dirname, 'packages/cli/src/patches/is-in-ci.ts', ), + '@qwen-code/web-templates': path.resolve( + __dirname, + 'packages/web-templates/src/index.ts', + ), }, define: { 'process.env.CLI_VERSION': JSON.stringify(pkg.version), diff --git a/package-lock.json b/package-lock.json index 4bf43c5ee..2a0fedcbf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,7 +8,9 @@ "name": "@qwen-code/qwen-code", "version": "0.13.0", "workspaces": [ - "packages/*" + "packages/*", + "packages/channels/base", + "packages/channels/telegram" ], "dependencies": { "@testing-library/dom": "^10.4.1", @@ -2990,6 +2992,14 @@ "integrity": "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw==", "license": "BSD-3-Clause" }, + "node_modules/@qwen-code/channel-base": { + "resolved": "packages/channels/base", + "link": true + }, + "node_modules/@qwen-code/channel-telegram": { + "resolved": "packages/channels/telegram", + "link": true + }, "node_modules/@qwen-code/qwen-code": { "resolved": "packages/cli", "link": true @@ -3961,6 +3971,12 @@ "node": ">= 10.16.0 < 11 || >= 11.8.0 < 12 || >= 12.0.0" } }, + "node_modules/@telegraf/types": { + "version": "7.1.0", + "resolved": "https://registry.npmjs.org/@telegraf/types/-/types-7.1.0.tgz", + "integrity": "sha512-kGevOIbpMcIlCDeorKGpwZmdH7kHbqlk/Yj6dEpJMKEQw5lk0KVQY0OLXaCswy8GqlIVLd5625OB+rAntP9xVw==", + "license": "MIT" + }, "node_modules/@testing-library/dom": { "version": "10.4.1", "resolved": "https://registry.npmjs.org/@testing-library/dom/-/dom-10.4.1.tgz", @@ -6739,6 +6755,22 @@ "ieee754": "^1.1.13" } }, + "node_modules/buffer-alloc": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/buffer-alloc/-/buffer-alloc-1.2.0.tgz", + "integrity": "sha512-CFsHQgjtW1UChdXgbyJGtnm+O/uLQeZdtbDo8mfUgYXCHSM1wgrVxXm6bSyrUuErEb+4sYVGCzASBRot7zyrow==", + "license": "MIT", + "dependencies": { + "buffer-alloc-unsafe": "^1.1.0", + "buffer-fill": "^1.0.0" + } + }, + "node_modules/buffer-alloc-unsafe": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/buffer-alloc-unsafe/-/buffer-alloc-unsafe-1.1.0.tgz", + "integrity": "sha512-TEM2iMIEQdJ2yjPJoSIsldnleVaAk1oW3DBVUykyOLsEsFmEc9kn+SFFPz+gl54KQNxlDnAwCXosOS9Okx2xAg==", + "license": "MIT" + }, "node_modules/buffer-crc32": { "version": "0.2.13", "resolved": "https://registry.npmjs.org/buffer-crc32/-/buffer-crc32-0.2.13.tgz", @@ -6754,6 +6786,12 @@ "integrity": "sha512-zRpUiDwd/xk6ADqPMATG8vc9VPrkck7T07OIx0gnjmJAnHnTVXNQG3vfvWNuiZIkwu9KrKdA1iJKfsfTVxE6NA==", "license": "BSD-3-Clause" }, + "node_modules/buffer-fill": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/buffer-fill/-/buffer-fill-1.0.0.tgz", + "integrity": "sha512-T7zexNBwiiaCOGDg9xNX9PBmjrubblRkENuptryuI64URkXDFum9il/JGL8Lm8wYfAXpredVXXZz7eMHilimiQ==", + "license": "MIT" + }, "node_modules/bundle-name": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/bundle-name/-/bundle-name-4.1.0.tgz", @@ -12954,6 +12992,15 @@ "integrity": "sha512-EGWKgxALGMgzvxYF1UyGTy0HXX/2vHLkw6+NvDKW2jypWbHpjQuj4UMcqQWXHERJhVGKikolT06G3bcKe4fi7w==", "license": "MIT" }, + "node_modules/mri": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/mri/-/mri-1.2.0.tgz", + "integrity": "sha512-tzzskb3bG8LvYGFF/mDTpq3jpI6Q9wc3LEmBaghu+DdCssd1FakN7Bc0hVNmEyGq1bq3RgfkCb3cmQLpNPOroA==", + "license": "MIT", + "engines": { + "node": ">=4" + } + }, "node_modules/mrmime": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/mrmime/-/mrmime-2.0.1.tgz", @@ -13889,6 +13936,15 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-timeout": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-4.1.0.tgz", + "integrity": "sha512-+/wmHtzJuWii1sXn3HCuH/FTwGhrp4tmJTxSKJbfS+vkipci6osxXM5mY0jUiRzWKMTgUT8l7HFbeSwZAynqHw==", + "license": "MIT", + "engines": { + "node": ">=10" + } + }, "node_modules/package-json": { "version": "10.0.1", "resolved": "https://registry.npmjs.org/package-json/-/package-json-10.0.1.tgz", @@ -15609,6 +15665,15 @@ ], "license": "MIT" }, + "node_modules/safe-compare": { + "version": "1.1.4", + "resolved": "https://registry.npmjs.org/safe-compare/-/safe-compare-1.1.4.tgz", + "integrity": "sha512-b9wZ986HHCo/HbKrRpBJb2kqXMK9CEWIE1egeEvZsYn69ay3kdfl9nG3RyOcR+jInTDf7a86WQ1d4VJX7goSSQ==", + "license": "MIT", + "dependencies": { + "buffer-alloc": "^1.2.0" + } + }, "node_modules/safe-push-apply": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/safe-push-apply/-/safe-push-apply-1.0.0.tgz", @@ -15650,6 +15715,15 @@ "integrity": "sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==", "license": "MIT" }, + "node_modules/sandwich-stream": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/sandwich-stream/-/sandwich-stream-2.0.2.tgz", + "integrity": "sha512-jLYV0DORrzY3xaz/S9ydJL6Iz7essZeAfnAavsJ+zsJGZ1MOnsS52yRjU3uF3pJa/lla7+wisp//fxOwOH8SKQ==", + "license": "Apache-2.0", + "engines": { + "node": ">= 0.10" + } + }, "node_modules/sax": { "version": "1.4.1", "resolved": "https://registry.npmjs.org/sax/-/sax-1.4.1.tgz", @@ -16933,6 +17007,28 @@ "node": ">=6" } }, + "node_modules/telegraf": { + "version": "4.16.3", + "resolved": "https://registry.npmjs.org/telegraf/-/telegraf-4.16.3.tgz", + "integrity": "sha512-yjEu2NwkHlXu0OARWoNhJlIjX09dRktiMQFsM678BAH/PEPVwctzL67+tvXqLCRQQvm3SDtki2saGO9hLlz68w==", + "license": "MIT", + "dependencies": { + "@telegraf/types": "^7.1.0", + "abort-controller": "^3.0.0", + "debug": "^4.3.4", + "mri": "^1.2.0", + "node-fetch": "^2.7.0", + "p-timeout": "^4.1.0", + "safe-compare": "^1.1.4", + "sandwich-stream": "^2.0.2" + }, + "bin": { + "telegraf": "lib/cli.mjs" + }, + "engines": { + "node": "^12.20.0 || >=14.13.1" + } + }, "node_modules/terminal-link": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/terminal-link/-/terminal-link-4.0.0.tgz", @@ -18798,6 +18894,27 @@ "url": "https://github.com/sponsors/colinhacks" } }, + "packages/channels/base": { + "name": "@qwen-code/channel-base", + "version": "0.1.0", + "dependencies": { + "@agentclientprotocol/sdk": "^0.14.1" + }, + "devDependencies": { + "typescript": "^5.0.0" + } + }, + "packages/channels/telegram": { + "name": "@qwen-code/channel-telegram", + "version": "0.1.0", + "dependencies": { + "@qwen-code/channel-base": "file:../base", + "telegraf": "^4.16.0" + }, + "devDependencies": { + "typescript": "^5.0.0" + } + }, "packages/cli": { "name": "@qwen-code/qwen-code", "version": "0.13.0", @@ -18806,6 +18923,8 @@ "@google/genai": "1.30.0", "@iarna/toml": "^2.2.5", "@modelcontextprotocol/sdk": "^1.25.1", + "@qwen-code/channel-base": "file:../channels/base", + "@qwen-code/channel-telegram": "file:../channels/telegram", "@qwen-code/qwen-code-core": "file:../core", "@qwen-code/web-templates": "file:../web-templates", "@types/update-notifier": "^6.0.8", diff --git a/package.json b/package.json index c1dfa2448..5017b94bd 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,9 @@ }, "type": "module", "workspaces": [ - "packages/*" + "packages/*", + "packages/channels/base", + "packages/channels/telegram" ], "repository": { "type": "git", diff --git a/packages/channels/base/package.json b/packages/channels/base/package.json new file mode 100644 index 000000000..7513c43b4 --- /dev/null +++ b/packages/channels/base/package.json @@ -0,0 +1,17 @@ +{ + "name": "@qwen-code/channel-base", + "version": "0.1.0", + "description": "Base channel infrastructure for Qwen Code", + "type": "module", + "main": "src/index.ts", + "types": "src/index.ts", + "exports": { + ".": "./src/index.ts" + }, + "dependencies": { + "@agentclientprotocol/sdk": "^0.14.1" + }, + "devDependencies": { + "typescript": "^5.0.0" + } +} diff --git a/packages/channels/base/src/AcpBridge.ts b/packages/channels/base/src/AcpBridge.ts new file mode 100644 index 000000000..2860f1a7f --- /dev/null +++ b/packages/channels/base/src/AcpBridge.ts @@ -0,0 +1,188 @@ +import { spawn } from 'node:child_process'; +import type { ChildProcess } from 'node:child_process'; +import { Readable, Writable } from 'node:stream'; +import { EventEmitter } from 'node:events'; +import { + ClientSideConnection, + ndJsonStream, + PROTOCOL_VERSION, +} from '@agentclientprotocol/sdk'; +import type { + Client, + SessionNotification, + RequestPermissionRequest, + RequestPermissionResponse, +} from '@agentclientprotocol/sdk'; + +export interface AcpBridgeOptions { + cliEntryPath: string; + cwd: string; +} + +export class AcpBridge extends EventEmitter { + private child: ChildProcess | null = null; + private connection: ClientSideConnection | null = null; + private options: AcpBridgeOptions; + + constructor(options: AcpBridgeOptions) { + super(); + this.options = options; + } + + async start(): Promise { + const { cliEntryPath, cwd } = this.options; + + this.child = spawn(process.execPath, [cliEntryPath, '--acp'], { + cwd, + stdio: ['pipe', 'pipe', 'pipe'], + env: { ...process.env }, + shell: false, + }); + + this.child.stderr?.on('data', (data: Buffer) => { + const msg = data.toString().trim(); + if (msg) { + console.error('[AcpBridge]', msg); + } + }); + + this.child.on('exit', (code, signal) => { + console.error( + `[AcpBridge] Process exited (code=${code}, signal=${signal})`, + ); + this.connection = null; + this.child = null; + this.emit('disconnected', code, signal); + }); + + // Give the process a moment to start + await new Promise((resolve) => setTimeout(resolve, 1000)); + + if (!this.child || this.child.killed) { + throw new Error('ACP process failed to start'); + } + + const stdout = Readable.toWeb( + this.child.stdout!, + ) as ReadableStream; + const stdin = Writable.toWeb(this.child.stdin!) as WritableStream; + const stream = ndJsonStream(stdin, stdout); + + this.connection = new ClientSideConnection( + (): Client => ({ + sessionUpdate: (params: SessionNotification): Promise => { + const update = (params as unknown as Record) + .update as Record | undefined; + console.log( + '[AcpBridge] sessionUpdate:', + update?.sessionUpdate, + update?.content + ? JSON.stringify(update.content).substring(0, 200) + : '', + ); + this.emit('sessionUpdate', params); + return Promise.resolve(); + }, + + requestPermission: async ( + params: RequestPermissionRequest, + ): Promise => { + // Phase 1: auto-approve everything so plain text works + const options = Array.isArray(params.options) ? params.options : []; + const optionId = + options.find((o) => o.optionId === 'proceed_once')?.optionId || + options[0]?.optionId || + 'proceed_once'; + console.log( + '[AcpBridge] Permission request auto-approved:', + optionId, + params.toolCall?.name, + ); + return { outcome: { outcome: 'selected', optionId } }; + }, + + extNotification: async (): Promise => {}, + }), + stream, + ); + + await this.connection.initialize({ + protocolVersion: PROTOCOL_VERSION, + clientCapabilities: {}, + }); + + console.log('[AcpBridge] Connected and initialized'); + } + + async newSession(cwd: string): Promise { + const conn = this.ensureConnection(); + const response = await conn.newSession({ cwd, mcpServers: [] }); + const sessionId = response.sessionId; + console.log('[AcpBridge] New session:', sessionId); + return sessionId; + } + + async prompt(sessionId: string, text: string): Promise { + const conn = this.ensureConnection(); + + // Collect text from sessionUpdate events during this prompt + // SessionNotification shape: { sessionId, update: { sessionUpdate, content: { type, text } } } + const chunks: string[] = []; + const onUpdate = (params: SessionNotification) => { + if (params.sessionId !== sessionId) return; + const update = (params as unknown as Record).update as + | Record + | undefined; + if (!update) return; + if (update.sessionUpdate !== 'agent_message_chunk') return; + const content = update.content as + | { type?: string; text?: string } + | undefined; + if (content?.type === 'text' && content.text) { + chunks.push(content.text); + } + }; + this.on('sessionUpdate', onUpdate); + + try { + console.log('[AcpBridge] Sending prompt...'); + const result = await conn.prompt({ + sessionId, + prompt: [{ type: 'text', text }], + }); + console.log( + '[AcpBridge] Prompt resolved, stopReason:', + result?.stopReason, + ); + } finally { + this.off('sessionUpdate', onUpdate); + } + + const response = chunks.join(''); + console.log( + `[AcpBridge] Collected ${chunks.length} chunks, ${response.length} chars`, + ); + return response; + } + + stop(): void { + if (this.child) { + this.child.kill(); + this.child = null; + } + this.connection = null; + } + + get isConnected(): boolean { + return ( + this.child !== null && !this.child.killed && this.child.exitCode === null + ); + } + + private ensureConnection(): ClientSideConnection { + if (!this.connection || !this.isConnected) { + throw new Error('Not connected to ACP agent'); + } + return this.connection; + } +} diff --git a/packages/channels/base/src/ChannelBase.ts b/packages/channels/base/src/ChannelBase.ts new file mode 100644 index 000000000..2ffdb4c5f --- /dev/null +++ b/packages/channels/base/src/ChannelBase.ts @@ -0,0 +1,57 @@ +import type { ChannelConfig, Envelope } from './types.js'; +import { SenderGate } from './SenderGate.js'; +import { SessionRouter } from './SessionRouter.js'; +import type { AcpBridge } from './AcpBridge.js'; + +export abstract class ChannelBase { + protected config: ChannelConfig; + protected bridge: AcpBridge; + protected gate: SenderGate; + protected router: SessionRouter; + protected name: string; + + constructor(name: string, config: ChannelConfig, bridge: AcpBridge) { + this.name = name; + this.config = config; + this.bridge = bridge; + this.gate = new SenderGate(config.senderPolicy, config.allowedUsers); + this.router = new SessionRouter(bridge, config.cwd); + } + + abstract connect(): Promise; + abstract sendMessage(chatId: string, text: string): Promise; + abstract disconnect(): void; + + async handleInbound(envelope: Envelope): Promise { + if (!this.gate.check(envelope.senderId)) { + console.log( + `[Channel:${this.name}] Sender ${envelope.senderId} denied by gate`, + ); + return; + } + + const sessionId = await this.router.resolve( + this.name, + envelope.senderId, + envelope.chatId, + envelope.threadId, + ); + + console.log( + `[Channel:${this.name}] Prompting session ${sessionId}: "${envelope.text.substring(0, 80)}"`, + ); + + console.log(`[Channel:${this.name}] Waiting for prompt response...`); + const response = await this.bridge.prompt(sessionId, envelope.text); + console.log( + `[Channel:${this.name}] Got response (${response.length} chars): "${response.substring(0, 100)}"`, + ); + + if (response) { + await this.sendMessage(envelope.chatId, response); + console.log( + `[Channel:${this.name}] Message sent to chat ${envelope.chatId}`, + ); + } + } +} diff --git a/packages/channels/base/src/SenderGate.ts b/packages/channels/base/src/SenderGate.ts new file mode 100644 index 000000000..b6a338c86 --- /dev/null +++ b/packages/channels/base/src/SenderGate.ts @@ -0,0 +1,23 @@ +import type { SenderPolicy } from './types.js'; + +export class SenderGate { + private policy: SenderPolicy; + private allowedUsers: Set; + + constructor(policy: SenderPolicy, allowedUsers: string[] = []) { + this.policy = policy; + this.allowedUsers = new Set(allowedUsers); + } + + check(senderId: string): boolean { + switch (this.policy) { + case 'open': + return true; + case 'allowlist': + return this.allowedUsers.has(senderId); + case 'pairing': + // Pairing will be implemented later; for now, treat as allowlist + return this.allowedUsers.has(senderId); + } + } +} diff --git a/packages/channels/base/src/SessionRouter.ts b/packages/channels/base/src/SessionRouter.ts new file mode 100644 index 000000000..31e55ed24 --- /dev/null +++ b/packages/channels/base/src/SessionRouter.ts @@ -0,0 +1,37 @@ +import type { SessionTarget } from './types.js'; +import type { AcpBridge } from './AcpBridge.js'; + +export class SessionRouter { + private toSession: Map = new Map(); // routing key → session ID + private toTarget: Map = new Map(); // session ID → target + + private bridge: AcpBridge; + private cwd: string; + + constructor(bridge: AcpBridge, cwd: string) { + this.bridge = bridge; + this.cwd = cwd; + } + + async resolve( + channelName: string, + senderId: string, + chatId: string, + threadId?: string, + ): Promise { + const key = `${channelName}:${senderId}`; + const existing = this.toSession.get(key); + if (existing) { + return existing; + } + + const sessionId = await this.bridge.newSession(this.cwd); + this.toSession.set(key, sessionId); + this.toTarget.set(sessionId, { channelName, senderId, chatId, threadId }); + return sessionId; + } + + getTarget(sessionId: string): SessionTarget | undefined { + return this.toTarget.get(sessionId); + } +} diff --git a/packages/channels/base/src/index.ts b/packages/channels/base/src/index.ts new file mode 100644 index 000000000..92f278d0d --- /dev/null +++ b/packages/channels/base/src/index.ts @@ -0,0 +1,13 @@ +export { AcpBridge } from './AcpBridge.js'; +export type { AcpBridgeOptions } from './AcpBridge.js'; +export { ChannelBase } from './ChannelBase.js'; +export { SenderGate } from './SenderGate.js'; +export { SessionRouter } from './SessionRouter.js'; +export type { + ChannelConfig, + ChannelType, + Envelope, + SenderPolicy, + SessionScope, + SessionTarget, +} from './types.js'; diff --git a/packages/channels/base/src/types.ts b/packages/channels/base/src/types.ts new file mode 100644 index 000000000..5d57ef3ce --- /dev/null +++ b/packages/channels/base/src/types.ts @@ -0,0 +1,30 @@ +export type SenderPolicy = 'allowlist' | 'pairing' | 'open'; +export type SessionScope = 'user' | 'thread' | 'single'; +export type ChannelType = 'telegram' | 'discord' | 'webhook'; + +export interface ChannelConfig { + type: ChannelType; + token: string; + senderPolicy: SenderPolicy; + allowedUsers: string[]; + sessionScope: SessionScope; + cwd: string; + approvalMode?: string; + instructions?: string; +} + +export interface Envelope { + channelName: string; + senderId: string; + senderName: string; + chatId: string; + text: string; + threadId?: string; +} + +export interface SessionTarget { + channelName: string; + senderId: string; + chatId: string; + threadId?: string; +} diff --git a/packages/channels/telegram/package.json b/packages/channels/telegram/package.json new file mode 100644 index 000000000..76bed3298 --- /dev/null +++ b/packages/channels/telegram/package.json @@ -0,0 +1,18 @@ +{ + "name": "@qwen-code/channel-telegram", + "version": "0.1.0", + "description": "Telegram channel adapter for Qwen Code", + "type": "module", + "main": "src/index.ts", + "types": "src/index.ts", + "exports": { + ".": "./src/index.ts" + }, + "dependencies": { + "@qwen-code/channel-base": "file:../base", + "telegraf": "^4.16.0" + }, + "devDependencies": { + "typescript": "^5.0.0" + } +} diff --git a/packages/channels/telegram/src/TelegramAdapter.ts b/packages/channels/telegram/src/TelegramAdapter.ts new file mode 100644 index 000000000..5bb589759 --- /dev/null +++ b/packages/channels/telegram/src/TelegramAdapter.ts @@ -0,0 +1,85 @@ +import { Telegraf } from 'telegraf'; +import { ChannelBase } from '@qwen-code/channel-base'; +import type { ChannelConfig, Envelope } from '@qwen-code/channel-base'; +import type { AcpBridge } from '@qwen-code/channel-base'; + +const TELEGRAM_MSG_LIMIT = 4096; + +export class TelegramChannel extends ChannelBase { + private bot: Telegraf; + + constructor(name: string, config: ChannelConfig, bridge: AcpBridge) { + super(name, config, bridge); + this.bot = new Telegraf(config.token); + } + + async connect(): Promise { + this.bot.on('text', async (ctx) => { + const msg = ctx.message; + const envelope: Envelope = { + channelName: this.name, + senderId: String(msg.from.id), + senderName: + msg.from.first_name + + (msg.from.last_name ? ` ${msg.from.last_name}` : ''), + chatId: String(msg.chat.id), + text: msg.text, + }; + + try { + await this.handleInbound(envelope); + } catch (err) { + console.error(`[Telegram:${this.name}] Error handling message:`, err); + try { + await ctx.reply( + 'Sorry, something went wrong processing your message.', + ); + } catch { + // ignore send failure + } + } + }); + + console.log(`[Telegram:${this.name}] Launching bot (polling)...`); + this.bot.launch({ dropPendingUpdates: true }).catch((err) => { + console.error(`[Telegram:${this.name}] Bot launch error:`, err); + }); + console.log(`[Telegram:${this.name}] Bot started (polling)`); + + process.once('SIGINT', () => this.bot.stop('SIGINT')); + process.once('SIGTERM', () => this.bot.stop('SIGTERM')); + } + + async sendMessage(chatId: string, text: string): Promise { + // Split long messages at Telegram's 4096 char limit + const chunks = splitMessage(text, TELEGRAM_MSG_LIMIT); + for (const chunk of chunks) { + await this.bot.telegram.sendMessage(chatId, chunk); + } + } + + disconnect(): void { + this.bot.stop(); + } +} + +function splitMessage(text: string, limit: number): string[] { + if (text.length <= limit) return [text]; + + const chunks: string[] = []; + let remaining = text; + while (remaining.length > 0) { + if (remaining.length <= limit) { + chunks.push(remaining); + break; + } + // Try to split at last newline within limit + let splitAt = remaining.lastIndexOf('\n', limit); + if (splitAt <= 0) { + splitAt = limit; + } + chunks.push(remaining.substring(0, splitAt)); + remaining = remaining.substring(splitAt).replace(/^\n/, ''); + } + return chunks; +} diff --git a/packages/channels/telegram/src/index.ts b/packages/channels/telegram/src/index.ts new file mode 100644 index 000000000..976c4ab0d --- /dev/null +++ b/packages/channels/telegram/src/index.ts @@ -0,0 +1 @@ +export { TelegramChannel } from './TelegramAdapter.js'; diff --git a/packages/cli/package.json b/packages/cli/package.json index fff36c603..6f4023813 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -40,6 +40,8 @@ "@google/genai": "1.30.0", "@iarna/toml": "^2.2.5", "@modelcontextprotocol/sdk": "^1.25.1", + "@qwen-code/channel-base": "file:../channels/base", + "@qwen-code/channel-telegram": "file:../channels/telegram", "@qwen-code/qwen-code-core": "file:../core", "@qwen-code/web-templates": "file:../web-templates", "@types/update-notifier": "^6.0.8", diff --git a/packages/cli/src/commands/channel.ts b/packages/cli/src/commands/channel.ts new file mode 100644 index 000000000..190923d20 --- /dev/null +++ b/packages/cli/src/commands/channel.ts @@ -0,0 +1,13 @@ +import type { CommandModule, Argv } from 'yargs'; +import { startCommand } from './channel/start.js'; + +export const channelCommand: CommandModule = { + command: 'channel', + describe: 'Manage messaging channels (Telegram, Discord, etc.)', + builder: (yargs: Argv) => + yargs + .command(startCommand) + .demandCommand(1, 'You need at least one command before continuing.') + .version(false), + handler: () => {}, +}; diff --git a/packages/cli/src/commands/channel/start.ts b/packages/cli/src/commands/channel/start.ts new file mode 100644 index 000000000..d1ac96a35 --- /dev/null +++ b/packages/cli/src/commands/channel/start.ts @@ -0,0 +1,107 @@ +import type { CommandModule } from 'yargs'; +import { loadSettings } from '../../config/settings.js'; +import { writeStderrLine, writeStdoutLine } from '../../utils/stdioHelpers.js'; +import { AcpBridge } from '@qwen-code/channel-base'; +import type { ChannelConfig } from '@qwen-code/channel-base'; +import { TelegramChannel } from '@qwen-code/channel-telegram'; +import * as path from 'node:path'; + +function resolveEnvVars(value: string): string { + if (value.startsWith('$')) { + const envName = value.substring(1); + const envValue = process.env[envName]; + if (!envValue) { + throw new Error( + `Environment variable ${envName} is not set (referenced as ${value})`, + ); + } + return envValue; + } + return value; +} + +function findCliEntryPath(): string { + // When running from bundled dist/cli.js, use that same file for --acp + const mainModule = process.argv[1]; + if (mainModule) { + return path.resolve(mainModule); + } + throw new Error('Cannot determine CLI entry path'); +} + +export const startCommand: CommandModule = { + command: 'start ', + describe: 'Start a messaging channel', + builder: (yargs) => + yargs.positional('name', { + type: 'string', + describe: 'Name of the channel (as configured in settings.json)', + demandOption: true, + }), + handler: async (argv) => { + const { name } = argv; + + const settings = loadSettings(process.cwd()); + const channels = ( + settings.merged as unknown as { channels?: Record } + ).channels; + + if (!channels || !channels[name]) { + writeStderrLine( + `Error: Channel "${name}" not found in settings. Add it to channels.${name} in settings.json.`, + ); + process.exit(1); + } + + const rawConfig = channels[name] as Record; + const config: ChannelConfig = { + type: rawConfig['type'] as ChannelConfig['type'], + token: resolveEnvVars(rawConfig['token'] as string), + senderPolicy: + (rawConfig['senderPolicy'] as ChannelConfig['senderPolicy']) || + 'allowlist', + allowedUsers: (rawConfig['allowedUsers'] as string[]) || [], + sessionScope: + (rawConfig['sessionScope'] as ChannelConfig['sessionScope']) || 'user', + cwd: (rawConfig['cwd'] as string) || process.cwd(), + approvalMode: rawConfig['approvalMode'] as string | undefined, + instructions: rawConfig['instructions'] as string | undefined, + }; + + if (config.type !== 'telegram') { + writeStderrLine( + `Error: Channel type "${config.type}" is not yet supported. Only "telegram" is available.`, + ); + process.exit(1); + } + + const cliEntryPath = findCliEntryPath(); + writeStdoutLine(`[Channel] CLI entry: ${cliEntryPath}`); + writeStdoutLine(`[Channel] Starting "${name}" (type=${config.type})...`); + + const bridge = new AcpBridge({ cliEntryPath, cwd: config.cwd }); + await bridge.start(); + + const channel = new TelegramChannel(name, config, bridge); + await channel.connect(); + + writeStdoutLine(`[Channel] "${name}" is running. Press Ctrl+C to stop.`); + + // Keep process alive until interrupted + await new Promise((resolve) => { + process.on('SIGINT', () => { + writeStdoutLine('\n[Channel] Shutting down...'); + channel.disconnect(); + bridge.stop(); + resolve(); + }); + process.on('SIGTERM', () => { + channel.disconnect(); + bridge.stop(); + resolve(); + }); + }); + + process.exit(0); + }, +}; diff --git a/packages/cli/src/config/config.ts b/packages/cli/src/config/config.ts index 78ef3dde0..5b5436006 100755 --- a/packages/cli/src/config/config.ts +++ b/packages/cli/src/config/config.ts @@ -51,6 +51,7 @@ import { getCliVersion } from '../utils/version.js'; import { loadSandboxConfig } from './sandboxConfig.js'; import { appEvents } from '../utils/events.js'; import { mcpCommand } from '../commands/mcp.js'; +import { channelCommand } from '../commands/channel.js'; // UUID v4 regex pattern for validation const SESSION_ID_REGEX = @@ -590,7 +591,9 @@ export async function parseArguments(): Promise { // Register Auth subcommands .command(authCommand) // Register Hooks subcommands - .command(hooksCommand); + .command(hooksCommand) + // Register Channel subcommands + .command(channelCommand); yargsInstance .version(await getCliVersion()) // This will enable the --version flag based on package.json @@ -611,7 +614,8 @@ export async function parseArguments(): Promise { result._.length > 0 && (result._[0] === 'mcp' || result._[0] === 'extensions' || - result._[0] === 'hooks') + result._[0] === 'hooks' || + result._[0] === 'channel') ) { // MCP/Extensions/Hooks commands handle their own execution and process exit process.exit(0); diff --git a/packages/cli/src/config/settingsSchema.ts b/packages/cli/src/config/settingsSchema.ts index 379ea2168..b8b343685 100644 --- a/packages/cli/src/config/settingsSchema.ts +++ b/packages/cli/src/config/settingsSchema.ts @@ -189,6 +189,18 @@ const SETTINGS_SCHEMA = { mergeStrategy: MergeStrategy.SHALLOW_MERGE, }, + // Channels configuration (Telegram, Discord, etc.) + channels: { + type: 'object', + label: 'Channels', + category: 'Advanced', + requiresRestart: true, + default: {} as Record>, + description: 'Configuration for messaging channels.', + showInDialog: false, + mergeStrategy: MergeStrategy.SHALLOW_MERGE, + }, + // Model providers configuration grouped by authType modelProviders: { type: 'object',