free-claude-code/core/trace.py
Alishahryar1 29e7714337 feat(logging): structured TRACE events and end-to-end request correlation
Add core/trace.py with trace_event, traced_async_stream, and payload snapshots.
Merge TRACE fields into JSON logs; promote claude_session_id, http path/method.
Instrument API, messaging/CLI, and OpenAI-compat/native provider paths.
Harden log sink with enqueue and stdlib intercept re-entrancy guard.
Document behavior in .env.example and README; extend tests.
2026-05-10 18:24:48 -07:00

214 lines
6 KiB
Python

"""Structured TRACE events for end-to-end request / CLI / provider logging.
Emitted lines are merged into JSON log rows by ``config.logging_config``.
Conversation and Claude Code prompts are logged verbatim unless values live under
sanitized credential keys (e.g. ``api_key``, ``authorization``).
"""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator, Mapping
from typing import Any
from loguru import logger
TRACE_PAYLOAD_BINDING = "trace_payload"
_SECRET_VALUE_KEYS = frozenset(
k.lower()
for k in (
"authorization",
"x-api-key",
"anthropic-auth-token",
"api_key",
"password",
"secret",
"token",
"bearer_token",
"openapi_token",
"nvidia-api-key",
)
)
def _sanitize_trace_value(obj: Any) -> Any:
"""Recursively copy JSON-like structures redacting credential-shaped keys."""
if isinstance(obj, Mapping):
out: dict[str, Any] = {}
for k, v in obj.items():
if str(k).lower() in _SECRET_VALUE_KEYS:
out[str(k)] = "<redacted>"
else:
out[str(k)] = _sanitize_trace_value(v)
return out
if isinstance(obj, tuple | list):
return [_sanitize_trace_value(x) for x in obj]
return obj
def trace_event(*, stage: str, event: str, source: str, **fields: Any) -> None:
    """Emit one structured TRACE log row.

    Builds the payload ``{"stage": stage, "event": event, "source": source,
    **fields}``, passes it through ``_sanitize_trace_value`` (credential-named
    keys are redacted; lists/tuples become lists), and logs ``"TRACE <event>"``
    at INFO level with the payload bound under ``TRACE_PAYLOAD_BINDING``.
    Per the module docstring, the JSON log sink merges that payload into the
    emitted row.

    Args:
        stage: Free-form stage label copied into the payload.
        event: Event name; copied into the payload and used in the message.
        source: Free-form origin label copied into the payload.
        **fields: Extra payload fields (sanitized before logging).
    """
    payload = _sanitize_trace_value(
        {
            "stage": stage,
            "event": event,
            "source": source,
            **fields,
        },
    )
    # Bind via the shared constant so the emitter and whatever reads the
    # binding cannot drift apart on the key name.
    logger.bind(**{TRACE_PAYLOAD_BINDING: payload}).info("TRACE {}", event)
def api_messages_request_snapshot(req: Any) -> dict[str, Any]:
    """Capture the trace-relevant fields of an Anthropic ``MessagesRequest``-like body.

    ``req`` may be any object exposing ``model_dump`` (called with
    ``mode="python"``) or a mapping; any other value yields an empty snapshot.
    Only the allow-listed top-level fields below are kept, in that order, and
    fields that are absent or ``None`` are dropped. The result goes through
    ``_sanitize_trace_value`` so credential-named keys are redacted.
    """
    fields_of_interest = (
        "model",
        "messages",
        "system",
        "tools",
        "tool_choice",
        "max_tokens",
        "thinking",
        "temperature",
        "top_p",
        "top_k",
        "stop_sequences",
        "metadata",
        "stream",
        "thinking_enabled",
    )
    raw: Any = {}
    if hasattr(req, "model_dump"):
        raw = req.model_dump(mode="python")
    elif isinstance(req, Mapping):
        raw = dict(req)
    kept = {
        name: raw[name]
        for name in fields_of_interest
        if name in raw and raw[name] is not None
    }
    return _sanitize_trace_value(kept)
