From 86748275171de642eba612077cda40194b18a196 Mon Sep 17 00:00:00 2001 From: luo <479933015@qq.com> Date: Thu, 6 Nov 2025 11:08:14 +0800 Subject: [PATCH 1/3] feat(chat_controller): add SSE timeout handling and improve logging structure --- backend/app/controller/chat_controller.py | 93 ++++++++++++++++++----- 1 file changed, 72 insertions(+), 21 deletions(-) diff --git a/backend/app/controller/chat_controller.py b/backend/app/controller/chat_controller.py index d9afc4acd..338537845 100644 --- a/backend/app/controller/chat_controller.py +++ b/backend/app/controller/chat_controller.py @@ -1,6 +1,7 @@ import asyncio import os import re +import time from pathlib import Path from dotenv import load_dotenv from fastapi import APIRouter, HTTPException, Request, Response @@ -8,7 +9,7 @@ from fastapi.responses import StreamingResponse from utils import traceroot_wrapper as traceroot from app.component import code from app.exception.exception import UserException -from app.model.chat import Chat, HumanReply, McpServers, Status, SupplementChat, AddTaskRequest +from app.model.chat import Chat, HumanReply, McpServers, Status, SupplementChat, AddTaskRequest, sse_json from app.service.chat_service import step_solve from app.service.task import ( Action, @@ -30,13 +31,51 @@ from camel.tasks.task import Task router = APIRouter(tags=["chat"]) # Create traceroot logger for chat controller -chat_logger = traceroot.get_logger('chat_controller') +chat_logger = traceroot.get_logger("chat_controller") + +# SSE timeout configuration (10 minutes in seconds) +SSE_TIMEOUT_SECONDS = 10 * 60 + + +async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TIMEOUT_SECONDS): + last_data_time = [time.time()] + generator = stream_generator.__aiter__() + should_stop = False + + try: + while not should_stop: + elapsed = time.time() - last_data_time[0] + remaining_timeout = max(0, timeout_seconds - elapsed) + + if elapsed >= timeout_seconds: + chat_logger.warning(f"SSE timeout: No data received for {elapsed:.1f} seconds, closing connection") + yield sse_json("timeout", {"message": "Connection timeout: No data received for 10 minutes"}) + break + try: + data = await asyncio.wait_for(generator.__anext__(), timeout=remaining_timeout) + last_data_time[0] = time.time() + yield data + except asyncio.TimeoutError: + chat_logger.warning(f"SSE timeout: No data received for {timeout_seconds} seconds, closing connection") + yield sse_json("timeout", {"message": "Connection timeout: No data received for 10 minutes"}) + break + except StopAsyncIteration: + break + + except asyncio.CancelledError: + chat_logger.info("Stream cancelled") + raise + except Exception as e: + chat_logger.error(f"Error in stream wrapper: {e}", exc_info=True) + raise @router.post("/chat", name="start chat") @traceroot.trace() async def post(data: Chat, request: Request): - chat_logger.info("Starting new chat session", extra={"project_id": data.project_id, "task_id": data.task_id, "user": data.email}) + chat_logger.info( + "Starting new chat session", extra={"project_id": data.project_id, "task_id": data.task_id, "user": data.email} + ) task_lock = get_or_create_task_lock(data.project_id) # Set user-specific environment path for this thread @@ -50,7 +89,14 @@ async def post(data: Chat, request: Request): os.environ["CAMEL_MODEL_LOG_ENABLED"] = "true" email_sanitized = re.sub(r'[\\/*?:"<>|\s]', "_", data.email.split("@")[0]).strip(".") - camel_log = Path.home() / ".eigent" / email_sanitized / ("project_" + data.project_id) / ("task_" + data.task_id) / "camel_logs" + camel_log = ( + Path.home() + / ".eigent" + / email_sanitized + / ("project_" + data.project_id) + / ("task_" + data.task_id) + / "camel_logs" + ) camel_log.mkdir(parents=True, exist_ok=True) os.environ["CAMEL_LOG_DIR"] = str(camel_log) @@ -61,8 +107,13 @@ async def post(data: Chat, request: Request): # Put initial action in queue to start processing await task_lock.put_queue(ActionImproveData(data=data.question)) - chat_logger.info("Chat session initialized, starting streaming response", extra={"project_id": data.project_id, "task_id": data.task_id, "log_dir": str(camel_log)}) - return StreamingResponse(step_solve(data, request, task_lock), media_type="text/event-stream") + chat_logger.info( + "Chat session initialized, starting streaming response", + extra={"project_id": data.project_id, "task_id": data.task_id, "log_dir": str(camel_log)}, + ) + return StreamingResponse( + timeout_stream_wrapper(step_solve(data, request, task_lock)), media_type="text/event-stream" + ) @router.post("/chat/{id}", name="improve chat") @@ -77,14 +128,14 @@ def improve(id: str, data: SupplementChat): # Reset status to allow processing new messages task_lock.status = Status.confirming # Clear any existing background tasks since workforce was stopped - if hasattr(task_lock, 'background_tasks'): + if hasattr(task_lock, "background_tasks"): task_lock.background_tasks.clear() # Note: conversation_history and last_task_result are preserved # Log context preservation - if hasattr(task_lock, 'conversation_history'): + if hasattr(task_lock, "conversation_history"): chat_logger.info(f"[CONTEXT] Preserved {len(task_lock.conversation_history)} conversation entries") - if hasattr(task_lock, 'last_task_result'): + if hasattr(task_lock, "last_task_result"): chat_logger.info(f"[CONTEXT] Preserved task result: {len(task_lock.last_task_result)} chars") # Update file save path if task_id is provided @@ -93,7 +144,7 @@ def improve(id: str, data: SupplementChat): try: # Get current environment values needed to construct new path current_email = None - + # Extract email from current file_save_path if available current_file_save_path = os.environ.get("file_save_path", "") if current_file_save_path: @@ -102,7 +153,7 @@ def improve(id: str, data: SupplementChat): eigent_index = path_parts.index("eigent") if eigent_index + 1 < len(path_parts): current_email = path_parts[eigent_index + 1] - + # If we have the necessary information, update the file_save_path if current_email and id: # Create new path using the existing pattern: email/project_{project_id}/task_{task_id} @@ -110,12 +161,12 @@ def improve(id: str, data: SupplementChat): new_folder_path.mkdir(parents=True, exist_ok=True) os.environ["file_save_path"] = str(new_folder_path) chat_logger.info(f"Updated file_save_path to: {new_folder_path}") - + # Store the new folder path in task_lock for potential cleanup and persistence task_lock.new_folder_path = new_folder_path else: chat_logger.warning(f"Could not update file_save_path - email: {current_email}, project_id: {id}") - + except Exception as e: chat_logger.error(f"Error updating file path for project_id: {id}, task_id: {data.task_id}: {e}") @@ -160,7 +211,7 @@ def human_reply(id: str, data: HumanReply): @router.post("/chat/{id}/install-mcp") @traceroot.trace() def install_mcp(id: str, data: McpServers): - chat_logger.info("Installing MCP servers", extra={"task_id": id, "servers_count": len(data.get('mcpServers', {}))}) + chat_logger.info("Installing MCP servers", extra={"task_id": id, "servers_count": len(data.get("mcpServers", {}))}) task_lock = get_task_lock(id) asyncio.run(task_lock.put_queue(ActionInstallMcpData(action=Action.install_mcp, data=data))) chat_logger.info("MCP installation queued", extra={"task_id": id}) @@ -173,7 +224,7 @@ def add_task(id: str, data: AddTaskRequest): """Add a new task to the workforce""" chat_logger.info(f"Adding task to workforce for task_id: {id}, content: {data.content[:100]}...") task_lock = get_task_lock(id) - + try: # Queue the add task action add_task_action = ActionAddTaskData( @@ -181,11 +232,11 @@ def add_task(id: str, data: AddTaskRequest): project_id=data.project_id, task_id=data.task_id, additional_info=data.additional_info, - insert_position=data.insert_position + insert_position=data.insert_position, ) asyncio.run(task_lock.put_queue(add_task_action)) return Response(status_code=201) - + except Exception as e: chat_logger.error(f"Error adding task for task_id: {id}: {e}") raise UserException(code.error, f"Failed to add task: {str(e)}") @@ -197,7 +248,7 @@ def remove_task(project_id: str, task_id: str): """Remove a task from the workforce""" chat_logger.info(f"Removing task {task_id} from workforce for project_id: {project_id}") task_lock = get_task_lock(project_id) - + try: # Queue the remove task action remove_task_action = ActionRemoveTaskData(task_id=task_id, project_id=project_id) @@ -205,7 +256,7 @@ def remove_task(project_id: str, task_id: str): chat_logger.info(f"Task removal request queued for project_id: {project_id}, removing task: {task_id}") return Response(status_code=204) - + except Exception as e: chat_logger.error(f"Error removing task {task_id} for project_id: {project_id}: {e}") raise UserException(code.error, f"Failed to remove task: {str(e)}") @@ -217,7 +268,7 @@ def skip_task(project_id: str): """Skip a task in the workforce""" chat_logger.info(f"Skipping task in workforce for project_id: {project_id}") task_lock = get_task_lock(project_id) - + try: # Queue the skip task action skip_task_action = ActionSkipTaskData(project_id=project_id) @@ -225,7 +276,7 @@ def skip_task(project_id: str): chat_logger.info(f"Task skip request queued for project_id: {project_id}") return Response(status_code=201) - + except Exception as e: chat_logger.error(f"Error skipping task for project_id: {project_id}: {e}") raise UserException(code.error, f"Failed to skip task: {str(e)}") From cbaa477fe8240617a448f177a70f38dcab5b31c2 Mon Sep 17 00:00:00 2001 From: luo <479933015@qq.com> Date: Thu, 6 Nov 2025 12:45:40 +0800 Subject: [PATCH 2/3] fix(chat_controller): change SSE timeout response type from 'timeout' to 'error' --- backend/app/controller/chat_controller.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/app/controller/chat_controller.py b/backend/app/controller/chat_controller.py index 338537845..7500ea248 100644 --- a/backend/app/controller/chat_controller.py +++ b/backend/app/controller/chat_controller.py @@ -49,7 +49,7 @@ async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TI if elapsed >= timeout_seconds: chat_logger.warning(f"SSE timeout: No data received for {elapsed:.1f} seconds, closing connection") - yield sse_json("timeout", {"message": "Connection timeout: No data received for 10 minutes"}) + yield sse_json("error", {"message": "Connection timeout: No data received for 10 minutes"}) break try: data = await asyncio.wait_for(generator.__anext__(), timeout=remaining_timeout) @@ -57,7 +57,7 @@ async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TI yield data except asyncio.TimeoutError: chat_logger.warning(f"SSE timeout: No data received for {timeout_seconds} seconds, closing connection") - yield sse_json("timeout", {"message": "Connection timeout: No data received for 10 minutes"}) + yield sse_json("error", {"message": "Connection timeout: No data received for 10 minutes"}) break except StopAsyncIteration: break From 03f8b1d246caf7ad0f9b63faa63683e836c7dc90 Mon Sep 17 00:00:00 2001 From: Wendong-Fan Date: Fri, 7 Nov 2025 14:46:27 +0800 Subject: [PATCH 3/3] enhance: add SSE timeout handling and improve logging PR614 --- backend/app/controller/chat_controller.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/backend/app/controller/chat_controller.py b/backend/app/controller/chat_controller.py index 3f0b96ce9..9342b20d6 100644 --- a/backend/app/controller/chat_controller.py +++ b/backend/app/controller/chat_controller.py @@ -38,22 +38,22 @@ SSE_TIMEOUT_SECONDS = 10 * 60 async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TIMEOUT_SECONDS): - last_data_time = [time.time()] + """ + Wraps a stream generator with timeout handling. + + Closes the SSE connection if no data is received within the timeout period. + """ + last_data_time = time.time() generator = stream_generator.__aiter__() - should_stop = False try: - while not should_stop: - elapsed = time.time() - last_data_time[0] - remaining_timeout = max(0, timeout_seconds - elapsed) + while True: + elapsed = time.time() - last_data_time + remaining_timeout = timeout_seconds - elapsed - if elapsed >= timeout_seconds: - chat_logger.warning(f"SSE timeout: No data received for {elapsed:.1f} seconds, closing connection") - yield sse_json("error", {"message": "Connection timeout: No data received for 10 minutes"}) - break try: data = await asyncio.wait_for(generator.__anext__(), timeout=remaining_timeout) - last_data_time[0] = time.time() + last_data_time = time.time() yield data except asyncio.TimeoutError: chat_logger.warning(f"SSE timeout: No data received for {timeout_seconds} seconds, closing connection")