agent-zero/plugins/_editor/api/editor_session.py
Alessandro 89901b64f0
Some checks are pending
Build And Publish Docker Images / plan (push) Waiting to run
Build And Publish Docker Images / build (push) Blocked by required conditions
Polish native Markdown editor experience
Expand the dedicated Editor surface with safe rendered preview mode, ACE-backed source editing, browser-style tabs, toolbar/file actions, preview search, and richer Markdown rendering for code blocks, task lists, images, tables, math, local links, and footnotes.

Keep open Markdown files synchronized with the active context and saved tool edits, including live refresh for document_artifact and text_editor results without routing Markdown through Desktop/Office.

Add inline preview-page editing, clickable preview task-list checkboxes, source editor rehydration after preview-mode refreshes, and regression coverage for the new editor wiring and sync behavior.
2026-05-15 04:47:24 +02:00

159 lines
6.4 KiB
Python

from __future__ import annotations
from helpers.api import ApiHandler, Request
from plugins._editor.helpers import markdown_sessions
from plugins._office.helpers import document_store
class EditorSession(ApiHandler):
async def process(self, input: dict, request: Request) -> dict:
action = str(input.get("action") or "open").lower().strip()
context_id = str(input.get("ctxid") or input.get("context_id") or "").strip()
if action == "status":
return {
"ok": True,
"open_files": markdown_sessions.get_manager().list_open(context_id=context_id),
}
if action == "home":
return {"ok": True, "path": document_store.default_open_path(context_id)}
if action == "list":
return {
"ok": True,
"open_files": markdown_sessions.get_manager().list_open(
context_id=context_id,
limit=int(input.get("limit") or 20),
),
}
if action == "activate":
return markdown_sessions.get_manager().activate(str(input.get("session_id") or ""))
if action == "close":
closed = markdown_sessions.get_manager().close(str(input.get("session_id") or ""))
store_session_id = str(input.get("store_session_id") or "").strip()
file_id = str(input.get("file_id") or "").strip()
closed["store_closed"] = document_store.close_session(
session_id=store_session_id,
file_id="" if store_session_id else file_id,
)
return closed
if action == "create":
fmt = str(input.get("format") or "md").lower().lstrip(".")
if fmt != "md":
return {"ok": False, "error": "Editor can only create Markdown documents."}
try:
doc = document_store.create_document(
kind="document",
title=str(input.get("title") or "Untitled"),
fmt="md",
content=str(input.get("content") or ""),
path=str(input.get("path") or ""),
context_id=context_id,
)
except ValueError as exc:
return {"ok": False, "error": str(exc)}
return await self._open_document(doc, input, request, context_id=context_id)
if action == "open":
file_id = str(input.get("file_id") or "").strip()
try:
doc = (
document_store.get_document(file_id)
if file_id
else document_store.register_document(str(input.get("path") or ""), context_id=context_id)
)
except Exception as exc:
return {"ok": False, "error": str(exc)}
return await self._open_document(doc, input, request, context_id=context_id)
if action == "save":
session_id = str(input.get("session_id") or "").strip()
if not session_id:
return {"ok": False, "error": "session_id is required."}
return markdown_sessions.get_manager().save(session_id, text=input.get("text"))
if action == "renamed":
return self._renamed(input, context_id)
if action == "refresh":
return markdown_sessions.get_manager().refresh_document(str(input.get("file_id") or ""))
return {"ok": False, "error": f"Unsupported editor session action: {action}"}
async def _open_document(
self,
doc: dict,
input: dict,
request: Request,
context_id: str = "",
) -> dict:
if str(doc.get("extension") or "").lower() != "md":
return {
"ok": False,
"error": f".{doc.get('extension', '')} documents use the Desktop surface.",
"requires_desktop": True,
"document": _public_doc(doc),
}
mode = "edit" if str(input.get("mode") or "edit").lower() == "edit" else "view"
store_session = document_store.create_session(
doc["file_id"],
user_id=str(input.get("user_id") or "agent-zero-user"),
permission="write" if mode == "edit" else "read",
origin=self._origin(request),
)
try:
editor = markdown_sessions.get_manager().open(
doc,
sid="",
context_id=context_id,
refresh=input.get("refresh") is True,
)
except ValueError as exc:
document_store.close_session(session_id=store_session["session_id"])
return {"ok": False, "error": str(exc)}
return {
**editor,
"store_session_id": store_session["session_id"],
"session_id": editor["session_id"],
"mode": mode,
}
def _renamed(self, input: dict, context_id: str = "") -> dict:
file_id = str(input.get("file_id") or "").strip()
path = str(input.get("path") or "").strip()
if not file_id:
return {"ok": False, "error": "file_id is required."}
if not path:
return {"ok": False, "error": "path is required."}
try:
updated = document_store.rename_document(
file_id,
path,
content=input.get("text") if "text" in input else None,
context_id=context_id,
)
markdown_sessions.get_manager().renamed(
file_id,
updated,
text=input.get("text") if "text" in input else None,
)
except Exception as exc:
return {"ok": False, "error": str(exc)}
return {
"ok": True,
"document": _public_doc(updated),
"version": document_store.item_version(updated),
"refreshFiles": False,
}
def _origin(self, request: Request) -> str:
origin = request.headers.get("Origin") or request.host_url.rstrip("/")
return origin.rstrip("/")
def _public_doc(doc: dict) -> dict:
return {
"file_id": doc["file_id"],
"path": document_store.display_path(doc["path"]),
"basename": doc["basename"],
"title": doc["basename"],
"extension": doc["extension"],
"size": doc["size"],
"version": document_store.item_version(doc),
"last_modified": doc["last_modified"],
}