agent-zero/plugins/_office/helpers/document_store.py
Alessandro 330a0c5790 Split Markdown editor into dedicated surface
Add a builtin _editor plugin that owns Markdown API/WebSocket sessions, canvas and modal UI, live refresh, tabs, prompt Extras for active-context open files, inline close confirmation, and Close All handling.

Route Markdown document artifacts to Editor while keeping Office/Desktop focused on LibreOffice formats, and update Desktop/Office prompts, menus, compatibility shims, and regression coverage.
2026-05-15 02:41:41 +02:00

995 lines
37 KiB
Python

from __future__ import annotations
import csv
import hashlib
import io
import json
import os
import re
import sqlite3
import time
import uuid
import zipfile
from contextlib import contextmanager
from pathlib import Path
from typing import Any
from xml.sax.saxutils import escape
from helpers import files
from plugins._office.helpers import pptx_writer
PLUGIN_NAME = "_office"
OPEN_DOCUMENT_EXTENSIONS = {"odt", "ods", "odp"}
OOXML_EXTENSIONS = {"docx", "xlsx", "pptx"}
SUPPORTED_EXTENSIONS = {"md", *OPEN_DOCUMENT_EXTENSIONS, *OOXML_EXTENSIONS}
DEFAULT_TTL_SECONDS = 8 * 60 * 60
MAX_SAVE_BYTES = 512 * 1024 * 1024
ODF_OFFICE_NS = "urn:oasis:names:tc:opendocument:xmlns:office:1.0"
ODF_TEXT_NS = "urn:oasis:names:tc:opendocument:xmlns:text:1.0"
ODF_TABLE_NS = "urn:oasis:names:tc:opendocument:xmlns:table:1.0"
ODF_DRAW_NS = "urn:oasis:names:tc:opendocument:xmlns:drawing:1.0"
ODF_PRESENTATION_NS = "urn:oasis:names:tc:opendocument:xmlns:presentation:1.0"
ODF_STYLE_NS = "urn:oasis:names:tc:opendocument:xmlns:style:1.0"
ODF_FO_NS = "urn:oasis:names:tc:opendocument:xmlns:xsl-fo-compatible:1.0"
ODF_MANIFEST_NS = "urn:oasis:names:tc:opendocument:xmlns:manifest:1.0"
ODF_VERSION = "1.2"
ODF_MIMETYPES = {
"odt": "application/vnd.oasis.opendocument.text",
"ods": "application/vnd.oasis.opendocument.spreadsheet",
"odp": "application/vnd.oasis.opendocument.presentation",
}
STATE_DIR = Path(files.get_abs_path("usr", "plugins", PLUGIN_NAME, "documents"))
DB_PATH = STATE_DIR / "documents.sqlite3"
BACKUP_DIR = STATE_DIR / "backups"
WORKDIR = Path(files.get_abs_path("usr", "workdir"))
DOCUMENTS_DIR = WORKDIR / "documents"
def now() -> float:
return time.time()
def now_iso() -> str:
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
def ensure_dirs() -> None:
STATE_DIR.mkdir(parents=True, exist_ok=True)
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
def sha256_bytes(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
def safe_title(title: str, fallback: str = "Document") -> str:
cleaned = "".join(ch if ch.isalnum() or ch in " ._-" else "_" for ch in title).strip(" ._")
return cleaned or fallback
def normalize_extension(value: str) -> str:
ext = value.lower().strip().lstrip(".")
if not ext:
ext = "md"
if ext not in SUPPORTED_EXTENSIONS:
raise ValueError(f"Unsupported document format: {ext}")
return ext
def document_home(context_id: str = "") -> Path:
context_id = str(context_id or "").strip()
if context_id:
try:
from agent import AgentContext
context = AgentContext.get(context_id)
project_helpers = _projects()
project_name = project_helpers.get_context_project_name(context) if context else None
if project_name:
return Path(project_helpers.get_project_folder(project_name)).resolve(strict=False)
except Exception:
pass
configured = str(_settings().get_settings().get("workdir_path") or "").strip()
if configured:
return _path_from_a0(configured).resolve(strict=False)
return WORKDIR.resolve(strict=False)
def document_binary_home(context_id: str = "") -> Path:
if str(context_id or "").strip():
return document_home(context_id) / "documents"
return DOCUMENTS_DIR.resolve(strict=False)
def default_open_path(context_id: str = "") -> str:
return display_path(document_home(context_id))
def display_path(path: str | Path) -> str:
resolved = Path(path).resolve(strict=False)
base = Path(files.get_base_dir()).resolve(strict=False)
if str(base).startswith("/a0"):
return str(resolved)
try:
return "/a0/" + str(resolved.relative_to(base)).lstrip("/")
except ValueError:
return str(path)
def _path_from_a0(path: str | Path) -> Path:
raw = str(path)
if raw.startswith("/a0/") and not files.get_base_dir().startswith("/a0"):
raw = files.get_abs_path(raw.removeprefix("/a0/"))
return Path(raw if os.path.isabs(raw) else files.get_abs_path(raw)).expanduser()
def allowed_roots(context_id: str = "") -> list[Path]:
project_helpers = _projects()
roots = {
WORKDIR.resolve(strict=False),
DOCUMENTS_DIR.resolve(strict=False),
Path(project_helpers.get_projects_parent_folder()).resolve(strict=False),
document_home(context_id).resolve(strict=False),
document_binary_home(context_id).resolve(strict=False),
}
configured = str(_settings().get_settings().get("workdir_path") or "").strip()
if configured:
roots.add(_path_from_a0(configured).resolve(strict=False))
return sorted(roots, key=lambda item: str(item))
def _projects() -> Any:
from helpers import projects
return projects
def _settings() -> Any:
from helpers import settings
return settings
def normalize_path(path: str | Path, context_id: str = "") -> Path:
candidate = _path_from_a0(path)
resolved = candidate.resolve(strict=False)
roots = allowed_roots(context_id)
if not any(_is_relative_to(resolved, root) for root in roots):
raise PermissionError("Document artifacts must stay inside the active project or workdir.")
if candidate.exists():
real = candidate.resolve(strict=True)
if not any(_is_relative_to(real, root) for root in roots):
raise PermissionError("Document artifact symlink escapes the active project or workdir.")
return resolved
def _is_relative_to(path: Path, root: Path) -> bool:
try:
os.path.commonpath([str(path), str(root)])
except ValueError:
return False
return os.path.commonpath([str(path), str(root)]) == str(root)
@contextmanager
def connect() -> Any:
ensure_dirs()
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
init_db(conn)
try:
yield conn
conn.commit()
finally:
conn.close()
def init_db(conn: sqlite3.Connection) -> None:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS documents (
file_id TEXT PRIMARY KEY,
path TEXT NOT NULL UNIQUE,
basename TEXT NOT NULL,
extension TEXT NOT NULL,
owner_id TEXT NOT NULL,
size INTEGER NOT NULL,
version INTEGER NOT NULL,
sha256 TEXT NOT NULL,
last_modified TEXT NOT NULL,
created_at REAL NOT NULL,
updated_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
file_id TEXT NOT NULL,
user_id TEXT NOT NULL,
permission TEXT NOT NULL,
origin TEXT NOT NULL,
created_at REAL NOT NULL,
expires_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS versions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_id TEXT NOT NULL,
version TEXT NOT NULL,
path TEXT NOT NULL,
size INTEGER NOT NULL,
sha256 TEXT NOT NULL,
created_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_id TEXT,
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
created_at REAL NOT NULL
);
"""
)
def register_document(path: str | Path, owner_id: str = "a0", context_id: str = "") -> dict[str, Any]:
resolved = normalize_path(path, context_id=context_id)
if not resolved.exists():
raise FileNotFoundError(str(resolved))
ext = normalize_extension(resolved.suffix.lstrip("."))
data = resolved.read_bytes()
digest = sha256_bytes(data)
stat = resolved.stat()
current_time = now()
with connect() as conn:
row = conn.execute("SELECT * FROM documents WHERE path = ?", (str(resolved),)).fetchone()
if row:
conn.execute(
"""
UPDATE documents
SET basename=?, extension=?, size=?, sha256=?, last_modified=?, updated_at=?
WHERE file_id=?
""",
(resolved.name, ext, stat.st_size, digest, now_iso(), current_time, row["file_id"]),
)
return get_document(row["file_id"], conn=conn)
file_id = uuid.uuid4().hex
conn.execute(
"""
INSERT INTO documents
(file_id, path, basename, extension, owner_id, size, version, sha256, last_modified, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(file_id, str(resolved), resolved.name, ext, owner_id, stat.st_size, 1, digest, now_iso(), current_time, current_time),
)
_record_version(conn, file_id, resolved, "1", data)
return get_document(file_id, conn=conn)
def get_document(file_id: str, conn: sqlite3.Connection | None = None) -> dict[str, Any]:
def _fetch(active: sqlite3.Connection) -> dict[str, Any]:
row = active.execute("SELECT * FROM documents WHERE file_id = ?", (file_id,)).fetchone()
if not row:
raise FileNotFoundError(file_id)
return dict(row)
if conn is not None:
return _fetch(conn)
with connect() as active:
return _fetch(active)
def update_document_path(file_id: str, path: str | Path, context_id: str = "") -> dict[str, Any]:
resolved = normalize_path(path, context_id=context_id)
if not resolved.exists():
raise FileNotFoundError(str(resolved))
ext = normalize_extension(resolved.suffix.lstrip("."))
data = resolved.read_bytes()
digest = sha256_bytes(data)
stat = resolved.stat()
changed_at = now()
with connect() as conn:
doc = get_document(file_id, conn=conn)
row = conn.execute("SELECT file_id FROM documents WHERE path = ?", (str(resolved),)).fetchone()
if row and row["file_id"] != file_id:
raise ValueError(f"Document path is already registered: {display_path(resolved)}")
conn.execute(
"""
UPDATE documents
SET path=?, basename=?, extension=?, size=?, sha256=?, last_modified=?, updated_at=?
WHERE file_id=?
""",
(str(resolved), resolved.name, ext, stat.st_size, digest, now_iso(), changed_at, file_id),
)
conn.execute(
"INSERT INTO events (file_id, event_type, payload, created_at) VALUES (?, ?, ?, ?)",
(
file_id,
"renamed",
json.dumps({"from": display_path(doc["path"]), "to": display_path(resolved)}),
changed_at,
),
)
return get_document(file_id, conn=conn)
def rename_document(
file_id: str,
path: str | Path,
content: str | None = None,
context_id: str = "",
) -> dict[str, Any]:
resolved = normalize_path(path, context_id=context_id)
ext = normalize_extension(resolved.suffix.lstrip("."))
data = None
if content is not None:
if ext != "md":
raise ValueError("Inline content can only be provided for Markdown documents.")
data = str(content or "").encode("utf-8")
if len(data) > MAX_SAVE_BYTES:
raise OverflowError("Document save exceeds maximum size")
changed_at = now()
with connect() as conn:
doc = get_document(file_id, conn=conn)
source = Path(doc["path"])
source_resolved = source.resolve(strict=False)
changed_path = str(source_resolved) != str(resolved)
source_exists = source.exists()
if ext != str(doc["extension"]).lower():
raise ValueError("Document extension cannot change during rename.")
row = conn.execute("SELECT file_id FROM documents WHERE path = ?", (str(resolved),)).fetchone()
if row and row["file_id"] != file_id:
raise ValueError(f"Document path is already registered: {display_path(resolved)}")
if changed_path and resolved.exists():
raise FileExistsError(f"Target already exists: {display_path(resolved)}")
if not source_exists and data is None:
raise FileNotFoundError(str(source_resolved))
previous = source.read_bytes() if source_exists else b""
content_changed = data is not None and data != previous
if changed_path and data is None:
resolved.parent.mkdir(parents=True, exist_ok=True)
source.rename(resolved)
final_data = resolved.read_bytes()
elif data is not None:
if content_changed:
_record_version(conn, file_id, source_resolved, item_version(doc), previous)
_write_atomic(resolved, data)
if changed_path and source_exists:
source.unlink(missing_ok=True)
final_data = data
else:
final_data = previous
stat = resolved.stat()
next_version = int(doc["version"]) + 1 if content_changed else int(doc["version"])
conn.execute(
"""
UPDATE documents
SET path=?, basename=?, extension=?, size=?, version=?, sha256=?, last_modified=?, updated_at=?
WHERE file_id=?
""",
(
str(resolved),
resolved.name,
ext,
stat.st_size,
next_version,
sha256_bytes(final_data),
now_iso(),
changed_at,
file_id,
),
)
conn.execute(
"INSERT INTO events (file_id, event_type, payload, created_at) VALUES (?, ?, ?, ?)",
(
file_id,
"renamed",
json.dumps(
{
"from": display_path(source_resolved),
"to": display_path(resolved),
"saved": content_changed,
"materialized": not source_exists,
}
),
changed_at,
),
)
return get_document(file_id, conn=conn)
def get_open_documents(limit: int = 6) -> list[dict[str, Any]]:
with connect() as conn:
_clear_expired_sessions(conn)
rows = conn.execute(
"""
SELECT
d.*,
COUNT(s.session_id) AS open_sessions,
MAX(s.created_at) AS last_opened_at,
MAX(s.expires_at) AS session_expires_at
FROM documents d
JOIN sessions s ON s.file_id = d.file_id
WHERE s.expires_at > ?
GROUP BY d.file_id
ORDER BY last_opened_at DESC
LIMIT ?
""",
(now(), limit),
).fetchall()
return [dict(row) for row in rows]
def create_session(
file_id: str,
user_id: str = "agent-zero-user",
permission: str = "write",
origin: str = "",
ttl_seconds: int = DEFAULT_TTL_SECONDS,
) -> dict[str, Any]:
permission = "write" if permission == "write" else "read"
created = now()
expires = created + ttl_seconds
session_id = uuid.uuid4().hex
with connect() as conn:
conn.execute(
"INSERT INTO sessions (session_id, file_id, user_id, permission, origin, created_at, expires_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
(session_id, file_id, user_id, permission, origin, created, expires),
)
return {
"session_id": session_id,
"file_id": file_id,
"expires_at": expires,
"permission": permission,
"origin": origin,
}
def close_session(session_id: str = "", file_id: str = "") -> int:
session_id = str(session_id or "").strip()
file_id = str(file_id or "").strip()
if not session_id and not file_id:
return 0
with connect() as conn:
_clear_expired_sessions(conn)
if session_id:
row = conn.execute("SELECT * FROM sessions WHERE session_id = ?", (session_id,)).fetchone()
if not row:
return 0
conn.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
conn.execute(
"INSERT INTO events (file_id, event_type, payload, created_at) VALUES (?, ?, ?, ?)",
(row["file_id"], "close_session", json.dumps({"session_id": session_id}), now()),
)
return 1
rows = conn.execute("SELECT session_id FROM sessions WHERE file_id = ?", (file_id,)).fetchall()
conn.execute("DELETE FROM sessions WHERE file_id = ?", (file_id,))
conn.execute(
"INSERT INTO events (file_id, event_type, payload, created_at) VALUES (?, ?, ?, ?)",
(file_id, "close_document_sessions", json.dumps({"closed": len(rows)}), now()),
)
return len(rows)
def read_text_for_editor(doc: dict[str, Any]) -> str:
path = Path(doc["path"])
ext = str(doc["extension"]).lower()
if ext == "md":
return path.read_text(encoding="utf-8", errors="replace")
raise ValueError(f"Text editing is not available for .{ext}.")
def write_markdown(file_id: str, content: str) -> dict[str, Any]:
return replace_document_bytes(file_id, str(content or "").encode("utf-8"), actor="editor:markdown")
def replace_document_bytes(
file_id: str,
data: bytes,
actor: str = "agent",
invalidate_sessions: bool = False,
) -> dict[str, Any]:
if len(data) > MAX_SAVE_BYTES:
raise OverflowError("Document save exceeds maximum size")
with connect() as conn:
doc = get_document(file_id, conn=conn)
path = Path(doc["path"])
previous = path.read_bytes() if path.exists() else b""
if previous == data:
return doc
_record_version(conn, file_id, path, item_version(doc), previous)
_write_atomic(path, data)
digest = sha256_bytes(data)
next_version = int(doc["version"]) + 1
changed_at = now()
conn.execute(
"""
UPDATE documents
SET size=?, version=?, sha256=?, last_modified=?, updated_at=?
WHERE file_id=?
""",
(len(data), next_version, digest, now_iso(), changed_at, file_id),
)
if invalidate_sessions:
conn.execute("DELETE FROM sessions WHERE file_id = ?", (file_id,))
conn.execute(
"INSERT INTO events (file_id, event_type, payload, created_at) VALUES (?, ?, ?, ?)",
(file_id, "saved", json.dumps({"actor": actor, "version": f"{next_version}-{digest[:12]}"}), changed_at),
)
return get_document(file_id, conn=conn)
def item_version(doc: dict[str, Any]) -> str:
return f"{int(doc['version'])}-{str(doc['sha256'])[:12]}"
def _write_atomic(path: Path, data: bytes) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = path.with_name(f".{path.name}.{uuid.uuid4().hex}.tmp")
try:
with tmp_path.open("wb") as handle:
handle.write(data)
handle.flush()
os.fsync(handle.fileno())
os.replace(tmp_path, path)
finally:
if tmp_path.exists():
tmp_path.unlink(missing_ok=True)
def _clear_expired_sessions(conn: sqlite3.Connection) -> None:
conn.execute("DELETE FROM sessions WHERE expires_at < ?", (now(),))
def _record_version(conn: sqlite3.Connection, file_id: str, path: Path, version: str, data: bytes) -> None:
if not data:
return
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
backup_path = BACKUP_DIR / f"{file_id}-{int(time.time() * 1000)}-{version.replace('/', '_')}"
backup_path.write_bytes(data)
conn.execute(
"INSERT INTO versions (file_id, version, path, size, sha256, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(file_id, version, str(backup_path), len(data), sha256_bytes(data), now()),
)
def version_history(file_id: str) -> list[dict[str, Any]]:
with connect() as conn:
rows = conn.execute(
"SELECT id, file_id, version, path, size, sha256, created_at FROM versions WHERE file_id = ? ORDER BY id DESC",
(file_id,),
).fetchall()
return [dict(row) for row in rows]
def restore_version(file_id: str, version_id: int) -> dict[str, Any]:
with connect() as conn:
doc = get_document(file_id, conn=conn)
row = conn.execute("SELECT * FROM versions WHERE id = ? AND file_id = ?", (version_id, file_id)).fetchone()
if not row:
raise FileNotFoundError(f"Version {version_id} not found")
data = Path(row["path"]).read_bytes()
path = Path(doc["path"])
_record_version(conn, file_id, path, item_version(doc), path.read_bytes() if path.exists() else b"")
_write_atomic(path, data)
digest = sha256_bytes(data)
next_version = int(doc["version"]) + 1
conn.execute(
"UPDATE documents SET size=?, version=?, sha256=?, last_modified=?, updated_at=? WHERE file_id=?",
(len(data), next_version, digest, now_iso(), now(), file_id),
)
return get_document(file_id, conn=conn)
def create_document(
kind: str,
title: str,
fmt: str = "md",
content: str = "",
path: str = "",
context_id: str = "",
) -> dict[str, Any]:
ext = normalize_extension(fmt or "md")
target = normalize_path(path, context_id=context_id) if path else _unique_document_path(title, ext, context_id=context_id)
target.parent.mkdir(parents=True, exist_ok=True)
if target.exists():
raise FileExistsError(str(target))
data = template_bytes(kind, ext, title, content)
_write_atomic(target, data)
return register_document(target, context_id=context_id)
def _unique_document_path(title: str, ext: str, context_id: str = "") -> Path:
base = safe_document_stem(title, ext, "Document")
root = document_home(context_id) if ext == "md" else document_binary_home(context_id)
candidate = root / f"{base}.{ext}"
index = 2
while candidate.exists():
candidate = root / f"{base} {index}.{ext}"
index += 1
return candidate.resolve(strict=False)
def safe_document_stem(title: str, ext: str, fallback: str = "Document") -> str:
base = safe_title(title, fallback)
suffix = f".{normalize_extension(ext)}"
if base.casefold().endswith(suffix.casefold()):
base = base[: -len(suffix)].rstrip(" ._") or fallback
return base
def template_bytes(kind: str, ext: str, title: str, content: str) -> bytes:
ext = normalize_extension(ext or "md")
if ext == "md":
return _markdown(title, content).encode("utf-8")
if ext == "odt":
return odt_bytes(title, content)
if ext == "ods":
return ods_bytes(title, content)
if ext == "odp":
return odp_bytes(title, content)
if ext == "docx":
return _docx(title, content)
if ext == "xlsx":
return _xlsx(title, content)
if ext == "pptx":
return _pptx(title, content)
raise ValueError(ext)
def _markdown(title: str, content: str) -> str:
text = str(content or "").strip()
if text:
return text if text.startswith("#") else f"# {title}\n\n{text}\n"
return f"# {title}\n"
def _zip_bytes(files_map: dict[str, str | bytes]) -> bytes:
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
for name, value in files_map.items():
data = value.encode("utf-8") if isinstance(value, str) else value
archive.writestr(name, data)
return buffer.getvalue()
def odf_zip_bytes(ext: str, files_map: dict[str, str | bytes]) -> bytes:
ext = normalize_extension(ext)
if ext not in ODF_MIMETYPES:
raise ValueError(f"Unsupported ODF format: {ext}")
media_type = ODF_MIMETYPES[ext]
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
archive.writestr("mimetype", media_type, compress_type=zipfile.ZIP_STORED)
for name, value in files_map.items():
if name == "mimetype":
continue
data = value.encode("utf-8") if isinstance(value, str) else value
archive.writestr(name, data, compress_type=zipfile.ZIP_DEFLATED)
return buffer.getvalue()
def odt_bytes(title: str, content: str) -> bytes:
return odt_bytes_from_paragraphs(_document_lines(title, content))
def odt_bytes_from_paragraphs(paragraphs: list[str]) -> bytes:
lines = [str(line) for line in paragraphs] or [""]
body = "\n".join(_odt_paragraph(line, index == 0) for index, line in enumerate(lines))
return _odf_package(
"odt",
f"""<?xml version="1.0" encoding="UTF-8"?>
<office:document-content {_odf_content_namespaces()} office:version="{ODF_VERSION}">
<office:body>
<office:text>
{body}
</office:text>
</office:body>
</office:document-content>
""",
)
def ods_bytes(title: str, content: str) -> bytes:
return ods_bytes_from_sheets([{"name": "Sheet1", "rows": _xlsx_rows(title, content)}])
def ods_bytes_from_sheets(sheets: list[dict[str, Any]]) -> bytes:
normalized = []
for index, sheet in enumerate(sheets or []):
name = safe_title(str(sheet.get("name") or f"Sheet{index + 1}"), f"Sheet{index + 1}")[:31] or f"Sheet{index + 1}"
rows = sheet.get("rows") or []
normalized.append({"name": name, "rows": rows})
if not normalized:
normalized = [{"name": "Sheet1", "rows": [["Spreadsheet"]]}]
tables = "\n".join(
f"""<table:table table:name="{escape(sheet['name'])}">
{''.join(_ods_row(row) for row in sheet['rows'])}
</table:table>"""
for sheet in normalized
)
return _odf_package(
"ods",
f"""<?xml version="1.0" encoding="UTF-8"?>
<office:document-content {_odf_content_namespaces()} office:version="{ODF_VERSION}">
<office:body>
<office:spreadsheet>
{tables}
</office:spreadsheet>
</office:body>
</office:document-content>
""",
)
def odp_bytes(title: str, content: str) -> bytes:
return odp_bytes_from_slides(pptx_writer.slides_from_text(title, content))
def odp_bytes_from_slides(slides: list[dict[str, Any]]) -> bytes:
normalized = pptx_writer.normalize_slides(slides)
if not normalized:
normalized = [{"title": "Presentation", "bullets": []}]
pages = "\n".join(_odp_page(slide, index) for index, slide in enumerate(normalized, start=1))
return _odf_package(
"odp",
f"""<?xml version="1.0" encoding="UTF-8"?>
<office:document-content {_odf_content_namespaces()} office:version="{ODF_VERSION}">
<office:body>
<office:presentation>
{pages}
</office:presentation>
</office:body>
</office:document-content>
""",
)
def _document_lines(title: str, content: str) -> list[str]:
lines = [str(title or "Document").strip() or "Document"]
lines.extend(line.rstrip() for line in str(content or "").splitlines() if line.strip())
if len(lines) == 1:
lines.append("")
return lines
def _odf_package(ext: str, content_xml: str) -> bytes:
return odf_zip_bytes(
ext,
{
"content.xml": content_xml,
"styles.xml": _odf_styles_xml(),
"meta.xml": _odf_meta_xml(),
"settings.xml": _odf_settings_xml(),
"META-INF/manifest.xml": _odf_manifest_xml(ODF_MIMETYPES[ext]),
},
)
def _odf_content_namespaces() -> str:
return (
f'xmlns:office="{ODF_OFFICE_NS}" '
f'xmlns:text="{ODF_TEXT_NS}" '
f'xmlns:table="{ODF_TABLE_NS}" '
f'xmlns:draw="{ODF_DRAW_NS}" '
f'xmlns:presentation="{ODF_PRESENTATION_NS}" '
f'xmlns:style="{ODF_STYLE_NS}" '
f'xmlns:fo="{ODF_FO_NS}"'
)
def _odf_styles_xml() -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<office:document-styles {_odf_content_namespaces()} office:version="{ODF_VERSION}">
<office:styles>
<style:style style:name="Standard" style:family="paragraph"/>
<style:style style:name="Heading_20_1" style:display-name="Heading 1" style:family="paragraph">
<style:text-properties fo:font-weight="bold" fo:font-size="18pt"/>
</style:style>
</office:styles>
</office:document-styles>
"""
def _odf_meta_xml() -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<office:document-meta xmlns:office="{ODF_OFFICE_NS}" office:version="{ODF_VERSION}">
<office:meta/>
</office:document-meta>
"""
def _odf_settings_xml() -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<office:document-settings xmlns:office="{ODF_OFFICE_NS}" office:version="{ODF_VERSION}">
<office:settings/>
</office:document-settings>
"""
def _odf_manifest_xml(media_type: str) -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<manifest:manifest xmlns:manifest="{ODF_MANIFEST_NS}" manifest:version="{ODF_VERSION}">
<manifest:file-entry manifest:full-path="/" manifest:media-type="{media_type}"/>
<manifest:file-entry manifest:full-path="content.xml" manifest:media-type="text/xml"/>
<manifest:file-entry manifest:full-path="styles.xml" manifest:media-type="text/xml"/>
<manifest:file-entry manifest:full-path="meta.xml" manifest:media-type="text/xml"/>
<manifest:file-entry manifest:full-path="settings.xml" manifest:media-type="text/xml"/>
</manifest:manifest>
"""
def _odt_paragraph(line: str, heading: bool = False) -> str:
text = escape(str(line))
if heading:
return f'<text:h text:outline-level="1">{text}</text:h>'
return f"<text:p>{text}</text:p>"
def _ods_row(row: list[Any]) -> str:
cells = "".join(_ods_cell(value) for value in row)
return f"<table:table-row>{cells}</table:table-row>"
def _ods_cell(value: Any) -> str:
value = _xlsx_value(value)
if value in (None, ""):
return "<table:table-cell/>"
if isinstance(value, bool):
text = "TRUE" if value else "FALSE"
return (
f'<table:table-cell office:value-type="boolean" office:boolean-value="{str(value).lower()}">'
f"<text:p>{text}</text:p></table:table-cell>"
)
if isinstance(value, (int, float)):
return (
f'<table:table-cell office:value-type="float" office:value="{value}">'
f"<text:p>{value}</text:p></table:table-cell>"
)
text = escape(str(value))
return f'<table:table-cell office:value-type="string"><text:p>{text}</text:p></table:table-cell>'
def _odp_page(slide: dict[str, Any], index: int) -> str:
title = escape(str(slide.get("title") or f"Slide {index}"))
bullets = [escape(str(item)) for item in slide.get("bullets") or []]
bullet_items = "".join(f"<text:list-item><text:p>{bullet}</text:p></text:list-item>" for bullet in bullets)
body = f"<text:list>{bullet_items}</text:list>" if bullet_items else "<text:p/>"
return f"""<draw:page draw:name="Slide {index}" draw:master-page-name="Default">
<draw:frame presentation:class="title" draw:name="Title {index}" svg:width="24cm" svg:height="2cm" svg:x="1.5cm" svg:y="1cm" xmlns:svg="urn:oasis:names:tc:opendocument:xmlns:svg-compatible:1.0">
<draw:text-box><text:p>{title}</text:p></draw:text-box>
</draw:frame>
<draw:frame presentation:class="outline" draw:name="Content {index}" svg:width="24cm" svg:height="12cm" svg:x="1.5cm" svg:y="3.5cm" xmlns:svg="urn:oasis:names:tc:opendocument:xmlns:svg-compatible:1.0">
<draw:text-box>{body}</draw:text-box>
</draw:frame>
</draw:page>"""
def _docx(title: str, content: str) -> bytes:
lines = [title] + [line for line in content.splitlines() if line.strip()]
if len(lines) == 1:
lines.append("")
body = "".join(_docx_paragraph(line) for line in lines)
return _zip_bytes({
"[Content_Types].xml": """<?xml version="1.0" encoding="UTF-8"?><Types xmlns="http://schemas.openxmlformats.org/package/2006/content-types"><Default Extension="rels" ContentType="application/vnd.openxmlformats-package.relationships+xml"/><Default Extension="xml" ContentType="application/xml"/><Override PartName="/word/document.xml" ContentType="application/vnd.openxmlformats-officedocument.wordprocessingml.document.main+xml"/></Types>""",
"_rels/.rels": """<?xml version="1.0" encoding="UTF-8"?><Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships"><Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/officeDocument" Target="word/document.xml"/></Relationships>""",
"word/document.xml": f"""<?xml version="1.0" encoding="UTF-8"?><w:document xmlns:w="http://schemas.openxmlformats.org/wordprocessingml/2006/main"><w:body>{body}<w:sectPr/></w:body></w:document>""",
})
def _docx_paragraph(line: str) -> str:
if not str(line).strip():
return '<w:p><w:r><w:t xml:space="preserve">&#160;</w:t></w:r></w:p>'
return f"<w:p><w:r><w:t>{escape(line)}</w:t></w:r></w:p>"
def _xlsx(title: str, content: str) -> bytes:
rows = _xlsx_rows(title, content)
sheet_rows = "".join(
f'<row r="{row_idx}">{"".join(_xlsx_cell(row_idx, col_idx, value) for col_idx, value in enumerate(row, start=1))}</row>'
for row_idx, row in enumerate(rows, start=1)
)
return _zip_bytes({
"[Content_Types].xml": """<?xml version="1.0" encoding="UTF-8"?><Types xmlns="http://schemas.openxmlformats.org/package/2006/content-types"><Default Extension="rels" ContentType="application/vnd.openxmlformats-package.relationships+xml"/><Default Extension="xml" ContentType="application/xml"/><Override PartName="/xl/workbook.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet.main+xml"/><Override PartName="/xl/worksheets/sheet1.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.worksheet+xml"/></Types>""",
"_rels/.rels": """<?xml version="1.0" encoding="UTF-8"?><Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships"><Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/officeDocument" Target="xl/workbook.xml"/></Relationships>""",
"xl/_rels/workbook.xml.rels": """<?xml version="1.0" encoding="UTF-8"?><Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships"><Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/worksheet" Target="worksheets/sheet1.xml"/></Relationships>""",
"xl/workbook.xml": """<?xml version="1.0" encoding="UTF-8"?><workbook xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main" xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships"><sheets><sheet name="Sheet1" sheetId="1" r:id="rId1"/></sheets></workbook>""",
"xl/worksheets/sheet1.xml": f"""<?xml version="1.0" encoding="UTF-8"?><worksheet xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main"><sheetData>{sheet_rows}</sheetData></worksheet>""",
})
def _xlsx_rows(title: str, content: str) -> list[list[Any]]:
parsed = _tabular_rows(content)
if parsed:
return parsed
lines = [line for line in str(content or "").splitlines() if line.strip()]
if lines:
return [[title], *[[line] for line in lines]]
return [[title]]
def _tabular_rows(content: str) -> list[list[Any]]:
text = str(content or "").strip("\n")
if not text.strip():
return []
lines = [line for line in text.splitlines() if line.strip()]
markdown_rows = _markdown_table_rows(lines)
if markdown_rows:
return markdown_rows
delimiter = "\t" if any("\t" in line for line in lines) else ("," if any("," in line for line in lines) else None)
if not delimiter:
return []
return [[_xlsx_value(cell) for cell in row] for row in csv.reader(io.StringIO("\n".join(lines)), delimiter=delimiter)]
def _markdown_table_rows(lines: list[str]) -> list[list[Any]]:
table_lines = [line.strip() for line in lines if line.strip().startswith("|") and line.strip().endswith("|")]
if len(table_lines) < 2:
return []
rows = []
for line in table_lines:
cells = [cell.strip() for cell in line.strip("|").split("|")]
if all(re.fullmatch(r":?-{3,}:?", cell or "") for cell in cells):
continue
rows.append([_xlsx_value(cell) for cell in cells])
return rows
def _xlsx_cell(row_idx: int, col_idx: int, value: Any) -> str:
ref = f"{_column_name(col_idx)}{row_idx}"
value = _xlsx_value(value)
if value in (None, ""):
return f'<c r="{ref}"/>'
if isinstance(value, bool):
return f'<c r="{ref}" t="b"><v>{1 if value else 0}</v></c>'
if isinstance(value, (int, float)):
return f'<c r="{ref}"><v>{value}</v></c>'
return f'<c r="{ref}" t="inlineStr"><is><t>{escape(str(value))}</t></is></c>'
def _xlsx_value(value: Any) -> Any:
if not isinstance(value, str):
return value
stripped = value.strip()
if not stripped:
return ""
if stripped.lower() in {"true", "false"}:
return stripped.lower() == "true"
if re.fullmatch(r"[+-]?\d+", stripped) and not (len(stripped.lstrip("+-")) > 1 and stripped.lstrip("+-").startswith("0")):
try:
return int(stripped)
except ValueError:
return stripped
if re.fullmatch(r"[+-]?(?:\d+\.\d*|\.\d+)(?:[eE][+-]?\d+)?", stripped) or re.fullmatch(r"[+-]?\d+[eE][+-]?\d+", stripped):
try:
return float(stripped)
except ValueError:
return stripped
return stripped
def _column_name(index: int) -> str:
name = ""
while index:
index, remainder = divmod(index - 1, 26)
name = chr(65 + remainder) + name
return name
def _pptx(title: str, content: str) -> bytes:
return pptx_writer.pptx_from_text(title, content)