Replace hardcoded SUPPORTED_BRANCHES with dynamic branch discovery from remote repository

- Add _get_remote_branch_names helper to fetch available branches via git ls-remote with caching
- Add _get_local_origin_branch_names fallback for offline scenarios
- Add get_available_branch_values and get_available_branches to expose filtered branch list
- Add _is_excluded_self_update_branch helper to filter out HEAD, PR branches
- Add _sort_branch_names to deduplicate and sort branches with main first
- Add
This commit is contained in:
frdel 2026-03-26 11:30:17 +01:00
parent c1d709726e
commit ffa6ac5433
5 changed files with 404 additions and 48 deletions

View file

@ -8,7 +8,15 @@ class SelfUpdateTags(ApiHandler):
async def process(self, input: dict, request: Request) -> dict | Response:
branch = str(input.get("branch", "")).strip().lower()
current_branch = self_update.get_repo_version_info().get("branch", "").strip().lower()
default_branch = current_branch if current_branch in self_update.SUPPORTED_BRANCHES else "main"
available_branch_values = self_update.get_available_branch_values()
if current_branch in available_branch_values:
default_branch = current_branch
elif "main" in available_branch_values:
default_branch = "main"
elif available_branch_values:
default_branch = available_branch_values[0]
else:
default_branch = "main"
resolved_branch = branch or default_branch
try:

View file

