This commit is contained in:
Wagner dos Santos 2026-05-14 08:37:03 +08:00 committed by GitHub
commit b54066b590
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 91 additions and 7 deletions

View file

@ -388,6 +388,7 @@ class Agent:
self.context.streaming_agent = self # mark self as current streamer
self.loop_data.iteration += 1
self.loop_data.params_temporary = {} # clear temporary params
last_response_stream_full = ""
# call message_loop_start extensions
await extension.call_extensions_async(
@ -425,12 +426,39 @@ class Agent:
await self.handle_reasoning_stream(stream_data["full"])
async def stream_callback(chunk: str, full: str):
nonlocal last_response_stream_full
await self.handle_intervention()
# output the agent response stream
if chunk == full:
printer.print("Response: ") # start of response
# Pass chunk and full data to extensions for processing
stream_data = {"chunk": chunk, "full": full}
stop_response: str | None = None
# wgnr.ai FIX: Only attempt JSON parse when stream
# might contain a complete JSON root (ends with } or ]).
# This eliminates O(n²) per-chunk DirtyJson overhead that
# causes event loop starvation with multiple chat sessions.
stripped = full.rstrip()
snapshot = None
if stripped and stripped[-1] in ('}', ']'):
snapshot = extract_tools.extract_json_root_string(full)
if snapshot:
parsed_snapshot = extract_tools.json_parse_dirty(snapshot)
if parsed_snapshot is not None:
try:
await self.validate_tool_request(parsed_snapshot)
except Exception:
pass
else:
previous_full = last_response_stream_full
stream_data["full"] = snapshot
if snapshot.startswith(previous_full):
stream_data["chunk"] = snapshot[len(previous_full) :]
else:
stream_data["chunk"] = snapshot
stop_response = snapshot
await extension.call_extensions_async(
"response_stream_chunk",
self,
@ -442,6 +470,9 @@ class Agent:
printer.stream(stream_data["chunk"])
# Use the potentially modified full text for downstream processing
await self.handle_response_stream(stream_data["full"])
last_response_stream_full = stream_data["full"]
if stop_response is not None:
return stop_response
# call main LLM
agent_response, _reasoning = await self.call_chat_model(
@ -770,7 +801,7 @@ class Agent:
async def call_chat_model(
self,
messages: list[BaseMessage],
response_callback: Callable[[str, str], Awaitable[None]] | None = None,
response_callback: Callable[[str, str], Awaitable[str | None]] | None = None,
reasoning_callback: Callable[[str, str], Awaitable[None]] | None = None,
background: bool = False,
explicit_caching: bool = True,
@ -954,7 +985,7 @@ class Agent:
raise ValueError("Tool request must be a dictionary")
if not tool_request.get("tool_name") or not isinstance(tool_request.get("tool_name"), str):
raise ValueError("Tool request must have a tool_name (type string) field")
if not tool_request.get("tool_args") or not isinstance(tool_request.get("tool_args"), dict):
if "tool_args" not in tool_request or not isinstance(tool_request.get("tool_args"), dict):
raise ValueError("Tool request must have a tool_args (type dictionary) field")

View file

@ -1,3 +1,4 @@
import asyncio
from helpers.extension import Extension
from agent import LoopData
from extensions.python.message_loop_end._10_organize_history import DATA_NAME_TASK
@ -19,8 +20,8 @@ class OrganizeHistoryWait(Extension):
if not task.is_ready():
self.agent.context.log.set_progress("Compressing history...")
# Wait for the task to complete
await task.result()
# Wait for the task to complete (with timeout to prevent indefinite blocking)
await asyncio.wait_for(task.result(), timeout=30)
# Clear the coroutine data after it's done
self.agent.set_data(DATA_NAME_TASK, None)

View file

@ -109,6 +109,20 @@ class DeferredTask:
def _on_task_done(self, _future: Future):
# Ensure child background tasks are always cleaned up once the parent finishes
self.kill_children()
# Log any exception that killed the task silently
exc = _future.exception() if _future else None
if exc:
from helpers.print_style import PrintStyle
PrintStyle.error(f"DeferredTask died with exception: {exc}")
# Notify the AgentContext if available so UI can show the error
try:
from agent import AgentContext
ctx = AgentContext.current()
if ctx:
ctx.log.log(type="error", content=f"Task failed: {exc}")
ctx.streaming_agent = None
except Exception:
pass
async def _run(self):
return await self.func(*self.args, **self.kwargs)

View file

@ -305,7 +305,13 @@ class WsManager:
return await coro
future = asyncio.run_coroutine_threadsafe(coro, dispatcher_loop)
return await asyncio.wrap_future(future)
# wgnr.ai FIX: Add timeout to prevent indefinite blocking when main loop is busy.
# This prevents a stalled main event loop from blocking all chat emit operations.
try:
return await asyncio.wait_for(asyncio.wrap_future(future), timeout=10.0)
except asyncio.TimeoutError:
future.cancel()
raise RuntimeError("Dispatcher loop emit timed out after 10s")
def _diagnostics_active(self) -> bool:
if not self._diagnostics_enabled:
@ -694,8 +700,22 @@ class WsManager:
instrument = self._diagnostics_active()
start = time.perf_counter() if instrument else None
try:
value = await self._get_handler_worker().execute_inside(
handler.process, event_type, payload, sid
# wgnr.ai FIX: Add timeout to handler execution to prevent
# a single stalled handler from blocking all event processing.
value = await asyncio.wait_for(
self._get_handler_worker().execute_inside(
handler.process, event_type, payload, sid
),
timeout=60.0,
)
except asyncio.TimeoutError:
duration_ms = (
(time.perf_counter() - start) * 1000 if start is not None else None
)
return _HandlerExecution(
handler,
RuntimeError(f"Handler {handler.identifier} timed out after 60s"),
duration_ms,
)
except Exception as exc: # pragma: no cover - handled by caller
duration_ms = (
@ -1275,6 +1295,24 @@ class WsManager:
"payloadSummary": self._summarize_payload(data),
}
)
async def _emit_fire_and_forget(
self,
namespace: str,
sid: str,
event_type: str,
data: dict[str, Any],
) -> None:
"""Emit without blocking - for high-frequency streaming events.
wgnr.ai FIX: Bypasses the dispatcher loop bridge entirely to avoid
timeout/blocking during active streaming when multiple chats compete
for the main event loop.
"""
try:
self.socketio.emit(event_type, data, to=sid, namespace=namespace)
except Exception:
pass # Best effort for streaming
async def send_data(
self,