Debounce Time Travel snapshots with usr watchdog

Register Time Travel on Agent Zero's existing /a0/usr watchdog and coalesce automatic snapshot triggers into a single pending commit window capped at one commit per workspace every 10 seconds.

Exclude top-level /a0/usr plugins and nested Git worktrees from root snapshots, preserve self-root Git workspace tracking, and cover the behavior with Time Travel tests.
This commit is contained in:
Alessandro 2026-04-27 19:06:13 +02:00
parent 45933a47f1
commit 59e2350008
5 changed files with 310 additions and 18 deletions

View file

@ -0,0 +1,9 @@
from __future__ import annotations
from helpers.extension import Extension
from plugins._time_travel.helpers.time_travel import register_watchdogs
class RegisterTimeTravelWatchdog(Extension):
def execute(self, **kwargs):
register_watchdogs()

View file

@ -0,0 +1,9 @@
from __future__ import annotations
from helpers.extension import Extension
from plugins._time_travel.helpers.time_travel import register_watchdogs
class RegisterTimeTravelWatchdog(Extension):
def execute(self, **kwargs):
register_watchdogs()

View file

@ -1,28 +1,16 @@
from __future__ import annotations
import time
from typing import Any
from helpers.extension import Extension
from plugins._time_travel.helpers.time_travel import snapshot_for_agent
DEBOUNCE_SECONDS = 2.0
_LAST_SNAPSHOT_BY_CONTEXT: dict[str, float] = {}
class TimeTravelCodeExecutionSnapshot(Extension):
async def execute(self, tool_name: str = "", response: Any = None, **kwargs: Any):
if tool_name != "code_execution_tool" or not self.agent:
return
context_id = str(getattr(getattr(self.agent, "context", None), "id", "") or "")
now = time.monotonic()
if context_id and now - _LAST_SNAPSHOT_BY_CONTEXT.get(context_id, 0.0) < DEBOUNCE_SECONDS:
return
if context_id:
_LAST_SNAPSHOT_BY_CONTEXT[context_id] = now
tool = getattr(getattr(self.agent, "loop_data", None), "current_tool", None)
args = getattr(tool, "args", {}) if tool else {}
runtime = str(args.get("runtime") or "") if isinstance(args, dict) else ""

View file

