diff --git a/agent.py b/agent.py index 68a5e1369..fde9c923d 100644 --- a/agent.py +++ b/agent.py @@ -388,6 +388,7 @@ class Agent: self.context.streaming_agent = self # mark self as current streamer self.loop_data.iteration += 1 self.loop_data.params_temporary = {} # clear temporary params + last_response_stream_full = "" # call message_loop_start extensions await extension.call_extensions_async( @@ -425,12 +426,39 @@ class Agent: await self.handle_reasoning_stream(stream_data["full"]) async def stream_callback(chunk: str, full: str): + nonlocal last_response_stream_full await self.handle_intervention() # output the agent response stream if chunk == full: printer.print("Response: ") # start of response # Pass chunk and full data to extensions for processing stream_data = {"chunk": chunk, "full": full} + stop_response: str | None = None + + # wgnr.ai FIX: Only attempt JSON parse when stream + # might contain a complete JSON root (ends with } or ]). + # This eliminates O(n²) per-chunk DirtyJson overhead that + # causes event loop starvation with multiple chat sessions. + stripped = full.rstrip() + snapshot = None + if stripped and stripped[-1] in ('}', ']'): + snapshot = extract_tools.extract_json_root_string(full) + if snapshot: + parsed_snapshot = extract_tools.json_parse_dirty(snapshot) + if parsed_snapshot is not None: + try: + await self.validate_tool_request(parsed_snapshot) + except Exception: + pass + else: + previous_full = last_response_stream_full + stream_data["full"] = snapshot + if snapshot.startswith(previous_full): + stream_data["chunk"] = snapshot[len(previous_full) :] + else: + stream_data["chunk"] = snapshot + stop_response = snapshot + await extension.call_extensions_async( "response_stream_chunk", self, @@ -442,6 +470,9 @@ class Agent: printer.stream(stream_data["chunk"]) # Use the potentially modified full text for downstream processing await self.handle_response_stream(stream_data["full"]) + last_response_stream_full = stream_data["full"] + if stop_response is not None: + return stop_response # call main LLM agent_response, _reasoning = await self.call_chat_model( @@ -770,7 +801,7 @@ class Agent: async def call_chat_model( self, messages: list[BaseMessage], - response_callback: Callable[[str, str], Awaitable[None]] | None = None, + response_callback: Callable[[str, str], Awaitable[str | None]] | None = None, reasoning_callback: Callable[[str, str], Awaitable[None]] | None = None, background: bool = False, explicit_caching: bool = True, @@ -954,7 +985,7 @@ class Agent: raise ValueError("Tool request must be a dictionary") if not tool_request.get("tool_name") or not isinstance(tool_request.get("tool_name"), str): raise ValueError("Tool request must have a tool_name (type string) field") - if not tool_request.get("tool_args") or not isinstance(tool_request.get("tool_args"), dict): + if "tool_args" not in tool_request or not isinstance(tool_request.get("tool_args"), dict): raise ValueError("Tool request must have a tool_args (type dictionary) field") diff --git a/extensions/python/message_loop_prompts_before/_90_organize_history_wait.py b/extensions/python/message_loop_prompts_before/_90_organize_history_wait.py index 903981ad4..4f21fd9e9 100644 --- a/extensions/python/message_loop_prompts_before/_90_organize_history_wait.py +++ b/extensions/python/message_loop_prompts_before/_90_organize_history_wait.py @@ -1,3 +1,4 @@ +import asyncio from helpers.extension import Extension from agent import LoopData from extensions.python.message_loop_end._10_organize_history import DATA_NAME_TASK @@ -19,8 +20,8 @@ class OrganizeHistoryWait(Extension): if not task.is_ready(): self.agent.context.log.set_progress("Compressing history...") - # Wait for the task to complete - await task.result() + # Wait for the task to complete (with timeout to prevent indefinite blocking) + await asyncio.wait_for(task.result(), timeout=30) # Clear the coroutine data after it's done self.agent.set_data(DATA_NAME_TASK, None) diff --git a/helpers/defer.py b/helpers/defer.py index eb2a68e56..f899905a1 100644 --- a/helpers/defer.py +++ b/helpers/defer.py @@ -109,6 +109,20 @@ class DeferredTask: def _on_task_done(self, _future: Future): # Ensure child background tasks are always cleaned up once the parent finishes self.kill_children() + # Log any exception that killed the task silently + exc = _future.exception() if _future else None + if exc: + from helpers.print_style import PrintStyle + PrintStyle.error(f"DeferredTask died with exception: {exc}") + # Notify the AgentContext if available so UI can show the error + try: + from agent import AgentContext + ctx = AgentContext.current() + if ctx: + ctx.log.log(type="error", content=f"Task failed: {exc}") + ctx.streaming_agent = None + except Exception: + pass async def _run(self): return await self.func(*self.args, **self.kwargs) diff --git a/helpers/ws_manager.py b/helpers/ws_manager.py index 70e5d5b95..b6eea5219 100644 --- a/helpers/ws_manager.py +++ b/helpers/ws_manager.py @@ -305,7 +305,13 @@ class WsManager: return await coro future = asyncio.run_coroutine_threadsafe(coro, dispatcher_loop) - return await asyncio.wrap_future(future) + # wgnr.ai FIX: Add timeout to prevent indefinite blocking when main loop is busy. + # This prevents a stalled main event loop from blocking all chat emit operations. + try: + return await asyncio.wait_for(asyncio.wrap_future(future), timeout=10.0) + except asyncio.TimeoutError: + future.cancel() + raise RuntimeError("Dispatcher loop emit timed out after 10s") def _diagnostics_active(self) -> bool: if not self._diagnostics_enabled: @@ -694,8 +700,22 @@ class WsManager: instrument = self._diagnostics_active() start = time.perf_counter() if instrument else None try: - value = await self._get_handler_worker().execute_inside( - handler.process, event_type, payload, sid + # wgnr.ai FIX: Add timeout to handler execution to prevent + # a single stalled handler from blocking all event processing. + value = await asyncio.wait_for( + self._get_handler_worker().execute_inside( + handler.process, event_type, payload, sid + ), + timeout=60.0, + ) + except asyncio.TimeoutError: + duration_ms = ( + (time.perf_counter() - start) * 1000 if start is not None else None + ) + return _HandlerExecution( + handler, + RuntimeError(f"Handler {handler.identifier} timed out after 60s"), + duration_ms, ) except Exception as exc: # pragma: no cover - handled by caller duration_ms = ( @@ -1275,6 +1295,24 @@ class WsManager: "payloadSummary": self._summarize_payload(data), } ) + async def _emit_fire_and_forget( + self, + namespace: str, + sid: str, + event_type: str, + data: dict[str, Any], + ) -> None: + """Emit without blocking - for high-frequency streaming events. + + wgnr.ai FIX: Bypasses the dispatcher loop bridge entirely to avoid + timeout/blocking during active streaming when multiple chats compete + for the main event loop. + """ + try: + self.socketio.emit(event_type, data, to=sid, namespace=namespace) + except Exception: + pass # Best effort for streaming + async def send_data( self,