agent-zero/plugins/_office/api/ws_office.py
Alessandro 10a6cd28c6 feat(office): replace Collabora with LibreOffice document runtime
Remove the Collabora/WOPI runtime and route stack, including the old status APIs, proxy helpers, bootstrap extensions, and WOPI store tests. Add the Markdown-first document store, LibreOffice status/conversion helpers, LibreOfficeKit session bridge, and reusable Xpra virtual desktop gateway used by the new document runtime.

Update image and self-update bootstrap paths so existing containers can acquire the LibreOffice, XFCE, Xpra, and desktop-control dependencies through the normal install hooks instead of an ad hoc manual install.
2026-05-02 13:07:10 +02:00

95 lines
4.5 KiB
Python

from __future__ import annotations
from typing import Any
from helpers.ws import WsHandler
from helpers.ws_manager import WsResult
from plugins._office.helpers import document_store, libreofficekit_sessions
class WsOffice(WsHandler):
    """WebSocket handler for ``office_*`` events.

    Every recognised event is forwarded to the manager returned by
    ``libreofficekit_sessions.get_manager()``; ``office_open`` first resolves
    a document through ``document_store``.
    """

    async def on_disconnect(self, sid: str) -> None:
        """Forward the disconnect of socket ``sid`` to the session manager."""
        manager = libreofficekit_sessions.get_manager()
        manager.close_sid(sid)

    async def process(self, event: str, data: dict[str, Any], sid: str) -> dict[str, Any] | WsResult | None:
        """Dispatch a single ``office_*`` event.

        Args:
            event: Event name. Names that do not start with ``office_`` are
                ignored.
            data: Event payload. ``session_id`` selects the target session and
                ``correlationId`` is echoed back on error results.
            sid: Socket id of the sender; only used by ``office_open``.

        Returns:
            ``None`` for non-office events, the value returned by the session
            manager (or ``_open``) for recognised events, or a
            ``WsResult.error`` with code ``OFFICE_SESSION_NOT_FOUND``
            (``FileNotFoundError``), ``OFFICE_ERROR`` (any other exception) or
            ``UNKNOWN_OFFICE_EVENT`` (no branch matched).
        """
        prefix = "office_"
        if not event.startswith(prefix):
            return None

        def session_id() -> str:
            # Missing or falsy ids are normalised to the empty string.
            return str(data.get("session_id") or "")

        def dict_field(key: str) -> dict[str, Any]:
            # Payloads that are not dicts are replaced by an empty dict.
            value = data.get(key)
            return value if isinstance(value, dict) else {}

        def failure(code: str, message: str) -> WsResult:
            return WsResult.error(
                code=code,
                message=message,
                correlation_id=data.get("correlationId"),
            )

        # The prefix is guaranteed by the guard above, so the remainder
        # uniquely identifies the event.
        action = event[len(prefix):]

        try:
            if action == "open":
                return self._open(data, sid)

            if action in ("key", "mouse", "cursor", "selection"):
                # These four events share one shape: the manager method and
                # the payload key both carry the action's name.
                manager = libreofficekit_sessions.get_manager()
                forward = getattr(manager, action)
                return forward(session_id(), dict_field(action))

            if action == "input":
                manager = libreofficekit_sessions.get_manager()
                patch = data.get("patch")
                return manager.input(
                    session_id(),
                    text=data.get("text"),
                    patch=patch if isinstance(patch, dict) else None,
                )

            if action == "invalidated_tiles":
                target = session_id()
                tiles = libreofficekit_sessions.get_manager().tiles(target)
                return {"session_id": target, "tiles": tiles}

            if action == "command":
                manager = libreofficekit_sessions.get_manager()
                return manager.command(
                    session_id(),
                    str(data.get("command") or ""),
                    arguments=data.get("arguments"),
                    notify=bool(data.get("notify", True)),
                )

            if action == "command_values":
                manager = libreofficekit_sessions.get_manager()
                return manager.command_values(
                    session_id(),
                    str(data.get("command") or ""),
                )

            if action == "save":
                manager = libreofficekit_sessions.get_manager()
                return manager.save(session_id(), text=data.get("text"))

            if action == "close":
                manager = libreofficekit_sessions.get_manager()
                return manager.close(session_id())
        except FileNotFoundError as exc:
            return failure("OFFICE_SESSION_NOT_FOUND", str(exc))
        except Exception as exc:
            # Handler boundary: any other failure is reported to the client
            # as a generic office error instead of propagating.
            return failure("OFFICE_ERROR", str(exc))

        return failure("UNKNOWN_OFFICE_EVENT", f"Unknown office event: {event}")

    def _open(self, data: dict[str, Any], sid: str) -> dict[str, Any] | WsResult:
        """Resolve the requested document and open it for socket ``sid``.

        Resolution order: a non-blank ``file_id`` fetches an existing
        document, otherwise a non-blank ``path`` registers that path, and
        otherwise a new document is created from ``kind``, ``title``,
        ``format`` and ``content`` (defaults ``document``, ``Untitled``,
        ``md`` and the empty string).
        """

        def text_of(key: str, fallback: str = "") -> str:
            return str(data.get(key) or fallback)

        # "ctxid" takes precedence over "context_id".
        context_id = str(data.get("ctxid") or data.get("context_id") or "")
        file_id = text_of("file_id").strip()
        path = text_of("path").strip()

        if file_id:
            document = document_store.get_document(file_id)
        elif path:
            document = document_store.register_document(path, context_id=context_id)
        else:
            document = document_store.create_document(
                kind=text_of("kind", "document"),
                title=text_of("title", "Untitled"),
                fmt=text_of("format", "md"),
                content=text_of("content"),
                context_id=context_id,
            )

        manager = libreofficekit_sessions.get_manager()
        return manager.open(document, sid=sid)