mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-05-17 12:31:20 +00:00
Expand the dedicated Editor surface with safe rendered preview mode, ACE-backed source editing, browser-style tabs, toolbar/file actions, preview search, and richer Markdown rendering for code blocks, task lists, images, tables, math, local links, and footnotes. Keep open Markdown files synchronized with the active context and saved tool edits, including live refresh for document_artifact and text_editor results without routing Markdown through Desktop/Office. Add inline preview-page editing, clickable preview task-list checkboxes, source editor rehydration after preview-mode refreshes, and regression coverage for the new editor wiring and sync behavior.
279 lines
11 KiB
Python
279 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from plugins._office.helpers import document_store
|
|
|
|
|
|
@dataclass
|
|
class MarkdownSession:
|
|
session_id: str
|
|
file_id: str
|
|
sid: str
|
|
context_id: str
|
|
extension: str
|
|
path: str
|
|
title: str
|
|
text: str = ""
|
|
dirty: bool = False
|
|
active: bool = False
|
|
opened_at: float = field(default_factory=time.time)
|
|
updated_at: float = field(default_factory=time.time)
|
|
last_active_at: float = field(default_factory=time.time)
|
|
|
|
|
|
class MarkdownSessionManager:
|
|
"""Owns native Editor sessions for Markdown documents."""
|
|
|
|
def __init__(self) -> None:
|
|
self._sessions: dict[str, MarkdownSession] = {}
|
|
self._active_by_context: dict[str, str] = {}
|
|
|
|
def open(self, doc: dict[str, Any], sid: str = "", context_id: str = "", refresh: bool = False) -> dict[str, Any]:
|
|
ext = str(doc["extension"]).lower()
|
|
if ext != "md":
|
|
raise ValueError(f"Editor is only available for Markdown. Open .{ext} files in the Desktop.")
|
|
|
|
normalized_context = str(context_id or "")
|
|
if refresh:
|
|
try:
|
|
doc = document_store.register_document(doc["path"], context_id=normalized_context)
|
|
except Exception:
|
|
pass
|
|
|
|
for session in self._sessions.values():
|
|
if session.file_id != doc["file_id"] or session.context_id != normalized_context:
|
|
continue
|
|
if sid:
|
|
session.sid = sid
|
|
if refresh and not session.dirty:
|
|
session.text = document_store.read_text_for_editor(doc)
|
|
session.dirty = False
|
|
session.path = doc["path"]
|
|
session.title = doc["basename"]
|
|
session.updated_at = time.time()
|
|
self.activate(session.session_id)
|
|
return self._payload(session, doc)
|
|
|
|
session = MarkdownSession(
|
|
session_id=uuid.uuid4().hex,
|
|
file_id=doc["file_id"],
|
|
sid=sid,
|
|
context_id=normalized_context,
|
|
extension=ext,
|
|
path=doc["path"],
|
|
title=doc["basename"],
|
|
text=document_store.read_text_for_editor(doc),
|
|
)
|
|
self._sessions[session.session_id] = session
|
|
self.activate(session.session_id)
|
|
return self._payload(session, doc)
|
|
|
|
def input(self, session_id: str, text: str | None = None, patch: dict[str, Any] | None = None) -> dict[str, Any]:
|
|
session = self._require(session_id)
|
|
if text is not None:
|
|
session.text = str(text)
|
|
elif patch:
|
|
session.text = _apply_text_patch(session.text, patch)
|
|
session.dirty = True
|
|
session.updated_at = time.time()
|
|
self.activate(session.session_id)
|
|
return {"ok": True, "session_id": session.session_id}
|
|
|
|
def save(self, session_id: str, text: str | None = None) -> dict[str, Any]:
|
|
session = self._require(session_id)
|
|
if text is not None:
|
|
session.text = str(text)
|
|
|
|
updated = document_store.write_markdown(session.file_id, session.text)
|
|
session.updated_at = time.time()
|
|
session.dirty = False
|
|
session.path = updated["path"]
|
|
session.title = updated["basename"]
|
|
self._refresh_file_sessions(updated, text=session.text, dirty=False)
|
|
return {
|
|
"ok": True,
|
|
"document": _public_doc(updated),
|
|
"version": document_store.item_version(updated),
|
|
}
|
|
|
|
def activate(self, session_id: str) -> dict[str, Any]:
|
|
session = self._require(session_id)
|
|
now = time.time()
|
|
previous_id = self._active_by_context.get(session.context_id)
|
|
if previous_id and previous_id in self._sessions:
|
|
self._sessions[previous_id].active = False
|
|
session.active = True
|
|
session.last_active_at = now
|
|
session.updated_at = now
|
|
self._active_by_context[session.context_id] = session.session_id
|
|
return {"ok": True, "session_id": session.session_id}
|
|
|
|
def renamed(self, file_id: str, doc: dict[str, Any], text: str | None = None) -> dict[str, Any]:
|
|
updated = self._refresh_file_sessions(doc, text=text, dirty=False)
|
|
return {"ok": True, "updated": updated, "file_id": file_id}
|
|
|
|
def refresh_document(self, file_id: str) -> dict[str, Any]:
|
|
normalized = str(file_id or "").strip()
|
|
if not normalized:
|
|
return {"ok": True, "refreshed": 0, "sessions": []}
|
|
try:
|
|
doc = document_store.get_document(normalized)
|
|
except Exception:
|
|
return {"ok": False, "refreshed": 0, "sessions": []}
|
|
if str(doc.get("extension") or "").lower() != "md":
|
|
return {"ok": True, "refreshed": 0, "sessions": []}
|
|
|
|
refreshed = self._refresh_file_sessions(
|
|
doc,
|
|
text=document_store.read_text_for_editor(doc),
|
|
dirty=False,
|
|
)
|
|
return {"ok": True, "refreshed": len(refreshed), "sessions": refreshed}
|
|
|
|
def list_open(self, context_id: str = "", limit: int = 20) -> list[dict[str, Any]]:
|
|
context_id = str(context_id or "")
|
|
sessions = [session for session in self._sessions.values() if session.context_id == context_id]
|
|
sessions.sort(key=lambda item: (not item.active, -item.last_active_at, -item.updated_at))
|
|
|
|
grouped: dict[str, MarkdownSession] = {}
|
|
counts: dict[str, int] = {}
|
|
for session in sessions:
|
|
key = session.file_id or session.path
|
|
counts[key] = counts.get(key, 0) + 1
|
|
current = grouped.get(key)
|
|
if current is None or session.active or session.last_active_at > current.last_active_at:
|
|
grouped[key] = session
|
|
|
|
result = []
|
|
for session in grouped.values():
|
|
try:
|
|
doc = document_store.get_document(session.file_id)
|
|
version = document_store.item_version(doc)
|
|
size = doc.get("size", 0)
|
|
last_modified = doc.get("last_modified", "")
|
|
path = document_store.display_path(doc.get("path", session.path))
|
|
title = doc.get("basename") or session.title
|
|
except Exception:
|
|
version = ""
|
|
size = 0
|
|
last_modified = ""
|
|
path = document_store.display_path(session.path)
|
|
title = session.title
|
|
result.append({
|
|
"session_id": session.session_id,
|
|
"file_id": session.file_id,
|
|
"title": title,
|
|
"extension": session.extension,
|
|
"path": path,
|
|
"version": version,
|
|
"size": size,
|
|
"last_modified": last_modified,
|
|
"dirty": session.dirty,
|
|
"active": session.active,
|
|
"open_sessions": counts.get(session.file_id or session.path, 1),
|
|
"last_active_at": session.last_active_at,
|
|
})
|
|
|
|
result.sort(key=lambda item: (not item["active"], -float(item["last_active_at"] or 0)))
|
|
safe_limit = max(1, int(limit or 20))
|
|
return result[:safe_limit]
|
|
|
|
def close(self, session_id: str) -> dict[str, Any]:
|
|
session = self._sessions.pop(str(session_id or ""), None)
|
|
if not session:
|
|
return {"ok": True, "closed": 0}
|
|
if self._active_by_context.get(session.context_id) == session.session_id:
|
|
replacement = sorted(
|
|
[item for item in self._sessions.values() if item.context_id == session.context_id],
|
|
key=lambda item: item.last_active_at,
|
|
reverse=True,
|
|
)
|
|
if replacement:
|
|
self.activate(replacement[0].session_id)
|
|
else:
|
|
self._active_by_context.pop(session.context_id, None)
|
|
return {"ok": True, "closed": 1, "session_id": session_id}
|
|
|
|
def close_sid(self, sid: str) -> int:
|
|
doomed = [session_id for session_id, session in self._sessions.items() if session.sid == sid]
|
|
for session_id in doomed:
|
|
self.close(session_id)
|
|
return len(doomed)
|
|
|
|
def _refresh_file_sessions(self, doc: dict[str, Any], text: str | None = None, dirty: bool | None = None) -> list[str]:
|
|
file_id = str(doc.get("file_id") or "").strip()
|
|
refreshed: list[str] = []
|
|
for session in self._sessions.values():
|
|
if session.file_id != file_id:
|
|
continue
|
|
if text is not None:
|
|
session.text = str(text)
|
|
session.path = doc["path"]
|
|
session.title = doc["basename"]
|
|
if dirty is not None:
|
|
session.dirty = dirty
|
|
session.updated_at = time.time()
|
|
refreshed.append(session.session_id)
|
|
return refreshed
|
|
|
|
def _payload(self, session: MarkdownSession, doc: dict[str, Any]) -> dict[str, Any]:
|
|
return {
|
|
"ok": True,
|
|
"session_id": session.session_id,
|
|
"file_id": session.file_id,
|
|
"title": session.title,
|
|
"extension": session.extension,
|
|
"path": session.path,
|
|
"text": session.text,
|
|
"dirty": session.dirty,
|
|
"active": session.active,
|
|
"context_id": session.context_id,
|
|
"document": _public_doc(doc),
|
|
"version": document_store.item_version(doc),
|
|
}
|
|
|
|
def _require(self, session_id: str) -> MarkdownSession:
|
|
normalized = str(session_id or "").strip()
|
|
session = self._sessions.get(normalized)
|
|
if not session:
|
|
raise FileNotFoundError(f"Editor session not found: {normalized}")
|
|
return session
|
|
|
|
|
|
def get_manager() -> MarkdownSessionManager:
|
|
global _manager
|
|
try:
|
|
return _manager
|
|
except NameError:
|
|
_manager = MarkdownSessionManager()
|
|
return _manager
|
|
|
|
|
|
def _public_doc(doc: dict[str, Any]) -> dict[str, Any]:
|
|
return {
|
|
"file_id": doc["file_id"],
|
|
"path": document_store.display_path(doc["path"]),
|
|
"basename": doc["basename"],
|
|
"title": doc["basename"],
|
|
"extension": doc["extension"],
|
|
"size": doc["size"],
|
|
"version": document_store.item_version(doc),
|
|
"last_modified": doc["last_modified"],
|
|
"exists": Path(doc["path"]).exists(),
|
|
}
|
|
|
|
|
|
def _apply_text_patch(text: str, patch: dict[str, Any]) -> str:
|
|
if "content" in patch:
|
|
return str(patch.get("content") or "")
|
|
start = int(patch.get("start") or 0)
|
|
end = int(patch.get("end") if patch.get("end") is not None else start)
|
|
replacement = str(patch.get("text") or "")
|
|
start = max(0, min(len(text), start))
|
|
end = max(start, min(len(text), end))
|
|
return text[:start] + replacement + text[end:]
|