Add message_thread_id support across messaging components

- Introduced message_thread_id to the IncomingMessage model for handling forum topic IDs in Telegram.
- Updated messaging platforms (Discord and Telegram) to accept and process message_thread_id in send_message methods.
- Modified message handlers to utilize message_thread_id when sending messages.
- Enhanced test cases to validate the integration of message_thread_id in message handling.

This change improves support for forum supergroups in Telegram and enhances message management across platforms.
This commit is contained in:
Alishahryar1 2026-02-18 16:10:57 -08:00
parent 8807f58267
commit 16fa9d90cd
8 changed files with 58 additions and 10 deletions

View file

@ -255,6 +255,7 @@ class ClaudeMessageHandler:
status_text,
reply_to=incoming.message_id,
fire_and_forget=False,
message_thread_id=incoming.message_thread_id,
)
self._record_outgoing_message(
incoming.platform, incoming.chat_id, status_msg_id, "status"
@ -766,6 +767,7 @@ class ClaudeMessageHandler:
"", "Stopped.", "Nothing to stop for that message."
),
fire_and_forget=False,
message_thread_id=incoming.message_thread_id,
)
self._record_outgoing_message(
incoming.platform, incoming.chat_id, msg_id, "command"
@ -778,6 +780,7 @@ class ClaudeMessageHandler:
incoming.chat_id,
self._format_status("", "Stopped.", f"Cancelled {count} {noun}."),
fire_and_forget=False,
message_thread_id=incoming.message_thread_id,
)
self._record_outgoing_message(
incoming.platform, incoming.chat_id, msg_id, "command"
@ -792,6 +795,7 @@ class ClaudeMessageHandler:
"", "Stopped.", f"Cancelled {count} pending or active requests."
),
fire_and_forget=False,
message_thread_id=incoming.message_thread_id,
)
self._record_outgoing_message(
incoming.platform, incoming.chat_id, msg_id, "command"
@ -813,6 +817,7 @@ class ClaudeMessageHandler:
+ "\n"
+ ctx.escape_text(f"• Message Trees: {tree_count}"),
fire_and_forget=False,
message_thread_id=incoming.message_thread_id,
)
self._record_outgoing_message(
incoming.platform, incoming.chat_id, msg_id, "command"
@ -938,6 +943,7 @@ class ClaudeMessageHandler:
"🗑", "Cleared.", "Voice note cancelled."
),
fire_and_forget=False,
message_thread_id=incoming.message_thread_id,
)
self._record_outgoing_message(
incoming.platform, incoming.chat_id, msg_id, "command"
@ -949,6 +955,7 @@ class ClaudeMessageHandler:
"🗑", "Cleared.", "Nothing to clear for that message."
),
fire_and_forget=False,
message_thread_id=incoming.message_thread_id,
)
self._record_outgoing_message(
incoming.platform, incoming.chat_id, msg_id, "command"

View file

@ -21,6 +21,8 @@ class IncomingMessage:
# Optional fields
reply_to_message_id: str | None = None
# Forum topic ID (Telegram); required when replying in forum supergroups
message_thread_id: str | None = None
username: str | None = None
# Pre-sent status message ID (e.g. "Transcribing voice note..."); handler edits in place
status_message_id: str | None = None

View file

@ -91,6 +91,7 @@ class MessagingPlatform(ABC):
text: str,
reply_to: str | None = None,
parse_mode: str | None = None,
message_thread_id: str | None = None,
) -> str:
"""
Send a message to a chat.
@ -100,6 +101,7 @@ class MessagingPlatform(ABC):
text: Message content
reply_to: Optional message ID to reply to
parse_mode: Optional formatting mode ("markdown", "html")
message_thread_id: Optional forum topic ID (Telegram)
Returns:
The message ID of the sent message
@ -148,6 +150,7 @@ class MessagingPlatform(ABC):
reply_to: str | None = None,
parse_mode: str | None = None,
fire_and_forget: bool = True,
message_thread_id: str | None = None,
) -> str | None:
"""
Enqueue a message to be sent.

View file

@ -376,6 +376,7 @@ class DiscordPlatform(MessagingPlatform):
text: str,
reply_to: str | None = None,
parse_mode: str | None = None,
message_thread_id: str | None = None,
) -> str:
"""Send a message to a channel."""
channel = self._client.get_channel(int(chat_id))
@ -449,13 +450,18 @@ class DiscordPlatform(MessagingPlatform):
reply_to: str | None = None,
parse_mode: str | None = None,
fire_and_forget: bool = True,
message_thread_id: str | None = None,
) -> str | None:
"""Enqueue a message to be sent."""
if not self._limiter:
return await self.send_message(chat_id, text, reply_to, parse_mode)
return await self.send_message(
chat_id, text, reply_to, parse_mode, message_thread_id
)
async def _send():
return await self.send_message(chat_id, text, reply_to, parse_mode)
return await self.send_message(
chat_id, text, reply_to, parse_mode, message_thread_id
)
if fire_and_forget:
self._limiter.fire_and_forget(_send)

View file

