# Mirror of https://github.com/MODSetter/SurfSense.git (synced 2026-05-15 09:45:42 +00:00).
"""Classify stream exceptions for logging and client error payloads."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import time
|
|
from typing import Any, Literal
|
|
|
|
from app.agents.new_chat.errors import BusyError
|
|
from app.agents.new_chat.middleware.busy_mutex import (
|
|
get_cancel_state,
|
|
is_cancel_requested,
|
|
)
|
|
|
|
# Backoff schedule used while a previous turn is still cancelling.
TURN_CANCELLING_INITIAL_DELAY_MS = 200
TURN_CANCELLING_BACKOFF_FACTOR = 2
TURN_CANCELLING_MAX_DELAY_MS = 1500


def compute_turn_cancelling_retry_delay(attempt: int) -> int:
    """Return the retry delay in milliseconds for a 1-based cancel attempt.

    The delay doubles per attempt starting from the initial delay and is
    capped at ``TURN_CANCELLING_MAX_DELAY_MS``. Attempts below 1 are
    treated as attempt 1.
    """
    clamped_attempt = max(attempt, 1)
    exponent = clamped_attempt - 1
    uncapped = TURN_CANCELLING_INITIAL_DELAY_MS * (
        TURN_CANCELLING_BACKOFF_FACTOR**exponent
    )
    return min(uncapped, TURN_CANCELLING_MAX_DELAY_MS)
|
|
|
|
|
|
def log_chat_stream_error(
|
|
*,
|
|
flow: Literal["new", "resume", "regenerate"],
|
|
error_kind: str,
|
|
error_code: str | None,
|
|
severity: Literal["info", "warn", "error"],
|
|
is_expected: bool,
|
|
request_id: str | None,
|
|
thread_id: int | None,
|
|
search_space_id: int | None,
|
|
user_id: str | None,
|
|
message: str,
|
|
extra: dict[str, Any] | None = None,
|
|
) -> None:
|
|
payload: dict[str, Any] = {
|
|
"event": "chat_stream_error",
|
|
"flow": flow,
|
|
"error_kind": error_kind,
|
|
"error_code": error_code,
|
|
"severity": severity,
|
|
"is_expected": is_expected,
|
|
"request_id": request_id or "unknown",
|
|
"thread_id": thread_id,
|
|
"search_space_id": search_space_id,
|
|
"user_id": user_id,
|
|
"message": message,
|
|
}
|
|
if extra:
|
|
payload.update(extra)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
rendered = json.dumps(payload, ensure_ascii=False)
|
|
if severity == "error":
|
|
logger.error("[chat_stream_error] %s", rendered)
|
|
elif severity == "warn":
|
|
logger.warning("[chat_stream_error] %s", rendered)
|
|
else:
|
|
logger.info("[chat_stream_error] %s", rendered)
|
|
|
|
|
|
def _parse_error_payload(message: str) -> dict[str, Any] | None:
|
|
candidates = [message]
|
|
first_brace_idx = message.find("{")
|
|
if first_brace_idx >= 0:
|
|
candidates.append(message[first_brace_idx:])
|
|
|
|
for candidate in candidates:
|
|
try:
|
|
parsed = json.loads(candidate)
|
|
if isinstance(parsed, dict):
|
|
return parsed
|
|
except Exception:
|
|
continue
|
|
return None
|
|
|
|
|
|
def _extract_provider_error_code(parsed: dict[str, Any] | None) -> int | None:
|
|
if not isinstance(parsed, dict):
|
|
return None
|
|
candidates: list[Any] = [parsed.get("code")]
|
|
nested = parsed.get("error")
|
|
if isinstance(nested, dict):
|
|
candidates.append(nested.get("code"))
|
|
for value in candidates:
|
|
try:
|
|
if value is None:
|
|
continue
|
|
return int(value)
|
|
except Exception:
|
|
continue
|
|
return None
|
|
|
|
|
|
def is_provider_rate_limited(exc: BaseException) -> bool:
    """Return True if the exception looks like an upstream HTTP 429 / rate limit."""
    raw = str(exc)

    # Heuristic 1: the exception class itself is some *RateLimit* error.
    if "ratelimit" in type(exc).__name__.lower():
        return True

    # Heuristic 2: a structured provider payload carrying code 429.
    parsed = _parse_error_payload(raw)
    if _extract_provider_error_code(parsed) == 429:
        return True

    # Heuristic 3: a provider "type" field naming a rate-limit error.
    # A nested error.type, when present, takes precedence over the
    # top-level type.
    provider_error_type = ""
    if parsed:
        top_type = parsed.get("type")
        if isinstance(top_type, str):
            provider_error_type = top_type.lower()
        nested = parsed.get("error")
        if isinstance(nested, dict):
            nested_type = nested.get("type")
            if isinstance(nested_type, str):
                provider_error_type = nested_type.lower()
    if provider_error_type == "rate_limit_error":
        return True

    # Heuristic 4: plain-text markers in the stringified exception.
    lowered = raw.lower()
    return any(
        marker in lowered
        for marker in (
            "rate limited",
            "rate-limited",
            "temporarily rate-limited upstream",
        )
    )
|
|
|
|
|
|
def classify_stream_exception(
    exc: Exception,
    *,
    flow_label: str,
) -> tuple[
    str, str, Literal["info", "warn", "error"], bool, str, dict[str, Any] | None
]:
    """Return kind, code, severity, expected flag, message, and optional extra dict."""
    raw = str(exc)

    # Busy thread: either a typed BusyError or a legacy string-matched error.
    if isinstance(exc, BusyError) or "Thread is busy with another request" in raw:
        # NOTE(review): BusyError.request_id is used here as the key for the
        # cancel-state lookup — confirm that attribute really identifies the
        # thread and not just the request.
        busy_thread_id = str(exc.request_id) if isinstance(exc, BusyError) else None

        if busy_thread_id and is_cancel_requested(busy_thread_id):
            # The previous turn is still cancelling: tell the client when to
            # retry, backing off exponentially with the cancel attempt count.
            cancel_state = get_cancel_state(busy_thread_id)
            attempt = cancel_state[0] if cancel_state else 1
            retry_after_ms = compute_turn_cancelling_retry_delay(attempt)
            retry_hint = {
                "retry_after_ms": retry_after_ms,
                "retry_after_at": int(time.time() * 1000) + retry_after_ms,
            }
            return (
                "thread_busy",
                "TURN_CANCELLING",
                "info",
                True,
                "A previous response is still stopping. Please try again in a moment.",
                retry_hint,
            )

        return (
            "thread_busy",
            "THREAD_BUSY",
            "warn",
            True,
            "Another response is still finishing for this thread. Please try again in a moment.",
            None,
        )

    if is_provider_rate_limited(exc):
        return (
            "rate_limited",
            "RATE_LIMITED",
            "warn",
            True,
            "This model is temporarily rate-limited. Please try again in a few seconds or switch models.",
            None,
        )

    # Anything else is an unexpected server-side failure.
    return (
        "server_error",
        "SERVER_ERROR",
        "error",
        False,
        f"Error during {flow_label}: {raw}",
        None,
    )
|