mirror of
https://github.com/Alishahryar1/free-claude-code.git
synced 2026-04-28 03:20:01 +00:00
Some checks are pending
CI / checks (push) Waiting to run
Consolidates the incremental refactor work into a single change set: modular web tools (api/web_tools), native Anthropic request building and SSE block policy, OpenAI conversion and error handling, provider transports and rate limiting, messaging handler and tree queue, safe logging, smoke tests, and broad test coverage.
313 lines
11 KiB
Python
313 lines
11 KiB
Python
"""Shared native Anthropic SSE thinking policy, block remapping, and overlap repair.
|
|
|
|
Used by :class:`OpenRouterProvider` and line-mode
|
|
:class:`providers.anthropic_messages.AnthropicMessagesTransport` providers.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import copy
|
|
import json
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
__all__ = [
|
|
"NativeSseBlockPolicyState",
|
|
"format_native_sse_event",
|
|
"is_terminal_openrouter_done_event",
|
|
"parse_native_sse_event",
|
|
"transform_native_sse_block_event",
|
|
]
|
|
|
|
|
|
@dataclass
|
|
class _UpstreamBlockState:
|
|
"""Per-upstream content block: segment index and liveness in the model stream."""
|
|
|
|
block_type: str
|
|
down_index: int
|
|
open: bool
|
|
last_start_block: dict[str, Any] | None = None
|
|
|
|
|
|
@dataclass
|
|
class NativeSseBlockPolicyState:
|
|
"""Track per-upstream content blocks and remapped Anthropic ``index`` field."""
|
|
|
|
next_index: int = 0
|
|
by_upstream: dict[int, _UpstreamBlockState] = field(default_factory=dict)
|
|
dropped_indexes: set[int] = field(default_factory=set)
|
|
pending_suppressed_stops: set[int] = field(default_factory=set)
|
|
message_stopped: bool = False
|
|
|
|
|
|
def format_native_sse_event(event_name: str | None, data_text: str) -> str:
|
|
"""Format an SSE event from its event name and data payload."""
|
|
lines: list[str] = []
|
|
if event_name:
|
|
lines.append(f"event: {event_name}")
|
|
lines.extend(f"data: {line}" for line in data_text.splitlines())
|
|
return "\n".join(lines) + "\n\n"
|
|
|
|
|
|
def parse_native_sse_event(event: str) -> tuple[str | None, str]:
|
|
"""Extract the event name and raw data payload from an SSE event."""
|
|
event_name = None
|
|
data_lines: list[str] = []
|
|
for line in event.strip().splitlines():
|
|
if line.startswith("event:"):
|
|
event_name = line[6:].strip()
|
|
elif line.startswith("data:"):
|
|
data_lines.append(line[5:].lstrip())
|
|
return event_name, "\n".join(data_lines)
|
|
|
|
|
|
def is_terminal_openrouter_done_event(event_name: str | None, data_text: str) -> bool:
|
|
"""Return whether an event is OpenAI-style terminal noise (``[DONE]``)."""
|
|
return (event_name is None or event_name in {"data", "done"}) and (
|
|
data_text.strip().upper() == "[DONE]"
|
|
)
|
|
|
|
|
|
def _delta_type_to_block_kind(delta_type: Any) -> str | None:
|
|
"""Map a content_block_delta type to a content block kind (text/thinking/tool_use)."""
|
|
if not isinstance(delta_type, str):
|
|
return None
|
|
if delta_type in {"thinking_delta", "signature_delta"}:
|
|
return "thinking"
|
|
if delta_type == "text_delta":
|
|
return "text"
|
|
if delta_type == "input_json_delta":
|
|
return "tool_use"
|
|
return None
|
|
|
|
|
|
def _synthetic_start_content_block(
|
|
block_kind: str,
|
|
*,
|
|
upstream_index: int,
|
|
stored_tool_block: dict[str, Any] | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Build a `content_block` for a `content_block_start` with empty streaming fields."""
|
|
if block_kind == "tool_use":
|
|
if (
|
|
isinstance(stored_tool_block, dict)
|
|
and stored_tool_block.get("type") == "tool_use"
|
|
):
|
|
tool_id = stored_tool_block.get("id")
|
|
name = stored_tool_block.get("name")
|
|
inp = stored_tool_block.get("input")
|
|
return {
|
|
"type": "tool_use",
|
|
"id": tool_id
|
|
if isinstance(tool_id, str) and tool_id
|
|
else f"toolu_or_{upstream_index}",
|
|
"name": name if isinstance(name, str) else "",
|
|
"input": inp if isinstance(inp, dict) else {},
|
|
}
|
|
return {
|
|
"type": "tool_use",
|
|
"id": f"toolu_or_{upstream_index}",
|
|
"name": "",
|
|
"input": {},
|
|
}
|
|
if block_kind == "thinking":
|
|
return {"type": "thinking", "thinking": ""}
|
|
if block_kind == "text":
|
|
return {"type": "text", "text": ""}
|
|
return {"type": "text", "text": ""}
|
|
|
|
|
|
def _should_drop_block_type(block_type: Any, *, thinking_enabled: bool) -> bool:
|
|
if not isinstance(block_type, str):
|
|
return False
|
|
if block_type.startswith("redacted_thinking"):
|
|
return not thinking_enabled
|
|
return not thinking_enabled and "thinking" in block_type
|
|
|
|
|
|
def _synthetic_close_other_open_blocks(
|
|
state: NativeSseBlockPolicyState, current_upstream: int
|
|
) -> str:
|
|
"""Close every open block except `current_upstream` and track duplicate upstream stops."""
|
|
out: list[str] = []
|
|
for upstream, seg in list(state.by_upstream.items()):
|
|
if upstream == current_upstream or not seg.open:
|
|
continue
|
|
out.append(
|
|
format_native_sse_event(
|
|
"content_block_stop",
|
|
json.dumps(
|
|
{
|
|
"type": "content_block_stop",
|
|
"index": seg.down_index,
|
|
}
|
|
),
|
|
)
|
|
)
|
|
seg.open = False
|
|
state.pending_suppressed_stops.add(upstream)
|
|
return "".join(out)
|
|
|
|
|
|
def _allocate_new_segment(
|
|
state: NativeSseBlockPolicyState,
|
|
upstream_index: int,
|
|
block_type: str,
|
|
*,
|
|
last_start_block: dict[str, Any] | None = None,
|
|
) -> int:
|
|
"""Assign a new downstream `index` for a segment and record upstream state."""
|
|
new_idx = state.next_index
|
|
state.next_index += 1
|
|
state.by_upstream[upstream_index] = _UpstreamBlockState(
|
|
block_type=block_type,
|
|
down_index=new_idx,
|
|
open=True,
|
|
last_start_block=last_start_block,
|
|
)
|
|
return new_idx
|
|
|
|
|
|
def transform_native_sse_block_event(
|
|
event: str,
|
|
state: NativeSseBlockPolicyState,
|
|
*,
|
|
thinking_enabled: bool,
|
|
) -> str | None:
|
|
"""Normalize native Anthropic SSE events and enforce local thinking policy."""
|
|
event_name, data_text = parse_native_sse_event(event)
|
|
if not event_name or not data_text:
|
|
return event
|
|
|
|
try:
|
|
payload = json.loads(data_text)
|
|
except json.JSONDecodeError:
|
|
return event
|
|
|
|
if event_name == "content_block_start":
|
|
block = payload.get("content_block")
|
|
if not isinstance(block, dict):
|
|
return event
|
|
block_type = block.get("type")
|
|
upstream_index = payload.get("index")
|
|
if not isinstance(upstream_index, int):
|
|
return event
|
|
if _should_drop_block_type(block_type, thinking_enabled=thinking_enabled):
|
|
state.dropped_indexes.add(upstream_index)
|
|
return None
|
|
|
|
if not isinstance(block_type, str):
|
|
return event
|
|
prefix = _synthetic_close_other_open_blocks(state, upstream_index)
|
|
stored = copy.deepcopy(block)
|
|
new_idx = _allocate_new_segment(
|
|
state,
|
|
upstream_index,
|
|
block_type=block_type,
|
|
last_start_block=stored,
|
|
)
|
|
payload["index"] = new_idx
|
|
return prefix + format_native_sse_event(event_name, json.dumps(payload))
|
|
|
|
if event_name == "content_block_delta":
|
|
delta = payload.get("delta")
|
|
if not isinstance(delta, dict):
|
|
return event
|
|
delta_type = delta.get("type")
|
|
upstream_index = payload.get("index")
|
|
if not isinstance(upstream_index, int):
|
|
return event
|
|
if upstream_index in state.dropped_indexes:
|
|
return None
|
|
if _should_drop_block_type(delta_type, thinking_enabled=thinking_enabled):
|
|
return None
|
|
|
|
block_kind = _delta_type_to_block_kind(delta_type)
|
|
if block_kind is None:
|
|
return event
|
|
|
|
seg = state.by_upstream.get(upstream_index)
|
|
if seg and seg.open:
|
|
payload["index"] = seg.down_index
|
|
return format_native_sse_event(event_name, json.dumps(payload))
|
|
|
|
if seg is not None and not seg.open:
|
|
# More deltas for an upstream block after a synthetic (or other) close:
|
|
# reopen with a new downstream `index` and emit a synthetic `content_block_start` first.
|
|
state.pending_suppressed_stops.discard(upstream_index)
|
|
carry = seg.last_start_block
|
|
new_idx = _allocate_new_segment(
|
|
state,
|
|
upstream_index,
|
|
block_type=block_kind,
|
|
last_start_block=carry,
|
|
)
|
|
stored_tool = (
|
|
carry
|
|
if isinstance(carry, dict) and carry.get("type") == "tool_use"
|
|
else None
|
|
)
|
|
start_payload = {
|
|
"type": "content_block_start",
|
|
"index": new_idx,
|
|
"content_block": _synthetic_start_content_block(
|
|
block_kind,
|
|
upstream_index=upstream_index,
|
|
stored_tool_block=stored_tool,
|
|
),
|
|
}
|
|
prefix = format_native_sse_event(
|
|
"content_block_start", json.dumps(start_payload)
|
|
)
|
|
payload["index"] = new_idx
|
|
return prefix + format_native_sse_event(event_name, json.dumps(payload))
|
|
|
|
# Delta with no prior `content_block_start` in this stream
|
|
if block_kind in ("text", "tool_use"):
|
|
synthetic_block = _synthetic_start_content_block(
|
|
block_kind,
|
|
upstream_index=upstream_index,
|
|
)
|
|
new_idx = _allocate_new_segment(
|
|
state,
|
|
upstream_index,
|
|
block_type=block_kind,
|
|
last_start_block=copy.deepcopy(synthetic_block),
|
|
)
|
|
start_payload = {
|
|
"type": "content_block_start",
|
|
"index": new_idx,
|
|
"content_block": synthetic_block,
|
|
}
|
|
prefix = format_native_sse_event(
|
|
"content_block_start", json.dumps(start_payload)
|
|
)
|
|
payload["index"] = new_idx
|
|
return prefix + format_native_sse_event(event_name, json.dumps(payload))
|
|
# thinking: pass through raw (unusual upstream shape)
|
|
return event
|
|
|
|
if event_name == "content_block_stop":
|
|
upstream_index = payload.get("index")
|
|
if not isinstance(upstream_index, int):
|
|
return event
|
|
if upstream_index in state.dropped_indexes:
|
|
return None
|
|
if upstream_index in state.pending_suppressed_stops:
|
|
state.pending_suppressed_stops.discard(upstream_index)
|
|
return None
|
|
|
|
seg = state.by_upstream.get(upstream_index)
|
|
if seg is not None and seg.open:
|
|
payload["index"] = seg.down_index
|
|
seg.open = False
|
|
return format_native_sse_event(event_name, json.dumps(payload))
|
|
if seg is not None:
|
|
# Spurious or duplicate `content_block_stop` for a closed block.
|
|
return None
|
|
if not thinking_enabled:
|
|
return None
|
|
return event
|
|
|
|
return event
|