Merge pull request #2628 from QwenLM/feat/channels-telegram

feat(channels): add extensible Channels platform with plugin system and Telegram/WeChat/DingTalk channels
This commit is contained in:
tanzhenxin 2026-04-01 16:19:08 +08:00 committed by GitHub
commit b2f04418fa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
97 changed files with 9624 additions and 19 deletions

View file

@ -0,0 +1,36 @@
import type { CommandModule, Argv } from 'yargs';
import { startCommand } from './channel/start.js';
import { stopCommand } from './channel/stop.js';
import { statusCommand } from './channel/status.js';
import {
pairingListCommand,
pairingApproveCommand,
} from './channel/pairing.js';
import { configureWeixinCommand } from './channel/configure.js';
const pairingCommand: CommandModule = {
command: 'pairing',
describe: 'Manage DM pairing requests',
builder: (yargs: Argv) =>
yargs
.command(pairingListCommand)
.command(pairingApproveCommand)
.demandCommand(1, 'You need at least one command before continuing.')
.version(false),
handler: () => {},
};
export const channelCommand: CommandModule = {
command: 'channel',
describe: 'Manage messaging channels (Telegram, Discord, etc.)',
builder: (yargs: Argv) =>
yargs
.command(startCommand)
.command(stopCommand)
.command(statusCommand)
.command(pairingCommand)
.command(configureWeixinCommand)
.demandCommand(1, 'You need at least one command before continuing.')
.version(false),
handler: () => {},
};

View file

@ -0,0 +1,28 @@
import type { ChannelPlugin } from '@qwen-code/channel-base';
import { plugin as telegramPlugin } from '@qwen-code/channel-telegram';
import { plugin as weixinPlugin } from '@qwen-code/channel-weixin';
import { plugin as dingtalkPlugin } from '@qwen-code/channel-dingtalk';
const registry = new Map<string, ChannelPlugin>();
// Register built-in channel types
for (const p of [telegramPlugin, weixinPlugin, dingtalkPlugin]) {
registry.set(p.channelType, p);
}
export function registerPlugin(plugin: ChannelPlugin): void {
if (registry.has(plugin.channelType)) {
throw new Error(
`Channel type "${plugin.channelType}" is already registered.`,
);
}
registry.set(plugin.channelType, plugin);
}
export function getPlugin(channelType: string): ChannelPlugin | undefined {
return registry.get(channelType);
}
export function supportedTypes(): string[] {
return [...registry.keys()];
}

View file

@ -0,0 +1,140 @@
import { describe, it, expect, vi, afterEach } from 'vitest';
import { resolveEnvVars, parseChannelConfig } from './config-utils.js';
// Mock the channel-registry so we don't pull in real plugins
vi.mock('./channel-registry.js', () => ({
getPlugin: (type: string) => {
const plugins: Record<
string,
{ channelType: string; requiredConfigFields?: string[] }
> = {
telegram: { channelType: 'telegram', requiredConfigFields: ['token'] },
dingtalk: {
channelType: 'dingtalk',
requiredConfigFields: ['clientId', 'clientSecret'],
},
bare: { channelType: 'bare' }, // no requiredConfigFields
};
return plugins[type];
},
supportedTypes: () => ['telegram', 'dingtalk', 'bare'],
}));
describe('resolveEnvVars', () => {
const ENV_KEY = 'TEST_RESOLVE_VAR_123';
afterEach(() => {
delete process.env[ENV_KEY];
});
it('returns literal values unchanged', () => {
expect(resolveEnvVars('my-token')).toBe('my-token');
});
it('resolves $ENV_VAR to its value', () => {
process.env[ENV_KEY] = 'secret';
expect(resolveEnvVars(`$${ENV_KEY}`)).toBe('secret');
});
it('throws when referenced env var is not set', () => {
expect(() => resolveEnvVars(`$${ENV_KEY}`)).toThrow(
`Environment variable ${ENV_KEY} is not set`,
);
});
it('does not resolve vars that do not start with $', () => {
process.env[ENV_KEY] = 'val';
expect(resolveEnvVars(`prefix$${ENV_KEY}`)).toBe(`prefix$${ENV_KEY}`);
});
});
describe('parseChannelConfig', () => {
it('throws when type is missing', () => {
expect(() => parseChannelConfig('bot', {})).toThrow(
'missing required field "type"',
);
});
it('throws for unsupported channel type', () => {
expect(() => parseChannelConfig('bot', { type: 'slack' })).toThrow(
'"slack" is not supported',
);
});
it('throws when plugin-required fields are missing', () => {
expect(() => parseChannelConfig('bot', { type: 'telegram' })).toThrow(
'requires "token"',
);
});
it('parses minimal valid config with defaults', () => {
const result = parseChannelConfig('bot', {
type: 'bare',
});
expect(result.type).toBe('bare');
expect(result.token).toBe('');
expect(result.senderPolicy).toBe('allowlist');
expect(result.allowedUsers).toEqual([]);
expect(result.sessionScope).toBe('user');
expect(result.cwd).toBe(process.cwd());
expect(result.groupPolicy).toBe('disabled');
expect(result.groups).toEqual({});
});
it('resolves env vars in token, clientId, clientSecret', () => {
process.env['TEST_TOKEN'] = 'tok123';
process.env['TEST_CID'] = 'cid456';
process.env['TEST_SEC'] = 'sec789';
const result = parseChannelConfig('bot', {
type: 'bare',
token: '$TEST_TOKEN',
clientId: '$TEST_CID',
clientSecret: '$TEST_SEC',
});
expect(result.token).toBe('tok123');
expect(result.clientId).toBe('cid456');
expect(result.clientSecret).toBe('sec789');
delete process.env['TEST_TOKEN'];
delete process.env['TEST_CID'];
delete process.env['TEST_SEC'];
});
it('preserves explicit config values over defaults', () => {
const result = parseChannelConfig('bot', {
type: 'bare',
token: 'literal-tok',
senderPolicy: 'open',
allowedUsers: ['alice'],
sessionScope: 'thread',
cwd: '/custom',
approvalMode: 'auto',
instructions: 'Be helpful',
model: 'qwen-coder',
groupPolicy: 'open',
groups: { g1: { mentionKeywords: ['@bot'] } },
});
expect(result.token).toBe('literal-tok');
expect(result.senderPolicy).toBe('open');
expect(result.allowedUsers).toEqual(['alice']);
expect(result.sessionScope).toBe('thread');
expect(result.cwd).toBe('/custom');
expect(result.approvalMode).toBe('auto');
expect(result.instructions).toBe('Be helpful');
expect(result.model).toBe('qwen-coder');
expect(result.groupPolicy).toBe('open');
expect(result.groups).toEqual({ g1: { mentionKeywords: ['@bot'] } });
});
it('spreads extra fields from raw config', () => {
const result = parseChannelConfig('bot', {
type: 'bare',
customField: 42,
});
expect((result as Record<string, unknown>)['customField']).toBe(42);
});
});

