diff --git a/packages/channels/base/src/ChannelBase.ts b/packages/channels/base/src/ChannelBase.ts index 0c533125f..cc118e15e 100644 --- a/packages/channels/base/src/ChannelBase.ts +++ b/packages/channels/base/src/ChannelBase.ts @@ -141,13 +141,15 @@ export abstract class ChannelBase { /** Register shared slash commands. Called from constructor. */ private registerSharedCommands(): void { const clearHandler: CommandHandler = async (envelope) => { - const removed = this.router.removeSession( + const removedIds = this.router.removeSession( this.name, envelope.senderId, envelope.chatId, ); - if (removed) { - this.instructedSessions.clear(); + if (removedIds.length > 0) { + for (const id of removedIds) { + this.instructedSessions.delete(id); + } await this.sendMessage( envelope.chatId, 'Session cleared. Your next message will start a fresh conversation.', diff --git a/packages/channels/base/src/PairingStore.ts b/packages/channels/base/src/PairingStore.ts index c49eeb4c9..ebe23f1d5 100644 --- a/packages/channels/base/src/PairingStore.ts +++ b/packages/channels/base/src/PairingStore.ts @@ -1,3 +1,4 @@ +import * as crypto from 'node:crypto'; import * as fs from 'node:fs'; import * as path from 'node:path'; import * as os from 'node:os'; @@ -134,7 +135,7 @@ export class PairingStore { function generateCode(): string { let code = ''; for (let i = 0; i < CODE_LENGTH; i++) { - code += SAFE_ALPHABET[Math.floor(Math.random() * SAFE_ALPHABET.length)]; + code += SAFE_ALPHABET[crypto.randomInt(SAFE_ALPHABET.length)]; } return code; } diff --git a/packages/channels/base/src/SessionRouter.test.ts b/packages/channels/base/src/SessionRouter.test.ts index c8d1b5df1..d33f67fd7 100644 --- a/packages/channels/base/src/SessionRouter.test.ts +++ b/packages/channels/base/src/SessionRouter.test.ts @@ -67,6 +67,43 @@ describe('SessionRouter', () => { const s2 = await router.resolve('ch2', 'alice', 'chat1'); expect(s1).not.toBe(s2); }); + + it('per-channel scope overrides default scope', async () => { + const router = new SessionRouter(bridge, '/tmp', 'user'); + router.setChannelScope('telegram', 'single'); + + // 'telegram' uses single scope: same session for different users + const t1 = await router.resolve('telegram', 'alice', 'chat1'); + const t2 = await router.resolve('telegram', 'bob', 'chat2'); + expect(t1).toBe(t2); + + // other channel still uses default 'user' scope + const d1 = await router.resolve('dingtalk', 'alice', 'chat1'); + const d2 = await router.resolve('dingtalk', 'bob', 'chat1'); + expect(d1).not.toBe(d2); + }); + + it('mixed per-channel scopes work independently', async () => { + const router = new SessionRouter(bridge, '/tmp'); + router.setChannelScope('ch-thread', 'thread'); + router.setChannelScope('ch-single', 'single'); + router.setChannelScope('ch-user', 'user'); + + // thread scope: same thread = same session + const t1 = await router.resolve('ch-thread', 'alice', 'c1', 'thread1'); + const t2 = await router.resolve('ch-thread', 'bob', 'c1', 'thread1'); + expect(t1).toBe(t2); + + // single scope: one session for all + const s1 = await router.resolve('ch-single', 'alice', 'c1'); + const s2 = await router.resolve('ch-single', 'bob', 'c2'); + expect(s1).toBe(s2); + + // user scope: per-sender-per-chat + const u1 = await router.resolve('ch-user', 'alice', 'c1'); + const u2 = await router.resolve('ch-user', 'alice', 'c2'); + expect(u1).not.toBe(u2); + }); }); describe('resolve', () => { @@ -123,23 +160,25 @@ describe('SessionRouter', () => { }); describe('removeSession', () => { - it('removes session by key and returns true', async () => { + it('removes session by key and returns session IDs', async () => { const router = new SessionRouter(bridge, '/tmp'); - await router.resolve('ch', 'alice', 'chat1'); - expect(router.removeSession('ch', 'alice', 'chat1')).toBe(true); + const sid = await router.resolve('ch', 'alice', 'chat1'); + const removed = router.removeSession('ch', 'alice', 'chat1'); + expect(removed).toEqual([sid]); expect(router.hasSession('ch', 'alice', 'chat1')).toBe(false); }); - it('returns false when nothing to remove', () => { + it('returns empty array when nothing to remove', () => { const router = new SessionRouter(bridge, '/tmp'); - expect(router.removeSession('ch', 'alice', 'chat1')).toBe(false); + expect(router.removeSession('ch', 'alice', 'chat1')).toEqual([]); }); it('removes all sender sessions when chatId omitted', async () => { const router = new SessionRouter(bridge, '/tmp'); await router.resolve('ch', 'alice', 'chat1'); await router.resolve('ch', 'alice', 'chat2'); - expect(router.removeSession('ch', 'alice')).toBe(true); + const removed = router.removeSession('ch', 'alice'); + expect(removed).toHaveLength(2); expect(router.hasSession('ch', 'alice')).toBe(false); }); diff --git a/packages/channels/base/src/SessionRouter.ts b/packages/channels/base/src/SessionRouter.ts index 50a52319f..bf08301bc 100644 --- a/packages/channels/base/src/SessionRouter.ts +++ b/packages/channels/base/src/SessionRouter.ts @@ -15,7 +15,8 @@ export class SessionRouter { private bridge: AcpBridge; private defaultCwd: string; - private scope: SessionScope; + private defaultScope: SessionScope; + private channelScopes: Map = new Map(); private persistPath: string | undefined; constructor( @@ -26,7 +27,7 @@ export class SessionRouter { ) { this.bridge = bridge; this.defaultCwd = defaultCwd; - this.scope = scope; + this.defaultScope = scope; this.persistPath = persistPath; } @@ -35,13 +36,19 @@ export class SessionRouter { this.bridge = bridge; } + /** Set scope override for a specific channel. */ + setChannelScope(channelName: string, scope: SessionScope): void { + this.channelScopes.set(channelName, scope); + } + private routingKey( channelName: string, senderId: string, chatId: string, threadId?: string, ): string { - switch (this.scope) { + const scope = this.channelScopes.get(channelName) || this.defaultScope; + switch (scope) { case 'thread': return `${channelName}:${threadId || chatId}`; case 'single': @@ -90,35 +97,40 @@ export class SessionRouter { return false; } + /** + * Remove session(s) for the given sender. Returns the removed session IDs. + */ removeSession( channelName: string, senderId: string, chatId?: string, - ): boolean { + ): string[] { + const removedIds: string[] = []; if (chatId) { const key = this.routingKey(channelName, senderId, chatId); - return this.deleteByKey(key); - } - // No chatId: remove all sessions for this sender on this channel - let removed = false; - const prefix = `${channelName}:${senderId}`; - for (const k of [...this.toSession.keys()]) { - if (k.startsWith(prefix)) { - this.deleteByKey(k); - removed = true; + const sessionId = this.deleteByKey(key); + if (sessionId) removedIds.push(sessionId); + } else { + // No chatId: remove all sessions for this sender on this channel + const prefix = `${channelName}:${senderId}`; + for (const k of [...this.toSession.keys()]) { + if (k.startsWith(prefix)) { + const sessionId = this.deleteByKey(k); + if (sessionId) removedIds.push(sessionId); + } } } - if (removed) this.persist(); - return removed; + if (removedIds.length > 0) this.persist(); + return removedIds; } - private deleteByKey(key: string): boolean { + private deleteByKey(key: string): string | null { const sessionId = this.toSession.get(key); - if (!sessionId) return false; + if (!sessionId) return null; this.toSession.delete(key); this.toTarget.delete(sessionId); this.toCwd.delete(sessionId); - return true; + return sessionId; } /** Get all session entries for crash recovery. */ diff --git a/packages/channels/dingtalk/src/DingtalkAdapter.ts b/packages/channels/dingtalk/src/DingtalkAdapter.ts index b3d2bbca1..d927c4583 100644 --- a/packages/channels/dingtalk/src/DingtalkAdapter.ts +++ b/packages/channels/dingtalk/src/DingtalkAdapter.ts @@ -1,5 +1,6 @@ -import { existsSync, mkdirSync, writeFileSync } from 'node:fs'; -import { join } from 'node:path'; +import { mkdirSync, writeFileSync } from 'node:fs'; +import { randomUUID } from 'node:crypto'; +import { basename, join } from 'node:path'; import { tmpdir } from 'node:os'; import { DWClient, TOPIC_ROBOT, EventAck } from 'dingtalk-stream-sdk-nodejs'; import type { DWClientDownStream } from 'dingtalk-stream-sdk-nodejs'; @@ -455,9 +456,10 @@ export class DingtalkChannel extends ChannelBase { ]; } else { // Save non-image files to temp dir so the agent can read them - const dir = join(tmpdir(), 'channel-files'); - if (!existsSync(dir)) mkdirSync(dir, { recursive: true }); - const safeName = fileName || `dingtalk_${mediaType}_${Date.now()}`; + const dir = join(tmpdir(), 'channel-files', randomUUID()); + mkdirSync(dir, { recursive: true }); + const safeName = + basename(fileName || '') || `dingtalk_${mediaType}_${Date.now()}`; const filePath = join(dir, safeName); writeFileSync(filePath, media.buffer); @@ -520,9 +522,9 @@ export class DingtalkChannel extends ChannelBase { const content = this.extractContent(data); let cleanText = content.text; - // Strip @bot mention from text + // Strip first @mention (the bot) from text, keep other @mentions intact if (isMentioned) { - cleanText = cleanText.replace(/@\S+/g, '').trim(); + cleanText = cleanText.replace(/@\S+/, '').trim(); } // Extract quoted message context diff --git a/packages/channels/telegram/src/TelegramAdapter.ts b/packages/channels/telegram/src/TelegramAdapter.ts index a3485c3b1..141b8368a 100644 --- a/packages/channels/telegram/src/TelegramAdapter.ts +++ b/packages/channels/telegram/src/TelegramAdapter.ts @@ -1,5 +1,6 @@ -import { existsSync, mkdirSync, writeFileSync } from 'node:fs'; -import { join } from 'node:path'; +import { mkdirSync, writeFileSync } from 'node:fs'; +import { randomUUID } from 'node:crypto'; +import { basename, join } from 'node:path'; import { tmpdir } from 'node:os'; import { Telegraf } from 'telegraf'; import { @@ -14,9 +15,6 @@ import type { AcpBridge, } from '@qwen-code/channel-base'; -// Commands handled by Telegraf directly (before handleInbound) -const TELEGRAF_COMMANDS = new Set(); - export class TelegramChannel extends ChannelBase { private bot: Telegraf; private botId: number = 0; @@ -42,14 +40,6 @@ export class TelegramChannel extends ChannelBase { const msg = ctx.message; const text = msg.text; - // Skip Telegraf-handled commands - if (text.startsWith('/')) { - const command = text.slice(1).split(/[\s@]/)[0]?.toLowerCase(); - if (command && TELEGRAF_COMMANDS.has(command)) { - return; - } - } - const envelope = this.buildEnvelope(msg, text, msg.entities); // Don't await — Telegraf has a 90s handler timeout that would kill long prompts @@ -118,9 +108,9 @@ export class TelegramChannel extends ChannelBase { const buf = Buffer.from(await resp.arrayBuffer()); // Save to temp dir so the agent can read it via read-file tool - const dir = join(tmpdir(), 'channel-files'); - if (!existsSync(dir)) mkdirSync(dir, { recursive: true }); - const filePath = join(dir, fileName); + const dir = join(tmpdir(), 'channel-files', randomUUID()); + mkdirSync(dir, { recursive: true }); + const filePath = join(dir, basename(fileName) || `file_${Date.now()}`); writeFileSync(filePath, buf); envelope.text = msg.caption || ''; diff --git a/packages/channels/weixin/src/WeixinAdapter.ts b/packages/channels/weixin/src/WeixinAdapter.ts index 6548953b7..7a5b36b97 100644 --- a/packages/channels/weixin/src/WeixinAdapter.ts +++ b/packages/channels/weixin/src/WeixinAdapter.ts @@ -3,8 +3,9 @@ * Extends ChannelBase with WeChat iLink Bot API integration. */ -import { existsSync, mkdirSync, writeFileSync } from 'node:fs'; -import { join } from 'node:path'; +import { mkdirSync, writeFileSync } from 'node:fs'; +import { randomUUID } from 'node:crypto'; +import { basename, join } from 'node:path'; import { tmpdir } from 'node:os'; import { ChannelBase } from '@qwen-code/channel-base'; import type { @@ -129,9 +130,12 @@ export class WeixinChannel extends ChannelBase { file.encryptQueryParam, file.aesKey, ); - const dir = join(tmpdir(), 'channel-files'); - if (!existsSync(dir)) mkdirSync(dir, { recursive: true }); - const filePath = join(dir, file.fileName); + const dir = join(tmpdir(), 'channel-files', randomUUID()); + mkdirSync(dir, { recursive: true }); + const filePath = join( + dir, + basename(file.fileName) || `file_${Date.now()}`, + ); writeFileSync(filePath, fileData); envelope.attachments = [ { diff --git a/packages/channels/weixin/src/monitor.ts b/packages/channels/weixin/src/monitor.ts index 48c3097c9..73ac6a32e 100644 --- a/packages/channels/weixin/src/monitor.ts +++ b/packages/channels/weixin/src/monitor.ts @@ -94,11 +94,6 @@ export async function startPollLoop(params: { consecutiveErrors = 0; - if (resp.get_updates_buf) { - cursor = resp.get_updates_buf; - saveCursor(cursor); - } - // Respect server-suggested poll timeout if (resp.longpolling_timeout_ms && resp.longpolling_timeout_ms > 0) { pollTimeoutMs = resp.longpolling_timeout_ms + 5000; // add buffer for network @@ -109,6 +104,12 @@ export async function startPollLoop(params: { await processMessage(msg, onMessage); } } + + // Persist cursor after messages are processed to avoid losing messages on crash + if (resp.get_updates_buf) { + cursor = resp.get_updates_buf; + saveCursor(cursor); + } } catch (err: unknown) { if (abortSignal.aborted) break; diff --git a/packages/cli/src/commands/channel/start.ts b/packages/cli/src/commands/channel/start.ts index f0b34eef4..2b7022622 100644 --- a/packages/cli/src/commands/channel/start.ts +++ b/packages/cli/src/commands/channel/start.ts @@ -19,6 +19,7 @@ import { import { getExtensionManager } from '../extensions/utils.js'; const MAX_CRASH_RESTARTS = 3; +const CRASH_WINDOW_MS = 5 * 60 * 1000; // 5-minute window for counting crashes const RESTART_DELAY_MS = 3000; function sessionsPath(): string { @@ -165,13 +166,18 @@ async function startSingle(name: string): Promise { const cliEntryPath = findCliEntryPath(); let shuttingDown = false; - let crashCount = 0; + const crashTimestamps: number[] = []; const bridgeOpts = { cliEntryPath, cwd: config.cwd, model: config.model }; let bridge = new AcpBridge(bridgeOpts); await bridge.start(); - const router = new SessionRouter(bridge, config.cwd, 'user', sessionsPath()); + const router = new SessionRouter( + bridge, + config.cwd, + config.sessionScope, + sessionsPath(), + ); const channels: Map = new Map(); const channel = createChannel(name, config, bridge, { router }); @@ -185,18 +191,25 @@ async function startSingle(name: string): Promise { bridge.on('disconnected', async () => { if (shuttingDown) return; - crashCount++; - if (crashCount > MAX_CRASH_RESTARTS) { + const now = Date.now(); + crashTimestamps.push(now); + // Only count crashes within the recent window + const recentCrashes = crashTimestamps.filter( + (ts) => now - ts < CRASH_WINDOW_MS, + ); + + if (recentCrashes.length > MAX_CRASH_RESTARTS) { writeStderrLine( - `[Channel] Bridge crashed ${crashCount} times. Giving up.`, + `[Channel] Bridge crashed ${recentCrashes.length} times in ${CRASH_WINDOW_MS / 1000}s. Giving up.`, ); + channel.disconnect(); router.clearAll(); removeServiceInfo(); process.exit(1); } writeStderrLine( - `[Channel] Bridge crashed (${crashCount}/${MAX_CRASH_RESTARTS}). Restarting in ${RESTART_DELAY_MS / 1000}s...`, + `[Channel] Bridge crashed (${recentCrashes.length}/${MAX_CRASH_RESTARTS} in window). Restarting in ${RESTART_DELAY_MS / 1000}s...`, ); await new Promise((r) => setTimeout(r, RESTART_DELAY_MS)); @@ -211,7 +224,6 @@ async function startSingle(name: string): Promise { writeStdoutLine( `[Channel] Bridge restarted. Sessions restored: ${result.restored}, failed: ${result.failed}`, ); - crashCount = 0; } catch (err) { writeStderrLine( `[Channel] Failed to restart bridge: ${err instanceof Error ? err.message : String(err)}`, @@ -270,7 +282,7 @@ async function startAll(): Promise { const cliEntryPath = findCliEntryPath(); const defaultCwd = process.cwd(); let shuttingDown = false; - let crashCount = 0; + const crashTimestamps: number[] = []; // All channels share one bridge process. Use the first channel's model. const models = [ @@ -291,6 +303,10 @@ async function startAll(): Promise { await bridge.start(); const router = new SessionRouter(bridge, defaultCwd, 'user', sessionsPath()); + // Register per-channel scope overrides so each channel uses its own sessionScope + for (const { name, config } of parsed) { + router.setChannelScope(name, config.sessionScope); + } const channels: Map = new Map(); writeStdoutLine( @@ -330,18 +346,30 @@ async function startAll(): Promise { bridge.on('disconnected', async () => { if (shuttingDown) return; - crashCount++; - if (crashCount > MAX_CRASH_RESTARTS) { + const now = Date.now(); + crashTimestamps.push(now); + const recentCrashes = crashTimestamps.filter( + (ts) => now - ts < CRASH_WINDOW_MS, + ); + + if (recentCrashes.length > MAX_CRASH_RESTARTS) { writeStderrLine( - `[Channel] Bridge crashed ${crashCount} times. Giving up.`, + `[Channel] Bridge crashed ${recentCrashes.length} times in ${CRASH_WINDOW_MS / 1000}s. Giving up.`, ); + for (const channel of channels.values()) { + try { + channel.disconnect(); + } catch { + // best-effort + } + } router.clearAll(); removeServiceInfo(); process.exit(1); } writeStderrLine( - `[Channel] Bridge crashed (${crashCount}/${MAX_CRASH_RESTARTS}). Restarting in ${RESTART_DELAY_MS / 1000}s...`, + `[Channel] Bridge crashed (${recentCrashes.length}/${MAX_CRASH_RESTARTS} in window). Restarting in ${RESTART_DELAY_MS / 1000}s...`, ); await new Promise((r) => setTimeout(r, RESTART_DELAY_MS)); @@ -358,7 +386,6 @@ async function startAll(): Promise { writeStdoutLine( `[Channel] Bridge restarted. Sessions restored: ${result.restored}, failed: ${result.failed}`, ); - crashCount = 0; } catch (err) { writeStderrLine( `[Channel] Failed to restart bridge: ${err instanceof Error ? err.message : String(err)}`,