Implemented markdownv2 rendering

This commit is contained in:
Alishahryar1 2026-02-05 17:31:40 -08:00
parent 3a69a51f62
commit 0cad642e4b
3 changed files with 181 additions and 1 deletions

View file

@ -11,6 +11,8 @@ import asyncio
import logging
from typing import List, Optional
from markdown_it import MarkdownIt
from .base import MessagingPlatform, SessionManagerInterface
from .models import IncomingMessage
from .session import SessionStore
@ -22,6 +24,11 @@ logger = logging.getLogger(__name__)
MDV2_SPECIAL_CHARS = set("\\_*[]()~`>#+-=|{}.!")
MDV2_LINK_ESCAPE = set("\\)")
_MD = MarkdownIt("commonmark", {"html": False, "breaks": False})
_MD.enable("strikethrough")
def escape_md_v2(text: str) -> str:
"""Escape text for Telegram MarkdownV2."""
@ -33,6 +40,11 @@ def escape_md_v2_code(text: str) -> str:
return text.replace("\\", "\\\\").replace("`", "\\`")
def escape_md_v2_link_url(text: str) -> str:
"""Escape URL for Telegram MarkdownV2 link destination."""
return "".join(f"\\{ch}" if ch in MDV2_LINK_ESCAPE else ch for ch in text)
def mdv2_bold(text: str) -> str:
return f"*{escape_md_v2(text)}*"
@ -48,6 +60,171 @@ def format_status(emoji: str, label: str, suffix: Optional[str] = None) -> str:
return base
def render_markdown_to_mdv2(text: str) -> str:
"""Render common Markdown into Telegram MarkdownV2."""
if not text:
return ""
tokens = _MD.parse(text)
def render_inline_plain(children) -> str:
out: List[str] = []
for tok in children:
if tok.type == "text":
out.append(escape_md_v2(tok.content))
elif tok.type == "code_inline":
out.append(escape_md_v2(tok.content))
elif tok.type in {"softbreak", "hardbreak"}:
out.append("\n")
return "".join(out)
def render_inline(children) -> str:
out: List[str] = []
i = 0
while i < len(children):
tok = children[i]
t = tok.type
if t == "text":
out.append(escape_md_v2(tok.content))
elif t in {"softbreak", "hardbreak"}:
out.append("\n")
elif t == "em_open":
out.append("_")
elif t == "em_close":
out.append("_")
elif t == "strong_open":
out.append("*")
elif t == "strong_close":
out.append("*")
elif t == "s_open":
out.append("~")
elif t == "s_close":
out.append("~")
elif t == "code_inline":
out.append(f"`{escape_md_v2_code(tok.content)}`")
elif t == "link_open":
href = ""
if tok.attrs:
for key, val in tok.attrs:
if key == "href":
href = val
break
inner_tokens = []
i += 1
while i < len(children) and children[i].type != "link_close":
inner_tokens.append(children[i])
i += 1
link_text = ""
for child in inner_tokens:
if child.type == "text":
link_text += child.content
elif child.type == "code_inline":
link_text += child.content
out.append(
f"[{escape_md_v2(link_text)}]({escape_md_v2_link_url(href)})"
)
elif t == "image":
href = ""
alt = tok.content or ""
if tok.attrs:
for key, val in tok.attrs:
if key == "src":
href = val
break
if alt:
out.append(
f"{escape_md_v2(alt)} ({escape_md_v2_link_url(href)})"
)
else:
out.append(escape_md_v2_link_url(href))
else:
out.append(escape_md_v2(tok.content or ""))
i += 1
return "".join(out)
out: List[str] = []
list_stack: List[dict] = []
pending_prefix: Optional[str] = None
blockquote_level = 0
in_heading = False
def apply_blockquote(val: str) -> str:
if blockquote_level <= 0:
return val
prefix = "> " * blockquote_level
return prefix + val.replace("\n", "\n" + prefix)
i = 0
while i < len(tokens):
tok = tokens[i]
t = tok.type
if t == "paragraph_open":
pass
elif t == "paragraph_close":
out.append("\n")
elif t == "heading_open":
in_heading = True
elif t == "heading_close":
in_heading = False
out.append("\n")
elif t == "bullet_list_open":
list_stack.append({"type": "bullet", "index": 1})
elif t == "bullet_list_close":
if list_stack:
list_stack.pop()
out.append("\n")
elif t == "ordered_list_open":
start = 1
if tok.attrs:
for key, val in tok.attrs:
if key == "start":
try:
start = int(val)
except ValueError:
start = 1
break
list_stack.append({"type": "ordered", "index": start})
elif t == "ordered_list_close":
if list_stack:
list_stack.pop()
out.append("\n")
elif t == "list_item_open":
if list_stack:
top = list_stack[-1]
if top["type"] == "bullet":
pending_prefix = "\\- "
else:
pending_prefix = f"{top['index']}\\."
top["index"] += 1
pending_prefix += " "
elif t == "list_item_close":
out.append("\n")
elif t == "blockquote_open":
blockquote_level += 1
elif t == "blockquote_close":
blockquote_level = max(0, blockquote_level - 1)
out.append("\n")
elif t in {"code_block", "fence"}:
code = escape_md_v2_code(tok.content.rstrip("\n"))
out.append(f"```\n{code}\n```")
out.append("\n")
elif t == "inline":
rendered = render_inline(tok.children or [])
if in_heading:
rendered = f"*{render_inline_plain(tok.children or [])}*"
if pending_prefix:
rendered = pending_prefix + rendered
pending_prefix = None
rendered = apply_blockquote(rendered)
out.append(rendered)
else:
if tok.content:
out.append(escape_md_v2(tok.content))
i += 1
return "".join(out).rstrip()
class ClaudeMessageHandler:
"""
Platform-agnostic handler for Claude interactions.
@ -409,7 +586,7 @@ class ClaudeMessageHandler:
# 4. Content
if components["content"]:
lines.append(escape_md_v2("".join(components["content"])))
lines.append(render_markdown_to_mdv2("".join(components["content"])))
# 5. Errors
if components["errors"]: