# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========

import asyncio
import inspect
import logging
import os
import time
from pathlib import Path

from dotenv import load_dotenv
from fastapi import APIRouter, Request, Response
from fastapi.responses import StreamingResponse

from app.component import code
from app.component.environment import sanitize_env_path, set_user_env_path
from app.exception.exception import UserException
from app.model.chat import (
    AddTaskRequest,
    Chat,
    HumanReply,
    McpServers,
    Status,
    SupplementChat,
    sse_json,
)
from app.service.chat_service import step_solve
from app.service.task import (
    Action,
    ActionAddTaskData,
    ActionImproveData,
    ActionInstallMcpData,
    ActionRemoveTaskData,
    ActionSkipTaskData,
    ActionStopData,
    ActionSupplementData,
    ImprovePayload,
    delete_task_lock,
    get_or_create_task_lock,
    get_task_lock,
    get_task_lock_if_exists,
    set_current_task_id,
    task_locks,
)
from app.service.upload.service import PartialUploadContext
from app.utils.browser_launcher import (
    ensure_cdp_browser_endpoint,
    is_cdp_url_available,
    normalize_cdp_url,
)
from app.utils.workspace_resolver import get_workspace_resolver

router = APIRouter()

# Logger for chat controller
chat_logger = logging.getLogger("chat_controller")

# SSE timeout configuration (60 minutes in seconds)
SSE_TIMEOUT_SECONDS = 60 * 60


def _user_upload_ids(attaches: list[str] | None) -> list[str]:
    """Return the attachments that are ``upload://`` references.

    Args:
        attaches: Attachment entries, or ``None`` (treated as empty).

    Returns:
        The string entries starting with ``"upload://"``, in input order.
    """
    return [
        attach
        for attach in (attaches or [])
        if isinstance(attach, str) and attach.startswith("upload://")
    ]


def _remember_user_uploads(
    task_lock,
    task_id: str | None,
    attaches: list[str] | None,
) -> None:
    """Record the ``upload://`` attachments of a task on the task lock.

    The ids are appended to ``task_lock.user_upload_ids_by_task[task_id]``
    (the mapping is created on first use), keeping insertion order and
    skipping ids that are already recorded. Nothing happens when
    ``task_id`` is falsy or no ``upload://`` attachment is present.
    """
    if not task_id:
        return
    uploads = _user_upload_ids(attaches)
    if not uploads:
        return
    by_task = getattr(task_lock, "user_upload_ids_by_task", None)
    if by_task is None:
        by_task = {}
        task_lock.user_upload_ids_by_task = by_task
    existing = by_task.setdefault(task_id, [])
    for upload_id in uploads:
        if upload_id not in existing:
            existing.append(upload_id)


def _email_from_task_lock_or_env(task_lock) -> str | None:
    """Resolve the user identifier for a task lock.

    Returns ``task_lock.email`` when it is set and truthy. Otherwise the
    ``file_save_path`` environment variable is inspected and the path
    component that directly follows an ``eigent`` / ``.eigent`` component
    is returned.

    Returns:
        The resolved value, or ``None`` when neither source yields one.
    """
    email = getattr(task_lock, "email", None)
    if email:
        return email
    current_file_save_path = os.environ.get("file_save_path", "")
    if not current_file_save_path:
        return None
    # presumably the directory below the eigent root is named after the
    # user's email; verify against the workspace resolver layout
    path_parts = Path(current_file_save_path).parts
    for root_name in ("eigent", ".eigent"):
        if root_name in path_parts:
            root_index = path_parts.index(root_name)
            if root_index + 1 < len(path_parts):
                return path_parts[root_index + 1]
    return None


def _is_remote_browser_hands(request: Request | None) -> bool:
    """Tell whether ``request.state.hands`` reports a remote cluster.

    Only a synchronous ``get_capability_manifest`` callable is consulted:
    coroutine functions, awaitable results, raised exceptions and non-dict
    results all yield ``False``.

    Returns:
        ``True`` only when the manifest is a dict whose ``"deployment"``
        entry equals ``"remote_cluster"``.
    """
    hands = getattr(getattr(request, "state", None), "hands", None)
    if hands is None:
        return False
    get_manifest = getattr(hands, "get_capability_manifest", None)
    if get_manifest is None or inspect.iscoroutinefunction(get_manifest):
        return False
    try:
        manifest = get_manifest()
    except Exception:
        return False
    if inspect.isawaitable(manifest):
        # Close the un-awaited object (when it supports it) instead of
        # leaving it pending.
        if hasattr(manifest, "close"):
            manifest.close()
        return False
    if not isinstance(manifest, dict):
        return False
    return manifest.get("deployment") == "remote_cluster"


async def _prepare_browser_for_request(
    request: Request | None,
    port: int,
) -> bool:
    """Make a CDP browser endpoint available for the request.

    Order of attempts:

    1. Reuse the endpoint in ``EIGENT_CDP_URL`` when it is still reachable.
    2. Accept remote-cluster hands without touching the environment.
    3. Call ``ensure_cdp_browser_endpoint(port)`` in a worker thread.

    On success ``EIGENT_CDP_URL`` / ``browser_port`` are updated in
    ``os.environ`` (steps 1 and 3); on failure ``EIGENT_CDP_URL`` is
    removed. ``request.state.browser_available`` mirrors the result when a
    request is given.

    Returns:
        ``True`` when a browser is considered available, else ``False``.
    """
    existing_cdp_url = os.environ.get("EIGENT_CDP_URL", "").strip()
    if existing_cdp_url:
        is_available = await asyncio.to_thread(
            is_cdp_url_available, existing_cdp_url
        )
        if is_available:
            normalized_endpoint, _, selected_port = normalize_cdp_url(
                existing_cdp_url
            )
            os.environ["EIGENT_CDP_URL"] = normalized_endpoint
            os.environ["browser_port"] = str(selected_port)
            if request is not None:
                request.state.browser_available = True
            return True
        # The configured endpoint is no longer reachable; forget it.
        os.environ.pop("EIGENT_CDP_URL", None)

    if _is_remote_browser_hands(request):
        if request is not None:
            request.state.browser_available = True
        return True

    try:
        endpoint = await asyncio.to_thread(ensure_cdp_browser_endpoint, port)
    except Exception as e:
        os.environ.pop("EIGENT_CDP_URL", None)
        chat_logger.warning(
            "Could not ensure CDP browser for web mode",
            extra={"error": str(e), "port": port},
        )
        if request is not None:
            request.state.browser_available = False
        return False

    if endpoint:
        os.environ["EIGENT_CDP_URL"] = endpoint
        _, _, selected_port = normalize_cdp_url(endpoint)
        os.environ["browser_port"] = str(selected_port)
        if request is not None:
            request.state.browser_available = True
        return True

    os.environ.pop("EIGENT_CDP_URL", None)
    chat_logger.warning(
        "CDP browser not available after ensure attempt",
        extra={"port": port},
    )
    if request is not None:
        request.state.browser_available = False
    return False


async def _cleanup_task_lock_safe(task_lock, reason: str) -> bool:
    """Safely cleanup task lock with existence check.

    Args:
        task_lock: The task lock to cleanup
        reason: Reason for cleanup (for logging)

    Returns:
        True if cleanup was performed, False otherwise
    """
    if not task_lock:
        return False

    # Check if task_lock still exists before attempting cleanup
    if task_lock.id not in task_locks:
        chat_logger.debug(
            f"[{reason}] Task lock already removed, skipping cleanup",
            extra={"task_id": task_lock.id},
        )
        return False

    try:
        task_lock.status = Status.done
        await delete_task_lock(task_lock.id)
        chat_logger.info(
            f"[{reason}] Task lock cleanup completed",
            extra={"task_id": task_lock.id},
        )
        return True
    except Exception as e:
        chat_logger.error(
            f"[{reason}] Failed to cleanup task lock",
            extra={"task_id": task_lock.id, "error": str(e)},
            exc_info=True,
        )
        return False


async def timeout_stream_wrapper(
    stream_generator,
    timeout_seconds: int = SSE_TIMEOUT_SECONDS,
    task_lock=None,
):
    """Wraps a stream generator with timeout handling.

    Closes the SSE connection if no data is received within the timeout period.
    Triggers cleanup if timeout occurs to prevent resource leaks.

    Args:
        stream_generator: Async iterable producing the SSE chunks.
        timeout_seconds: Maximum time to wait for the next chunk, measured
            from the moment the previous chunk was received.
        task_lock: Optional task lock to clean up on timeout, cancellation
            or unexpected error.

    Yields:
        Every chunk of ``stream_generator``; on timeout a final ``error``
        SSE event is yielded before the stream ends.

    Raises:
        asyncio.CancelledError: Re-raised after cleanup when cancelled.
        Exception: Any other error from the stream, re-raised after cleanup.
    """
    last_data_time = time.time()
    generator = stream_generator.__aiter__()
    cleanup_triggered = False

    try:
        while True:
            elapsed = time.time() - last_data_time
            remaining_timeout = timeout_seconds - elapsed
            try:
                data = await asyncio.wait_for(
                    generator.__anext__(), timeout=remaining_timeout
                )
                last_data_time = time.time()
                yield data
            # asyncio.wait_for raises asyncio.TimeoutError, which is only
            # an alias of the builtin TimeoutError from Python 3.11 on.
            # Catching the asyncio name keeps the timeout path working on
            # 3.10 as well.
            except asyncio.TimeoutError:
                chat_logger.warning(
                    "SSE timeout: No data received, closing connection",
                    extra={"timeout_seconds": timeout_seconds},
                )
                timeout_min = timeout_seconds // 60
                yield sse_json(
                    "error",
                    {
                        "message": "Connection timeout: No data"
                        f" received for {timeout_min}"
                        " minutes"
                    },
                )
                cleanup_triggered = await _cleanup_task_lock_safe(
                    task_lock, "TIMEOUT"
                )
                break
            except StopAsyncIteration:
                break
    except asyncio.CancelledError:
        chat_logger.info(
            "[STREAM-CANCELLED] Stream cancelled, triggering cleanup"
        )
        if not cleanup_triggered:
            await _cleanup_task_lock_safe(task_lock, "CANCELLED")
        raise
    except Exception as e:
        chat_logger.error(
            "[STREAM-ERROR] Unexpected error in stream wrapper",
            extra={"error": str(e)},
            exc_info=True,
        )
        if not cleanup_triggered:
            await _cleanup_task_lock_safe(task_lock, "ERROR")
        raise


async def start_chat_stream(data: Chat, request: Request):
    """
    Setup and start chat stream. Used by POST /chat and Message Router.
    Returns async generator of SSE chunks.

    Side effects visible in this function: the task lock for
    ``data.project_id`` is fetched or created, several process-wide
    environment variables are overwritten, the log directory is created,
    and an initial ``ActionImproveData`` is queued on the task lock.
    """
    chat_logger.info(
        "Starting new chat session",
        extra={
            "project_id": data.project_id,
            "task_id": data.task_id,
            "user": data.email,
        },
    )

    task_lock = get_or_create_task_lock(data.project_id)

    # Set user-specific environment path for this thread
    set_user_env_path(data.env_path)

    # Load environment with validated path
    safe_env_path = sanitize_env_path(data.env_path)
    if safe_env_path:
        load_dotenv(dotenv_path=safe_env_path)

    resolver = get_workspace_resolver()
    frozen_dirs = resolver.freeze_task_directories(data, task_lock)
    try:
        await asyncio.to_thread(
            resolver.write_task_snapshot,
            data.email,
            frozen_dirs.snapshot,
        )
    except Exception:
        # Best effort: a failed snapshot write must not block the chat.
        chat_logger.warning(
            "Failed to persist task workspace snapshot",
            extra={"project_id": data.project_id, "task_id": data.task_id},
            exc_info=True,
        )

    session_id = getattr(request.state, "session_id", "")
    task_lock.session_id = session_id
    _remember_user_uploads(task_lock, data.task_id, data.attaches)

    # TODO(multi-tenant): os.environ is global – concurrent sessions overwrite
    # each other's API keys, file paths, and browser ports. Pass these values
    # through Chat / request context instead of mutating the process environment.
    os.environ["file_save_path"] = str(frozen_dirs.working_directory)
    os.environ["browser_port"] = str(data.browser_port)

    # Web mode: reuse an existing CDP endpoint first, otherwise acquire browser
    # through RemoteHands or launch a local browser when available.
    if not data.cdp_browsers:
        await _prepare_browser_for_request(request, data.browser_port)

    os.environ["OPENAI_API_KEY"] = data.api_key
    os.environ["OPENAI_API_BASE_URL"] = (
        data.api_url or "https://api.openai.com/v1"
    )
    os.environ["CAMEL_MODEL_LOG_ENABLED"] = "true"

    # Set user-specific search engine configuration if provided
    if data.search_config:
        for key, value in data.search_config.items():
            if value:
                os.environ[key] = value
                chat_logger.debug(
                    f"Set search config: {key}",
                    extra={"project_id": data.project_id},
                )

    camel_log = resolver.log_root(data.project_id, data.task_id, data.email)
    camel_log.mkdir(parents=True, exist_ok=True)
    os.environ["CAMEL_LOG_DIR"] = str(camel_log)

    task_lock.upload_context_partial = PartialUploadContext(
        session_id=session_id,
        raw_server_url=data.server_url or "",
        authorization=request.headers.get("Authorization", ""),
        task_output_root=frozen_dirs.task_output_root,
        task_start_time=frozen_dirs.task_start_time,
        camel_log_dir=camel_log,
    )

    if data.is_cloud():
        os.environ["cloud_api_key"] = data.api_key

    # Set the initial current_task_id in task_lock
    set_current_task_id(data.project_id, data.task_id)

    # Put initial action in queue to start processing
    await task_lock.put_queue(
        ActionImproveData(
            data=ImprovePayload(
                question=data.question,
                attaches=data.attaches or [],
            ),
            new_task_id=data.task_id,
        )
    )

    chat_logger.info(
        "Chat session initialized",
        extra={
            "project_id": data.project_id,
            "task_id": data.task_id,
            "log_dir": str(camel_log),
        },
    )

    return timeout_stream_wrapper(
        step_solve(data, request, task_lock), task_lock=task_lock
    )


