From 645591bef5694e2cd97fb4f5075dd03cd0e0870c Mon Sep 17 00:00:00 2001 From: puzhen <1303385763@qq.com> Date: Mon, 19 Jan 2026 23:19:11 +0000 Subject: [PATCH] fix potential bug --- backend/app/service/chat_service.py | 6 ++++-- backend/app/utils/agent.py | 19 +++++++++++++++---- backend/app/utils/workforce.py | 1 - 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/backend/app/service/chat_service.py b/backend/app/service/chat_service.py index 28205e73f..e6e6918fb 100644 --- a/backend/app/service/chat_service.py +++ b/backend/app/service/chat_service.py @@ -1471,9 +1471,11 @@ The current date is {datetime.date.today()}. For any date-related tasks, you MUS ) except Exception as e: logger.error(f"Failed to create agents in parallel: {e}", exc_info=True) - # Clear event loop reference on failure - set_main_event_loop(None) raise + finally: + # Always clear event loop reference after parallel agent creation completes + # This prevents stale references and potential cross-request interference + set_main_event_loop(None) # Unpack results ( diff --git a/backend/app/utils/agent.py b/backend/app/utils/agent.py index 070d188b4..d9f162276 100644 --- a/backend/app/utils/agent.py +++ b/backend/app/utils/agent.py @@ -3,7 +3,7 @@ import contextvars import json import os import platform -from threading import Event +from threading import Event, Lock import traceback from typing import Any, Callable, Dict, List, Tuple import uuid @@ -15,15 +15,23 @@ _main_event_loop_var: contextvars.ContextVar[asyncio.AbstractEventLoop | None] = '_main_event_loop', default=None ) +# Global fallback for main event loop reference +# Used when contextvars don't propagate to worker threads (e.g., asyncio.to_thread) +_GLOBAL_MAIN_LOOP: asyncio.AbstractEventLoop | None = None +_GLOBAL_MAIN_LOOP_LOCK = Lock() + def set_main_event_loop(loop: asyncio.AbstractEventLoop | None): """Set the main event loop reference for thread-safe task scheduling. This should be called from the main async context before spawning threads - that need to schedule async tasks. Uses contextvars to ensure thread-safety - across concurrent requests. + that need to schedule async tasks. Uses both contextvars (for request isolation) + and a global fallback (for thread pool workers where contextvars may not propagate). """ + global _GLOBAL_MAIN_LOOP _main_event_loop_var.set(loop) + with _GLOBAL_MAIN_LOOP_LOCK: + _GLOBAL_MAIN_LOOP = loop def _schedule_async_task(coro): @@ -38,8 +46,11 @@ def _schedule_async_task(coro): loop.create_task(coro) except RuntimeError: # No running loop in this thread (we're in a worker thread) - # Use the stored main loop reference from contextvars + # First try contextvars, then fallback to global reference main_loop = _main_event_loop_var.get() + if main_loop is None: + with _GLOBAL_MAIN_LOOP_LOCK: + main_loop = _GLOBAL_MAIN_LOOP if main_loop is not None and main_loop.is_running(): asyncio.run_coroutine_threadsafe(coro, main_loop) else: diff --git a/backend/app/utils/workforce.py b/backend/app/utils/workforce.py index db2f4fa96..0d17b9655 100644 --- a/backend/app/utils/workforce.py +++ b/backend/app/utils/workforce.py @@ -1,5 +1,4 @@ import asyncio -import time from typing import Generator, List, Optional from camel.agents import ChatAgent from camel.societies.workforce.workforce import (