import { mkdirSync, writeFileSync } from 'node:fs'; import { randomUUID } from 'node:crypto'; import { basename, join } from 'node:path'; import { tmpdir } from 'node:os'; import { DWClient, TOPIC_ROBOT, EventAck } from 'dingtalk-stream-sdk-nodejs'; import type { DWClientDownStream } from 'dingtalk-stream-sdk-nodejs'; import { ChannelBase } from '@qwen-code/channel-base'; import { normalizeDingTalkMarkdown, extractTitle } from './markdown.js'; import { downloadMedia } from './media.js'; import type { ChannelConfig, ChannelBaseOptions, Envelope, AcpBridge, } from '@qwen-code/channel-base'; /** * Raw DingTalk message data — the SDK's RobotMessage type only covers text, * but DingTalk sends richer payloads for richText, picture, file, etc. */ interface DingTalkRichTextPart { type?: string; text?: string; downloadCode?: string; atName?: string; } interface DingTalkRepliedMsg { msgId?: string; msgType?: string; senderId?: string; content?: { text?: string; richText?: DingTalkRichTextPart[]; downloadCode?: string; fileName?: string; }; } interface DingTalkMessageData { msgId?: string; msgtype?: string; conversationType?: string; conversationId?: string; sessionWebhook?: string; senderId?: string; senderStaffId?: string; senderNick?: string; chatbotUserId?: string; isInAtList?: boolean; text?: { content?: string; isReplyMsg?: boolean; repliedMsg?: DingTalkRepliedMsg; }; quoteMessage?: { msgId?: string; senderId?: string; text?: { content?: string }; msgtype?: string; }; content?: { richText?: DingTalkRichTextPart[]; downloadCode?: string; fileName?: string; recognition?: string; }; } /** Track seen msgIds to deduplicate retried callbacks. */ const DEDUP_TTL_MS = 5 * 60 * 1000; // 5 minutes const ACK_REACTION_NAME = '👀'; const ACK_EMOTION_ID = '2659900'; const ACK_EMOTION_BG_ID = 'im_bg_1'; const EMOTION_API = 'https://api.dingtalk.com/v1.0/robot/emotion'; export class DingtalkChannel extends ChannelBase { private client: DWClient; private seenMessages: Map = new Map(); private dedupTimer?: ReturnType; /** Map conversationId → latest sessionWebhook URL for sending replies. */ private webhooks: Map = new Map(); /** Map messageId → conversationId for reaction attach/recall in hooks. */ private reactionContext: Map = new Map(); constructor( name: string, config: ChannelConfig, bridge: AcpBridge, options?: ChannelBaseOptions, ) { super(name, config, bridge, options); if (!config.clientId || !config.clientSecret) { throw new Error( `Channel "${name}" requires clientId and clientSecret for DingTalk.`, ); } this.client = new DWClient({ clientId: config.clientId, clientSecret: config.clientSecret, }); } async connect(): Promise { this.client.registerCallbackListener( TOPIC_ROBOT, (msg: DWClientDownStream) => { // ACK immediately so DingTalk doesn't retry this.client.send(msg.headers.messageId, { status: EventAck.SUCCESS, message: 'ok', }); this.onMessage(msg); }, ); await this.client.connect(); // Periodically clean up dedup map this.dedupTimer = setInterval(() => { const now = Date.now(); for (const [id, ts] of this.seenMessages) { if (now - ts > DEDUP_TTL_MS) { this.seenMessages.delete(id); } } }, 60_000); process.stderr.write(`[DingTalk:${this.name}] Connected via stream.\n`); } async sendMessage(chatId: string, text: string): Promise { // chatId is a conversationId — resolve to the latest sessionWebhook const webhook = this.webhooks.get(chatId); if (!webhook) { process.stderr.write( `[DingTalk:${this.name}] No webhook for chatId ${chatId}, cannot send.\n`, ); return; } const chunks = normalizeDingTalkMarkdown(text); const title = extractTitle(text); for (let i = 0; i < chunks.length; i++) { const chunk = chunks[i]!; const body = { msgtype: 'markdown', markdown: { title: i === 0 ? title : `${title} (cont.)`, text: chunk, }, }; const resp = await fetch(webhook, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(body), }); if (!resp.ok) { const detail = await resp.text().catch(() => ''); process.stderr.write( `[DingTalk:${this.name}] sendMessage failed: HTTP ${resp.status} ${detail}\n`, ); } } } private getAccessToken(): string | undefined { return this.client.getConfig().access_token; } private async emotionApi( endpoint: 'reply' | 'recall', msgId: string, conversationId: string, ): Promise { const token = this.getAccessToken(); if (!token) return; const robotCode = this.config.clientId; if (!robotCode || !msgId || !conversationId) return; try { const resp = await fetch(`${EMOTION_API}/${endpoint}`, { method: 'POST', headers: { 'x-acs-dingtalk-access-token': token, 'Content-Type': 'application/json', }, body: JSON.stringify({ robotCode, openMsgId: msgId, openConversationId: conversationId, emotionType: 2, emotionName: ACK_REACTION_NAME, textEmotion: { emotionId: ACK_EMOTION_ID, emotionName: ACK_REACTION_NAME, text: ACK_REACTION_NAME, backgroundId: ACK_EMOTION_BG_ID, }, }), }); if (!resp.ok) { const detail = await resp.text().catch(() => ''); process.stderr.write( `[DingTalk:${this.name}] emotion/${endpoint} failed: ${resp.status} ${detail}\n`, ); } } catch { // best-effort, don't break message flow } } private async attachReaction( msgId: string, conversationId: string, ): Promise { await this.emotionApi('reply', msgId, conversationId); } private async recallReaction( msgId: string, conversationId: string, ): Promise { await this.emotionApi('recall', msgId, conversationId); } disconnect(): void { if (this.dedupTimer) { clearInterval(this.dedupTimer); } this.client.disconnect(); process.stderr.write(`[DingTalk:${this.name}] Disconnected.\n`); } protected override onPromptStart( _chatId: string, _sessionId: string, messageId?: string, ): void { if (!messageId) return; const convId = this.reactionContext.get(messageId); if (convId) { this.attachReaction(messageId, convId).catch(() => {}); } } protected override onPromptEnd( _chatId: string, _sessionId: string, messageId?: string, ): void { if (!messageId) return; const convId = this.reactionContext.get(messageId); if (convId) { this.recallReaction(messageId, convId).catch(() => {}); this.reactionContext.delete(messageId); } } /** * Extract quoted/referenced message context from a reply. * DingTalk provides this via text.repliedMsg (newer) or quoteMessage (legacy). */ private extractQuotedContext(data: DingTalkMessageData): { referencedText?: string; isReplyToBot: boolean; } { // Newer format: text.repliedMsg if (data.text?.isReplyMsg && data.text.repliedMsg) { const replied = data.text.repliedMsg; const isReplyToBot = !!data.chatbotUserId && replied.senderId === data.chatbotUserId; // Note: DingTalk doesn't include content for interactiveCard replies // (bot responses sent via webhook). Only user message quotes have text. const text = this.summarizeRepliedContent(replied); return { referencedText: text || undefined, isReplyToBot }; } // Legacy format: quoteMessage if (data.quoteMessage) { const quote = data.quoteMessage; const isReplyToBot = !!data.chatbotUserId && quote.senderId === data.chatbotUserId; const text = quote.text?.content?.trim(); return { referencedText: text || undefined, isReplyToBot }; } return { isReplyToBot: false }; } /** * Build a text summary from a repliedMsg, handling text, richText, and * media message types with placeholders. */ private summarizeRepliedContent(replied: DingTalkRepliedMsg): string { const msgType = replied.msgType; const content = replied.content; // Direct text content if (content?.text?.trim()) { return content.text.trim(); } // RichText: concatenate text parts, placeholder for images if (content?.richText && Array.isArray(content.richText)) { const parts: string[] = []; for (const part of content.richText) { const partType = part.type || 'text'; if (partType === 'text' && part.text) { parts.push(part.text); } else if (partType === 'picture') { parts.push('[image]'); } else if (partType === 'at' && part.atName) { parts.push(`@${part.atName}`); } } const summary = parts.join('').trim(); if (summary) return summary; } // Media type placeholders switch (msgType) { case 'picture': return '[image]'; case 'file': return `[file: ${content?.fileName || 'file'}]`; case 'audio': return '[audio]'; case 'video': return '[video]'; default: break; } return ''; } /** * Extract text and media download codes from an incoming DingTalk message. * Handles text, richText, picture, file, audio, and video message types. */ private extractContent(data: DingTalkMessageData): { text: string; downloadCodes: string[]; mediaType?: 'image' | 'file' | 'audio' | 'video'; fileName?: string; } { const msgtype = data.msgtype || 'text'; if (msgtype === 'richText') { const richText = data.content?.richText; if (!Array.isArray(richText)) { return { text: '', downloadCodes: [] }; } let text = ''; const codes: string[] = []; for (const part of richText) { const partType = part.type || 'text'; if (partType === 'text' && part.text) { text += part.text; } else if (partType === 'picture' && part.downloadCode) { codes.push(part.downloadCode); } } return { text: text.trim() || (codes.length > 0 ? '(image)' : ''), downloadCodes: codes, mediaType: codes.length > 0 ? 'image' : undefined, }; } if (msgtype === 'picture') { const code = data.content?.downloadCode; return { text: '(image)', downloadCodes: code ? [code] : [], mediaType: 'image', }; } if (msgtype === 'file') { const code = data.content?.downloadCode; const fileName = data.content?.fileName || undefined; return { text: `(file: ${fileName || 'file'})`, downloadCodes: code ? [code] : [], mediaType: 'file', fileName, }; } if (msgtype === 'audio') { const code = data.content?.downloadCode; const recognition = data.content?.recognition; return { text: recognition || '(audio)', downloadCodes: code ? [code] : [], mediaType: 'audio', }; } if (msgtype === 'video') { const code = data.content?.downloadCode; return { text: '(video)', downloadCodes: code ? [code] : [], mediaType: 'video', }; } // Default: text message return { text: data.text?.content?.trim() || '', downloadCodes: [] }; } /** * Download a media file and attach it to the envelope. * Images → base64 in envelope; files → saved to temp dir with path in text. */ private async attachMedia( envelope: Envelope, downloadCode: string, mediaType: 'image' | 'file' | 'audio' | 'video', fileName?: string, ): Promise { const token = this.getAccessToken(); const robotCode = this.config.clientId; if (!token || !robotCode) { process.stderr.write( `[DingTalk:${this.name}] Cannot download media: missing token or robotCode.\n`, ); return; } const media = await downloadMedia(downloadCode, robotCode, token); if (!media) return; if (mediaType === 'image') { const mimeType = media.mimeType.startsWith('image/') ? media.mimeType : 'image/jpeg'; envelope.attachments = [ ...(envelope.attachments || []), { type: 'image', data: media.buffer.toString('base64'), mimeType, }, ]; } else { // Save non-image files to temp dir so the agent can read them const dir = join(tmpdir(), 'channel-files', randomUUID()); mkdirSync(dir, { recursive: true }); const safeName = basename(fileName || '') || `dingtalk_${mediaType}_${Date.now()}`; const filePath = join(dir, safeName); writeFileSync(filePath, media.buffer); // Clean up placeholder text like "(audio)", "(video)", "(file: name)" if ( envelope.text === `(file: ${fileName || 'file'})` || envelope.text === '(audio)' || envelope.text === '(video)' ) { envelope.text = ''; } envelope.attachments = [ ...(envelope.attachments || []), { type: mediaType, filePath, mimeType: media.mimeType, fileName: safeName, }, ]; } } private onMessage(downstream: DWClientDownStream): void { try { const data: DingTalkMessageData = typeof downstream.data === 'string' ? JSON.parse(downstream.data) : (downstream.data as DingTalkMessageData); const msgId = data.msgId || downstream.headers.messageId; // Dedup: DingTalk retries unACKed messages if (msgId && this.seenMessages.has(msgId)) { return; } if (msgId) { this.seenMessages.set(msgId, Date.now()); } const isGroup = data.conversationType === '2'; const sessionWebhook = data.sessionWebhook; const conversationId = data.conversationId; if (!sessionWebhook) { process.stderr.write( `[DingTalk:${this.name}] No sessionWebhook in message, skipping.\n`, ); return; } // Cache webhook by conversationId so sendMessage can look it up if (conversationId) { this.webhooks.set(conversationId, sessionWebhook); } const isMentioned = Boolean(data.isInAtList); // Extract text and media info from message const content = this.extractContent(data); let cleanText = content.text; // Strip first @mention (the bot) from text, keep other @mentions intact if (isMentioned) { cleanText = cleanText.replace(/@\S+/, '').trim(); } // Extract quoted message context const quoted = this.extractQuotedContext(data); const chatId = conversationId || sessionWebhook; const envelope: Envelope = { channelName: this.name, senderId: data.senderStaffId || data.senderId || '', senderName: data.senderNick || 'Unknown', chatId, text: cleanText || content.text, isGroup, isMentioned, isReplyToBot: quoted.isReplyToBot, referencedText: quoted.referencedText, }; // Store messageId + conversationId for reaction hooks envelope.messageId = msgId; if (msgId && conversationId) { this.reactionContext.set(msgId, conversationId); } const processMessage = async () => { // Download media if present (first downloadCode only for images) if (content.downloadCodes.length > 0 && content.mediaType) { await this.attachMedia( envelope, content.downloadCodes[0]!, content.mediaType, content.fileName, ); } // reactionContext cleanup is handled by onPromptEnd (not here), // because in collect mode handleInbound returns immediately after // buffering — the context must survive until the prompt actually runs. await this.handleInbound(envelope); }; // Don't await — stream callback should return quickly processMessage().catch((err) => { process.stderr.write( `[DingTalk:${this.name}] Error handling message: ${err}\n`, ); this.sendMessage( chatId, 'Sorry, something went wrong processing your message.', ).catch(() => {}); }); } catch (err) { process.stderr.write( `[DingTalk:${this.name}] Failed to parse message: ${err}\n`, ); } } }