diff --git a/backend/app/utils/toolkit/terminal_toolkit.py b/backend/app/utils/toolkit/terminal_toolkit.py index 64e65cb67..e6625c4f7 100644 --- a/backend/app/utils/toolkit/terminal_toolkit.py +++ b/backend/app/utils/toolkit/terminal_toolkit.py @@ -1,5 +1,6 @@ import asyncio import os +import threading from camel.toolkits.terminal_toolkit import TerminalToolkit as BaseTerminalToolkit from camel.toolkits.terminal_toolkit.terminal_toolkit import _to_plain from app.component.environment import env @@ -55,16 +56,33 @@ class TerminalToolkit(BaseTerminalToolkit, AbstractToolkit): def _update_terminal_output(self, output: str): task_lock = get_task_lock(self.api_task_id) - # This method will be called during init. At that time, the process_task_id parameter does not exist, so it is set to be empty default process_task_id = process_task.get("") - task = asyncio.create_task( - task_lock.put_queue( - ActionTerminalData( - action=Action.terminal, - process_task_id=process_task_id, - data=output, - ) + + # Create the coroutine + coro = task_lock.put_queue( + ActionTerminalData( + action=Action.terminal, + process_task_id=process_task_id, + data=output, ) ) - if hasattr(task_lock, "add_background_task"): - task_lock.add_background_task(task) + + # Try to get the current event loop, if none exists, create a new one in a thread + try: + loop = asyncio.get_running_loop() + # If we're in an async context, schedule the coroutine + task = loop.create_task(coro) + if hasattr(task_lock, "add_background_task"): + task_lock.add_background_task(task) + except RuntimeError: + # No event loop running, schedule it in a new thread + def run_in_thread(): + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + try: + new_loop.run_until_complete(coro) + finally: + new_loop.close() + + thread = threading.Thread(target=run_in_thread, daemon=True) + thread.start() diff --git a/backend/tests/unit/utils/test_terminal_toolkit.py b/backend/tests/unit/utils/test_terminal_toolkit.py new file mode 100644 index 000000000..97ea6e046 --- /dev/null +++ b/backend/tests/unit/utils/test_terminal_toolkit.py @@ -0,0 +1,100 @@ +import asyncio +import threading +import time +import pytest +from app.service.task import task_locks, TaskLock +from app.utils.toolkit.terminal_toolkit import TerminalToolkit + + +@pytest.mark.unit +class TestTerminalToolkit: + """Test to verify the RuntimeError: no running event loop.""" + + def test_no_runtime_error_in_sync_context(self): + """Test no running event loop.""" + test_api_task_id = "test_api_task_123" + + if test_api_task_id not in task_locks: + task_locks[test_api_task_id] = TaskLock(id=test_api_task_id, queue=asyncio.Queue(), human_input={}) + toolkit = TerminalToolkit("test_api_task_123") + + # This should NOT raise RuntimeError: no running event loop + # This simulates the exact scenario from the error traceback + try: + toolkit._write_to_log("/tmp/test.log", "Test output") + time.sleep(0.1) # Give thread time to complete + + except RuntimeError as e: + if "no running event loop" in str(e): + pytest.fail("RuntimeError: no running event loop should not be raised - the fix is not working!") + else: + raise # Re-raise if it's a different RuntimeError + + def test_multiple_calls_no_runtime_error(self): + """Test that multiple calls don't raise RuntimeError.""" + test_api_task_id = "test_api_task_123" + + if test_api_task_id not in task_locks: + task_locks[test_api_task_id] = TaskLock(id=test_api_task_id, queue=asyncio.Queue(), human_input={}) + toolkit = TerminalToolkit("test_api_task_123") + + # Make multiple calls - none should raise RuntimeError + try: + for i in range(5): + toolkit._write_to_log(f"/tmp/test_{i}.log", f"Output {i}") + time.sleep(0.2) # Give threads time to complete + except RuntimeError as e: + if "no running event loop" in str(e): + pytest.fail("RuntimeError: no running event loop should not be raised!") + else: + raise + + def test_thread_safety_no_runtime_error(self): + """Test thread safety without RuntimeError.""" + test_api_task_id = "test_api_task_123" + + if test_api_task_id not in task_locks: + task_locks[test_api_task_id] = TaskLock(id=test_api_task_id, queue=asyncio.Queue(), human_input={}) + toolkit = TerminalToolkit("test_api_task_123") + + # Create multiple threads that call _write_to_log + threads = [] + for i in range(5): + thread = threading.Thread( + target=toolkit._write_to_log, + args=(f"/tmp/test_{i}.log", f"Thread {i} output") + ) + threads.append(thread) + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join() + + time.sleep(0.2) # Give async operations time to complete + + # Should not have raised any RuntimeError + + def test_async_context_still_works(self): + """Test that async context still works without RuntimeError.""" + test_api_task_id = "test_api_task_123" + + if test_api_task_id not in task_locks: + task_locks[test_api_task_id] = TaskLock(id=test_api_task_id, queue=asyncio.Queue(), human_input={}) + toolkit = TerminalToolkit("test_api_task_123") + + async def test_async_context(): + toolkit._write_to_log("/tmp/async_test.log", "Async context test") + await asyncio.sleep(0.1) + + # Should work in async context without RuntimeError + try: + asyncio.run(test_async_context()) + except RuntimeError as e: + if "no running event loop" in str(e): + pytest.fail("RuntimeError: no running event loop should not be raised in async context!") + else: + raise + + + diff --git a/backend/utils/traceroot_wrapper.py b/backend/utils/traceroot_wrapper.py new file mode 100644 index 000000000..6a1a912e8 --- /dev/null +++ b/backend/utils/traceroot_wrapper.py @@ -0,0 +1,75 @@ +from pathlib import Path +from typing import Callable +import logging +import traceroot +from dotenv import load_dotenv + +# Auto-detect module name based on caller's path +def _get_module_name(): + """Automatically detect if this is being called from backend or server.""" + import inspect + frame = inspect.currentframe() + try: + # Go up the stack to find the caller + caller_frame = frame.f_back.f_back if frame and frame.f_back else None + if caller_frame: + caller_file = caller_frame.f_globals.get('__file__', '') + if 'backend' in caller_file: + return 'backend' + elif 'server' in caller_file: + return 'server' + finally: + del frame + return 'unknown' + +env_path = Path(__file__).resolve().parents[1] / '.env' + +load_dotenv(env_path) + +if traceroot.init(): + from traceroot.logger import get_logger as _get_traceroot_logger + + trace = traceroot.trace + + def get_logger(name: str = __name__): + """Get TraceRoot logger instance.""" + return _get_traceroot_logger(name) + + def is_enabled() -> bool: + """Check if TraceRoot is enabled.""" + return True + + # Log successful initialization + module_name = _get_module_name() + _init_logger = _get_traceroot_logger("traceroot_wrapper") + _init_logger.info("TraceRoot initialized successfully", extra={"backend": "traceroot", "module": module_name}) +else: + # No-op implementations when TraceRoot is not configured + def trace(*args, **kwargs): + """No-op trace decorator.""" + def decorator(func: Callable) -> Callable: + return func + return decorator + + def get_logger(name: str = __name__): + """Get standard Python logger when TraceRoot is disabled.""" + logger = logging.getLogger(name) + if not logger.handlers: + # Configure basic logging if no handlers exist + handler = logging.StreamHandler() + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + return logger + + def is_enabled() -> bool: + """Check if TraceRoot is enabled.""" + return False + + # Log fallback mode + _fallback_logger = logging.getLogger("traceroot_wrapper") + _fallback_logger.warning("TraceRoot not initialized - using Python logging as fallback") + + +__all__ = ['trace', 'get_logger', 'is_enabled']