Fix Time Travel snapshot resilience

Force-add curated snapshot paths so workspace .gitignore rules cannot break Time Travel snapshots, while preserving Time Travel's own exclusions for secrets and generated files.

Repair invalid shadow Git repositories by restoring HEAD when possible or quarantining and reinitializing unusable repos, and canonicalize workspace paths to avoid duplicate shadow histories for aliases.

Add regression coverage for ignored paths, corrupt shadow HEAD recovery, and canonical workspace identity.
This commit is contained in:
Alessandro 2026-05-02 20:27:28 +02:00
parent d6d97d037c
commit d8c0d6b9fe
2 changed files with 167 additions and 21 deletions

View file

@ -31,6 +31,7 @@ GIT_TIMEOUT_SECONDS = 20
AUTO_SNAPSHOT_DEBOUNCE_SECONDS = 10.0
WATCHDOG_ID = "time_travel_usr"
WATCHDOG_DEBOUNCE_SECONDS = 1.0
SHADOW_REPO_BACKUP_PREFIX = "repo.git.invalid"
_AUTO_SNAPSHOT_LOCK = threading.RLock()
_AUTO_SNAPSHOT_TIMERS: dict[str, threading.Timer] = {}
@ -189,7 +190,7 @@ def is_inside_usr_display(display_path: str) -> bool:
def workspace_id_for(display_path: str) -> str:
normalized = normalize_display_path(display_path).rstrip("/")
normalized = canonical_workspace_display_path(display_path).rstrip("/")
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:32]
@ -202,6 +203,13 @@ def real_path_for_display(display_path: str) -> Path:
return Path(normalized).expanduser().resolve(strict=False)
def canonical_workspace_display_path(display_path: str) -> str:
    """Return the canonical display path used to identify a workspace.

    The display path is normalized, resolved on disk through
    ``real_path_for_display`` and normalized again, so aliases of the same
    directory collapse to one path. The resolved form is only used when it
    is ``/a0`` itself or lies below it; otherwise the normalized input is
    kept.

    Args:
        display_path: Workspace path in display form.

    Returns:
        The chosen path without trailing slashes. If stripping leaves an
        empty string, the resolved form is returned unchanged.
    """
    normalized = normalize_display_path(display_path)
    real_path = real_path_for_display(normalized)
    canonical = normalize_display_path(str(real_path))
    # Match on a path boundary: a bare prefix test would also accept
    # siblings such as "/a0-old/...".
    inside_root = canonical == "/a0" or canonical.startswith("/a0/")
    chosen = canonical if inside_root else normalized
    return chosen.rstrip("/") or canonical
