"""Application runtime composition and lifecycle ownership.""" from __future__ import annotations import asyncio import os from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any from fastapi import FastAPI from loguru import logger from config.settings import Settings, get_settings from providers.registry import ProviderRegistry if TYPE_CHECKING: from cli.manager import CLISessionManager from messaging.handler import ClaudeMessageHandler from messaging.platforms.base import MessagingPlatform from messaging.session import SessionStore _SHUTDOWN_TIMEOUT_S = 5.0 async def best_effort( name: str, awaitable: Any, timeout_s: float = _SHUTDOWN_TIMEOUT_S ) -> None: """Run a shutdown step with timeout; never raise to callers.""" try: await asyncio.wait_for(awaitable, timeout=timeout_s) except TimeoutError: logger.warning(f"Shutdown step timed out: {name} ({timeout_s}s)") except Exception as e: logger.warning(f"Shutdown step failed: {name}: {type(e).__name__}: {e}") def warn_if_process_auth_token(settings: Settings) -> None: """Warn when server auth was implicitly inherited from the shell.""" if settings.uses_process_anthropic_auth_token(): logger.warning( "ANTHROPIC_AUTH_TOKEN is set in the process environment but not in " "a configured .env file. The proxy will require that token. Add " "ANTHROPIC_AUTH_TOKEN= to .env to disable proxy auth, or set the " "same token in .env to make server auth explicit." ) @dataclass(slots=True) class AppRuntime: """Own optional messaging, CLI, session, and provider runtime resources.""" app: FastAPI settings: Settings _provider_registry: ProviderRegistry | None = field(default=None, init=False) messaging_platform: MessagingPlatform | None = None message_handler: ClaudeMessageHandler | None = None cli_manager: CLISessionManager | None = None @classmethod def for_app( cls, app: FastAPI, settings: Settings | None = None, ) -> AppRuntime: return cls(app=app, settings=settings or get_settings()) async def startup(self) -> None: logger.info("Starting Claude Code Proxy...") self._provider_registry = ProviderRegistry() self.app.state.provider_registry = self._provider_registry warn_if_process_auth_token(self.settings) await self._start_messaging_if_configured() self._publish_state() async def shutdown(self) -> None: if self.message_handler is not None: try: self.message_handler.session_store.flush_pending_save() except Exception as e: logger.warning(f"Session store flush on shutdown: {e}") logger.info("Shutdown requested, cleaning up...") if self.messaging_platform: await best_effort("messaging_platform.stop", self.messaging_platform.stop()) if self.cli_manager: await best_effort("cli_manager.stop_all", self.cli_manager.stop_all()) if self._provider_registry is not None: await best_effort( "provider_registry.cleanup", self._provider_registry.cleanup() ) await self._shutdown_limiter() logger.info("Server shut down cleanly") async def _start_messaging_if_configured(self) -> None: try: from messaging.platforms.factory import ( MessagingPlatformOptions, create_messaging_platform, ) self.messaging_platform = create_messaging_platform( self.settings.messaging_platform, MessagingPlatformOptions( telegram_bot_token=self.settings.telegram_bot_token, allowed_telegram_user_id=self.settings.allowed_telegram_user_id, discord_bot_token=self.settings.discord_bot_token, allowed_discord_channels=self.settings.allowed_discord_channels, voice_note_enabled=self.settings.voice_note_enabled, whisper_model=self.settings.whisper_model, whisper_device=self.settings.whisper_device, hf_token=self.settings.hf_token, nvidia_nim_api_key=self.settings.nvidia_nim_api_key, ), ) if self.messaging_platform: await self._start_message_handler() except ImportError as e: logger.warning(f"Messaging module import error: {e}") except Exception as e: logger.error(f"Failed to start messaging platform: {e}") import traceback logger.error(traceback.format_exc()) async def _start_message_handler(self) -> None: from cli.manager import CLISessionManager from messaging.handler import ClaudeMessageHandler from messaging.session import SessionStore workspace = ( os.path.abspath(self.settings.allowed_dir) if self.settings.allowed_dir else os.getcwd() ) os.makedirs(workspace, exist_ok=True) data_path = os.path.abspath(self.settings.claude_workspace) os.makedirs(data_path, exist_ok=True) api_url = f"http://{self.settings.host}:{self.settings.port}/v1" allowed_dirs = [workspace] if self.settings.allowed_dir else [] plans_dir_abs = os.path.abspath( os.path.join(self.settings.claude_workspace, "plans") ) plans_directory = os.path.relpath(plans_dir_abs, workspace) self.cli_manager = CLISessionManager( workspace_path=workspace, api_url=api_url, allowed_dirs=allowed_dirs, plans_directory=plans_directory, claude_bin=self.settings.claude_cli_bin, ) session_store = SessionStore( storage_path=os.path.join(data_path, "sessions.json") ) platform = self.messaging_platform assert platform is not None self.message_handler = ClaudeMessageHandler( platform=platform, cli_manager=self.cli_manager, session_store=session_store, ) self._restore_tree_state(session_store) platform.on_message(self.message_handler.handle_message) await platform.start() logger.info(f"{platform.name} platform started with message handler") def _restore_tree_state(self, session_store: SessionStore) -> None: saved_trees = session_store.get_all_trees() if not saved_trees: return if self.message_handler is None: return logger.info(f"Restoring {len(saved_trees)} conversation trees...") from messaging.trees.queue_manager import TreeQueueManager self.message_handler.replace_tree_queue( TreeQueueManager.from_dict( { "trees": saved_trees, "node_to_tree": session_store.get_node_mapping(), }, queue_update_callback=self.message_handler.update_queue_positions, node_started_callback=self.message_handler.mark_node_processing, ) ) if self.message_handler.tree_queue.cleanup_stale_nodes() > 0: tree_data = self.message_handler.tree_queue.to_dict() session_store.sync_from_tree_data( tree_data["trees"], tree_data["node_to_tree"] ) def _publish_state(self) -> None: self.app.state.messaging_platform = self.messaging_platform self.app.state.message_handler = self.message_handler self.app.state.cli_manager = self.cli_manager async def _shutdown_limiter(self) -> None: try: from messaging.limiter import MessagingRateLimiter except Exception as e: logger.debug( "Rate limiter shutdown skipped (import failed): {}: {}", type(e).__name__, e, ) return await best_effort( "MessagingRateLimiter.shutdown_instance", MessagingRateLimiter.shutdown_instance(), timeout_s=2.0, )