From e472193948e186d243f42edbe196b5dd14f112af Mon Sep 17 00:00:00 2001 From: LuoPengcheng <2653972504@qq.com> Date: Wed, 22 Oct 2025 20:51:14 +0800 Subject: [PATCH 1/2] fix: #557 create eventloop in terminal_toolkit --- backend/app/utils/toolkit/terminal_toolkit.py | 38 +++++-- .../tests/unit/utils/test_terminal_toolkit.py | 100 ++++++++++++++++++ backend/utils/traceroot_wrapper.py | 75 +++++++++++++ 3 files changed, 203 insertions(+), 10 deletions(-) create mode 100644 backend/tests/unit/utils/test_terminal_toolkit.py create mode 100644 backend/utils/traceroot_wrapper.py diff --git a/backend/app/utils/toolkit/terminal_toolkit.py b/backend/app/utils/toolkit/terminal_toolkit.py index 64e65cb6..e6625c4f 100644 --- a/backend/app/utils/toolkit/terminal_toolkit.py +++ b/backend/app/utils/toolkit/terminal_toolkit.py @@ -1,5 +1,6 @@ import asyncio import os +import threading from camel.toolkits.terminal_toolkit import TerminalToolkit as BaseTerminalToolkit from camel.toolkits.terminal_toolkit.terminal_toolkit import _to_plain from app.component.environment import env @@ -55,16 +56,33 @@ class TerminalToolkit(BaseTerminalToolkit, AbstractToolkit): def _update_terminal_output(self, output: str): task_lock = get_task_lock(self.api_task_id) - # This method will be called during init. At that time, the process_task_id parameter does not exist, so it is set to be empty default process_task_id = process_task.get("") - task = asyncio.create_task( - task_lock.put_queue( - ActionTerminalData( - action=Action.terminal, - process_task_id=process_task_id, - data=output, - ) + + # Create the coroutine + coro = task_lock.put_queue( + ActionTerminalData( + action=Action.terminal, + process_task_id=process_task_id, + data=output, ) ) - if hasattr(task_lock, "add_background_task"): - task_lock.add_background_task(task) + + # Try to get the current event loop, if none exists, create a new one in a thread + try: + loop = asyncio.get_running_loop() + # If we're in an async context, schedule the coroutine + task = loop.create_task(coro) + if hasattr(task_lock, "add_background_task"): + task_lock.add_background_task(task) + except RuntimeError: + # No event loop running, schedule it in a new thread + def run_in_thread(): + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + try: + new_loop.run_until_complete(coro) + finally: + new_loop.close() + + thread = threading.Thread(target=run_in_thread, daemon=True) + thread.start() diff --git a/backend/tests/unit/utils/test_terminal_toolkit.py b/backend/tests/unit/utils/test_terminal_toolkit.py new file mode 100644 index 00000000..97ea6e04 --- /dev/null +++ b/backend/tests/unit/utils/test_terminal_toolkit.py @@ -0,0 +1,100 @@ +import asyncio +import threading +import time +import pytest +from app.service.task import task_locks, TaskLock +from app.utils.toolkit.terminal_toolkit import TerminalToolkit + + +@pytest.mark.unit +class TestTerminalToolkit: + """Test to verify the RuntimeError: no running event loop.""" + + def test_no_runtime_error_in_sync_context(self): + """Test no running event loop.""" + test_api_task_id = "test_api_task_123" + + if test_api_task_id not in task_locks: + task_locks[test_api_task_id] = TaskLock(id=test_api_task_id, queue=asyncio.Queue(), human_input={}) + toolkit = TerminalToolkit("test_api_task_123") + + # This should NOT raise RuntimeError: no running event loop + # This simulates the exact scenario from the error traceback + try: + toolkit._write_to_log("/tmp/test.log", "Test output") + time.sleep(0.1) # Give thread time to complete + + except RuntimeError as e: + if "no running event loop" in str(e): + pytest.fail("RuntimeError: no running event loop should not be raised - the fix is not working!") + else: + raise # Re-raise if it's a different RuntimeError + + def test_multiple_calls_no_runtime_error(self): + """Test that multiple calls don't raise RuntimeError.""" + test_api_task_id = "test_api_task_123" + + if test_api_task_id not in task_locks: + task_locks[test_api_task_id] = TaskLock(id=test_api_task_id, queue=asyncio.Queue(), human_input={}) + toolkit = TerminalToolkit("test_api_task_123") + + # Make multiple calls - none should raise RuntimeError + try: + for i in range(5): + toolkit._write_to_log(f"/tmp/test_{i}.log", f"Output {i}") + time.sleep(0.2) # Give threads time to complete + except RuntimeError as e: + if "no running event loop" in str(e): + pytest.fail("RuntimeError: no running event loop should not be raised!") + else: + raise + + def test_thread_safety_no_runtime_error(self): + """Test thread safety without RuntimeError.""" + test_api_task_id = "test_api_task_123" + + if test_api_task_id not in task_locks: + task_locks[test_api_task_id] = TaskLock(id=test_api_task_id, queue=asyncio.Queue(), human_input={}) + toolkit = TerminalToolkit("test_api_task_123") + + # Create multiple threads that call _write_to_log + threads = [] + for i in range(5): + thread = threading.Thread( + target=toolkit._write_to_log, + args=(f"/tmp/test_{i}.log", f"Thread {i} output") + ) + threads.append(thread) + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join() + + time.sleep(0.2) # Give async operations time to complete + + # Should not have raised any RuntimeError + + def test_async_context_still_works(self): + """Test that async context still works without RuntimeError.""" + test_api_task_id = "test_api_task_123" + + if test_api_task_id not in task_locks: + task_locks[test_api_task_id] = TaskLock(id=test_api_task_id, queue=asyncio.Queue(), human_input={}) + toolkit = TerminalToolkit("test_api_task_123") + + async def test_async_context(): + toolkit._write_to_log("/tmp/async_test.log", "Async context test") + await asyncio.sleep(0.1) + + # Should work in async context without RuntimeError + try: + asyncio.run(test_async_context()) + except RuntimeError as e: + if "no running event loop" in str(e): + pytest.fail("RuntimeError: no running event loop should not be raised in async context!") + else: + raise + + + diff --git a/backend/utils/traceroot_wrapper.py b/backend/utils/traceroot_wrapper.py new file mode 100644 index 00000000..6a1a912e --- /dev/null +++ b/backend/utils/traceroot_wrapper.py @@ -0,0 +1,75 @@ +from pathlib import Path +from typing import Callable +import logging +import traceroot +from dotenv import load_dotenv + +# Auto-detect module name based on caller's path +def _get_module_name(): + """Automatically detect if this is being called from backend or server.""" + import inspect + frame = inspect.currentframe() + try: + # Go up the stack to find the caller + caller_frame = frame.f_back.f_back if frame and frame.f_back else None + if caller_frame: + caller_file = caller_frame.f_globals.get('__file__', '') + if 'backend' in caller_file: + return 'backend' + elif 'server' in caller_file: + return 'server' + finally: + del frame + return 'unknown' + +env_path = Path(__file__).resolve().parents[1] / '.env' + +load_dotenv(env_path) + +if traceroot.init(): + from traceroot.logger import get_logger as _get_traceroot_logger + + trace = traceroot.trace + + def get_logger(name: str = __name__): + """Get TraceRoot logger instance.""" + return _get_traceroot_logger(name) + + def is_enabled() -> bool: + """Check if TraceRoot is enabled.""" + return True + + # Log successful initialization + module_name = _get_module_name() + _init_logger = _get_traceroot_logger("traceroot_wrapper") + _init_logger.info("TraceRoot initialized successfully", extra={"backend": "traceroot", "module": module_name}) +else: + # No-op implementations when TraceRoot is not configured + def trace(*args, **kwargs): + """No-op trace decorator.""" + def decorator(func: Callable) -> Callable: + return func + return decorator + + def get_logger(name: str = __name__): + """Get standard Python logger when TraceRoot is disabled.""" + logger = logging.getLogger(name) + if not logger.handlers: + # Configure basic logging if no handlers exist + handler = logging.StreamHandler() + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + return logger + + def is_enabled() -> bool: + """Check if TraceRoot is enabled.""" + return False + + # Log fallback mode + _fallback_logger = logging.getLogger("traceroot_wrapper") + _fallback_logger.warning("TraceRoot not initialized - using Python logging as fallback") + + +__all__ = ['trace', 'get_logger', 'is_enabled'] From 778d8ef0282e90b7542b8afce5b7c8dc36538d12 Mon Sep 17 00:00:00 2001 From: LuoPengcheng <2653972504@qq.com> Date: Fri, 24 Oct 2025 01:59:46 +0800 Subject: [PATCH 2/2] fix: #557 add ThreadPoolExecutor in terminal_toolkit --- backend/app/utils/toolkit/terminal_toolkit.py | 54 ++++++++++--- backend/utils/traceroot_wrapper.py | 75 ------------------- 2 files changed, 44 insertions(+), 85 deletions(-) delete mode 100644 backend/utils/traceroot_wrapper.py diff --git a/backend/app/utils/toolkit/terminal_toolkit.py b/backend/app/utils/toolkit/terminal_toolkit.py index e6625c4f..0868724f 100644 --- a/backend/app/utils/toolkit/terminal_toolkit.py +++ b/backend/app/utils/toolkit/terminal_toolkit.py @@ -1,6 +1,9 @@ import asyncio +import logging import os import threading +from concurrent.futures import ThreadPoolExecutor +from typing import Optional from camel.toolkits.terminal_toolkit import TerminalToolkit as BaseTerminalToolkit from camel.toolkits.terminal_toolkit.terminal_toolkit import _to_plain from app.component.environment import env @@ -13,6 +16,8 @@ from app.service.task import process_task @auto_listen_toolkit(BaseTerminalToolkit) class TerminalToolkit(BaseTerminalToolkit, AbstractToolkit): agent_name: str = Agents.developer_agent + _thread_pool: Optional[ThreadPoolExecutor] = None + _thread_local = threading.local() def __init__( self, @@ -32,6 +37,11 @@ class TerminalToolkit(BaseTerminalToolkit, AbstractToolkit): self.agent_name = agent_name if working_directory is None: working_directory = env("file_save_path", os.path.expanduser("~/.eigent/terminal/")) + if TerminalToolkit._thread_pool is None: + TerminalToolkit._thread_pool = ThreadPoolExecutor( + max_workers=1, + thread_name_prefix="terminal_toolkit" + ) super().__init__( timeout=timeout, working_directory=working_directory, @@ -75,14 +85,38 @@ class TerminalToolkit(BaseTerminalToolkit, AbstractToolkit): if hasattr(task_lock, "add_background_task"): task_lock.add_background_task(task) except RuntimeError: - # No event loop running, schedule it in a new thread - def run_in_thread(): - new_loop = asyncio.new_event_loop() - asyncio.set_event_loop(new_loop) - try: - new_loop.run_until_complete(coro) - finally: - new_loop.close() + self._thread_pool.submit(self._run_coro_in_thread, coro,task_lock) - thread = threading.Thread(target=run_in_thread, daemon=True) - thread.start() + @staticmethod + def _run_coro_in_thread(coro,task_lock): + """ + Execute coro in the thread pool, with each thread bound to a long-term event loop + """ + if not hasattr(TerminalToolkit._thread_local, "loop"): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + TerminalToolkit._thread_local.loop = loop + else: + loop = TerminalToolkit._thread_local.loop + + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + TerminalToolkit._thread_local.loop = loop + + try: + task = loop.create_task(coro) + if hasattr(task_lock, "add_background_task"): + task_lock.add_background_task(task) + loop.run_until_complete(task) + except Exception as e: + logging.error( + f"Failed to execute coroutine in thread pool: {str(e)}", + exc_info=True + ) + + @classmethod + def shutdown(cls): + if cls._thread_pool: + cls._thread_pool.shutdown(wait=True) + cls._thread_pool = None diff --git a/backend/utils/traceroot_wrapper.py b/backend/utils/traceroot_wrapper.py deleted file mode 100644 index 6a1a912e..00000000 --- a/backend/utils/traceroot_wrapper.py +++ /dev/null @@ -1,75 +0,0 @@ -from pathlib import Path -from typing import Callable -import logging -import traceroot -from dotenv import load_dotenv - -# Auto-detect module name based on caller's path -def _get_module_name(): - """Automatically detect if this is being called from backend or server.""" - import inspect - frame = inspect.currentframe() - try: - # Go up the stack to find the caller - caller_frame = frame.f_back.f_back if frame and frame.f_back else None - if caller_frame: - caller_file = caller_frame.f_globals.get('__file__', '') - if 'backend' in caller_file: - return 'backend' - elif 'server' in caller_file: - return 'server' - finally: - del frame - return 'unknown' - -env_path = Path(__file__).resolve().parents[1] / '.env' - -load_dotenv(env_path) - -if traceroot.init(): - from traceroot.logger import get_logger as _get_traceroot_logger - - trace = traceroot.trace - - def get_logger(name: str = __name__): - """Get TraceRoot logger instance.""" - return _get_traceroot_logger(name) - - def is_enabled() -> bool: - """Check if TraceRoot is enabled.""" - return True - - # Log successful initialization - module_name = _get_module_name() - _init_logger = _get_traceroot_logger("traceroot_wrapper") - _init_logger.info("TraceRoot initialized successfully", extra={"backend": "traceroot", "module": module_name}) -else: - # No-op implementations when TraceRoot is not configured - def trace(*args, **kwargs): - """No-op trace decorator.""" - def decorator(func: Callable) -> Callable: - return func - return decorator - - def get_logger(name: str = __name__): - """Get standard Python logger when TraceRoot is disabled.""" - logger = logging.getLogger(name) - if not logger.handlers: - # Configure basic logging if no handlers exist - handler = logging.StreamHandler() - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - handler.setFormatter(formatter) - logger.addHandler(handler) - logger.setLevel(logging.INFO) - return logger - - def is_enabled() -> bool: - """Check if TraceRoot is enabled.""" - return False - - # Log fallback mode - _fallback_logger = logging.getLogger("traceroot_wrapper") - _fallback_logger.warning("TraceRoot not initialized - using Python logging as fallback") - - -__all__ = ['trace', 'get_logger', 'is_enabled']