eigent/backend/main.py
Tong Chen 6c827a3d06
refactor: establish Brain-centered architecture and frontend/backend separation foundations (#1597)
Co-authored-by: Douglas <douglas.ym.lai@gmail.com>
Co-authored-by: Douglas Lai <115660088+Douglasymlai@users.noreply.github.com>
2026-05-01 17:03:33 +08:00

298 lines
9.4 KiB
Python

# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
import asyncio
import atexit
import os
import pathlib
import signal
import sys
import threading
# Add project root to Python path to import shared utils
_project_root = pathlib.Path(__file__).parent.parent
if str(_project_root) not in sys.path:
sys.path.insert(0, str(_project_root))
import logging
# Setup logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Disable verbose CAMEL logs
logging.getLogger("camel").setLevel(logging.WARNING)
logging.getLogger("camel.base_model").setLevel(logging.WARNING)
logging.getLogger("camel.agents").setLevel(logging.WARNING)
logging.getLogger("camel.societies").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
from app import api
from app.component.environment import env
from app.router import register_routers
from app.utils.event_loop_utils import set_main_event_loop
os.environ["PYTHONIOENCODING"] = "utf-8"
app_logger = logging.getLogger("main")
# Log application startup
app_logger.info("Starting Eigent Multi-Agent System API")
app_logger.info(f"Python encoding: {os.environ.get('PYTHONIOENCODING')}")
app_logger.info(f"Environment: {os.environ.get('ENVIRONMENT', 'development')}")
prefix = env("url_prefix", "")
app_logger.info(f"Loading routers with prefix: '{prefix}'")
app_logger.info(
f"MCP will be at: {prefix}/mcp/list, health at: {prefix}/health"
)
register_routers(api, prefix)
app_logger.info("All routers loaded successfully")
# Check if debug mode is enabled via environment variable
if os.environ.get("ENABLE_PYTHON_DEBUG") == "true":
try:
import debugpy
DEBUG_PORT = int(os.environ.get("DEBUG_PORT", "5678"))
app_logger.info(
f"Debug mode enabled - Starting debugpy server on port {DEBUG_PORT}"
)
debugpy.listen(("localhost", DEBUG_PORT))
app_logger.info(
f"Debugger ready for attachment on localhost:{DEBUG_PORT}"
)
# 📝 In VS Code: Run 'Debug Python Backend (Attach)' configuration
# Don't wait for client automatically - let it attach when ready
except ImportError:
app_logger.warning(
"debugpy not available, install with: uv add debugpy"
)
except Exception as e:
app_logger.error(f"Failed to start debugpy: {e}")
dir = pathlib.Path(__file__).parent / "runtime"
dir.mkdir(parents=True, exist_ok=True)
# Write PID file asynchronously
async def write_pid_file():
r"""Write PID file asynchronously"""
import aiofiles
async with aiofiles.open(dir / "run.pid", "w") as f:
await f.write(str(os.getpid()))
app_logger.info(f"PID file written: {os.getpid()}")
# PID task will be created on startup
pid_task = None
@api.on_event("startup")
async def startup_event():
global pid_task
set_main_event_loop(asyncio.get_running_loop())
pid_task = asyncio.create_task(write_pid_file())
app_logger.info("PID write task created")
# Initialize EnvironmentHands from Brain deployment (full on local/cloud_vm, sandbox in Docker)
from app.router_layer.hands_resolver import init_environment_hands
hands = init_environment_hands()
app_logger.info(f"EnvironmentHands initialized: mode={hands.mode}")
# Initialize telemetry tracer provider
from app.utils.telemetry.workforce_metrics import (
initialize_tracer_provider,
)
initialize_tracer_provider()
app_logger.info("Telemetry tracer provider initialized")
@api.on_event("shutdown")
async def shutdown_event_handler():
r"""Run cleanup when uvicorn receives SIGINT/SIGTERM and shuts down."""
await cleanup_resources()
async def cleanup_resources():
r"""Cleanup all resources on shutdown"""
app_logger.info("Starting graceful shutdown process")
from app.service.task import _cleanup_task, task_locks
if _cleanup_task and not _cleanup_task.done():
_cleanup_task.cancel()
try:
await _cleanup_task
except asyncio.CancelledError:
pass
# Cleanup all task locks
for task_id in list(task_locks.keys()):
try:
task_lock = task_locks[task_id]
await task_lock.cleanup()
except Exception as e:
app_logger.error(f"Error cleaning up task {task_id}: {e}")
# Remove PID file
pid_file = dir / "run.pid"
if pid_file.exists():
pid_file.unlink()
# Shutdown OpenTelemetry tracer (releases BatchSpanProcessor worker threads)
try:
from app.utils.telemetry.workforce_metrics import (
shutdown_tracer_provider,
)
shutdown_tracer_provider()
except Exception as e:
app_logger.warning(f"Telemetry shutdown failed: {e}")
# Shutdown TerminalToolkit thread pool (prevents non-daemon threads blocking exit)
try:
from app.agent.toolkit.terminal_toolkit import TerminalToolkit
if TerminalToolkit._thread_pool is not None:
TerminalToolkit._thread_pool.shutdown(wait=False)
TerminalToolkit._thread_pool = None
except Exception as e:
app_logger.warning(f"TerminalToolkit shutdown failed: {e}")
# Best-effort close Browser toolkit WebSocket/Node connections.
# Use a timeout so shutdown stays responsive even if a wrapper is stuck.
try:
from app.agent.toolkit.hybrid_browser_toolkit import (
websocket_connection_pool,
)
await asyncio.wait_for(
websocket_connection_pool.close_all(), timeout=3.0
)
except TimeoutError:
app_logger.warning("Browser WebSocket pool shutdown timed out")
except Exception as e:
app_logger.warning(f"Browser WebSocket pool shutdown failed: {e}")
set_main_event_loop(None)
app_logger.info("All resources cleaned up successfully")
# Register cleanup on exit with safe synchronous wrapper
def sync_cleanup():
"""Synchronous cleanup for atexit - handles PID file removal"""
try:
# Only perform synchronous cleanup tasks
pid_file = dir / "run.pid"
if pid_file.exists():
pid_file.unlink()
app_logger.info("PID file removed during shutdown")
except Exception as e:
app_logger.error(f"Error during atexit cleanup: {e}")
atexit.register(sync_cleanup)
# Log successful initialization
app_logger.info("Application initialization completed successfully")
def run_standalone():
"""Run Brain in standalone mode (no Electron dependency)."""
import uvicorn
port = int(env("EIGENT_BRAIN_PORT", "5001"))
host = env("EIGENT_BRAIN_HOST", "0.0.0.0") # nosec B104 - bind all for Docker/dev
reload = os.environ.get("EIGENT_DEBUG", "").lower() in ("1", "true", "yes")
app_logger.info(
f"Starting Brain in standalone mode: {host}:{port} (reload={reload})"
)
if reload:
uvicorn.run(
"main:api",
host=host,
port=port,
reload=reload,
timeout_graceful_shutdown=5,
)
return
config = uvicorn.Config(
"main:api",
host=host,
port=port,
reload=False,
timeout_graceful_shutdown=5,
)
server = uvicorn.Server(config)
server.install_signal_handlers = lambda: None
force_exit_timer = None
signal_count = {"count": 0}
old_sigint = signal.getsignal(signal.SIGINT)
old_sigterm = signal.getsignal(signal.SIGTERM)
def _force_exit(signum: int):
signame = signal.Signals(signum).name
app_logger.error(
"Force exiting Brain after %s because graceful shutdown did not finish",
signame,
)
os._exit(128 + signum)
def _handle_signal(signum, _frame):
nonlocal force_exit_timer
signame = signal.Signals(signum).name
signal_count["count"] += 1
if signal_count["count"] == 1:
app_logger.warning(
"%s received, requesting graceful shutdown. Press Ctrl+C again to force exit.",
signame,
)
server.should_exit = True
if force_exit_timer is None:
force_exit_timer = threading.Timer(
5.0, _force_exit, args=(signum,)
)
force_exit_timer.daemon = True
force_exit_timer.start()
return
app_logger.error(
"%s received again, force exiting Brain immediately", signame
)
_force_exit(signum)
signal.signal(signal.SIGINT, _handle_signal)
signal.signal(signal.SIGTERM, _handle_signal)
try:
server.run()
finally:
if force_exit_timer is not None:
force_exit_timer.cancel()
signal.signal(signal.SIGINT, old_sigint)
signal.signal(signal.SIGTERM, old_sigterm)
if __name__ == "__main__":
run_standalone()