agent-zero/api/ws_webui.py
keyboardstaff 1d81f72a31 refactor: Backend core rewrite - WsHandler + WsManager + handler migration
- Add WsHandler base class, WsManager (connection tracking / event routing / buffering), WsResult
- Extract network.py (is_loopback_address) and context_utils.py (use_context) to eliminate duplication
- Migrate three handlers to api/ following the ws_*.py naming convention
- Simplify run_ui.py WebSocket init from ~170 lines to ~10
- Update import paths in api.py, plugins.py, state_monitor.py
2026-03-26 00:58:01 -07:00

32 lines
1 KiB
Python

from helpers.ws import WsHandler
from helpers import extension
class WsWebui(WsHandler):
    """Primary WebSocket endpoint for the UI, handling state synchronisation.

    Every hook forwards to ``extension.call_extensions_async`` under a
    ``webui_ws_*`` extension point, passing this handler as ``instance``
    and ``agent=None``.
    """

    async def on_connect(self, sid: str) -> None:
        """Run the ``webui_ws_connect`` extensions for connection ``sid``."""
        await extension.call_extensions_async(
            "webui_ws_connect",
            agent=None,
            instance=self,
            sid=sid,
        )

    async def on_disconnect(self, sid: str) -> None:
        """Run the ``webui_ws_disconnect`` extensions for connection ``sid``."""
        await extension.call_extensions_async(
            "webui_ws_disconnect",
            agent=None,
            instance=self,
            sid=sid,
        )

    async def process(self, event: str, data: dict, sid: str) -> dict | None:
        """Hand an incoming event to the ``webui_ws_event`` extensions.

        A fresh, empty dict is passed along as ``response_data`` so that
        extensions can fill it in place.

        Args:
            event: Event name; forwarded to extensions as ``event_type``.
            data: Event payload; forwarded unchanged as ``data``.
            sid: Identifier of the connection the event arrived on.

        Returns:
            The response dict when it ended up non-empty, otherwise ``None``
            (fire-and-forget: no extension populated the response).
        """
        reply: dict = {}
        await extension.call_extensions_async(
            "webui_ws_event",
            agent=None,
            instance=self,
            sid=sid,
            event_type=event,
            data=data,
            response_data=reply,
        )
        # An empty reply means nothing was populated, so signal "no response".
        if not reply:
            return None
        return reply