agent-zero/plugins/_editor/helpers/markdown_sessions.py
Alessandro 89901b64f0
Some checks are pending
Build And Publish Docker Images / plan (push) Waiting to run
Build And Publish Docker Images / build (push) Blocked by required conditions
Polish native Markdown editor experience
Expand the dedicated Editor surface with safe rendered preview mode, ACE-backed source editing, browser-style tabs, toolbar/file actions, preview search, and richer Markdown rendering for code blocks, task lists, images, tables, math, local links, and footnotes.

Keep open Markdown files synchronized with the active context and saved tool edits, including live refresh for document_artifact and text_editor results without routing Markdown through Desktop/Office.

Add inline preview-page editing, clickable preview task-list checkboxes, source editor rehydration after preview-mode refreshes, and regression coverage for the new editor wiring and sync behavior.
2026-05-15 04:47:24 +02:00

279 lines
11 KiB
Python

from __future__ import annotations
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from plugins._office.helpers import document_store
@dataclass
class MarkdownSession:
    """In-memory record of one Markdown document opened in the Editor.

    The first seven fields are required; the remaining ones default to an
    empty, clean, inactive buffer stamped with the creation time.
    """

    # --- identity -----------------------------------------------------
    session_id: str
    file_id: str
    sid: str
    context_id: str

    # --- document description ----------------------------------------
    extension: str
    path: str
    title: str

    # --- editor buffer state -----------------------------------------
    text: str = ""
    dirty: bool = False
    active: bool = False

    # --- timestamps, seconds since the epoch as given by time.time() --
    opened_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)
    last_active_at: float = field(default_factory=time.time)
