mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-05-24 13:43:41 +00:00
Route Markdown files to Editor, txt and Office documents to Desktop, and browser-renderable files to Browser from the file browser action menu. Extend the Desktop/document allowlists for txt files, keep unsupported small files on the legacy editor path, and harden tooltip cleanup for dropdown-triggered modal closes.
2466 lines
89 KiB
Python
2466 lines
89 KiB
Python
from __future__ import annotations
|
|
|
|
import atexit
|
|
import fcntl
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import socket
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
import uuid
|
|
import xml.etree.ElementTree as ET
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from helpers import files, virtual_desktop
|
|
from helpers.localization import Localization
|
|
from plugins._desktop.helpers import desktop_state
|
|
from plugins._office.helpers import document_store, libreoffice
|
|
|
|
|
|
OFFICIAL_EXTENSIONS = {"odt", "ods", "odp", "docx", "xlsx", "pptx", "txt"}
|
|
PLUGIN_NAME = "_desktop"
|
|
SYSTEM_SESSION_ID = "agent-zero-desktop"
|
|
SYSTEM_FILE_ID = "system-desktop"
|
|
SYSTEM_TITLE = "Desktop"
|
|
STATE_DIR = Path(files.get_abs_path("usr", "plugins", PLUGIN_NAME))
|
|
RETIRED_STATE_DIR = Path(files.get_abs_path("usr", PLUGIN_NAME))
|
|
SESSION_DIR = STATE_DIR / "sessions"
|
|
PROFILE_DIR = STATE_DIR / "profiles"
|
|
LEGACY_SESSION_DIRS = (
|
|
RETIRED_STATE_DIR / "sessions",
|
|
Path(files.get_abs_path("tmp", "_office", "desktop", "sessions")),
|
|
)
|
|
DISPLAY_BASE = 120
|
|
XPRA_PORT_BASE = 14500
|
|
MAX_SESSIONS = 12
|
|
DEFAULT_SCREEN_WIDTH = virtual_desktop.DEFAULT_WIDTH
|
|
DEFAULT_SCREEN_HEIGHT = virtual_desktop.DEFAULT_HEIGHT
|
|
MAX_SCREEN_WIDTH = virtual_desktop.MAX_WIDTH
|
|
MAX_SCREEN_HEIGHT = virtual_desktop.MAX_HEIGHT
|
|
BLOCKING_DIALOG_TITLES = ("Remote Files", "File Services")
|
|
DISPLAY_START_TIMEOUT_SECONDS = 30.0
|
|
PORT_START_TIMEOUT_SECONDS = 30.0
|
|
STARTUP_GRACE_SECONDS = 45
|
|
RUNTIME_INSTALL_MESSAGE = (
|
|
"Installing Agent Zero Desktop runtime dependencies. "
|
|
"This can take a few minutes after an update."
|
|
)
|
|
HIDDEN_XPRA_DESKTOP_ENTRIES = (
|
|
"xpra.desktop",
|
|
"xpra-gui.desktop",
|
|
"xpra-launcher.desktop",
|
|
"xpra-shadow.desktop",
|
|
"xpra-start.desktop",
|
|
)
|
|
HIDDEN_XFCE_MENU_ENTRIES = (
|
|
("exo-mail-reader.desktop", "Mail Reader"),
|
|
("exo-web-browser.desktop", "Web Browser"),
|
|
("xfce4-mail-reader.desktop", "Mail Reader"),
|
|
("xfce4-web-browser.desktop", "Web Browser"),
|
|
("xfce4-session-logout.desktop", "Log Out"),
|
|
("xfce4-lock-screen.desktop", "Lock Screen"),
|
|
("xflock4.desktop", "Lock Screen"),
|
|
("xfce4-switch-user.desktop", "Switch User"),
|
|
)
|
|
DESKTOP_README_SOURCE = Path(__file__).resolve().parents[1] / "assets" / "desktop" / "README.md"
|
|
DESKTOP_FOLDER_LINKS = (
|
|
("Projects", ("usr", "projects")),
|
|
("Skills", ("usr", "skills")),
|
|
("Agents", ("usr", "agents")),
|
|
("Downloads", ("usr", "downloads")),
|
|
)
|
|
URL_INTENT_MAX_ITEMS = 50
|
|
URL_INTENT_MAX_LENGTH = 8192
|
|
URL_HANDLER_DESKTOP_ID = "agent-zero-browser.desktop"
|
|
SHUTDOWN_HANDLER_DESKTOP_ID = "agent-zero-shutdown.desktop"
|
|
SHUTDOWN_PANEL_LAUNCHER_ID = SHUTDOWN_HANDLER_DESKTOP_ID
|
|
SHUTDOWN_CONFIRM_SECONDS = 8
|
|
OOR_NS = "http://openoffice.org/2001/registry"
|
|
XS_NS = "http://www.w3.org/2001/XMLSchema"
|
|
XSI_NS = "http://www.w3.org/2001/XMLSchema-instance"
|
|
|
|
|
|
@dataclass
|
|
class DesktopSession:
|
|
session_id: str
|
|
file_id: str
|
|
extension: str
|
|
path: str
|
|
title: str
|
|
display: int
|
|
xpra_port: int
|
|
token: str
|
|
url: str
|
|
profile_dir: Path
|
|
width: int = DEFAULT_SCREEN_WIDTH
|
|
height: int = DEFAULT_SCREEN_HEIGHT
|
|
processes: dict[str, subprocess.Popen[Any]] = field(default_factory=dict)
|
|
process_ids: dict[str, int] = field(default_factory=dict)
|
|
owns_processes: bool = True
|
|
started_at: float = field(default_factory=time.time)
|
|
timezone: str = field(default_factory=lambda: Localization.get().get_timezone())
|
|
|
|
def alive(self) -> bool:
|
|
return _running(self.processes.get("xpra")) or _pid_is_running(self.process_ids.get("xpra", 0))
|
|
|
|
def public(self, doc: dict[str, Any] | None = None) -> dict[str, Any]:
|
|
title = str(doc.get("basename") or "") if doc else self.title
|
|
path = str(doc.get("path") or "") if doc else self.path
|
|
extension = str(doc.get("extension") or "") if doc else self.extension
|
|
file_id = str(doc.get("file_id") or "") if doc else self.file_id
|
|
return {
|
|
"available": True,
|
|
"session_id": self.session_id,
|
|
"file_id": file_id,
|
|
"extension": extension,
|
|
"title": title,
|
|
"path": document_store.display_path(path),
|
|
"url": self.url,
|
|
"token": self.token,
|
|
"display": f":{self.display}",
|
|
"desktop_path": virtual_desktop.SESSION_PATH,
|
|
"width": self.width,
|
|
"height": self.height,
|
|
"started_at": self.started_at,
|
|
}
|
|
|
|
|
|
class DesktopSessionManager:
|
|
def __init__(self) -> None:
|
|
self._lock = threading.RLock()
|
|
self._sessions: dict[str, DesktopSession] = {}
|
|
|
|
def ensure_system_desktop(self) -> dict[str, Any]:
|
|
try:
|
|
with self._lock:
|
|
self._reap_dead_locked()
|
|
session = self._ensure_system_desktop_locked()
|
|
return session.public()
|
|
except Exception as exc:
|
|
status = collect_desktop_status()
|
|
return {
|
|
"available": False,
|
|
"error": str(exc),
|
|
"status": status,
|
|
}
|
|
|
|
def open(self, doc: dict[str, Any], *, refresh: bool = False) -> dict[str, Any]:
|
|
ext = str(doc.get("extension") or "").lower()
|
|
if ext not in OFFICIAL_EXTENSIONS:
|
|
return {"available": False, "reason": f".{ext} does not use the LibreOffice desktop surface."}
|
|
|
|
with self._lock:
|
|
self._reap_dead_locked()
|
|
try:
|
|
session = self._ensure_system_desktop_locked()
|
|
except Exception as exc:
|
|
status = collect_desktop_status()
|
|
return {
|
|
"available": False,
|
|
"error": str(exc),
|
|
"status": status,
|
|
}
|
|
refreshed = False
|
|
try:
|
|
if refresh and session.file_id == str(doc.get("file_id") or ""):
|
|
refreshed = self._reload_document_locked(session, doc)
|
|
else:
|
|
self._open_document_locked(session, doc)
|
|
except Exception as exc:
|
|
return {
|
|
"available": False,
|
|
"error": str(exc),
|
|
"status": collect_desktop_status(),
|
|
}
|
|
session.file_id = str(doc["file_id"])
|
|
session.extension = ext
|
|
session.path = str(doc["path"])
|
|
session.title = str(doc["basename"])
|
|
self._write_manifest(session)
|
|
public = session.public(doc)
|
|
public["refreshed"] = refreshed
|
|
return public
|
|
|
|
def refresh_document(self, file_id: str) -> dict[str, Any]:
|
|
normalized = str(file_id or "").strip()
|
|
if not normalized:
|
|
return {"ok": True, "refreshed": False}
|
|
try:
|
|
doc = document_store.get_document(normalized)
|
|
except Exception:
|
|
return {"ok": False, "refreshed": False, "error": "Document not found."}
|
|
|
|
ext = str(doc.get("extension") or "").lower()
|
|
if ext not in OFFICIAL_EXTENSIONS:
|
|
return {"ok": True, "refreshed": False}
|
|
|
|
with self._lock:
|
|
self._reap_dead_locked()
|
|
session = self._find_by_file_id_locked(normalized)
|
|
if not session:
|
|
existing = self._load_system_desktop_from_manifest_locked()
|
|
if existing:
|
|
self._sessions[existing.session_id] = existing
|
|
self._register_virtual_desktop(existing)
|
|
if existing.file_id == normalized:
|
|
session = existing
|
|
if not session:
|
|
return {"ok": True, "refreshed": False}
|
|
refreshed = self._reload_document_locked(session, doc)
|
|
session.file_id = str(doc["file_id"])
|
|
session.extension = ext
|
|
session.path = str(doc["path"])
|
|
session.title = str(doc["basename"])
|
|
self._write_manifest(session)
|
|
return {"ok": True, "refreshed": refreshed, "desktop": session.public(doc)}
|
|
|
|
def save(self, session_id: str, file_id: str = "") -> dict[str, Any]:
|
|
session = self.require(session_id)
|
|
doc = self._document_for_save(session, file_id)
|
|
xdotool = shutil.which("xdotool")
|
|
if not xdotool:
|
|
updated = document_store.register_document(doc["path"]) if doc else None
|
|
return {
|
|
"ok": False,
|
|
"error": "xdotool is not installed; use LibreOffice's Save control inside the canvas.",
|
|
"document": _public_doc(updated) if updated else None,
|
|
}
|
|
|
|
result = subprocess.run(
|
|
[xdotool, "key", "--clearmodifiers", "ctrl+s"],
|
|
check=False,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=8,
|
|
env=self._display_env(session),
|
|
)
|
|
time.sleep(0.8)
|
|
updated = document_store.register_document(doc["path"]) if doc else None
|
|
if result.returncode != 0:
|
|
detail = (result.stderr or result.stdout or "").strip()
|
|
return {
|
|
"ok": False,
|
|
"error": detail or "LibreOffice desktop save shortcut failed.",
|
|
"document": _public_doc(updated) if updated else None,
|
|
}
|
|
return {
|
|
"ok": True,
|
|
"session_id": session.session_id,
|
|
"document": _public_doc(updated) if updated else None,
|
|
}
|
|
|
|
def sync(self, session_id: str = "", file_id: str = "") -> dict[str, Any]:
|
|
session = self.get(session_id) if session_id else self._find_by_file_id(file_id)
|
|
if not session:
|
|
return {"ok": False, "error": "LibreOffice desktop session not found."}
|
|
if not _url_bridge_script_path(session).exists():
|
|
try:
|
|
self._prepare_desktop_url_bridge(session)
|
|
self._refresh_xfce_desktop(session)
|
|
except Exception:
|
|
pass
|
|
url_intents = self.claim_url_intents(session.session_id)
|
|
shutdown_request = self.claim_shutdown_request(session.session_id)
|
|
if shutdown_request:
|
|
return self.shutdown_system_desktop(
|
|
save_first=True,
|
|
source=str(shutdown_request.get("source") or "tray"),
|
|
)
|
|
doc = self._document_for_save(session, file_id)
|
|
if not doc:
|
|
return {
|
|
"ok": True,
|
|
"session_id": session.session_id,
|
|
"desktop": session.public(),
|
|
"url_intents": url_intents,
|
|
}
|
|
updated = document_store.register_document(doc["path"])
|
|
return {
|
|
"ok": True,
|
|
"session_id": session.session_id,
|
|
"document": _public_doc(updated),
|
|
"url_intents": url_intents,
|
|
}
|
|
|
|
def state(self, *, include_screenshot: bool = False, context_id: str = "") -> dict[str, Any]:
|
|
with self._lock:
|
|
self._reap_dead_locked()
|
|
return desktop_state.collect_state(
|
|
include_screenshot=include_screenshot,
|
|
context_id=context_id,
|
|
)
|
|
|
|
def claim_url_intents(self, session_id: str = SYSTEM_SESSION_ID) -> list[dict[str, Any]]:
|
|
session = self.get(session_id) or self.get(SYSTEM_SESSION_ID)
|
|
if not session:
|
|
return []
|
|
return _claim_url_intents(session)
|
|
|
|
def claim_shutdown_request(self, session_id: str = SYSTEM_SESSION_ID) -> dict[str, Any] | None:
|
|
session = self.get(session_id) or self.get(SYSTEM_SESSION_ID)
|
|
if not session:
|
|
return None
|
|
return _claim_shutdown_request(session)
|
|
|
|
def shutdown_system_desktop(self, *, save_first: bool = True, source: str = "api") -> dict[str, Any]:
|
|
with self._lock:
|
|
session = self._sessions.get(SYSTEM_SESSION_ID)
|
|
if not session:
|
|
_remove_system_manifest()
|
|
return {
|
|
"ok": True,
|
|
"closed": 0,
|
|
"session_id": SYSTEM_SESSION_ID,
|
|
"shutdown": True,
|
|
"intentional_shutdown": True,
|
|
"source": source,
|
|
}
|
|
|
|
save_result = None
|
|
if save_first:
|
|
try:
|
|
save_result = self.save(session.session_id)
|
|
except Exception as exc:
|
|
save_result = {"ok": False, "error": str(exc)}
|
|
|
|
with self._lock:
|
|
if self._sessions.get(SYSTEM_SESSION_ID) is session:
|
|
self._sessions.pop(SYSTEM_SESSION_ID, None)
|
|
virtual_desktop.unregister_session(session.token)
|
|
self._terminate_session(session, include_rehydrated=True)
|
|
self._remove_manifest(session.session_id)
|
|
_clear_shutdown_request(session)
|
|
return {
|
|
"ok": True,
|
|
"closed": 1,
|
|
"session_id": session.session_id,
|
|
"shutdown": True,
|
|
"intentional_shutdown": True,
|
|
"source": source,
|
|
"save": save_result,
|
|
}
|
|
|
|
def retarget_document(self, file_id: str, doc: dict[str, Any]) -> dict[str, Any]:
|
|
session = self._find_by_file_id(file_id)
|
|
if not session:
|
|
return {"ok": True, "updated": False}
|
|
with self._lock:
|
|
session.path = str(doc["path"])
|
|
session.title = str(doc["basename"])
|
|
session.extension = str(doc["extension"])
|
|
self._write_manifest(session)
|
|
return {"ok": True, "updated": True, "desktop": session.public(doc)}
|
|
|
|
def close(self, session_id: str, save_first: bool = True) -> dict[str, Any]:
|
|
with self._lock:
|
|
normalized = str(session_id or "").strip()
|
|
session = self._sessions.get(normalized)
|
|
if not session:
|
|
return {"ok": True, "closed": 0}
|
|
if session.session_id == SYSTEM_SESSION_ID:
|
|
save_result = None
|
|
if save_first:
|
|
try:
|
|
save_result = self.save(session.session_id)
|
|
except Exception as exc:
|
|
save_result = {"ok": False, "error": str(exc)}
|
|
return {
|
|
"ok": True,
|
|
"closed": 0,
|
|
"session_id": session.session_id,
|
|
"persistent": True,
|
|
"save": save_result,
|
|
}
|
|
|
|
save_result = None
|
|
if save_first:
|
|
try:
|
|
save_result = self.save(session.session_id)
|
|
except Exception as exc:
|
|
save_result = {"ok": False, "error": str(exc)}
|
|
with self._lock:
|
|
self._sessions.pop(session.session_id, None)
|
|
virtual_desktop.unregister_session(session.token)
|
|
self._terminate_session(session, include_rehydrated=True)
|
|
self._remove_manifest(session.session_id)
|
|
return {"ok": True, "closed": 1, "session_id": session.session_id, "save": save_result}
|
|
|
|
def close_file(self, file_id: str) -> int:
|
|
return 0
|
|
|
|
def resize(self, session_id: str, width: int, height: int) -> dict[str, Any]:
|
|
session = self.get(session_id)
|
|
if not session:
|
|
return {"ok": False, "error": "LibreOffice desktop session not found."}
|
|
is_system_desktop = session.session_id == SYSTEM_SESSION_ID and session.extension == "desktop"
|
|
result = virtual_desktop.resize_display(
|
|
display=session.display,
|
|
width=width,
|
|
height=height,
|
|
max_width=MAX_SCREEN_WIDTH,
|
|
max_height=MAX_SCREEN_HEIGHT,
|
|
window_class="" if is_system_desktop else "libreoffice",
|
|
keys=() if is_system_desktop else ("Escape",),
|
|
xauthority=self._xauthority(session),
|
|
home=str(session.profile_dir),
|
|
)
|
|
if result.get("ok"):
|
|
session.width = int(result["width"])
|
|
session.height = int(result["height"])
|
|
if not is_system_desktop:
|
|
self._dismiss_blocking_dialogs(session)
|
|
return result
|
|
|
|
def proxy_for_token(self, token: str) -> tuple[str, int] | None:
|
|
normalized = str(token or "").strip()
|
|
with self._lock:
|
|
session = self._sessions.get(normalized)
|
|
if not session:
|
|
session = next((item for item in self._sessions.values() if item.token == normalized), None)
|
|
if not session or not session.alive():
|
|
return None
|
|
return ("127.0.0.1", session.xpra_port)
|
|
|
|
def resize_for_token(self, token: str, width: int, height: int) -> dict[str, Any]:
|
|
normalized = str(token or "").strip()
|
|
with self._lock:
|
|
session = self._sessions.get(normalized)
|
|
if not session:
|
|
session = next((item for item in self._sessions.values() if item.token == normalized), None)
|
|
if not session:
|
|
return {"ok": False, "error": "LibreOffice desktop session not found."}
|
|
return self.resize(session.session_id, width, height)
|
|
|
|
def get(self, session_id: str) -> DesktopSession | None:
|
|
with self._lock:
|
|
session = self._sessions.get(str(session_id or "").strip())
|
|
return session if session and session.alive() else None
|
|
|
|
def require(self, session_id: str) -> DesktopSession:
|
|
session = self.get(session_id)
|
|
if not session:
|
|
raise FileNotFoundError(f"LibreOffice desktop session not found: {session_id}")
|
|
return session
|
|
|
|
def shutdown(self) -> None:
|
|
with self._lock:
|
|
sessions = list(self._sessions.values())
|
|
self._sessions.clear()
|
|
for session in sessions:
|
|
virtual_desktop.unregister_session(session.token)
|
|
if session.owns_processes:
|
|
self._terminate_session(session)
|
|
self._remove_manifest(session.session_id)
|
|
|
|
def sync_timezone(self, timezone: str | None = None) -> dict[str, Any]:
|
|
target_timezone = str(timezone or Localization.get().get_timezone()).strip()
|
|
if not target_timezone:
|
|
target_timezone = Localization.get().get_timezone()
|
|
|
|
with self._lock:
|
|
self._reap_dead_locked()
|
|
session = self._sessions.get(SYSTEM_SESSION_ID)
|
|
if not session or not session.alive():
|
|
return {"ok": True, "restarted": False, "reason": "no_active_desktop"}
|
|
if session.timezone == target_timezone:
|
|
return {"ok": True, "restarted": False, "timezone": target_timezone}
|
|
|
|
replacement = self._restart_system_desktop_for_timezone_locked(session)
|
|
return {
|
|
"ok": True,
|
|
"restarted": True,
|
|
"session_id": replacement.session_id,
|
|
"timezone": replacement.timezone,
|
|
}
|
|
|
|
def _document_for_save(self, session: DesktopSession, file_id: str = "") -> dict[str, Any] | None:
|
|
normalized = str(file_id or "").strip()
|
|
if normalized == SYSTEM_FILE_ID:
|
|
return None
|
|
if normalized and normalized != SYSTEM_FILE_ID:
|
|
return document_store.get_document(normalized)
|
|
if session.file_id and session.file_id != SYSTEM_FILE_ID:
|
|
try:
|
|
return document_store.get_document(session.file_id)
|
|
except Exception:
|
|
path = Path(session.path)
|
|
if path.is_file():
|
|
return document_store.register_document(path)
|
|
return None
|
|
|
|
def _restart_system_desktop_for_timezone_locked(self, session: DesktopSession) -> DesktopSession:
|
|
try:
|
|
doc = self._document_for_save(session, session.file_id)
|
|
except Exception:
|
|
doc = None
|
|
if doc:
|
|
try:
|
|
self.save(session.session_id, str(doc.get("file_id") or ""))
|
|
except Exception:
|
|
pass
|
|
|
|
virtual_desktop.unregister_session(session.token)
|
|
self._sessions.pop(session.session_id, None)
|
|
self._terminate_session(session, include_rehydrated=True)
|
|
self._remove_manifest(session.session_id)
|
|
|
|
replacement = self._ensure_system_desktop_locked()
|
|
if doc:
|
|
self._open_document_locked(replacement, doc)
|
|
replacement.file_id = str(doc["file_id"])
|
|
replacement.extension = str(doc["extension"])
|
|
replacement.path = str(doc["path"])
|
|
replacement.title = str(doc["basename"])
|
|
self._write_manifest(replacement)
|
|
return replacement
|
|
|
|
def _register_virtual_desktop(self, session: DesktopSession) -> None:
|
|
virtual_desktop.register_session(
|
|
token=session.token,
|
|
host="127.0.0.1",
|
|
port=session.xpra_port,
|
|
owner="desktop",
|
|
title=session.title,
|
|
resize=lambda width, height, session_id=session.session_id: self.resize(session_id, width, height),
|
|
)
|
|
|
|
def _ensure_system_desktop_locked(self) -> DesktopSession:
|
|
target_timezone = Localization.get().get_timezone()
|
|
existing = self._sessions.get(SYSTEM_SESSION_ID)
|
|
if existing and existing.alive():
|
|
if existing.timezone != target_timezone:
|
|
return self._restart_system_desktop_for_timezone_locked(existing)
|
|
self._prepare_desktop_url_bridge(existing)
|
|
self._refresh_xfce_desktop(existing)
|
|
return existing
|
|
|
|
existing = self._load_system_desktop_from_manifest_locked()
|
|
if existing:
|
|
self._sessions[existing.session_id] = existing
|
|
self._register_virtual_desktop(existing)
|
|
if existing.timezone != target_timezone:
|
|
return self._restart_system_desktop_for_timezone_locked(existing)
|
|
self._prepare_desktop_url_bridge(existing)
|
|
self._refresh_xfce_desktop(existing)
|
|
return existing
|
|
|
|
status = collect_desktop_status()
|
|
if not status["healthy"]:
|
|
raise RuntimeError(status["message"])
|
|
|
|
display, xpra_port = self._allocate_endpoint_locked()
|
|
profile_dir = PROFILE_DIR / SYSTEM_SESSION_ID
|
|
session = DesktopSession(
|
|
session_id=SYSTEM_SESSION_ID,
|
|
file_id=SYSTEM_FILE_ID,
|
|
extension="desktop",
|
|
path=str(document_store.document_binary_home()),
|
|
title=SYSTEM_TITLE,
|
|
display=display,
|
|
xpra_port=xpra_port,
|
|
token=SYSTEM_SESSION_ID,
|
|
url=_xpra_url(SYSTEM_SESSION_ID),
|
|
profile_dir=profile_dir,
|
|
timezone=target_timezone,
|
|
)
|
|
try:
|
|
self._prepare_profile(session)
|
|
self._prepare_desktop_launchers(session)
|
|
self._spawn_desktop_locked(session)
|
|
except Exception:
|
|
self._terminate_session(session)
|
|
raise
|
|
self._sessions[session.session_id] = session
|
|
self._register_virtual_desktop(session)
|
|
self._write_manifest(session)
|
|
return session
|
|
|
|
def _load_system_desktop_from_manifest_locked(self) -> DesktopSession | None:
|
|
manifest = SESSION_DIR / f"{SYSTEM_SESSION_ID}.json"
|
|
if not manifest.exists():
|
|
return None
|
|
try:
|
|
payload = json.loads(manifest.read_text(encoding="utf-8"))
|
|
display = int(payload.get("display") or 0)
|
|
xpra_port = int(payload.get("xpra_port") or 0)
|
|
process_ids = {
|
|
str(name): pid
|
|
for name, value in dict(payload.get("pids") or {}).items()
|
|
if (pid := _coerce_pid(value))
|
|
}
|
|
if not display or not xpra_port:
|
|
return None
|
|
if not _pid_is_running(process_ids.get("xpra", 0)):
|
|
return None
|
|
if not _port_is_accepting("127.0.0.1", xpra_port):
|
|
return None
|
|
|
|
path = str(payload.get("path") or document_store.document_binary_home())
|
|
file_id = str(payload.get("file_id") or SYSTEM_FILE_ID)
|
|
extension = str(payload.get("extension") or "").lower()
|
|
if not extension:
|
|
extension = "desktop" if file_id == SYSTEM_FILE_ID else Path(path).suffix.lower().lstrip(".")
|
|
title = str(payload.get("title") or "")
|
|
if not title:
|
|
title = SYSTEM_TITLE if file_id == SYSTEM_FILE_ID else Path(path).name or SYSTEM_TITLE
|
|
return DesktopSession(
|
|
session_id=SYSTEM_SESSION_ID,
|
|
file_id=file_id,
|
|
extension=extension,
|
|
path=path,
|
|
title=title,
|
|
display=display,
|
|
xpra_port=xpra_port,
|
|
token=SYSTEM_SESSION_ID,
|
|
url=_xpra_url(SYSTEM_SESSION_ID),
|
|
profile_dir=_state_path_from_retired_root(
|
|
Path(payload.get("profile_dir") or PROFILE_DIR / SYSTEM_SESSION_ID)
|
|
),
|
|
width=int(payload.get("width") or DEFAULT_SCREEN_WIDTH),
|
|
height=int(payload.get("height") or DEFAULT_SCREEN_HEIGHT),
|
|
process_ids=process_ids,
|
|
owns_processes=False,
|
|
started_at=float(payload.get("started_at") or time.time()),
|
|
timezone=str(payload.get("timezone") or Localization.get().get_timezone()),
|
|
)
|
|
except Exception:
|
|
return None
|
|
|
|
def _spawn_desktop_locked(self, session: DesktopSession) -> None:
|
|
STATE_DIR.mkdir(parents=True, exist_ok=True)
|
|
SESSION_DIR.mkdir(parents=True, exist_ok=True)
|
|
session.profile_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
xpra = _require_binary("xpra")
|
|
xvfb = _require_binary("Xvfb")
|
|
_require_binary("xfce4-session")
|
|
_require_binary("dbus-launch")
|
|
xfce_launcher = self._prepare_xfce_launcher(session)
|
|
|
|
session.processes["xvfb"] = subprocess.Popen(
|
|
_xvfb_command(xvfb, session),
|
|
stdin=subprocess.DEVNULL,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
env=self._session_env(session),
|
|
)
|
|
self._wait_for_display(session)
|
|
self._set_display_size(session, session.width, session.height)
|
|
self._prepare_root_window(session)
|
|
session.processes["xfce"] = subprocess.Popen(
|
|
[str(xfce_launcher)],
|
|
stdin=subprocess.DEVNULL,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
env=self._display_env(session),
|
|
)
|
|
self._wait_for_xfce(session)
|
|
session.processes["xpra"] = subprocess.Popen(
|
|
_xpra_shadow_command(xpra, session),
|
|
stdin=subprocess.DEVNULL,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
env=self._display_env(session),
|
|
)
|
|
_wait_for_port(
|
|
"127.0.0.1",
|
|
session.xpra_port,
|
|
timeout=PORT_START_TIMEOUT_SECONDS,
|
|
process=session.processes.get("xpra"),
|
|
)
|
|
self._refresh_xfce_desktop(session)
|
|
|
|
def _restart_xpra_shadow(self, session: DesktopSession) -> None:
|
|
xpra = _require_binary("xpra")
|
|
process = session.processes.get("xpra")
|
|
if process:
|
|
_terminate_process(process)
|
|
session.processes["xpra"] = subprocess.Popen(
|
|
_xpra_shadow_command(xpra, session),
|
|
stdin=subprocess.DEVNULL,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
env=self._display_env(session),
|
|
)
|
|
_wait_for_port(
|
|
"127.0.0.1",
|
|
session.xpra_port,
|
|
timeout=PORT_START_TIMEOUT_SECONDS,
|
|
process=session.processes.get("xpra"),
|
|
)
|
|
|
|
def _open_document_locked(self, session: DesktopSession, doc: dict[str, Any]) -> None:
|
|
soffice = libreoffice.find_soffice()
|
|
if not soffice:
|
|
raise RuntimeError("LibreOffice is not installed in this runtime.")
|
|
path = str(doc["path"])
|
|
self._remove_stale_lock_file(session, path=path)
|
|
process_key = f"soffice-{doc['file_id']}"
|
|
session.processes[process_key] = subprocess.Popen(
|
|
[
|
|
soffice,
|
|
"--norestore",
|
|
"--nofirststartwizard",
|
|
"--nolockcheck",
|
|
f"-env:UserInstallation=file://{session.profile_dir}",
|
|
path,
|
|
],
|
|
cwd=str(Path(path).parent),
|
|
stdin=subprocess.DEVNULL,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
env=self._display_env(session),
|
|
)
|
|
self._fit_office_window(session, process=session.processes[process_key])
|
|
window_id = self._wait_for_office_window_locked(
|
|
session,
|
|
title=str(doc.get("basename") or ""),
|
|
process=session.processes[process_key],
|
|
)
|
|
if not window_id:
|
|
raise RuntimeError(
|
|
f"LibreOffice did not show {doc.get('basename') or 'the document'} "
|
|
f"on desktop :{session.display}.",
|
|
)
|
|
self._fit_office_window_id_locked(
|
|
session,
|
|
window_id,
|
|
env=self._display_env(session),
|
|
keys=("Escape",),
|
|
)
|
|
|
|
def _reload_document_locked(self, session: DesktopSession, doc: dict[str, Any]) -> bool:
|
|
if self._close_document_window_locked(session, doc):
|
|
self._open_document_locked(session, doc)
|
|
return True
|
|
if self._send_reload_shortcut_locked(session, doc):
|
|
return True
|
|
self._open_document_locked(session, doc)
|
|
return False
|
|
|
|
def _close_document_window_locked(self, session: DesktopSession, doc: dict[str, Any]) -> bool:
|
|
xdotool = shutil.which("xdotool")
|
|
if not xdotool:
|
|
return False
|
|
title = str(doc.get("basename") or Path(str(doc.get("path") or "")).name or "").strip()
|
|
if not title:
|
|
return False
|
|
window_id = self._office_window_id_locked(session, title=title, fallback=False)
|
|
if not window_id:
|
|
return False
|
|
env = self._display_env(session)
|
|
self._fit_office_window_id_locked(session, window_id, env=env, keys=("Escape",))
|
|
for command in (
|
|
[xdotool, "key", "--clearmodifiers", "alt+F4"],
|
|
[xdotool, "windowclose", window_id],
|
|
):
|
|
try:
|
|
subprocess.run(
|
|
command,
|
|
check=False,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
timeout=3,
|
|
env=env,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
continue
|
|
if self._wait_for_window_closed_locked(session, window_id):
|
|
return True
|
|
self._dismiss_blocking_dialogs(session)
|
|
return self._wait_for_window_closed_locked(session, window_id, timeout_seconds=1.5)
|
|
|
|
def _send_reload_shortcut_locked(self, session: DesktopSession, doc: dict[str, Any]) -> bool:
|
|
xdotool = shutil.which("xdotool")
|
|
if not xdotool:
|
|
return False
|
|
window_id = self._office_window_id_locked(
|
|
session,
|
|
title=str(doc.get("basename") or ""),
|
|
)
|
|
if not window_id:
|
|
return False
|
|
env = self._display_env(session)
|
|
self._fit_office_window_id_locked(session, window_id, env=env, keys=("Escape",))
|
|
try:
|
|
result = subprocess.run(
|
|
[xdotool, "key", "--clearmodifiers", "ctrl+shift+r"],
|
|
check=False,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=4,
|
|
env=env,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
return False
|
|
time.sleep(0.8)
|
|
self._dismiss_blocking_dialogs(session)
|
|
self._fit_office_window_id_locked(session, window_id, env=env, keys=("Escape",))
|
|
return result.returncode == 0
|
|
|
|
def _office_window_id_locked(
|
|
self,
|
|
session: DesktopSession,
|
|
*,
|
|
title: str = "",
|
|
fallback: bool = True,
|
|
) -> str:
|
|
xdotool = shutil.which("xdotool")
|
|
if not xdotool:
|
|
return ""
|
|
env = self._display_env(session)
|
|
title = str(title or "").strip()
|
|
searches: list[list[str]] = []
|
|
if title:
|
|
escaped_title = re.escape(title)
|
|
searches.append([
|
|
xdotool,
|
|
"search",
|
|
"--onlyvisible",
|
|
"--name",
|
|
escaped_title,
|
|
])
|
|
for window_class in (
|
|
"libreoffice",
|
|
"libreoffice-writer",
|
|
"libreoffice-calc",
|
|
"libreoffice-impress",
|
|
):
|
|
searches.append([
|
|
xdotool,
|
|
"search",
|
|
"--onlyvisible",
|
|
"--class",
|
|
window_class,
|
|
"--name",
|
|
escaped_title,
|
|
])
|
|
if fallback:
|
|
for window_class in (
|
|
"libreoffice",
|
|
"libreoffice-writer",
|
|
"libreoffice-calc",
|
|
"libreoffice-impress",
|
|
):
|
|
searches.append([xdotool, "search", "--onlyvisible", "--class", window_class])
|
|
searches.append([xdotool, "search", "--onlyvisible", "--name", "LibreOffice"])
|
|
for command in searches:
|
|
try:
|
|
result = subprocess.run(
|
|
command,
|
|
check=False,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=2,
|
|
env=env,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
continue
|
|
window_ids = [
|
|
line.strip()
|
|
for line in result.stdout.splitlines()
|
|
if line.strip()
|
|
]
|
|
if window_ids:
|
|
return window_ids[-1]
|
|
return ""
|
|
|
|
def _wait_for_office_window_locked(
|
|
self,
|
|
session: DesktopSession,
|
|
*,
|
|
title: str = "",
|
|
process: subprocess.Popen[Any] | None = None,
|
|
timeout_seconds: float = 20.0,
|
|
) -> str:
|
|
deadline = time.time() + timeout_seconds
|
|
last_fallback = ""
|
|
title = str(title or "").strip()
|
|
while time.time() < deadline:
|
|
window_id = self._office_window_id_locked(session, title=title, fallback=False)
|
|
if window_id:
|
|
return window_id
|
|
last_fallback = self._office_window_id_locked(session, fallback=True) or last_fallback
|
|
if last_fallback and not title:
|
|
return last_fallback
|
|
if process and process.poll() is not None and last_fallback:
|
|
return last_fallback
|
|
time.sleep(0.25)
|
|
return self._office_window_id_locked(session, title=title, fallback=True) or last_fallback
|
|
|
|
def _wait_for_window_closed_locked(
|
|
self,
|
|
session: DesktopSession,
|
|
window_id: str,
|
|
*,
|
|
timeout_seconds: float = 6.0,
|
|
) -> bool:
|
|
deadline = time.time() + timeout_seconds
|
|
while time.time() < deadline:
|
|
if not self._window_exists_locked(session, window_id):
|
|
return True
|
|
time.sleep(0.2)
|
|
return not self._window_exists_locked(session, window_id)
|
|
|
|
def _window_exists_locked(self, session: DesktopSession, window_id: str) -> bool:
|
|
xdotool = shutil.which("xdotool")
|
|
if not xdotool or not window_id:
|
|
return False
|
|
try:
|
|
result = subprocess.run(
|
|
[xdotool, "getwindowname", str(window_id)],
|
|
check=False,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
timeout=2,
|
|
env=self._display_env(session),
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
return False
|
|
return result.returncode == 0
|
|
|
|
def _fit_office_window_id_locked(
|
|
self,
|
|
session: DesktopSession,
|
|
window_id: str,
|
|
*,
|
|
env: dict[str, str],
|
|
keys: tuple[str, ...] = (),
|
|
) -> None:
|
|
xdotool = shutil.which("xdotool")
|
|
if not xdotool or not window_id:
|
|
return
|
|
for command in (
|
|
[xdotool, "windowactivate", window_id],
|
|
[
|
|
xdotool,
|
|
"windowmove",
|
|
window_id,
|
|
"0",
|
|
"0",
|
|
"windowsize",
|
|
window_id,
|
|
str(session.width),
|
|
str(session.height),
|
|
],
|
|
):
|
|
try:
|
|
subprocess.run(
|
|
command,
|
|
check=False,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
timeout=2,
|
|
env=env,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
continue
|
|
for key in keys:
|
|
try:
|
|
subprocess.run(
|
|
[xdotool, "key", "--clearmodifiers", key],
|
|
check=False,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
timeout=2,
|
|
env=env,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
continue
|
|
|
|
def _prepare_profile(self, session: DesktopSession) -> None:
|
|
user_dir = session.profile_dir / "user"
|
|
user_dir.mkdir(parents=True, exist_ok=True)
|
|
registry = user_dir / "registrymodifications.xcu"
|
|
_write_libreoffice_registry_defaults(registry, document_store.document_home())
|
|
|
|
def _prepare_desktop_launchers(self, session: DesktopSession) -> None:
|
|
soffice = libreoffice.find_soffice()
|
|
if not soffice:
|
|
raise RuntimeError("LibreOffice is not installed in this runtime.")
|
|
workdir_home = document_store.document_home()
|
|
workdir_home.mkdir(parents=True, exist_ok=True)
|
|
documents_home = document_store.document_binary_home()
|
|
documents_home.mkdir(parents=True, exist_ok=True)
|
|
downloads_home = Path(files.get_abs_path("usr", "downloads"))
|
|
downloads_home.mkdir(parents=True, exist_ok=True)
|
|
|
|
desktop_dir = session.profile_dir / "Desktop"
|
|
desktop_dir.mkdir(parents=True, exist_ok=True)
|
|
_install_desktop_readme(desktop_dir)
|
|
_remove_path_if_owned(desktop_dir / "Browser.desktop")
|
|
_remove_path_if_owned(desktop_dir / "Files.desktop")
|
|
config_dir = session.profile_dir / ".config"
|
|
config_dir.mkdir(parents=True, exist_ok=True)
|
|
_remove_path_if_owned(config_dir / "xfce4" / "panel")
|
|
data_dir = session.profile_dir / ".local" / "share"
|
|
data_dir.mkdir(parents=True, exist_ok=True)
|
|
applications_dir = data_dir / "applications"
|
|
applications_dir.mkdir(parents=True, exist_ok=True)
|
|
cache_dir = session.profile_dir / ".cache"
|
|
cache_dir.mkdir(parents=True, exist_ok=True)
|
|
(config_dir / "user-dirs.dirs").write_text(
|
|
"\n".join(
|
|
[
|
|
'XDG_DESKTOP_DIR="$HOME/Desktop"',
|
|
f'XDG_DOCUMENTS_DIR="{workdir_home}"',
|
|
f'XDG_DOWNLOAD_DIR="{downloads_home}"',
|
|
f'XDG_TEMPLATES_DIR="{workdir_home}"',
|
|
f'XDG_PUBLICSHARE_DIR="{workdir_home}"',
|
|
f'XDG_MUSIC_DIR="{workdir_home}"',
|
|
f'XDG_PICTURES_DIR="{downloads_home}"',
|
|
f'XDG_VIDEOS_DIR="{workdir_home}"',
|
|
"",
|
|
],
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
xfce_conf_dir = config_dir / "xfce4" / "xfconf" / "xfce-perchannel-xml"
|
|
xfce_conf_dir.mkdir(parents=True, exist_ok=True)
|
|
(xfce_conf_dir / "xfce4-desktop.xml").write_text(
|
|
f"""<?xml version="1.1" encoding="UTF-8"?>
|
|
|
|
<channel name="xfce4-desktop" version="1.0">
|
|
<property name="last-settings-migration-version" type="uint" value="1"/>
|
|
<property name="backdrop" type="empty">
|
|
<property name="screen0" type="empty">
|
|
<property name="monitor0" type="empty">
|
|
<property name="image-path" type="string" value="{_xml_attr(str(downloads_home))}"/>
|
|
</property>
|
|
</property>
|
|
</property>
|
|
<property name="desktop-icons" type="empty">
|
|
<property name="style" type="int" value="2"/>
|
|
<property name="file-icons" type="empty">
|
|
<property name="show-home" type="bool" value="false"/>
|
|
<property name="show-filesystem" type="bool" value="false"/>
|
|
<property name="show-removable" type="bool" value="false"/>
|
|
<property name="show-trash" type="bool" value="false"/>
|
|
</property>
|
|
</property>
|
|
</channel>
|
|
""",
|
|
encoding="utf-8",
|
|
)
|
|
_write_thunar_defaults(xfce_conf_dir / "thunar.xml")
|
|
self._hide_xpra_desktop_entries(applications_dir)
|
|
self._hide_xfce_menu_entries(applications_dir)
|
|
self._prepare_desktop_url_bridge(session)
|
|
|
|
base_args = (
|
|
soffice,
|
|
"--norestore",
|
|
"--nofirststartwizard",
|
|
"--nolockcheck",
|
|
f"-env:UserInstallation=file://{session.profile_dir}",
|
|
)
|
|
office_launchers = (
|
|
("LibreOffice Writer", "libreoffice-writer", "--writer", "Office;WordProcessor;"),
|
|
("LibreOffice Calc", "libreoffice-calc", "--calc", "Office;Spreadsheet;"),
|
|
("LibreOffice Impress", "libreoffice-impress", "--impress", "Office;Presentation;"),
|
|
)
|
|
for name, icon, mode, categories in office_launchers:
|
|
_write_desktop_launcher(
|
|
desktop_dir / f"{name}.desktop",
|
|
name=name,
|
|
exec_line=_desktop_exec(*base_args, mode),
|
|
icon=icon,
|
|
categories=categories,
|
|
try_exec=soffice,
|
|
working_dir=workdir_home,
|
|
)
|
|
|
|
terminal = shutil.which("xfce4-terminal") or "xfce4-terminal"
|
|
settings = shutil.which("xfce4-settings-manager") or "xfce4-settings-manager"
|
|
desktop_apps = (
|
|
{
|
|
"filename": "Terminal.desktop",
|
|
"name": "Terminal",
|
|
"exec": _desktop_exec(terminal, f"--working-directory={workdir_home}"),
|
|
"try_exec": terminal,
|
|
"icon": _desktop_icon(
|
|
"/usr/share/icons/hicolor/128x128/apps/org.xfce.terminal.png",
|
|
"/usr/share/icons/hicolor/scalable/apps/org.xfce.terminal.svg",
|
|
"org.xfce.terminal",
|
|
"utilities-terminal",
|
|
),
|
|
"categories": "System;TerminalEmulator;",
|
|
},
|
|
{
|
|
"filename": "Settings.desktop",
|
|
"name": "Settings",
|
|
"exec": _desktop_exec(settings),
|
|
"try_exec": settings,
|
|
"icon": _desktop_icon(
|
|
"/usr/share/icons/hicolor/128x128/apps/org.xfce.settings.manager.png",
|
|
"/usr/share/icons/hicolor/scalable/apps/org.xfce.settings.manager.svg",
|
|
"org.xfce.settings.manager",
|
|
"preferences-system",
|
|
),
|
|
"categories": "Settings;DesktopSettings;",
|
|
},
|
|
)
|
|
for app in desktop_apps:
|
|
_write_desktop_launcher(
|
|
desktop_dir / str(app["filename"]),
|
|
name=str(app["name"]),
|
|
exec_line=str(app["exec"]),
|
|
icon=str(app["icon"]),
|
|
categories=str(app["categories"]),
|
|
try_exec=str(app["try_exec"]),
|
|
)
|
|
_ensure_desktop_folder_link(desktop_dir, "Workdir", workdir_home)
|
|
for label, target_parts in DESKTOP_FOLDER_LINKS:
|
|
_ensure_desktop_folder_link(desktop_dir, label, Path(files.get_abs_path(*target_parts)))
|
|
|
|
self._trust_desktop_launchers(session, desktop_dir)
|
|
self._prepare_xfce_panel_config(session)
|
|
self._prepare_xfce_profile_autostart(session)
|
|
|
|
def _prepare_desktop_url_bridge(self, session: DesktopSession) -> None:
|
|
desktop_dir = session.profile_dir / "Desktop"
|
|
config_dir = session.profile_dir / ".config"
|
|
data_dir = session.profile_dir / ".local" / "share"
|
|
applications_dir = data_dir / "applications"
|
|
desktop_dir.mkdir(parents=True, exist_ok=True)
|
|
config_dir.mkdir(parents=True, exist_ok=True)
|
|
applications_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
browser_bridge = _write_url_bridge_script(session)
|
|
shutdown_bridge = _write_shutdown_bridge_script(session)
|
|
helpers_rc = config_dir / "xfce4" / "helpers.rc"
|
|
helpers_rc.parent.mkdir(parents=True, exist_ok=True)
|
|
helpers_rc.write_text(
|
|
"\n".join(
|
|
[
|
|
"TerminalEmulator=xfce4-terminal",
|
|
"FileManager=thunar",
|
|
"WebBrowser=agent-zero-browser",
|
|
"",
|
|
],
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
_write_xfce_browser_helper(
|
|
config_dir / "xfce4" / "helpers" / "agent-zero-browser.desktop",
|
|
browser_bridge,
|
|
)
|
|
_write_mimeapps_defaults(config_dir / "mimeapps.list", URL_HANDLER_DESKTOP_ID)
|
|
_write_mimeapps_defaults(data_dir / "applications" / "mimeapps.list", URL_HANDLER_DESKTOP_ID)
|
|
_write_desktop_launcher(
|
|
applications_dir / URL_HANDLER_DESKTOP_ID,
|
|
name="Agent Zero Browser",
|
|
exec_line=_desktop_exec(browser_bridge, "%U"),
|
|
icon="web-browser",
|
|
categories="Network;WebBrowser;",
|
|
try_exec=str(browser_bridge),
|
|
mime_types=_url_handler_mime_types(),
|
|
no_display=True,
|
|
)
|
|
_write_desktop_launcher(
|
|
applications_dir / SHUTDOWN_HANDLER_DESKTOP_ID,
|
|
name="Shutdown Desktop",
|
|
exec_line=_desktop_exec(shutdown_bridge),
|
|
icon="system-shutdown",
|
|
categories="System;",
|
|
try_exec=str(shutdown_bridge),
|
|
no_display=True,
|
|
)
|
|
_write_desktop_launcher(
|
|
config_dir / "xfce4" / "panel" / "launcher-9" / SHUTDOWN_HANDLER_DESKTOP_ID,
|
|
name="Shutdown Desktop",
|
|
exec_line=_desktop_exec(shutdown_bridge),
|
|
icon="system-shutdown",
|
|
categories="System;",
|
|
try_exec=str(shutdown_bridge),
|
|
)
|
|
_write_desktop_launcher(
|
|
desktop_dir / "Browser.desktop",
|
|
name="Browser",
|
|
exec_line=_desktop_exec(browser_bridge),
|
|
icon="web-browser",
|
|
categories="Network;WebBrowser;",
|
|
try_exec=str(browser_bridge),
|
|
)
|
|
self._trust_desktop_launchers(session, desktop_dir)
|
|
|
|
def _hide_xpra_desktop_entries(self, applications_dir: Path) -> None:
|
|
for filename in HIDDEN_XPRA_DESKTOP_ENTRIES:
|
|
_write_hidden_application_entry(applications_dir / filename, "Xpra")
|
|
|
|
def _hide_xfce_menu_entries(self, applications_dir: Path) -> None:
|
|
for filename, name in HIDDEN_XFCE_MENU_ENTRIES:
|
|
_write_hidden_application_entry(applications_dir / filename, name)
|
|
|
|
def _prepare_xfce_panel_config(self, session: DesktopSession) -> None:
|
|
panel_xml = (
|
|
session.profile_dir
|
|
/ ".config"
|
|
/ "xfce4"
|
|
/ "xfconf"
|
|
/ "xfce-perchannel-xml"
|
|
/ "xfce4-panel.xml"
|
|
)
|
|
panel_xml.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
root = ET.Element("channel", {"name": "xfce4-panel", "version": "1.0"})
|
|
ET.SubElement(root, "property", {"name": "configver", "type": "int", "value": "2"})
|
|
|
|
panels = ET.SubElement(root, "property", {"name": "panels", "type": "array"})
|
|
ET.SubElement(panels, "value", {"type": "int", "value": "1"})
|
|
panel = ET.SubElement(panels, "property", {"name": "panel-1", "type": "empty"})
|
|
for name, prop_type, value in (
|
|
("position", "string", "p=6;x=0;y=0"),
|
|
("length", "uint", "100"),
|
|
("position-locked", "bool", "true"),
|
|
("size", "uint", "24"),
|
|
("mode", "uint", "0"),
|
|
("autohide-behavior", "uint", "0"),
|
|
("disable-struts", "bool", "false"),
|
|
("nrows", "uint", "1"),
|
|
):
|
|
ET.SubElement(panel, "property", {"name": name, "type": prop_type, "value": value})
|
|
plugin_ids = ET.SubElement(panel, "property", {"name": "plugin-ids", "type": "array"})
|
|
for plugin_id in ("1", "2", "3", "4", "5", "6", "7", "8", "9"):
|
|
ET.SubElement(plugin_ids, "value", {"type": "int", "value": plugin_id})
|
|
|
|
plugins = ET.SubElement(root, "property", {"name": "plugins", "type": "empty"})
|
|
ET.SubElement(plugins, "property", {"name": "plugin-1", "type": "string", "value": "applicationsmenu"})
|
|
ET.SubElement(plugins, "property", {"name": "plugin-2", "type": "string", "value": "tasklist"})
|
|
tasklist = _xfce_property(plugins, "plugin-2", "string", "tasklist")
|
|
ET.SubElement(tasklist, "property", {"name": "flat-buttons", "type": "bool", "value": "true"})
|
|
ET.SubElement(tasklist, "property", {"name": "show-handle", "type": "bool", "value": "false"})
|
|
ET.SubElement(tasklist, "property", {"name": "show-labels", "type": "bool", "value": "true"})
|
|
separator = ET.SubElement(plugins, "property", {"name": "plugin-3", "type": "string", "value": "separator"})
|
|
ET.SubElement(separator, "property", {"name": "expand", "type": "bool", "value": "true"})
|
|
ET.SubElement(separator, "property", {"name": "style", "type": "uint", "value": "0"})
|
|
ET.SubElement(plugins, "property", {"name": "plugin-4", "type": "string", "value": "pager"})
|
|
ET.SubElement(plugins, "property", {"name": "plugin-5", "type": "string", "value": "systray"})
|
|
ET.SubElement(plugins, "property", {"name": "plugin-6", "type": "string", "value": "separator"})
|
|
ET.SubElement(plugins, "property", {"name": "plugin-7", "type": "string", "value": "clock"})
|
|
ET.SubElement(plugins, "property", {"name": "plugin-8", "type": "string", "value": "separator"})
|
|
shutdown = ET.SubElement(plugins, "property", {"name": "plugin-9", "type": "string", "value": "launcher"})
|
|
shutdown_items = ET.SubElement(shutdown, "property", {"name": "items", "type": "array"})
|
|
ET.SubElement(shutdown_items, "value", {"type": "string", "value": SHUTDOWN_PANEL_LAUNCHER_ID})
|
|
|
|
tree = ET.ElementTree(root)
|
|
try:
|
|
ET.indent(tree, space=" ")
|
|
except AttributeError:
|
|
pass
|
|
tree.write(panel_xml, encoding="utf-8", xml_declaration=True)
|
|
|
|
def _prepare_xfce_profile_autostart(self, session: DesktopSession) -> None:
|
|
script = session.profile_dir / "prepare-xfce-profile.sh"
|
|
script.write_text(
|
|
"""#!/bin/sh
|
|
set -eu
|
|
export HOME="${HOME:-%s}"
|
|
export XDG_CONFIG_HOME="${XDG_CONFIG_HOME:-$HOME/.config}"
|
|
export XDG_DATA_HOME="${XDG_DATA_HOME:-$HOME/.local/share}"
|
|
export XDG_CACHE_HOME="${XDG_CACHE_HOME:-$HOME/.cache}"
|
|
export XDG_CONFIG_DIRS="/etc/xdg${XDG_CONFIG_DIRS:+:$XDG_CONFIG_DIRS}"
|
|
export XDG_DATA_DIRS="/usr/local/share:/usr/share${XDG_DATA_DIRS:+:$XDG_DATA_DIRS}"
|
|
export XDG_CURRENT_DESKTOP="${XDG_CURRENT_DESKTOP:-XFCE}"
|
|
mkdir -p "$HOME/Desktop" "$XDG_CONFIG_HOME" "$XDG_DATA_HOME" "$XDG_CACHE_HOME"
|
|
if command -v xfconf-query >/dev/null 2>&1; then
|
|
xfconf-query -c thunar -p /last-show-hidden -n -t bool -s true >/dev/null 2>&1 || true
|
|
xfconf-query -c xfce4-desktop -p /desktop-icons/style -n -t int -s 2 >/dev/null 2>&1 || true
|
|
xfconf-query -c xfce4-desktop -p /desktop-icons/file-icons/show-home -n -t bool -s false >/dev/null 2>&1 || true
|
|
xfconf-query -c xfce4-desktop -p /desktop-icons/file-icons/show-filesystem -n -t bool -s false >/dev/null 2>&1 || true
|
|
xfconf-query -c xfce4-desktop -p /desktop-icons/file-icons/show-removable -n -t bool -s false >/dev/null 2>&1 || true
|
|
xfconf-query -c xfce4-desktop -p /desktop-icons/file-icons/show-trash -n -t bool -s false >/dev/null 2>&1 || true
|
|
fi
|
|
for launcher in "$HOME"/Desktop/*.desktop; do
|
|
[ -f "$launcher" ] || continue
|
|
chmod +x "$launcher" 2>/dev/null || true
|
|
if command -v gio >/dev/null 2>&1; then
|
|
checksum="$(sha256sum "$launcher" 2>/dev/null | cut -d " " -f 1)"
|
|
gio set "$launcher" metadata::trusted true >/dev/null 2>&1 || true
|
|
if [ -n "$checksum" ]; then
|
|
gio set -t string "$launcher" metadata::xfce-exe-checksum "$checksum" >/dev/null 2>&1 || true
|
|
fi
|
|
fi
|
|
done
|
|
if command -v xfdesktop >/dev/null 2>&1; then
|
|
timeout 4 xfdesktop --reload >/dev/null 2>&1 || true
|
|
fi
|
|
""" % str(session.profile_dir),
|
|
encoding="utf-8",
|
|
)
|
|
try:
|
|
script.chmod(0o700)
|
|
except OSError:
|
|
pass
|
|
|
|
autostart_dir = session.profile_dir / ".config" / "autostart"
|
|
autostart_dir.mkdir(parents=True, exist_ok=True)
|
|
autostart = autostart_dir / "agent-zero-desktop.desktop"
|
|
autostart.write_text(
|
|
"\n".join(
|
|
[
|
|
"[Desktop Entry]",
|
|
"Type=Application",
|
|
"Name=Agent Zero desktop profile",
|
|
f"Exec={script}",
|
|
"Terminal=false",
|
|
"OnlyShowIn=XFCE;",
|
|
"X-GNOME-Autostart-enabled=true",
|
|
"",
|
|
],
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
def _prepare_xfce_launcher(self, session: DesktopSession) -> Path:
|
|
launcher = session.profile_dir / "start-xfce.sh"
|
|
launcher.write_text(
|
|
"\n".join(
|
|
[
|
|
"#!/bin/sh",
|
|
'export XDG_CONFIG_HOME="${XDG_CONFIG_HOME:-$HOME/.config}"',
|
|
'export XDG_DATA_HOME="${XDG_DATA_HOME:-$HOME/.local/share}"',
|
|
'export XDG_CACHE_HOME="${XDG_CACHE_HOME:-$HOME/.cache}"',
|
|
'export XDG_CONFIG_DIRS="/etc/xdg${XDG_CONFIG_DIRS:+:$XDG_CONFIG_DIRS}"',
|
|
'export XDG_DATA_DIRS="/usr/local/share:/usr/share${XDG_DATA_DIRS:+:$XDG_DATA_DIRS}"',
|
|
'export XDG_CURRENT_DESKTOP="${XDG_CURRENT_DESKTOP:-XFCE}"',
|
|
(
|
|
"exec dbus-launch --exit-with-session sh -c "
|
|
f"'\"{session.profile_dir / 'prepare-xfce-profile.sh'}\" >/dev/null 2>&1 || true; exec xfce4-session'"
|
|
),
|
|
"",
|
|
],
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
try:
|
|
launcher.chmod(0o700)
|
|
except OSError:
|
|
pass
|
|
return launcher
|
|
|
|
def _prepare_root_window(self, session: DesktopSession) -> None:
|
|
xsetroot = shutil.which("xsetroot")
|
|
if not xsetroot:
|
|
return
|
|
subprocess.run(
|
|
[xsetroot, "-solid", "#20242a"],
|
|
check=False,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
timeout=2,
|
|
env=self._display_env(session),
|
|
)
|
|
|
|
def _fit_office_window(
|
|
self,
|
|
session: DesktopSession,
|
|
*,
|
|
process: subprocess.Popen[Any] | None = None,
|
|
) -> None:
|
|
virtual_desktop.fit_window_until(
|
|
display=session.display,
|
|
width=session.width,
|
|
height=session.height,
|
|
window_class="libreoffice",
|
|
keys=("Escape",),
|
|
settle_seconds=4,
|
|
timeout_seconds=10,
|
|
process=process,
|
|
xauthority=self._xauthority(session),
|
|
home=str(session.profile_dir),
|
|
)
|
|
self._dismiss_blocking_dialogs(session)
|
|
|
|
def _set_display_size(self, session: DesktopSession, width: int, height: int) -> dict[str, Any]:
|
|
result = virtual_desktop.resize_display(
|
|
display=session.display,
|
|
width=width,
|
|
height=height,
|
|
max_width=MAX_SCREEN_WIDTH,
|
|
max_height=MAX_SCREEN_HEIGHT,
|
|
window_class="",
|
|
keys=(),
|
|
xauthority=self._xauthority(session),
|
|
home=str(session.profile_dir),
|
|
)
|
|
if result.get("ok"):
|
|
session.width = int(result["width"])
|
|
session.height = int(result["height"])
|
|
return result
|
|
|
|
def _dismiss_blocking_dialogs(self, session: DesktopSession) -> None:
|
|
virtual_desktop.close_windows(
|
|
display=session.display,
|
|
names=BLOCKING_DIALOG_TITLES,
|
|
xauthority=self._xauthority(session),
|
|
home=str(session.profile_dir),
|
|
)
|
|
|
|
def _refresh_xfce_desktop(self, session: DesktopSession) -> None:
|
|
xfdesktop = shutil.which("xfdesktop")
|
|
if not xfdesktop:
|
|
return
|
|
env = self._xfce_process_env(session, "xfdesktop")
|
|
try:
|
|
subprocess.run(
|
|
[xfdesktop, "--reload"],
|
|
check=False,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
timeout=4,
|
|
env=env,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
return
|
|
|
|
def _trust_desktop_launchers(self, session: DesktopSession, desktop_dir: Path) -> None:
|
|
gio = shutil.which("gio")
|
|
if not gio:
|
|
return
|
|
env = self._xfce_process_env(session, "xfdesktop")
|
|
for launcher in desktop_dir.glob("*.desktop"):
|
|
try:
|
|
launcher.chmod(0o755)
|
|
checksum = hashlib.sha256(launcher.read_bytes()).hexdigest()
|
|
except OSError:
|
|
continue
|
|
for command in (
|
|
[gio, "set", str(launcher), "metadata::trusted", "true"],
|
|
[gio, "set", "-t", "string", str(launcher), "metadata::xfce-exe-checksum", checksum],
|
|
):
|
|
try:
|
|
subprocess.run(
|
|
command,
|
|
check=False,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
timeout=4,
|
|
env=env,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
continue
|
|
|
|
def _xfce_process_env(self, session: DesktopSession, command_name: str) -> dict[str, str]:
|
|
env = self._display_env(session)
|
|
proc = Path("/proc")
|
|
for candidate in proc.iterdir():
|
|
if not candidate.name.isdigit():
|
|
continue
|
|
try:
|
|
if (candidate / "comm").read_text(encoding="utf-8").strip() != command_name:
|
|
continue
|
|
process_env = self._read_process_env(candidate)
|
|
except OSError:
|
|
continue
|
|
if process_env.get("HOME") != str(session.profile_dir):
|
|
continue
|
|
if process_env.get("DISPLAY") != f":{session.display}":
|
|
continue
|
|
for key, value in process_env.items():
|
|
if (
|
|
key in {"DBUS_SESSION_BUS_ADDRESS", "DISPLAY", "HOME", "XAUTHORITY"}
|
|
or key.startswith("XDG_")
|
|
):
|
|
env[key] = value
|
|
break
|
|
return env
|
|
|
|
def _read_process_env(self, proc_dir: Path) -> dict[str, str]:
|
|
raw = (proc_dir / "environ").read_bytes()
|
|
env: dict[str, str] = {}
|
|
for item in raw.split(b"\0"):
|
|
if not item or b"=" not in item:
|
|
continue
|
|
key, value = item.split(b"=", 1)
|
|
env[key.decode("utf-8", errors="ignore")] = value.decode("utf-8", errors="ignore")
|
|
return env
|
|
|
|
def _session_env(self, session: DesktopSession) -> dict[str, str]:
|
|
env = {
|
|
**os.environ,
|
|
"HOME": str(session.profile_dir),
|
|
"LANG": os.environ.get("LANG") or "C.UTF-8",
|
|
"TZ": session.timezone or Localization.get().get_timezone(),
|
|
}
|
|
browser_bridge = _url_bridge_script_path(session)
|
|
if browser_bridge.exists():
|
|
env["BROWSER"] = str(browser_bridge)
|
|
env.setdefault("XDG_RUNTIME_DIR", str(STATE_DIR / "xdg-runtime"))
|
|
runtime_dir = Path(env["XDG_RUNTIME_DIR"])
|
|
runtime_dir.mkdir(parents=True, exist_ok=True)
|
|
try:
|
|
runtime_dir.chmod(0o700)
|
|
except OSError:
|
|
pass
|
|
return env
|
|
|
|
def _display_env(self, session: DesktopSession) -> dict[str, str]:
|
|
env = {
|
|
**self._session_env(session),
|
|
"DISPLAY": f":{session.display}",
|
|
"SAL_USE_VCLPLUGIN": os.environ.get("SAL_USE_VCLPLUGIN") or "gtk3",
|
|
}
|
|
xauthority = self._xauthority(session)
|
|
if xauthority:
|
|
env["XAUTHORITY"] = xauthority
|
|
return env
|
|
|
|
def _xauthority(self, session: DesktopSession) -> str:
|
|
path = session.profile_dir / ".Xauthority"
|
|
return str(path) if path.exists() else ""
|
|
|
|
def _allocate_endpoint_locked(self) -> tuple[int, int]:
|
|
used_displays = {session.display for session in self._sessions.values()}
|
|
used_ports = {session.xpra_port for session in self._sessions.values()}
|
|
for offset in range(MAX_SESSIONS):
|
|
display = DISPLAY_BASE + offset
|
|
port = XPRA_PORT_BASE + offset
|
|
if display in used_displays or port in used_ports:
|
|
continue
|
|
if _port_is_free(port):
|
|
return display, port
|
|
raise RuntimeError("No LibreOffice desktop slots are available.")
|
|
|
|
def _find_by_file_id_locked(self, file_id: str) -> DesktopSession | None:
|
|
for session in self._sessions.values():
|
|
if session.file_id == file_id and session.alive():
|
|
return session
|
|
return None
|
|
|
|
def _find_by_file_id(self, file_id: str) -> DesktopSession | None:
|
|
with self._lock:
|
|
return self._find_by_file_id_locked(str(file_id or "").strip())
|
|
|
|
def _reap_dead_locked(self) -> None:
|
|
for session_id, session in list(self._sessions.items()):
|
|
if not session.alive():
|
|
self._terminate_session(session)
|
|
virtual_desktop.unregister_session(session.token)
|
|
self._sessions.pop(session_id, None)
|
|
self._remove_manifest(session_id)
|
|
|
|
def _wait_for_display(self, session: DesktopSession) -> None:
|
|
marker = Path(f"/tmp/.X11-unix/X{session.display}")
|
|
deadline = time.time() + DISPLAY_START_TIMEOUT_SECONDS
|
|
while time.time() < deadline:
|
|
process = session.processes.get("xvfb") or session.processes.get("xpra")
|
|
if process and process.poll() is not None:
|
|
raise RuntimeError("The LibreOffice X display exited before it was ready.")
|
|
if marker.exists():
|
|
return
|
|
time.sleep(0.1)
|
|
raise TimeoutError("Timed out waiting for the LibreOffice X display.")
|
|
|
|
def _wait_for_xfce(self, session: DesktopSession) -> None:
|
|
deadline = time.time() + STARTUP_GRACE_SECONDS
|
|
while time.time() < deadline:
|
|
process = session.processes.get("xfce")
|
|
if process and process.poll() is not None:
|
|
return
|
|
if virtual_desktop.has_window(
|
|
display=session.display,
|
|
name="xfce4-panel",
|
|
xauthority=self._xauthority(session),
|
|
home=str(session.profile_dir),
|
|
):
|
|
return
|
|
time.sleep(0.25)
|
|
|
|
def _write_manifest(self, session: DesktopSession) -> None:
|
|
SESSION_DIR.mkdir(parents=True, exist_ok=True)
|
|
pids = dict(session.process_ids)
|
|
pids.update({name: process.pid for name, process in session.processes.items()})
|
|
payload = {
|
|
"session_id": session.session_id,
|
|
"file_id": session.file_id,
|
|
"extension": session.extension,
|
|
"path": session.path,
|
|
"title": session.title,
|
|
"display": session.display,
|
|
"xpra_port": session.xpra_port,
|
|
"profile_dir": str(session.profile_dir),
|
|
"width": session.width,
|
|
"height": session.height,
|
|
"started_at": session.started_at,
|
|
"timezone": session.timezone,
|
|
"owner_pid": os.getpid(),
|
|
"pids": pids,
|
|
}
|
|
(SESSION_DIR / f"{session.session_id}.json").write_text(json.dumps(payload), encoding="utf-8")
|
|
|
|
def _remove_manifest(self, session_id: str) -> None:
|
|
(SESSION_DIR / f"{session_id}.json").unlink(missing_ok=True)
|
|
|
|
def _terminate_session(self, session: DesktopSession, *, include_rehydrated: bool = False) -> None:
|
|
process_names = [name for name in session.processes if name.startswith("soffice")]
|
|
process_names.extend(["xfce", "xpra", "xvfb"])
|
|
terminated_pids: set[int] = set()
|
|
for name in process_names:
|
|
process = session.processes.get(name)
|
|
if not process:
|
|
continue
|
|
if process.pid:
|
|
terminated_pids.add(process.pid)
|
|
_terminate_process(process)
|
|
if session.owns_processes or include_rehydrated:
|
|
for name, pid in session.process_ids.items():
|
|
if pid in terminated_pids:
|
|
continue
|
|
if name.startswith("soffice") or name in {"xfce", "xpra", "xvfb"}:
|
|
_kill_pid(pid)
|
|
self._remove_stale_lock_file(session)
|
|
|
|
def _remove_stale_lock_file(self, session: DesktopSession, *, path: str | Path | None = None) -> None:
|
|
path = Path(path or session.path)
|
|
if not path.name:
|
|
return
|
|
lock_file = path.with_name(f".~lock.{path.name}#")
|
|
try:
|
|
lock_file.unlink(missing_ok=True)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def collect_desktop_status() -> dict[str, Any]:
|
|
desktop = virtual_desktop.collect_status()
|
|
binaries = {
|
|
**desktop["binaries"],
|
|
"soffice": libreoffice.find_soffice(),
|
|
"thunar": shutil.which("thunar") or "",
|
|
"xfce4-terminal": shutil.which("xfce4-terminal") or "",
|
|
"xfce4-settings-manager": shutil.which("xfce4-settings-manager") or "",
|
|
"gio": shutil.which("gio") or "",
|
|
}
|
|
missing = [
|
|
name
|
|
for name in (
|
|
"soffice",
|
|
"thunar",
|
|
"xfce4-terminal",
|
|
"xfce4-settings-manager",
|
|
"gio",
|
|
)
|
|
if not binaries[name]
|
|
]
|
|
missing.extend(
|
|
name
|
|
for name in ("xpra", "Xvfb", "xfce4-session", "dbus-launch", "xrandr", "xdotool")
|
|
if not binaries.get(name)
|
|
)
|
|
if not desktop.get("xpra_html_root"):
|
|
missing.append("xpra-html5")
|
|
if desktop.get("binaries", {}).get("xpra") and desktop.get("packages", {}).get("xpra-x11") is False:
|
|
missing.append("xpra-x11")
|
|
healthy = not missing
|
|
preparation = _runtime_preparation_status()
|
|
installing = bool(preparation.get("preparing")) and not healthy
|
|
return {
|
|
"ok": True,
|
|
"healthy": healthy,
|
|
"state": "healthy" if healthy else "installing" if installing else "missing",
|
|
"installing": installing,
|
|
"missing": missing,
|
|
"preparation": preparation,
|
|
"binaries": binaries,
|
|
"xpra_html_root": str(desktop.get("xpra_html_root") or ""),
|
|
"message": (
|
|
"Agent Zero Desktop sessions are available."
|
|
if healthy
|
|
else RUNTIME_INSTALL_MESSAGE
|
|
if installing
|
|
else f"Agent Zero Desktop sessions need: {', '.join(missing)}."
|
|
),
|
|
}
|
|
|
|
|
|
def _state_path_from_retired_root(path: Path) -> Path:
|
|
try:
|
|
relative = path.resolve(strict=False).relative_to(
|
|
RETIRED_STATE_DIR.resolve(strict=False)
|
|
)
|
|
except ValueError:
|
|
return path
|
|
return STATE_DIR / relative
|
|
|
|
|
|
def _runtime_preparation_status() -> dict[str, Any]:
|
|
try:
|
|
from plugins._desktop import hooks
|
|
|
|
return hooks.runtime_preparation_status()
|
|
except Exception:
|
|
return {"preparing": False, "active_count": 0, "started_at": 0.0, "completed_at": 0.0}
|
|
|
|
|
|
def cleanup_stale_runtime_state() -> dict[str, Any]:
|
|
killed: list[int] = []
|
|
errors: list[str] = []
|
|
seen_dirs: set[Path] = set()
|
|
for session_dir in (SESSION_DIR, *LEGACY_SESSION_DIRS):
|
|
if session_dir in seen_dirs:
|
|
continue
|
|
seen_dirs.add(session_dir)
|
|
if session_dir.exists():
|
|
for manifest in session_dir.glob("*.json"):
|
|
try:
|
|
payload = json.loads(manifest.read_text(encoding="utf-8"))
|
|
owner_pid = _coerce_pid(payload.get("owner_pid"))
|
|
if owner_pid and _pid_is_running(owner_pid):
|
|
continue
|
|
for pid in dict(payload.get("pids") or {}).values():
|
|
pid_int = _coerce_pid(pid)
|
|
if not pid_int:
|
|
continue
|
|
if _kill_pid(pid_int):
|
|
killed.append(pid_int)
|
|
manifest.unlink(missing_ok=True)
|
|
except Exception as exc:
|
|
errors.append(str(exc))
|
|
return {"ok": not errors, "killed": killed, "errors": errors}
|
|
|
|
|
|
def get_manager() -> DesktopSessionManager:
|
|
global _manager
|
|
try:
|
|
return _manager
|
|
except NameError:
|
|
_manager = DesktopSessionManager()
|
|
atexit.register(_manager.shutdown)
|
|
return _manager
|
|
|
|
|
|
def _xpra_url(token: str) -> str:
|
|
return virtual_desktop.session_url(token, title="Desktop")
|
|
|
|
|
|
def _xvfb_command(xvfb: str, session: DesktopSession) -> list[str]:
|
|
return [
|
|
xvfb,
|
|
f":{session.display}",
|
|
"-screen",
|
|
"0",
|
|
f"{MAX_SCREEN_WIDTH}x{MAX_SCREEN_HEIGHT}x24",
|
|
"+extension",
|
|
"GLX",
|
|
"+extension",
|
|
"RANDR",
|
|
"+extension",
|
|
"RENDER",
|
|
"+extension",
|
|
"Composite",
|
|
"-extension",
|
|
"DOUBLE-BUFFER",
|
|
"-nolisten",
|
|
"tcp",
|
|
"-noreset",
|
|
"-ac",
|
|
]
|
|
|
|
|
|
def _xpra_shadow_command(xpra: str, session: DesktopSession) -> list[str]:
|
|
return [
|
|
xpra,
|
|
"shadow",
|
|
f":{session.display}",
|
|
"--daemon=no",
|
|
"--mdns=no",
|
|
"--html=on",
|
|
"--tray=no",
|
|
"--system-tray=no",
|
|
"--notifications=no",
|
|
"--clipboard=yes",
|
|
"--clipboard-direction=both",
|
|
"--file-transfer=yes",
|
|
"--open-files=no",
|
|
"--open-url=no",
|
|
"--printing=yes",
|
|
"--audio=no",
|
|
"--speaker=off",
|
|
"--microphone=off",
|
|
"--encoding=jpeg",
|
|
"--quality=85",
|
|
"--speed=80",
|
|
f"--bind-tcp=127.0.0.1:{session.xpra_port}",
|
|
"--resize-display=yes",
|
|
f"--log-dir={session.profile_dir}",
|
|
"--log-file=xpra.log",
|
|
]
|
|
|
|
|
|
def _desktop_exec(*args: str | Path) -> str:
|
|
return " ".join(_desktop_exec_arg(str(arg)) for arg in args if str(arg))
|
|
|
|
|
|
def _desktop_icon(*candidates: str) -> str:
|
|
for candidate in candidates:
|
|
if candidate.startswith("/") and Path(candidate).exists():
|
|
return candidate
|
|
return next(
|
|
(candidate for candidate in candidates if not candidate.startswith("/")),
|
|
candidates[-1],
|
|
)
|
|
|
|
|
|
def _ensure_desktop_folder_link(desktop_dir: Path, label: str, target: Path) -> None:
|
|
target.mkdir(parents=True, exist_ok=True)
|
|
link = desktop_dir / label
|
|
try:
|
|
if link.is_symlink() or link.is_file():
|
|
link.unlink()
|
|
if not link.exists():
|
|
link.symlink_to(target, target_is_directory=True)
|
|
except OSError:
|
|
return
|
|
|
|
|
|
def _url_bridge_dir(session: DesktopSession) -> Path:
|
|
return session.profile_dir / ".agent-zero"
|
|
|
|
|
|
def _url_bridge_script_path(session: DesktopSession) -> Path:
|
|
return _url_bridge_dir(session) / "open-url"
|
|
|
|
|
|
def _url_bridge_queue_path(session: DesktopSession) -> Path:
|
|
return _url_bridge_dir(session) / "browser-url-intents.jsonl"
|
|
|
|
|
|
def _url_bridge_lock_path(session: DesktopSession) -> Path:
|
|
return _url_bridge_dir(session) / "browser-url-intents.lock"
|
|
|
|
|
|
def _shutdown_request_path(session: DesktopSession) -> Path:
|
|
return _url_bridge_dir(session) / "shutdown-request.json"
|
|
|
|
|
|
def _shutdown_arm_path(session: DesktopSession) -> Path:
|
|
return _url_bridge_dir(session) / "shutdown-request.arm.json"
|
|
|
|
|
|
def _shutdown_lock_path(session: DesktopSession) -> Path:
|
|
return _url_bridge_dir(session) / "shutdown-request.lock"
|
|
|
|
|
|
def _write_url_bridge_script(session: DesktopSession) -> Path:
|
|
bridge_dir = _url_bridge_dir(session)
|
|
bridge_dir.mkdir(parents=True, exist_ok=True)
|
|
script = _url_bridge_script_path(session)
|
|
queue = _url_bridge_queue_path(session)
|
|
lock = _url_bridge_lock_path(session)
|
|
script.write_text(
|
|
f"""#!/usr/bin/env python3
|
|
import fcntl
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
QUEUE_PATH = {str(queue)!r}
|
|
LOCK_PATH = {str(lock)!r}
|
|
MAX_URL_LENGTH = {URL_INTENT_MAX_LENGTH}
|
|
|
|
|
|
def main():
|
|
urls = [str(arg or "").strip()[:MAX_URL_LENGTH] for arg in sys.argv[1:] if str(arg or "").strip()]
|
|
if not urls:
|
|
urls = [""]
|
|
os.makedirs(os.path.dirname(QUEUE_PATH), exist_ok=True)
|
|
with open(LOCK_PATH, "a+", encoding="utf-8") as lock_file:
|
|
fcntl.flock(lock_file, fcntl.LOCK_EX)
|
|
with open(QUEUE_PATH, "a", encoding="utf-8") as queue_file:
|
|
for url in urls:
|
|
queue_file.write(json.dumps({{
|
|
"url": url,
|
|
"created_at": time.time(),
|
|
"source": "desktop",
|
|
}}, ensure_ascii=True) + "\\n")
|
|
queue_file.flush()
|
|
os.fsync(queue_file.fileno())
|
|
fcntl.flock(lock_file, fcntl.LOCK_UN)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
""",
|
|
encoding="utf-8",
|
|
)
|
|
try:
|
|
script.chmod(0o755)
|
|
except OSError:
|
|
pass
|
|
return script
|
|
|
|
|
|
def _write_shutdown_bridge_script(session: DesktopSession) -> Path:
|
|
bridge_dir = _url_bridge_dir(session)
|
|
bridge_dir.mkdir(parents=True, exist_ok=True)
|
|
script = bridge_dir / "shutdown-desktop"
|
|
request = _shutdown_request_path(session)
|
|
arm = _shutdown_arm_path(session)
|
|
lock = _shutdown_lock_path(session)
|
|
script.write_text(
|
|
f"""#!/usr/bin/env python3
|
|
import fcntl
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import time
|
|
|
|
REQUEST_PATH = {str(request)!r}
|
|
ARM_PATH = {str(arm)!r}
|
|
LOCK_PATH = {str(lock)!r}
|
|
CONFIRM_SECONDS = {SHUTDOWN_CONFIRM_SECONDS}
|
|
|
|
|
|
def notify(message, timeout=None):
|
|
if not os.environ.get("DISPLAY"):
|
|
return
|
|
xmessage = shutil.which("xmessage")
|
|
if not xmessage:
|
|
return
|
|
try:
|
|
subprocess.Popen(
|
|
[
|
|
xmessage,
|
|
"-buttons",
|
|
"",
|
|
"-timeout",
|
|
str(timeout or CONFIRM_SECONDS),
|
|
"-center",
|
|
message,
|
|
],
|
|
stdin=subprocess.DEVNULL,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
start_new_session=True,
|
|
)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def read_arm(now):
|
|
try:
|
|
with open(ARM_PATH, "r", encoding="utf-8") as handle:
|
|
payload = json.load(handle)
|
|
except (OSError, json.JSONDecodeError):
|
|
return None
|
|
try:
|
|
created_at = float(payload.get("created_at"))
|
|
except (TypeError, ValueError):
|
|
return None
|
|
if now - created_at > CONFIRM_SECONDS:
|
|
return None
|
|
return created_at
|
|
|
|
|
|
def write_json_atomic(path, payload):
|
|
tmp_path = path + ".tmp"
|
|
with open(tmp_path, "w", encoding="utf-8") as handle:
|
|
json.dump(payload, handle, ensure_ascii=True)
|
|
handle.write("\\n")
|
|
handle.flush()
|
|
os.fsync(handle.fileno())
|
|
os.replace(tmp_path, path)
|
|
|
|
|
|
def main():
|
|
os.makedirs(os.path.dirname(REQUEST_PATH), exist_ok=True)
|
|
now = time.time()
|
|
with open(LOCK_PATH, "a+", encoding="utf-8") as lock_file:
|
|
fcntl.flock(lock_file, fcntl.LOCK_EX)
|
|
armed_at = read_arm(now)
|
|
if armed_at is None:
|
|
write_json_atomic(ARM_PATH, {{"created_at": now, "source": "tray"}})
|
|
notify(
|
|
f"Shutdown Desktop armed. Click Shutdown Desktop again within {{CONFIRM_SECONDS}} seconds to close it.",
|
|
CONFIRM_SECONDS,
|
|
)
|
|
return
|
|
try:
|
|
os.unlink(ARM_PATH)
|
|
except OSError:
|
|
pass
|
|
payload = {{
|
|
"created_at": now,
|
|
"armed_at": armed_at,
|
|
"source": "tray",
|
|
}}
|
|
write_json_atomic(REQUEST_PATH, payload)
|
|
notify("Shutting down Agent Zero Desktop.", 2)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
""",
|
|
encoding="utf-8",
|
|
)
|
|
try:
|
|
script.chmod(0o755)
|
|
except OSError:
|
|
pass
|
|
return script
|
|
|
|
|
|
def _claim_url_intents(session: DesktopSession) -> list[dict[str, Any]]:
|
|
queue = _url_bridge_queue_path(session)
|
|
lock = _url_bridge_lock_path(session)
|
|
if not queue.exists():
|
|
return []
|
|
lock.parent.mkdir(parents=True, exist_ok=True)
|
|
try:
|
|
with open(lock, "a+", encoding="utf-8") as lock_file:
|
|
fcntl.flock(lock_file, fcntl.LOCK_EX)
|
|
try:
|
|
raw = queue.read_text(encoding="utf-8")
|
|
queue.write_text("", encoding="utf-8")
|
|
finally:
|
|
fcntl.flock(lock_file, fcntl.LOCK_UN)
|
|
except OSError:
|
|
return []
|
|
|
|
intents: list[dict[str, Any]] = []
|
|
for line in raw.splitlines():
|
|
try:
|
|
payload = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
url = str(payload.get("url") or "").strip()
|
|
if len(url) > URL_INTENT_MAX_LENGTH:
|
|
url = url[:URL_INTENT_MAX_LENGTH]
|
|
created_at = payload.get("created_at")
|
|
try:
|
|
created_at = float(created_at)
|
|
except (TypeError, ValueError):
|
|
created_at = time.time()
|
|
intents.append(
|
|
{
|
|
"url": url,
|
|
"created_at": created_at,
|
|
"source": str(payload.get("source") or "desktop"),
|
|
},
|
|
)
|
|
if len(intents) >= URL_INTENT_MAX_ITEMS:
|
|
break
|
|
return intents
|
|
|
|
|
|
def _claim_shutdown_request(session: DesktopSession) -> dict[str, Any] | None:
|
|
request = _shutdown_request_path(session)
|
|
if not request.exists():
|
|
return None
|
|
try:
|
|
raw = request.read_text(encoding="utf-8")
|
|
request.unlink(missing_ok=True)
|
|
except OSError:
|
|
return None
|
|
try:
|
|
payload = json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
payload = {}
|
|
created_at = payload.get("created_at")
|
|
try:
|
|
created_at = float(created_at)
|
|
except (TypeError, ValueError):
|
|
created_at = time.time()
|
|
return {
|
|
"created_at": created_at,
|
|
"source": str(payload.get("source") or "tray"),
|
|
}
|
|
|
|
|
|
def _clear_shutdown_request(session: DesktopSession) -> None:
|
|
request = _shutdown_request_path(session)
|
|
arm = _shutdown_arm_path(session)
|
|
lock = _shutdown_lock_path(session)
|
|
request.unlink(missing_ok=True)
|
|
request.with_suffix(request.suffix + ".tmp").unlink(missing_ok=True)
|
|
arm.unlink(missing_ok=True)
|
|
arm.with_suffix(arm.suffix + ".tmp").unlink(missing_ok=True)
|
|
lock.unlink(missing_ok=True)
|
|
|
|
|
|
def _remove_system_manifest() -> None:
|
|
(SESSION_DIR / f"{SYSTEM_SESSION_ID}.json").unlink(missing_ok=True)
|
|
|
|
|
|
def _url_handler_mime_types() -> tuple[str, ...]:
|
|
return (
|
|
"x-scheme-handler/http",
|
|
"x-scheme-handler/https",
|
|
"text/html",
|
|
"application/xhtml+xml",
|
|
)
|
|
|
|
|
|
def _write_mimeapps_defaults(path: Path, desktop_id: str) -> None:
|
|
associations = ";".join([desktop_id, ""])
|
|
lines = [
|
|
"[Default Applications]",
|
|
*(f"{mime_type}={desktop_id}" for mime_type in _url_handler_mime_types()),
|
|
"",
|
|
"[Added Associations]",
|
|
*(f"{mime_type}={associations}" for mime_type in _url_handler_mime_types()),
|
|
"",
|
|
]
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text("\n".join(lines), encoding="utf-8")
|
|
|
|
|
|
def _write_xfce_browser_helper(path: Path, bridge_script: Path) -> None:
|
|
command = _desktop_exec(bridge_script)
|
|
command_with_parameter = _desktop_exec(bridge_script, "%s")
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text(
|
|
"\n".join(
|
|
[
|
|
"[Desktop Entry]",
|
|
"NoDisplay=true",
|
|
"Version=1.0",
|
|
"Type=X-XFCE-Helper",
|
|
"X-XFCE-Category=WebBrowser",
|
|
f"X-XFCE-Commands={command}",
|
|
f"X-XFCE-CommandsWithParameter={command_with_parameter}",
|
|
"Icon=web-browser",
|
|
"Name=Agent Zero Browser",
|
|
"",
|
|
],
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
|
|
def _remove_path_if_owned(path: Path) -> None:
|
|
try:
|
|
if path.is_symlink() or path.is_file():
|
|
path.unlink()
|
|
elif path.is_dir():
|
|
shutil.rmtree(path)
|
|
except OSError:
|
|
return
|
|
|
|
|
|
def _desktop_exec_arg(value: str) -> str:
|
|
if not any(char.isspace() or char in '"\\' for char in value):
|
|
return value
|
|
escaped = value.replace("\\", "\\\\").replace('"', '\\"')
|
|
return f'"{escaped}"'
|
|
|
|
|
|
def _xml_attr(value: str) -> str:
|
|
return (
|
|
str(value)
|
|
.replace("&", "&")
|
|
.replace('"', """)
|
|
.replace("<", "<")
|
|
.replace(">", ">")
|
|
)
|
|
|
|
|
|
def _oor(name: str) -> str:
|
|
return f"{{{OOR_NS}}}{name}"
|
|
|
|
|
|
def _file_uri(path: str | Path) -> str:
|
|
return Path(path).resolve(strict=False).as_uri()
|
|
|
|
|
|
def _write_libreoffice_registry_defaults(registry: Path, workdir: str | Path) -> None:
|
|
Path(workdir).mkdir(parents=True, exist_ok=True)
|
|
ET.register_namespace("oor", OOR_NS)
|
|
ET.register_namespace("xs", XS_NS)
|
|
ET.register_namespace("xsi", XSI_NS)
|
|
root = _read_libreoffice_registry(registry)
|
|
workdir_uri = _file_uri(workdir)
|
|
for path, prop, value in (
|
|
("/org.openoffice.Office.Common/Misc", "FirstRun", "false"),
|
|
("/org.openoffice.Setup/Office", "ooSetupInstCompleted", "true"),
|
|
("/org.openoffice.Setup/Office", "MigrationCompleted", "true"),
|
|
("/org.openoffice.Setup/Office", "OfficeRestartInProgress", "false"),
|
|
("/org.openoffice.Setup/L10N", "ooLocale", "en-US"),
|
|
("/org.openoffice.Office.Paths/Variables", "Work", workdir_uri),
|
|
(
|
|
"/org.openoffice.Office.Paths/Paths/org.openoffice.Office.Paths:NamedPath['Work']",
|
|
"WritePath",
|
|
workdir_uri,
|
|
),
|
|
):
|
|
_set_registry_prop(root, path, prop, value)
|
|
registry.parent.mkdir(parents=True, exist_ok=True)
|
|
ET.ElementTree(root).write(registry, encoding="utf-8", xml_declaration=True)
|
|
|
|
|
|
def _read_libreoffice_registry(registry: Path) -> ET.Element:
|
|
if registry.exists():
|
|
try:
|
|
return ET.parse(registry).getroot()
|
|
except ET.ParseError:
|
|
pass
|
|
return ET.Element(
|
|
_oor("items"),
|
|
{
|
|
"xmlns:xs": XS_NS,
|
|
"xmlns:xsi": XSI_NS,
|
|
},
|
|
)
|
|
|
|
|
|
def _set_registry_prop(root: ET.Element, item_path: str, prop_name: str, value: str) -> None:
|
|
item = _find_registry_item(root, item_path)
|
|
if item is None:
|
|
item = ET.SubElement(root, "item", {_oor("path"): item_path})
|
|
prop = next((child for child in item.findall("prop") if child.get(_oor("name")) == prop_name), None)
|
|
if prop is None:
|
|
prop = ET.SubElement(item, "prop", {_oor("name"): prop_name, _oor("op"): "fuse"})
|
|
else:
|
|
prop.set(_oor("op"), "fuse")
|
|
value_node = prop.find("value")
|
|
if value_node is None:
|
|
value_node = ET.SubElement(prop, "value")
|
|
value_node.text = str(value)
|
|
|
|
|
|
def _find_registry_item(root: ET.Element, item_path: str) -> ET.Element | None:
|
|
for item in root.findall("item"):
|
|
if item.get(_oor("path")) == item_path:
|
|
return item
|
|
return None
|
|
|
|
|
|
def _write_desktop_launcher(
|
|
path: Path,
|
|
*,
|
|
name: str,
|
|
exec_line: str,
|
|
icon: str,
|
|
categories: str,
|
|
try_exec: str = "",
|
|
working_dir: str | Path | None = None,
|
|
mime_types: tuple[str, ...] = (),
|
|
no_display: bool = False,
|
|
) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
lines = [
|
|
"[Desktop Entry]",
|
|
"Version=1.0",
|
|
"Type=Application",
|
|
f"Name={name}",
|
|
f"Exec={exec_line}",
|
|
]
|
|
if try_exec:
|
|
lines.append(f"TryExec={try_exec}")
|
|
if working_dir:
|
|
lines.append(f"Path={working_dir}")
|
|
if mime_types:
|
|
lines.append(f"MimeType={';'.join(mime_types)};")
|
|
if no_display:
|
|
lines.append("NoDisplay=true")
|
|
lines.extend(
|
|
[
|
|
f"Icon={icon}",
|
|
"Terminal=false",
|
|
f"Categories={categories}",
|
|
"StartupNotify=true",
|
|
"X-XFCE-Trusted=true",
|
|
"",
|
|
],
|
|
)
|
|
path.write_text("\n".join(lines), encoding="utf-8")
|
|
try:
|
|
path.chmod(0o755)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def _write_hidden_application_entry(path: Path, name: str) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text(
|
|
"\n".join(
|
|
[
|
|
"[Desktop Entry]",
|
|
"Type=Application",
|
|
f"Name={name}",
|
|
"NoDisplay=true",
|
|
"Hidden=true",
|
|
"",
|
|
],
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
|
|
def _write_thunar_defaults(path: Path) -> None:
|
|
root = _read_xfce_channel(path, "thunar")
|
|
if _find_xfce_property(root, "last-view") is None:
|
|
_xfce_property(root, "last-view", "string", "ThunarIconView")
|
|
_xfce_property(root, "last-show-hidden", "bool", "true")
|
|
_write_xfce_channel(path, root)
|
|
|
|
|
|
def _read_xfce_channel(path: Path, channel_name: str) -> ET.Element:
|
|
if path.exists():
|
|
try:
|
|
root = ET.parse(path).getroot()
|
|
if root.tag == "channel" and root.get("name") == channel_name:
|
|
root.set("version", root.get("version") or "1.0")
|
|
return root
|
|
except (ET.ParseError, OSError):
|
|
pass
|
|
return ET.Element("channel", {"name": channel_name, "version": "1.0"})
|
|
|
|
|
|
def _write_xfce_channel(path: Path, root: ET.Element) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
tree = ET.ElementTree(root)
|
|
try:
|
|
ET.indent(tree, space=" ")
|
|
except AttributeError:
|
|
pass
|
|
tree.write(path, encoding="utf-8", xml_declaration=True)
|
|
|
|
|
|
def _find_xfce_property(parent: ET.Element, name: str) -> ET.Element | None:
|
|
return next((child for child in parent.findall("property") if child.get("name") == name), None)
|
|
|
|
|
|
def _install_desktop_readme(desktop_dir: Path) -> None:
|
|
if not DESKTOP_README_SOURCE.exists():
|
|
return
|
|
target = desktop_dir / "README.md"
|
|
try:
|
|
content = DESKTOP_README_SOURCE.read_text(encoding="utf-8")
|
|
if target.exists() and target.read_text(encoding="utf-8") == content:
|
|
return
|
|
target.write_text(content, encoding="utf-8")
|
|
target.chmod(0o644)
|
|
except OSError:
|
|
return
|
|
|
|
|
|
def _xfce_property(parent: ET.Element, name: str, property_type: str, value: str | None = None) -> ET.Element:
|
|
for child in parent.findall("property"):
|
|
if child.get("name") == name:
|
|
child.set("type", property_type)
|
|
if value is None:
|
|
child.attrib.pop("value", None)
|
|
else:
|
|
child.set("value", value)
|
|
return child
|
|
attributes = {"name": name, "type": property_type}
|
|
if value is not None:
|
|
attributes["value"] = value
|
|
return ET.SubElement(parent, "property", attributes)
|
|
|
|
|
|
def _public_doc(doc: dict[str, Any]) -> dict[str, Any]:
|
|
return {
|
|
"file_id": doc["file_id"],
|
|
"path": document_store.display_path(doc["path"]),
|
|
"basename": doc["basename"],
|
|
"extension": doc["extension"],
|
|
"size": doc["size"],
|
|
"version": document_store.item_version(doc),
|
|
"last_modified": doc["last_modified"],
|
|
}
|
|
|
|
|
|
def _require_binary(name: str) -> str:
|
|
found = shutil.which(name)
|
|
if not found:
|
|
raise RuntimeError(f"{name} is required for official LibreOffice desktop sessions.")
|
|
return found
|
|
|
|
|
|
def _running(process: subprocess.Popen[Any] | None) -> bool:
|
|
return bool(process and process.poll() is None)
|
|
|
|
|
|
def _wait_for_port(
|
|
host: str,
|
|
port: int,
|
|
timeout: float = 15.0,
|
|
process: subprocess.Popen[Any] | None = None,
|
|
) -> None:
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
if process and process.poll() is not None:
|
|
raise RuntimeError(f"Xpra exited before port {port} was ready.")
|
|
try:
|
|
with socket.create_connection((host, port), timeout=0.2):
|
|
return
|
|
except OSError:
|
|
time.sleep(0.1)
|
|
raise TimeoutError(f"Timed out waiting for Xpra port {port}.")
|
|
|
|
|
|
def _port_is_free(port: int) -> bool:
|
|
try:
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
|
|
probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
probe.bind(("127.0.0.1", port))
|
|
return True
|
|
except OSError:
|
|
return False
|
|
|
|
|
|
def _port_is_accepting(host: str, port: int) -> bool:
|
|
try:
|
|
with socket.create_connection((host, port), timeout=0.2):
|
|
return True
|
|
except OSError:
|
|
return False
|
|
|
|
|
|
def _terminate_process(process: subprocess.Popen[Any]) -> None:
|
|
if process.poll() is not None:
|
|
return
|
|
try:
|
|
process.terminate()
|
|
process.wait(timeout=2)
|
|
return
|
|
except Exception:
|
|
pass
|
|
try:
|
|
process.kill()
|
|
process.wait(timeout=2)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _kill_pid(pid: int) -> bool:
|
|
if pid <= 0:
|
|
return False
|
|
try:
|
|
os.kill(pid, 15)
|
|
return True
|
|
except ProcessLookupError:
|
|
return False
|
|
except PermissionError:
|
|
return False
|
|
|
|
|
|
def _coerce_pid(value: Any) -> int:
|
|
try:
|
|
pid = int(value)
|
|
except (TypeError, ValueError):
|
|
return 0
|
|
return pid if pid > 0 else 0
|
|
|
|
|
|
def _pid_is_running(pid: int) -> bool:
|
|
if pid <= 0:
|
|
return False
|
|
try:
|
|
os.kill(pid, 0)
|
|
return True
|
|
except ProcessLookupError:
|
|
return False
|
|
except PermissionError:
|
|
return True
|