Move office and desktop state under plugin storage

Migrate retired /usr/_office and /usr/_desktop trees from plugin startup into /usr/plugins/<plugin>. Update office document storage, desktop session/runtime paths, and context-scoped screenshots to use the plugin-owned state layout. Add focused tests for retired-state migration and the new path behavior.
This commit is contained in:
Alessandro 2026-05-12 16:21:43 +02:00
parent 7b61ceb241
commit 68c3b8b022
15 changed files with 393 additions and 62 deletions

View file

@ -0,0 +1,78 @@
from __future__ import annotations
import shutil
from pathlib import Path
def migrate_retired_state_tree(
*,
source: Path,
destination: Path,
owner: str,
migrated: list[str],
warnings: list[str],
errors: list[str],
) -> None:
"""Move retired plugin state into its plugin-owned state directory.
Existing destination data wins. Colliding source entries are preserved under
a suffixed name in the destination instead of overwriting live data.
"""
if not source.exists() and not source.is_symlink():
return
if _same_path(source, destination):
return
try:
if source.is_dir() and not source.is_symlink():
destination.mkdir(parents=True, exist_ok=True)
for child in list(source.iterdir()):
try:
_move_path(child, destination / child.name, migrated)
except Exception as exc:
errors.append(f"{owner} state migration failed for {child}: {exc}")
_remove_empty_dir(source, owner=owner, warnings=warnings)
return
_move_path(source, destination, migrated)
except Exception as exc:
errors.append(f"{owner} state migration failed from {source} to {destination}: {exc}")
def _move_path(source: Path, target: Path, migrated: list[str]) -> None:
if source.is_dir() and not source.is_symlink() and target.is_dir() and not target.is_symlink():
for child in list(source.iterdir()):
_move_path(child, target / child.name, migrated)
source.rmdir()
return
final_target = target
if target.exists() or target.is_symlink():
final_target = _next_conflict_path(target)
final_target.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(source), str(final_target))
migrated.append(f"{source} -> {final_target}")
def _next_conflict_path(path: Path) -> Path:
candidate = path.with_name(f"{path.name}.retired")
counter = 2
while candidate.exists() or candidate.is_symlink():
candidate = path.with_name(f"{path.name}.retired-{counter}")
counter += 1
return candidate
def _remove_empty_dir(path: Path, *, owner: str, warnings: list[str]) -> None:
try:
path.rmdir()
except OSError:
warnings.append(f"Retired {owner} state directory was not empty after migration: {path}")
def _same_path(left: Path, right: Path) -> bool:
try:
return left.resolve(strict=False) == right.resolve(strict=False)
except OSError:
return False

View file

@ -15,7 +15,7 @@ from urllib.parse import quote, urlencode
from helpers import files
STATE_DIR = Path(files.get_abs_path("usr", "_desktop", "virtual_desktop"))
STATE_DIR = Path(files.get_abs_path("usr", "plugins", "_desktop", "virtual_desktop"))
DEFAULT_WIDTH = 1440
DEFAULT_HEIGHT = 900
MAX_WIDTH = 1920

View file

@ -148,6 +148,7 @@ class DesktopSession(ApiHandler):
def _state(self, input: dict) -> dict:
return desktop_session.get_manager().state(
include_screenshot=bool(input.get("include_screenshot") is True),
context_id=str(input.get("ctxid") or input.get("context_id") or ""),
)
def _shutdown(self, input: dict) -> dict:

View file

@ -43,5 +43,7 @@ def _prepare_runtime_safely() -> None:
def _log_runtime_preparation_result(result: dict[str, Any]) -> None:
if result.get("errors"):
PrintStyle.warning("Desktop runtime preparation reported errors:", result["errors"])
elif result.get("installed") or result.get("removed"):
elif result.get("warnings"):
PrintStyle.warning("Desktop runtime preparation reported warnings:", result["warnings"])
elif result.get("installed") or result.get("removed") or result.get("migrated"):
PrintStyle.info("Desktop runtime prepared:", result)

View file

@ -23,13 +23,16 @@ from plugins._office.helpers import document_store, libreoffice
OFFICIAL_EXTENSIONS = {"odt", "ods", "odp", "docx", "xlsx", "pptx"}
PLUGIN_NAME = "_desktop"
SYSTEM_SESSION_ID = "agent-zero-desktop"
SYSTEM_FILE_ID = "system-desktop"
SYSTEM_TITLE = "Desktop"
STATE_DIR = Path(files.get_abs_path("usr", "_desktop"))
STATE_DIR = Path(files.get_abs_path("usr", "plugins", PLUGIN_NAME))
RETIRED_STATE_DIR = Path(files.get_abs_path("usr", PLUGIN_NAME))
SESSION_DIR = STATE_DIR / "sessions"
PROFILE_DIR = STATE_DIR / "profiles"
LEGACY_SESSION_DIRS = (
RETIRED_STATE_DIR / "sessions",
Path(files.get_abs_path("tmp", "_office", "desktop", "sessions")),
)
DISPLAY_BASE = 120
@ -283,10 +286,13 @@ class DesktopSessionManager:
"url_intents": url_intents,
}
def state(self, *, include_screenshot: bool = False) -> dict[str, Any]:
def state(self, *, include_screenshot: bool = False, context_id: str = "") -> dict[str, Any]:
with self._lock:
self._reap_dead_locked()
return desktop_state.collect_state(include_screenshot=include_screenshot)
return desktop_state.collect_state(
include_screenshot=include_screenshot,
context_id=context_id,
)
def claim_url_intents(self, session_id: str = SYSTEM_SESSION_ID) -> list[dict[str, Any]]:
session = self.get(session_id) or self.get(SYSTEM_SESSION_ID)
@ -558,7 +564,9 @@ class DesktopSessionManager:
xpra_port=xpra_port,
token=SYSTEM_SESSION_ID,
url=_xpra_url(SYSTEM_SESSION_ID),
profile_dir=Path(payload.get("profile_dir") or PROFILE_DIR / SYSTEM_SESSION_ID),
profile_dir=_state_path_from_retired_root(
Path(payload.get("profile_dir") or PROFILE_DIR / SYSTEM_SESSION_ID)
),
width=int(payload.get("width") or DEFAULT_SCREEN_WIDTH),
height=int(payload.get("height") or DEFAULT_SCREEN_HEIGHT),
process_ids=process_ids,
@ -1614,6 +1622,16 @@ def collect_desktop_status() -> dict[str, Any]:
}
def _state_path_from_retired_root(path: Path) -> Path:
try:
relative = path.resolve(strict=False).relative_to(
RETIRED_STATE_DIR.resolve(strict=False)
)
except ValueError:
return path
return STATE_DIR / relative
def _runtime_preparation_status() -> dict[str, Any]:
try:
from plugins._desktop import hooks

