Merge branch 'enhance/cleanup-terminal-env' into chore/optimize_workforce_construct

Merged cleanup features:
- Delete task lock on SSE timeout
- Clean up terminal env (venv) on session close
- Register toolkits for cleanup when task ends
This commit is contained in:
puzhen 2026-01-19 23:26:59 +00:00
commit 81bb3b555f
3 changed files with 101 additions and 6 deletions

View file

@ -23,6 +23,7 @@ from app.service.task import (
get_or_create_task_lock,
get_task_lock,
set_current_task_id,
delete_task_lock
)
from app.component.environment import set_user_env_path
from app.utils.workforce import Workforce
@ -37,15 +38,16 @@ chat_logger = traceroot.get_logger("chat_controller")
# SSE timeout configuration (30 minutes in seconds)
SSE_TIMEOUT_SECONDS = 30 * 60
async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TIMEOUT_SECONDS):
async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TIMEOUT_SECONDS, task_lock = None):
"""
Wraps a stream generator with timeout handling.
Closes the SSE connection if no data is received within the timeout period.
Triggers cleanup if timeout occurs to prevent resource leaks.
"""
last_data_time = time.time()
generator = stream_generator.__aiter__()
cleanup_triggered = False
try:
while True:
@ -59,15 +61,38 @@ async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TI
except asyncio.TimeoutError:
chat_logger.warning(f"SSE timeout: No data received for {timeout_seconds} seconds, closing connection")
yield sse_json("error", {"message": f"Connection timeout: No data received for {timeout_seconds // 60} minutes"})
# Trigger cleanup to prevent resource leaks
if task_lock:
try:
task_lock.status = Status.done
await delete_task_lock(task_lock.id)
cleanup_triggered = True
except Exception as cleanup_error:
chat_logger.error(f"[TIMEOUT-CLEANUP] Failed to cleanup task lock: {cleanup_error}", exc_info=True)
break
except StopAsyncIteration:
break
except asyncio.CancelledError:
chat_logger.info("Stream cancelled")
chat_logger.info("[STREAM-CANCELLED] Stream cancelled, triggering cleanup")
# Trigger cleanup on cancellation
if task_lock and not cleanup_triggered:
try:
task_lock.status = Status.done
await delete_task_lock(task_lock.id)
except Exception as cleanup_error:
chat_logger.error(f"[STREAM-CANCELLED] Failed to cleanup: {cleanup_error}")
raise
except Exception as e:
chat_logger.error(f"Error in stream wrapper: {e}", exc_info=True)
chat_logger.error(f"[STREAM-ERROR] Unexpected error in stream wrapper: {e}", exc_info=True)
# Trigger cleanup on unexpected errors
if task_lock and not cleanup_triggered:
try:
task_lock.status = Status.done
await delete_task_lock(task_lock.id)
except Exception as cleanup_error:
chat_logger.error(f"[STREAM-ERROR] Failed to cleanup: {cleanup_error}")
raise
@ -128,7 +153,7 @@ async def post(data: Chat, request: Request):
extra={"project_id": data.project_id, "task_id": data.task_id, "log_dir": str(camel_log)},
)
return StreamingResponse(
timeout_stream_wrapper(step_solve(data, request, task_lock)), media_type="text/event-stream"
timeout_stream_wrapper(step_solve(data, request, task_lock), task_lock=task_lock), media_type="text/event-stream"
)
@ -319,5 +344,5 @@ def skip_task(project_id: str):
return Response(status_code=201)
except Exception as e:
chat_logger.error(f"[STOP-BUTTON] Error skipping task for project_id: {project_id}: {e}")
chat_logger.error(f"[STOP-BUTTON] Error skipping task for project_id: {project_id}: {e}")
raise UserException(code.error, f"Failed to skip task: {str(e)}")