free-claude-code/messaging/rendering/telegram_markdown.py
2026-02-18 04:13:41 -08:00

380 lines
13 KiB
Python

"""Telegram MarkdownV2 utilities.
Renders common Markdown into Telegram MarkdownV2 format.
Used by the message handler and Telegram platform adapter.
"""
import re
from markdown_it import MarkdownIt
MDV2_SPECIAL_CHARS = set("\\_*[]()~`>#+-=|{}.!")
MDV2_LINK_ESCAPE = set("\\)")
_MD = MarkdownIt("commonmark", {"html": False, "breaks": False})
_MD.enable("strikethrough")
_MD.enable("table")
_TABLE_SEP_RE = re.compile(r"^\s*\|?\s*:?-{3,}:?\s*(\|\s*:?-{3,}:?\s*)+\|?\s*$")
_FENCE_RE = re.compile(r"^\s*```")
def _is_gfm_table_header_line(line: str) -> bool:
"""Check if line is a GFM table header (pipe-delimited, not separator)."""
if "|" not in line:
return False
if _TABLE_SEP_RE.match(line):
return False
stripped = line.strip()
parts = [p.strip() for p in stripped.strip("|").split("|")]
parts = [p for p in parts if p != ""]
return len(parts) >= 2
def _normalize_gfm_tables(text: str) -> str:
"""
Many LLMs emit tables immediately after a paragraph line (no blank line).
Markdown-it will treat that as a softbreak within the paragraph, so the
table extension won't trigger. Insert a blank line before detected tables.
We only do this outside fenced code blocks.
"""
lines = text.splitlines()
if len(lines) < 2:
return text
out_lines: list[str] = []
in_fence = False
for idx, line in enumerate(lines):
if _FENCE_RE.match(line):
in_fence = not in_fence
out_lines.append(line)
continue
if (
not in_fence
and idx + 1 < len(lines)
and _is_gfm_table_header_line(line)
and _TABLE_SEP_RE.match(lines[idx + 1])
and out_lines
and out_lines[-1].strip() != ""
):
m = re.match(r"^(\s*)", line)
indent = m.group(1) if m else ""
out_lines.append(indent)
out_lines.append(line)
return "\n".join(out_lines)
def escape_md_v2(text: str) -> str:
"""Escape text for Telegram MarkdownV2."""
return "".join(f"\\{ch}" if ch in MDV2_SPECIAL_CHARS else ch for ch in text)
def escape_md_v2_code(text: str) -> str:
"""Escape text for Telegram MarkdownV2 code spans/blocks."""
return text.replace("\\", "\\\\").replace("`", "\\`")
def escape_md_v2_link_url(text: str) -> str:
"""Escape URL for Telegram MarkdownV2 link destination."""
return "".join(f"\\{ch}" if ch in MDV2_LINK_ESCAPE else ch for ch in text)
def mdv2_bold(text: str) -> str:
"""Format text as bold in MarkdownV2."""
return f"*{escape_md_v2(text)}*"
def mdv2_code_inline(text: str) -> str:
"""Format text as inline code in MarkdownV2."""
return f"`{escape_md_v2_code(text)}`"
def format_status(emoji: str, label: str, suffix: str | None = None) -> str:
"""Format a status message with emoji and optional suffix."""
base = f"{emoji} {mdv2_bold(label)}"
if suffix:
return f"{base} {escape_md_v2(suffix)}"
return base
def render_markdown_to_mdv2(text: str) -> str:
"""Render common Markdown into Telegram MarkdownV2."""
if not text:
return ""
text = _normalize_gfm_tables(text)
tokens = _MD.parse(text)
def render_inline_table_plain(children) -> str:
out: list[str] = []
for tok in children:
if tok.type == "text" or tok.type == "code_inline":
out.append(tok.content)
elif tok.type in {"softbreak", "hardbreak"}:
out.append(" ")
elif tok.type == "image" and tok.content:
out.append(tok.content)
return "".join(out)
def render_inline_plain(children) -> str:
out: list[str] = []
for tok in children:
if tok.type == "text" or tok.type == "code_inline":
out.append(escape_md_v2(tok.content))
elif tok.type in {"softbreak", "hardbreak"}:
out.append("\n")
return "".join(out)
def render_inline(children) -> str:
out: list[str] = []
i = 0
while i < len(children):
tok = children[i]
t = tok.type
if t == "text":
out.append(escape_md_v2(tok.content))
elif t in {"softbreak", "hardbreak"}:
out.append("\n")
elif t == "em_open" or t == "em_close":
out.append("_")
elif t == "strong_open" or t == "strong_close":
out.append("*")
elif t == "s_open" or t == "s_close":
out.append("~")
elif t == "code_inline":
out.append(f"`{escape_md_v2_code(tok.content)}`")
elif t == "link_open":
href = ""
if tok.attrs:
if isinstance(tok.attrs, dict):
href = tok.attrs.get("href", "")
else:
for key, val in tok.attrs:
if key == "href":
href = val
break
inner_tokens = []
i += 1
while i < len(children) and children[i].type != "link_close":
inner_tokens.append(children[i])
i += 1
link_text = ""
for child in inner_tokens:
if child.type == "text" or child.type == "code_inline":
link_text += child.content
out.append(
f"[{escape_md_v2(link_text)}]({escape_md_v2_link_url(href)})"
)
elif t == "image":
href = ""
alt = tok.content or ""
if tok.attrs:
if isinstance(tok.attrs, dict):
href = tok.attrs.get("src", "")
else:
for key, val in tok.attrs:
if key == "src":
href = val
break
if alt:
out.append(f"{escape_md_v2(alt)} ({escape_md_v2_link_url(href)})")
else:
out.append(escape_md_v2_link_url(href))
else:
out.append(escape_md_v2(tok.content or ""))
i += 1
return "".join(out)
out: list[str] = []
list_stack: list[dict] = []
pending_prefix: str | None = None
blockquote_level = 0
in_heading = False
def apply_blockquote(val: str) -> str:
if blockquote_level <= 0:
return val
prefix = "> " * blockquote_level
return prefix + val.replace("\n", "\n" + prefix)
i = 0
while i < len(tokens):
tok = tokens[i]
t = tok.type
if t == "paragraph_open":
pass
elif t == "paragraph_close":
out.append("\n")
elif t == "heading_open":
in_heading = True
elif t == "heading_close":
in_heading = False
out.append("\n")
elif t == "bullet_list_open":
list_stack.append({"type": "bullet", "index": 1})
elif t == "bullet_list_close":
if list_stack:
list_stack.pop()
out.append("\n")
elif t == "ordered_list_open":
start = 1
if tok.attrs:
if isinstance(tok.attrs, dict):
val = tok.attrs.get("start")
if val is not None:
try:
start = int(val)
except TypeError, ValueError:
start = 1
else:
for key, val in tok.attrs:
if key == "start":
try:
start = int(val)
except TypeError, ValueError:
start = 1
break
list_stack.append({"type": "ordered", "index": start})
elif t == "ordered_list_close":
if list_stack:
list_stack.pop()
out.append("\n")
elif t == "list_item_open":
if list_stack:
top = list_stack[-1]
if top["type"] == "bullet":
pending_prefix = "\\- "
else:
pending_prefix = f"{top['index']}\\."
top["index"] += 1
pending_prefix += " "
elif t == "list_item_close":
out.append("\n")
elif t == "blockquote_open":
blockquote_level += 1
elif t == "blockquote_close":
blockquote_level = max(0, blockquote_level - 1)
out.append("\n")
elif t == "table_open":
if pending_prefix:
out.append(apply_blockquote(pending_prefix.rstrip()))
out.append("\n")
pending_prefix = None
rows: list[list[str]] = []
row_is_header: list[bool] = []
j = i + 1
in_thead = False
in_row = False
current_row: list[str] = []
current_row_header = False
in_cell = False
cell_parts: list[str] = []
while j < len(tokens):
tt = tokens[j].type
if tt == "thead_open":
in_thead = True
elif tt == "thead_close":
in_thead = False
elif tt == "tr_open":
in_row = True
current_row = []
current_row_header = in_thead
elif tt in {"th_open", "td_open"}:
in_cell = True
cell_parts = []
elif tt == "inline" and in_cell:
cell_parts.append(
render_inline_table_plain(tokens[j].children or [])
)
elif tt in {"th_close", "td_close"} and in_cell:
cell = " ".join(cell_parts).strip()
current_row.append(cell)
in_cell = False
cell_parts = []
elif tt == "tr_close" and in_row:
rows.append(current_row)
row_is_header.append(bool(current_row_header))
in_row = False
elif tt == "table_close":
break
j += 1
if rows:
col_count = max((len(r) for r in rows), default=0)
norm_rows: list[list[str]] = []
for r in rows:
if len(r) < col_count:
r = r + [""] * (col_count - len(r))
norm_rows.append(r)
widths: list[int] = []
for c in range(col_count):
w = max((len(r[c]) for r in norm_rows), default=0)
widths.append(max(w, 3))
def fmt_row(
r: list[str], _w: list[int] = widths, _c: int = col_count
) -> str:
cells = [r[c].ljust(_w[c]) for c in range(_c)]
return "| " + " | ".join(cells) + " |"
def fmt_sep(_w: list[int] = widths, _c: int = col_count) -> str:
cells = ["-" * _w[c] for c in range(_c)]
return "| " + " | ".join(cells) + " |"
last_header_idx = -1
for idx, is_h in enumerate(row_is_header):
if is_h:
last_header_idx = idx
lines: list[str] = []
for idx, r in enumerate(norm_rows):
lines.append(fmt_row(r))
if idx == last_header_idx:
lines.append(fmt_sep())
table_text = "\n".join(lines).rstrip()
out.append(f"```\n{escape_md_v2_code(table_text)}\n```")
out.append("\n")
i = j + 1
continue
elif t in {"code_block", "fence"}:
code = escape_md_v2_code(tok.content.rstrip("\n"))
out.append(f"```\n{code}\n```")
out.append("\n")
elif t == "inline":
rendered = render_inline(tok.children or [])
if in_heading:
rendered = f"*{render_inline_plain(tok.children or [])}*"
if pending_prefix:
rendered = pending_prefix + rendered
pending_prefix = None
rendered = apply_blockquote(rendered)
out.append(rendered)
else:
if tok.content:
out.append(escape_md_v2(tok.content))
i += 1
return "".join(out).rstrip()
__all__ = [
"escape_md_v2",
"escape_md_v2_code",
"escape_md_v2_link_url",
"format_status",
"mdv2_bold",
"mdv2_code_inline",
"render_markdown_to_mdv2",
]