mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-04-28 03:30:23 +00:00
- Add WsHandler base class, WsManager (connection tracking / event routing / buffering), WsResult - Extract network.py (is_loopback_address) and context_utils.py (use_context) to eliminate duplication - Migrate three handlers to api/ following the ws_* py naming convention - Simplify run_ui.py WebSocket init from ~170 lines to ~10 - Update import paths in api.py, plugins.py, state_monitor.py
32 lines
1 KiB
Python
32 lines
1 KiB
Python
from helpers.ws import WsHandler
|
|
from helpers import extension
|
|
|
|
|
|
class WsWebui(WsHandler):
|
|
"""State synchronisation handler — the primary WebSocket endpoint for the UI."""
|
|
|
|
async def on_connect(self, sid: str) -> None:
|
|
await extension.call_extensions_async(
|
|
"webui_ws_connect", agent=None, instance=self, sid=sid
|
|
)
|
|
|
|
async def on_disconnect(self, sid: str) -> None:
|
|
await extension.call_extensions_async(
|
|
"webui_ws_disconnect", agent=None, instance=self, sid=sid
|
|
)
|
|
|
|
async def process(self, event: str, data: dict, sid: str) -> dict | None:
|
|
response_data: dict = {}
|
|
|
|
await extension.call_extensions_async(
|
|
"webui_ws_event",
|
|
agent=None,
|
|
instance=self,
|
|
sid=sid,
|
|
event_type=event,
|
|
data=data,
|
|
response_data=response_data,
|
|
)
|
|
|
|
# Return None (fire-and-forget) when no extension populated the response.
|
|
return response_data if response_data else None
|