@router.post("/chat", name="start chat")
async def post(data: Chat, request: Request):
    """Start a chat and stream its events as ``text/event-stream``."""
    stream = await start_chat_stream(data, request)
    return StreamingResponse(
        stream,
        media_type="text/event-stream",
    )


@router.post("/chat/{id}", name="improve chat")
def improve(id: str, data: SupplementChat, request: Request):
    """Queue a follow-up question on an existing task lock.

    A task lock in ``Status.done`` is reset to ``Status.confirming`` so the
    conversation can continue. When ``data.task_id`` is given, the task
    workspace, log directory and upload context are re-bound to that task
    before the ``ActionImproveData`` is queued.

    Returns:
        An empty ``201`` response.
    """
    # NOTE(review): this handler is synchronous and drives coroutines via
    # asyncio.run(), i.e. on a fresh event loop rather than the loop that
    # serves the SSE stream - confirm task_lock.put_queue is safe to call
    # that way.
    chat_logger.info(
        "Chat improvement requested",
        extra={"task_id": id, "question_length": len(data.question)},
    )
    task_lock = get_task_lock(id)

    # Reuse an existing endpoint when possible to avoid tearing down
    # a browser that was manually connected through the Browser page.
    port = int(os.environ.get("browser_port", "9222"))
    asyncio.run(_prepare_browser_for_request(request, port))

    # Allow continuing conversation even after task is done
    # This supports multi-turn conversation after complex task completion
    if task_lock.status == Status.done:
        # Reset status to allow processing new messages
        task_lock.status = Status.confirming
        # Clear any existing background tasks since workforce was stopped
        if hasattr(task_lock, "background_tasks"):
            task_lock.background_tasks.clear()
        # Note: conversation_history and last_task_result are preserved

        # Log context preservation
        if hasattr(task_lock, "conversation_history"):
            hist_len = len(task_lock.conversation_history)
            chat_logger.info(
                f"[CONTEXT] Preserved {hist_len} conversation entries"
            )
        if hasattr(task_lock, "last_task_result"):
            result_len = len(task_lock.last_task_result)
            chat_logger.info(
                f"[CONTEXT] Preserved task result: {result_len} chars"
            )

    # If task_id is provided, freeze the task-level workspace context before
    # the action reaches the workforce. project_root() intentionally stays
    # project-scoped; agent execution reads TaskLock.working_directory.
    if data.task_id:
        try:
            current_email = _email_from_task_lock_or_env(task_lock)
            if current_email and id:
                resolver = get_workspace_resolver()
                frozen_dirs = resolver.freeze_task_directories_for(
                    project_id=id,
                    task_id=data.task_id,
                    email=current_email,
                    task_lock=task_lock,
                )
                try:
                    resolver.write_task_snapshot(
                        current_email, frozen_dirs.snapshot
                    )
                except Exception:
                    # Best effort: keep going without a persisted snapshot.
                    chat_logger.warning(
                        "Failed to persist task workspace snapshot",
                        extra={"project_id": id, "task_id": data.task_id},
                        exc_info=True,
                    )
                os.environ["file_save_path"] = str(
                    frozen_dirs.working_directory
                )
                camel_log = resolver.log_root(id, data.task_id, current_email)
                camel_log.mkdir(parents=True, exist_ok=True)
                os.environ["CAMEL_LOG_DIR"] = str(camel_log)
                task_lock.new_folder_path = (
                    frozen_dirs.working_directory
                    if frozen_dirs.binding_source == "default"
                    else None
                )
                # Carry server URL / authorization over from the previous
                # upload context when the new request does not supply them.
                previous_context = getattr(
                    task_lock, "upload_context_partial", None
                )
                task_lock.upload_context_partial = PartialUploadContext(
                    session_id=getattr(
                        request.state,
                        "session_id",
                        getattr(task_lock, "session_id", "") or "",
                    ),
                    raw_server_url=getattr(
                        previous_context, "raw_server_url", ""
                    ),
                    authorization=request.headers.get(
                        "Authorization",
                        getattr(previous_context, "authorization", ""),
                    ),
                    task_output_root=frozen_dirs.task_output_root,
                    task_start_time=frozen_dirs.task_start_time,
                    camel_log_dir=camel_log,
                )
                _remember_user_uploads(task_lock, data.task_id, data.attaches)
                chat_logger.info(
                    f"Updated working directory to: {frozen_dirs.working_directory}"
                )
            else:
                chat_logger.warning(
                    "Could not update"
                    " file_save_path -"
                    f" email: {current_email},"
                    f" project_id: {id}"
                )
        except Exception as e:
            # Deliberately best effort: the question is still queued below.
            # Include the traceback so the failure can be diagnosed.
            chat_logger.error(
                "Error updating file path for"
                f" project_id: {id},"
                f" task_id: {data.task_id}:"
                f" {e}",
                exc_info=True,
            )

    asyncio.run(
        task_lock.put_queue(
            ActionImproveData(
                data=ImprovePayload(
                    question=data.question,
                    attaches=data.attaches or [],
                ),
                new_task_id=data.task_id,
            )
        )
    )
    chat_logger.info(
        "Improvement request queued with preserved context",
        extra={"project_id": id},
    )
    return Response(status_code=201)