def extract_claude_session_id_from_headers(headers: Mapping[str, str]) -> str | None:
"""Best-effort session id forwarded by Claude Code / SDK via HTTP."""
lowered = {str(k).lower(): v for k, v in headers.items() if isinstance(v, str)}
for key in (
"anthropic-session-id",
"x-anthropic-session-id",
"claude-session-id",
"x-claude-session-id",
):
candidate = lowered.get(key)
if candidate:
return candidate
return None
async def traced_async_stream(
    agen: AsyncIterator[str],
    *,
    stage: str,
    source: str,
    complete_event: str,
    interrupted_event: str,
    chunk_event: str | None = None,
    chunk_interval: int = 250,
    extra: Mapping[str, Any] | None = None,
) -> AsyncIterator[str]:
    """Re-yield a text stream unchanged while emitting TRACE rows about it.

    Rows emitted (``extra`` is merged into every one):

    * ``chunk_event`` every ``chunk_interval`` chunks, with running
      ``stream_chunks_so_far`` / ``stream_bytes_so_far``; only when
      ``chunk_event`` is set and ``chunk_interval > 0``.
    * ``complete_event`` with ``outcome="ok"`` once ``agen`` is exhausted.
    * ``interrupted_event`` when iteration ends abnormally, with ``outcome``
      ``"cancelled"`` (``asyncio.CancelledError``), ``"exception_group"``
      (``BaseExceptionGroup``, plus ``note``), or ``"error"`` (any other
      ``BaseException``, plus ``exc_type``). The exception is always re-raised.

    Byte counts are the UTF-8 size of each chunk (encoded with
    ``errors="replace"``).

    Whenever this wrapper finishes, ``agen.aclose()`` (if ``agen`` has one) is
    awaited. That way an early close by the consumer promptly runs the inner
    stream's cleanup instead of leaving it suspended until garbage collection.
    """
    common = dict(extra or {})
    count = 0
    nbytes = 0

    def _emit_interrupted(**detail: Any) -> None:
        # One ``interrupted_event`` row with the totals reached so far.
        trace_event(
            stage=stage,
            event=interrupted_event,
            source=source,
            stream_chunks=count,
            stream_bytes=nbytes,
            **detail,
            **common,
        )

    try:
        async for chunk in agen:
            count += 1
            nbytes += len(chunk.encode("utf-8", errors="replace"))
            if chunk_event and chunk_interval > 0 and count % chunk_interval == 0:
                trace_event(
                    stage=stage,
                    event=chunk_event,
                    source=source,
                    stream_chunks_so_far=count,
                    stream_bytes_so_far=nbytes,
                    **common,
                )
            yield chunk
    except asyncio.CancelledError:
        _emit_interrupted(outcome="cancelled")
        raise
    except BaseExceptionGroup as grp:
        _emit_interrupted(outcome="exception_group", note=str(grp))
        raise
    except BaseException as exc:
        # Also reached with GeneratorExit when the consumer closes this wrapper
        # before the stream is exhausted.
        _emit_interrupted(outcome="error", exc_type=type(exc).__name__)
        raise
    else:
        trace_event(
            stage=stage,
            event=complete_event,
            source=source,
            stream_chunks=count,
            stream_bytes=nbytes,
            outcome="ok",
            **common,
        )
    finally:
        # After a consumer close (or an exception thrown in at ``yield``),
        # ``agen`` is still suspended mid-stream. Close it here so its cleanup
        # runs now, in this task. For an exhausted or already-failed async
        # generator, ``aclose()`` is a no-op.
        aclose = getattr(agen, "aclose", None)
        if aclose is not None:
            await aclose()
def provider_chat_body_snapshot(body: Mapping[str, Any]) -> dict[str, Any]:
    """Return the trace-relevant subset of an OpenAI-compatible chat body.

    Keeps ``model``, ``messages``, ``tools``, ``tool_choice``, ``temperature``
    and ``max_tokens`` when present and not ``None`` (in that order). It then
    redacts credential-named keys via ``_sanitize_trace_value``; conversation
    text is otherwise kept verbatim.
    """
    subset: dict[str, Any] = {}
    for key_name in ("model", "messages", "tools", "tool_choice", "temperature", "max_tokens"):
        if key_name not in body:
            continue
        value = body[key_name]
        if value is not None:
            subset[key_name] = value
    return _sanitize_trace_value(subset)
def provider_native_messages_body_snapshot(body: Mapping[str, Any]) -> dict[str, Any]:
    """Return the trace-relevant subset of a native Anthropic Messages API body.

    Copies the allow-listed top-level fields below that are present and not
    ``None``, in that order. It then redacts credential-named keys via
    ``_sanitize_trace_value``.
    """
    allowed = (
        "model",
        "messages",
        "system",
        "tools",
        "tool_choice",
        "max_tokens",
        "thinking",
        "metadata",
        "temperature",
        "top_p",
        "top_k",
        "stop_sequences",
    )
    present = [name for name in allowed if name in body]
    subset = {name: body[name] for name in present if body[name] is not None}
    return _sanitize_trace_value(subset)