diff --git a/packages/channels/base/src/ChannelBase.ts b/packages/channels/base/src/ChannelBase.ts index a5e966182..12b9b9ff9 100644 --- a/packages/channels/base/src/ChannelBase.ts +++ b/packages/channels/base/src/ChannelBase.ts @@ -21,6 +21,8 @@ export abstract class ChannelBase { protected name: string; private instructedSessions: Set = new Set(); private commands: Map = new Map(); + /** Per-session promise chain to serialize prompt + send. */ + private sessionQueues: Map> = new Map(); constructor( name: string, @@ -82,7 +84,11 @@ export abstract class ChannelBase { /** Register shared slash commands. Called from constructor. */ private registerSharedCommands(): void { const clearHandler: CommandHandler = async (envelope) => { - const removed = this.router.removeSession(this.name, envelope.senderId); + const removed = this.router.removeSession( + this.name, + envelope.senderId, + envelope.chatId, + ); if (removed) { this.instructedSessions.clear(); await this.sendMessage( @@ -132,7 +138,11 @@ export abstract class ChannelBase { }); this.registerCommand('status', async (envelope) => { - const hasSession = this.router.hasSession(this.name, envelope.senderId); + const hasSession = this.router.hasSession( + this.name, + envelope.senderId, + envelope.chatId, + ); const policy = this.config.senderPolicy; const lines = [ `Session: ${hasSession ? 'active' : 'none'}`, @@ -209,14 +219,24 @@ export abstract class ChannelBase { this.instructedSessions.add(sessionId); } - const response = await this.bridge.prompt(sessionId, promptText, { - imageBase64: envelope.imageBase64, - imageMimeType: envelope.imageMimeType, - }); + // Serialize prompt + send per session to prevent textChunk listener + // pollution when concurrent messages hit the same session. + const prev = this.sessionQueues.get(sessionId) ?? Promise.resolve(); + const current = prev.then(async () => { + const response = await this.bridge.prompt(sessionId, promptText, { + imageBase64: envelope.imageBase64, + imageMimeType: envelope.imageMimeType, + }); - if (response) { - await this.sendMessage(envelope.chatId, response); - } + if (response) { + await this.sendMessage(envelope.chatId, response); + } + }); + this.sessionQueues.set( + sessionId, + current.catch(() => {}), + ); + await current; } protected async onPairingRequired( diff --git a/packages/channels/base/src/SessionRouter.ts b/packages/channels/base/src/SessionRouter.ts index 34217861f..50a52319f 100644 --- a/packages/channels/base/src/SessionRouter.ts +++ b/packages/channels/base/src/SessionRouter.ts @@ -48,7 +48,7 @@ export class SessionRouter { return `${channelName}:__single__`; case 'user': default: - return `${channelName}:${senderId}`; + return `${channelName}:${senderId}:${chatId}`; } } @@ -78,18 +78,46 @@ export class SessionRouter { return this.toTarget.get(sessionId); } - hasSession(channelName: string, senderId: string): boolean { - return this.toSession.has(`${channelName}:${senderId}`); + hasSession(channelName: string, senderId: string, chatId?: string): boolean { + const key = chatId + ? this.routingKey(channelName, senderId, chatId) + : `${channelName}:${senderId}`; + // If chatId is provided, do exact lookup; otherwise prefix-scan for any match + if (chatId) return this.toSession.has(key); + for (const k of this.toSession.keys()) { + if (k.startsWith(`${channelName}:${senderId}`)) return true; + } + return false; } - removeSession(channelName: string, senderId: string): boolean { - const key = `${channelName}:${senderId}`; + removeSession( + channelName: string, + senderId: string, + chatId?: string, + ): boolean { + if (chatId) { + const key = this.routingKey(channelName, senderId, chatId); + return this.deleteByKey(key); + } + // No chatId: remove all sessions for this sender on this channel + let removed = false; + const prefix = `${channelName}:${senderId}`; + for (const k of [...this.toSession.keys()]) { + if (k.startsWith(prefix)) { + this.deleteByKey(k); + removed = true; + } + } + if (removed) this.persist(); + return removed; + } + + private deleteByKey(key: string): boolean { const sessionId = this.toSession.get(key); if (!sessionId) return false; this.toSession.delete(key); this.toTarget.delete(sessionId); this.toCwd.delete(sessionId); - this.persist(); return true; }