feat(channels): add dispatch modes and prompt lifecycle hooks

Add three dispatch modes for handling concurrent messages:
- steer (default): cancel current prompt and start new one
- collect: buffer messages and coalesce into follow-up prompt
- followup: queue messages for sequential processing

Introduce onPromptStart/onPromptEnd lifecycle hooks for working
indicators. These fire only when a prompt actually begins processing,
not for buffered (collect mode) or gated/blocked messages.

Refactor Telegram, WeChat, and DingTalk adapters to use the new hooks
instead of overriding handleInbound, simplifying the working indicator
pattern and ensuring correct behavior with dispatch modes.

This enables better UX for async workflows and prevents indicator
leaks when messages are buffered or cancelled.
This commit is contained in:
tanzhenxin 2026-03-28 06:19:02 +00:00
parent 9fc2abbed2
commit 7251da0152
10 changed files with 649 additions and 124 deletions

View file

@ -152,13 +152,15 @@ this.registerCommand('mycommand', async (envelope, args) => {
});
```
**Working indicators** — override `handleInbound()` to show platform-specific typing indicators:
**Working indicators** — override `onPromptStart()` and `onPromptEnd()` to show platform-specific typing indicators. These hooks fire only when a prompt actually begins processing — not for buffered messages (collect mode) or gated/blocked messages:
```typescript
override async handleInbound(envelope: Envelope): Promise<void> {
await this.platformClient.sendTyping(envelope.chatId); // your platform API
try { await super.handleInbound(envelope); }
finally { await this.platformClient.stopTyping(envelope.chatId); }
protected override onPromptStart(chatId: string, sessionId: string, messageId?: string): void {
this.platformClient.sendTyping(chatId); // your platform API
}
protected override onPromptEnd(chatId: string, sessionId: string, messageId?: string): void {
this.platformClient.stopTyping(chatId);
}
```

View file

@ -47,23 +47,24 @@ Channels are configured under the `channels` key in `settings.json`. Each channe
### Options
| Option | Required | Description |
| ------------------------ | -------- | ---------------------------------------------------------------------------------------------------------------------------------------- |
| `type` | Yes | Channel type: `telegram`, `weixin`, `dingtalk`, or a custom type from an extension (see [Plugins](./plugins)) |
| `token` | Telegram | Bot token. Supports `$ENV_VAR` syntax to read from environment variables. Not needed for WeChat or DingTalk |
| `clientId` | DingTalk | DingTalk AppKey. Supports `$ENV_VAR` syntax |
| `clientSecret` | DingTalk | DingTalk AppSecret. Supports `$ENV_VAR` syntax |
| `model` | No | Model to use for this channel (e.g., `qwen3.5-plus`). Overrides the default model. Useful for multimodal models that support image input |
| `senderPolicy` | No | Who can talk to the bot: `allowlist` (default), `open`, or `pairing` |
| `allowedUsers` | No | List of user IDs allowed to use the bot (used by `allowlist` and `pairing` policies) |
| `sessionScope` | No | How sessions are scoped: `user` (default), `thread`, or `single` |
| `cwd` | No | Working directory for the agent. Defaults to the current directory |
| `instructions` | No | Custom instructions prepended to the first message of each session |
| `groupPolicy` | No | Group chat access: `disabled` (default), `allowlist`, or `open`. See [Group Chats](#group-chats) |
| `groups` | No | Per-group settings. Keys are group chat IDs or `"*"` for defaults. See [Group Chats](#group-chats) |
| `blockStreaming` | No | Progressive response delivery: `on` or `off` (default). See [Block Streaming](#block-streaming) |
| `blockStreamingChunk` | No | Chunk size bounds: `{ "minChars": 400, "maxChars": 1000 }`. See [Block Streaming](#block-streaming) |
| `blockStreamingCoalesce` | No | Idle flush: `{ "idleMs": 1500 }`. See [Block Streaming](#block-streaming) |
| Option | Required | Description |
| ------------------------ | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------- |
| `type` | Yes | Channel type: `telegram`, `weixin`, `dingtalk`, or a custom type from an extension (see [Plugins](./plugins)) |
| `token` | Telegram | Bot token. Supports `$ENV_VAR` syntax to read from environment variables. Not needed for WeChat or DingTalk |
| `clientId` | DingTalk | DingTalk AppKey. Supports `$ENV_VAR` syntax |
| `clientSecret` | DingTalk | DingTalk AppSecret. Supports `$ENV_VAR` syntax |
| `model` | No | Model to use for this channel (e.g., `qwen3.5-plus`). Overrides the default model. Useful for multimodal models that support image input |
| `senderPolicy` | No | Who can talk to the bot: `allowlist` (default), `open`, or `pairing` |
| `allowedUsers` | No | List of user IDs allowed to use the bot (used by `allowlist` and `pairing` policies) |
| `sessionScope` | No | How sessions are scoped: `user` (default), `thread`, or `single` |
| `cwd` | No | Working directory for the agent. Defaults to the current directory |
| `instructions` | No | Custom instructions prepended to the first message of each session |
| `groupPolicy` | No | Group chat access: `disabled` (default), `allowlist`, or `open`. See [Group Chats](#group-chats) |
| `groups` | No | Per-group settings. Keys are group chat IDs or `"*"` for defaults. See [Group Chats](#group-chats) |
| `dispatchMode` | No | What happens when you send a message while the bot is busy: `steer` (default), `collect`, or `followup`. See [Dispatch Modes](#dispatch-modes) |
| `blockStreaming` | No | Progressive response delivery: `on` or `off` (default). See [Block Streaming](#block-streaming) |
| `blockStreamingChunk` | No | Chunk size bounds: `{ "minChars": 400, "maxChars": 1000 }`. See [Block Streaming](#block-streaming) |
| `blockStreamingCoalesce` | No | Idle flush: `{ "idleMs": 1500 }`. See [Block Streaming](#block-streaming) |
### Sender Policy
@ -222,6 +223,37 @@ Files work with any model — no multimodal support required.
| Files | Direct download via Bot API (20MB limit) | CDN download with AES decryption | downloadCode API (two-step) |
| Captions | Photo/file captions included as message text | Not applicable | Rich text: mixed text + images in one message |
## Dispatch Modes
Controls what happens when you send a new message while the bot is still processing a previous one.
- **`steer`** (default) — The bot cancels the current request and starts working on your new message. Best for normal chat, where a follow-up usually means you want to correct or redirect the bot.
- **`collect`** — Your new messages are buffered. When the current request finishes, all buffered messages are combined into a single follow-up prompt. Good for async workflows where you want to queue up thoughts.
- **`followup`** — Each message is queued and processed as its own separate turn, in order. Useful for batch workflows where each message is independent.
```json
{
"channels": {
"my-channel": {
"type": "telegram",
"dispatchMode": "steer",
...
}
}
}
```
You can also set dispatch mode per group, overriding the channel default:
```json
{
"groups": {
"*": { "requireMention": true, "dispatchMode": "steer" },
"-100123456": { "dispatchMode": "collect" }
}
}
```
## Block Streaming
By default, the agent works for a while and then sends one large response. With block streaming enabled, the response arrives as multiple shorter messages while the agent is still working — similar to how ChatGPT or Claude show progressive output.

View file

@ -174,6 +174,11 @@ export class AcpBridge extends EventEmitter {
return chunks.join('');
}
async cancelSession(sessionId: string): Promise<void> {
const conn = this.ensureConnection();
await conn.cancel({ sessionId });
}
stop(): void {
if (this.child) {
this.child.kill();

View file

@ -9,6 +9,13 @@ import type { ChannelBaseOptions } from './ChannelBase.js';
class TestChannel extends ChannelBase {
sent: Array<{ chatId: string; text: string }> = [];
connected = false;
promptStarts: Array<{
chatId: string;
sessionId: string;
messageId?: string;
}> = [];
promptEnds: Array<{ chatId: string; sessionId: string; messageId?: string }> =
[];
async connect() {
this.connected = true;
@ -19,6 +26,22 @@ class TestChannel extends ChannelBase {
disconnect() {
this.connected = false;
}
protected override onPromptStart(
chatId: string,
sessionId: string,
messageId?: string,
): void {
this.promptStarts.push({ chatId, sessionId, messageId });
}
protected override onPromptEnd(
chatId: string,
sessionId: string,
messageId?: string,
): void {
this.promptEnds.push({ chatId, sessionId, messageId });
}
}
function createBridge(): AcpBridge {
@ -377,6 +400,342 @@ describe('ChannelBase', () => {
});
});
describe('dispatch modes', () => {
it('collect: buffers messages and coalesces into one followup prompt', async () => {
// Make the first prompt "slow" — we control when it resolves
let resolveFirst!: (v: string) => void;
const firstPrompt = new Promise<string>((r) => {
resolveFirst = r;
});
let callCount = 0;
(bridge.prompt as ReturnType<typeof vi.fn>).mockImplementation(() => {
callCount++;
if (callCount === 1) return firstPrompt;
return Promise.resolve('coalesced response');
});
const ch = createChannel({ dispatchMode: 'collect' });
// Send first message — starts processing
const p1 = ch.handleInbound(envelope({ text: 'first' }));
// Wait a tick for the prompt to be registered as active
await new Promise((r) => setTimeout(r, 10));
// Send two more messages while first is busy — these should buffer
const p2 = ch.handleInbound(envelope({ text: 'second' }));
const p3 = ch.handleInbound(envelope({ text: 'third' }));
// p2 and p3 should resolve immediately (buffered, not queued)
await p2;
await p3;
// First prompt is still running, bridge.prompt called only once
expect(callCount).toBe(1);
// Resolve the first prompt
resolveFirst('first response');
await p1;
// Wait for the coalesced followup to process
await new Promise((r) => setTimeout(r, 50));
// bridge.prompt should have been called twice: original + coalesced
expect(callCount).toBe(2);
// The second call should contain both buffered messages coalesced
const secondCallText = (bridge.prompt as ReturnType<typeof vi.fn>).mock
.calls[1][1] as string;
expect(secondCallText).toContain('second');
expect(secondCallText).toContain('third');
// Both responses should have been sent
expect(ch.sent).toEqual(
expect.arrayContaining([
expect.objectContaining({ text: 'first response' }),
expect.objectContaining({ text: 'coalesced response' }),
]),
);
});
it('collect: no followup if no messages buffered', async () => {
const ch = createChannel({ dispatchMode: 'collect' });
await ch.handleInbound(envelope({ text: 'only message' }));
expect(bridge.prompt).toHaveBeenCalledTimes(1);
expect(ch.sent).toHaveLength(1);
});
it('steer: cancels running prompt and re-prompts with cancellation note', async () => {
let resolveFirst!: (v: string) => void;
const firstPrompt = new Promise<string>((r) => {
resolveFirst = r;
});
let callCount = 0;
(bridge.prompt as ReturnType<typeof vi.fn>).mockImplementation(() => {
callCount++;
if (callCount === 1) return firstPrompt;
return Promise.resolve('steered response');
});
// Add cancelSession mock
(bridge as unknown as Record<string, unknown>).cancelSession = vi
.fn()
.mockImplementation(() => {
// Simulate cancellation — resolve the first prompt
resolveFirst('cancelled partial');
return Promise.resolve();
});
const ch = createChannel({ dispatchMode: 'steer' });
// Send first message — starts processing
const p1 = ch.handleInbound(envelope({ text: 'refactor auth' }));
// Wait for prompt to register as active
await new Promise((r) => setTimeout(r, 10));
// Send correction while first is busy
const p2 = ch.handleInbound(
envelope({ text: 'actually refactor billing' }),
);
// Both should resolve
await p1;
await p2;
// cancelSession should have been called
expect(
(bridge as unknown as Record<string, () => unknown>).cancelSession,
).toHaveBeenCalledTimes(1);
// First prompt's response should NOT have been sent (it was cancelled)
expect(ch.sent).not.toEqual(
expect.arrayContaining([
expect.objectContaining({ text: 'cancelled partial' }),
]),
);
// Second prompt should include the cancellation note
const secondCallText = (bridge.prompt as ReturnType<typeof vi.fn>).mock
.calls[1][1] as string;
expect(secondCallText).toContain('previous request has been cancelled');
expect(secondCallText).toContain('actually refactor billing');
// Steered response should have been sent
expect(ch.sent).toEqual(
expect.arrayContaining([
expect.objectContaining({ text: 'steered response' }),
]),
);
});
it('followup: queues messages sequentially', async () => {
let resolveFirst!: (v: string) => void;
const firstPrompt = new Promise<string>((r) => {
resolveFirst = r;
});
let callCount = 0;
(bridge.prompt as ReturnType<typeof vi.fn>).mockImplementation(() => {
callCount++;
if (callCount === 1) return firstPrompt;
return Promise.resolve(`response-${callCount}`);
});
const ch = createChannel({ dispatchMode: 'followup' });
// Send first message
const p1 = ch.handleInbound(envelope({ text: 'task one' }));
// Wait for prompt to start
await new Promise((r) => setTimeout(r, 10));
// Send second message — should queue (not buffer)
const p2 = ch.handleInbound(envelope({ text: 'task two' }));
// Only first prompt should be running
expect(callCount).toBe(1);
// Resolve first
resolveFirst('response-1');
await p1;
await p2;
// Both prompts ran sequentially
expect(callCount).toBe(2);
// Both got their own response
expect(ch.sent).toEqual([
expect.objectContaining({ text: 'response-1' }),
expect.objectContaining({ text: 'response-2' }),
]);
});
it('steer is the default mode when dispatchMode not set', async () => {
let resolveFirst!: (v: string) => void;
const firstPrompt = new Promise<string>((r) => {
resolveFirst = r;
});
let callCount = 0;
(bridge.prompt as ReturnType<typeof vi.fn>).mockImplementation(() => {
callCount++;
if (callCount === 1) return firstPrompt;
return Promise.resolve('steered response');
});
// Add cancelSession mock
(bridge as unknown as Record<string, unknown>).cancelSession = vi
.fn()
.mockImplementation(() => {
resolveFirst('cancelled');
return Promise.resolve();
});
// No dispatchMode set — should default to steer
const ch = createChannel();
const p1 = ch.handleInbound(envelope({ text: 'first' }));
await new Promise((r) => setTimeout(r, 10));
// Second message should cancel the first (steer behavior)
const p2 = ch.handleInbound(envelope({ text: 'second' }));
await p1;
await p2;
// cancelSession should have been called (steer behavior)
expect(
(bridge as unknown as Record<string, () => unknown>).cancelSession,
).toHaveBeenCalledTimes(1);
// Both prompts ran
expect(callCount).toBe(2);
});
it('per-group dispatchMode overrides channel-level', async () => {
let resolveFirst!: (v: string) => void;
const firstPrompt = new Promise<string>((r) => {
resolveFirst = r;
});
let callCount = 0;
(bridge.prompt as ReturnType<typeof vi.fn>).mockImplementation(() => {
callCount++;
if (callCount === 1) return firstPrompt;
return Promise.resolve(`response-${callCount}`);
});
// Channel default is collect, but group overrides to followup
const ch = createChannel({
dispatchMode: 'collect',
groupPolicy: 'open',
groups: { 'group-1': { dispatchMode: 'followup' } },
});
const groupEnv = envelope({
isGroup: true,
isMentioned: true,
chatId: 'group-1',
});
const p1 = ch.handleInbound({ ...groupEnv, text: 'first' });
await new Promise((r) => setTimeout(r, 10));
// In followup mode, second message queues (doesn't buffer and return)
const p2Promise = ch.handleInbound({ ...groupEnv, text: 'second' });
expect(callCount).toBe(1);
resolveFirst('response-1');
await p1;
await p2Promise;
// Both ran sequentially — followup behavior
expect(callCount).toBe(2);
expect(ch.sent).toEqual([
expect.objectContaining({ text: 'response-1' }),
expect.objectContaining({ text: 'response-2' }),
]);
});
});
describe('prompt lifecycle hooks', () => {
it('calls onPromptStart and onPromptEnd for each prompt', async () => {
const ch = createChannel();
await ch.handleInbound(envelope({ text: 'hello' }));
expect(ch.promptStarts).toHaveLength(1);
expect(ch.promptStarts[0]!.chatId).toBe('chat1');
expect(ch.promptEnds).toHaveLength(1);
expect(ch.promptEnds[0]!.chatId).toBe('chat1');
});
it('passes messageId to hooks', async () => {
const ch = createChannel();
await ch.handleInbound(envelope({ text: 'hello', messageId: 'msg-42' }));
expect(ch.promptStarts[0]!.messageId).toBe('msg-42');
expect(ch.promptEnds[0]!.messageId).toBe('msg-42');
});
it('does not call hooks for gated messages', async () => {
const ch = createChannel({
senderPolicy: 'allowlist',
allowedUsers: ['admin'],
});
await ch.handleInbound(envelope({ senderId: 'stranger' }));
expect(ch.promptStarts).toHaveLength(0);
expect(ch.promptEnds).toHaveLength(0);
});
it('does not call hooks for buffered messages in collect mode', async () => {
let resolveFirst!: (v: string) => void;
const firstPrompt = new Promise<string>((r) => {
resolveFirst = r;
});
let callCount = 0;
(bridge.prompt as ReturnType<typeof vi.fn>).mockImplementation(() => {
callCount++;
if (callCount === 1) return firstPrompt;
return Promise.resolve('ok');
});
const ch = createChannel({ dispatchMode: 'collect' });
const p1 = ch.handleInbound(
envelope({ text: 'first', messageId: 'msg-1' }),
);
await new Promise((r) => setTimeout(r, 10));
// This message gets buffered — should NOT trigger hooks
await ch.handleInbound(envelope({ text: 'second', messageId: 'msg-2' }));
// Only one prompt start so far (for the first message)
expect(ch.promptStarts).toHaveLength(1);
expect(ch.promptStarts[0]!.messageId).toBe('msg-1');
resolveFirst('done');
await p1;
await new Promise((r) => setTimeout(r, 50));
// After coalesced prompt runs, we should have 2 start/end pairs
expect(ch.promptStarts).toHaveLength(2);
expect(ch.promptEnds).toHaveLength(2);
});
it('calls onPromptEnd even when prompt throws', async () => {
(bridge.prompt as ReturnType<typeof vi.fn>).mockRejectedValue(
new Error('agent error'),
);
const ch = createChannel();
// handleInbound catches the error internally
await ch.handleInbound(envelope({ text: 'hello' })).catch(() => {});
expect(ch.promptStarts).toHaveLength(1);
expect(ch.promptEnds).toHaveLength(1);
});
});
describe('isLocalCommand', () => {
it('returns true for registered commands', () => {
const ch = createChannel();

View file

@ -1,4 +1,4 @@
import type { ChannelConfig, Envelope } from './types.js';
import type { ChannelConfig, DispatchMode, Envelope } from './types.js';
import { BlockStreamer } from './BlockStreamer.js';
import { GroupGate } from './GroupGate.js';
import { SenderGate } from './SenderGate.js';
@ -22,9 +22,20 @@ export abstract class ChannelBase {
protected name: string;
private instructedSessions: Set<string> = new Set();
private commands: Map<string, CommandHandler> = new Map();
/** Per-session promise chain to serialize prompt + send. */
/** Per-session promise chain to serialize prompt + send (followup mode). */
private sessionQueues: Map<string, Promise<void>> = new Map();
/** Per-session active prompt tracking for dispatch modes. */
private activePrompts: Map<
string,
{ cancelled: boolean; done: Promise<void>; resolve: () => void }
> = new Map();
/** Per-session message buffer for collect mode. */
private collectBuffers: Map<
string,
Array<{ text: string; envelope: Envelope }>
> = new Map();
constructor(
name: string,
config: ChannelConfig,
@ -73,6 +84,27 @@ export abstract class ChannelBase {
onToolCall(_chatId: string, _event: ToolCallEvent): void {}
/**
* Called when a prompt actually begins processing (inside the session queue).
* Override to show a platform-specific working indicator (e.g., typing, reaction).
* Not called for buffered messages (collect mode) or gated/blocked messages.
*/
protected onPromptStart(
_chatId: string,
_sessionId: string,
_messageId?: string,
): void {}
/**
* Called when a prompt finishes (response sent or cancelled).
* Override to hide the working indicator.
*/
protected onPromptEnd(
_chatId: string,
_sessionId: string,
_messageId?: string,
): void {}
/**
* Called for each text chunk as the agent streams its response.
* Override to implement progressive display (e.g., updating an AI card in-place).
@ -266,11 +298,64 @@ export abstract class ChannelBase {
this.instructedSessions.add(sessionId);
}
// Serialize prompt + send per session to prevent textChunk listener
// pollution when concurrent messages hit the same session.
// Resolve dispatch mode: per-group override → channel config → default
const groupCfg = envelope.isGroup
? this.config.groups[envelope.chatId] || this.config.groups['*']
: undefined;
const mode: DispatchMode =
groupCfg?.dispatchMode || this.config.dispatchMode || 'steer';
const active = this.activePrompts.get(sessionId);
if (active) {
// A prompt is already running for this session
switch (mode) {
case 'collect': {
// Buffer the message; it will be coalesced when the active prompt finishes
let buffer = this.collectBuffers.get(sessionId);
if (!buffer) {
buffer = [];
this.collectBuffers.set(sessionId, buffer);
}
buffer.push({ text: promptText, envelope });
return;
}
case 'steer': {
// Cancel the running prompt, then fall through to send a new one
active.cancelled = true;
await this.bridge.cancelSession(sessionId).catch(() => {});
// Wait for the active prompt to finish winding down
await active.done;
// Prepend a cancellation note so the agent understands context
promptText = `[The user sent a new message while you were working. Their previous request has been cancelled.]\n\n${promptText}`;
break;
}
case 'followup': {
// Chain onto the session queue (existing sequential behavior)
break;
}
default: {
// Exhaustive check — should never happen
const _exhaustive: never = mode;
throw new Error(`Unknown dispatch mode: ${_exhaustive}`);
}
}
}
// Run the prompt (with followup-mode serialization for safety)
const prev = this.sessionQueues.get(sessionId) ?? Promise.resolve();
const useBlockStreaming = this.config.blockStreaming === 'on';
const current = prev.then(async () => {
// Register this prompt as active
let doneResolve: () => void = () => {};
const done = new Promise<void>((r) => {
doneResolve = r;
});
const promptState = { cancelled: false, done, resolve: doneResolve };
this.activePrompts.set(sessionId, promptState);
this.onPromptStart(envelope.chatId, sessionId, envelope.messageId);
const streamer = useBlockStreaming
? new BlockStreamer({
minChars: this.config.blockStreamingChunk?.minChars ?? 400,
@ -280,7 +365,6 @@ export abstract class ChannelBase {
})
: null;
// Forward streaming chunks to the subclass hook (and block streamer)
const onChunk = (sid: string, chunk: string) => {
if (sid === sessionId) {
this.onResponseChunk(envelope.chatId, chunk, sessionId);
@ -295,7 +379,8 @@ export abstract class ChannelBase {
imageMimeType,
});
if (response) {
// If cancelled (steer mode), skip sending the response
if (!promptState.cancelled && response) {
if (streamer) {
await streamer.flush();
} else {
@ -304,6 +389,30 @@ export abstract class ChannelBase {
}
} finally {
this.bridge.off('textChunk', onChunk);
this.onPromptEnd(envelope.chatId, sessionId, envelope.messageId);
this.activePrompts.delete(sessionId);
// Signal any steer waiter that we're done
promptState.resolve();
// Drain collect buffer if any messages accumulated
const buffer = this.collectBuffers.get(sessionId);
if (buffer && buffer.length > 0) {
this.collectBuffers.delete(sessionId);
const coalesced = buffer.map((b) => b.text).join('\n\n');
const lastEnvelope = buffer[buffer.length - 1]!.envelope;
// Re-enter handleInbound with the coalesced message
const syntheticEnvelope: Envelope = {
...lastEnvelope,
text: coalesced,
// Clear attachments/references — already resolved in original text
referencedText: undefined,
attachments: undefined,
imageBase64: undefined,
imageMimeType: undefined,
};
// Queue the coalesced prompt (don't await to avoid deadlock on the queue)
this.handleInbound(syntheticEnvelope).catch(() => {});
}
}
});
this.sessionQueues.set(

View file

@ -22,6 +22,7 @@ export type {
ChannelConfig,
ChannelPlugin,
ChannelType,
DispatchMode,
Envelope,
GroupConfig,
GroupPolicy,

View file

@ -5,9 +5,11 @@ export type SenderPolicy = 'allowlist' | 'pairing' | 'open';
export type SessionScope = 'user' | 'thread' | 'single';
export type ChannelType = string;
export type GroupPolicy = 'disabled' | 'allowlist' | 'open';
export type DispatchMode = 'collect' | 'steer' | 'followup';
export interface GroupConfig {
requireMention?: boolean; // default: true
dispatchMode?: DispatchMode;
}
export interface BlockStreamingChunkConfig {
@ -37,6 +39,9 @@ export interface ChannelConfig {
groupPolicy: GroupPolicy; // default: "disabled"
groups: Record<string, GroupConfig>; // "*" for defaults, group IDs for overrides
/** Dispatch mode for concurrent messages. Default: 'collect'. */
dispatchMode?: DispatchMode;
/** Enable block streaming — emit completed blocks as separate messages. */
blockStreaming?: 'on' | 'off';
/** Chunk size bounds for block streaming. */

View file

@ -81,6 +81,8 @@ export class DingtalkChannel extends ChannelBase {
private dedupTimer?: ReturnType<typeof setInterval>;
/** Map conversationId → latest sessionWebhook URL for sending replies. */
private webhooks: Map<string, string> = new Map();
/** Map messageId → conversationId for reaction attach/recall in hooks. */
private reactionContext: Map<string, string> = new Map();
constructor(
name: string,
@ -236,6 +238,31 @@ export class DingtalkChannel extends ChannelBase {
process.stderr.write(`[DingTalk:${this.name}] Disconnected.\n`);
}
protected override onPromptStart(
_chatId: string,
_sessionId: string,
messageId?: string,
): void {
if (!messageId) return;
const convId = this.reactionContext.get(messageId);
if (convId) {
this.attachReaction(messageId, convId).catch(() => {});
}
}
protected override onPromptEnd(
_chatId: string,
_sessionId: string,
messageId?: string,
): void {
if (!messageId) return;
const convId = this.reactionContext.get(messageId);
if (convId) {
this.recallReaction(messageId, convId).catch(() => {});
this.reactionContext.delete(messageId);
}
}
/**
* Extract quoted/referenced message context from a reply.
* DingTalk provides this via text.repliedMsg (newer) or quoteMessage (legacy).
@ -515,30 +542,26 @@ export class DingtalkChannel extends ChannelBase {
referencedText: quoted.referencedText,
};
// Attach 👀 reaction, process message, then recall reaction
const reactionMsgId = msgId;
const reactionConvId = conversationId;
// Store messageId + conversationId for reaction hooks
envelope.messageId = msgId;
if (msgId && conversationId) {
this.reactionContext.set(msgId, conversationId);
}
const processMessage = async () => {
if (reactionMsgId && reactionConvId) {
this.attachReaction(reactionMsgId, reactionConvId).catch(() => {});
}
try {
// Download media if present (first downloadCode only for images)
if (content.downloadCodes.length > 0 && content.mediaType) {
await this.attachMedia(
envelope,
content.downloadCodes[0]!,
content.mediaType,
content.fileName,
);
}
await this.handleInbound(envelope);
} finally {
if (reactionMsgId && reactionConvId) {
this.recallReaction(reactionMsgId, reactionConvId).catch(() => {});
}
// Download media if present (first downloadCode only for images)
if (content.downloadCodes.length > 0 && content.mediaType) {
await this.attachMedia(
envelope,
content.downloadCodes[0]!,
content.mediaType,
content.fileName,
);
}
// reactionContext cleanup is handled by onPromptEnd (not here),
// because in collect mode handleInbound returns immediately after
// buffering — the context must survive until the prompt actually runs.
await this.handleInbound(envelope);
};
// Don't await — stream callback should return quickly

View file

@ -161,31 +161,25 @@ export class TelegramChannel extends ChannelBase {
process.once('SIGTERM', () => this.bot.stop('SIGTERM'));
}
override async handleInbound(envelope: Envelope): Promise<void> {
// Check group gate before showing "Working..." indicator
const groupResult = this.groupGate.check(envelope);
if (!groupResult.allowed) {
return;
}
/** Per-chat typing interval — repeats every 4s since Telegram expires it after 5s. */
private typingIntervals = new Map<string, ReturnType<typeof setInterval>>();
// Skip "Working..." for local slash commands — they respond instantly
const isLocalCommand =
envelope.text.startsWith('/') && this.isLocalCommand(envelope.text);
protected override onPromptStart(chatId: string): void {
// Clear any stale interval (shouldn't happen, but safe)
const existing = this.typingIntervals.get(chatId);
if (existing) clearInterval(existing);
const workingMsg = isLocalCommand
? null
: await this.bot.telegram
.sendMessage(envelope.chatId, 'Working...')
.catch(() => null);
const sendTyping = () =>
this.bot.telegram.sendChatAction(chatId, 'typing').catch(() => {});
sendTyping();
this.typingIntervals.set(chatId, setInterval(sendTyping, 4000));
}
try {
await super.handleInbound(envelope);
} finally {
if (workingMsg) {
this.bot.telegram
.deleteMessage(envelope.chatId, workingMsg.message_id)
.catch(() => {});
}
protected override onPromptEnd(chatId: string): void {
const interval = this.typingIntervals.get(chatId);
if (interval) {
clearInterval(interval);
this.typingIntervals.delete(chatId);
}
}

View file

@ -93,68 +93,63 @@ export class WeixinChannel extends ChannelBase {
);
}
protected override onPromptStart(chatId: string): void {
this.setTyping(chatId, true).catch(() => {});
}
protected override onPromptEnd(chatId: string): void {
this.setTyping(chatId, false).catch(() => {});
}
private async handleInboundWithMedia(
envelope: Envelope,
image?: CdnRef,
file?: FileCdnRef,
): Promise<void> {
// Check group gate before showing typing
const groupResult = this.groupGate.check(envelope);
if (!groupResult.allowed) {
return;
// Download image from CDN
if (image) {
try {
const imageData = await downloadAndDecrypt(
image.encryptQueryParam,
image.aesKey,
);
envelope.imageBase64 = imageData.toString('base64');
envelope.imageMimeType = detectImageMime(imageData);
} catch (err) {
process.stderr.write(
`[Weixin:${this.name}] Failed to download image: ${err instanceof Error ? err.message : err}\n`,
);
}
}
// Show typing indicator immediately — before CDN download
await this.setTyping(envelope.chatId, true);
try {
// Download image from CDN (after typing has started)
if (image) {
try {
const imageData = await downloadAndDecrypt(
image.encryptQueryParam,
image.aesKey,
);
envelope.imageBase64 = imageData.toString('base64');
envelope.imageMimeType = detectImageMime(imageData);
} catch (err) {
process.stderr.write(
`[Weixin:${this.name}] Failed to download image: ${err instanceof Error ? err.message : err}\n`,
);
}
// Download file from CDN, save to temp dir
if (file) {
try {
const fileData = await downloadAndDecrypt(
file.encryptQueryParam,
file.aesKey,
);
const dir = join(tmpdir(), 'channel-files');
if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
const filePath = join(dir, file.fileName);
writeFileSync(filePath, fileData);
envelope.attachments = [
{
type: 'file',
filePath,
mimeType: 'application/octet-stream',
fileName: file.fileName,
},
];
} catch (err) {
process.stderr.write(
`[Weixin:${this.name}] Failed to download file: ${err instanceof Error ? err.message : err}\n`,
);
envelope.text = `(User sent a file "${file.fileName}" but download failed)`;
}
// Download file from CDN, save to temp dir
if (file) {
try {
const fileData = await downloadAndDecrypt(
file.encryptQueryParam,
file.aesKey,
);
const dir = join(tmpdir(), 'channel-files');
if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
const filePath = join(dir, file.fileName);
writeFileSync(filePath, fileData);
envelope.attachments = [
{
type: 'file',
filePath,
mimeType: 'application/octet-stream',
fileName: file.fileName,
},
];
} catch (err) {
process.stderr.write(
`[Weixin:${this.name}] Failed to download file: ${err instanceof Error ? err.message : err}\n`,
);
envelope.text = `(User sent a file "${file.fileName}" but download failed)`;
}
}
await super.handleInbound(envelope);
} finally {
await this.setTyping(envelope.chatId, false);
}
await super.handleInbound(envelope);
}
async sendMessage(chatId: string, text: string): Promise<void> {