View file

@ -0,0 +1,83 @@
import type { ChannelConfig } from '@qwen-code/channel-base';
import * as path from 'node:path';
import { getPlugin, supportedTypes } from './channel-registry.js';
export function resolveEnvVars(value: string): string {
if (value.startsWith('$')) {
const envName = value.substring(1);
const envValue = process.env[envName];
if (!envValue) {
throw new Error(
`Environment variable ${envName} is not set (referenced as ${value})`,
);
}
return envValue;
}
return value;
}
export function findCliEntryPath(): string {
const mainModule = process.argv[1];
if (mainModule) {
return path.resolve(mainModule);
}
throw new Error('Cannot determine CLI entry path');
}
export function parseChannelConfig(
name: string,
rawConfig: Record<string, unknown>,
): ChannelConfig & Record<string, unknown> {
if (!rawConfig['type']) {
throw new Error(`Channel "${name}" is missing required field "type".`);
}
const channelType = rawConfig['type'] as string;
const plugin = getPlugin(channelType);
if (!plugin) {
throw new Error(
`Channel type "${channelType}" is not supported. Available: ${supportedTypes().join(', ')}`,
);
}
// Validate plugin-required fields
for (const field of plugin.requiredConfigFields ?? []) {
if (!rawConfig[field]) {
throw new Error(
`Channel "${name}" (${channelType}) requires "${field}".`,
);
}
}
// Resolve env vars for known credential fields
const token = rawConfig['token']
? resolveEnvVars(rawConfig['token'] as string)
: '';
const clientId = rawConfig['clientId']
? resolveEnvVars(rawConfig['clientId'] as string)
: undefined;
const clientSecret = rawConfig['clientSecret']
? resolveEnvVars(rawConfig['clientSecret'] as string)
: undefined;
return {
...rawConfig,
type: channelType,
token,
clientId,
clientSecret,
senderPolicy:
(rawConfig['senderPolicy'] as ChannelConfig['senderPolicy']) ||
'allowlist',
allowedUsers: (rawConfig['allowedUsers'] as string[]) || [],
sessionScope:
(rawConfig['sessionScope'] as ChannelConfig['sessionScope']) || 'user',
cwd: (rawConfig['cwd'] as string) || process.cwd(),
approvalMode: rawConfig['approvalMode'] as string | undefined,
instructions: rawConfig['instructions'] as string | undefined,
model: rawConfig['model'] as string | undefined,
groupPolicy:
(rawConfig['groupPolicy'] as ChannelConfig['groupPolicy']) || 'disabled',
groups: (rawConfig['groups'] as ChannelConfig['groups']) || {},
};
}

View file

