mirror of
https://github.com/Alishahryar1/free-claude-code.git
synced 2026-05-21 10:22:51 +00:00
Add core/trace.py with trace_event, traced_async_stream, and payload snapshots. Merge TRACE fields into JSON logs; promote claude_session_id, http path/method. Instrument API, messaging/CLI, and OpenAI-compat/native provider paths. Harden log sink with enqueue and stdlib intercept re-entrancy guard. Document behavior in .env.example and README; extend tests.
214 lines
6 KiB
Python
214 lines
6 KiB
Python
"""Structured TRACE events for end-to-end request / CLI / provider logging.
|
|
|
|
Emitted lines are merged into JSON log rows by ``config.logging_config``.
|
|
Conversation and Claude Code prompts are logged verbatim unless values live under
|
|
sanitized credential keys (e.g. ``api_key``, ``authorization``).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import AsyncIterator, Mapping
|
|
from typing import Any
|
|
|
|
from loguru import logger
|
|
|
|
TRACE_PAYLOAD_BINDING = "trace_payload"
|
|
|
|
_SECRET_VALUE_KEYS = frozenset(
|
|
k.lower()
|
|
for k in (
|
|
"authorization",
|
|
"x-api-key",
|
|
"anthropic-auth-token",
|
|
"api_key",
|
|
"password",
|
|
"secret",
|
|
"token",
|
|
"bearer_token",
|
|
"openapi_token",
|
|
"nvidia-api-key",
|
|
)
|
|
)
|
|
|
|
|
|
def _sanitize_trace_value(obj: Any) -> Any:
|
|
"""Recursively copy JSON-like structures redacting credential-shaped keys."""
|
|
if isinstance(obj, Mapping):
|
|
out: dict[str, Any] = {}
|
|
for k, v in obj.items():
|
|
if str(k).lower() in _SECRET_VALUE_KEYS:
|
|
out[str(k)] = "<redacted>"
|
|
else:
|
|
out[str(k)] = _sanitize_trace_value(v)
|
|
return out
|
|
if isinstance(obj, tuple | list):
|
|
return [_sanitize_trace_value(x) for x in obj]
|
|
return obj
|
|
|
|
|
|
def trace_event(*, stage: str, event: str, source: str, **fields: Any) -> None:
|
|
"""Emit one structured TRACE row (merged into JSON by the log sink)."""
|
|
payload = _sanitize_trace_value(
|
|
{
|
|
"stage": stage,
|
|
"event": event,
|
|
"source": source,
|
|
**fields,
|
|
},
|
|
)
|
|
logger.bind(trace_payload=payload).info("TRACE {}", event)
|
|
|
|
|
|
def api_messages_request_snapshot(req: Any) -> dict[str, Any]:
|
|
"""Return a sanitized snapshot of an Anthropic ``MessagesRequest``-like body."""
|
|
if hasattr(req, "model_dump"):
|
|
data = req.model_dump(mode="python")
|
|
elif isinstance(req, Mapping):
|
|
data = dict(req)
|
|
else:
|
|
data = {}
|
|
|
|
snapshot: dict[str, Any] = {}
|
|
for key in (
|
|
"model",
|
|
"messages",
|
|
"system",
|
|
"tools",
|
|
"tool_choice",
|
|
"max_tokens",
|
|
"thinking",
|
|
"temperature",
|
|
"top_p",
|
|
"top_k",
|
|
"stop_sequences",
|
|
"metadata",
|
|
"stream",
|
|
"thinking_enabled",
|
|
):
|
|
if key in data and data[key] is not None:
|
|
snapshot[key] = data[key]
|
|
return _sanitize_trace_value(snapshot)
|
|
|
|
|
|
def extract_claude_session_id_from_headers(headers: Mapping[str, str]) -> str | None:
|
|
"""Best-effort session id forwarded by Claude Code / SDK via HTTP."""
|
|
lowered = {str(k).lower(): v for k, v in headers.items() if isinstance(v, str)}
|
|
for key in (
|
|
"anthropic-session-id",
|
|
"x-anthropic-session-id",
|
|
"claude-session-id",
|
|
"x-claude-session-id",
|
|
):
|
|
candidate = lowered.get(key)
|
|
if candidate:
|
|
return candidate
|
|
return None
|
|
|
|
|
|
async def traced_async_stream(
|
|
agen: AsyncIterator[str],
|
|
*,
|
|
stage: str,
|
|
source: str,
|
|
complete_event: str,
|
|
interrupted_event: str,
|
|
chunk_event: str | None = None,
|
|
chunk_interval: int = 250,
|
|
extra: Mapping[str, Any] | None = None,
|
|
) -> AsyncIterator[str]:
|
|
"""Emit TRACE rows when a text stream completes, fails, cancels, or periodically."""
|
|
common = dict(extra or {})
|
|
count = 0
|
|
nbytes = 0
|
|
interrupted = False
|
|
try:
|
|
async for chunk in agen:
|
|
count += 1
|
|
nbytes += len(chunk.encode("utf-8", errors="replace"))
|
|
if chunk_event and chunk_interval > 0 and count % chunk_interval == 0:
|
|
trace_event(
|
|
stage=stage,
|
|
event=chunk_event,
|
|
source=source,
|
|
stream_chunks_so_far=count,
|
|
stream_bytes_so_far=nbytes,
|
|
**common,
|
|
)
|
|
yield chunk
|
|
except asyncio.CancelledError:
|
|
interrupted = True
|
|
trace_event(
|
|
stage=stage,
|
|
event=interrupted_event,
|
|
source=source,
|
|
stream_chunks=count,
|
|
stream_bytes=nbytes,
|
|
outcome="cancelled",
|
|
**common,
|
|
)
|
|
raise
|
|
except BaseExceptionGroup as grp:
|
|
interrupted = True
|
|
trace_event(
|
|
stage=stage,
|
|
event=interrupted_event,
|
|
source=source,
|
|
stream_chunks=count,
|
|
stream_bytes=nbytes,
|
|
outcome="exception_group",
|
|
note=str(grp),
|
|
**common,
|
|
)
|
|
raise
|
|
except BaseException as exc:
|
|
interrupted = True
|
|
trace_event(
|
|
stage=stage,
|
|
event=interrupted_event,
|
|
source=source,
|
|
stream_chunks=count,
|
|
stream_bytes=nbytes,
|
|
outcome="error",
|
|
exc_type=type(exc).__name__,
|
|
**common,
|
|
)
|
|
raise
|
|
|
|
if not interrupted:
|
|
trace_event(
|
|
stage=stage,
|
|
event=complete_event,
|
|
source=source,
|
|
stream_chunks=count,
|
|
stream_bytes=nbytes,
|
|
outcome="ok",
|
|
**common,
|
|
)
|
|
|
|
|
|
def provider_chat_body_snapshot(body: Mapping[str, Any]) -> dict[str, Any]:
|
|
"""Sanitized OpenAI-compat chat body subset for traces (conversation text verbatim)."""
|
|
keys = ("model", "messages", "tools", "tool_choice", "temperature", "max_tokens")
|
|
snap = {k: body[k] for k in keys if k in body and body[k] is not None}
|
|
return _sanitize_trace_value(snap)
|
|
|
|
|
|
def provider_native_messages_body_snapshot(body: Mapping[str, Any]) -> dict[str, Any]:
|
|
"""Sanitized Anthropic Messages API body subset for traces."""
|
|
keys = (
|
|
"model",
|
|
"messages",
|
|
"system",
|
|
"tools",
|
|
"tool_choice",
|
|
"max_tokens",
|
|
"thinking",
|
|
"metadata",
|
|
"temperature",
|
|
"top_p",
|
|
"top_k",
|
|
"stop_sequences",
|
|
)
|
|
snap = {k: body[k] for k in keys if k in body and body[k] is not None}
|
|
return _sanitize_trace_value(snap)
|