mirror of
https://github.com/Alishahryar1/free-claude-code.git
synced 2026-04-26 10:31:07 +00:00
Some checks are pending
CI / checks (push) Waiting to run
Consolidates the incremental refactor work into a single change set: modular web tools (api/web_tools), native Anthropic request building and SSE block policy, OpenAI conversion and error handling, provider transports and rate limiting, messaging handler and tree queue, safe logging, smoke tests, and broad test coverage.
581 lines
20 KiB
Python
581 lines
20 KiB
Python
"""Ordered transcript builder for messaging UIs (Telegram, etc.).
|
|
|
|
This module maintains an ordered list of "segments" that represent what the user
|
|
should see in the chat transcript: thinking, tool calls, tool results, subagent
|
|
headers, and assistant text. It is designed for in-place message editing where
|
|
the transcript grows over time and older content must be truncated.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from abc import ABC, abstractmethod
|
|
from collections import deque
|
|
from collections.abc import Callable, Iterable
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from loguru import logger
|
|
|
|
|
|
def _safe_json_dumps(obj: Any) -> str:
|
|
try:
|
|
return json.dumps(obj, indent=2, ensure_ascii=False, sort_keys=True)
|
|
except Exception:
|
|
return str(obj)
|
|
|
|
|
|
@dataclass
|
|
class Segment(ABC):
|
|
kind: str
|
|
|
|
@abstractmethod
|
|
def render(self, ctx: RenderCtx) -> str: ...
|
|
|
|
|
|
@dataclass
|
|
class ThinkingSegment(Segment):
|
|
def __init__(self) -> None:
|
|
super().__init__(kind="thinking")
|
|
self._parts: list[str] = []
|
|
|
|
def append(self, t: str) -> None:
|
|
if t:
|
|
self._parts.append(t)
|
|
|
|
@property
|
|
def text(self) -> str:
|
|
return "".join(self._parts)
|
|
|
|
def render(self, ctx: RenderCtx) -> str:
|
|
raw = self.text or ""
|
|
if ctx.thinking_tail_max is not None and len(raw) > ctx.thinking_tail_max:
|
|
raw = "..." + raw[-(ctx.thinking_tail_max - 3) :]
|
|
inner = ctx.escape_code(raw)
|
|
return f"💭 {ctx.bold('Thinking')}\n```\n{inner}\n```"
|
|
|
|
|
|
@dataclass
|
|
class TextSegment(Segment):
|
|
def __init__(self) -> None:
|
|
super().__init__(kind="text")
|
|
self._parts: list[str] = []
|
|
|
|
def append(self, t: str) -> None:
|
|
if t:
|
|
self._parts.append(t)
|
|
|
|
@property
|
|
def text(self) -> str:
|
|
return "".join(self._parts)
|
|
|
|
def render(self, ctx: RenderCtx) -> str:
|
|
raw = self.text or ""
|
|
if ctx.text_tail_max is not None and len(raw) > ctx.text_tail_max:
|
|
raw = "..." + raw[-(ctx.text_tail_max - 3) :]
|
|
return ctx.render_markdown(raw)
|
|
|
|
|
|
@dataclass
|
|
class ToolCallSegment(Segment):
|
|
tool_use_id: str
|
|
name: str
|
|
closed: bool = False
|
|
indent_level: int = 0
|
|
|
|
def __init__(self, tool_use_id: str, name: str, *, indent_level: int = 0) -> None:
|
|
super().__init__(kind="tool_call")
|
|
self.tool_use_id = str(tool_use_id or "")
|
|
self.name = str(name or "tool")
|
|
self.indent_level = max(0, int(indent_level))
|
|
|
|
def render(self, ctx: RenderCtx) -> str:
|
|
name = ctx.code_inline(self.name)
|
|
# Per UX requirement: do not display tool args/results, only the tool call.
|
|
prefix = " " * self.indent_level
|
|
return f"{prefix}🛠 {ctx.bold('Tool call:')} {name}"
|
|
|
|
|
|
@dataclass
|
|
class ToolResultSegment(Segment):
|
|
tool_use_id: str
|
|
name: str | None
|
|
content_text: str
|
|
is_error: bool = False
|
|
|
|
def __init__(
|
|
self,
|
|
tool_use_id: str,
|
|
content: Any,
|
|
*,
|
|
name: str | None = None,
|
|
is_error: bool = False,
|
|
) -> None:
|
|
super().__init__(kind="tool_result")
|
|
self.tool_use_id = str(tool_use_id or "")
|
|
self.name = str(name) if name is not None else None
|
|
self.is_error = bool(is_error)
|
|
if isinstance(content, str):
|
|
self.content_text = content
|
|
else:
|
|
self.content_text = _safe_json_dumps(content)
|
|
|
|
def render(self, ctx: RenderCtx) -> str:
|
|
raw = self.content_text or ""
|
|
if ctx.tool_output_tail_max is not None and len(raw) > ctx.tool_output_tail_max:
|
|
raw = "..." + raw[-(ctx.tool_output_tail_max - 3) :]
|
|
inner = ctx.escape_code(raw)
|
|
label = "Tool error:" if self.is_error else "Tool result:"
|
|
maybe_name = f" {ctx.code_inline(self.name)}" if self.name else ""
|
|
return f"📤 {ctx.bold(label)}{maybe_name}\n```\n{inner}\n```"
|
|
|
|
|
|
@dataclass
|
|
class SubagentSegment(Segment):
|
|
description: str
|
|
tool_calls: int = 0
|
|
tools_used: set[str] = field(default_factory=set)
|
|
current_tool: ToolCallSegment | None = None
|
|
|
|
def __init__(self, description: str) -> None:
|
|
super().__init__(kind="subagent")
|
|
self.description = str(description or "Subagent")
|
|
self.tool_calls = 0
|
|
self.tools_used = set()
|
|
self.current_tool = None
|
|
|
|
def set_current_tool_call(self, tool_use_id: str, name: str) -> ToolCallSegment:
|
|
tool_use_id = str(tool_use_id or "")
|
|
name = str(name or "tool")
|
|
self.tools_used.add(name)
|
|
self.tool_calls += 1
|
|
self.current_tool = ToolCallSegment(tool_use_id, name, indent_level=1)
|
|
return self.current_tool
|
|
|
|
def render(self, ctx: RenderCtx) -> str:
|
|
inner_prefix = " "
|
|
|
|
lines: list[str] = [
|
|
f"🤖 {ctx.bold('Subagent:')} {ctx.code_inline(self.description)}"
|
|
]
|
|
|
|
if self.current_tool is not None:
|
|
try:
|
|
rendered = self.current_tool.render(ctx)
|
|
except Exception:
|
|
rendered = ""
|
|
if rendered:
|
|
lines.append(rendered)
|
|
|
|
tools_used = sorted(self.tools_used)
|
|
tools_set_raw = "{{{}}}".format(", ".join(tools_used)) if tools_used else "{}"
|
|
|
|
# Keep braces inside a code entity so MarkdownV2 doesn't require escaping them.
|
|
lines.append(
|
|
f"{inner_prefix}{ctx.bold('Tools used:')} {ctx.code_inline(tools_set_raw)}"
|
|
)
|
|
lines.append(
|
|
f"{inner_prefix}{ctx.bold('Tool calls:')} {ctx.code_inline(str(self.tool_calls))}"
|
|
)
|
|
return "\n".join(lines)
|
|
|
|
|
|
@dataclass
|
|
class ErrorSegment(Segment):
|
|
message: str
|
|
|
|
def __init__(self, message: str) -> None:
|
|
super().__init__(kind="error")
|
|
self.message = str(message or "Unknown error")
|
|
|
|
def render(self, ctx: RenderCtx) -> str:
|
|
return f"⚠️ {ctx.bold('Error:')} {ctx.code_inline(self.message)}"
|
|
|
|
|
|
@dataclass
|
|
class RenderCtx:
|
|
bold: Callable[[str], str]
|
|
code_inline: Callable[[str], str]
|
|
escape_code: Callable[[str], str]
|
|
escape_text: Callable[[str], str]
|
|
render_markdown: Callable[[str], str]
|
|
|
|
thinking_tail_max: int | None = 1000
|
|
tool_input_tail_max: int | None = 1200
|
|
tool_output_tail_max: int | None = 1600
|
|
text_tail_max: int | None = 2000
|
|
|
|
|
|
class TranscriptBuffer:
|
|
"""Maintains an ordered, truncatable transcript of events."""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
show_tool_results: bool = True,
|
|
debug_subagent_stack: bool = False,
|
|
) -> None:
|
|
self._segments: list[Segment] = []
|
|
self._open_thinking_by_index: dict[int, ThinkingSegment] = {}
|
|
self._open_text_by_index: dict[int, TextSegment] = {}
|
|
|
|
# content_block index -> tool call segment (for streaming tool args)
|
|
self._open_tools_by_index: dict[int, ToolCallSegment] = {}
|
|
|
|
# tool_use_id -> tool name (for tool_result labeling)
|
|
self._tool_name_by_id: dict[str, str] = {}
|
|
|
|
self._show_tool_results = bool(show_tool_results)
|
|
|
|
# subagent context stack. Each entry is the Task tool_use_id we are waiting to close.
|
|
self._subagent_stack: list[str] = []
|
|
# Parallel stack of segments for rendering nested subagents.
|
|
self._subagent_segments: list[SubagentSegment] = []
|
|
self._debug_subagent_stack = debug_subagent_stack
|
|
|
|
def _in_subagent(self) -> bool:
|
|
return bool(self._subagent_stack)
|
|
|
|
def _subagent_current(self) -> SubagentSegment | None:
|
|
return self._subagent_segments[-1] if self._subagent_segments else None
|
|
|
|
def _task_heading_from_input(self, inp: Any) -> str:
|
|
# We never display full JSON args; only extract a short heading.
|
|
if isinstance(inp, dict):
|
|
desc = str(inp.get("description", "") or "").strip()
|
|
if desc:
|
|
return desc
|
|
subagent_type = str(inp.get("subagent_type", "") or "").strip()
|
|
if subagent_type:
|
|
return subagent_type
|
|
typ = str(inp.get("type", "") or "").strip()
|
|
if typ:
|
|
return typ
|
|
return "Subagent"
|
|
|
|
def _subagent_push(self, tool_id: str, seg: SubagentSegment) -> None:
|
|
# Some providers can omit ids; still track depth for UI suppression.
|
|
tool_id = (
|
|
str(tool_id or "").strip() or f"__task_{len(self._subagent_stack) + 1}"
|
|
)
|
|
self._subagent_stack.append(tool_id)
|
|
self._subagent_segments.append(seg)
|
|
if self._debug_subagent_stack:
|
|
logger.debug(
|
|
"SUBAGENT_STACK: push id=%r depth=%d heading=%r",
|
|
tool_id,
|
|
len(self._subagent_stack),
|
|
getattr(seg, "description", None),
|
|
)
|
|
|
|
def _subagent_pop(self, tool_id: str) -> bool:
|
|
tool_id = str(tool_id or "").strip()
|
|
if not self._subagent_stack:
|
|
return False
|
|
|
|
def _ids_roughly_match(stack_id: str, result_id: str) -> bool:
|
|
if not stack_id or not result_id:
|
|
return False
|
|
if stack_id == result_id:
|
|
return True
|
|
# Some providers emit Task result ids with a suffix/prefix variant.
|
|
# Treat those as the same logical Task invocation.
|
|
return stack_id.startswith(result_id) or result_id.startswith(stack_id)
|
|
|
|
if tool_id:
|
|
# O(1) common case: LIFO - top of stack matches.
|
|
if _ids_roughly_match(self._subagent_stack[-1], tool_id):
|
|
self._subagent_stack.pop()
|
|
if self._subagent_segments:
|
|
self._subagent_segments.pop()
|
|
if self._debug_subagent_stack:
|
|
logger.debug(
|
|
"SUBAGENT_STACK: pop id=%r depth=%d (LIFO)",
|
|
tool_id,
|
|
len(self._subagent_stack),
|
|
)
|
|
return True
|
|
# Pop to the matching id (defensive against non-LIFO emissions).
|
|
idx = -1
|
|
for i in range(len(self._subagent_stack) - 1, -1, -1):
|
|
if _ids_roughly_match(self._subagent_stack[i], tool_id):
|
|
idx = i
|
|
break
|
|
if idx < 0:
|
|
return False
|
|
while len(self._subagent_stack) > idx:
|
|
popped = self._subagent_stack.pop()
|
|
if self._subagent_segments:
|
|
self._subagent_segments.pop()
|
|
if self._debug_subagent_stack:
|
|
logger.debug(
|
|
"SUBAGENT_STACK: pop id=%r depth=%d (matched=%r)",
|
|
popped,
|
|
len(self._subagent_stack),
|
|
tool_id,
|
|
)
|
|
return True
|
|
|
|
# No id in result; only close if we have a synthetic top marker.
|
|
if self._subagent_stack and self._subagent_stack[-1].startswith("__task_"):
|
|
popped = self._subagent_stack.pop()
|
|
if self._subagent_segments:
|
|
self._subagent_segments.pop()
|
|
if self._debug_subagent_stack:
|
|
logger.debug(
|
|
"SUBAGENT_STACK: pop id=%r depth=%d (synthetic)",
|
|
popped,
|
|
len(self._subagent_stack),
|
|
)
|
|
return True
|
|
return False
|
|
|
|
def _ensure_thinking(self) -> ThinkingSegment:
|
|
seg = ThinkingSegment()
|
|
self._segments.append(seg)
|
|
return seg
|
|
|
|
def _ensure_text(self) -> TextSegment:
|
|
seg = TextSegment()
|
|
self._segments.append(seg)
|
|
return seg
|
|
|
|
def apply(self, ev: dict[str, Any]) -> None:
|
|
"""Apply a parsed event to the transcript."""
|
|
et = ev.get("type")
|
|
|
|
# Subagent rules: inside a Task/subagent, we only show tool calls/results.
|
|
if self._in_subagent() and et in (
|
|
"thinking_start",
|
|
"thinking_delta",
|
|
"thinking_chunk",
|
|
"text_start",
|
|
"text_delta",
|
|
"text_chunk",
|
|
):
|
|
return
|
|
|
|
if et == "thinking_start":
|
|
idx = int(ev.get("index", -1))
|
|
if idx >= 0:
|
|
# Defensive: if a provider reuses indices without emitting a stop,
|
|
# close the previous open segment first.
|
|
self.apply({"type": "block_stop", "index": idx})
|
|
seg = self._ensure_thinking()
|
|
if idx >= 0:
|
|
self._open_thinking_by_index[idx] = seg
|
|
return
|
|
if et in ("thinking_delta", "thinking_chunk"):
|
|
idx = int(ev.get("index", -1))
|
|
seg = self._open_thinking_by_index.get(idx)
|
|
if seg is None:
|
|
seg = self._ensure_thinking()
|
|
if idx >= 0:
|
|
self._open_thinking_by_index[idx] = seg
|
|
seg.append(str(ev.get("text", "")))
|
|
return
|
|
if et == "thinking_stop":
|
|
idx = int(ev.get("index", -1))
|
|
if idx >= 0:
|
|
self._open_thinking_by_index.pop(idx, None)
|
|
return
|
|
|
|
if et == "text_start":
|
|
idx = int(ev.get("index", -1))
|
|
if idx >= 0:
|
|
self.apply({"type": "block_stop", "index": idx})
|
|
seg = self._ensure_text()
|
|
if idx >= 0:
|
|
self._open_text_by_index[idx] = seg
|
|
return
|
|
if et in ("text_delta", "text_chunk"):
|
|
idx = int(ev.get("index", -1))
|
|
seg = self._open_text_by_index.get(idx)
|
|
if seg is None:
|
|
seg = self._ensure_text()
|
|
if idx >= 0:
|
|
self._open_text_by_index[idx] = seg
|
|
seg.append(str(ev.get("text", "")))
|
|
return
|
|
if et == "text_stop":
|
|
idx = int(ev.get("index", -1))
|
|
if idx >= 0:
|
|
self._open_text_by_index.pop(idx, None)
|
|
return
|
|
|
|
if et == "tool_use_start":
|
|
idx = int(ev.get("index", -1))
|
|
if idx >= 0:
|
|
self.apply({"type": "block_stop", "index": idx})
|
|
tool_id = str(ev.get("id", "") or "").strip()
|
|
name = str(ev.get("name", "") or "tool")
|
|
if tool_id:
|
|
self._tool_name_by_id[tool_id] = name
|
|
|
|
# Task tool indicates subagent.
|
|
if name == "Task":
|
|
heading = self._task_heading_from_input(ev.get("input"))
|
|
seg = SubagentSegment(heading)
|
|
self._segments.append(seg)
|
|
self._subagent_push(tool_id, seg)
|
|
return
|
|
|
|
# Normal tool call.
|
|
if self._in_subagent():
|
|
parent = self._subagent_current()
|
|
if parent is not None:
|
|
seg = parent.set_current_tool_call(tool_id, name)
|
|
else:
|
|
seg = ToolCallSegment(tool_id, name)
|
|
self._segments.append(seg)
|
|
else:
|
|
seg = ToolCallSegment(tool_id, name)
|
|
self._segments.append(seg)
|
|
|
|
if idx >= 0:
|
|
self._open_tools_by_index[idx] = seg
|
|
return
|
|
|
|
if et == "tool_use_delta":
|
|
# Track open tool by index for tool_use_stop (closing state).
|
|
return
|
|
|
|
if et == "tool_use_stop":
|
|
idx = int(ev.get("index", -1))
|
|
seg = self._open_tools_by_index.pop(idx, None)
|
|
if seg is not None:
|
|
seg.closed = True
|
|
return
|
|
|
|
if et == "block_stop":
|
|
idx = int(ev.get("index", -1))
|
|
if idx in self._open_tools_by_index:
|
|
self.apply({"type": "tool_use_stop", "index": idx})
|
|
return
|
|
if idx in self._open_thinking_by_index:
|
|
self.apply({"type": "thinking_stop", "index": idx})
|
|
return
|
|
if idx in self._open_text_by_index:
|
|
self.apply({"type": "text_stop", "index": idx})
|
|
return
|
|
return
|
|
|
|
if et == "tool_use":
|
|
tool_id = str(ev.get("id", "") or "").strip()
|
|
name = str(ev.get("name", "") or "tool")
|
|
if tool_id:
|
|
self._tool_name_by_id[tool_id] = name
|
|
|
|
if name == "Task":
|
|
heading = self._task_heading_from_input(ev.get("input"))
|
|
seg = SubagentSegment(heading)
|
|
self._segments.append(seg)
|
|
self._subagent_push(tool_id, seg)
|
|
return
|
|
|
|
if self._in_subagent():
|
|
parent = self._subagent_current()
|
|
if parent is not None:
|
|
seg = parent.set_current_tool_call(tool_id, name)
|
|
else:
|
|
seg = ToolCallSegment(tool_id, name)
|
|
self._segments.append(seg)
|
|
else:
|
|
seg = ToolCallSegment(tool_id, name)
|
|
self._segments.append(seg)
|
|
|
|
seg.closed = True
|
|
return
|
|
|
|
if et == "tool_result":
|
|
tool_id = str(ev.get("tool_use_id", "") or "").strip()
|
|
name = self._tool_name_by_id.get(tool_id)
|
|
|
|
# If this was the Task tool result, close subagent context.
|
|
if self._subagent_stack:
|
|
popped = self._subagent_pop(tool_id)
|
|
top = self._subagent_stack[-1] if self._subagent_stack else ""
|
|
looks_like_task_id = "task" in tool_id.lower()
|
|
# Some streams omit Task tool_use ids (synthetic stack ids), but include
|
|
# a real Task id on tool_result (e.g. "functions.Task:0"). Reconcile that.
|
|
if (
|
|
not popped
|
|
and tool_id
|
|
and top.startswith("__task_")
|
|
and (name in (None, "Task"))
|
|
and looks_like_task_id
|
|
):
|
|
self._subagent_pop("")
|
|
|
|
if not self._show_tool_results:
|
|
return
|
|
|
|
seg = ToolResultSegment(
|
|
tool_id,
|
|
ev.get("content"),
|
|
name=name,
|
|
is_error=bool(ev.get("is_error", False)),
|
|
)
|
|
self._segments.append(seg)
|
|
return
|
|
|
|
if et == "error":
|
|
self._segments.append(ErrorSegment(str(ev.get("message", ""))))
|
|
return
|
|
|
|
def render(self, ctx: RenderCtx, *, limit_chars: int, status: str | None) -> str:
|
|
"""Render transcript with truncation (drop oldest segments)."""
|
|
# Filter out empty rendered segments.
|
|
rendered: list[str] = []
|
|
for seg in self._segments:
|
|
try:
|
|
out = seg.render(ctx)
|
|
except Exception:
|
|
continue
|
|
if out:
|
|
rendered.append(out)
|
|
|
|
status_text = f"\n\n{status}" if status else ""
|
|
prefix_marker = ctx.escape_text("... (truncated)\n")
|
|
|
|
def _join(parts: Iterable[str], add_marker: bool) -> str:
|
|
body = "\n".join(parts)
|
|
if add_marker and body:
|
|
body = prefix_marker + body
|
|
return body + status_text if (body or status_text) else status_text
|
|
|
|
# Fast path.
|
|
candidate = _join(rendered, add_marker=False)
|
|
if len(candidate) <= limit_chars:
|
|
return candidate
|
|
|
|
# Drop oldest segments until under limit (keep the tail).
|
|
# Use deque for O(1) popleft; list.pop(0) would be O(n) per iteration.
|
|
parts: deque[str] = deque(rendered)
|
|
dropped = False
|
|
last_part: str | None = None
|
|
while parts:
|
|
candidate = _join(parts, add_marker=True)
|
|
if len(candidate) <= limit_chars:
|
|
return candidate
|
|
last_part = parts.popleft()
|
|
dropped = True
|
|
|
|
# Nothing fits - preserve tail of last segment instead of only marker+status.
|
|
if dropped and last_part:
|
|
budget = limit_chars - len(prefix_marker) - len(status_text)
|
|
if budget > 20:
|
|
if len(last_part) > budget:
|
|
tail = "..." + last_part[-(budget - 3) :]
|
|
else:
|
|
tail = last_part
|
|
candidate = prefix_marker + tail + status_text
|
|
if len(candidate) <= limit_chars:
|
|
return candidate
|
|
|
|
# Fallback: marker + status only.
|
|
if dropped:
|
|
minimal = prefix_marker + status_text.lstrip("\n")
|
|
if len(minimal) <= limit_chars:
|
|
return minimal
|
|
return status or ""
|