@ -0,0 +1,85 @@
import type { CommandModule } from 'yargs';
import { writeStderrLine, writeStdoutLine } from '../../utils/stdioHelpers.js';
import {
loadAccount,
saveAccount,
clearAccount,
DEFAULT_BASE_URL,
} from '@qwen-code/channel-weixin/accounts';
import { startLogin, waitForLogin } from '@qwen-code/channel-weixin/login';
export const configureWeixinCommand: CommandModule<
object,
{ action: string | undefined }
> = {
command: 'configure-weixin [action]',
describe: 'Configure WeChat channel (login via QR code)',
builder: (yargs) =>
yargs.positional('action', {
type: 'string',
describe: '"clear" to remove stored credentials, omit to login',
}),
handler: async (argv) => {
const { action } = argv;
if (action === 'clear') {
clearAccount();
writeStdoutLine('WeChat credentials cleared.');
return;
}
if (action === 'status') {
const account = loadAccount();
if (account) {
writeStdoutLine(`WeChat account configured (saved ${account.savedAt})`);
writeStdoutLine(` Base URL: ${account.baseUrl}`);
if (account.userId) {
writeStdoutLine(` User ID: ${account.userId}`);
}
} else {
writeStdoutLine('WeChat account not configured.');
}
return;
}
// Default action: login
const existing = loadAccount();
if (existing) {
writeStdoutLine(
`Existing WeChat credentials found (saved ${existing.savedAt}).`,
);
writeStdoutLine('Re-running login will overwrite them.\n');
}
const baseUrl = DEFAULT_BASE_URL;
writeStdoutLine('Starting WeChat QR code login...\n');
try {
const qrcodeId = await startLogin(baseUrl);
const result = await waitForLogin({ qrcodeId, apiBaseUrl: baseUrl });
if (result.connected && result.token) {
saveAccount({
token: result.token,
baseUrl: result.baseUrl || baseUrl,
userId: result.userId,
savedAt: new Date().toISOString(),
});
writeStdoutLine('\n' + result.message);
writeStdoutLine(
'Credentials saved. You can now start a weixin channel with:',
);
writeStdoutLine(' qwen channel start <name>');
} else {
writeStderrLine('\n' + result.message);
process.exit(1);
}
} catch (err) {
writeStderrLine(
`Login failed: ${err instanceof Error ? err.message : String(err)}`,
);
process.exit(1);
}
},
};

View file

@ -0,0 +1,66 @@
import type { CommandModule } from 'yargs';
import { PairingStore } from '@qwen-code/channel-base';
import { writeStderrLine, writeStdoutLine } from '../../utils/stdioHelpers.js';
export const pairingListCommand: CommandModule<object, { name: string }> = {
command: 'list <name>',
describe: 'List pending pairing requests for a channel',
builder: (yargs) =>
yargs.positional('name', {
type: 'string',
describe: 'Channel name',
demandOption: true,
}),
handler: (argv) => {
const store = new PairingStore(argv.name);
const pending = store.listPending();
if (pending.length === 0) {
writeStdoutLine('No pending pairing requests.');
return;
}
writeStdoutLine(`Pending pairing requests for "${argv.name}":\n`);
for (const req of pending) {
const ago = Math.round((Date.now() - req.createdAt) / 60000);
writeStdoutLine(
` Code: ${req.code} Sender: ${req.senderName} (${req.senderId}) ${ago}m ago`,
);
}
},
};
export const pairingApproveCommand: CommandModule<
object,
{ name: string; code: string }
> = {
command: 'approve <name> <code>',
describe: 'Approve a pending pairing request',
builder: (yargs) =>
yargs
.positional('name', {
type: 'string',
describe: 'Channel name',
demandOption: true,
})
.positional('code', {
type: 'string',
describe: 'Pairing code',
demandOption: true,
}),
handler: (argv) => {
const store = new PairingStore(argv.name);
const request = store.approve(argv.code);
if (!request) {
writeStderrLine(
`No pending request found for code "${argv.code.toUpperCase()}". It may have expired.`,
);
process.exit(1);
}
writeStdoutLine(
`Approved: ${request.senderName} (${request.senderId}) can now use channel "${argv.name}".`,
);
},
};

View file

