feat(chat_controller): add SSE timeout handling and improve logging (#614)

This commit is contained in:
Wendong-Fan 2025-11-11 16:44:11 +08:00 committed by GitHub
commit 706c82d952
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,6 +1,7 @@
import asyncio
import os
import re
import time
from pathlib import Path
from dotenv import load_dotenv
from fastapi import APIRouter, HTTPException, Request, Response
@ -8,7 +9,7 @@ from fastapi.responses import StreamingResponse
from utils import traceroot_wrapper as traceroot
from app.component import code
from app.exception.exception import UserException
from app.model.chat import Chat, HumanReply, McpServers, Status, SupplementChat, AddTaskRequest
from app.model.chat import Chat, HumanReply, McpServers, Status, SupplementChat, AddTaskRequest, sse_json
from app.service.chat_service import step_solve
from app.service.task import (
Action,
@ -30,13 +31,51 @@ from camel.tasks.task import Task
router = APIRouter()
# Create traceroot logger for chat controller
chat_logger = traceroot.get_logger('chat_controller')
chat_logger = traceroot.get_logger("chat_controller")
# SSE timeout configuration (10 minutes in seconds)
SSE_TIMEOUT_SECONDS = 10 * 60
async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TIMEOUT_SECONDS):
"""
Wraps a stream generator with timeout handling.
Closes the SSE connection if no data is received within the timeout period.
"""
last_data_time = time.time()
generator = stream_generator.__aiter__()
try:
while True:
elapsed = time.time() - last_data_time
remaining_timeout = timeout_seconds - elapsed
try:
data = await asyncio.wait_for(generator.__anext__(), timeout=remaining_timeout)
last_data_time = time.time()
yield data
except asyncio.TimeoutError:
chat_logger.warning(f"SSE timeout: No data received for {timeout_seconds} seconds, closing connection")
yield sse_json("error", {"message": "Connection timeout: No data received for 10 minutes"})
break
except StopAsyncIteration:
break
except asyncio.CancelledError:
chat_logger.info("Stream cancelled")
raise
except Exception as e:
chat_logger.error(f"Error in stream wrapper: {e}", exc_info=True)
raise
@router.post("/chat", name="start chat")
@traceroot.trace()
async def post(data: Chat, request: Request):
chat_logger.info("Starting new chat session", extra={"project_id": data.project_id, "task_id": data.task_id, "user": data.email})
chat_logger.info(
"Starting new chat session", extra={"project_id": data.project_id, "task_id": data.task_id, "user": data.email}
)
task_lock = get_or_create_task_lock(data.project_id)
# Set user-specific environment path for this thread
@ -57,7 +96,14 @@ async def post(data: Chat, request: Request):
chat_logger.info(f"Set search config: {key}", extra={"project_id": data.project_id})
email_sanitized = re.sub(r'[\\/*?:"<>|\s]', "_", data.email.split("@")[0]).strip(".")
camel_log = Path.home() / ".eigent" / email_sanitized / ("project_" + data.project_id) / ("task_" + data.task_id) / "camel_logs"
camel_log = (
Path.home()
/ ".eigent"
/ email_sanitized
/ ("project_" + data.project_id)
/ ("task_" + data.task_id)
/ "camel_logs"
)
camel_log.mkdir(parents=True, exist_ok=True)
os.environ["CAMEL_LOG_DIR"] = str(camel_log)
@ -68,8 +114,13 @@ async def post(data: Chat, request: Request):
# Put initial action in queue to start processing
await task_lock.put_queue(ActionImproveData(data=data.question))
chat_logger.info("Chat session initialized, starting streaming response", extra={"project_id": data.project_id, "task_id": data.task_id, "log_dir": str(camel_log)})
return StreamingResponse(step_solve(data, request, task_lock), media_type="text/event-stream")
chat_logger.info(
"Chat session initialized, starting streaming response",
extra={"project_id": data.project_id, "task_id": data.task_id, "log_dir": str(camel_log)},
)
return StreamingResponse(
timeout_stream_wrapper(step_solve(data, request, task_lock)), media_type="text/event-stream"
)
@router.post("/chat/{id}", name="improve chat")
@ -84,14 +135,14 @@ def improve(id: str, data: SupplementChat):
# Reset status to allow processing new messages
task_lock.status = Status.confirming
# Clear any existing background tasks since workforce was stopped
if hasattr(task_lock, 'background_tasks'):
if hasattr(task_lock, "background_tasks"):
task_lock.background_tasks.clear()
# Note: conversation_history and last_task_result are preserved
# Log context preservation
if hasattr(task_lock, 'conversation_history'):
if hasattr(task_lock, "conversation_history"):
chat_logger.info(f"[CONTEXT] Preserved {len(task_lock.conversation_history)} conversation entries")
if hasattr(task_lock, 'last_task_result'):
if hasattr(task_lock, "last_task_result"):
chat_logger.info(f"[CONTEXT] Preserved task result: {len(task_lock.last_task_result)} chars")
# Update file save path if task_id is provided
@ -100,7 +151,7 @@ def improve(id: str, data: SupplementChat):
try:
# Get current environment values needed to construct new path
current_email = None
# Extract email from current file_save_path if available
current_file_save_path = os.environ.get("file_save_path", "")
if current_file_save_path:
@ -109,7 +160,7 @@ def improve(id: str, data: SupplementChat):
eigent_index = path_parts.index("eigent")
if eigent_index + 1 < len(path_parts):
current_email = path_parts[eigent_index + 1]
# If we have the necessary information, update the file_save_path
if current_email and id:
# Create new path using the existing pattern: email/project_{project_id}/task_{task_id}
@ -117,12 +168,12 @@ def improve(id: str, data: SupplementChat):
new_folder_path.mkdir(parents=True, exist_ok=True)
os.environ["file_save_path"] = str(new_folder_path)
chat_logger.info(f"Updated file_save_path to: {new_folder_path}")
# Store the new folder path in task_lock for potential cleanup and persistence
task_lock.new_folder_path = new_folder_path
else:
chat_logger.warning(f"Could not update file_save_path - email: {current_email}, project_id: {id}")
except Exception as e:
chat_logger.error(f"Error updating file path for project_id: {id}, task_id: {data.task_id}: {e}")
@ -167,7 +218,7 @@ def human_reply(id: str, data: HumanReply):
@router.post("/chat/{id}/install-mcp")
@traceroot.trace()
def install_mcp(id: str, data: McpServers):
chat_logger.info("Installing MCP servers", extra={"task_id": id, "servers_count": len(data.get('mcpServers', {}))})
chat_logger.info("Installing MCP servers", extra={"task_id": id, "servers_count": len(data.get("mcpServers", {}))})
task_lock = get_task_lock(id)
asyncio.run(task_lock.put_queue(ActionInstallMcpData(action=Action.install_mcp, data=data)))
chat_logger.info("MCP installation queued", extra={"task_id": id})
@ -180,7 +231,7 @@ def add_task(id: str, data: AddTaskRequest):
"""Add a new task to the workforce"""
chat_logger.info(f"Adding task to workforce for task_id: {id}, content: {data.content[:100]}...")
task_lock = get_task_lock(id)
try:
# Queue the add task action
add_task_action = ActionAddTaskData(
@ -188,11 +239,11 @@ def add_task(id: str, data: AddTaskRequest):
project_id=data.project_id,
task_id=data.task_id,
additional_info=data.additional_info,
insert_position=data.insert_position
insert_position=data.insert_position,
)
asyncio.run(task_lock.put_queue(add_task_action))
return Response(status_code=201)
except Exception as e:
chat_logger.error(f"Error adding task for task_id: {id}: {e}")
raise UserException(code.error, f"Failed to add task: {str(e)}")
@ -204,7 +255,7 @@ def remove_task(project_id: str, task_id: str):
"""Remove a task from the workforce"""
chat_logger.info(f"Removing task {task_id} from workforce for project_id: {project_id}")
task_lock = get_task_lock(project_id)
try:
# Queue the remove task action
remove_task_action = ActionRemoveTaskData(task_id=task_id, project_id=project_id)
@ -212,7 +263,7 @@ def remove_task(project_id: str, task_id: str):
chat_logger.info(f"Task removal request queued for project_id: {project_id}, removing task: {task_id}")
return Response(status_code=204)
except Exception as e:
chat_logger.error(f"Error removing task {task_id} for project_id: {project_id}: {e}")
raise UserException(code.error, f"Failed to remove task: {str(e)}")
@ -224,7 +275,7 @@ def skip_task(project_id: str):
"""Skip a task in the workforce"""
chat_logger.info(f"Skipping task in workforce for project_id: {project_id}")
task_lock = get_task_lock(project_id)
try:
# Queue the skip task action
skip_task_action = ActionSkipTaskData(project_id=project_id)
@ -232,7 +283,7 @@ def skip_task(project_id: str):
chat_logger.info(f"Task skip request queued for project_id: {project_id}")
return Response(status_code=201)
except Exception as e:
chat_logger.error(f"Error skipping task for project_id: {project_id}: {e}")
raise UserException(code.error, f"Failed to skip task: {str(e)}")