class MarkdownSessionManager:
    """Owns native Editor sessions for Markdown documents.

    Sessions are held in memory only, keyed by session id. For every context
    id the manager remembers the session that was activated last and keeps
    that session's ``active`` flag set.
    """

    def __init__(self) -> None:
        # session_id -> session record.
        self._sessions: dict[str, MarkdownSession] = {}
        # context_id -> session_id of the session currently active there.
        self._active_by_context: dict[str, str] = {}

    def open(
        self,
        doc: dict[str, Any],
        sid: str = "",
        context_id: str = "",
        refresh: bool = False,
    ) -> dict[str, Any]:
        """Open ``doc`` in the editor, reusing a matching session if present.

        A session matches when both its file id and its context id equal the
        requested ones. The opened (or reused) session becomes the active
        session of its context.

        Args:
            doc: Document record; ``extension``, ``path``, ``file_id`` and
                ``basename`` are read from it.
            sid: Client id to attach. An empty value leaves the sid of a
                reused session unchanged.
            context_id: Owning context; falsy values are treated as ``""``.
            refresh: When true, re-register the document with the document
                store first and, for a reused session without unsaved edits,
                reload its text from the store.

        Returns:
            The session payload built by ``_payload``.

        Raises:
            ValueError: If the document extension is not ``md``.
        """
        ext = str(doc["extension"]).lower()
        if ext != "md":
            raise ValueError(f"Editor is only available for Markdown. Open .{ext} files in the Desktop.")

        normalized_context = str(context_id or "")
        if refresh:
            try:
                doc = document_store.register_document(doc["path"], context_id=normalized_context)
            except Exception:
                # Deliberate best effort: if re-registration fails, keep
                # working with the document record the caller passed in.
                pass

        for session in self._sessions.values():
            if session.file_id != doc["file_id"] or session.context_id != normalized_context:
                continue
            if sid:
                session.sid = sid
            if refresh and not session.dirty:
                # Only a clean buffer is reloaded; unsaved edits are kept.
                session.text = document_store.read_text_for_editor(doc)
                session.dirty = False
            session.path = doc["path"]
            session.title = doc["basename"]
            session.updated_at = time.time()
            self.activate(session.session_id)
            return self._payload(session, doc)

        session = MarkdownSession(
            session_id=uuid.uuid4().hex,
            file_id=doc["file_id"],
            sid=sid,
            context_id=normalized_context,
            extension=ext,
            path=doc["path"],
            title=doc["basename"],
            text=document_store.read_text_for_editor(doc),
        )
        self._sessions[session.session_id] = session
        self.activate(session.session_id)
        return self._payload(session, doc)

    def input(
        self,
        session_id: str,
        text: str | None = None,
        patch: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Apply an edit from the client to the session buffer.

        ``text`` replaces the whole buffer and wins over ``patch``; otherwise
        a non-empty ``patch`` is applied through ``_apply_text_patch``. The
        session is activated in every case.

        Args:
            session_id: Id of the session being edited.
            text: Full replacement text, or ``None``.
            patch: Patch mapping, or ``None``.

        Returns:
            ``{"ok": True, "session_id": ...}``.

        Raises:
            FileNotFoundError: If the session does not exist.
        """
        session = self._require(session_id)
        edited = True
        if text is not None:
            session.text = str(text)
        elif patch:
            session.text = _apply_text_patch(session.text, patch)
        else:
            # Nothing was supplied, so the buffer is unchanged. Flagging it
            # dirty here would needlessly block later refreshes from disk.
            edited = False
        if edited:
            session.dirty = True
            session.updated_at = time.time()
        self.activate(session.session_id)
        return {"ok": True, "session_id": session.session_id}

    def save(self, session_id: str, text: str | None = None) -> dict[str, Any]:
        """Write the session buffer through the document store.

        After a successful write every session on the same file receives the
        saved text and is marked clean.

        Args:
            session_id: Id of the session to save.
            text: Optional final text that replaces the buffer before saving.

        Returns:
            A dict with ``ok``, the public ``document`` record and its
            ``version``.

        Raises:
            FileNotFoundError: If the session does not exist.
        """
        session = self._require(session_id)
        if text is not None:
            session.text = str(text)
            # Treat the new text as unsaved until the write below succeeds,
            # so a failing write cannot leave a modified buffer marked clean.
            session.dirty = True
        updated = document_store.write_markdown(session.file_id, session.text)
        session.updated_at = time.time()
        session.dirty = False
        session.path = updated["path"]
        session.title = updated["basename"]
        self._refresh_file_sessions(updated, text=session.text, dirty=False)
        return {
            "ok": True,
            "document": _public_doc(updated),
            "version": document_store.item_version(updated),
        }

    def activate(self, session_id: str) -> dict[str, Any]:
        """Make a session the active one of its context.

        The previously active session of the same context, if it is still
        open, has its ``active`` flag cleared.

        Returns:
            ``{"ok": True, "session_id": ...}``.

        Raises:
            FileNotFoundError: If the session does not exist.
        """
        session = self._require(session_id)
        now = time.time()
        previous_id = self._active_by_context.get(session.context_id)
        if previous_id and previous_id in self._sessions:
            self._sessions[previous_id].active = False
        session.active = True
        session.last_active_at = now
        session.updated_at = now
        self._active_by_context[session.context_id] = session.session_id
        return {"ok": True, "session_id": session.session_id}

    def renamed(self, file_id: str, doc: dict[str, Any], text: str | None = None) -> dict[str, Any]:
        """Propagate a rename to every session on the renamed document.

        Sessions are matched on the ``file_id`` found in ``doc``; the
        ``file_id`` argument is only echoed back in the result.

        Returns:
            A dict with ``ok``, the list of ``updated`` session ids and
            ``file_id``.
        """
        updated = self._refresh_file_sessions(doc, text=text, dirty=False)
        return {"ok": True, "updated": updated, "file_id": file_id}

    def refresh_document(self, file_id: str) -> dict[str, Any]:
        """Reload the text of every session on ``file_id`` from the store.

        Returns:
            A dict with ``ok``, the ``refreshed`` count and the refreshed
            ``sessions`` ids. ``ok`` is false only when the document lookup
            fails; a blank id or a non-Markdown document yields an empty,
            successful result.
        """
        normalized = str(file_id or "").strip()
        if not normalized:
            return {"ok": True, "refreshed": 0, "sessions": []}
        try:
            doc = document_store.get_document(normalized)
        except Exception:
            return {"ok": False, "refreshed": 0, "sessions": []}
        if str(doc.get("extension") or "").lower() != "md":
            return {"ok": True, "refreshed": 0, "sessions": []}
        refreshed = self._refresh_file_sessions(
            doc,
            text=document_store.read_text_for_editor(doc),
            dirty=False,
        )
        return {"ok": True, "refreshed": len(refreshed), "sessions": refreshed}

    def list_open(self, context_id: str = "", limit: int = 20) -> list[dict[str, Any]]:
        """List the documents open in a context, one entry per file.

        Sessions are grouped by file id (falling back to the path). Each
        entry describes the representative session of its group and carries
        the number of sessions open on that file. Entries are ordered active
        first, then by most recent activity.

        Args:
            context_id: Context to list; falsy values are treated as ``""``.
            limit: Maximum number of entries; values that are falsy or not
                convertible to an integer fall back to 20, and the result
                always allows at least one entry.

        Returns:
            A list of plain dicts describing the open documents.
        """
        context_id = str(context_id or "")
        sessions = [session for session in self._sessions.values() if session.context_id == context_id]
        sessions.sort(key=lambda item: (not item.active, -item.last_active_at, -item.updated_at))

        grouped: dict[str, MarkdownSession] = {}
        counts: dict[str, int] = {}
        for session in sessions:
            key = session.file_id or session.path
            counts[key] = counts.get(key, 0) + 1
            current = grouped.get(key)
            if current is None or session.active or session.last_active_at > current.last_active_at:
                grouped[key] = session

        result = []
        for session in grouped.values():
            try:
                doc = document_store.get_document(session.file_id)
                version = document_store.item_version(doc)
                size = doc.get("size", 0)
                last_modified = doc.get("last_modified", "")
                path = document_store.display_path(doc.get("path", session.path))
                title = doc.get("basename") or session.title
            except Exception:
                # The store lookup failed; describe the entry from what the
                # session itself remembers.
                version = ""
                size = 0
                last_modified = ""
                path = document_store.display_path(session.path)
                title = session.title
            result.append({
                "session_id": session.session_id,
                "file_id": session.file_id,
                "title": title,
                "extension": session.extension,
                "path": path,
                "version": version,
                "size": size,
                "last_modified": last_modified,
                "dirty": session.dirty,
                "active": session.active,
                "open_sessions": counts.get(session.file_id or session.path, 1),
                "last_active_at": session.last_active_at,
            })
        result.sort(key=lambda item: (not item["active"], -float(item["last_active_at"] or 0)))

        try:
            safe_limit = max(1, int(limit or 20))
        except (TypeError, ValueError):
            # A malformed limit should not break the listing.
            safe_limit = 20
        return result[:safe_limit]

    def close(self, session_id: str) -> dict[str, Any]:
        """Close a session; closing an unknown id is not an error.

        When the closed session was the active one of its context, the
        remaining session of that context with the latest activity takes
        over, or the context loses its active entry if none remain.

        Returns:
            ``{"ok": True, "closed": 0}`` when nothing matched, otherwise
            ``{"ok": True, "closed": 1, "session_id": ...}``.
        """
        # Normalise the id the same way _require() does so both lookups
        # agree on which session an id refers to.
        normalized = str(session_id or "").strip()
        session = self._sessions.pop(normalized, None)
        if session is None:
            return {"ok": True, "closed": 0}

        if self._active_by_context.get(session.context_id) == session.session_id:
            candidates = [item for item in self._sessions.values() if item.context_id == session.context_id]
            if candidates:
                successor = max(candidates, key=lambda item: item.last_active_at)
                self.activate(successor.session_id)
            else:
                self._active_by_context.pop(session.context_id, None)
        return {"ok": True, "closed": 1, "session_id": session.session_id}

    def close_sid(self, sid: str) -> int:
        """Close every session attached to client id ``sid``.

        Returns:
            The number of sessions that were selected for closing.
        """
        # Collect the ids first: close() mutates the dict being scanned.
        doomed = [session_id for session_id, session in self._sessions.items() if session.sid == sid]
        for session_id in doomed:
            self.close(session_id)
        return len(doomed)

    def _refresh_file_sessions(
        self,
        doc: dict[str, Any],
        text: str | None = None,
        dirty: bool | None = None,
    ) -> list[str]:
        """Copy document metadata (and optionally text) into matching sessions.

        Every session whose file id equals ``doc["file_id"]`` gets the
        document's path and basename. ``text`` and ``dirty`` are applied only
        when they are not ``None``.

        Returns:
            The ids of the sessions that were updated.
        """
        file_id = str(doc.get("file_id") or "").strip()
        refreshed: list[str] = []
        for session in self._sessions.values():
            if session.file_id != file_id:
                continue
            if text is not None:
                session.text = str(text)
            session.path = doc["path"]
            session.title = doc["basename"]
            if dirty is not None:
                session.dirty = dirty
            session.updated_at = time.time()
            refreshed.append(session.session_id)
        return refreshed

    def _payload(self, session: MarkdownSession, doc: dict[str, Any]) -> dict[str, Any]:
        """Build the response dict describing ``session`` and its document."""
        return {
            "ok": True,
            "session_id": session.session_id,
            "file_id": session.file_id,
            "title": session.title,
            "extension": session.extension,
            "path": session.path,
            "text": session.text,
            "dirty": session.dirty,
            "active": session.active,
            "context_id": session.context_id,
            "document": _public_doc(doc),
            "version": document_store.item_version(doc),
        }

    def _require(self, session_id: str) -> MarkdownSession:
        """Return the session for ``session_id`` (whitespace-stripped).

        Raises:
            FileNotFoundError: If no such session is open.
        """
        normalized = str(session_id or "").strip()
        session = self._sessions.get(normalized)
        if not session:
            raise FileNotFoundError(f"Editor session not found: {normalized}")
        return session
def get_manager() -> MarkdownSessionManager:
    """Return the module-wide session manager, creating it on first use."""
    module_scope = globals()
    if "_manager" not in module_scope:
        # First call: no manager has been stored at module level yet.
        module_scope["_manager"] = MarkdownSessionManager()
    return module_scope["_manager"]
def _public_doc(doc: dict[str, Any]) -> dict[str, Any]:
    """Build the client-facing description of a document record.

    The path is passed through ``document_store.display_path`` and the
    version comes from ``document_store.item_version``. ``exists`` reports
    whether the stored path is present on disk right now.
    """
    public: dict[str, Any] = {}
    public["file_id"] = doc["file_id"]
    public["path"] = document_store.display_path(doc["path"])
    basename = doc["basename"]
    public["basename"] = basename
    # The basename doubles as the display title.
    public["title"] = basename
    public["extension"] = doc["extension"]
    public["size"] = doc["size"]
    public["version"] = document_store.item_version(doc)
    public["last_modified"] = doc["last_modified"]
    public["exists"] = Path(doc["path"]).exists()
    return public
def _apply_text_patch(text: str, patch: dict[str, Any]) -> str:
    """Return ``text`` with ``patch`` applied.

    A patch holding a ``content`` key replaces the whole text. Otherwise the
    slice ``[start, end)`` is replaced by the patch's ``text`` value; a
    missing ``end`` means an insertion at ``start``. Offsets are clamped into
    the bounds of ``text`` and ``end`` never precedes ``start``.
    """
    if "content" in patch:
        whole = patch.get("content")
        return str(whole) if whole else ""

    raw_start = patch.get("start")
    begin = int(raw_start) if raw_start else 0

    raw_end = patch.get("end")
    finish = begin if raw_end is None else int(raw_end)

    raw_insert = patch.get("text")
    insert = str(raw_insert) if raw_insert else ""

    # Clamp both offsets into the text and keep the range non-inverted.
    size = len(text)
    begin = min(max(begin, 0), size)
    finish = max(begin, min(size, finish))
    return "".join((text[:begin], insert, text[finish:]))