@ -0,0 +1,178 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { homedir } from 'node:os';
import { join } from 'node:path';
// vi.hoisted runs before vi.mock hoisting, so fsStore is available in the factory
const fsStore = vi.hoisted(() => {
const store: Record<string, string> = {};
return store;
});
vi.mock('node:fs', () => {
const mock = {
existsSync: (p: string) => p in fsStore,
readFileSync: (p: string) => {
if (!(p in fsStore)) throw new Error('ENOENT');
return fsStore[p];
},
writeFileSync: (p: string, data: string) => {
fsStore[p] = data;
},
mkdirSync: () => {},
unlinkSync: (p: string) => {
delete fsStore[p];
},
};
return { ...mock, default: mock };
});
import {
readServiceInfo,
writeServiceInfo,
removeServiceInfo,
signalService,
waitForExit,
} from './pidfile.js';
// We need to mock process.kill for isProcessAlive / signalService
const originalKill = process.kill;
function getPidFilePath() {
return join(homedir(), '.qwen', 'channels', 'service.pid');
}
beforeEach(() => {
for (const k of Object.keys(fsStore)) delete fsStore[k];
});
afterEach(() => {
process.kill = originalKill;
});
describe('writeServiceInfo + readServiceInfo', () => {
it('writes and reads back service info for a live process', () => {
// Mock process.kill(pid, 0) to indicate alive
// eslint-disable-next-line @typescript-eslint/no-explicit-any
process.kill = vi.fn(() => true) as any;
writeServiceInfo(['telegram', 'dingtalk']);
const info = readServiceInfo();
expect(info).not.toBeNull();
expect(info!.pid).toBe(process.pid);
expect(info!.channels).toEqual(['telegram', 'dingtalk']);
expect(info!.startedAt).toBeTruthy();
});
it('returns null when no PID file exists', () => {
const info = readServiceInfo();
expect(info).toBeNull();
});
it('cleans up and returns null for corrupt PID file', () => {
const filePath = getPidFilePath();
fsStore[filePath] = 'not-json!!!';
const info = readServiceInfo();
expect(info).toBeNull();
// File should be cleaned up
expect(filePath in fsStore).toBe(false);
});
it('cleans up and returns null for stale PID (dead process)', () => {
// First write with alive process
// eslint-disable-next-line @typescript-eslint/no-explicit-any
process.kill = vi.fn(() => true) as any;
writeServiceInfo(['telegram']);
// Now simulate dead process
process.kill = vi.fn(() => {
throw new Error('ESRCH');
// eslint-disable-next-line @typescript-eslint/no-explicit-any
}) as any;
const info = readServiceInfo();
expect(info).toBeNull();
});
});
describe('removeServiceInfo', () => {
it('removes existing PID file', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
process.kill = vi.fn(() => true) as any;
writeServiceInfo(['test']);
removeServiceInfo();
const info = readServiceInfo();
expect(info).toBeNull();
});
it('is a no-op when no PID file exists', () => {
expect(() => removeServiceInfo()).not.toThrow();
});
});
describe('signalService', () => {
it('returns true when signal is delivered', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
process.kill = vi.fn(() => true) as any;
expect(signalService(1234, 'SIGTERM')).toBe(true);
expect(process.kill).toHaveBeenCalledWith(1234, 'SIGTERM');
});
it('returns false when process is not found', () => {
process.kill = vi.fn(() => {
throw new Error('ESRCH');
// eslint-disable-next-line @typescript-eslint/no-explicit-any
}) as any;
expect(signalService(9999)).toBe(false);
});
it('defaults to SIGTERM', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
process.kill = vi.fn(() => true) as any;
signalService(1234);
expect(process.kill).toHaveBeenCalledWith(1234, 'SIGTERM');
});
});
describe('waitForExit', () => {
it('returns true immediately if process is already dead', async () => {
process.kill = vi.fn(() => {
throw new Error('ESRCH');
// eslint-disable-next-line @typescript-eslint/no-explicit-any
}) as any;
const result = await waitForExit(9999, 1000, 50);
expect(result).toBe(true);
});
it('returns true when process dies within timeout', async () => {
let alive = true;
process.kill = vi.fn(() => {
if (!alive) throw new Error('ESRCH');
return true;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
}) as any;
// Kill after 100ms
setTimeout(() => {
alive = false;
}, 100);
const result = await waitForExit(1234, 2000, 50);
expect(result).toBe(true);
});
it('returns false on timeout when process stays alive', async () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
process.kill = vi.fn(() => true) as any;
const result = await waitForExit(1234, 150, 50);
expect(result).toBe(false);
});
});

View file

