free-claude-code/api/dependencies.py
chinfeng b07892fbec fix: extract client token from request when passthrough is enabled without server auth
When ENABLE_API_KEY_PASSTHROUGH is active but ANTHROPIC_AUTH_TOKEN is not
set, require_api_key must still extract the client token from x-api-key or
Authorization headers so it can be forwarded to the upstream provider.
Previously it returned an empty string, causing "no client auth token was
provided" errors even when the client sent a valid header.
2026-05-18 00:51:01 +08:00

183 lines
6.6 KiB
Python

"""Dependency injection for FastAPI."""
import secrets
from fastapi import Depends, HTTPException, Request
from loguru import logger
from starlette.applications import Starlette
from config.settings import Settings
from config.settings import get_settings as _get_settings
from core.anthropic import get_user_facing_error_message
from providers.base import BaseProvider
from providers.exceptions import (
AuthenticationError,
ServiceUnavailableError,
UnknownProviderTypeError,
)
from providers.registry import PROVIDER_DESCRIPTORS, ProviderRegistry
# Process-level cache: only for :func:`get_provider_for_type` / :func:`get_provider`
# when there is no ``Request``/``app`` (unit tests, scripts). HTTP handlers must pass
# ``app`` to :func:`resolve_provider` so the app-scoped registry is used.
_providers: dict[str, BaseProvider] = {}
def get_settings() -> Settings:
"""Return cached :class:`~config.settings.Settings` (FastAPI-friendly alias)."""
return _get_settings()
def resolve_provider(
provider_type: str,
*,
app: Starlette | None,
settings: Settings,
passthrough_api_key: str = "",
) -> BaseProvider:
"""Resolve a provider using the app-scoped registry when ``app`` is set.
When ``app`` is not ``None``, the app-owned :attr:`app.state.provider_registry`
must exist (installed by :class:`~api.runtime.AppRuntime` during startup).
Callers that construct a bare ``FastAPI`` without lifespan must set
``app.state.provider_registry`` explicitly.
When ``app`` is ``None`` (no HTTP context), uses the process-level
:data:`_providers` cache only.
When ``passthrough_api_key`` is non-empty and
``settings.enable_api_key_passthrough`` is True, the client's auth token
replaces per-provider credentials in the upstream request.
"""
if app is not None:
reg = getattr(app.state, "provider_registry", None)
if reg is None:
raise ServiceUnavailableError(
"Provider registry is not configured. Ensure AppRuntime startup ran "
"or assign app.state.provider_registry for test apps."
)
return _resolve_with_registry(
reg, provider_type, settings, passthrough_api_key=passthrough_api_key
)
return _resolve_with_registry(ProviderRegistry(_providers), provider_type, settings)
def _resolve_with_registry(
registry: ProviderRegistry,
provider_type: str,
settings: Settings,
*,
passthrough_api_key: str = "",
) -> BaseProvider:
cache_key = provider_type
if passthrough_api_key and settings.enable_api_key_passthrough:
cache_key = f"{provider_type}:{passthrough_api_key}"
should_log_init = not registry.is_cached(cache_key)
try:
provider = registry.get(
provider_type, settings, passthrough_api_key=passthrough_api_key
)
except AuthenticationError as e:
# Provider :class:`~providers.exceptions.AuthenticationError` messages are
# curated configuration hints (env var names, docs links), not upstream noise.
detail = str(e).strip() or get_user_facing_error_message(e)
raise HTTPException(status_code=503, detail=detail) from e
except UnknownProviderTypeError:
logger.error(
"Unknown provider_type: '{}'. Supported: {}",
provider_type,
", ".join(f"'{key}'" for key in PROVIDER_DESCRIPTORS),
)
raise
if should_log_init:
logger.info("Provider initialized: {}", provider_type)
return provider
def get_provider_for_type(provider_type: str) -> BaseProvider:
"""Get or create a provider in the process-level cache (no ``app``/Request).
HTTP route handlers should call :func:`resolve_provider` with the active
:attr:`request.app` (via :class:`~api.runtime.AppRuntime`) instead of this
process-wide cache.
"""
return resolve_provider(provider_type, app=None, settings=get_settings())
def _parse_bearer_token(header: str) -> str:
"""Extract and clean a token from an Authorization or x-api-key header value."""
token = header
if header.lower().startswith("bearer "):
token = header.split(" ", 1)[1]
if token and ":" in token:
token = token.split(":", 1)[0]
return token
def _extract_client_token(request: Request) -> str:
"""Extract a client auth token from the request for passthrough mode."""
header = (
request.headers.get("x-api-key")
or request.headers.get("authorization")
or request.headers.get("anthropic-auth-token")
)
if not header:
raise HTTPException(status_code=401, detail="Missing API key")
return _parse_bearer_token(header)
def require_api_key(
request: Request, settings: Settings = Depends(get_settings)
) -> str:
"""Require a server API key (Anthropic-style).
Checks `x-api-key` header or `Authorization: Bearer ...` against
`Settings.anthropic_auth_token`. If `ANTHROPIC_AUTH_TOKEN` is empty, this is a no-op.
Returns the extracted client token for pass-through when
``ENABLE_API_KEY_PASSTHROUGH`` is active.
"""
anthropic_auth_token = settings.anthropic_auth_token
if not anthropic_auth_token:
# No server auth configured. In passthrough mode, still extract the
# client token from the request header so it can be forwarded upstream.
if settings.enable_api_key_passthrough:
return _extract_client_token(request)
return ""
header = (
request.headers.get("x-api-key")
or request.headers.get("authorization")
or request.headers.get("anthropic-auth-token")
)
if not header:
raise HTTPException(status_code=401, detail="Missing API key")
token = _parse_bearer_token(header)
# Constant-time comparison to avoid leaking the configured token via
# response-time differences on a per-byte mismatch (CWE-208).
if not secrets.compare_digest(
token.encode("utf-8"), anthropic_auth_token.encode("utf-8")
):
raise HTTPException(status_code=401, detail="Invalid API key")
return token
def get_provider() -> BaseProvider:
"""Get or create the default provider (``MODEL`` / ``provider_type``).
Process-cache helper for scripts, unit tests, and non-FastAPI callers. HTTP
handlers must use :func:`resolve_provider` with :attr:`request.app` so the
app-scoped :class:`~providers.registry.ProviderRegistry` is used.
"""
return get_provider_for_type(get_settings().provider_type)
async def cleanup_provider():
"""Cleanup all provider resources."""
global _providers
await ProviderRegistry(_providers).cleanup()
_providers = {}
logger.debug("Provider cleanup completed")