@ -264,6 +264,7 @@ class TelegramPlatform(MessagingPlatform):
text: str,
reply_to: str | None = None,
parse_mode: str | None = "MarkdownV2",
message_thread_id: str | None = None,
) -> str:
"""Send a message to a chat."""
app = self._application
@ -272,12 +273,15 @@ class TelegramPlatform(MessagingPlatform):
async def _do_send(parse_mode=parse_mode):
bot = app.bot
msg = await bot.send_message(
chat_id=chat_id,
text=text,
reply_to_message_id=int(reply_to) if reply_to else None,
parse_mode=parse_mode,
)
kwargs: dict[str, Any] = {
"chat_id": chat_id,
"text": text,
"reply_to_message_id": int(reply_to) if reply_to else None,
"parse_mode": parse_mode,
}
if message_thread_id is not None:
kwargs["message_thread_id"] = int(message_thread_id)
msg = await bot.send_message(**kwargs)
return str(msg.message_id)
return await self._with_retry(_do_send, parse_mode=parse_mode)
@ -358,14 +362,19 @@ class TelegramPlatform(MessagingPlatform):
reply_to: str | None = None,
parse_mode: str | None = "MarkdownV2",
fire_and_forget: bool = True,
message_thread_id: str | None = None,
) -> str | None:
"""Enqueue a message to be sent (using limiter)."""
# Note: Bot API handles limits better, but we still use our limiter for nice queuing
if not self._limiter:
return await self.send_message(chat_id, text, reply_to, parse_mode)
return await self.send_message(
chat_id, text, reply_to, parse_mode, message_thread_id
)
async def _send():
return await self.send_message(chat_id, text, reply_to, parse_mode)
return await self.send_message(
chat_id, text, reply_to, parse_mode, message_thread_id
)
if fire_and_forget:
self._limiter.fire_and_forget(_send)
@ -490,6 +499,11 @@ class TelegramPlatform(MessagingPlatform):
if update.message.reply_to_message
else None
)
thread_id = (
str(update.message.message_thread_id)
if getattr(update.message, "message_thread_id", None) is not None
else None
)
text_preview = (update.message.text or "")[:80]
if len(update.message.text or "") > 80:
text_preview += "..."
@ -511,6 +525,7 @@ class TelegramPlatform(MessagingPlatform):
message_id=message_id,
platform="telegram",
reply_to_message_id=reply_to,
message_thread_id=thread_id,
raw_event=update,
)
@ -523,6 +538,7 @@ class TelegramPlatform(MessagingPlatform):
chat_id,
f"❌ *{escape_md_v2('Error:')}* {escape_md_v2(str(e)[:200])}",
reply_to=incoming.message_id,
message_thread_id=thread_id,
parse_mode="MarkdownV2",
)
@ -555,12 +571,18 @@ class TelegramPlatform(MessagingPlatform):
if not self._message_handler:
return
thread_id = (
str(update.message.message_thread_id)
if getattr(update.message, "message_thread_id", None) is not None
else None
)
status_msg_id = await self.queue_send_message(
chat_id,
format_status("", "Transcribing voice note..."),
reply_to=str(update.message.message_id),
parse_mode="MarkdownV2",
fire_and_forget=False,
message_thread_id=thread_id,
)
message_id = str(update.message.message_id)
@ -610,6 +632,7 @@ class TelegramPlatform(MessagingPlatform):
message_id=message_id,
platform="telegram",
reply_to_message_id=reply_to,
message_thread_id=thread_id,
raw_event=update,
status_message_id=status_msg_id,
)

View file

@ -81,6 +81,7 @@ class MessageNode:
"message_id": self.incoming.message_id,
"platform": self.incoming.platform,
"reply_to_message_id": self.incoming.reply_to_message_id,
"message_thread_id": self.incoming.message_thread_id,
"username": self.incoming.username,
},
"status_message_id": self.status_message_id,
@ -106,6 +107,7 @@ class MessageNode:
message_id=incoming_data["message_id"],
platform=incoming_data["platform"],
reply_to_message_id=incoming_data.get("reply_to_message_id"),
message_thread_id=incoming_data.get("message_thread_id"),
username=incoming_data.get("username"),
)
return cls(

View file

@ -123,6 +123,7 @@ def incoming_message_factory():
"message_id",
"platform",
"reply_to_message_id",
"message_thread_id",
"username",
"timestamp",
"raw_event",

View file

@ -70,6 +70,7 @@ async def test_handle_message_stop_command(
incoming.chat_id,
"⏹ *Stopped\\.* Cancelled 5 pending or active requests\\.",
fire_and_forget=False,
message_thread_id=None,
)
@ -106,6 +107,7 @@ async def test_handle_message_stop_command_reply_stops_only_target_node(
incoming.chat_id,
"⏹ *Stopped\\.* Cancelled 1 request\\.",
fire_and_forget=False,
message_thread_id=None,
)
@ -129,6 +131,7 @@ async def test_handle_message_stop_command_reply_unknown_does_not_stop_all(
incoming.chat_id,
"⏹ *Stopped\\.* Nothing to stop for that message\\.",
fire_and_forget=False,
message_thread_id=None,
)
@ -146,6 +149,7 @@ async def test_handle_message_stats_command(
assert "Active CLI: 2" in args[1]
assert "Max CLI: 5" in args[1]
assert kwargs["fire_and_forget"] is False
assert kwargs.get("message_thread_id") is None
@pytest.mark.asyncio