diff --git a/plugins/_time_travel/extensions/python/_functions/__main__/init_a0/end/_30_register_watchdog.py b/plugins/_time_travel/extensions/python/_functions/__main__/init_a0/end/_30_register_watchdog.py new file mode 100644 index 000000000..50f346ab5 --- /dev/null +++ b/plugins/_time_travel/extensions/python/_functions/__main__/init_a0/end/_30_register_watchdog.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from helpers.extension import Extension +from plugins._time_travel.helpers.time_travel import register_watchdogs + + +class RegisterTimeTravelWatchdog(Extension): + def execute(self, **kwargs): + register_watchdogs() diff --git a/plugins/_time_travel/extensions/python/_functions/run_ui/init_a0/end/_30_register_watchdog.py b/plugins/_time_travel/extensions/python/_functions/run_ui/init_a0/end/_30_register_watchdog.py new file mode 100644 index 000000000..50f346ab5 --- /dev/null +++ b/plugins/_time_travel/extensions/python/_functions/run_ui/init_a0/end/_30_register_watchdog.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from helpers.extension import Extension +from plugins._time_travel.helpers.time_travel import register_watchdogs + + +class RegisterTimeTravelWatchdog(Extension): + def execute(self, **kwargs): + register_watchdogs() diff --git a/plugins/_time_travel/extensions/python/tool_execute_after/_50_code_execution_snapshot.py b/plugins/_time_travel/extensions/python/tool_execute_after/_50_code_execution_snapshot.py index c6ffaf81d..8b5645484 100644 --- a/plugins/_time_travel/extensions/python/tool_execute_after/_50_code_execution_snapshot.py +++ b/plugins/_time_travel/extensions/python/tool_execute_after/_50_code_execution_snapshot.py @@ -1,28 +1,16 @@ from __future__ import annotations -import time from typing import Any from helpers.extension import Extension from plugins._time_travel.helpers.time_travel import snapshot_for_agent -DEBOUNCE_SECONDS = 2.0 -_LAST_SNAPSHOT_BY_CONTEXT: dict[str, float] = {} - - class TimeTravelCodeExecutionSnapshot(Extension): async def execute(self, tool_name: str = "", response: Any = None, **kwargs: Any): if tool_name != "code_execution_tool" or not self.agent: return - context_id = str(getattr(getattr(self.agent, "context", None), "id", "") or "") - now = time.monotonic() - if context_id and now - _LAST_SNAPSHOT_BY_CONTEXT.get(context_id, 0.0) < DEBOUNCE_SECONDS: - return - if context_id: - _LAST_SNAPSHOT_BY_CONTEXT[context_id] = now - tool = getattr(getattr(self.agent, "loop_data", None), "current_tool", None) args = getattr(tool, "args", {}) if tool else {} runtime = str(args.get("runtime") or "") if isinstance(args, dict) else "" diff --git a/plugins/_time_travel/helpers/time_travel.py b/plugins/_time_travel/helpers/time_travel.py index 8ffe644fb..389c1b6f4 100644 --- a/plugins/_time_travel/helpers/time_travel.py +++ b/plugins/_time_travel/helpers/time_travel.py @@ -8,6 +8,7 @@ import os import posixpath import shutil import subprocess +import threading import time from dataclasses import dataclass from datetime import datetime, timezone @@ -27,6 +28,13 @@ METADATA_PREFIX = "A0-Time-Travel-Metadata:" EMPTY_TREE = "4b825dc642cb6eb9a060e54bf8d69288fbee4904" MAX_RENDERED_PATCH_BYTES = 1_000_000 GIT_TIMEOUT_SECONDS = 20 +AUTO_SNAPSHOT_DEBOUNCE_SECONDS = 10.0 +WATCHDOG_ID = "time_travel_usr" +WATCHDOG_DEBOUNCE_SECONDS = 1.0 + +_AUTO_SNAPSHOT_LOCK = threading.RLock() +_AUTO_SNAPSHOT_TIMERS: dict[str, threading.Timer] = {} +_AUTO_SNAPSHOT_PAYLOADS: dict[str, dict[str, Any]] = {} STATUS_LABELS = { "A": "added", @@ -78,6 +86,10 @@ EXCLUDED_FILE_PATTERNS = { "*.class", } +USR_ROOT_EXCLUDED_DIR_NAMES = { + "plugins", +} + SAFE_A0PROJ_FILES = { ".a0proj/project.json", ".a0proj/agents.json", @@ -291,14 +303,29 @@ def clean_summary() -> dict[str, Any]: } -def snapshot_for_agent(agent: Any, *, trigger: str, metadata: dict[str, Any] | None = None) -> SnapshotResult | None: +def snapshot_for_agent( + agent: Any, + *, + trigger: str, + metadata: dict[str, Any] | None = None, + debounced: bool = True, +) -> SnapshotResult | None: if not agent: return None context_id = str(getattr(getattr(agent, "context", None), "id", "") or "") try: workspace = resolve_workspace(context_id, context_loader=lambda _ctxid: agent.context) - return TimeTravelService(workspace).snapshot(trigger=trigger, metadata=_agent_metadata(agent, metadata)) + full_metadata = _agent_metadata(agent, metadata) + if debounced: + schedule_debounced_snapshot( + workspace, + trigger=trigger, + metadata=full_metadata, + changed_path_hints=_extract_changed_path_hints(full_metadata), + ) + return None + return TimeTravelService(workspace).snapshot(trigger=trigger, metadata=full_metadata) except WorkspaceRejectedError: return None except Exception as exc: @@ -306,17 +333,167 @@ def snapshot_for_agent(agent: Any, *, trigger: str, metadata: dict[str, Any] | N return None -def snapshot_for_path_hint(path_hint: str, *, trigger: str, metadata: dict[str, Any] | None = None) -> SnapshotResult | None: +def snapshot_for_path_hint( + path_hint: str, + *, + trigger: str, + metadata: dict[str, Any] | None = None, + debounced: bool = True, +) -> SnapshotResult | None: try: workspace = resolve_workspace_for_path_hint(path_hint) if workspace is None: return None - return TimeTravelService(workspace).snapshot(trigger=trigger, metadata=metadata or {}) + full_metadata = metadata or {} + if debounced: + schedule_debounced_snapshot( + workspace, + trigger=trigger, + metadata=full_metadata, + changed_path_hints=_extract_changed_path_hints(full_metadata), + ) + return None + return TimeTravelService(workspace).snapshot(trigger=trigger, metadata=full_metadata) except Exception as exc: PrintStyle.error(f"Time Travel file-browser snapshot failed: {exc}") return None +def register_watchdogs() -> None: + from helpers import watchdog + + root = real_path_for_display(USR_DISPLAY_ROOT) + if not root.exists() or not root.is_dir(): + return + + watchdog.add_watchdog( + id=WATCHDOG_ID, + roots=[str(root)], + patterns=["**/*"], + ignore_patterns=[ + "**/.git", + "**/.git/**", + "**/.time_travel", + "**/.time_travel/**", + "**/__pycache__", + "**/__pycache__/**", + "**/*.pyc", + "**/.pytest_cache/**", + "**/.mypy_cache/**", + "**/.ruff_cache/**", + "**/.cache/**", + "**/node_modules/**", + "**/.venv/**", + "**/venv/**", + "**/dist/**", + "**/build/**", + ], + events=["create", "modify", "delete", "move"], + debounce=WATCHDOG_DEBOUNCE_SECONDS, + handler=_handle_usr_watchdog_events, + ) + + +def schedule_debounced_snapshot( + workspace: WorkspaceInfo, + *, + trigger: str, + metadata: dict[str, Any] | None = None, + changed_path_hints: list[str] | None = None, + delay: float | None = None, +) -> None: + clean_metadata = dict(metadata or {}) + metadata_hints = _extract_changed_path_hints(clean_metadata) + clean_metadata.pop("changed_path_hints", None) + hints = _merge_hints(metadata_hints, changed_path_hints or []) + delay_seconds = AUTO_SNAPSHOT_DEBOUNCE_SECONDS if delay is None else max(0.0, float(delay)) + with _AUTO_SNAPSHOT_LOCK: + payload = _AUTO_SNAPSHOT_PAYLOADS.get(workspace.id) + if payload is None: + payload = { + "workspace": workspace, + "trigger": trigger, + "metadata": clean_metadata, + "changed_path_hints": hints, + } + _AUTO_SNAPSHOT_PAYLOADS[workspace.id] = payload + timer = threading.Timer(delay_seconds, _flush_debounced_snapshot, args=(workspace.id,)) + timer.daemon = True + _AUTO_SNAPSHOT_TIMERS[workspace.id] = timer + timer.start() + return + + payload["trigger"] = trigger + payload["metadata"] = {**payload.get("metadata", {}), **clean_metadata} + payload["changed_path_hints"] = _merge_hints( + payload.get("changed_path_hints", []), + hints, + ) + + +def flush_debounced_snapshots() -> None: + with _AUTO_SNAPSHOT_LOCK: + workspace_ids = list(_AUTO_SNAPSHOT_PAYLOADS) + for workspace_id in workspace_ids: + timer = _AUTO_SNAPSHOT_TIMERS.pop(workspace_id, None) + timer and timer.cancel() + for workspace_id in workspace_ids: + _flush_debounced_snapshot(workspace_id) + + +def clear_debounced_snapshots() -> None: + with _AUTO_SNAPSHOT_LOCK: + timers = list(_AUTO_SNAPSHOT_TIMERS.values()) + _AUTO_SNAPSHOT_TIMERS.clear() + _AUTO_SNAPSHOT_PAYLOADS.clear() + for timer in timers: + timer.cancel() + + +def _flush_debounced_snapshot(workspace_id: str) -> None: + with _AUTO_SNAPSHOT_LOCK: + _AUTO_SNAPSHOT_TIMERS.pop(workspace_id, None) + payload = _AUTO_SNAPSHOT_PAYLOADS.pop(workspace_id, None) + if not payload: + return + + try: + workspace = payload["workspace"] + TimeTravelService(workspace).snapshot( + trigger=str(payload.get("trigger") or "watchdog"), + metadata=payload.get("metadata") or {}, + changed_path_hints=payload.get("changed_path_hints") or None, + ) + except WorkspaceRejectedError: + return + except Exception as exc: + PrintStyle.error(f"Time Travel debounced snapshot failed: {exc}") + + +def _handle_usr_watchdog_events(items: list[Any]) -> None: + by_workspace: dict[str, tuple[WorkspaceInfo, list[str]]] = {} + for path, _event in items: + display_path = normalize_display_path(str(path or "")) + if not _is_watchdog_snapshot_candidate(display_path): + continue + workspace = resolve_workspace_for_path_hint(display_path) + if workspace is None: + continue + hints = by_workspace.setdefault(workspace.id, (workspace, []))[1] + hints.append(display_path) + + for workspace, hints in by_workspace.values(): + schedule_debounced_snapshot( + workspace, + trigger="watchdog", + metadata={ + "source": "watchdog", + "changed_path_hints": _merge_hints(hints), + }, + changed_path_hints=hints, + ) + + def _agent_metadata(agent: Any, metadata: dict[str, Any] | None = None) -> dict[str, Any]: from helpers import projects @@ -340,6 +517,36 @@ def _agent_metadata(agent: Any, metadata: dict[str, Any] | None = None) -> dict[ return {key: value for key, value in result.items() if value not in (None, "")} +def _extract_changed_path_hints(metadata: dict[str, Any]) -> list[str]: + hints = metadata.get("changed_path_hints") + if not isinstance(hints, list): + return [] + return [str(path) for path in hints if path] + + +def _merge_hints(*groups: list[str]) -> list[str]: + merged: list[str] = [] + seen: set[str] = set() + for group in groups: + for path in group: + normalized = normalize_display_path(str(path or "")) + if not normalized or normalized in seen: + continue + merged.append(normalized) + seen.add(normalized) + return merged + + +def _is_watchdog_snapshot_candidate(display_path: str) -> bool: + normalized = normalize_display_path(display_path) + if not is_inside_usr_display(normalized): + return False + if normalized == "/a0/usr/plugins" or normalized.startswith("/a0/usr/plugins/"): + return False + parts = [part for part in normalized.split("/") if part] + return ".git" not in parts and ".time_travel" not in parts + + class TimeTravelService: def __init__(self, workspace: WorkspaceInfo): self.workspace = workspace @@ -642,7 +849,7 @@ class TimeTravelService: def _stage_current_tree(self) -> tuple[str, list[str]]: self.ensure_repo() self._git("read-tree", "--empty") - paths = list(iter_snapshot_paths(self.workspace.real_path)) + paths = list(iter_snapshot_paths(self.workspace.real_path, display_path=self.workspace.display_path)) if paths: payload = "\0".join(paths).encode("utf-8") + b"\0" self._git_bytes( @@ -951,8 +1158,12 @@ class TimeTravelService: return completed -def iter_snapshot_paths(workspace: Path) -> Iterable[str]: +def iter_snapshot_paths(workspace: Path, *, display_path: str = "") -> Iterable[str]: workspace = workspace.resolve(strict=False) + if display_path: + root_is_usr = normalize_display_path(display_path) == USR_DISPLAY_ROOT + else: + root_is_usr = workspace == real_path_for_display(USR_DISPLAY_ROOT) def walk(folder: Path, rel_prefix: str = "") -> Iterable[str]: try: @@ -970,6 +1181,10 @@ def iter_snapshot_paths(workspace: Path) -> Iterable[str]: except OSError: continue if is_dir: + if root_is_usr and not rel_prefix and entry.name in USR_ROOT_EXCLUDED_DIR_NAMES: + continue + if _is_nested_git_worktree_dir(Path(entry.path), workspace): + continue if not is_snapshot_candidate(rel, is_dir=True): continue yield from walk(Path(entry.path), rel) @@ -979,6 +1194,16 @@ def iter_snapshot_paths(workspace: Path) -> Iterable[str]: yield from walk(workspace) +def _is_nested_git_worktree_dir(folder: Path, workspace: Path) -> bool: + try: + if folder.resolve(strict=False) == workspace.resolve(strict=False): + return False + except OSError: + return False + dot_git = folder / ".git" + return dot_git.exists() or dot_git.is_symlink() + + def is_snapshot_candidate(rel_path: str, *, is_dir: bool) -> bool: rel = rel_path.replace("\\", "/").strip("/") if not rel: diff --git a/tests/test_time_travel.py b/tests/test_time_travel.py index 2a46b2d85..0f9de3345 100644 --- a/tests/test_time_travel.py +++ b/tests/test_time_travel.py @@ -131,6 +131,29 @@ def test_kernel_boundary_real_git_repo_and_git_dir_exclusion(workspace): assert run_git(root, "rev-parse", "HEAD") == real_head assert run_git(root, "status", "--short") == real_status_before assert all(not path.startswith(".git/") and path != ".git" for path in tracked_paths(service)) + assert "tracked.txt" in tracked_paths(service, snapshot.hash) + assert "untracked.txt" in tracked_paths(service, snapshot.hash) + + +def test_usr_root_snapshot_skips_plugins_and_nested_git_projects(tmp_path: Path): + root = tmp_path / "usr" + root.mkdir() + (root / "workdir").mkdir() + (root / "workdir" / "note.txt").write_text("note\n", encoding="utf-8") + (root / "plugins" / "demo").mkdir(parents=True) + (root / "plugins" / "demo" / "plugin.yaml").write_text("name: demo\n", encoding="utf-8") + (root / "projects" / "git-project").mkdir(parents=True) + (root / "projects" / "git-project" / ".git").mkdir() + (root / "projects" / "git-project" / "app.py").write_text("print('tracked elsewhere')\n", encoding="utf-8") + (root / "projects" / "plain-project").mkdir(parents=True) + (root / "projects" / "plain-project" / "app.py").write_text("print('plain')\n", encoding="utf-8") + + paths = set(tt.iter_snapshot_paths(root, display_path="/a0/usr")) + + assert "workdir/note.txt" in paths + assert "projects/plain-project/app.py" in paths + assert "plugins/demo/plugin.yaml" not in paths + assert "projects/git-project/app.py" not in paths def test_metadata_policy_tracks_safe_project_files_and_preserves_exclusions(workspace): @@ -238,6 +261,44 @@ def test_pagination_large_diff_and_invalid_inputs(workspace, monkeypatch: pytest service.history_diff(commit_hash=hashes[-1], path="../file.txt", mode="commit") +def test_debounced_snapshots_coalesce_to_one_commit(workspace): + root, service = workspace + tt.clear_debounced_snapshots() + try: + (root / "file.txt").write_text("one\n", encoding="utf-8") + tt.schedule_debounced_snapshot( + service.workspace, + trigger="watchdog", + metadata={"source": "watchdog", "changed_path_hints": ["/a0/usr/file.txt"]}, + delay=60, + ) + assert service.current_hash() == "" + + (root / "file.txt").write_text("two\n", encoding="utf-8") + tt.schedule_debounced_snapshot( + service.workspace, + trigger="text_editor_write", + metadata={"source": "text_editor", "changed_path_hints": ["/a0/usr/other.txt"]}, + delay=60, + ) + tt.flush_debounced_snapshots() + + current = service.current_hash() + assert current + commits = service.history_list(limit=10)["commits"] + assert len(commits) == 1 + assert commits[0]["hash"] == current + assert commits[0]["metadata"]["trigger"] == "text_editor_write" + assert commits[0]["metadata"]["source"] == "text_editor" + assert commits[0]["metadata"]["changed_path_hints"] == [ + "/a0/usr/file.txt", + "/a0/usr/other.txt", + ] + assert "two" in service.history_diff(commit_hash=current, path="file.txt", mode="commit")["patch"] + finally: + tt.clear_debounced_snapshots() + + def test_workspace_resolution_prefers_project_and_rejects_external_paths(monkeypatch: pytest.MonkeyPatch, workspace): root, _service = workspace projects_mod = ModuleType("helpers.projects")