From 7c287fe6d1cae5fa6910667dac97359c9f9f9535 Mon Sep 17 00:00:00 2001 From: MkDev11 <94194147+MkDev11@users.noreply.github.com> Date: Tue, 3 Feb 2026 06:15:36 -0800 Subject: [PATCH] refactor: extract event_loop_utils module and improve agent response handling (#999) Co-authored-by: mkdev11 Co-authored-by: Tao Sun <168447269+fengju0213@users.noreply.github.com> Co-authored-by: a7m-1st Co-authored-by: Wendong-Fan Co-authored-by: Claude --- backend/app/agent/__init__.py | 3 +- backend/app/agent/agent_model.py | 62 +---- backend/app/agent/listen_chat_agent.py | 285 ++++++++++++--------- backend/app/component/__init__.py | 14 + backend/app/component/pydantic/__init__.py | 14 + backend/app/exception/__init__.py | 14 + backend/app/model/__init__.py | 14 + backend/app/service/__init__.py | 14 + backend/app/service/chat_service.py | 3 +- backend/app/utils/event_loop_utils.py | 69 +++++ backend/app/utils/listen/__init__.py | 14 + backend/app/utils/server/__init__.py | 14 + backend/app/utils/single_agent_worker.py | 147 ++++++++--- backend/app/utils/toolkit/__init__.py | 14 + backend/pyproject.toml | 2 +- 15 files changed, 463 insertions(+), 220 deletions(-) create mode 100644 backend/app/utils/event_loop_utils.py diff --git a/backend/app/agent/__init__.py b/backend/app/agent/__init__.py index eda5abe29..264103bba 100644 --- a/backend/app/agent/__init__.py +++ b/backend/app/agent/__init__.py @@ -12,7 +12,7 @@ # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -from app.agent.agent_model import agent_model, set_main_event_loop +from app.agent.agent_model import agent_model from app.agent.factory import ( browser_agent, developer_agent, @@ -31,7 +31,6 @@ __all__ = [ "agent_model", "get_mcp_tools", "get_toolkits", - "set_main_event_loop", "browser_agent", "developer_agent", "document_agent", diff --git a/backend/app/agent/agent_model.py b/backend/app/agent/agent_model.py index 2cc9e1a00..a203e3193 100644 --- a/backend/app/agent/agent_model.py +++ b/backend/app/agent/agent_model.py @@ -12,11 +12,8 @@ # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -import asyncio -import contextvars import logging import uuid -from threading import Lock from typing import Any, Callable from camel.messages import BaseMessage @@ -24,68 +21,11 @@ from camel.models import ModelFactory from camel.toolkits import FunctionTool, RegisteredAgentToolkit from camel.types import ModelPlatformType +from app.utils.event_loop_utils import _schedule_async_task from app.agent.listen_chat_agent import ListenChatAgent, logger from app.model.chat import AgentModelConfig, Chat from app.service.task import ActionCreateAgentData, Agents, get_task_lock -# Thread-safe reference to main event loop using contextvars -# This ensures each request has its own event loop reference, -# avoiding race conditions -_main_event_loop_var: contextvars.ContextVar[asyncio.AbstractEventLoop - | None] = contextvars.ContextVar( - "_main_event_loop", - default=None - ) - -# Global fallback for main event loop reference -# Used when contextvars don't propagate to worker threads -# (e.g., asyncio.to_thread) -_GLOBAL_MAIN_LOOP: asyncio.AbstractEventLoop | None = None -_GLOBAL_MAIN_LOOP_LOCK = Lock() - - -def set_main_event_loop(loop: asyncio.AbstractEventLoop | None): - """Set the main event loop reference for thread-safe task scheduling. - - This should be called from the main async context before spawning threads - that need to schedule async tasks. Uses both contextvars (for request - isolation) and a global fallback (for thread pool workers where - contextvars may not propagate). - """ - global _GLOBAL_MAIN_LOOP - _main_event_loop_var.set(loop) - with _GLOBAL_MAIN_LOOP_LOCK: - _GLOBAL_MAIN_LOOP = loop - - -def _schedule_async_task(coro): - """Schedule an async coroutine as a task, thread-safe. - - This function handles scheduling from both the main event loop thread - and from worker threads (e.g., when using asyncio.to_thread). - """ - try: - # Try to get the running loop (works in main event loop thread) - loop = asyncio.get_running_loop() - loop.create_task(coro) - except RuntimeError: - # No running loop in this thread (we're in a worker thread) - # First try contextvars, then fallback to global reference - main_loop = _main_event_loop_var.get() - if main_loop is None: - with _GLOBAL_MAIN_LOOP_LOCK: - main_loop = _GLOBAL_MAIN_LOOP - if main_loop is not None and main_loop.is_running(): - asyncio.run_coroutine_threadsafe(coro, main_loop) - else: - # This should not happen in normal operation - log error and skip - logging.error( - "No event loop available for async task " - "scheduling, task skipped. Ensure " - "set_main_event_loop() is called " - "before parallel agent creation." - ) - def agent_model( agent_name: str, diff --git a/backend/app/agent/listen_chat_agent.py b/backend/app/agent/listen_chat_agent.py index c4a806e6a..3dffb5950 100644 --- a/backend/app/agent/listen_chat_agent.py +++ b/backend/app/agent/listen_chat_agent.py @@ -34,6 +34,7 @@ from camel.types import ModelPlatformType, ModelType from camel.types.agents import ToolCallingRecord from pydantic import BaseModel +from app.utils.event_loop_utils import _schedule_async_task from app.service.task import ( Action, ActionActivateAgentData, @@ -50,7 +51,6 @@ logger = logging.getLogger("agent") class ListenChatAgent(ChatAgent): - def __init__( self, api_task_id: str, @@ -77,8 +77,7 @@ class ListenChatAgent(ChatAgent): tools: List[FunctionTool | Callable[..., Any]] | None = None, toolkits_to_register_agent: List[RegisteredAgentToolkit] | None = None, external_tools: ( - List[FunctionTool | Callable[..., Any] - | Dict[str, Any]] | None + List[FunctionTool | Callable[..., Any] | Dict[str, Any]] | None ) = None, response_terminators: List[ResponseTerminator] | None = None, scheduling_strategy: str = "round_robin", @@ -121,26 +120,112 @@ class ListenChatAgent(ChatAgent): process_task_id: str = "" + def _send_agent_deactivate(self, message: str, tokens: int) -> None: + """Send agent deactivation event to the frontend. + + Args: + message: The accumulated message content + tokens: The total token count used + """ + task_lock = get_task_lock(self.api_task_id) + _schedule_async_task( + task_lock.put_queue( + ActionDeactivateAgentData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "message": message, + "tokens": tokens, + }, + ) + ) + ) + + @staticmethod + def _extract_tokens(response) -> int: + """Extract total token count from a response chunk. + + Args: + response: The response chunk (ChatAgentResponse or similar) + + Returns: + Total token count or 0 if not available + """ + if response is None: + return 0 + usage_info = ( + response.info.get("usage") + or response.info.get("token_usage") + or {} + ) + return usage_info.get("total_tokens", 0) + + def _stream_chunks(self, response_gen): + """Generator that wraps a streaming response and sends chunks to frontend. + + Args: + response_gen: The original streaming response generator + + Yields: + Each chunk from the original generator + + Returns: + Tuple of (accumulated_content, total_tokens) via StopIteration value + """ + accumulated_content = "" + last_chunk = None + + try: + for chunk in response_gen: + last_chunk = chunk + if chunk.msg and chunk.msg.content: + accumulated_content += chunk.msg.content + yield chunk + finally: + total_tokens = self._extract_tokens(last_chunk) + self._send_agent_deactivate(accumulated_content, total_tokens) + + async def _astream_chunks(self, response_gen): + """Async generator that wraps a streaming response and sends chunks to frontend. + + Args: + response_gen: The original async streaming response generator + + Yields: + Each chunk from the original generator + """ + accumulated_content = "" + last_chunk = None + + try: + async for chunk in response_gen: + last_chunk = chunk + if chunk.msg and chunk.msg.content: + delta_content = chunk.msg.content + accumulated_content += delta_content + yield chunk + finally: + total_tokens = self._extract_tokens(last_chunk) + self._send_agent_deactivate(accumulated_content, total_tokens) + def step( self, input_message: BaseMessage | str, response_format: type[BaseModel] | None = None, ) -> ChatAgentResponse | StreamingChatAgentResponse: task_lock = get_task_lock(self.api_task_id) - asyncio.create_task( + _schedule_async_task( task_lock.put_queue( ActionActivateAgentData( data={ - "agent_name": - self.agent_name, - "process_task_id": - self.process_task_id, - "agent_id": - self.agent_id, + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, "message": ( input_message.content - if isinstance(input_message, BaseMessage) else - input_message + if isinstance(input_message, BaseMessage) + else input_message ), }, ) @@ -151,7 +236,8 @@ class ListenChatAgent(ChatAgent): res = None msg = ( input_message.content - if isinstance(input_message, BaseMessage) else input_message + if isinstance(input_message, BaseMessage) + else input_message ) logger.info( f"Agent {self.agent_name} starting step with message: {msg}" @@ -164,7 +250,7 @@ class ListenChatAgent(ChatAgent): if "Budget has been exceeded" in str(e): message = "Budget has been exceeded" logger.warning(f"Agent {self.agent_name} budget exceeded") - asyncio.create_task( + _schedule_async_task( task_lock.put_queue(ActionBudgetNotEnough()) ) else: @@ -178,59 +264,23 @@ class ListenChatAgent(ChatAgent): error_info = e logger.error( f"Agent {self.agent_name} unexpected error in step: {e}", - exc_info=True + exc_info=True, ) message = f"Error processing message: {e!s}" total_tokens = 0 if res is not None: if isinstance(res, StreamingChatAgentResponse): - - def _stream_with_deactivate(): - last_response: ChatAgentResponse | None = None - # With stream_accumulate=False, - # we need to accumulate delta content - accumulated_content = "" - try: - for chunk in res: - last_response = chunk - # Accumulate content from each chunk (delta mode) - if chunk.msg and chunk.msg.content: - accumulated_content += chunk.msg.content - yield chunk - finally: - total_tokens = 0 - if last_response: - usage_info = last_response.info.get( - "usage" - ) or last_response.info.get("token_usage") or {} - if usage_info: - total_tokens = usage_info.get( - "total_tokens", 0 - ) - asyncio.create_task( - task_lock.put_queue( - ActionDeactivateAgentData( - data={ - "agent_name": self.agent_name, - "process_task_id": - self.process_task_id, - "agent_id": self.agent_id, - "message": accumulated_content, - "tokens": total_tokens, - }, - ) - ) - ) - - return StreamingChatAgentResponse(_stream_with_deactivate()) + # Use reusable stream wrapper to send chunks to frontend + return StreamingChatAgentResponse(self._stream_chunks(res)) message = res.msg.content if res.msg else "" - usage_info = res.info.get("usage") or res.info.get("token_usage" - ) or {} - total_tokens = usage_info.get( - "total_tokens", 0 - ) if usage_info else 0 + usage_info = ( + res.info.get("usage") or res.info.get("token_usage") or {} + ) + total_tokens = ( + usage_info.get("total_tokens", 0) if usage_info else 0 + ) logger.info( f"Agent {self.agent_name} completed step, " f"tokens used: {total_tokens}" @@ -238,7 +288,7 @@ class ListenChatAgent(ChatAgent): assert message is not None - asyncio.create_task( + _schedule_async_task( task_lock.put_queue( ActionDeactivateAgentData( data={ @@ -267,16 +317,13 @@ class ListenChatAgent(ChatAgent): ActionActivateAgentData( action=Action.activate_agent, data={ - "agent_name": - self.agent_name, - "process_task_id": - self.process_task_id, - "agent_id": - self.agent_id, + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, "message": ( input_message.content - if isinstance(input_message, BaseMessage) else - input_message + if isinstance(input_message, BaseMessage) + else input_message ), }, ) @@ -287,17 +334,20 @@ class ListenChatAgent(ChatAgent): res = None msg = ( input_message.content - if isinstance(input_message, BaseMessage) else input_message + if isinstance(input_message, BaseMessage) + else input_message ) logger.debug( - f"Agent {self.agent_name} starting async step " - f"with message: {msg}" + f"Agent {self.agent_name} starting async step with message: {msg}" ) try: res = await super().astep(input_message, response_format) if isinstance(res, AsyncStreamingChatAgentResponse): - res = await res._get_final_response() + # Use reusable async stream wrapper to send chunks to frontend + return AsyncStreamingChatAgentResponse( + self._astream_chunks(res) + ) except ModelProcessingError as e: res = None error_info = e @@ -318,19 +368,29 @@ class ListenChatAgent(ChatAgent): error_info = e logger.error( f"Agent {self.agent_name} unexpected error in async step: {e}", - exc_info=True + exc_info=True, ) message = f"Error processing message: {e!s}" total_tokens = 0 - if res is not None: + # For non-streaming responses, extract message and tokens from response + if res is not None and not isinstance( + res, AsyncStreamingChatAgentResponse + ): message = res.msg.content if res.msg else "" - total_tokens = res.info["usage"]["total_tokens"] + usage_info = ( + res.info.get("usage") or res.info.get("token_usage") or {} + ) + total_tokens = ( + usage_info.get("total_tokens", 0) if usage_info else 0 + ) logger.info( f"Agent {self.agent_name} completed step, " f"tokens used: {total_tokens}" ) + # Send deactivation for all non-streaming cases (success or error) + # Streaming responses handle deactivation in _astream_chunks assert message is not None asyncio.create_task( @@ -377,9 +437,11 @@ class ListenChatAgent(ChatAgent): try: task_lock = get_task_lock(self.api_task_id) - toolkit_name = getattr(tool, "_toolkit_name") if hasattr( - tool, "_toolkit_name" - ) else "mcp_toolkit" + toolkit_name = ( + getattr(tool, "_toolkit_name") + if hasattr(tool, "_toolkit_name") + else "mcp_toolkit" + ) logger.debug( f"Agent {self.agent_name} executing tool: " f"{func_name} from toolkit: {toolkit_name} " @@ -389,7 +451,7 @@ class ListenChatAgent(ChatAgent): # Only send activate event if tool is # NOT wrapped by @listen_toolkit if not has_listen_decorator: - asyncio.create_task( + _schedule_async_task( task_lock.put_queue( ActionActivateToolkitData( data={ @@ -397,8 +459,9 @@ class ListenChatAgent(ChatAgent): "process_task_id": self.process_task_id, "toolkit_name": toolkit_name, "method_name": func_name, - "message": - json.dumps(args, ensure_ascii=False), + "message": json.dumps( + args, ensure_ascii=False + ), }, ) ) @@ -424,11 +487,9 @@ class ListenChatAgent(ChatAgent): result_str = repr(result) MAX_RESULT_LENGTH = 500 if len(result_str) > MAX_RESULT_LENGTH: - result_msg = ( - result_str[:MAX_RESULT_LENGTH] + ( - f"... (truncated, total length: " - f"{len(result_str)} chars)" - ) + result_msg = result_str[:MAX_RESULT_LENGTH] + ( + f"... (truncated, total length: " + f"{len(result_str)} chars)" ) else: result_msg = result_str @@ -436,7 +497,7 @@ class ListenChatAgent(ChatAgent): # Only send deactivate event if tool is # NOT wrapped by @listen_toolkit if not has_listen_decorator: - asyncio.create_task( + _schedule_async_task( task_lock.put_queue( ActionDeactivateToolkitData( data={ @@ -485,10 +546,12 @@ class ListenChatAgent(ChatAgent): if hasattr(tool, "_toolkit_name"): toolkit_name = tool._toolkit_name - # Method 2: For MCP tools, check if func - # has __self__ (the toolkit instance) - if not toolkit_name and hasattr(tool, "func" - ) and hasattr(tool.func, "__self__"): + # Method 2: For MCP tools, check if func has __self__ (the toolkit instance) + if ( + not toolkit_name + and hasattr(tool, "func") + and hasattr(tool.func, "__self__") + ): toolkit_instance = tool.func.__self__ if hasattr(toolkit_instance, "toolkit_name") and callable( toolkit_instance.toolkit_name @@ -497,8 +560,9 @@ class ListenChatAgent(ChatAgent): # Method 3: Check if tool.func is a bound method with toolkit if not toolkit_name and hasattr(tool, "func"): - if hasattr(tool.func, - "func") and hasattr(tool.func.func, "__self__"): + if hasattr(tool.func, "func") and hasattr( + tool.func.func, "__self__" + ): toolkit_instance = tool.func.func.__self__ if hasattr(toolkit_instance, "toolkit_name") and callable( toolkit_instance.toolkit_name @@ -510,11 +574,9 @@ class ListenChatAgent(ChatAgent): toolkit_name = "mcp_toolkit" logger.info( - f"Agent {self.agent_name} executing" - f" async tool: {func_name}" - f" from toolkit: {toolkit_name}" - " with args:" - f" {json.dumps(args, ensure_ascii=False)}" + f"Agent {self.agent_name} executing async tool: {func_name} " + f"from toolkit: {toolkit_name} " + f"with args: {json.dumps(args, ensure_ascii=False)}" ) # Check if tool is wrapped by @listen_toolkit decorator @@ -540,8 +602,7 @@ class ListenChatAgent(ChatAgent): # Try different invocation paths in order of preference if hasattr(tool, "func") and hasattr(tool.func, "async_call"): # Case: FunctionTool wrapping an MCP tool - # Check if the wrapped tool is sync - # to avoid run_in_executor + # Check if the wrapped tool is sync to avoid run_in_executor if hasattr(tool, "is_async") and not tool.is_async: # Sync tool: call directly to preserve ContextVar result = tool(**args) @@ -553,14 +614,13 @@ class ListenChatAgent(ChatAgent): elif hasattr(tool, "async_call") and callable(tool.async_call): # Case: tool itself has async_call - # Check if this is a sync tool to avoid - # run_in_executor (breaks ContextVar) + # Check if this is a sync tool to avoid run_in_executor + # (which breaks ContextVar) if hasattr(tool, "is_async") and not tool.is_async: - # Sync tool: call directly to preserve - # ContextVar in same thread + # Sync tool: call directly to preserve ContextVar + # in same thread result = tool(**args) - # Handle case where synchronous call - # returns a coroutine + # Handle case where synchronous call returns a coroutine if asyncio.iscoroutine(result): result = await result else: @@ -578,8 +638,7 @@ class ListenChatAgent(ChatAgent): result = await tool(**args) else: - # Fallback: synchronous call - call - # directly in current context + # Fallback: synchronous call - call directly in current context # DO NOT use run_in_executor to preserve ContextVar result = tool(**args) # Handle case where synchronous call returns a coroutine @@ -592,7 +651,7 @@ class ListenChatAgent(ChatAgent): result = {"error": error_msg} logger.error( f"Async tool execution failed for {func_name}: {e}", - exc_info=True + exc_info=True, ) # Prepare result message with truncation @@ -603,9 +662,8 @@ class ListenChatAgent(ChatAgent): MAX_RESULT_LENGTH = 500 if len(result_str) > MAX_RESULT_LENGTH: result_msg = ( - result_str[:MAX_RESULT_LENGTH] + "... (truncated, total" - f" length: {len(result_str)}" - " chars)" + result_str[:MAX_RESULT_LENGTH] + + f"... (truncated, total length: {len(result_str)} chars)" ) else: result_msg = result_str @@ -655,8 +713,7 @@ class ListenChatAgent(ChatAgent): schema for schema in self._external_tool_schemas.values() ], response_terminators=self.response_terminators, - scheduling_strategy=self.model_backend.scheduling_strategy. - __name__, + scheduling_strategy=self.model_backend.scheduling_strategy.__name__, max_iteration=self.max_iteration, stop_event=self.stop_event, tool_execution_timeout=self.tool_execution_timeout, diff --git a/backend/app/component/__init__.py b/backend/app/component/__init__.py index e69de29bb..3a4d90c0e 100644 --- a/backend/app/component/__init__.py +++ b/backend/app/component/__init__.py @@ -0,0 +1,14 @@ +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= + diff --git a/backend/app/component/pydantic/__init__.py b/backend/app/component/pydantic/__init__.py index e69de29bb..3a4d90c0e 100644 --- a/backend/app/component/pydantic/__init__.py +++ b/backend/app/component/pydantic/__init__.py @@ -0,0 +1,14 @@ +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= + diff --git a/backend/app/exception/__init__.py b/backend/app/exception/__init__.py index e69de29bb..3a4d90c0e 100644 --- a/backend/app/exception/__init__.py +++ b/backend/app/exception/__init__.py @@ -0,0 +1,14 @@ +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= + diff --git a/backend/app/model/__init__.py b/backend/app/model/__init__.py index e69de29bb..3a4d90c0e 100644 --- a/backend/app/model/__init__.py +++ b/backend/app/model/__init__.py @@ -0,0 +1,14 @@ +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= + diff --git a/backend/app/service/__init__.py b/backend/app/service/__init__.py index e69de29bb..3a4d90c0e 100644 --- a/backend/app/service/__init__.py +++ b/backend/app/service/__init__.py @@ -0,0 +1,14 @@ +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= + diff --git a/backend/app/service/chat_service.py b/backend/app/service/chat_service.py index 58b72e338..399ae8ca2 100644 --- a/backend/app/service/chat_service.py +++ b/backend/app/service/chat_service.py @@ -28,7 +28,7 @@ from fastapi import Request from inflection import titleize from pydash import chain -from app.agent.agent_model import agent_model, set_main_event_loop +from app.agent.agent_model import agent_model from app.agent.factory import ( browser_agent, developer_agent, @@ -53,6 +53,7 @@ from app.service.task import ( delete_task_lock, set_current_task_id, ) +from app.utils.event_loop_utils import set_main_event_loop from app.utils.file_utils import get_working_directory from app.utils.server.sync_step import sync_step from app.utils.telemetry.workforce_metrics import WorkforceMetricsCallback diff --git a/backend/app/utils/event_loop_utils.py b/backend/app/utils/event_loop_utils.py new file mode 100644 index 000000000..13207054e --- /dev/null +++ b/backend/app/utils/event_loop_utils.py @@ -0,0 +1,69 @@ +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= + +import asyncio +import contextvars +import logging +from threading import Lock + +# Thread-safe reference to main event loop using contextvars +# This ensures each request has its own event loop reference, avoiding race conditions +_main_event_loop_var: contextvars.ContextVar[ + asyncio.AbstractEventLoop | None +] = contextvars.ContextVar("_main_event_loop", default=None) + +# Global fallback for main event loop reference +# Used when contextvars don't propagate to worker threads (e.g., asyncio.to_thread) +_GLOBAL_MAIN_LOOP: asyncio.AbstractEventLoop | None = None +_GLOBAL_MAIN_LOOP_LOCK = Lock() + + +def set_main_event_loop(loop: asyncio.AbstractEventLoop | None): + """Set the main event loop reference for thread-safe task scheduling. + + This should be called from the main async context before spawning threads + that need to schedule async tasks. Uses both contextvars (for request isolation) + and a global fallback (for thread pool workers where contextvars may not propagate). + """ + global _GLOBAL_MAIN_LOOP + _main_event_loop_var.set(loop) + with _GLOBAL_MAIN_LOOP_LOCK: + _GLOBAL_MAIN_LOOP = loop + + +def _schedule_async_task(coro): + """Schedule an async coroutine as a task, thread-safe. + + This function handles scheduling from both the main event loop thread + and from worker threads (e.g., when using asyncio.to_thread). + """ + try: + # Try to get the running loop (works in main event loop thread) + loop = asyncio.get_running_loop() + loop.create_task(coro) + except RuntimeError: + # No running loop in this thread (we're in a worker thread) + # First try contextvars, then fallback to global reference + main_loop = _main_event_loop_var.get() + if main_loop is None: + with _GLOBAL_MAIN_LOOP_LOCK: + main_loop = _GLOBAL_MAIN_LOOP + if main_loop is not None and main_loop.is_running(): + asyncio.run_coroutine_threadsafe(coro, main_loop) + else: + # This should not happen in normal operation - log error and skip + logging.error( + "No event loop available for async task scheduling, task skipped. " + "Ensure set_main_event_loop() is called before parallel agent creation." + ) diff --git a/backend/app/utils/listen/__init__.py b/backend/app/utils/listen/__init__.py index e69de29bb..3a4d90c0e 100644 --- a/backend/app/utils/listen/__init__.py +++ b/backend/app/utils/listen/__init__.py @@ -0,0 +1,14 @@ +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= + diff --git a/backend/app/utils/server/__init__.py b/backend/app/utils/server/__init__.py index e69de29bb..3a4d90c0e 100644 --- a/backend/app/utils/server/__init__.py +++ b/backend/app/utils/server/__init__.py @@ -0,0 +1,14 @@ +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= + diff --git a/backend/app/utils/single_agent_worker.py b/backend/app/utils/single_agent_worker.py index 7d05b1cc1..2e82a4a42 100644 --- a/backend/app/utils/single_agent_worker.py +++ b/backend/app/utils/single_agent_worker.py @@ -14,7 +14,9 @@ import datetime from camel.agents.chat_agent import AsyncStreamingChatAgentResponse -from camel.societies.workforce.single_agent_worker import SingleAgentWorker as BaseSingleAgentWorker +from camel.societies.workforce.single_agent_worker import ( + SingleAgentWorker as BaseSingleAgentWorker, +) from camel.tasks.task import Task, TaskState, is_task_result_insufficient import logging @@ -40,13 +42,16 @@ class SingleAgentWorker(BaseSingleAgentWorker): context_utility: ContextUtility | None = None, enable_workflow_memory: bool = False, ) -> None: - logger.info("Initializing SingleAgentWorker", extra={ - "description": description, - "worker_agent_name": worker.agent_name, - "use_agent_pool": use_agent_pool, - "pool_max_size": pool_max_size, - "enable_workflow_memory": enable_workflow_memory - }) + logger.info( + "Initializing SingleAgentWorker", + extra={ + "description": description, + "worker_agent_name": worker.agent_name, + "use_agent_pool": use_agent_pool, + "pool_max_size": pool_max_size, + "enable_workflow_memory": enable_workflow_memory, + }, + ) super().__init__( description=description, worker=worker, @@ -60,7 +65,9 @@ class SingleAgentWorker(BaseSingleAgentWorker): ) self.worker = worker # change type hint - async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState: + async def _process_task( + self, task: Task, dependencies: list[Task] + ) -> TaskState: r"""Processes a task with its dependencies using an efficient agent management system. @@ -82,11 +89,14 @@ class SingleAgentWorker(BaseSingleAgentWorker): worker_agent = await self._get_worker_agent() worker_agent.process_task_id = task.id # type: ignore rewrite line - logger.info("Starting task processing", extra={ - "task_id": task.id, - "worker_agent_id": worker_agent.agent_id, - "dependencies_count": len(dependencies) - }) + logger.info( + "Starting task processing", + extra={ + "task_id": task.id, + "worker_agent_id": worker_agent.agent_id, + "dependencies_count": len(dependencies), + }, + ) response_content = "" final_response = None @@ -120,38 +130,61 @@ class SingleAgentWorker(BaseSingleAgentWorker): if isinstance(response, AsyncStreamingChatAgentResponse): # With stream_accumulate=False, we need to accumulate delta content accumulated_content = "" + last_chunk = None + chunk_count = 0 async for chunk in response: + chunk_count += 1 + last_chunk = chunk if chunk.msg and chunk.msg.content: accumulated_content += chunk.msg.content + logger.info( + f"Streaming complete: {chunk_count} chunks, content_length={len(accumulated_content)}" + ) response_content = accumulated_content + # Store usage info from last chunk for later use + response._last_chunk_info = ( + last_chunk.info if last_chunk else {} + ) else: # Regular ChatAgentResponse - response_content = response.msg.content if response.msg else "" + response_content = ( + response.msg.content if response.msg else "" + ) - task_result = self.structured_handler.parse_structured_response( - response_text=response_content, - schema=TaskResult, - fallback_values={ - "content": "Task processing failed", - "failed": True, - }, + task_result = ( + self.structured_handler.parse_structured_response( + response_text=response_content, + schema=TaskResult, + fallback_values={ + "content": "Task processing failed", + "failed": True, + }, + ) ) else: # Use native structured output if supported - response = await worker_agent.astep(prompt, response_format=TaskResult) + response = await worker_agent.astep( + prompt, response_format=TaskResult + ) - # Handle streaming response for native output + # Handle streaming response for native output (shouldn't happen now but keep for safety) if isinstance(response, AsyncStreamingChatAgentResponse): task_result = None # With stream_accumulate=False, we need to accumulate delta content accumulated_content = "" + last_chunk = None async for chunk in response: + last_chunk = chunk if chunk.msg: if chunk.msg.content: accumulated_content += chunk.msg.content if chunk.msg.parsed: task_result = chunk.msg.parsed response_content = accumulated_content + # Store usage info from last chunk for later use + response._last_chunk_info = ( + last_chunk.info if last_chunk else {} + ) # If no parsed result found in streaming, create fallback if task_result is None: task_result = TaskResult( @@ -161,16 +194,24 @@ class SingleAgentWorker(BaseSingleAgentWorker): else: # Regular ChatAgentResponse task_result = response.msg.parsed - response_content = response.msg.content if response.msg else "" + response_content = ( + response.msg.content if response.msg else "" + ) # Get token usage from the response if isinstance(response, AsyncStreamingChatAgentResponse): - # For streaming responses, get the final response info - final_response = await response - usage_info = final_response.info.get("usage") or final_response.info.get("token_usage") + # For streaming responses, get info from last chunk captured during iteration + chunk_info = getattr(response, "_last_chunk_info", {}) + usage_info = chunk_info.get("usage") or chunk_info.get( + "token_usage" + ) else: - usage_info = response.info.get("usage") or response.info.get("token_usage") - total_tokens = usage_info.get("total_tokens", 0) if usage_info else 0 + usage_info = response.info.get("usage") or response.info.get( + "token_usage" + ) + total_tokens = ( + usage_info.get("total_tokens", 0) if usage_info else 0 + ) # collect conversation from working agent to # accumulator for workflow memory @@ -184,16 +225,24 @@ class SingleAgentWorker(BaseSingleAgentWorker): work_records = worker_agent.memory.retrieve() # write these records to the accumulator's memory - memory_records = [record.memory_record for record in work_records] + memory_records = [ + record.memory_record for record in work_records + ] accumulator.memory.write_records(memory_records) - logger.debug(f"Transferred {len(memory_records)} memory records to accumulator") + logger.debug( + f"Transferred {len(memory_records)} memory records to accumulator" + ) except Exception as e: - logger.warning(f"Failed to transfer conversation to accumulator: {e}") + logger.warning( + f"Failed to transfer conversation to accumulator: {e}" + ) except Exception as e: - logger.error(f"Error processing task {task.id}: {type(e).__name__}: {e}") + logger.error( + f"Error processing task {task.id}: {type(e).__name__}: {e}" + ) # Store error information in task result task.result = f"{type(e).__name__}: {e!s}" return TaskState.FAILED @@ -207,10 +256,16 @@ class SingleAgentWorker(BaseSingleAgentWorker): # Create worker attempt details with descriptive keys # Use final_response if available (streaming), otherwise use response - response_for_info = final_response if final_response is not None else response + response_for_info = ( + final_response if final_response is not None else response + ) worker_attempt_details = { - "agent_id": getattr(worker_agent, "agent_id", worker_agent.role_name), - "original_worker_id": getattr(self.worker, "agent_id", self.worker.role_name), + "agent_id": getattr( + worker_agent, "agent_id", worker_agent.role_name + ), + "original_worker_id": getattr( + self.worker, "agent_id", self.worker.role_name + ), "timestamp": str(datetime.datetime.now()), "description": f"Attempt by " f"{getattr(worker_agent, 'agent_id', worker_agent.role_name)} " @@ -218,7 +273,11 @@ class SingleAgentWorker(BaseSingleAgentWorker): f"{getattr(self.worker, 'agent_id', self.worker.role_name)}) " f"to process task: {task.content}", "response_content": response_content[:50], - "tool_calls": str(response_for_info.info.get("tool_calls", []) if response_for_info and hasattr(response_for_info, 'info') else [])[:50], + "tool_calls": str( + response_for_info.info.get("tool_calls", []) + if response_for_info and hasattr(response_for_info, "info") + else [] + )[:50], "total_tokens": total_tokens, } @@ -237,8 +296,12 @@ class SingleAgentWorker(BaseSingleAgentWorker): if not self.use_structured_output_handler: # Handle native structured output parsing if task_result is None: - logger.error("Error in worker step execution: Invalid task result") - print(f"{Fore.RED}Error in worker step execution: Invalid task result{Fore.RESET}") + logger.error( + "Error in worker step execution: Invalid task result" + ) + print( + f"{Fore.RED}Error in worker step execution: Invalid task result{Fore.RESET}" + ) task_result = TaskResult( content="Failed to generate valid task result.", failed=True, @@ -260,6 +323,8 @@ class SingleAgentWorker(BaseSingleAgentWorker): return TaskState.FAILED if is_task_result_insufficient(task): - logger.warning(f"Task {task.id}: Content validation failed - task marked as failed") + logger.warning( + f"Task {task.id}: Content validation failed - task marked as failed" + ) return TaskState.FAILED return TaskState.DONE diff --git a/backend/app/utils/toolkit/__init__.py b/backend/app/utils/toolkit/__init__.py index e69de29bb..3a4d90c0e 100644 --- a/backend/app/utils/toolkit/__init__.py +++ b/backend/app/utils/toolkit/__init__.py @@ -0,0 +1,14 @@ +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= + diff --git a/backend/pyproject.toml b/backend/pyproject.toml index b06e93a4e..e1d7a8f76 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -6,7 +6,7 @@ readme = "README.md" requires-python = ">=3.10,<3.11" dependencies = [ "pip>=23.0", - "camel-ai[eigent]==0.2.85a0", + "camel-ai[eigent]==0.2.85", "fastapi>=0.115.12", "fastapi-babel>=1.0.0", "uvicorn[standard]>=0.34.2",