@ -2,6 +2,7 @@ from __future__ import annotations
import os
import re
import shutil
import subprocess
import tempfile
import time
@ -16,6 +17,7 @@ OFFICIAL_REPO_AUTHOR = "agent0ai"
OFFICIAL_REPO_NAME = "agent-zero"
BRANCH_OPTIONS = [
{"value": "main", "label": "main"},
{"value": "ready", "label": "ready"},
{"value": "testing", "label": "testing"},
{"value": "development", "label": "development"},
]
@ -23,17 +25,20 @@ SUPPORTED_BRANCHES = {option["value"] for option in BRANCH_OPTIONS}
BACKUP_CONFLICT_POLICIES = {"rename", "overwrite", "fail"}
MIN_SELECTOR_VERSION = (1, 0)
REMOTE_BRANCH_TAG_CACHE_TTL_SECONDS = 60.0
REMOTE_BRANCH_LIST_CACHE_TTL_SECONDS = 60.0
UPDATE_FILE_PATH = Path("/exe/a0-self-update.yaml")
STATUS_FILE_PATH = Path("/exe/a0-self-update-status.yaml")
LOG_FILE_PATH = Path("/exe/a0-self-update.log")
DURABLE_EXE_DIR = UPDATE_FILE_PATH.parent
_remote_branch_tag_cache: dict[str, tuple[float, set[str]]] = {}
_remote_branch_head_cache: dict[str, tuple[float, dict[str, str]]] = {}
_remote_branch_list_cache: tuple[float, list[str]] | None = None
class PendingUpdateConfig(TypedDict):
branch: Literal["main", "testing", "development"]
branch: str
tag: str
source_version: str
source_describe: str
@ -84,6 +89,10 @@ def get_log_file_path() -> Path:
return LOG_FILE_PATH
def get_durable_exe_dir() -> Path:
return DURABLE_EXE_DIR
def _load_yaml(path: Path) -> dict[str, Any] | None:
if not path.exists():
return None
@ -123,10 +132,27 @@ def get_repo_dir(repo_dir: str | Path | None = None) -> Path:
return Path(__file__).resolve().parents[1]
def get_self_update_runtime_source_dir(
repo_dir: str | Path | None = None,
) -> Path:
return get_repo_dir(repo_dir) / "docker" / "run" / "fs" / "exe"
def _get_official_remote_url() -> str:
return f"https://github.com/{OFFICIAL_REPO_AUTHOR}/{OFFICIAL_REPO_NAME}.git"
def _run_git_raw(*args: str) -> str:
completed = subprocess.run(
["git", *args],
check=True,
text=True,
capture_output=True,
env={**os.environ, "GIT_TERMINAL_PROMPT": "0"},
)
return completed.stdout.strip()
def _run_git(repo_dir: str | Path, *args: str) -> str:
completed = subprocess.run(
["git", "-C", str(get_repo_dir(repo_dir)), *args],
@ -211,16 +237,139 @@ def _resolve_backup_path(
return path.resolve()
def _is_excluded_self_update_branch(branch: str) -> bool:
normalized = branch.strip().lower()
return (
not normalized
or normalized == "head"
or normalized.startswith("pr/")
or normalized.startswith("pr-")
or normalized.startswith("pull/")
)
def _sort_branch_names(branches: list[str]) -> list[str]:
unique_branches: list[str] = []
seen: set[str] = set()
for branch in branches:
normalized = branch.strip().lower()
if _is_excluded_self_update_branch(normalized) or normalized in seen:
continue
seen.add(normalized)
unique_branches.append(normalized)
return sorted(unique_branches, key=lambda branch: (branch != "main", branch))
def _get_remote_branch_names() -> list[str]:
global _remote_branch_list_cache
now = time.monotonic()
if (
_remote_branch_list_cache
and now - _remote_branch_list_cache[0] <= REMOTE_BRANCH_LIST_CACHE_TTL_SECONDS
):
return list(_remote_branch_list_cache[1])
output = _run_git_raw("ls-remote", "--heads", _get_official_remote_url())
branches: list[str] = []
prefix = "refs/heads/"
for line in output.splitlines():
parts = line.strip().split()
if len(parts) != 2:
continue
ref_name = parts[1]
if not ref_name.startswith(prefix):
continue
branches.append(ref_name[len(prefix):])
sorted_branches = _sort_branch_names(branches)
_remote_branch_list_cache = (now, sorted_branches)
return list(sorted_branches)
def _get_local_origin_branch_names(
repo_dir: str | Path | None = None,
) -> list[str]:
repository = get_repo_dir(repo_dir)
try:
output = _run_git(
repository,
"for-each-ref",
"--format=%(refname:short)",
"refs/remotes/origin",
)
except Exception:
return []
branches: list[str] = []
prefix = "origin/"
for line in output.splitlines():
ref_name = line.strip()
if not ref_name.startswith(prefix):
continue
branches.append(ref_name[len(prefix):])
return _sort_branch_names(branches)
def get_available_branch_values(
repo_dir: str | Path | None = None,
) -> list[str]:
try:
remote_branches = _get_remote_branch_names()
if remote_branches:
return remote_branches
except Exception:
pass
local_origin_branches = _get_local_origin_branch_names(repo_dir=repo_dir)
if local_origin_branches:
return local_origin_branches
return _sort_branch_names([option["value"] for option in BRANCH_OPTIONS])
def get_available_branches(
repo_dir: str | Path | None = None,
) -> list[dict[str, str]]:
return [
{"value": branch, "label": branch}
for branch in get_available_branch_values(repo_dir=repo_dir)
]
def sync_self_update_runtime_files(
repo_dir: str | Path | None = None,
) -> list[str]:
source_dir = get_self_update_runtime_source_dir(repo_dir)
durable_dir = get_durable_exe_dir()
synced_files: list[str] = []
runtime_files = ["self_update_manager.py", "run_A0.sh"]
for filename in runtime_files:
source_path = source_dir / filename
if not source_path.exists():
raise FileNotFoundError(
f"Required self-update runtime file is missing: {source_path}"
)
destination_path = durable_dir / filename
destination_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(source_path, destination_path)
shutil.copymode(source_path, destination_path)
synced_files.append(str(destination_path))
return synced_files
def _get_branch_reference_names(branch: str) -> list[str]:
normalized_branch = branch.strip().lower()
if normalized_branch not in SUPPORTED_BRANCHES:
if _is_excluded_self_update_branch(normalized_branch):
return []
return [f"origin/{normalized_branch}", normalized_branch]
def _get_remote_branch_merged_tags(branch: str) -> set[str]:
normalized_branch = branch.strip().lower()
if normalized_branch not in SUPPORTED_BRANCHES:
if _is_excluded_self_update_branch(normalized_branch):
return set()
cached = _remote_branch_tag_cache.get(normalized_branch)
@ -250,7 +399,7 @@ def _get_remote_branch_merged_tags(branch: str) -> set[str]:
def _get_remote_branch_head_info(branch: str) -> dict[str, str]:
normalized_branch = branch.strip().lower()
if normalized_branch not in SUPPORTED_BRANCHES:
if _is_excluded_self_update_branch(normalized_branch):
return {"describe": "", "short_tag": "", "commit": ""}
cached = _remote_branch_head_cache.get(normalized_branch)
@ -404,7 +553,8 @@ def get_current_branch_latest_info(
) -> dict[str, Any]:
repository = get_repo_dir(repo_dir)
normalized_branch = current_branch.strip().lower()
if normalized_branch not in SUPPORTED_BRANCHES:
available_branches = set(get_available_branch_values(repo_dir=repository))
if normalized_branch not in available_branches:
return {
"branch": current_branch.strip(),
"supported": False,
@ -511,7 +661,16 @@ def get_update_info(repo_dir: str | Path | None = None) -> dict[str, Any]:
version_info = get_repo_version_info(repository)
current_version = version_info["short_tag"]
current_branch = version_info.get("branch", "").strip().lower()
default_branch = current_branch if current_branch in SUPPORTED_BRANCHES else "main"
available_branches = get_available_branches(repo_dir=repository)
available_branch_values = [branch["value"] for branch in available_branches]
if current_branch in available_branch_values:
default_branch = current_branch
elif "main" in available_branch_values:
default_branch = "main"
elif available_branch_values:
default_branch = available_branch_values[0]
else:
default_branch = "main"
tag_options, higher_major_versions, tags_error = get_selector_tag_options(
default_branch,
repo_dir=repository,
@ -526,7 +685,7 @@ def get_update_info(repo_dir: str | Path | None = None) -> dict[str, Any]:
),
"pending": load_pending_update(),
"last_status": load_last_status(),
"branches": BRANCH_OPTIONS,
"branches": available_branches,
"available_tags": [option["value"] for option in tag_options],
"available_tag_options": tag_options,
"available_tags_error": tags_error,
@ -561,8 +720,9 @@ def schedule_update(
version_info = get_repo_version_info(repository)
normalized_branch = branch.strip().lower()
if normalized_branch not in SUPPORTED_BRANCHES:
raise ValueError("Branch must be one of: main, testing, development.")
available_branch_values = set(get_available_branch_values(repo_dir=repository))
if normalized_branch not in available_branch_values:
raise ValueError("Branch must be one of the available remote branches.")
normalized_tag = tag.strip()
if not normalized_tag:
@ -613,5 +773,6 @@ def schedule_update(
"backup_conflict_policy": normalized_policy, # type: ignore[assignment]
}
sync_self_update_runtime_files(repository)
_write_yaml(get_update_file_path(), payload)
return payload

View file

@ -66,6 +66,85 @@ def test_self_update_branch_filter_prefers_remote_branch_tags(monkeypatch):
assert tags == ["v1.1", "v1.0"]
def test_self_update_available_branch_values_filter_prs_and_pin_main_first(monkeypatch):
monkeypatch.setattr(self_update, "_remote_branch_list_cache", None)
monkeypatch.setattr(
self_update,
"_run_git_raw",
lambda *args: "\n".join(
[
"111 refs/heads/testing",
"222 refs/heads/ready",
"333 refs/heads/pr/123",
"444 refs/heads/development",
"555 refs/heads/main",
"666 refs/heads/pr-999",
]
),
)
branches = self_update.get_available_branch_values()
assert branches == ["main", "development", "ready", "testing"]
def test_self_update_available_branch_values_fallback_to_local_origin(monkeypatch):
monkeypatch.setattr(self_update, "_remote_branch_list_cache", None)
monkeypatch.setattr(
self_update,
"_run_git_raw",
lambda *args: (_ for _ in ()).throw(RuntimeError("offline")),
)
monkeypatch.setattr(
self_update,
"_run_git",
lambda repo_dir, *args: "\n".join(
[
"origin/HEAD",
"origin/ready",
"origin/testing",
"origin/pr/999",
"origin/main",
]
),
)
branches = self_update.get_available_branch_values()
assert branches == ["main", "ready", "testing"]
def test_self_update_runtime_files_are_synced_to_durable_exe(monkeypatch, tmp_path):
source_dir = tmp_path / "source-exe"
durable_dir = tmp_path / "durable-exe"
source_dir.mkdir()
durable_dir.mkdir()
manager_source = source_dir / "self_update_manager.py"
launcher_source = source_dir / "run_A0.sh"
manager_source.write_text("# manager\n", encoding="utf-8")
launcher_source.write_text("#!/bin/bash\necho hi\n", encoding="utf-8")
monkeypatch.setattr(
self_update,
"get_self_update_runtime_source_dir",
lambda repo_dir=None: source_dir,
)
monkeypatch.setattr(
self_update,
"get_durable_exe_dir",
lambda: durable_dir,
)
synced_files = self_update.sync_self_update_runtime_files()
assert synced_files == [
str(durable_dir / "self_update_manager.py"),
str(durable_dir / "run_A0.sh"),
]
assert (durable_dir / "self_update_manager.py").read_text(encoding="utf-8") == "# manager\n"
assert (durable_dir / "run_A0.sh").read_text(encoding="utf-8") == "#!/bin/bash\necho hi\n"
def test_self_update_selector_tag_options_filter_to_current_major(monkeypatch):
monkeypatch.setattr(
self_update,
@ -111,6 +190,20 @@ def test_self_update_update_info_uses_current_branch_for_latest_version(monkeypa
"short_commit": "abc1234",
},
)
monkeypatch.setattr(
self_update,
"get_available_branches",
lambda repo_dir=None: [
{"value": "main", "label": "main"},
{"value": "ready", "label": "ready"},
{"value": "testing", "label": "testing"},
],
)
monkeypatch.setattr(
self_update,
"get_available_branch_values",
lambda repo_dir=None: ["main", "ready", "testing"],
)
monkeypatch.setattr(
self_update,
"get_selector_tag_options",
@ -172,14 +265,21 @@ def test_self_update_frontend_uses_preloaded_select():
assert "response.available_higher_major_versions" in content
assert "response.tag_options" in content
assert "response.higher_major_versions" in content
assert "response.pending || {" in content
assert "tag: \"\"," in content
assert "await this.fetchTags();" in content
assert "Release tag must use the format vX.Y." in content
assert "Release tag must be v1.0 or newer." in content
assert "isLatestSelectorTag(value)" in content
assert "this.isSelectableTag(this.form.tag)" in content
assert "getLastStatusBadgeClass(status)" in content
assert "status-pill-error" in content
assert "status-pill-success" in content
assert "this.info?.defaults?.branch ||" in content
assert "Version ${this.trimmedTag} does not exist on branch" in content
assert "this.selectedTagExistsOnBranch" in content
assert "this.form.tag = \"\";" in content
assert "this.availableTags[0]" not in content
assert 'const response = await fetch("/api/health"' in content
assert "if (response.ok && observedBackendUnavailable)" in content
assert "window.location.reload();" in content
@ -210,6 +310,8 @@ def test_self_update_modal_uses_standard_select_and_manual_backup():
assert "current_branch_latest?.display_version" in content
assert "tagOption.label" in content
assert 'data-bs-target="#self-update-last-attempt-collapse"' in content
assert "self-update-header-status" in content
assert "getLastStatusLabel($store.selfUpdateStore.info?.last_status?.status)" in content
assert "Latest version" in content
assert "Docker update guide" in content
assert "https://www.agent-zero.ai/p/docs/get-started/" in content
@ -240,6 +342,16 @@ def test_self_update_schedule_rejects_missing_tag_on_branch(monkeypatch, tmp_pat
"",
),
)
monkeypatch.setattr(
self_update,
"get_available_branch_values",
lambda repo_dir=None: ["development", "ready", "testing", "main"],
)
monkeypatch.setattr(
self_update,
"sync_self_update_runtime_files",
lambda repo_dir=None: [],
)
monkeypatch.setattr(self_update, "_write_yaml", lambda path, payload: None)
with pytest.raises(ValueError, match=r"Version v1\.1 does not exist on branch development\."):
@ -276,6 +388,16 @@ def test_self_update_schedule_accepts_latest_when_selector_exposes_it(monkeypatc
"",
),
)
monkeypatch.setattr(
self_update,
"get_available_branch_values",
lambda repo_dir=None: ["development", "ready", "testing", "main"],
)
monkeypatch.setattr(
self_update,
"sync_self_update_runtime_files",
lambda repo_dir=None: [],
)
monkeypatch.setattr(
self_update,
"_write_yaml",

View file

@ -14,36 +14,6 @@
x-destroy="$store.selfUpdateStore.cleanup()"
class="self-update-modal"
>
<div class="self-update-panel">
<button
type="button"
class="self-update-panel-toggle"
data-bs-toggle="collapse"
data-bs-target="#self-update-howto-collapse"
aria-expanded="false"
aria-controls="self-update-howto-collapse"
>
<span>How it works?</span>
<span class="material-symbols-outlined self-update-panel-toggle-icon">expand_more</span>
</button>
<div class="collapse" id="self-update-howto-collapse">
<div class="self-update-panel-body self-update-copy">
<p>
Agent Zero saves this request into
<code x-text="$store.selfUpdateStore.info?.paths?.update_file || '/exe/a0-self-update.yaml'"></code>,
restarts once, applies the requested branch and version target before the UI
starts again, then reloads this page when <code>/api/health</code> is healthy.
</p>
<p>
If the updated UI does not become healthy within 2 minutes, the bootstrap
manager in <code>/exe</code> restores the previous checkout and starts that
version again, so even an older downgraded <code>/a0</code> can be upgraded back
by creating the YAML file manually.
</p>
</div>
</div>
</div>
<div class="self-update-version-grid">
<div class="self-update-summary-card">
<div class="summary-label">Current version</div>
@ -107,6 +77,36 @@
</div>
</template>
<div class="self-update-panel">
<button
type="button"
class="self-update-panel-toggle"
data-bs-toggle="collapse"
data-bs-target="#self-update-howto-collapse"
aria-expanded="false"
aria-controls="self-update-howto-collapse"
>
<span>How it works?</span>
<span class="material-symbols-outlined self-update-panel-toggle-icon">expand_more</span>
</button>
<div class="collapse" id="self-update-howto-collapse">
<div class="self-update-panel-body self-update-copy">
<p>
Agent Zero saves this request into
<code x-text="$store.selfUpdateStore.info?.paths?.update_file || '/exe/a0-self-update.yaml'"></code>,
restarts once, applies the requested branch and version target before the UI
starts again, then reloads this page when <code>/api/health</code> is healthy.
</p>
<p>
If the updated UI does not become healthy within 2 minutes, the bootstrap
manager in <code>/exe</code> restores the previous checkout and starts that
version again, so even an older downgraded <code>/a0</code> can be upgraded back
by creating the YAML file manually.
</p>
</div>
</div>
</div>
<template x-if="$store.selfUpdateStore.info?.last_status">
<div class="self-update-panel">
<button
@ -118,11 +118,17 @@
aria-controls="self-update-last-attempt-collapse"
>
<span>Last Attempt</span>
<span class="material-symbols-outlined self-update-panel-toggle-icon">expand_more</span>
<span class="self-update-panel-toggle-trailing">
<span
class="status-pill self-update-header-status"
:class="$store.selfUpdateStore.getLastStatusBadgeClass($store.selfUpdateStore.info?.last_status?.status)"
x-text="$store.selfUpdateStore.getLastStatusLabel($store.selfUpdateStore.info?.last_status?.status)"
></span>
<span class="material-symbols-outlined self-update-panel-toggle-icon">expand_more</span>
</span>
</button>
<div class="collapse" id="self-update-last-attempt-collapse">
<div class="self-update-panel-body">
<div class="status-pill" x-text="$store.selfUpdateStore.info?.last_status?.status || 'unknown'"></div>
<div class="status-message" x-text="$store.selfUpdateStore.info?.last_status?.message || ''"></div>
<div class="summary-meta">
Trigger:
@ -399,6 +405,13 @@
transition: transform 0.2s ease;
}
.self-update-panel-toggle-trailing {
display: inline-flex;
align-items: center;
gap: 0.75rem;
flex: 0 0 auto;
}
.self-update-panel-toggle[aria-expanded="true"] .self-update-panel-toggle-icon {
transform: rotate(180deg);
}
@ -537,6 +550,34 @@
letter-spacing: 0.04em;
}
.self-update-header-status {
margin-top: 0;
}
.status-pill-success {
border-color: color-mix(in srgb, var(--color-success, #16a34a) 55%, var(--color-border));
background: color-mix(in srgb, var(--color-success, #16a34a) 18%, transparent);
color: var(--color-success, #16a34a);
}
.status-pill-error {
border-color: color-mix(in srgb, var(--color-error, #dc2626) 55%, var(--color-border));
background: color-mix(in srgb, var(--color-error, #dc2626) 18%, transparent);
color: var(--color-error, #dc2626);
}
.status-pill-warning {
border-color: color-mix(in srgb, var(--color-warning, #d97706) 55%, var(--color-border));
background: color-mix(in srgb, var(--color-warning, #d97706) 18%, transparent);
color: var(--color-warning, #d97706);
}
.status-pill-neutral {
border-color: var(--color-border);
background: transparent;
color: var(--color-text);
}
.status-message {
margin-top: 0.7rem;
line-height: 1.5;

View file

@ -134,6 +134,29 @@ const model = {
return `${branch || "main"} / ${tag || "None"}`;
},
normalizeLastStatus(status) {
return (status || "unknown").trim().toLowerCase();
},
getLastStatusLabel(status) {
const normalizedStatus = this.normalizeLastStatus(status);
return normalizedStatus ? normalizedStatus.replace(/_/g, " ") : "unknown";
},
getLastStatusBadgeClass(status) {
const normalizedStatus = this.normalizeLastStatus(status);
if (normalizedStatus === "success") {
return "status-pill-success";
}
if (normalizedStatus === "failed" || normalizedStatus === "rollback_failed") {
return "status-pill-error";
}
if (normalizedStatus === "rolled_back") {
return "status-pill-warning";
}
return "status-pill-neutral";
},
getProgressOverlay() {
return document.getElementById(SELF_UPDATE_OVERLAY_ID);
},
@ -255,7 +278,12 @@ const model = {
throw new Error(response?.error || "Failed to load self-update info.");
}
this.info = response;
this.applyFormState(response.pending || response.defaults || {});
this.applyFormState(
response.pending || {
...(response.defaults || {}),
tag: "",
},
);
this.applyAvailableTags({
options: response.available_tag_options,
higherMajorVersions: response.available_higher_major_versions,
@ -304,11 +332,7 @@ const model = {
return;
}
const defaultTag = (this.info?.defaults?.tag || "").trim();
this.form.tag =
defaultTag && this.availableTags.includes(defaultTag)
? defaultTag
: this.availableTags[0];
this.form.tag = "";
},
async openModal() {