@ -0,0 +1,126 @@
import {
existsSync,
readFileSync,
writeFileSync,
mkdirSync,
unlinkSync,
} from 'node:fs';
import * as path from 'node:path';
import * as os from 'node:os';
export interface ServiceInfo {
pid: number;
startedAt: string;
channels: string[];
}
function pidFilePath(): string {
return path.join(os.homedir(), '.qwen', 'channels', 'service.pid');
}
/** Check if a process is alive. */
function isProcessAlive(pid: number): boolean {
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
/**
* Read the PID file and return service info if the process is still alive.
* Returns null if no file, invalid file, or stale (dead process).
* Automatically cleans up stale PID files.
*/
export function readServiceInfo(): ServiceInfo | null {
const filePath = pidFilePath();
if (!existsSync(filePath)) return null;
let info: ServiceInfo;
try {
info = JSON.parse(readFileSync(filePath, 'utf-8'));
} catch {
// Corrupt file — clean up
try {
unlinkSync(filePath);
} catch {
// best-effort
}
return null;
}
if (!isProcessAlive(info.pid)) {
// Stale PID — process is dead, clean up
try {
unlinkSync(filePath);
} catch {
// best-effort
}
return null;
}
return info;
}
/** Write PID file with current process info. */
export function writeServiceInfo(channels: string[]): void {
const filePath = pidFilePath();
const dir = path.dirname(filePath);
if (!existsSync(dir)) {
mkdirSync(dir, { recursive: true });
}
const info: ServiceInfo = {
pid: process.pid,
startedAt: new Date().toISOString(),
channels,
};
writeFileSync(filePath, JSON.stringify(info, null, 2), 'utf-8');
}
/** Delete the PID file. */
export function removeServiceInfo(): void {
const filePath = pidFilePath();
if (existsSync(filePath)) {
try {
unlinkSync(filePath);
} catch {
// best-effort
}
}
}
/**
* Send a signal to the running service.
* Returns true if signal was sent, false if process not found.
*/
export function signalService(
pid: number,
signal: NodeJS.Signals = 'SIGTERM',
): boolean {
try {
process.kill(pid, signal);
return true;
} catch {
return false;
}
}
/**
* Wait for a process to exit, polling at intervals.
* Returns true if process exited, false if timeout.
*/
export async function waitForExit(
pid: number,
timeoutMs: number = 5000,
pollMs: number = 200,
): Promise<boolean> {
const start = Date.now();
while (Date.now() - start < timeoutMs) {
if (!isProcessAlive(pid)) return true;
await new Promise((r) => setTimeout(r, pollMs));
}
return !isProcessAlive(pid);
}

View file

@ -0,0 +1,433 @@
import * as path from 'node:path';
import * as os from 'node:os';
import type { CommandModule } from 'yargs';
import { loadSettings } from '../../config/settings.js';
import { writeStderrLine, writeStdoutLine } from '../../utils/stdioHelpers.js';
import { AcpBridge, SessionRouter } from '@qwen-code/channel-base';
import type {
ChannelBase,
ChannelPlugin,
ToolCallEvent,
} from '@qwen-code/channel-base';
import { getPlugin, registerPlugin } from './channel-registry.js';
import { findCliEntryPath, parseChannelConfig } from './config-utils.js';
import {
readServiceInfo,
writeServiceInfo,
removeServiceInfo,
} from './pidfile.js';
import { getExtensionManager } from '../extensions/utils.js';
const MAX_CRASH_RESTARTS = 3;
const CRASH_WINDOW_MS = 5 * 60 * 1000; // 5-minute window for counting crashes
const RESTART_DELAY_MS = 3000;
function sessionsPath(): string {
return path.join(os.homedir(), '.qwen', 'channels', 'sessions.json');
}
function loadChannelsConfig(): Record<string, unknown> {
const settings = loadSettings(process.cwd());
const channels = (
settings.merged as unknown as { channels?: Record<string, unknown> }
).channels;
return channels || {};
}
/**
* Load channel plugins from active extensions.
* Extensions declare channels in their qwen-extension.json manifest.
*/
async function loadChannelsFromExtensions(): Promise<number> {
let loaded = 0;
try {
const extensionManager = await getExtensionManager();
const extensions = extensionManager
.getLoadedExtensions()
.filter((e) => e.isActive && e.channels);
for (const ext of extensions) {
for (const [channelType, channelDef] of Object.entries(ext.channels!)) {
if (getPlugin(channelType)) {
writeStderrLine(
`[Extensions] Skipping channel "${channelType}" from "${ext.name}": type already registered`,
);
continue;
}
const entryPath = path.join(ext.path, channelDef.entry);
try {
const module = (await import(entryPath)) as {
plugin?: ChannelPlugin;
};
const plugin = module.plugin;
if (!plugin || typeof plugin.createChannel !== 'function') {
writeStderrLine(
`[Extensions] "${ext.name}": channel entry point does not export a valid plugin object`,
);
continue;
}
if (plugin.channelType !== channelType) {
writeStderrLine(
`[Extensions] "${ext.name}": channelType mismatch — manifest says "${channelType}", plugin says "${plugin.channelType}"`,
);
continue;
}
registerPlugin(plugin);
loaded++;
writeStdoutLine(
`[Extensions] Loaded channel "${channelType}" from "${ext.name}"`,
);
} catch (err) {
writeStderrLine(
`[Extensions] Failed to load channel "${channelType}" from "${ext.name}": ${err instanceof Error ? err.message : String(err)}`,
);
}
}
}
} catch (err) {
writeStderrLine(
`[Extensions] Failed to load extensions: ${err instanceof Error ? err.message : String(err)}`,
);
}
return loaded;
}
function createChannel(
name: string,
config: ReturnType<typeof parseChannelConfig>,
bridge: AcpBridge,
options?: { router?: SessionRouter },
): ChannelBase {
const channelPlugin = getPlugin(config.type);
if (!channelPlugin) {
throw new Error(`Unknown channel type: "${config.type}".`);
}
return channelPlugin.createChannel(name, config, bridge, options);
}
function registerToolCallDispatch(
bridge: AcpBridge,
router: SessionRouter,
channels: Map<string, ChannelBase>,
): void {
bridge.on('toolCall', (event: ToolCallEvent) => {
const target = router.getTarget(event.sessionId);
if (target) {
const channel = channels.get(target.channelName);
if (channel) {
channel.onToolCall(target.chatId, event);
}
}
});
}
/** Check for duplicate instance and abort if one is already running. */
function checkDuplicateInstance(): void {
const existing = readServiceInfo();
if (existing) {
writeStderrLine(
`Error: Channel service is already running (PID ${existing.pid}, started ${existing.startedAt}).`,
);
writeStderrLine('Use "qwen channel stop" to stop it first.');
process.exit(1);
}
}
/** Start a single channel with its own bridge + crash recovery. */
async function startSingle(name: string): Promise<void> {
checkDuplicateInstance();
const channelsConfig = loadChannelsConfig();
await loadChannelsFromExtensions();
if (!channelsConfig[name]) {
writeStderrLine(
`Error: Channel "${name}" not found in settings. Add it to channels.${name} in settings.json.`,
);
process.exit(1);
}
let config;
try {
config = parseChannelConfig(
name,
channelsConfig[name] as Record<string, unknown>,
);
} catch (err) {
writeStderrLine(
`Error: ${err instanceof Error ? err.message : String(err)}`,
);
process.exit(1);
}
const cliEntryPath = findCliEntryPath();
let shuttingDown = false;
const crashTimestamps: number[] = [];
const bridgeOpts = { cliEntryPath, cwd: config.cwd, model: config.model };
let bridge = new AcpBridge(bridgeOpts);
await bridge.start();
const router = new SessionRouter(
bridge,
config.cwd,
config.sessionScope,
sessionsPath(),
);
const channels: Map<string, ChannelBase> = new Map();
const channel = createChannel(name, config, bridge, { router });
channels.set(name, channel);
registerToolCallDispatch(bridge, router, channels);
await channel.connect();
writeServiceInfo([name]);
writeStdoutLine(`[Channel] "${name}" is running. Press Ctrl+C to stop.`);
bridge.on('disconnected', async () => {
if (shuttingDown) return;
const now = Date.now();
crashTimestamps.push(now);
// Only count crashes within the recent window
const recentCrashes = crashTimestamps.filter(
(ts) => now - ts < CRASH_WINDOW_MS,
);
if (recentCrashes.length > MAX_CRASH_RESTARTS) {
writeStderrLine(
`[Channel] Bridge crashed ${recentCrashes.length} times in ${CRASH_WINDOW_MS / 1000}s. Giving up.`,
);
channel.disconnect();
router.clearAll();
removeServiceInfo();
process.exit(1);
}
writeStderrLine(
`[Channel] Bridge crashed (${recentCrashes.length}/${MAX_CRASH_RESTARTS} in window). Restarting in ${RESTART_DELAY_MS / 1000}s...`,
);
await new Promise((r) => setTimeout(r, RESTART_DELAY_MS));
try {
bridge = new AcpBridge(bridgeOpts);
await bridge.start();
router.setBridge(bridge);
channel.setBridge(bridge);
registerToolCallDispatch(bridge, router, channels);
const result = await router.restoreSessions();
writeStdoutLine(
`[Channel] Bridge restarted. Sessions restored: ${result.restored}, failed: ${result.failed}`,
);
} catch (err) {
writeStderrLine(
`[Channel] Failed to restart bridge: ${err instanceof Error ? err.message : String(err)}`,
);
}
});
const shutdown = () => {
shuttingDown = true;
writeStdoutLine('\n[Channel] Shutting down...');
channel.disconnect();
bridge.stop();
router.clearAll();
removeServiceInfo();
process.exit(0);
};
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
await new Promise<void>(() => {});
}
/** Start all configured channels with a shared bridge + crash recovery. */
async function startAll(): Promise<void> {
checkDuplicateInstance();
const channelsConfig = loadChannelsConfig();
await loadChannelsFromExtensions();
if (Object.keys(channelsConfig).length === 0) {
writeStderrLine(
'Error: No channels configured in settings.json. Add entries under "channels".',
);
process.exit(1);
}
// Parse all configs upfront — fail fast on bad config
const parsed: Array<{
name: string;
config: ReturnType<typeof parseChannelConfig>;
}> = [];
for (const [name, raw] of Object.entries(channelsConfig)) {
try {
parsed.push({
name,
config: parseChannelConfig(name, raw as Record<string, unknown>),
});
} catch (err) {
writeStderrLine(
`Error in channel "${name}": ${err instanceof Error ? err.message : String(err)}`,
);
process.exit(1);
}
}
const cliEntryPath = findCliEntryPath();
const defaultCwd = process.cwd();
let shuttingDown = false;
const crashTimestamps: number[] = [];
// All channels share one bridge process. Use the first channel's model.
const models = [
...new Set(parsed.map((p) => p.config.model).filter(Boolean)),
];
if (models.length > 1) {
writeStderrLine(
`[Channel] Warning: Multiple models configured (${models.join(', ')}). ` +
`Shared bridge will use "${models[0]}".`,
);
}
const bridgeOpts = {
cliEntryPath,
cwd: defaultCwd,
model: models[0],
};
let bridge = new AcpBridge(bridgeOpts);
await bridge.start();
const router = new SessionRouter(bridge, defaultCwd, 'user', sessionsPath());
// Register per-channel scope overrides so each channel uses its own sessionScope
for (const { name, config } of parsed) {
router.setChannelScope(name, config.sessionScope);
}
const channels: Map<string, ChannelBase> = new Map();
writeStdoutLine(
`[Channel] Starting ${parsed.length} channel(s): ${parsed.map((p) => p.name).join(', ')}`,
);
for (const { name, config } of parsed) {
channels.set(name, createChannel(name, config, bridge, { router }));
}
registerToolCallDispatch(bridge, router, channels);
// Connect all channels
let connectedCount = 0;
for (const [name, channel] of channels) {
try {
await channel.connect();
connectedCount++;
writeStdoutLine(`[Channel] "${name}" connected.`);
} catch (err) {
writeStderrLine(
`[Channel] Failed to connect "${name}": ${err instanceof Error ? err.message : String(err)}`,
);
}
}
if (connectedCount === 0) {
writeStderrLine('[Channel] No channels connected. Exiting.');
bridge.stop();
process.exit(1);
}
writeServiceInfo(parsed.map((p) => p.name));
writeStdoutLine(
`[Channel] Running ${connectedCount} channel(s). Press Ctrl+C to stop.`,
);
bridge.on('disconnected', async () => {
if (shuttingDown) return;
const now = Date.now();
crashTimestamps.push(now);
const recentCrashes = crashTimestamps.filter(
(ts) => now - ts < CRASH_WINDOW_MS,
);
if (recentCrashes.length > MAX_CRASH_RESTARTS) {
writeStderrLine(
`[Channel] Bridge crashed ${recentCrashes.length} times in ${CRASH_WINDOW_MS / 1000}s. Giving up.`,
);
for (const channel of channels.values()) {
try {
channel.disconnect();
} catch {
// best-effort
}
}
router.clearAll();
removeServiceInfo();
process.exit(1);
}
writeStderrLine(
`[Channel] Bridge crashed (${recentCrashes.length}/${MAX_CRASH_RESTARTS} in window). Restarting in ${RESTART_DELAY_MS / 1000}s...`,
);
await new Promise((r) => setTimeout(r, RESTART_DELAY_MS));
try {
bridge = new AcpBridge(bridgeOpts);
await bridge.start();
router.setBridge(bridge);
for (const channel of channels.values()) {
channel.setBridge(bridge);
}
registerToolCallDispatch(bridge, router, channels);
const result = await router.restoreSessions();
writeStdoutLine(
`[Channel] Bridge restarted. Sessions restored: ${result.restored}, failed: ${result.failed}`,
);
} catch (err) {
writeStderrLine(
`[Channel] Failed to restart bridge: ${err instanceof Error ? err.message : String(err)}`,
);
}
});
const shutdown = () => {
shuttingDown = true;
writeStdoutLine('\n[Channel] Shutting down...');
for (const [name, channel] of channels) {
try {
channel.disconnect();
writeStdoutLine(`[Channel] "${name}" disconnected.`);
} catch {
// best-effort
}
}
bridge.stop();
router.clearAll();
removeServiceInfo();
process.exit(0);
};
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
await new Promise<void>(() => {});
}
export const startCommand: CommandModule<object, { name?: string }> = {
command: 'start [name]',
describe: 'Start channels (all if no name given, or a single named channel)',
builder: (yargs) =>
yargs.positional('name', {
type: 'string',
describe: 'Channel name (omit to start all configured channels)',
}),
handler: async (argv) => {
if (argv.name) {
await startSingle(argv.name);
} else {
await startAll();
}
},
};

View file

@ -0,0 +1,78 @@
import { existsSync, readFileSync } from 'node:fs';
import * as path from 'node:path';
import * as os from 'node:os';
import type { CommandModule } from 'yargs';
import { writeStdoutLine } from '../../utils/stdioHelpers.js';
import { readServiceInfo } from './pidfile.js';
import type { SessionTarget } from '@qwen-code/channel-base';
interface PersistedEntry {
sessionId: string;
target: SessionTarget;
cwd: string;
}
function formatUptime(startedAt: string): string {
const ms = Date.now() - new Date(startedAt).getTime();
const seconds = Math.floor(ms / 1000);
const minutes = Math.floor(seconds / 60);
const hours = Math.floor(minutes / 60);
const days = Math.floor(hours / 24);
if (days > 0) return `${days}d ${hours % 24}h ${minutes % 60}m`;
if (hours > 0) return `${hours}h ${minutes % 60}m`;
if (minutes > 0) return `${minutes}m ${seconds % 60}s`;
return `${seconds}s`;
}
export const statusCommand: CommandModule = {
command: 'status',
describe: 'Show channel service status',
handler: async () => {
const info = readServiceInfo();
if (!info) {
writeStdoutLine('No channel service is running.');
process.exit(0);
}
writeStdoutLine(`Channel service: running (PID ${info.pid})`);
writeStdoutLine(`Uptime: ${formatUptime(info.startedAt)}`);
writeStdoutLine('');
// Read session data for per-channel counts
const sessionsPath = path.join(
os.homedir(),
'.qwen',
'channels',
'sessions.json',
);
const sessionCounts = new Map<string, number>();
if (existsSync(sessionsPath)) {
try {
const entries: Record<string, PersistedEntry> = JSON.parse(
readFileSync(sessionsPath, 'utf-8'),
);
for (const entry of Object.values(entries)) {
const name = entry.target.channelName;
sessionCounts.set(name, (sessionCounts.get(name) || 0) + 1);
}
} catch {
// best-effort
}
}
// Table header
const nameWidth = Math.max(15, ...info.channels.map((c) => c.length + 2));
writeStdoutLine(`${'Channel'.padEnd(nameWidth)}Sessions`);
writeStdoutLine(`${'-'.repeat(nameWidth)}--------`);
for (const name of info.channels) {
const count = sessionCounts.get(name) || 0;
writeStdoutLine(`${name.padEnd(nameWidth)}${count}`);
}
process.exit(0);
},
};

View file

@ -0,0 +1,49 @@
import type { CommandModule } from 'yargs';
import { writeStderrLine, writeStdoutLine } from '../../utils/stdioHelpers.js';
import {
readServiceInfo,
signalService,
waitForExit,
removeServiceInfo,
} from './pidfile.js';
export const stopCommand: CommandModule = {
command: 'stop',
describe: 'Stop the running channel service',
handler: async () => {
const info = readServiceInfo();
if (!info) {
writeStdoutLine('No channel service is running.');
process.exit(0);
}
writeStdoutLine(`Stopping channel service (PID ${info.pid})...`);
if (!signalService(info.pid, 'SIGTERM')) {
writeStderrLine(
'Failed to send signal — process may have already exited.',
);
removeServiceInfo();
process.exit(0);
}
const exited = await waitForExit(info.pid, 5000);
if (exited) {
// Clean up in case the process didn't delete its own PID file
removeServiceInfo();
writeStdoutLine('Service stopped.');
} else {
writeStderrLine(
'Service did not exit within 5 seconds. Sending SIGKILL...',
);
signalService(info.pid, 'SIGKILL');
await waitForExit(info.pid, 2000);
removeServiceInfo();
writeStdoutLine('Service killed.');
}
process.exit(0);
},
};

View file

@ -51,6 +51,7 @@ import { getCliVersion } from '../utils/version.js';
import { loadSandboxConfig } from './sandboxConfig.js';
import { appEvents } from '../utils/events.js';
import { mcpCommand } from '../commands/mcp.js';
import { channelCommand } from '../commands/channel.js';
// UUID v4 regex pattern for validation
const SESSION_ID_REGEX =
@ -583,7 +584,9 @@ export async function parseArguments(): Promise<CliArgs> {
// Register Auth subcommands
.command(authCommand)
// Register Hooks subcommands
.command(hooksCommand);
.command(hooksCommand)
// Register Channel subcommands
.command(channelCommand);
yargsInstance
.version(await getCliVersion()) // This will enable the --version flag based on package.json
@ -604,7 +607,8 @@ export async function parseArguments(): Promise<CliArgs> {
result._.length > 0 &&
(result._[0] === 'mcp' ||
result._[0] === 'extensions' ||
result._[0] === 'hooks')
result._[0] === 'hooks' ||
result._[0] === 'channel')
) {
// MCP/Extensions/Hooks commands handle their own execution and process exit
process.exit(0);

View file

@ -189,6 +189,18 @@ const SETTINGS_SCHEMA = {
mergeStrategy: MergeStrategy.SHALLOW_MERGE,
},
// Channels configuration (Telegram, Discord, etc.)
channels: {
type: 'object',
label: 'Channels',
category: 'Advanced',
requiresRestart: true,
default: {} as Record<string, Record<string, unknown>>,
description: 'Configuration for messaging channels.',
showInDialog: false,
mergeStrategy: MergeStrategy.SHALLOW_MERGE,
},
// Model providers configuration grouped by authType
modelProviders: {
type: 'object',

View file

@ -282,11 +282,13 @@ export async function main() {
process.exit(1);
}
}
// For stream-json mode, don't read stdin here - it should be forwarded to the sandbox
// and consumed by StreamJsonInputReader inside the container
// For stream-json and ACP modes, don't read stdin here — stdin carries
// protocol data (not a user prompt) and should be forwarded to the sandbox
// intact via stdio: 'inherit'.
const inputFormat = argv.inputFormat as string | undefined;
const isAcpMode = argv.acp || argv.experimentalAcp;
let stdinData = '';
if (!process.stdin.isTTY && inputFormat !== 'stream-json') {
if (!process.stdin.isTTY && inputFormat !== 'stream-json' && !isAcpMode) {
stdinData = await readStdin();
}