@ -8,6 +8,7 @@ import os
import posixpath
import shutil
import subprocess
import threading
import time
from dataclasses import dataclass
from datetime import datetime, timezone
@ -27,6 +28,13 @@ METADATA_PREFIX = "A0-Time-Travel-Metadata:"
EMPTY_TREE = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"
MAX_RENDERED_PATCH_BYTES = 1_000_000
GIT_TIMEOUT_SECONDS = 20
AUTO_SNAPSHOT_DEBOUNCE_SECONDS = 10.0
WATCHDOG_ID = "time_travel_usr"
WATCHDOG_DEBOUNCE_SECONDS = 1.0
_AUTO_SNAPSHOT_LOCK = threading.RLock()
_AUTO_SNAPSHOT_TIMERS: dict[str, threading.Timer] = {}
_AUTO_SNAPSHOT_PAYLOADS: dict[str, dict[str, Any]] = {}
STATUS_LABELS = {
"A": "added",
@ -78,6 +86,10 @@ EXCLUDED_FILE_PATTERNS = {
"*.class",
}
USR_ROOT_EXCLUDED_DIR_NAMES = {
"plugins",
}
SAFE_A0PROJ_FILES = {
".a0proj/project.json",
".a0proj/agents.json",
@ -291,14 +303,29 @@ def clean_summary() -> dict[str, Any]:
}
def snapshot_for_agent(agent: Any, *, trigger: str, metadata: dict[str, Any] | None = None) -> SnapshotResult | None:
def snapshot_for_agent(
agent: Any,
*,
trigger: str,
metadata: dict[str, Any] | None = None,
debounced: bool = True,
) -> SnapshotResult | None:
if not agent:
return None
context_id = str(getattr(getattr(agent, "context", None), "id", "") or "")
try:
workspace = resolve_workspace(context_id, context_loader=lambda _ctxid: agent.context)
return TimeTravelService(workspace).snapshot(trigger=trigger, metadata=_agent_metadata(agent, metadata))
full_metadata = _agent_metadata(agent, metadata)
if debounced:
schedule_debounced_snapshot(
workspace,
trigger=trigger,
metadata=full_metadata,
changed_path_hints=_extract_changed_path_hints(full_metadata),
)
return None
return TimeTravelService(workspace).snapshot(trigger=trigger, metadata=full_metadata)
except WorkspaceRejectedError:
return None
except Exception as exc:
@ -306,17 +333,167 @@ def snapshot_for_agent(agent: Any, *, trigger: str, metadata: dict[str, Any] | N
return None
def snapshot_for_path_hint(path_hint: str, *, trigger: str, metadata: dict[str, Any] | None = None) -> SnapshotResult | None:
def snapshot_for_path_hint(
path_hint: str,
*,
trigger: str,
metadata: dict[str, Any] | None = None,
debounced: bool = True,
) -> SnapshotResult | None:
try:
workspace = resolve_workspace_for_path_hint(path_hint)
if workspace is None:
return None
return TimeTravelService(workspace).snapshot(trigger=trigger, metadata=metadata or {})
full_metadata = metadata or {}
if debounced:
schedule_debounced_snapshot(
workspace,
trigger=trigger,
metadata=full_metadata,
changed_path_hints=_extract_changed_path_hints(full_metadata),
)
return None
return TimeTravelService(workspace).snapshot(trigger=trigger, metadata=full_metadata)
except Exception as exc:
PrintStyle.error(f"Time Travel file-browser snapshot failed: {exc}")
return None
def register_watchdogs() -> None:
from helpers import watchdog
root = real_path_for_display(USR_DISPLAY_ROOT)
if not root.exists() or not root.is_dir():
return
watchdog.add_watchdog(
id=WATCHDOG_ID,
roots=[str(root)],
patterns=["**/*"],
ignore_patterns=[
"**/.git",
"**/.git/**",
"**/.time_travel",
"**/.time_travel/**",
"**/__pycache__",
"**/__pycache__/**",
"**/*.pyc",
"**/.pytest_cache/**",
"**/.mypy_cache/**",
"**/.ruff_cache/**",
"**/.cache/**",
"**/node_modules/**",
"**/.venv/**",
"**/venv/**",
"**/dist/**",
"**/build/**",
],
events=["create", "modify", "delete", "move"],
debounce=WATCHDOG_DEBOUNCE_SECONDS,
handler=_handle_usr_watchdog_events,
)
def schedule_debounced_snapshot(
workspace: WorkspaceInfo,
*,
trigger: str,
metadata: dict[str, Any] | None = None,
changed_path_hints: list[str] | None = None,
delay: float | None = None,
) -> None:
clean_metadata = dict(metadata or {})
metadata_hints = _extract_changed_path_hints(clean_metadata)
clean_metadata.pop("changed_path_hints", None)
hints = _merge_hints(metadata_hints, changed_path_hints or [])
delay_seconds = AUTO_SNAPSHOT_DEBOUNCE_SECONDS if delay is None else max(0.0, float(delay))
with _AUTO_SNAPSHOT_LOCK:
payload = _AUTO_SNAPSHOT_PAYLOADS.get(workspace.id)
if payload is None:
payload = {
"workspace": workspace,
"trigger": trigger,
"metadata": clean_metadata,
"changed_path_hints": hints,
}
_AUTO_SNAPSHOT_PAYLOADS[workspace.id] = payload
timer = threading.Timer(delay_seconds, _flush_debounced_snapshot, args=(workspace.id,))
timer.daemon = True
_AUTO_SNAPSHOT_TIMERS[workspace.id] = timer
timer.start()
return
payload["trigger"] = trigger
payload["metadata"] = {**payload.get("metadata", {}), **clean_metadata}
payload["changed_path_hints"] = _merge_hints(
payload.get("changed_path_hints", []),
hints,
)
def flush_debounced_snapshots() -> None:
with _AUTO_SNAPSHOT_LOCK:
workspace_ids = list(_AUTO_SNAPSHOT_PAYLOADS)
for workspace_id in workspace_ids:
timer = _AUTO_SNAPSHOT_TIMERS.pop(workspace_id, None)
timer and timer.cancel()
for workspace_id in workspace_ids:
_flush_debounced_snapshot(workspace_id)
def clear_debounced_snapshots() -> None:
with _AUTO_SNAPSHOT_LOCK:
timers = list(_AUTO_SNAPSHOT_TIMERS.values())
_AUTO_SNAPSHOT_TIMERS.clear()
_AUTO_SNAPSHOT_PAYLOADS.clear()
for timer in timers:
timer.cancel()
def _flush_debounced_snapshot(workspace_id: str) -> None:
with _AUTO_SNAPSHOT_LOCK:
_AUTO_SNAPSHOT_TIMERS.pop(workspace_id, None)
payload = _AUTO_SNAPSHOT_PAYLOADS.pop(workspace_id, None)
if not payload:
return
try:
workspace = payload["workspace"]
TimeTravelService(workspace).snapshot(
trigger=str(payload.get("trigger") or "watchdog"),
metadata=payload.get("metadata") or {},
changed_path_hints=payload.get("changed_path_hints") or None,
)
except WorkspaceRejectedError:
return
except Exception as exc:
PrintStyle.error(f"Time Travel debounced snapshot failed: {exc}")
def _handle_usr_watchdog_events(items: list[Any]) -> None:
by_workspace: dict[str, tuple[WorkspaceInfo, list[str]]] = {}
for path, _event in items:
display_path = normalize_display_path(str(path or ""))
if not _is_watchdog_snapshot_candidate(display_path):
continue
workspace = resolve_workspace_for_path_hint(display_path)
if workspace is None:
continue
hints = by_workspace.setdefault(workspace.id, (workspace, []))[1]
hints.append(display_path)
for workspace, hints in by_workspace.values():
schedule_debounced_snapshot(
workspace,
trigger="watchdog",
metadata={
"source": "watchdog",
"changed_path_hints": _merge_hints(hints),
},
changed_path_hints=hints,
)
def _agent_metadata(agent: Any, metadata: dict[str, Any] | None = None) -> dict[str, Any]:
from helpers import projects
@ -340,6 +517,36 @@ def _agent_metadata(agent: Any, metadata: dict[str, Any] | None = None) -> dict[
return {key: value for key, value in result.items() if value not in (None, "")}
def _extract_changed_path_hints(metadata: dict[str, Any]) -> list[str]:
hints = metadata.get("changed_path_hints")
if not isinstance(hints, list):
return []
return [str(path) for path in hints if path]
def _merge_hints(*groups: list[str]) -> list[str]:
merged: list[str] = []
seen: set[str] = set()
for group in groups:
for path in group:
normalized = normalize_display_path(str(path or ""))
if not normalized or normalized in seen:
continue
merged.append(normalized)
seen.add(normalized)
return merged
def _is_watchdog_snapshot_candidate(display_path: str) -> bool:
normalized = normalize_display_path(display_path)
if not is_inside_usr_display(normalized):
return False
if normalized == "/a0/usr/plugins" or normalized.startswith("/a0/usr/plugins/"):
return False
parts = [part for part in normalized.split("/") if part]
return ".git" not in parts and ".time_travel" not in parts
class TimeTravelService:
def __init__(self, workspace: WorkspaceInfo):
self.workspace = workspace
@ -642,7 +849,7 @@ class TimeTravelService:
def _stage_current_tree(self) -> tuple[str, list[str]]:
self.ensure_repo()
self._git("read-tree", "--empty")
paths = list(iter_snapshot_paths(self.workspace.real_path))
paths = list(iter_snapshot_paths(self.workspace.real_path, display_path=self.workspace.display_path))
if paths:
payload = "\0".join(paths).encode("utf-8") + b"\0"
self._git_bytes(
@ -951,8 +1158,12 @@ class TimeTravelService:
return completed
def iter_snapshot_paths(workspace: Path) -> Iterable[str]:
def iter_snapshot_paths(workspace: Path, *, display_path: str = "") -> Iterable[str]:
workspace = workspace.resolve(strict=False)
if display_path:
root_is_usr = normalize_display_path(display_path) == USR_DISPLAY_ROOT
else:
root_is_usr = workspace == real_path_for_display(USR_DISPLAY_ROOT)
def walk(folder: Path, rel_prefix: str = "") -> Iterable[str]:
try:
@ -970,6 +1181,10 @@ def iter_snapshot_paths(workspace: Path) -> Iterable[str]:
except OSError:
continue
if is_dir:
if root_is_usr and not rel_prefix and entry.name in USR_ROOT_EXCLUDED_DIR_NAMES:
continue
if _is_nested_git_worktree_dir(Path(entry.path), workspace):
continue
if not is_snapshot_candidate(rel, is_dir=True):
continue
yield from walk(Path(entry.path), rel)
@ -979,6 +1194,16 @@ def iter_snapshot_paths(workspace: Path) -> Iterable[str]:
yield from walk(workspace)
def _is_nested_git_worktree_dir(folder: Path, workspace: Path) -> bool:
try:
if folder.resolve(strict=False) == workspace.resolve(strict=False):
return False
except OSError:
return False
dot_git = folder / ".git"
return dot_git.exists() or dot_git.is_symlink()
def is_snapshot_candidate(rel_path: str, *, is_dir: bool) -> bool:
rel = rel_path.replace("\\", "/").strip("/")
if not rel:

View file

@ -131,6 +131,29 @@ def test_kernel_boundary_real_git_repo_and_git_dir_exclusion(workspace):
assert run_git(root, "rev-parse", "HEAD") == real_head
assert run_git(root, "status", "--short") == real_status_before
assert all(not path.startswith(".git/") and path != ".git" for path in tracked_paths(service))
assert "tracked.txt" in tracked_paths(service, snapshot.hash)
assert "untracked.txt" in tracked_paths(service, snapshot.hash)
def test_usr_root_snapshot_skips_plugins_and_nested_git_projects(tmp_path: Path):
root = tmp_path / "usr"
root.mkdir()
(root / "workdir").mkdir()
(root / "workdir" / "note.txt").write_text("note\n", encoding="utf-8")
(root / "plugins" / "demo").mkdir(parents=True)
(root / "plugins" / "demo" / "plugin.yaml").write_text("name: demo\n", encoding="utf-8")
(root / "projects" / "git-project").mkdir(parents=True)
(root / "projects" / "git-project" / ".git").mkdir()
(root / "projects" / "git-project" / "app.py").write_text("print('tracked elsewhere')\n", encoding="utf-8")
(root / "projects" / "plain-project").mkdir(parents=True)
(root / "projects" / "plain-project" / "app.py").write_text("print('plain')\n", encoding="utf-8")
paths = set(tt.iter_snapshot_paths(root, display_path="/a0/usr"))
assert "workdir/note.txt" in paths
assert "projects/plain-project/app.py" in paths
assert "plugins/demo/plugin.yaml" not in paths
assert "projects/git-project/app.py" not in paths
def test_metadata_policy_tracks_safe_project_files_and_preserves_exclusions(workspace):
@ -238,6 +261,44 @@ def test_pagination_large_diff_and_invalid_inputs(workspace, monkeypatch: pytest
service.history_diff(commit_hash=hashes[-1], path="../file.txt", mode="commit")
def test_debounced_snapshots_coalesce_to_one_commit(workspace):
root, service = workspace
tt.clear_debounced_snapshots()
try:
(root / "file.txt").write_text("one\n", encoding="utf-8")
tt.schedule_debounced_snapshot(
service.workspace,
trigger="watchdog",
metadata={"source": "watchdog", "changed_path_hints": ["/a0/usr/file.txt"]},
delay=60,
)
assert service.current_hash() == ""
(root / "file.txt").write_text("two\n", encoding="utf-8")
tt.schedule_debounced_snapshot(
service.workspace,
trigger="text_editor_write",
metadata={"source": "text_editor", "changed_path_hints": ["/a0/usr/other.txt"]},
delay=60,
)
tt.flush_debounced_snapshots()
current = service.current_hash()
assert current
commits = service.history_list(limit=10)["commits"]
assert len(commits) == 1
assert commits[0]["hash"] == current
assert commits[0]["metadata"]["trigger"] == "text_editor_write"
assert commits[0]["metadata"]["source"] == "text_editor"
assert commits[0]["metadata"]["changed_path_hints"] == [
"/a0/usr/file.txt",
"/a0/usr/other.txt",
]
assert "two" in service.history_diff(commit_hash=current, path="file.txt", mode="commit")["patch"]
finally:
tt.clear_debounced_snapshots()
def test_workspace_resolution_prefers_project_and_rejects_external_paths(monkeypatch: pytest.MonkeyPatch, workspace):
root, _service = workspace
projects_mod = ModuleType("helpers.projects")