View file

@ -14,23 +14,40 @@ PROJECT_ROOT = Path(__file__).resolve().parents[3]
SESSION_ID = "agent-zero-desktop"
PLUGIN_NAME = "_desktop"
BASE_DIR = Path(os.environ.get("A0_BASE_DIR") or ("/a0" if Path("/a0").exists() else PROJECT_ROOT))
STATE_DIR = BASE_DIR / "usr" / "_desktop"
STATE_DIR = BASE_DIR / "usr" / "plugins" / PLUGIN_NAME
RETIRED_STATE_DIR = BASE_DIR / "usr" / PLUGIN_NAME
SESSION_DIR = STATE_DIR / "sessions"
PROFILE_DIR = STATE_DIR / "profiles"
SCREENSHOT_DIR = STATE_DIR / "screenshots"
RECENT_SCREENSHOT_SECONDS = 600
_SAFE_CONTEXT_RE = re.compile(r"[^a-zA-Z0-9_.-]+")
def session_manifest_path(session_id: str = SESSION_ID) -> Path:
return Path(os.environ.get("A0_DESKTOP_MANIFEST") or SESSION_DIR / f"{session_id}.json")
def context_screenshot_dir(context_id: str = "") -> Path:
return SCREENSHOT_DIR / _safe_context_id(context_id)
def _safe_context_id(context_id: str = "") -> str:
raw = str(context_id or os.environ.get("A0_DESKTOP_CONTEXT_ID") or "default")
return _SAFE_CONTEXT_RE.sub("_", raw).strip("._") or "default"
def session_manifest_exists(session_id: str = SESSION_ID) -> bool:
return session_manifest_path(session_id).exists()
def collect_state(*, include_screenshot: bool = False, screenshot_path: str | Path | None = None) -> dict[str, Any]:
def collect_state(
*,
include_screenshot: bool = False,
screenshot_path: str | Path | None = None,
context_id: str = "",
) -> dict[str, Any]:
errors: list[str] = []
env_info = resolve_environment(errors=errors)
display = env_info["display"]
@ -46,10 +63,16 @@ def collect_state(*, include_screenshot: bool = False, screenshot_path: str | Pa
pointer = collect_pointer(env, capabilities, errors)
active_window = collect_active_window(env, capabilities, errors)
windows = collect_windows(env, capabilities, errors)
screenshot = latest_screenshot()
screenshot = latest_screenshot(context_id=context_id)
if include_screenshot:
screenshot = capture_screenshot(env, capabilities, path=screenshot_path, errors=errors)
screenshot = capture_screenshot(
env,
capabilities,
path=screenshot_path,
errors=errors,
context_id=context_id,
)
return stable_state(
display=display,
@ -70,6 +93,7 @@ def capture_screenshot(
*,
path: str | Path | None = None,
errors: list[str] | None = None,
context_id: str = "",
) -> dict[str, Any]:
local_errors = errors if errors is not None else []
capabilities = capabilities or collect_capabilities()
@ -85,9 +109,10 @@ def capture_screenshot(
local_errors.append(message)
return {"ok": False, "path": "", "format": "", "captured_at": "", "error": message}
SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True)
screenshot_dir = context_screenshot_dir(context_id)
screenshot_dir.mkdir(parents=True, exist_ok=True)
timestamp = time.strftime("%Y%m%d-%H%M%S")
target = Path(path) if path else SCREENSHOT_DIR / f"desktop-{timestamp}.png"
target = Path(path) if path else screenshot_dir / f"desktop-{timestamp}.png"
target.parent.mkdir(parents=True, exist_ok=True)
raw_path = target.with_suffix(".xwd")
@ -291,21 +316,33 @@ def resolve_environment(*, errors: list[str] | None = None, session_id: str = SE
display = ""
local_errors.append("Desktop DISPLAY is unavailable; the persistent Desktop session is not running.")
profile_dir = str(
os.environ.get("A0_DESKTOP_PROFILE")
or os.environ.get("A0_DESKTOP_HOME")
or payload.get("profile_dir")
or os.environ.get("HOME")
or PROFILE_DIR / session_id
profile_dir = _state_path_from_retired_root(
Path(
os.environ.get("A0_DESKTOP_PROFILE")
or os.environ.get("A0_DESKTOP_HOME")
or payload.get("profile_dir")
or os.environ.get("HOME")
or PROFILE_DIR / session_id
)
)
return {
"display": display,
"profile_dir": profile_dir,
"profile_dir": str(profile_dir),
"manifest": str(manifest),
}
def _state_path_from_retired_root(path: Path) -> Path:
try:
relative = path.resolve(strict=False).relative_to(
RETIRED_STATE_DIR.resolve(strict=False)
)
except ValueError:
return path
return STATE_DIR / relative
def display_env(*, display: str, profile_dir: str) -> dict[str, str]:
env = {
**os.environ,
@ -480,12 +517,13 @@ def parse_xprop(output: str) -> dict[str, str]:
return values
def latest_screenshot() -> dict[str, Any]:
if not SCREENSHOT_DIR.exists():
def latest_screenshot(*, context_id: str = "") -> dict[str, Any]:
screenshot_dir = context_screenshot_dir(context_id)
if not screenshot_dir.exists():
return {"ok": False, "path": "", "format": "", "captured_at": "", "recent": False}
candidates = [
path
for path in SCREENSHOT_DIR.iterdir()
for path in screenshot_dir.iterdir()
if path.is_file() and path.suffix.lower() in {".png", ".jpg", ".jpeg", ".xwd"}
]
if not candidates:
@ -640,19 +678,25 @@ def main(argv: list[str] | None = None) -> int:
state_parser = subparsers.add_parser("state")
state_parser.add_argument("--json", action="store_true")
state_parser.add_argument("--screenshot", action="store_true")
state_parser.add_argument("--context-id", default="")
observe_parser = subparsers.add_parser("observe")
observe_parser.add_argument("--json", action="store_true")
observe_parser.add_argument("--screenshot", action="store_true")
observe_parser.add_argument("--context-id", default="")
screenshot_parser = subparsers.add_parser("screenshot")
screenshot_parser.add_argument("path", nargs="?")
screenshot_parser.add_argument("--json", action="store_true")
screenshot_parser.add_argument("--context-id", default="")
args = parser.parse_args(argv)
command = args.command or "state"
if command in {"state", "observe"}:
payload = collect_state(include_screenshot=bool(args.screenshot))
payload = collect_state(
include_screenshot=bool(args.screenshot),
context_id=str(args.context_id or ""),
)
print(json.dumps(payload, sort_keys=True))
return 0 if payload.get("ok") else 1
@ -664,6 +708,7 @@ def main(argv: list[str] | None = None) -> int:
collect_capabilities(),
path=args.path,
errors=errors,
context_id=str(args.context_id or ""),
)
if args.json:
print(json.dumps(payload, sort_keys=True))

View file

@ -9,7 +9,7 @@ import urllib.request
from pathlib import Path
from typing import Any
from helpers import system_packages
from helpers import files, state_migration, system_packages
LIBREOFFICE_RUNTIME_PACKAGES = (
"libreoffice-core",
@ -61,6 +61,9 @@ OPTIONAL_RUNTIME_PACKAGES = (
RETIRED_RUNTIME_PACKAGES = (
"firefox-esr",
)
PLUGIN_NAME = "_desktop"
STATE_DIR = Path(files.get_abs_path("usr", "plugins", PLUGIN_NAME))
RETIRED_STATE_DIR = Path(files.get_abs_path("usr", PLUGIN_NAME))
_preparation_lock = threading.RLock()
_preparation_state: dict[str, Any] = {
"preparing": False,
@ -81,8 +84,13 @@ def cleanup_stale_runtime_state(force: bool = False) -> dict[str, Any]:
try:
installed: list[str] = []
removed: list[str] = []
migrated: list[str] = []
warnings: list[str] = []
errors: list[str] = []
_migrate_retired_plugin_state(migrated, warnings, errors)
_migrate_unscoped_screenshots(migrated, warnings, errors)
retired_packages = _installed_packages(RETIRED_RUNTIME_PACKAGES)
if retired_packages:
_purge_packages(removed, errors, installed_packages=retired_packages)
@ -95,6 +103,8 @@ def cleanup_stale_runtime_state(force: bool = False) -> dict[str, Any]:
"skipped": False,
"removed": removed,
"installed": installed,
"migrated": migrated,
"warnings": warnings,
"errors": errors,
}
return result
@ -117,6 +127,50 @@ def runtime_preparation_status() -> dict[str, Any]:
}
def _migrate_retired_plugin_state(
migrated: list[str],
warnings: list[str],
errors: list[str],
) -> None:
state_migration.migrate_retired_state_tree(
source=RETIRED_STATE_DIR,
destination=STATE_DIR,
owner="Desktop",
migrated=migrated,
warnings=warnings,
errors=errors,
)
def _migrate_unscoped_screenshots(
migrated: list[str],
warnings: list[str],
errors: list[str],
) -> None:
screenshots_dir = STATE_DIR / "screenshots"
if not screenshots_dir.exists():
return
legacy_screenshots = [
path
for path in screenshots_dir.iterdir()
if path.is_file() and path.suffix.lower() in {".png", ".jpg", ".jpeg", ".xwd"}
]
if not legacy_screenshots:
return
context_dir = screenshots_dir / "default"
context_dir.mkdir(parents=True, exist_ok=True)
for screenshot in legacy_screenshots:
state_migration.migrate_retired_state_tree(
source=screenshot,
destination=context_dir / screenshot.name,
owner="Desktop screenshot",
migrated=migrated,
warnings=warnings,
errors=errors,
)
def _begin_runtime_preparation() -> None:
with _preparation_lock:
if not _preparation_state["active_count"]:

View file

@ -3,8 +3,8 @@ set -euo pipefail
SESSION="${A0_DESKTOP_SESSION:-agent-zero-desktop}"
BASE_DIR="${A0_BASE_DIR:-/a0}"
PROFILE_DIR="${A0_DESKTOP_PROFILE:-$BASE_DIR/usr/_desktop/profiles/$SESSION}"
MANIFEST="${A0_DESKTOP_MANIFEST:-$BASE_DIR/usr/_desktop/sessions/$SESSION.json}"
PROFILE_DIR="${A0_DESKTOP_PROFILE:-$BASE_DIR/usr/plugins/_desktop/profiles/$SESSION}"
MANIFEST="${A0_DESKTOP_MANIFEST:-$BASE_DIR/usr/plugins/_desktop/sessions/$SESSION.json}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
DESKTOP_STATE_HELPER="$SCRIPT_DIR/../../../helpers/desktop_state.py"
DESKTOP_STATE_PYTHON="${A0_DESKTOP_STATE_PYTHON:-$(command -v /usr/bin/python3 || command -v python3 || true)}"
@ -55,10 +55,12 @@ Usage: desktopctl.sh <command> [args]
Commands:
env Print the X11 environment used for the Desktop.
check Verify that xdotool can reach the Desktop display.
state --json Return structured Desktop state as JSON.
observe --json [--screenshot]
state --json [--screenshot] [--context-id ID]
Return structured Desktop state as JSON.
observe --json [--screenshot] [--context-id ID]
Return structured state, optionally with a fresh screenshot.
screenshot [PATH] Capture the Desktop to PATH, or to the default screenshot directory.
screenshot [PATH] [--context-id ID]
Capture the Desktop to PATH, or to the default screenshot directory.
active-window Print the active window name.
geometry PATTERN Print the first matching visible window geometry.
wait-window PATTERN Wait for a visible matching window and print its id.
@ -295,7 +297,8 @@ case "$command_name" in
echo "state currently requires --json." >&2
exit 2
fi
desktop_state state --json
shift
desktop_state state --json "$@"
;;
observe)
if [ "${1:-}" != "--json" ]; then
@ -310,7 +313,7 @@ case "$command_name" in
shift
desktop_state screenshot --json "$@"
elif [ "$#" -gt 0 ]; then
desktop_state screenshot "$1"
desktop_state screenshot "$@"
else
desktop_state screenshot
fi

View file

@ -221,7 +221,10 @@ class OfficeSession(ApiHandler):
def _desktop_state(self, input: dict) -> dict:
include_screenshot = bool(input.get("include_screenshot") is True)
return desktop_session.get_manager().state(include_screenshot=include_screenshot)
return desktop_session.get_manager().state(
include_screenshot=include_screenshot,
context_id=str(input.get("ctxid") or input.get("context_id") or ""),
)
def _desktop_shutdown(self, input: dict) -> dict:
save_first = input.get("save_first") is not False

View file

@ -41,5 +41,7 @@ def _prepare_runtime_safely() -> None:
def _log_runtime_preparation_result(result: dict[str, Any]) -> None:
if result.get("errors"):
PrintStyle.warning("Office document runtime preparation reported errors:", result["errors"])
elif result.get("installed") or result.get("removed"):
elif result.get("warnings"):
PrintStyle.warning("Office document runtime preparation reported warnings:", result["warnings"])
elif result.get("installed") or result.get("removed") or result.get("migrated"):
PrintStyle.info("Office document runtime prepared:", result)

View file

@ -40,7 +40,7 @@ ODF_MIMETYPES = {
"odp": "application/vnd.oasis.opendocument.presentation",
}
STATE_DIR = Path(files.get_abs_path("usr", PLUGIN_NAME, "documents"))
STATE_DIR = Path(files.get_abs_path("usr", "plugins", PLUGIN_NAME, "documents"))
DB_PATH = STATE_DIR / "documents.sqlite3"
BACKUP_DIR = STATE_DIR / "backups"
WORKDIR = Path(files.get_abs_path("usr", "workdir"))

View file

@ -7,14 +7,16 @@ import subprocess
from pathlib import Path
from typing import Any
from helpers import files, system_packages
from helpers import files, state_migration, system_packages
PROJECT_ROOT = Path(__file__).resolve().parents[2]
STATE_DIR = Path(files.get_abs_path("usr", "_office"))
PLUGIN_NAME = "_office"
STATE_DIR = Path(files.get_abs_path("usr", "plugins", PLUGIN_NAME))
RETIRED_STATE_DIR = Path(files.get_abs_path("usr", PLUGIN_NAME))
DOCUMENT_STATE_DIR = STATE_DIR / "documents"
LEGACY_DOCUMENT_STATE_DIRS = [
Path(files.get_abs_path("usr", "plugins", "_office", "documents")),
RETIRED_STATE_DIR / "documents",
Path(files.get_abs_path("usr", "state", "_office", "documents")),
Path(files.get_abs_path("usr", "state", "office", "documents")),
]
@ -84,6 +86,7 @@ def cleanup_stale_runtime_state(force: bool = False) -> dict[str, Any]:
warnings: list[str] = []
errors: list[str] = []
_migrate_retired_plugin_state(migrated, warnings, errors)
_migrate_legacy_document_state(migrated, warnings, errors)
retired_web_paths = [
@ -123,7 +126,7 @@ def cleanup_stale_runtime_state(force: bool = False) -> dict[str, Any]:
_retire_supervisor_program(errors)
_ensure_runtime_dependencies(installed, errors)
_ensure_desktop_runtime_compat(installed, removed, warnings, errors)
_ensure_desktop_runtime_compat(installed, removed, migrated, warnings, errors)
return {
"ok": not errors,
"skipped": not cleanup_needed,
@ -209,9 +212,25 @@ def _migrate_legacy_document_state(
)
def _migrate_retired_plugin_state(
migrated: list[str],
warnings: list[str],
errors: list[str],
) -> None:
state_migration.migrate_retired_state_tree(
source=RETIRED_STATE_DIR,
destination=STATE_DIR,
owner="Office",
migrated=migrated,
warnings=warnings,
errors=errors,
)
def _ensure_desktop_runtime_compat(
installed: list[str],
removed: list[str],
migrated: list[str],
warnings: list[str],
errors: list[str],
) -> None:
@ -239,6 +258,7 @@ def _ensure_desktop_runtime_compat(
return
installed.extend(str(item) for item in result.get("installed") or [])
removed.extend(str(item) for item in result.get("removed") or [])
migrated.extend(str(item) for item in result.get("migrated") or [])
warnings.extend(str(item) for item in result.get("warnings") or [])
errors.extend(str(item) for item in result.get("errors") or [])

View file

@ -184,8 +184,8 @@ def test_desktop_plugin_owns_routes_runtime_surface_and_state_paths():
assert 'data-surface-id="desktop"' in desktop_main
assert "virtual_desktop.session_url" in desktop_session
assert 'owner="desktop"' in desktop_session
assert 'STATE_DIR = Path(files.get_abs_path("usr", "_desktop"))' in desktop_session
assert 'STATE_DIR = BASE_DIR / "usr" / "_desktop"' in desktop_state
assert 'STATE_DIR = Path(files.get_abs_path("usr", "plugins", PLUGIN_NAME))' in desktop_session
assert 'STATE_DIR = BASE_DIR / "usr" / "plugins" / PLUGIN_NAME' in desktop_state
assert "> x-component > div[x-data] > .office-panel" in desktop_web_panel
assert ".office-state-line > span:not(.material-symbols-outlined)" in desktop_web_panel
@ -201,7 +201,7 @@ def test_plugin_owned_runtime_state_paths_are_declared():
docker_playwright = read("docker", "run", "fs", "ins", "install_playwright.sh")
assert 'PLUGIN_NAME = "_office"' in office_documents
assert 'STATE_DIR = Path(files.get_abs_path("usr", PLUGIN_NAME, "documents"))' in office_documents
assert 'STATE_DIR = Path(files.get_abs_path("usr", "plugins", PLUGIN_NAME, "documents"))' in office_documents
assert 'PLAYWRIGHT_CACHE_DIR = ("tmp", "playwright")' in browser_playwright
assert '"usr", "plugins", "_browser", "playwright"' in browser_playwright
assert "Path(files.get_abs_path(*PLAYWRIGHT_CACHE_DIR))" in browser_playwright
@ -335,8 +335,8 @@ def test_office_and_desktop_skills_are_rehomed_and_renamed():
desktopctl = (desktop_skills / "linux-desktop" / "scripts" / "desktopctl.sh").read_text(encoding="utf-8")
assert "/a0/plugins/_desktop/skills/linux-desktop/scripts/desktopctl.sh" in desktop_skill
assert "Open in Desktop action" in desktop_skill
assert "$BASE_DIR/usr/_desktop/profiles/$SESSION" in desktopctl
assert "$BASE_DIR/usr/_desktop/sessions/$SESSION.json" in desktopctl
assert "$BASE_DIR/usr/plugins/_desktop/profiles/$SESSION" in desktopctl
assert "$BASE_DIR/usr/plugins/_desktop/sessions/$SESSION.json" in desktopctl
def test_skill_catalog_and_connector_boundaries_are_static_guarded():

View file

@ -189,6 +189,53 @@ def test_desktop_state_screenshot_capture_uses_xwd_and_pillow_when_available(tmp
assert not (tmp_path / "shot.xwd").exists()
def test_desktop_state_default_screenshot_path_is_context_scoped(tmp_path, monkeypatch):
monkeypatch.setattr(desktop_state, "SCREENSHOT_DIR", tmp_path)
capabilities = {"xwd": "/usr/bin/xwd"}
env = {"DISPLAY": ":120"}
def fake_run(command, *, env, timeout):
raw_path = Path(command[command.index("-out") + 1])
raw_path.write_bytes(b"xwd")
return _completed(command)
image_module = types.ModuleType("PIL.Image")
class FakeImage:
width = 320
height = 240
def __enter__(self):
return self
def __exit__(self, *_args):
return False
def save(self, target):
Path(target).write_bytes(b"png")
image_module.open = lambda _path: FakeImage()
pil_module = types.ModuleType("PIL")
pil_module.Image = image_module
monkeypatch.setattr(desktop_state, "run", fake_run)
monkeypatch.setitem(sys.modules, "PIL", pil_module)
monkeypatch.setitem(sys.modules, "PIL.Image", image_module)
screenshot = desktop_state.capture_screenshot(
env,
capabilities,
errors=[],
context_id="ctx/id",
)
path = Path(screenshot["path"])
assert screenshot["ok"] is True
assert path.parent == tmp_path / "ctx_id"
assert path.name.startswith("desktop-")
assert desktop_state.latest_screenshot(context_id="ctx/id")["path"] == str(path)
def test_xwd_fallback_parser_handles_truecolor_pixels(tmp_path, monkeypatch):
raw_path = tmp_path / "shot.xwd"
target = tmp_path / "shot.png"

View file

@ -607,12 +607,12 @@ def test_office_session_desktop_state_action_defaults_without_screenshot(monkeyp
calls = []
class FakeManager:
def state(self, *, include_screenshot=False):
calls.append(include_screenshot)
def state(self, *, include_screenshot=False, context_id=""):
calls.append((include_screenshot, context_id))
return {
"ok": True,
"display": ":120",
"profile_dir": "/a0/usr/_desktop/profiles/agent-zero-desktop",
"profile_dir": "/a0/usr/plugins/_desktop/profiles/agent-zero-desktop",
"size": {"width": 1440, "height": 900},
"pointer": {"x": 0, "y": 0, "screen": 0, "window": 0},
"active_window": None,
@ -633,7 +633,7 @@ def test_office_session_desktop_state_action_defaults_without_screenshot(monkeyp
assert default_result["ok"] is True
assert screenshot_result["ok"] is True
assert calls == [False, True]
assert calls == [(False, ""), (True, "")]
monkeypatch.delitem(sys.modules, "plugins._office.api.office_session", raising=False)
api_package = sys.modules.get("plugins._office.api")
if api_package is not None:
@ -1135,11 +1135,17 @@ def test_desktop_session_removes_stale_lock_file(tmp_path):
def _isolate_office_cleanup_hook(monkeypatch, tmp_path):
state_dir = tmp_path / "usr" / "plugins" / "_office"
retired_state_dir = tmp_path / "usr" / "_office"
monkeypatch.setattr(hooks, "RETIRED_WEB_APT_SOURCE_FILE", tmp_path / "missing.sources")
monkeypatch.setattr(hooks, "RETIRED_WEB_APT_KEYRING_FILE", tmp_path / "missing.gpg")
monkeypatch.setattr(hooks, "RETIRED_WEB_SUPERVISOR_FILE", tmp_path / "missing.conf")
monkeypatch.setattr(hooks, "RETIRED_WEB_RUNTIME_DIRS", [])
monkeypatch.setattr(hooks, "CLEANUP_MARKER", tmp_path / "state" / "cleanup.done")
monkeypatch.setattr(hooks, "STATE_DIR", state_dir)
monkeypatch.setattr(hooks, "RETIRED_STATE_DIR", retired_state_dir)
monkeypatch.setattr(hooks, "DOCUMENT_STATE_DIR", state_dir / "documents")
monkeypatch.setattr(hooks, "LEGACY_DOCUMENT_STATE_DIRS", [])
monkeypatch.setattr(hooks, "CLEANUP_MARKER", state_dir / "cleanup.done")
monkeypatch.setattr(hooks, "_installed_retired_web_packages", lambda: [])
monkeypatch.setattr(hooks, "_installed_packages", lambda packages: [])
monkeypatch.setattr(hooks, "_kill_old_processes", lambda errors: None)
@ -1148,10 +1154,31 @@ def _isolate_office_cleanup_hook(monkeypatch, tmp_path):
monkeypatch.setattr(hooks.shutil, "which", lambda name: "")
def test_cleanup_hook_moves_retired_office_state_to_plugin_state(tmp_path, monkeypatch):
_isolate_office_cleanup_hook(monkeypatch, tmp_path)
retired_state = tmp_path / "usr" / "_office"
plugin_state = tmp_path / "usr" / "plugins" / "_office"
(retired_state / "documents" / "backups").mkdir(parents=True)
(retired_state / "documents" / "documents.sqlite3").write_text("db\n", encoding="utf-8")
(retired_state / "documents" / "backups" / "draft.md").write_text("backup\n", encoding="utf-8")
(retired_state / "stale-cleanup-v3.done").write_text("ok\n", encoding="utf-8")
plugin_state.mkdir(parents=True)
monkeypatch.setattr(hooks, "_ensure_desktop_runtime_compat", lambda installed, removed, migrated, warnings, errors: None)
result = hooks.cleanup_stale_runtime_state(force=True)
assert result["ok"] is True
assert (plugin_state / "documents" / "documents.sqlite3").read_text(encoding="utf-8") == "db\n"
assert (plugin_state / "documents" / "backups" / "draft.md").read_text(encoding="utf-8") == "backup\n"
assert (plugin_state / "stale-cleanup-v3.done").read_text(encoding="utf-8") == "ok\n"
assert not retired_state.exists()
def test_cleanup_hook_migrates_legacy_document_state_without_removing_source(tmp_path, monkeypatch):
_isolate_office_cleanup_hook(monkeypatch, tmp_path)
legacy_documents = tmp_path / "usr" / "plugins" / "_office" / "documents"
document_state = tmp_path / "usr" / "_office" / "documents"
legacy_documents = tmp_path / "usr" / "state" / "_office" / "documents"
document_state = tmp_path / "usr" / "plugins" / "_office" / "documents"
legacy_documents.mkdir(parents=True)
(legacy_documents / "documents.sqlite3").write_text("legacy-db\n", encoding="utf-8")
(legacy_documents / "backups").mkdir()
@ -1159,7 +1186,7 @@ def test_cleanup_hook_migrates_legacy_document_state_without_removing_source(tmp
monkeypatch.setattr(hooks, "DOCUMENT_STATE_DIR", document_state)
monkeypatch.setattr(hooks, "LEGACY_DOCUMENT_STATE_DIRS", [legacy_documents])
monkeypatch.setattr(hooks, "_ensure_desktop_runtime_compat", lambda installed, removed, warnings, errors: None)
monkeypatch.setattr(hooks, "_ensure_desktop_runtime_compat", lambda installed, removed, migrated, warnings, errors: None)
result = hooks.cleanup_stale_runtime_state(force=True)
@ -1173,7 +1200,7 @@ def test_cleanup_hook_migrates_legacy_document_state_without_removing_source(tmp
def test_cleanup_hook_prefers_existing_new_document_state_without_merge(tmp_path, monkeypatch):
_isolate_office_cleanup_hook(monkeypatch, tmp_path)
legacy_documents = tmp_path / "legacy-documents"
document_state = tmp_path / "usr" / "_office" / "documents"
document_state = tmp_path / "usr" / "plugins" / "_office" / "documents"
legacy_documents.mkdir(parents=True)
document_state.mkdir(parents=True)
(legacy_documents / "documents.sqlite3").write_text("legacy-db\n", encoding="utf-8")
@ -1181,7 +1208,7 @@ def test_cleanup_hook_prefers_existing_new_document_state_without_merge(tmp_path
monkeypatch.setattr(hooks, "DOCUMENT_STATE_DIR", document_state)
monkeypatch.setattr(hooks, "LEGACY_DOCUMENT_STATE_DIRS", [legacy_documents])
monkeypatch.setattr(hooks, "_ensure_desktop_runtime_compat", lambda installed, removed, warnings, errors: None)
monkeypatch.setattr(hooks, "_ensure_desktop_runtime_compat", lambda installed, removed, migrated, warnings, errors: None)
result = hooks.cleanup_stale_runtime_state(force=True)
@ -1201,19 +1228,22 @@ def test_office_hook_desktop_compat_forwards_runtime_result(monkeypatch):
lambda: {
"installed": ["xpra-server"],
"removed": ["firefox-esr"],
"migrated": ["desktop state"],
"warnings": ["desktop warning"],
"errors": ["desktop error"],
},
)
installed = []
removed = []
migrated = []
warnings = []
errors = []
hooks._ensure_desktop_runtime_compat(installed, removed, warnings, errors)
hooks._ensure_desktop_runtime_compat(installed, removed, migrated, warnings, errors)
assert installed == ["xpra-server"]
assert removed == ["firefox-esr"]
assert migrated == ["desktop state"]
assert warnings == ["desktop warning"]
assert errors == ["desktop error"]
@ -1266,14 +1296,15 @@ def test_installed_retired_web_packages_discovers_collabora_split_packages(monke
def test_cleanup_hook_delegates_desktop_runtime_for_legacy_self_update(tmp_path, monkeypatch):
_isolate_office_cleanup_hook(monkeypatch, tmp_path)
monkeypatch.setattr(hooks, "DOCUMENT_STATE_DIR", tmp_path / "usr" / "_office" / "documents")
monkeypatch.setattr(hooks, "DOCUMENT_STATE_DIR", tmp_path / "usr" / "plugins" / "_office" / "documents")
monkeypatch.setattr(hooks, "LEGACY_DOCUMENT_STATE_DIRS", [])
calls = []
def fake_desktop_compat(installed, removed, warnings, errors):
def fake_desktop_compat(installed, removed, migrated, warnings, errors):
calls.append("desktop")
installed.append("xpra-server")
removed.append("firefox-esr")
migrated.append("desktop state migrated")
warnings.append("desktop runtime prepared through office compatibility hook")
monkeypatch.setattr(hooks, "_ensure_desktop_runtime_compat", fake_desktop_compat)
@ -1283,6 +1314,7 @@ def test_cleanup_hook_delegates_desktop_runtime_for_legacy_self_update(tmp_path,
assert calls == ["desktop"]
assert result["installed"] == ["xpra-server"]
assert result["removed"] == ["firefox-esr"]
assert result["migrated"] == ["desktop state migrated"]
assert result["warnings"] == ["desktop runtime prepared through office compatibility hook"]
@ -1310,12 +1342,12 @@ def test_cleanup_hook_removes_stale_runtime_state_idempotently(tmp_path, monkeyp
monkeypatch.setattr(hooks, "RETIRED_WEB_SUPERVISOR_FILE", supervisor)
monkeypatch.setattr(hooks, "RETIRED_WEB_RUNTIME_DIRS", [runtime_dir, legacy_cool_dir, legacy_collabora_dir])
monkeypatch.setattr(hooks, "CLEANUP_MARKER", marker)
monkeypatch.setattr(hooks, "DOCUMENT_STATE_DIR", tmp_path / "usr" / "_office" / "documents")
monkeypatch.setattr(hooks, "DOCUMENT_STATE_DIR", tmp_path / "usr" / "plugins" / "_office" / "documents")
monkeypatch.setattr(hooks, "LEGACY_DOCUMENT_STATE_DIRS", [])
monkeypatch.setattr(hooks, "_installed_retired_web_packages", lambda: [])
monkeypatch.setattr(hooks, "_installed_packages", lambda packages: [])
monkeypatch.setattr(hooks, "_kill_old_processes", lambda errors: None)
monkeypatch.setattr(hooks, "_ensure_desktop_runtime_compat", lambda installed, removed, warnings, errors: None)
monkeypatch.setattr(hooks, "_ensure_desktop_runtime_compat", lambda installed, removed, migrated, warnings, errors: None)
def fake_ensure(installed, errors):
assert not source.exists()
@ -1397,7 +1429,7 @@ def test_cleanup_hook_reruns_when_stale_packages_exist_after_old_marker(tmp_path
monkeypatch.setattr(hooks, "RETIRED_WEB_SUPERVISOR_FILE", tmp_path / "missing.conf")
monkeypatch.setattr(hooks, "RETIRED_WEB_RUNTIME_DIRS", [])
monkeypatch.setattr(hooks, "CLEANUP_MARKER", marker)
monkeypatch.setattr(hooks, "DOCUMENT_STATE_DIR", tmp_path / "usr" / "_office" / "documents")
monkeypatch.setattr(hooks, "DOCUMENT_STATE_DIR", tmp_path / "usr" / "plugins" / "_office" / "documents")
monkeypatch.setattr(hooks, "LEGACY_DOCUMENT_STATE_DIRS", [])
retired_web_packages = [
"coolwsd",
@ -1409,7 +1441,7 @@ def test_cleanup_hook_reruns_when_stale_packages_exist_after_old_marker(tmp_path
monkeypatch.setattr(hooks, "_installed_packages", lambda packages: [])
monkeypatch.setattr(hooks, "_ensure_runtime_dependencies", lambda installed, errors: None)
monkeypatch.setattr(hooks, "_kill_old_processes", lambda errors: None)
monkeypatch.setattr(hooks, "_ensure_desktop_runtime_compat", lambda installed, removed, warnings, errors: None)
monkeypatch.setattr(hooks, "_ensure_desktop_runtime_compat", lambda installed, removed, migrated, warnings, errors: None)
def fake_purge(removed, errors, **kwargs):
removed.extend(kwargs["installed_packages"])
@ -1433,12 +1465,12 @@ def test_cleanup_hook_removes_retired_supervisor_program_after_marker(tmp_path,
monkeypatch.setattr(hooks, "RETIRED_WEB_SUPERVISOR_FILE", tmp_path / "missing.conf")
monkeypatch.setattr(hooks, "RETIRED_WEB_RUNTIME_DIRS", [])
monkeypatch.setattr(hooks, "CLEANUP_MARKER", marker)
monkeypatch.setattr(hooks, "DOCUMENT_STATE_DIR", tmp_path / "usr" / "_office" / "documents")
monkeypatch.setattr(hooks, "DOCUMENT_STATE_DIR", tmp_path / "usr" / "plugins" / "_office" / "documents")
monkeypatch.setattr(hooks, "LEGACY_DOCUMENT_STATE_DIRS", [])
monkeypatch.setattr(hooks, "_installed_retired_web_packages", lambda: [])
monkeypatch.setattr(hooks, "_installed_packages", lambda packages: [])
monkeypatch.setattr(hooks, "_ensure_runtime_dependencies", lambda installed, errors: None)
monkeypatch.setattr(hooks, "_ensure_desktop_runtime_compat", lambda installed, removed, warnings, errors: None)
monkeypatch.setattr(hooks, "_ensure_desktop_runtime_compat", lambda installed, removed, migrated, warnings, errors: None)
monkeypatch.setattr(hooks.shutil, "which", lambda name: "/usr/bin/supervisorctl" if name == "supervisorctl" else "")
def fake_supervisorctl(*args):
@ -1515,6 +1547,32 @@ def test_desktop_runtime_packages_include_libreoffice_for_desktop_status():
assert "libreoffice-impress" in desktop_hooks.RUNTIME_PACKAGES
def test_desktop_cleanup_moves_retired_state_to_plugin_state(tmp_path, monkeypatch):
retired_state = tmp_path / "usr" / "_desktop"
plugin_state = tmp_path / "usr" / "plugins" / "_desktop"
(retired_state / "profiles" / "agent-zero-desktop").mkdir(parents=True)
(retired_state / "profiles" / "agent-zero-desktop" / "profile.txt").write_text("profile\n", encoding="utf-8")
(retired_state / "sessions").mkdir()
(retired_state / "sessions" / "agent-zero-desktop.json").write_text("{}\n", encoding="utf-8")
(retired_state / "screenshots").mkdir()
(retired_state / "screenshots" / "desktop.png").write_bytes(b"png")
plugin_state.mkdir(parents=True)
monkeypatch.setattr(desktop_hooks, "STATE_DIR", plugin_state)
monkeypatch.setattr(desktop_hooks, "RETIRED_STATE_DIR", retired_state)
monkeypatch.setattr(desktop_hooks, "_installed_packages", lambda packages: [])
monkeypatch.setattr(desktop_hooks, "_ensure_runtime_dependencies", lambda installed, errors: None)
monkeypatch.setattr(desktop_hooks, "_cleanup_desktop_sessions", lambda errors: None)
result = desktop_hooks.cleanup_stale_runtime_state(force=True)
assert result["ok"] is True
assert (plugin_state / "profiles" / "agent-zero-desktop" / "profile.txt").read_text(encoding="utf-8") == "profile\n"
assert (plugin_state / "sessions" / "agent-zero-desktop.json").read_text(encoding="utf-8") == "{}\n"
assert (plugin_state / "screenshots" / "default" / "desktop.png").read_bytes() == b"png"
assert not retired_state.exists()
def test_cleanup_hook_installs_missing_desktop_session_dependencies(monkeypatch):
calls = []
installed_state = {"xpra": False}