mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-05-19 07:59:34 +00:00
Remove the Collabora/WOPI runtime and route stack, including the old status APIs, proxy helpers, bootstrap extensions, and WOPI store tests. Add the Markdown-first document store, LibreOffice status/conversion helpers, LibreOfficeKit session bridge, and reusable Xpra virtual desktop gateway used by the new document runtime. Update image and self-update bootstrap paths so existing containers can acquire the LibreOffice, XFCE, Xpra, and desktop-control dependencies through the normal install hooks instead of an ad hoc manual install.
95 lines
4.5 KiB
Python
95 lines
4.5 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from helpers.ws import WsHandler
|
|
from helpers.ws_manager import WsResult
|
|
from plugins._office.helpers import document_store, libreofficekit_sessions
|
|
|
|
|
|
class WsOffice(WsHandler):
    """WebSocket handler for events whose name starts with ``office_``.

    ``office_open`` obtains a document from ``document_store`` and opens it
    through the LibreOfficeKit session manager; every other recognised event
    is forwarded to a method of that same manager.
    """

    # Events whose dict payload sits under a key named exactly like the
    # session-manager method that receives it.
    _OFFICE_PAYLOAD_EVENTS = {
        "office_key": "key",
        "office_mouse": "mouse",
        "office_cursor": "cursor",
        "office_selection": "selection",
    }

    # Every event that addresses a session through ``session_id``.
    _OFFICE_SESSION_EVENTS = frozenset(_OFFICE_PAYLOAD_EVENTS) | {
        "office_input",
        "office_invalidated_tiles",
        "office_command",
        "office_command_values",
        "office_save",
        "office_close",
    }

    async def on_disconnect(self, sid: str) -> None:
        """Pass the disconnected socket id to the session manager's ``close_sid``."""
        manager = libreofficekit_sessions.get_manager()
        manager.close_sid(sid)

    async def process(self, event: str, data: dict[str, Any], sid: str) -> dict[str, Any] | WsResult | None:
        """Dispatch one WebSocket event.

        Args:
            event: Event name; anything not starting with ``office_`` is ignored.
            data: Event payload. ``correlationId`` is echoed back on errors.
            sid: Socket id of the sender, used only by ``office_open``.

        Returns:
            ``None`` for events without the ``office_`` prefix, the value
            produced by the matching manager call on success, or a
            ``WsResult.error`` with code ``OFFICE_SESSION_NOT_FOUND``
            (``FileNotFoundError``), ``OFFICE_ERROR`` (any other exception)
            or ``UNKNOWN_OFFICE_EVENT`` (unrecognised ``office_`` event).
        """
        if not event.startswith("office_"):
            return None

        def failure(code: str, message: str) -> WsResult:
            # The correlation id is looked up only when an error is built.
            return WsResult.error(code=code, message=message, correlation_id=data.get("correlationId"))

        try:
            if event == "office_open":
                return self._open(data, sid)
            if event in self._OFFICE_SESSION_EVENTS:
                return self._office_session_call(event, data)
        except FileNotFoundError as exc:
            return failure("OFFICE_SESSION_NOT_FOUND", str(exc))
        except Exception as exc:
            return failure("OFFICE_ERROR", str(exc))

        return failure("UNKNOWN_OFFICE_EVENT", f"Unknown office event: {event}")

    def _office_session_call(self, event: str, data: dict[str, Any]) -> Any:
        """Forward a session-scoped event to the session manager.

        Must only be called with an event listed in
        ``_OFFICE_SESSION_EVENTS``; the final branch assumes ``office_close``.
        """
        manager = libreofficekit_sessions.get_manager()
        # A missing or falsy session id is forwarded as the empty string.
        session_id = str(data.get("session_id") or "")

        method_name = self._OFFICE_PAYLOAD_EVENTS.get(event)
        if method_name is not None:
            payload = data.get(method_name)
            if not isinstance(payload, dict):
                # Non-dict payloads are replaced by an empty dict.
                payload = {}
            return getattr(manager, method_name)(session_id, payload)

        if event == "office_input":
            patch = data.get("patch")
            return manager.input(
                session_id,
                text=data.get("text"),
                patch=patch if isinstance(patch, dict) else None,
            )

        if event == "office_invalidated_tiles":
            return {"session_id": session_id, "tiles": manager.tiles(session_id)}

        if event in ("office_command", "office_command_values"):
            command = str(data.get("command") or "")
            if event == "office_command_values":
                return manager.command_values(session_id, command)
            return manager.command(
                session_id,
                command,
                arguments=data.get("arguments"),
                notify=bool(data.get("notify", True)),
            )

        if event == "office_save":
            return manager.save(session_id, text=data.get("text"))

        # Only "office_close" is left.
        return manager.close(session_id)

    def _open(self, data: dict[str, Any], sid: str) -> dict[str, Any] | WsResult:
        """Resolve the requested document and open a session for ``sid``."""
        doc = self._office_document(data)
        return libreofficekit_sessions.get_manager().open(doc, sid=sid)

    @staticmethod
    def _office_document(data: dict[str, Any]) -> Any:
        """Pick the document for an ``office_open`` request.

        Precedence: a non-blank ``file_id`` fetches an existing document, else
        a non-blank ``path`` registers that path, else a new document is
        created from ``kind``/``title``/``format``/``content``.
        """

        def field(name: str, fallback: str = "") -> str:
            # Falsy or missing values collapse to the fallback.
            return str(data.get(name) or fallback)

        context_id = str(data.get("ctxid") or data.get("context_id") or "")
        file_id = field("file_id").strip()
        path = field("path").strip()

        if file_id:
            return document_store.get_document(file_id)
        if path:
            return document_store.register_document(path, context_id=context_id)
        return document_store.create_document(
            kind=field("kind", "document"),
            title=field("title", "Untitled"),
            fmt=field("format", "md"),
            content=field("content"),
            context_id=context_id,
        )
|