From a2b0a04bd8d770d29c3238c0d7bad4cade4805db Mon Sep 17 00:00:00 2001 From: a7m-1st Date: Fri, 16 Jan 2026 18:30:24 +0300 Subject: [PATCH] enhance: delete tasklock on timeout --- backend/app/controller/chat_controller.py | 41 ++++++++++++++++++----- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/backend/app/controller/chat_controller.py b/backend/app/controller/chat_controller.py index b7fc78262..fdf9e13be 100644 --- a/backend/app/controller/chat_controller.py +++ b/backend/app/controller/chat_controller.py @@ -23,6 +23,7 @@ from app.service.task import ( get_or_create_task_lock, get_task_lock, set_current_task_id, + delete_task_lock ) from app.component.environment import set_user_env_path from app.utils.workforce import Workforce @@ -37,15 +38,16 @@ chat_logger = traceroot.get_logger("chat_controller") # SSE timeout configuration (10 minutes in seconds) SSE_TIMEOUT_SECONDS = 10 * 60 - -async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TIMEOUT_SECONDS): +async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TIMEOUT_SECONDS, task_lock = None): """ Wraps a stream generator with timeout handling. Closes the SSE connection if no data is received within the timeout period. + Triggers cleanup if timeout occurs to prevent resource leaks. """ last_data_time = time.time() generator = stream_generator.__aiter__() + cleanup_triggered = False try: while True: @@ -57,18 +59,39 @@ async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TI last_data_time = time.time() yield data except asyncio.TimeoutError: - chat_logger.warning(f"SSE timeout: No data received for {timeout_seconds} seconds, closing connection") - # yield sse_json("error", {"message": "Connection timeout: No data received for 10 minutes"}) - # TODO: Temporary change: suppress error signal to frontend on timeout. Needs proper fix later. + chat_logger.warning(f"SSE TIMEOUT: No data received for {timeout_seconds} seconds") + + # Trigger cleanup to prevent resource leaks + if task_lock: + try: + task_lock.status = Status.done + await delete_task_lock(task_lock.id) + cleanup_triggered = True + except Exception as cleanup_error: + chat_logger.error(f"[TIMEOUT-CLEANUP] Failed to cleanup task lock: {cleanup_error}", exc_info=True) break except StopAsyncIteration: break except asyncio.CancelledError: - chat_logger.info("Stream cancelled") + chat_logger.info("[STREAM-CANCELLED] Stream cancelled, triggering cleanup") + # Trigger cleanup on cancellation + if task_lock and not cleanup_triggered: + try: + task_lock.status = Status.done + await delete_task_lock(task_lock.id) + except Exception as cleanup_error: + chat_logger.error(f"[STREAM-CANCELLED] Failed to cleanup: {cleanup_error}") raise except Exception as e: - chat_logger.error(f"Error in stream wrapper: {e}", exc_info=True) + chat_logger.error(f"[STREAM-ERROR] Unexpected error in stream wrapper: {e}", exc_info=True) + # Trigger cleanup on unexpected errors + if task_lock and not cleanup_triggered: + try: + task_lock.status = Status.done + await delete_task_lock(task_lock.id) + except Exception as cleanup_error: + chat_logger.error(f"[STREAM-ERROR] Failed to cleanup: {cleanup_error}") raise @@ -124,7 +147,7 @@ async def post(data: Chat, request: Request): extra={"project_id": data.project_id, "task_id": data.task_id, "log_dir": str(camel_log)}, ) return StreamingResponse( - timeout_stream_wrapper(step_solve(data, request, task_lock)), media_type="text/event-stream" + timeout_stream_wrapper(step_solve(data, request, task_lock), task_lock=task_lock), media_type="text/event-stream" ) @@ -315,5 +338,5 @@ def skip_task(project_id: str): return Response(status_code=201) except Exception as e: - chat_logger.error(f"[STOP-BUTTON] ❌ Error skipping task for project_id: {project_id}: {e}") + chat_logger.error(f"[STOP-BUTTON] Error skipping task for project_id: {project_id}: {e}") raise UserException(code.error, f"Failed to skip task: {str(e)}")