@router.put("/chat/{id}", name="supplement task")
def supplement(id: str, data: SupplementChat):
    """Queue supplement data for a task that has already finished.

    Raises:
        UserException: When the task lock is not in ``Status.done``.
    """
    chat_logger.info("Chat supplement requested", extra={"task_id": id})
    task_lock = get_task_lock(id)
    if task_lock.status != Status.done:
        raise UserException(code.error, "Please wait task done")
    asyncio.run(task_lock.put_queue(ActionSupplementData(data=data)))
    chat_logger.debug("Supplement data queued", extra={"task_id": id})
    return Response(status_code=201)


@router.delete("/chat/{id}", name="stop chat")
def stop(id: str):
    """stop the task"""
    chat_logger.info("=" * 80)
    chat_logger.info(
        "🛑 [STOP-BUTTON] DELETE /chat/{id} request received from frontend"
    )
    chat_logger.info(f"[STOP-BUTTON] project_id/task_id: {id}")
    chat_logger.info("=" * 80)

    task_lock = get_task_lock_if_exists(id)
    if task_lock is not None:
        chat_logger.info(
            "[STOP-BUTTON] Task lock retrieved,"
            f" task_lock.id: {task_lock.id},"
            f" task_lock.status: {task_lock.status}"
        )
        chat_logger.info(
            "[STOP-BUTTON] Queueing"
            " ActionStopData(Action.stop)"
            " to task_lock queue"
        )
        try:
            asyncio.run(
                task_lock.put_queue(ActionStopData(action=Action.stop))
            )
            chat_logger.info(
                "[STOP-BUTTON] ActionStopData queued"
                " successfully, this will trigger"
                " workforce.stop_gracefully()"
            )
        except Exception as e:
            # Stopping is best effort; the endpoint still answers 204.
            chat_logger.warning(
                "[STOP-BUTTON] Failed to queue ActionStopData",
                extra={"task_id": id, "error": str(e)},
            )
    else:
        chat_logger.warning(
            "[STOP-BUTTON] Task lock not found, task may already be stopped",
            extra={"task_id": id},
        )
    return Response(status_code=204)


@router.post("/chat/{id}/human-reply")
def human_reply(id: str, data: HumanReply):
    """Forward a human reply for ``data.agent`` to the task lock."""
    chat_logger.info(
        "Human reply received",
        extra={"task_id": id, "reply_length": len(data.reply)},
    )
    task_lock = get_task_lock(id)
    asyncio.run(task_lock.put_human_input(data.agent, data.reply))
    chat_logger.debug("Human reply processed", extra={"task_id": id})
    return Response(status_code=201)


