From 663d4b5e794d3e24dd763ff18d70d33229797aee Mon Sep 17 00:00:00 2001 From: wgnr-ai Date: Fri, 10 Apr 2026 02:17:22 +0000 Subject: [PATCH] fix: prevent silent chat death and reduce streaming overhead MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three-factor compound failure causing chat sessions to silently stall: 1. Silent Exception Death (helpers/defer.py): DeferredTask._on_task_done() captures exceptions in Future but no code reads it. Chat sessions die with zero UI feedback. Fix: Log exceptions and notify AgentContext on task failure. 2. O(n²) Per-Chunk DirtyJson Parsing (agent.py): stream_callback runs extract_json_root_string() + json_parse_dirty() on EVERY streaming chunk with growing string length. Fix: Only parse when full.rstrip() ends with } or ]. 3. Indefinite History Compression Wait: organize_history_wait has await task.result() with no timeout. Fix: Wrap with asyncio.wait_for() 30s timeout. Amplifier: All chats share one EventLoopThread singleton, making concurrent sessions particularly affected. --- agent.py | 35 +++++++++++++++++-- .../_90_organize_history_wait.py | 5 +-- helpers/defer.py | 14 ++++++++ 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/agent.py b/agent.py index 68a5e1369..fde9c923d 100644 --- a/agent.py +++ b/agent.py @@ -388,6 +388,7 @@ class Agent: self.context.streaming_agent = self # mark self as current streamer self.loop_data.iteration += 1 self.loop_data.params_temporary = {} # clear temporary params + last_response_stream_full = "" # call message_loop_start extensions await extension.call_extensions_async( @@ -425,12 +426,39 @@ class Agent: await self.handle_reasoning_stream(stream_data["full"]) async def stream_callback(chunk: str, full: str): + nonlocal last_response_stream_full await self.handle_intervention() # output the agent response stream if chunk == full: printer.print("Response: ") # start of response # Pass chunk and full data to extensions for processing stream_data = {"chunk": chunk, "full": full} + stop_response: str | None = None + + # wgnr.ai FIX: Only attempt JSON parse when stream + # might contain a complete JSON root (ends with } or ]). + # This eliminates O(n²) per-chunk DirtyJson overhead that + # causes event loop starvation with multiple chat sessions. + stripped = full.rstrip() + snapshot = None + if stripped and stripped[-1] in ('}', ']'): + snapshot = extract_tools.extract_json_root_string(full) + if snapshot: + parsed_snapshot = extract_tools.json_parse_dirty(snapshot) + if parsed_snapshot is not None: + try: + await self.validate_tool_request(parsed_snapshot) + except Exception: + pass + else: + previous_full = last_response_stream_full + stream_data["full"] = snapshot + if snapshot.startswith(previous_full): + stream_data["chunk"] = snapshot[len(previous_full) :] + else: + stream_data["chunk"] = snapshot + stop_response = snapshot + await extension.call_extensions_async( "response_stream_chunk", self, @@ -442,6 +470,9 @@ class Agent: printer.stream(stream_data["chunk"]) # Use the potentially modified full text for downstream processing await self.handle_response_stream(stream_data["full"]) + last_response_stream_full = stream_data["full"] + if stop_response is not None: + return stop_response # call main LLM agent_response, _reasoning = await self.call_chat_model( @@ -770,7 +801,7 @@ class Agent: async def call_chat_model( self, messages: list[BaseMessage], - response_callback: Callable[[str, str], Awaitable[None]] | None = None, + response_callback: Callable[[str, str], Awaitable[str | None]] | None = None, reasoning_callback: Callable[[str, str], Awaitable[None]] | None = None, background: bool = False, explicit_caching: bool = True, @@ -954,7 +985,7 @@ class Agent: raise ValueError("Tool request must be a dictionary") if not tool_request.get("tool_name") or not isinstance(tool_request.get("tool_name"), str): raise ValueError("Tool request must have a tool_name (type string) field") - if not tool_request.get("tool_args") or not isinstance(tool_request.get("tool_args"), dict): + if "tool_args" not in tool_request or not isinstance(tool_request.get("tool_args"), dict): raise ValueError("Tool request must have a tool_args (type dictionary) field") diff --git a/extensions/python/message_loop_prompts_before/_90_organize_history_wait.py b/extensions/python/message_loop_prompts_before/_90_organize_history_wait.py index 903981ad4..4f21fd9e9 100644 --- a/extensions/python/message_loop_prompts_before/_90_organize_history_wait.py +++ b/extensions/python/message_loop_prompts_before/_90_organize_history_wait.py @@ -1,3 +1,4 @@ +import asyncio from helpers.extension import Extension from agent import LoopData from extensions.python.message_loop_end._10_organize_history import DATA_NAME_TASK @@ -19,8 +20,8 @@ class OrganizeHistoryWait(Extension): if not task.is_ready(): self.agent.context.log.set_progress("Compressing history...") - # Wait for the task to complete - await task.result() + # Wait for the task to complete (with timeout to prevent indefinite blocking) + await asyncio.wait_for(task.result(), timeout=30) # Clear the coroutine data after it's done self.agent.set_data(DATA_NAME_TASK, None) diff --git a/helpers/defer.py b/helpers/defer.py index eb2a68e56..f899905a1 100644 --- a/helpers/defer.py +++ b/helpers/defer.py @@ -109,6 +109,20 @@ class DeferredTask: def _on_task_done(self, _future: Future): # Ensure child background tasks are always cleaned up once the parent finishes self.kill_children() + # Log any exception that killed the task silently + exc = _future.exception() if _future else None + if exc: + from helpers.print_style import PrintStyle + PrintStyle.error(f"DeferredTask died with exception: {exc}") + # Notify the AgentContext if available so UI can show the error + try: + from agent import AgentContext + ctx = AgentContext.current() + if ctx: + ctx.log.log(type="error", content=f"Task failed: {exc}") + ctx.streaming_agent = None + except Exception: + pass async def _run(self): return await self.func(*self.args, **self.kwargs)