diff --git a/plugins/_time_travel/helpers/time_travel.py b/plugins/_time_travel/helpers/time_travel.py index 389c1b6f4..3942f8317 100644 --- a/plugins/_time_travel/helpers/time_travel.py +++ b/plugins/_time_travel/helpers/time_travel.py @@ -31,6 +31,7 @@ GIT_TIMEOUT_SECONDS = 20 AUTO_SNAPSHOT_DEBOUNCE_SECONDS = 10.0 WATCHDOG_ID = "time_travel_usr" WATCHDOG_DEBOUNCE_SECONDS = 1.0 +SHADOW_REPO_BACKUP_PREFIX = "repo.git.invalid" _AUTO_SNAPSHOT_LOCK = threading.RLock() _AUTO_SNAPSHOT_TIMERS: dict[str, threading.Timer] = {} @@ -189,7 +190,7 @@ def is_inside_usr_display(display_path: str) -> bool: def workspace_id_for(display_path: str) -> str: - normalized = normalize_display_path(display_path).rstrip("/") + normalized = canonical_workspace_display_path(display_path).rstrip("/") return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:32] @@ -202,6 +203,13 @@ def real_path_for_display(display_path: str) -> Path: return Path(normalized).expanduser().resolve(strict=False) +def canonical_workspace_display_path(display_path: str) -> str: + normalized = normalize_display_path(display_path) + real_path = real_path_for_display(normalized) + canonical = normalize_display_path(str(real_path)) + return (canonical if canonical.startswith("/a0") else normalized).rstrip("/") or canonical + + def resolve_workspace(context_id: str = "", *, context_loader=None) -> WorkspaceInfo: from helpers import projects, settings @@ -220,7 +228,7 @@ def resolve_workspace(context_id: str = "", *, context_loader=None) -> Workspace configured = str(settings.get_settings().get("workdir_path") or "") display_path = configured or files.normalize_a0_path(files.get_abs_path("usr/workdir")) - normalized = normalize_display_path(display_path) + normalized = canonical_workspace_display_path(display_path) if not is_inside_usr_display(normalized): raise WorkspaceRejectedError("Time Travel is only available for workspaces inside /a0/usr.") @@ -241,7 +249,7 @@ def resolve_workspace(context_id: str = "", *, context_loader=None) -> Workspace def resolve_workspace_for_path_hint(path_hint: str) -> WorkspaceInfo | None: from helpers import settings - normalized = normalize_display_path(path_hint) + normalized = canonical_workspace_display_path(path_hint) if not is_inside_usr_display(normalized): return None @@ -251,7 +259,7 @@ def resolve_workspace_for_path_hint(path_hint: str) -> WorkspaceInfo | None: return _workspace_from_display(project_display, project_name=parts[3]) configured = str(settings.get_settings().get("workdir_path") or "") - workdir_display = normalize_display_path(configured or files.normalize_a0_path(files.get_abs_path("usr/workdir"))) + workdir_display = canonical_workspace_display_path(configured or files.normalize_a0_path(files.get_abs_path("usr/workdir"))) if normalized == workdir_display or normalized.startswith(workdir_display.rstrip("/") + "/"): return _workspace_from_display(workdir_display) @@ -259,7 +267,7 @@ def resolve_workspace_for_path_hint(path_hint: str) -> WorkspaceInfo | None: def _workspace_from_display(display_path: str, *, project_name: str = "", context_id: str = "") -> WorkspaceInfo: - normalized = normalize_display_path(display_path) + normalized = canonical_workspace_display_path(display_path) if not is_inside_usr_display(normalized): raise WorkspaceRejectedError("Time Travel is only available for workspaces inside /a0/usr.") workspace_id = workspace_id_for(normalized) @@ -553,22 +561,11 @@ class TimeTravelService: def ensure_repo(self) -> None: self.workspace.shadow_path.mkdir(parents=True, exist_ok=True) - if not self.workspace.repo_git_path.exists(): - completed = subprocess.run( - ["git", "init", "--bare", str(self.workspace.repo_git_path)], - capture_output=True, - text=True, - encoding="utf-8", - errors="replace", - timeout=GIT_TIMEOUT_SECONDS, - ) - if completed.returncode != 0: - raise GitCommandError( - (completed.stderr or completed.stdout or "Could not initialize shadow Git repository.").strip(), - stdout=completed.stdout, - stderr=completed.stderr, - ) - self._git("symbolic-ref", "HEAD", CURRENT_REF) + if not self._shadow_repo_valid(): + self._repair_shadow_repo_head() + if not self._shadow_repo_valid(): + self._initialize_shadow_repo(quarantine_existing=True) + self._ensure_current_head_ref() self._git("config", "user.name", "Agent Zero Time Travel") self._git("config", "user.email", "time-travel@agent-zero.local") @@ -854,6 +851,7 @@ class TimeTravelService: payload = "\0".join(paths).encode("utf-8") + b"\0" self._git_bytes( "add", + "-f", "-A", "--pathspec-from-file=-", "--pathspec-file-nul", @@ -1106,6 +1104,102 @@ class TimeTravelService: env["GIT_OPTIONAL_LOCKS"] = "0" return env + def _run_git_dir(self, *args: str, check: bool = False) -> subprocess.CompletedProcess[str]: + return subprocess.run( + ["git", f"--git-dir={self.workspace.repo_git_path}", *args], + capture_output=True, + text=True, + encoding="utf-8", + errors="replace", + env=self._git_env(), + timeout=GIT_TIMEOUT_SECONDS, + check=check, + ) + + def _shadow_repo_valid(self) -> bool: + if not self.workspace.repo_git_path.is_dir(): + return False + completed = self._run_git_dir("rev-parse", "--git-dir") + return completed.returncode == 0 + + def _repair_shadow_repo_head(self) -> None: + if not self.workspace.repo_git_path.is_dir(): + return + if not (self.workspace.repo_git_path / "objects").is_dir() or not (self.workspace.repo_git_path / "refs").is_dir(): + return + target_ref = CURRENT_REF if self._loose_ref_exists(CURRENT_REF) else self._first_loose_head_ref() + try: + (self.workspace.repo_git_path / "HEAD").write_text(f"ref: {target_ref}\n", encoding="utf-8") + except OSError: + return + + def _initialize_shadow_repo(self, *, quarantine_existing: bool = False) -> None: + if quarantine_existing and self.workspace.repo_git_path.exists(): + backup_path = self._next_invalid_repo_backup_path() + shutil.move(str(self.workspace.repo_git_path), str(backup_path)) + completed = subprocess.run( + ["git", "init", "--bare", str(self.workspace.repo_git_path)], + capture_output=True, + text=True, + encoding="utf-8", + errors="replace", + env=self._git_env(), + timeout=GIT_TIMEOUT_SECONDS, + ) + if completed.returncode != 0: + raise GitCommandError( + (completed.stderr or completed.stdout or "Could not initialize shadow Git repository.").strip(), + stdout=completed.stdout, + stderr=completed.stderr, + ) + updated = self._run_git_dir("symbolic-ref", "HEAD", CURRENT_REF) + if updated.returncode != 0: + raise GitCommandError( + (updated.stderr or updated.stdout or "Could not initialize shadow Git HEAD.").strip(), + stdout=updated.stdout, + stderr=updated.stderr, + ) + + def _loose_ref_exists(self, ref: str) -> bool: + return self.workspace.repo_git_path.joinpath(*ref.split("/")).is_file() + + def _first_loose_head_ref(self) -> str: + heads_dir = self.workspace.repo_git_path / "refs" / "heads" + try: + refs = sorted(path for path in heads_dir.rglob("*") if path.is_file()) + except OSError: + refs = [] + if not refs: + return CURRENT_REF + return "refs/heads/" + refs[0].relative_to(heads_dir).as_posix() + + def _next_invalid_repo_backup_path(self) -> Path: + stamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") + base_path = self.workspace.shadow_path / f"{SHADOW_REPO_BACKUP_PREFIX}-{stamp}" + backup_path = base_path + counter = 2 + while backup_path.exists(): + backup_path = self.workspace.shadow_path / f"{base_path.name}-{counter}" + counter += 1 + return backup_path + + def _ensure_current_head_ref(self) -> None: + current_ref = self._run_git_dir("symbolic-ref", "-q", "HEAD") + if current_ref.returncode == 0 and current_ref.stdout.strip() == CURRENT_REF: + return + + current_commit = self._run_git_dir("rev-parse", "--verify", "HEAD^{commit}") + if current_commit.returncode == 0: + self._run_git_dir("update-ref", CURRENT_REF, current_commit.stdout.strip()) + + updated = self._run_git_dir("symbolic-ref", "HEAD", CURRENT_REF) + if updated.returncode != 0: + raise GitCommandError( + (updated.stderr or updated.stdout or "Could not repair shadow Git HEAD.").strip(), + stdout=updated.stdout, + stderr=updated.stderr, + ) + def _git(self, *args: str, input: str | None = None, env: dict[str, str] | None = None, check: bool = True) -> subprocess.CompletedProcess[str]: self.workspace.shadow_path.mkdir(parents=True, exist_ok=True) completed = subprocess.run( diff --git a/tests/test_time_travel.py b/tests/test_time_travel.py index 0f9de3345..00dc42ee5 100644 --- a/tests/test_time_travel.py +++ b/tests/test_time_travel.py @@ -135,6 +135,58 @@ def test_kernel_boundary_real_git_repo_and_git_dir_exclusion(workspace): assert "untracked.txt" in tracked_paths(service, snapshot.hash) +def test_snapshot_force_adds_curated_paths_ignored_by_workspace_gitignore(workspace): + root, service = workspace + (root / ".gitignore").write_text("ignored.txt\nignored-dir/\n.env\n", encoding="utf-8") + (root / "ignored.txt").write_text("still important\n", encoding="utf-8") + (root / "ignored-dir").mkdir() + (root / "ignored-dir" / "note.txt").write_text("nested\n", encoding="utf-8") + (root / ".env").write_text("SECRET=still excluded\n", encoding="utf-8") + + snapshot = service.snapshot(trigger="manual") + paths = tracked_paths(service, snapshot.hash) + + assert ".gitignore" in paths + assert "ignored.txt" in paths + assert "ignored-dir/note.txt" in paths + assert ".env" not in paths + + +def test_shadow_repo_empty_head_is_repaired_without_losing_history(workspace): + root, service = workspace + (root / "a.txt").write_text("one\n", encoding="utf-8") + first = service.snapshot(trigger="manual") + (service.workspace.repo_git_path / "HEAD").write_text("", encoding="utf-8") + + (root / "a.txt").write_text("one\ntwo\n", encoding="utf-8") + second = service.snapshot(trigger="manual") + + assert second.created is True + assert service.current_hash() == second.hash + assert [commit["hash"] for commit in service.history_list(limit=10)["commits"][:2]] == [ + second.hash, + first.hash, + ] + + +def test_workspace_identity_canonicalizes_symlink_aliases(): + name = f"tt-{uuid.uuid4().hex}" + root = PROJECT_ROOT / "usr" / "time-travel-tests" / name + target = root / "target" + alias = root / "alias" + target.mkdir(parents=True) + os.symlink(target, alias) + + target_workspace = _workspace_from_display(f"/a0/usr/time-travel-tests/{name}/target") + alias_workspace = _workspace_from_display(f"/a0/usr/time-travel-tests/{name}/alias") + try: + assert alias_workspace.id == target_workspace.id + assert alias_workspace.display_path == target_workspace.display_path + finally: + shutil.rmtree(root, ignore_errors=True) + shutil.rmtree(target_workspace.shadow_path, ignore_errors=True) + + def test_usr_root_snapshot_skips_plugins_and_nested_git_projects(tmp_path: Path): root = tmp_path / "usr" root.mkdir()