def resolve_workspace(context_id: str = "", *, context_loader=None) -> WorkspaceInfo:
from helpers import projects, settings
@ -220,7 +228,7 @@ def resolve_workspace(context_id: str = "", *, context_loader=None) -> Workspace
configured = str(settings.get_settings().get("workdir_path") or "")
display_path = configured or files.normalize_a0_path(files.get_abs_path("usr/workdir"))
normalized = normalize_display_path(display_path)
normalized = canonical_workspace_display_path(display_path)
if not is_inside_usr_display(normalized):
raise WorkspaceRejectedError("Time Travel is only available for workspaces inside /a0/usr.")
@ -241,7 +249,7 @@ def resolve_workspace(context_id: str = "", *, context_loader=None) -> Workspace
def resolve_workspace_for_path_hint(path_hint: str) -> WorkspaceInfo | None:
from helpers import settings
normalized = normalize_display_path(path_hint)
normalized = canonical_workspace_display_path(path_hint)
if not is_inside_usr_display(normalized):
return None
@ -251,7 +259,7 @@ def resolve_workspace_for_path_hint(path_hint: str) -> WorkspaceInfo | None:
return _workspace_from_display(project_display, project_name=parts[3])
configured = str(settings.get_settings().get("workdir_path") or "")
workdir_display = normalize_display_path(configured or files.normalize_a0_path(files.get_abs_path("usr/workdir")))
workdir_display = canonical_workspace_display_path(configured or files.normalize_a0_path(files.get_abs_path("usr/workdir")))
if normalized == workdir_display or normalized.startswith(workdir_display.rstrip("/") + "/"):
return _workspace_from_display(workdir_display)
@ -259,7 +267,7 @@ def resolve_workspace_for_path_hint(path_hint: str) -> WorkspaceInfo | None:
def _workspace_from_display(display_path: str, *, project_name: str = "", context_id: str = "") -> WorkspaceInfo:
normalized = normalize_display_path(display_path)
normalized = canonical_workspace_display_path(display_path)
if not is_inside_usr_display(normalized):
raise WorkspaceRejectedError("Time Travel is only available for workspaces inside /a0/usr.")
workspace_id = workspace_id_for(normalized)
@ -553,22 +561,11 @@ class TimeTravelService:
def ensure_repo(self) -> None:
self.workspace.shadow_path.mkdir(parents=True, exist_ok=True)
if not self.workspace.repo_git_path.exists():
completed = subprocess.run(
["git", "init", "--bare", str(self.workspace.repo_git_path)],
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
timeout=GIT_TIMEOUT_SECONDS,
)
if completed.returncode != 0:
raise GitCommandError(
(completed.stderr or completed.stdout or "Could not initialize shadow Git repository.").strip(),
stdout=completed.stdout,
stderr=completed.stderr,
)
self._git("symbolic-ref", "HEAD", CURRENT_REF)
if not self._shadow_repo_valid():
self._repair_shadow_repo_head()
if not self._shadow_repo_valid():
self._initialize_shadow_repo(quarantine_existing=True)
self._ensure_current_head_ref()
self._git("config", "user.name", "Agent Zero Time Travel")
self._git("config", "user.email", "time-travel@agent-zero.local")
@ -854,6 +851,7 @@ class TimeTravelService:
payload = "\0".join(paths).encode("utf-8") + b"\0"
self._git_bytes(
"add",
"-f",
"-A",
"--pathspec-from-file=-",
"--pathspec-file-nul",
@ -1106,6 +1104,102 @@ class TimeTravelService:
env["GIT_OPTIONAL_LOCKS"] = "0"
return env
def _run_git_dir(self, *args: str, check: bool = False) -> subprocess.CompletedProcess[str]:
    """Run ``git --git-dir=<shadow repo>`` with ``args`` and return the result.

    Output is captured as UTF-8 text with undecodable bytes replaced. The
    call uses the environment from ``_git_env`` and is bounded by
    ``GIT_TIMEOUT_SECONDS``.

    Args:
        *args: Git sub-command and its arguments.
        check: Forwarded to ``subprocess.run``; when true a non-zero exit
            raises ``subprocess.CalledProcessError``.

    Returns:
        The completed process, including captured stdout and stderr.
    """
    command = ["git", f"--git-dir={self.workspace.repo_git_path}"]
    command.extend(args)
    return subprocess.run(
        command,
        check=check,
        env=self._git_env(),
        timeout=GIT_TIMEOUT_SECONDS,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
def _shadow_repo_valid(self) -> bool:
    """Report whether Git accepts the shadow repository directory.

    Returns:
        False when the repository path is not a directory; otherwise True
        exactly when ``git rev-parse --git-dir`` exits with status 0.
    """
    if self.workspace.repo_git_path.is_dir():
        probe = self._run_git_dir("rev-parse", "--git-dir")
        return probe.returncode == 0
    return False
def _repair_shadow_repo_head(self) -> None:
    """Best-effort rewrite of the shadow repository's ``HEAD`` file.

    Nothing is done unless the repository directory exists and still has
    both an ``objects`` and a ``refs`` directory. ``HEAD`` is then written
    as a symbolic ref to ``CURRENT_REF`` when that loose ref exists, or to
    the ref returned by ``_first_loose_head_ref`` otherwise.
    """
    git_dir = self.workspace.repo_git_path
    if not git_dir.is_dir():
        return
    if not ((git_dir / "objects").is_dir() and (git_dir / "refs").is_dir()):
        return

    if self._loose_ref_exists(CURRENT_REF):
        head_target = CURRENT_REF
    else:
        head_target = self._first_loose_head_ref()

    try:
        (git_dir / "HEAD").write_text(f"ref: {head_target}\n", encoding="utf-8")
    except OSError:
        # Write failures are swallowed on purpose; this method only
        # attempts a repair and reports nothing back.
        return
def _initialize_shadow_repo(self, *, quarantine_existing: bool = False) -> None:
    """Create a bare shadow repository whose ``HEAD`` points at ``CURRENT_REF``.

    Args:
        quarantine_existing: When true and something already exists at the
            repository path, move it to the path returned by
            ``_next_invalid_repo_backup_path`` before initializing.

    Raises:
        GitCommandError: If ``git init --bare`` or the following
            ``git symbolic-ref HEAD`` exits with a non-zero status.
    """
    repo_path = self.workspace.repo_git_path

    if quarantine_existing and repo_path.exists():
        quarantine_target = self._next_invalid_repo_backup_path()
        shutil.move(str(repo_path), str(quarantine_target))

    init_result = subprocess.run(
        ["git", "init", "--bare", str(repo_path)],
        env=self._git_env(),
        timeout=GIT_TIMEOUT_SECONDS,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    if init_result.returncode != 0:
        message = init_result.stderr or init_result.stdout or "Could not initialize shadow Git repository."
        raise GitCommandError(
            message.strip(),
            stdout=init_result.stdout,
            stderr=init_result.stderr,
        )

    head_result = self._run_git_dir("symbolic-ref", "HEAD", CURRENT_REF)
    if head_result.returncode != 0:
        message = head_result.stderr or head_result.stdout or "Could not initialize shadow Git HEAD."
        raise GitCommandError(
            message.strip(),
            stdout=head_result.stdout,
            stderr=head_result.stderr,
        )
def _loose_ref_exists(self, ref: str) -> bool:
    """Return True when ``ref`` exists as a regular file inside the shadow git dir.

    Args:
        ref: Slash-separated ref name, e.g. ``refs/heads/<name>``.
    """
    ref_file = self.workspace.repo_git_path
    for component in ref.split("/"):
        ref_file = ref_file / component
    return ref_file.is_file()
def _first_loose_head_ref(self) -> str:
    """Return the first loose branch ref found under ``refs/heads``.

    Candidate files are collected recursively and sorted by path; the
    first one is returned as a full ``refs/heads/...`` name.

    Returns:
        The ref name of the first candidate, or ``CURRENT_REF`` when no
        candidate exists or the directory cannot be scanned.
    """
    heads_dir = self.workspace.repo_git_path / "refs" / "heads"
    try:
        refs = sorted(
            path
            for path in heads_dir.rglob("*")
            # Git rejects ref names ending in ".lock"; such files are
            # leftovers of interrupted ref updates, not branches.
            if path.is_file() and not path.name.endswith(".lock")
        )
    except OSError:
        refs = []
    if not refs:
        return CURRENT_REF
    return "refs/heads/" + refs[0].relative_to(heads_dir).as_posix()
def _next_invalid_repo_backup_path(self) -> Path:
    """Pick an unused path in the shadow directory for a quarantined repo.

    The name is ``SHADOW_REPO_BACKUP_PREFIX`` plus a UTC timestamp
    (``YYYYmmddHHMMSS``). If that path already exists, ``-2``, ``-3``, ...
    are appended until a free name is found.

    Returns:
        A path that did not exist at the time of the check.
    """
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    shadow_dir = self.workspace.shadow_path
    base_name = f"{SHADOW_REPO_BACKUP_PREFIX}-{timestamp}"

    candidate = shadow_dir / base_name
    suffix = 2
    while candidate.exists():
        candidate = shadow_dir / f"{base_name}-{suffix}"
        suffix += 1
    return candidate
def _ensure_current_head_ref(self) -> None:
    """Make ``HEAD`` a symbolic ref to ``CURRENT_REF``, keeping its commit.

    If ``HEAD`` already points at ``CURRENT_REF`` nothing happens.
    Otherwise, when ``HEAD`` resolves to a commit, ``CURRENT_REF`` is first
    set to that commit, and then ``HEAD`` is re-pointed at ``CURRENT_REF``.

    Raises:
        GitCommandError: If saving the commit into ``CURRENT_REF`` or
            re-pointing ``HEAD`` fails.
    """
    current_ref = self._run_git_dir("symbolic-ref", "-q", "HEAD")
    if current_ref.returncode == 0 and current_ref.stdout.strip() == CURRENT_REF:
        return
    current_commit = self._run_git_dir("rev-parse", "--verify", "HEAD^{commit}")
    if current_commit.returncode == 0:
        saved = self._run_git_dir("update-ref", CURRENT_REF, current_commit.stdout.strip())
        # Stop here on failure: re-pointing HEAD without having saved the
        # commit would silently drop it from the current history.
        if saved.returncode != 0:
            raise GitCommandError(
                (saved.stderr or saved.stdout or "Could not repair shadow Git HEAD.").strip(),
                stdout=saved.stdout,
                stderr=saved.stderr,
            )
    updated = self._run_git_dir("symbolic-ref", "HEAD", CURRENT_REF)
    if updated.returncode != 0:
        raise GitCommandError(
            (updated.stderr or updated.stdout or "Could not repair shadow Git HEAD.").strip(),
            stdout=updated.stdout,
            stderr=updated.stderr,
        )
def _git(self, *args: str, input: str | None = None, env: dict[str, str] | None = None, check: bool = True) -> subprocess.CompletedProcess[str]:
self.workspace.shadow_path.mkdir(parents=True, exist_ok=True)
completed = subprocess.run(

View file

@ -135,6 +135,58 @@ def test_kernel_boundary_real_git_repo_and_git_dir_exclusion(workspace):
assert "untracked.txt" in tracked_paths(service, snapshot.hash)
def test_snapshot_force_adds_curated_paths_ignored_by_workspace_gitignore(workspace):
    """Workspace .gitignore rules must not hide files from a snapshot, but .env stays out."""
    root, service = workspace
    (root / "ignored-dir").mkdir()
    fixtures = {
        ".gitignore": "ignored.txt\nignored-dir/\n.env\n",
        "ignored.txt": "still important\n",
        "ignored-dir/note.txt": "nested\n",
        ".env": "SECRET=still excluded\n",
    }
    for relative, content in fixtures.items():
        (root / relative).write_text(content, encoding="utf-8")

    snapshot = service.snapshot(trigger="manual")
    paths = tracked_paths(service, snapshot.hash)

    for expected in (".gitignore", "ignored.txt", "ignored-dir/note.txt"):
        assert expected in paths
    assert ".env" not in paths
def test_shadow_repo_empty_head_is_repaired_without_losing_history(workspace):
    """Emptying the shadow HEAD file must not break the next snapshot or drop history."""
    root, service = workspace
    tracked_file = root / "a.txt"

    tracked_file.write_text("one\n", encoding="utf-8")
    first = service.snapshot(trigger="manual")

    # Corrupt the shadow repository by emptying its HEAD file.
    head_file = service.workspace.repo_git_path / "HEAD"
    head_file.write_text("", encoding="utf-8")

    tracked_file.write_text("one\ntwo\n", encoding="utf-8")
    second = service.snapshot(trigger="manual")

    assert second.created is True
    assert service.current_hash() == second.hash
    commits = service.history_list(limit=10)["commits"]
    newest_two = [commit["hash"] for commit in commits[:2]]
    assert newest_two == [second.hash, first.hash]
def test_workspace_identity_canonicalizes_symlink_aliases():
    """A symlink alias of a workspace must resolve to the same workspace identity."""
    name = f"tt-{uuid.uuid4().hex}"
    root = PROJECT_ROOT / "usr" / "time-travel-tests" / name
    target = root / "target"
    alias = root / "alias"
    created_workspaces = []
    # All setup runs inside the try so a failure while creating the
    # directory, the symlink or either workspace still cleans up.
    try:
        target.mkdir(parents=True)
        os.symlink(target, alias)
        target_workspace = _workspace_from_display(f"/a0/usr/time-travel-tests/{name}/target")
        created_workspaces.append(target_workspace)
        alias_workspace = _workspace_from_display(f"/a0/usr/time-travel-tests/{name}/alias")
        created_workspaces.append(alias_workspace)

        assert alias_workspace.id == target_workspace.id
        assert alias_workspace.display_path == target_workspace.display_path
    finally:
        shutil.rmtree(root, ignore_errors=True)
        # Remove both shadow paths: they differ exactly when the test fails.
        for created in created_workspaces:
            shutil.rmtree(created.shadow_path, ignore_errors=True)
def test_usr_root_snapshot_skips_plugins_and_nested_git_projects(tmp_path: Path):
root = tmp_path / "usr"
root.mkdir()