mirror of
https://github.com/eigent-ai/eigent.git
synced 2026-05-22 19:47:28 +00:00
Co-authored-by: Douglas <douglas.ym.lai@gmail.com> Co-authored-by: Douglas Lai <115660088+Douglasymlai@users.noreply.github.com>
298 lines
9.4 KiB
Python
298 lines
9.4 KiB
Python
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
|
|
|
|
import asyncio
|
|
import atexit
|
|
import os
|
|
import pathlib
|
|
import signal
|
|
import sys
|
|
import threading
|
|
|
|
# Add project root to Python path to import shared utils
|
|
_project_root = pathlib.Path(__file__).parent.parent
|
|
if str(_project_root) not in sys.path:
|
|
sys.path.insert(0, str(_project_root))
|
|
|
|
import logging
|
|
|
|
# Setup logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
|
)
|
|
|
|
# Disable verbose CAMEL logs
|
|
logging.getLogger("camel").setLevel(logging.WARNING)
|
|
logging.getLogger("camel.base_model").setLevel(logging.WARNING)
|
|
logging.getLogger("camel.agents").setLevel(logging.WARNING)
|
|
logging.getLogger("camel.societies").setLevel(logging.WARNING)
|
|
logging.getLogger("httpx").setLevel(logging.WARNING)
|
|
logging.getLogger("httpcore").setLevel(logging.WARNING)
|
|
|
|
from app import api
|
|
from app.component.environment import env
|
|
from app.router import register_routers
|
|
from app.utils.event_loop_utils import set_main_event_loop
|
|
|
|
os.environ["PYTHONIOENCODING"] = "utf-8"
|
|
|
|
app_logger = logging.getLogger("main")
|
|
|
|
# Log application startup
|
|
app_logger.info("Starting Eigent Multi-Agent System API")
|
|
app_logger.info(f"Python encoding: {os.environ.get('PYTHONIOENCODING')}")
|
|
app_logger.info(f"Environment: {os.environ.get('ENVIRONMENT', 'development')}")
|
|
|
|
prefix = env("url_prefix", "")
|
|
app_logger.info(f"Loading routers with prefix: '{prefix}'")
|
|
app_logger.info(
|
|
f"MCP will be at: {prefix}/mcp/list, health at: {prefix}/health"
|
|
)
|
|
register_routers(api, prefix)
|
|
app_logger.info("All routers loaded successfully")
|
|
|
|
# Check if debug mode is enabled via environment variable
|
|
if os.environ.get("ENABLE_PYTHON_DEBUG") == "true":
|
|
try:
|
|
import debugpy
|
|
|
|
DEBUG_PORT = int(os.environ.get("DEBUG_PORT", "5678"))
|
|
app_logger.info(
|
|
f"Debug mode enabled - Starting debugpy server on port {DEBUG_PORT}"
|
|
)
|
|
debugpy.listen(("localhost", DEBUG_PORT))
|
|
app_logger.info(
|
|
f"Debugger ready for attachment on localhost:{DEBUG_PORT}"
|
|
)
|
|
# 📝 In VS Code: Run 'Debug Python Backend (Attach)' configuration
|
|
# Don't wait for client automatically - let it attach when ready
|
|
except ImportError:
|
|
app_logger.warning(
|
|
"debugpy not available, install with: uv add debugpy"
|
|
)
|
|
except Exception as e:
|
|
app_logger.error(f"Failed to start debugpy: {e}")
|
|
|
|
dir = pathlib.Path(__file__).parent / "runtime"
|
|
dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
# Write PID file asynchronously
|
|
async def write_pid_file():
|
|
r"""Write PID file asynchronously"""
|
|
import aiofiles
|
|
|
|
async with aiofiles.open(dir / "run.pid", "w") as f:
|
|
await f.write(str(os.getpid()))
|
|
app_logger.info(f"PID file written: {os.getpid()}")
|
|
|
|
|
|
# PID task will be created on startup
|
|
pid_task = None
|
|
|
|
|
|
@api.on_event("startup")
|
|
async def startup_event():
|
|
global pid_task
|
|
set_main_event_loop(asyncio.get_running_loop())
|
|
pid_task = asyncio.create_task(write_pid_file())
|
|
app_logger.info("PID write task created")
|
|
|
|
# Initialize EnvironmentHands from Brain deployment (full on local/cloud_vm, sandbox in Docker)
|
|
from app.router_layer.hands_resolver import init_environment_hands
|
|
|
|
hands = init_environment_hands()
|
|
app_logger.info(f"EnvironmentHands initialized: mode={hands.mode}")
|
|
|
|
# Initialize telemetry tracer provider
|
|
from app.utils.telemetry.workforce_metrics import (
|
|
initialize_tracer_provider,
|
|
)
|
|
|
|
initialize_tracer_provider()
|
|
app_logger.info("Telemetry tracer provider initialized")
|
|
|
|
|
|
@api.on_event("shutdown")
|
|
async def shutdown_event_handler():
|
|
r"""Run cleanup when uvicorn receives SIGINT/SIGTERM and shuts down."""
|
|
await cleanup_resources()
|
|
|
|
|
|
async def cleanup_resources():
|
|
r"""Cleanup all resources on shutdown"""
|
|
app_logger.info("Starting graceful shutdown process")
|
|
|
|
from app.service.task import _cleanup_task, task_locks
|
|
|
|
if _cleanup_task and not _cleanup_task.done():
|
|
_cleanup_task.cancel()
|
|
try:
|
|
await _cleanup_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
# Cleanup all task locks
|
|
for task_id in list(task_locks.keys()):
|
|
try:
|
|
task_lock = task_locks[task_id]
|
|
await task_lock.cleanup()
|
|
except Exception as e:
|
|
app_logger.error(f"Error cleaning up task {task_id}: {e}")
|
|
|
|
# Remove PID file
|
|
pid_file = dir / "run.pid"
|
|
if pid_file.exists():
|
|
pid_file.unlink()
|
|
|
|
# Shutdown OpenTelemetry tracer (releases BatchSpanProcessor worker threads)
|
|
try:
|
|
from app.utils.telemetry.workforce_metrics import (
|
|
shutdown_tracer_provider,
|
|
)
|
|
|
|
shutdown_tracer_provider()
|
|
except Exception as e:
|
|
app_logger.warning(f"Telemetry shutdown failed: {e}")
|
|
|
|
# Shutdown TerminalToolkit thread pool (prevents non-daemon threads blocking exit)
|
|
try:
|
|
from app.agent.toolkit.terminal_toolkit import TerminalToolkit
|
|
|
|
if TerminalToolkit._thread_pool is not None:
|
|
TerminalToolkit._thread_pool.shutdown(wait=False)
|
|
TerminalToolkit._thread_pool = None
|
|
except Exception as e:
|
|
app_logger.warning(f"TerminalToolkit shutdown failed: {e}")
|
|
|
|
# Best-effort close Browser toolkit WebSocket/Node connections.
|
|
# Use a timeout so shutdown stays responsive even if a wrapper is stuck.
|
|
try:
|
|
from app.agent.toolkit.hybrid_browser_toolkit import (
|
|
websocket_connection_pool,
|
|
)
|
|
|
|
await asyncio.wait_for(
|
|
websocket_connection_pool.close_all(), timeout=3.0
|
|
)
|
|
except TimeoutError:
|
|
app_logger.warning("Browser WebSocket pool shutdown timed out")
|
|
except Exception as e:
|
|
app_logger.warning(f"Browser WebSocket pool shutdown failed: {e}")
|
|
|
|
set_main_event_loop(None)
|
|
app_logger.info("All resources cleaned up successfully")
|
|
|
|
|
|
# Register cleanup on exit with safe synchronous wrapper
|
|
def sync_cleanup():
|
|
"""Synchronous cleanup for atexit - handles PID file removal"""
|
|
try:
|
|
# Only perform synchronous cleanup tasks
|
|
pid_file = dir / "run.pid"
|
|
if pid_file.exists():
|
|
pid_file.unlink()
|
|
app_logger.info("PID file removed during shutdown")
|
|
except Exception as e:
|
|
app_logger.error(f"Error during atexit cleanup: {e}")
|
|
|
|
|
|
atexit.register(sync_cleanup)
|
|
|
|
# Log successful initialization
|
|
app_logger.info("Application initialization completed successfully")
|
|
|
|
|
|
def run_standalone():
|
|
"""Run Brain in standalone mode (no Electron dependency)."""
|
|
import uvicorn
|
|
|
|
port = int(env("EIGENT_BRAIN_PORT", "5001"))
|
|
host = env("EIGENT_BRAIN_HOST", "0.0.0.0") # nosec B104 - bind all for Docker/dev
|
|
reload = os.environ.get("EIGENT_DEBUG", "").lower() in ("1", "true", "yes")
|
|
|
|
app_logger.info(
|
|
f"Starting Brain in standalone mode: {host}:{port} (reload={reload})"
|
|
)
|
|
if reload:
|
|
uvicorn.run(
|
|
"main:api",
|
|
host=host,
|
|
port=port,
|
|
reload=reload,
|
|
timeout_graceful_shutdown=5,
|
|
)
|
|
return
|
|
|
|
config = uvicorn.Config(
|
|
"main:api",
|
|
host=host,
|
|
port=port,
|
|
reload=False,
|
|
timeout_graceful_shutdown=5,
|
|
)
|
|
server = uvicorn.Server(config)
|
|
server.install_signal_handlers = lambda: None
|
|
|
|
force_exit_timer = None
|
|
signal_count = {"count": 0}
|
|
old_sigint = signal.getsignal(signal.SIGINT)
|
|
old_sigterm = signal.getsignal(signal.SIGTERM)
|
|
|
|
def _force_exit(signum: int):
|
|
signame = signal.Signals(signum).name
|
|
app_logger.error(
|
|
"Force exiting Brain after %s because graceful shutdown did not finish",
|
|
signame,
|
|
)
|
|
os._exit(128 + signum)
|
|
|
|
def _handle_signal(signum, _frame):
|
|
nonlocal force_exit_timer
|
|
signame = signal.Signals(signum).name
|
|
signal_count["count"] += 1
|
|
|
|
if signal_count["count"] == 1:
|
|
app_logger.warning(
|
|
"%s received, requesting graceful shutdown. Press Ctrl+C again to force exit.",
|
|
signame,
|
|
)
|
|
server.should_exit = True
|
|
if force_exit_timer is None:
|
|
force_exit_timer = threading.Timer(
|
|
5.0, _force_exit, args=(signum,)
|
|
)
|
|
force_exit_timer.daemon = True
|
|
force_exit_timer.start()
|
|
return
|
|
|
|
app_logger.error(
|
|
"%s received again, force exiting Brain immediately", signame
|
|
)
|
|
_force_exit(signum)
|
|
|
|
signal.signal(signal.SIGINT, _handle_signal)
|
|
signal.signal(signal.SIGTERM, _handle_signal)
|
|
try:
|
|
server.run()
|
|
finally:
|
|
if force_exit_timer is not None:
|
|
force_exit_timer.cancel()
|
|
signal.signal(signal.SIGINT, old_sigint)
|
|
signal.signal(signal.SIGTERM, old_sigterm)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
run_standalone()
|