@router.post("/chat/{id}/install-mcp")
def install_mcp(id: str, data: McpServers):
    """Queue an MCP server installation action on the task lock."""
    chat_logger.info(
        "Installing MCP servers",
        extra={
            "task_id": id,
            "servers_count": len(data.get("mcpServers", {})),
        },
    )
    task_lock = get_task_lock(id)
    asyncio.run(
        task_lock.put_queue(
            ActionInstallMcpData(action=Action.install_mcp, data=data)
        )
    )
    chat_logger.info("MCP installation queued", extra={"task_id": id})
    return Response(status_code=201)


@router.post("/chat/{id}/add-task", name="add task to workforce")
def add_task(id: str, data: AddTaskRequest):
    """Add a new task to the workforce

    Raises:
        UserException: When building or queueing the action fails.
    """
    chat_logger.info(
        "Adding task to workforce for"
        f" task_id: {id},"
        f" content: {data.content[:100]}..."
    )
    task_lock = get_task_lock(id)

    try:
        # Queue the add task action
        add_task_action = ActionAddTaskData(
            content=data.content,
            project_id=data.project_id,
            task_id=data.task_id,
            additional_info=data.additional_info,
            insert_position=data.insert_position,
        )
        asyncio.run(task_lock.put_queue(add_task_action))
        return Response(status_code=201)
    except Exception as e:
        chat_logger.error(f"Error adding task for task_id: {id}: {e}")
        raise UserException(
            code.error, f"Failed to add task: {str(e)}"
        ) from e


@router.delete(
    "/chat/{project_id}/remove-task/{task_id}",
    name="remove task from workforce",
)
def remove_task(project_id: str, task_id: str):
    """Remove a task from the workforce

    Raises:
        UserException: When building or queueing the action fails.
    """
    chat_logger.info(
        f"Removing task {task_id} from workforce for project_id: {project_id}"
    )
    task_lock = get_task_lock(project_id)

    try:
        # Queue the remove task action
        remove_task_action = ActionRemoveTaskData(
            task_id=task_id, project_id=project_id
        )
        asyncio.run(task_lock.put_queue(remove_task_action))
        chat_logger.info(
            "Task removal request queued for"
            f" project_id: {project_id},"
            f" removing task: {task_id}"
        )
        return Response(status_code=204)
    except Exception as e:
        chat_logger.error(
            f"Error removing task {task_id} for project_id: {project_id}: {e}"
        )
        raise UserException(
            code.error, f"Failed to remove task: {str(e)}"
        ) from e


@router.post("/chat/{project_id}/skip-task", name="skip task in workforce")
def skip_task(project_id: str):
    """
    Skip/Stop current task execution while preserving context.
    This endpoint is called when user clicks the Stop button.

    Behavior:
    - Stops workforce gracefully
    - Marks task as done
    - Preserves conversation_history and last_task_result in task_lock
    - Sends 'end' event to frontend
    - Keeps SSE connection alive for multi-turn conversation
    """
    chat_logger.info("=" * 80)
    chat_logger.info(
        "[STOP-BUTTON] SKIP-TASK request"
        " received from frontend"
        " (User clicked Stop)"
    )
    chat_logger.info(f"[STOP-BUTTON] project_id: {project_id}")
    chat_logger.info("=" * 80)

    task_lock = get_task_lock_if_exists(project_id)
    if task_lock is None:
        chat_logger.warning(
            "[STOP-BUTTON] Task lock not found, task may already be stopped",
            extra={"project_id": project_id},
        )
        return Response(status_code=204)

    chat_logger.info(
        "[STOP-BUTTON] Task lock retrieved,"
        f" task_lock.id: {task_lock.id},"
        " task_lock.status:"
        f" {task_lock.status}"
    )

    try:
        # Queue the skip task action - this will
        # preserve context for multi-turn
        skip_task_action = ActionSkipTaskData(project_id=project_id)
        chat_logger.info(
            "[STOP-BUTTON] Queueing"
            " ActionSkipTaskData"
            " (preserves context,"
            " marks as done)"
        )
        asyncio.run(task_lock.put_queue(skip_task_action))
        chat_logger.info(
            "[STOP-BUTTON] Skip request"
            " queued - task will stop"
            " gracefully and preserve context"
        )
        return Response(status_code=201)
    except Exception as e:
        chat_logger.error(
            "[STOP-BUTTON] Error skipping"
            " task for"
            f" project_id: {project_id}:"
            f" {e}"
        )
        raise UserException(
            code.error, f"Failed to skip task: {str(e)}"
        ) from e