mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-05-07 00:37:36 +00:00
492 lines
22 KiB
Python
492 lines
22 KiB
Python
#!/usr/bin/env python3
|
|
"""Helpers for resolving the evergreen release control plane and active profile."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
import re
|
|
import sys
|
|
from typing import Any
|
|
|
|
from repo_file_io import REPO_ROOT, load_repo_json
|
|
|
|
|
|
CONTROL_PLANE_REL = "docs/release-control/control_plane.json"
|
|
AGENT_VALUES_DOC_REL = "docs/release-control/internal/AGENT_VALUES.md"
|
|
CONTROL_PLANE_DOC_REL = "docs/release-control/internal/CONTROL_PLANE.md"
|
|
CONTROL_PLANE_SCHEMA_REL = "docs/release-control/control_plane.schema.json"
|
|
|
|
REQUIRED_CONTROL_PLANE_FIELDS = (
|
|
"version",
|
|
"system",
|
|
"execution_model",
|
|
"agent_values_doc",
|
|
"control_plane_doc",
|
|
"control_plane_schema",
|
|
"active_profile_id",
|
|
"active_target_id",
|
|
"profiles",
|
|
"targets",
|
|
)
|
|
REQUIRED_PROFILE_FIELDS = (
|
|
"id",
|
|
"lifecycle",
|
|
"root",
|
|
"prerelease_branch",
|
|
"stable_branch",
|
|
"source_of_truth",
|
|
"status",
|
|
"status_schema",
|
|
"development_protocol",
|
|
"high_risk_matrix",
|
|
"subsystems_dir",
|
|
"registry",
|
|
"registry_schema",
|
|
"subsystem_contract_template",
|
|
)
|
|
REQUIRED_TARGET_FIELDS = (
|
|
"id",
|
|
"profile_id",
|
|
"kind",
|
|
"status",
|
|
"summary",
|
|
"completion_rule",
|
|
)
|
|
PROFILE_PATH_FIELDS = {
|
|
"root",
|
|
"source_of_truth",
|
|
"status",
|
|
"status_schema",
|
|
"development_protocol",
|
|
"high_risk_matrix",
|
|
"subsystems_dir",
|
|
"registry",
|
|
"registry_schema",
|
|
"subsystem_contract_template",
|
|
}
|
|
PRERELEASE_VERSION_PATTERN = re.compile(r"-(?:rc|alpha|beta)\.[0-9]+$")
|
|
COMPLETION_RULE_BLOCKING_LEVELS = {
|
|
"repo_ready": ("repo-ready",),
|
|
"rc_ready": ("repo-ready", "rc-ready"),
|
|
"release_ready": ("repo-ready", "rc-ready", "release-ready"),
|
|
}
|
|
PROOF_SCOPE_BLOCKING_LEVELS = {
|
|
"none": (),
|
|
"repo_ready": ("repo-ready",),
|
|
"rc_ready": ("repo-ready", "rc-ready"),
|
|
"release_ready": ("repo-ready", "rc-ready", "release-ready"),
|
|
}
|
|
|
|
|
|
def _clean_relative_path(path: str, *, context: str) -> str:
|
|
candidate = Path(path)
|
|
if candidate.is_absolute():
|
|
raise ValueError(f"{context} must be relative, got {path!r}")
|
|
normalized = candidate.as_posix()
|
|
if normalized != path or path.startswith("../") or "/../" in path:
|
|
raise ValueError(f"{context} must be a clean relative path, got {path!r}")
|
|
return path
|
|
|
|
|
|
def _clean_branch_name(branch: str, *, context: str) -> str:
|
|
if branch != branch.strip():
|
|
raise ValueError(f"{context} must not have surrounding whitespace, got {branch!r}")
|
|
if not branch:
|
|
raise ValueError(f"{context} must be non-empty")
|
|
if branch.startswith("refs/") or branch.startswith("/") or branch.endswith("/"):
|
|
raise ValueError(f"{context} must be a clean branch name, got {branch!r}")
|
|
if any(ch.isspace() for ch in branch):
|
|
raise ValueError(f"{context} must not contain whitespace, got {branch!r}")
|
|
return branch
|
|
|
|
|
|
def load_control_plane(*, staged: bool = False) -> dict[str, Any]:
|
|
return load_repo_json(CONTROL_PLANE_REL, staged=staged)
|
|
|
|
|
|
def validate_control_plane_payload(payload: dict[str, Any]) -> dict[str, Any]:
|
|
for field in REQUIRED_CONTROL_PLANE_FIELDS:
|
|
if field not in payload:
|
|
raise ValueError(f"control plane missing required field {field!r}")
|
|
|
|
if payload["version"] != "1":
|
|
raise ValueError(f"control plane version must be '1', got {payload['version']!r}")
|
|
if payload["system"] != "pulse-release-control":
|
|
raise ValueError(
|
|
f"control plane system must be 'pulse-release-control', got {payload['system']!r}"
|
|
)
|
|
if payload["execution_model"] != "direct-repo-sessions":
|
|
raise ValueError(
|
|
"control plane execution_model must be 'direct-repo-sessions', "
|
|
f"got {payload['execution_model']!r}"
|
|
)
|
|
if payload["agent_values_doc"] != AGENT_VALUES_DOC_REL:
|
|
raise ValueError(
|
|
f"control plane agent_values_doc must be {AGENT_VALUES_DOC_REL!r}, "
|
|
f"got {payload['agent_values_doc']!r}"
|
|
)
|
|
if payload["control_plane_doc"] != CONTROL_PLANE_DOC_REL:
|
|
raise ValueError(
|
|
f"control plane control_plane_doc must be {CONTROL_PLANE_DOC_REL!r}, "
|
|
f"got {payload['control_plane_doc']!r}"
|
|
)
|
|
if payload["control_plane_schema"] != CONTROL_PLANE_SCHEMA_REL:
|
|
raise ValueError(
|
|
f"control plane control_plane_schema must be {CONTROL_PLANE_SCHEMA_REL!r}, "
|
|
f"got {payload['control_plane_schema']!r}"
|
|
)
|
|
|
|
active_profile_id = payload.get("active_profile_id")
|
|
if not isinstance(active_profile_id, str) or not active_profile_id.strip():
|
|
raise ValueError("control plane active_profile_id must be a non-empty string")
|
|
active_target_id = payload.get("active_target_id")
|
|
if not isinstance(active_target_id, str) or not active_target_id.strip():
|
|
raise ValueError("control plane active_target_id must be a non-empty string")
|
|
|
|
raw_profiles = payload.get("profiles")
|
|
if not isinstance(raw_profiles, list) or not raw_profiles:
|
|
raise ValueError("control plane profiles must be a non-empty list")
|
|
|
|
profiles_by_id: dict[str, dict[str, str]] = {}
|
|
for index, raw in enumerate(raw_profiles):
|
|
if not isinstance(raw, dict):
|
|
raise ValueError(f"control plane profiles[{index}] must be an object")
|
|
for field in REQUIRED_PROFILE_FIELDS:
|
|
if field not in raw:
|
|
raise ValueError(f"control plane profiles[{index}] missing {field!r}")
|
|
profile = {}
|
|
for field in REQUIRED_PROFILE_FIELDS:
|
|
value = raw[field]
|
|
if not isinstance(value, str) or not value.strip():
|
|
raise ValueError(f"control plane profiles[{index}].{field} must be a non-empty string")
|
|
profile[field] = str(value)
|
|
profile_id = profile["id"]
|
|
if profile_id in profiles_by_id:
|
|
raise ValueError(f"control plane profiles duplicates id {profile_id!r}")
|
|
if profile["lifecycle"] not in {"active", "inactive", "retired"}:
|
|
raise ValueError(
|
|
f"control plane profile {profile_id!r} has invalid lifecycle {profile['lifecycle']!r}"
|
|
)
|
|
for field in REQUIRED_PROFILE_FIELDS:
|
|
if field in {"id", "lifecycle"}:
|
|
continue
|
|
if field in PROFILE_PATH_FIELDS:
|
|
_clean_relative_path(profile[field], context=f"profile {profile_id}.{field}")
|
|
else:
|
|
_clean_branch_name(profile[field], context=f"profile {profile_id}.{field}")
|
|
profiles_by_id[profile_id] = profile
|
|
|
|
active_profile = profiles_by_id.get(active_profile_id)
|
|
if active_profile is None:
|
|
raise ValueError(f"control plane active_profile_id {active_profile_id!r} does not exist in profiles")
|
|
if active_profile["lifecycle"] != "active":
|
|
raise ValueError(
|
|
f"control plane active_profile_id {active_profile_id!r} must point to an active profile"
|
|
)
|
|
|
|
raw_targets = payload.get("targets")
|
|
if not isinstance(raw_targets, list) or not raw_targets:
|
|
raise ValueError("control plane targets must be a non-empty list")
|
|
targets_by_id: dict[str, dict[str, str]] = {}
|
|
for index, raw in enumerate(raw_targets):
|
|
if not isinstance(raw, dict):
|
|
raise ValueError(f"control plane targets[{index}] must be an object")
|
|
for field in REQUIRED_TARGET_FIELDS:
|
|
if field not in raw:
|
|
raise ValueError(f"control plane targets[{index}] missing {field!r}")
|
|
target = {}
|
|
for field in REQUIRED_TARGET_FIELDS:
|
|
value = raw[field]
|
|
if not isinstance(value, str) or not value.strip():
|
|
raise ValueError(f"control plane targets[{index}].{field} must be a non-empty string")
|
|
target[field] = str(value)
|
|
target_id = target["id"]
|
|
if target_id in targets_by_id:
|
|
raise ValueError(f"control plane targets duplicates id {target_id!r}")
|
|
if target["profile_id"] not in profiles_by_id:
|
|
raise ValueError(
|
|
f"control plane target {target_id!r} references unknown profile_id {target['profile_id']!r}"
|
|
)
|
|
if target["kind"] not in {"release", "stabilization", "polish", "feature", "maintenance"}:
|
|
raise ValueError(
|
|
f"control plane target {target_id!r} has invalid kind {target['kind']!r}"
|
|
)
|
|
if target["status"] not in {"active", "planned", "completed", "superseded"}:
|
|
raise ValueError(
|
|
f"control plane target {target_id!r} has invalid status {target['status']!r}"
|
|
)
|
|
if target["completion_rule"] not in {"rc_ready", "release_ready", "repo_ready", "manual"}:
|
|
raise ValueError(
|
|
f"control plane target {target_id!r} has invalid completion_rule {target['completion_rule']!r}"
|
|
)
|
|
proof_scope = raw.get("proof_scope", "derived")
|
|
if not isinstance(proof_scope, str) or not proof_scope.strip():
|
|
raise ValueError(
|
|
f"control plane targets[{index}].proof_scope must be a non-empty string when declared"
|
|
)
|
|
proof_scope = str(proof_scope)
|
|
if proof_scope not in {"derived", "none", "repo_ready", "rc_ready", "release_ready"}:
|
|
raise ValueError(
|
|
f"control plane target {target_id!r} has invalid proof_scope {proof_scope!r}"
|
|
)
|
|
if target["completion_rule"] != "manual" and proof_scope == "none":
|
|
raise ValueError(
|
|
f"control plane target {target_id!r} may only use proof_scope 'none' with completion_rule 'manual'"
|
|
)
|
|
target["proof_scope"] = proof_scope
|
|
targets_by_id[target_id] = target
|
|
|
|
active_target = targets_by_id.get(active_target_id)
|
|
if active_target is None:
|
|
raise ValueError(f"control plane active_target_id {active_target_id!r} does not exist in targets")
|
|
if active_target["status"] != "active":
|
|
raise ValueError(
|
|
f"control plane active_target_id {active_target_id!r} must point to an active target"
|
|
)
|
|
if active_target["profile_id"] != active_profile_id:
|
|
raise ValueError(
|
|
"control plane active_target_id must point to a target owned by the active profile"
|
|
)
|
|
|
|
profiles = [dict(profile) for profile in profiles_by_id.values()]
|
|
targets = [dict(target) for target in targets_by_id.values()]
|
|
|
|
return {
|
|
"control_plane_rel": CONTROL_PLANE_REL,
|
|
"agent_values_doc_rel": AGENT_VALUES_DOC_REL,
|
|
"control_plane_doc_rel": CONTROL_PLANE_DOC_REL,
|
|
"control_plane_schema_rel": CONTROL_PLANE_SCHEMA_REL,
|
|
"agent_values_doc_path": REPO_ROOT / AGENT_VALUES_DOC_REL,
|
|
"control_plane_doc_path": REPO_ROOT / CONTROL_PLANE_DOC_REL,
|
|
"control_plane_schema_path": REPO_ROOT / CONTROL_PLANE_SCHEMA_REL,
|
|
"profiles": profiles,
|
|
"profiles_by_id": profiles_by_id,
|
|
"targets": targets,
|
|
"targets_by_id": targets_by_id,
|
|
"active_profile_id": active_profile_id,
|
|
"active_profile": active_profile,
|
|
"active_target_id": active_target_id,
|
|
"active_target": active_target,
|
|
"profile_root_rel": active_profile["root"],
|
|
"profile_root_path": REPO_ROOT / active_profile["root"],
|
|
"prerelease_branch": active_profile["prerelease_branch"],
|
|
"stable_branch": active_profile["stable_branch"],
|
|
"source_of_truth_rel": active_profile["source_of_truth"],
|
|
"status_rel": active_profile["status"],
|
|
"status_schema_rel": active_profile["status_schema"],
|
|
"development_protocol_rel": active_profile["development_protocol"],
|
|
"high_risk_matrix_rel": active_profile["high_risk_matrix"],
|
|
"subsystems_dir_rel": active_profile["subsystems_dir"],
|
|
"registry_rel": active_profile["registry"],
|
|
"registry_schema_rel": active_profile["registry_schema"],
|
|
"subsystem_contract_template_rel": active_profile["subsystem_contract_template"],
|
|
"source_of_truth_path": REPO_ROOT / active_profile["source_of_truth"],
|
|
"status_path": REPO_ROOT / active_profile["status"],
|
|
"status_schema_path": REPO_ROOT / active_profile["status_schema"],
|
|
"development_protocol_path": REPO_ROOT / active_profile["development_protocol"],
|
|
"high_risk_matrix_path": REPO_ROOT / active_profile["high_risk_matrix"],
|
|
"subsystems_dir_path": REPO_ROOT / active_profile["subsystems_dir"],
|
|
"registry_path": REPO_ROOT / active_profile["registry"],
|
|
"registry_schema_path": REPO_ROOT / active_profile["registry_schema"],
|
|
"subsystem_contract_template_path": REPO_ROOT / active_profile["subsystem_contract_template"],
|
|
}
|
|
|
|
|
|
def active_control_plane(*, staged: bool = False) -> dict[str, Any]:
|
|
return validate_control_plane_payload(load_control_plane(staged=staged))
|
|
|
|
|
|
def agent_entrypoint(*, staged: bool = False) -> dict[str, Any]:
|
|
resolved = active_control_plane(staged=staged)
|
|
active_target_id = str(resolved["active_target_id"])
|
|
startup_files = [
|
|
resolved["agent_values_doc_rel"],
|
|
resolved["control_plane_doc_rel"],
|
|
resolved["control_plane_rel"],
|
|
resolved["source_of_truth_rel"],
|
|
resolved["development_protocol_rel"],
|
|
]
|
|
escalation_files = [
|
|
resolved["status_rel"],
|
|
resolved["status_schema_rel"],
|
|
resolved["registry_rel"],
|
|
resolved["registry_schema_rel"],
|
|
]
|
|
startup_commands = [
|
|
"python3 scripts/release_control/agent_preflight.py --pretty",
|
|
"python3 scripts/release_control/status_audit.py --pretty",
|
|
]
|
|
targeted_lookup_commands = []
|
|
if active_target_id == "v6-product-lane-expansion":
|
|
targeted_lookup_commands.extend(
|
|
[
|
|
"python3 scripts/release_control/status_lookup.py --candidate-lane <CANDIDATE_LANE_ID> --pretty",
|
|
"python3 scripts/release_control/status_lookup.py --coverage-gap <COVERAGE_GAP_ID> --pretty",
|
|
]
|
|
)
|
|
targeted_lookup_commands.extend(
|
|
[
|
|
"python3 scripts/release_control/status_lookup.py --lane <LANE_ID> --pretty",
|
|
"python3 scripts/release_control/status_lookup.py --assertion <ASSERTION_ID> --pretty",
|
|
"python3 scripts/release_control/status_lookup.py --release-gate <GATE_ID> --pretty",
|
|
"python3 scripts/release_control/status_lookup.py --followup <FOLLOWUP_ID> --pretty",
|
|
"python3 scripts/release_control/status_lookup.py --work-claim <CLAIM_ID> --pretty",
|
|
"python3 scripts/release_control/work_claim.py --kind <KIND> --id <ID> --summary <SUMMARY> --agent-id <AGENT_ID> --pretty",
|
|
"python3 scripts/release_control/subsystem_lookup.py <path> [<path> ...] --pretty",
|
|
"python3 scripts/release_control/worktree_base.py --base-branch <BASE_BRANCH> --pretty",
|
|
"python3 scripts/release_control/worktree_claim.py --kind <KIND> --id <ID> --summary <SUMMARY> --agent-id <AGENT_ID> --pretty",
|
|
"python3 scripts/release_control/worktree_finish.py --base-branch <BASE_BRANCH> --pretty",
|
|
]
|
|
)
|
|
if active_target_id == "v6-product-lane-expansion":
|
|
default_pick_surface = "available_candidate_lane_queue"
|
|
selection_rule = (
|
|
"Pick from available candidate lanes and linked coverage gaps before local prerelease cleanup unless the user overrides the priority or a release-blocking surface needs immediate containment."
|
|
)
|
|
else:
|
|
default_pick_surface = "lanes"
|
|
selection_rule = (
|
|
"Pick from the current lane map first unless the user overrides the priority or a narrower governed surface is already the explicit task."
|
|
)
|
|
claim_rule = (
|
|
"Before mutating a governed slice, reserve exactly one lane, candidate lane, coverage gap, or narrower governed item with work_claim.py. "
|
|
"Support-only slices should claim the owning governed surface and explain the plumbing rationale in the claim summary. "
|
|
"Before replacing or releasing the claim, record any remaining same-lane residual in status.json rather than dropping the slice silently."
|
|
)
|
|
return {
|
|
"active_profile_id": resolved["active_profile_id"],
|
|
"active_target_id": resolved["active_target_id"],
|
|
"prerelease_branch": resolved["prerelease_branch"],
|
|
"stable_branch": resolved["stable_branch"],
|
|
"agent_values_doc": resolved["agent_values_doc_rel"],
|
|
"startup_files": startup_files,
|
|
"escalation_files": escalation_files,
|
|
"ordered_files": startup_files + escalation_files,
|
|
"startup_commands": startup_commands,
|
|
"targeted_lookup_commands": targeted_lookup_commands,
|
|
"default_pick_surface": default_pick_surface,
|
|
"selection_rule": selection_rule,
|
|
"claim_rule": claim_rule,
|
|
"subsystems_dir": resolved["subsystems_dir_rel"],
|
|
"subsystem_lookup": "scripts/release_control/subsystem_lookup.py",
|
|
"status_lookup": "scripts/release_control/status_lookup.py",
|
|
"work_claim": "scripts/release_control/work_claim.py",
|
|
"worktree_base": "scripts/release_control/worktree_base.py",
|
|
"worktree_claim": "scripts/release_control/worktree_claim.py",
|
|
"worktree_finish": "scripts/release_control/worktree_finish.py",
|
|
}
|
|
|
|
|
|
def is_prerelease_version(version: str) -> bool:
|
|
return bool(PRERELEASE_VERSION_PATTERN.search(version))
|
|
|
|
|
|
def release_branch_for_version(
|
|
version: str,
|
|
*,
|
|
control_plane: dict[str, Any] | None = None,
|
|
staged: bool = False,
|
|
) -> str:
|
|
resolved = control_plane
|
|
if resolved is None:
|
|
resolved = active_control_plane(staged=staged)
|
|
elif "profiles_by_id" not in resolved:
|
|
resolved = validate_control_plane_payload(control_plane)
|
|
return resolved["prerelease_branch"] if is_prerelease_version(version) else resolved["stable_branch"]
|
|
|
|
|
|
def blocking_levels_for_completion_rule(completion_rule: str) -> tuple[str, ...]:
|
|
if completion_rule == "manual":
|
|
raise ValueError("manual completion_rule does not map to derived readiness blocking levels")
|
|
try:
|
|
return COMPLETION_RULE_BLOCKING_LEVELS[completion_rule]
|
|
except KeyError as exc:
|
|
raise ValueError(f"unsupported completion_rule {completion_rule!r}") from exc
|
|
|
|
|
|
def blocking_levels_for_proof_scope(proof_scope: str) -> tuple[str, ...]:
|
|
if proof_scope == "derived":
|
|
raise ValueError("derived proof_scope requires completion_rule resolution")
|
|
try:
|
|
return PROOF_SCOPE_BLOCKING_LEVELS[proof_scope]
|
|
except KeyError as exc:
|
|
raise ValueError(f"unsupported proof_scope {proof_scope!r}") from exc
|
|
|
|
|
|
def parse_args(argv: list[str]) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Resolve Pulse control-plane values.")
|
|
parser.add_argument(
|
|
"--branch-for-version",
|
|
help="Print the governed release branch for the supplied version.",
|
|
)
|
|
parser.add_argument(
|
|
"--agent-entrypoint",
|
|
action="store_true",
|
|
help="Print the canonical ordered guidance bundle for agents.",
|
|
)
|
|
parser.add_argument(
|
|
"--pretty",
|
|
action="store_true",
|
|
help="Render agent entrypoint output as a concise human summary.",
|
|
)
|
|
parser.add_argument(
|
|
"--staged",
|
|
action="store_true",
|
|
help="Read control-plane data from the git index when available.",
|
|
)
|
|
return parser.parse_args(argv)
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
args = parse_args(list(argv or []))
|
|
if args.agent_entrypoint:
|
|
payload = agent_entrypoint(staged=args.staged)
|
|
if args.pretty:
|
|
print(
|
|
"\n".join(
|
|
[
|
|
f"agent_entrypoint: profile={payload['active_profile_id']} target={payload['active_target_id']}",
|
|
f"agent_values={payload['agent_values_doc']}",
|
|
"startup_files:",
|
|
*[f" - {rel}" for rel in payload["startup_files"]],
|
|
"startup_commands:",
|
|
*[f" - {command}" for command in payload["startup_commands"]],
|
|
f"default_pick_surface={payload['default_pick_surface']}",
|
|
f"selection_rule={payload['selection_rule']}",
|
|
f"claim_rule={payload['claim_rule']}",
|
|
"targeted_lookup_commands:",
|
|
*[f" - {command}" for command in payload["targeted_lookup_commands"]],
|
|
"escalation_files:",
|
|
*[f" - {rel}" for rel in payload["escalation_files"]],
|
|
f"subsystems_dir={payload['subsystems_dir']}",
|
|
f"subsystem_lookup={payload['subsystem_lookup']}",
|
|
f"status_lookup={payload['status_lookup']}",
|
|
f"work_claim={payload['work_claim']}",
|
|
]
|
|
)
|
|
)
|
|
else:
|
|
print(json.dumps(payload, indent=2, sort_keys=True))
|
|
return 0
|
|
if args.branch_for_version:
|
|
print(release_branch_for_version(args.branch_for_version, staged=args.staged))
|
|
return 0
|
|
raise SystemExit("no action requested")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main(sys.argv[1:]))
|
|
|
|
|
|
def active_target_blocking_levels(*, staged: bool = False) -> tuple[str, ...]:
|
|
control_plane = active_control_plane(staged=staged)
|
|
active_target = control_plane["active_target"]
|
|
proof_scope = str(active_target.get("proof_scope", "derived"))
|
|
if proof_scope != "derived":
|
|
return blocking_levels_for_proof_scope(proof_scope)
|
|
return blocking_levels_for_completion_rule(str(active_target["completion_rule"]))
|
|
|
|
|
|
DEFAULT_CONTROL_PLANE = active_control_plane()
|