mirror of
https://github.com/zed-industries/zed.git
synced 2026-05-23 12:37:09 +00:00
## Summary - add a new crash issue-linking subagent prompt (`.factory/prompts/crash/link-issues.md`) - add a scheduled/manual GitHub workflow for week-one background-agent runs (`.github/workflows/background_agent_mvp.yml`) - add Sentry candidate selection script to rank top crashes by solvability × population (`script/select-sentry-crash-candidates`) - add a local dry-run runner for end-to-end MVP execution without push/PR actions (`script/run-background-agent-mvp-local`) ## Guardrails in this MVP - draft PRs only (no auto-merge) - reviewer routing defaults to: `eholk,morgankrey,osiewicz,bennetbo` - pipeline order is: investigate -> link-issues -> fix ## Validation - `python3 -m py_compile script/select-sentry-crash-candidates script/run-background-agent-mvp-local` - `python3 script/select-sentry-crash-candidates --help` - `python3 script/run-background-agent-mvp-local --help` --------- Co-authored-by: John D. Swanson <swannysec@users.noreply.github.com>
242 lines
7.2 KiB
Python
Executable file
242 lines
7.2 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""Select top Sentry crash candidates ranked by solvability x impact.
|
|
|
|
Usage:
|
|
script/select-sentry-crash-candidates --top 3 --output /tmp/candidates.json
|
|
"""
|
|
|
|
import argparse
|
|
import configparser
|
|
import json
|
|
import math
|
|
import os
|
|
import sys
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
SENTRY_BASE_URL = "https://sentry.io/api/0"
|
|
DEFAULT_SENTRY_ORG = "zed-dev"
|
|
DEFAULT_QUERY = "is:unresolved issue.category:error"
|
|
|
|
|
|
class FetchError(Exception):
|
|
pass
|
|
|
|
|
|
def find_auth_token() -> str | None:
|
|
token = os.environ.get("SENTRY_AUTH_TOKEN")
|
|
if token:
|
|
return token
|
|
|
|
sentryclirc_path = os.path.expanduser("~/.sentryclirc")
|
|
if os.path.isfile(sentryclirc_path):
|
|
config = configparser.ConfigParser()
|
|
try:
|
|
config.read(sentryclirc_path)
|
|
token = config.get("auth", "token", fallback=None)
|
|
if token:
|
|
return token
|
|
except configparser.Error:
|
|
return None
|
|
|
|
return None
|
|
|
|
|
|
def api_get(path: str, token: str):
|
|
url = f"{SENTRY_BASE_URL}{path}"
|
|
request = urllib.request.Request(url)
|
|
request.add_header("Authorization", f"Bearer {token}")
|
|
request.add_header("Accept", "application/json")
|
|
|
|
try:
|
|
with urllib.request.urlopen(request, timeout=30) as response:
|
|
return json.loads(response.read().decode("utf-8"))
|
|
except urllib.error.HTTPError as error:
|
|
body = error.read().decode("utf-8", errors="replace")
|
|
try:
|
|
detail = json.loads(body).get("detail", body)
|
|
except (json.JSONDecodeError, AttributeError):
|
|
detail = body
|
|
raise FetchError(f"Sentry API returned HTTP {error.code} for {path}: {detail}")
|
|
except urllib.error.URLError as error:
|
|
raise FetchError(f"Failed to connect to Sentry API: {error.reason}")
|
|
|
|
|
|
def fetch_issues(token: str, organization: str, limit: int, query: str):
|
|
encoded_query = urllib.parse.quote(query)
|
|
path = (
|
|
f"/organizations/{organization}/issues/"
|
|
f"?limit={limit}&sort=freq&query={encoded_query}"
|
|
)
|
|
return api_get(path, token)
|
|
|
|
|
|
def fetch_latest_event(token: str, issue_id: str):
|
|
return api_get(f"/issues/{issue_id}/events/latest/", token)
|
|
|
|
|
|
def parse_int(value, fallback=0) -> int:
|
|
try:
|
|
return int(value)
|
|
except (TypeError, ValueError):
|
|
return fallback
|
|
|
|
|
|
def in_app_frame_count(event) -> int:
|
|
entries = event.get("entries", [])
|
|
count = 0
|
|
for entry in entries:
|
|
if entry.get("type") != "exception":
|
|
continue
|
|
exceptions = entry.get("data", {}).get("values", [])
|
|
for exception in exceptions:
|
|
frames = (exception.get("stacktrace") or {}).get("frames") or []
|
|
count += sum(1 for frame in frames if frame.get("inApp") or frame.get("in_app"))
|
|
return count
|
|
|
|
|
|
def crash_signal_text(issue, event) -> str:
|
|
title = (issue.get("title") or "").lower()
|
|
culprit = (issue.get("culprit") or "").lower()
|
|
message = ""
|
|
|
|
entries = event.get("entries", [])
|
|
for entry in entries:
|
|
if entry.get("type") != "exception":
|
|
continue
|
|
exceptions = entry.get("data", {}).get("values", [])
|
|
for exception in exceptions:
|
|
value = exception.get("value")
|
|
if value:
|
|
message = value.lower()
|
|
break
|
|
if message:
|
|
break
|
|
|
|
return f"{title} {culprit} {message}".strip()
|
|
|
|
|
|
def solvable_factor(issue, event) -> tuple[float, list[str]]:
|
|
factor = 0.6
|
|
reasons: list[str] = []
|
|
|
|
in_app_frames = in_app_frame_count(event)
|
|
if in_app_frames >= 6:
|
|
factor += 0.5
|
|
reasons.append("strong in-app stack coverage")
|
|
elif in_app_frames >= 3:
|
|
factor += 0.3
|
|
reasons.append("moderate in-app stack coverage")
|
|
else:
|
|
factor -= 0.1
|
|
reasons.append("limited in-app stack coverage")
|
|
|
|
signal_text = crash_signal_text(issue, event)
|
|
if "panic" in signal_text or "assert" in signal_text:
|
|
factor += 0.2
|
|
reasons.append("panic/assert style failure")
|
|
|
|
if "out of memory" in signal_text or "oom" in signal_text:
|
|
factor -= 0.35
|
|
reasons.append("likely resource/system failure")
|
|
|
|
if "segmentation fault" in signal_text or "sigsegv" in signal_text:
|
|
factor -= 0.2
|
|
reasons.append("low-level crash signal")
|
|
|
|
level = (issue.get("level") or "").lower()
|
|
if level == "error":
|
|
factor += 0.1
|
|
|
|
return max(0.2, min(1.5, factor)), reasons
|
|
|
|
|
|
def candidate_payload(issue, event):
|
|
issue_id = str(issue.get("id"))
|
|
short_id = issue.get("shortId") or issue_id
|
|
issue_count = parse_int(issue.get("count"), 0)
|
|
user_count = parse_int(issue.get("userCount"), 0)
|
|
population_score = issue_count + (user_count * 10)
|
|
solvability, reasons = solvable_factor(issue, event)
|
|
|
|
score = int(math.floor(population_score * solvability))
|
|
issue_url = f"https://sentry.io/organizations/{DEFAULT_SENTRY_ORG}/issues/{issue_id}/"
|
|
|
|
return {
|
|
"issue_id": issue_id,
|
|
"short_id": short_id,
|
|
"title": issue.get("title") or "Unknown",
|
|
"count": issue_count,
|
|
"user_count": user_count,
|
|
"population_score": population_score,
|
|
"solvability_factor": round(solvability, 2),
|
|
"score": score,
|
|
"sentry_url": issue_url,
|
|
"reasons": reasons,
|
|
}
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(
|
|
description="Select top Sentry crash candidates ranked by solvability x impact."
|
|
)
|
|
parser.add_argument("--org", default=DEFAULT_SENTRY_ORG, help="Sentry organization slug")
|
|
parser.add_argument("--query", default=DEFAULT_QUERY, help="Sentry issue query")
|
|
parser.add_argument("--top", type=int, default=3, help="Number of candidates to select")
|
|
parser.add_argument(
|
|
"--sample-size",
|
|
type=int,
|
|
default=25,
|
|
help="Number of unresolved issues to consider before ranking",
|
|
)
|
|
parser.add_argument("--output", required=True, help="Output JSON file path")
|
|
args = parser.parse_args()
|
|
|
|
token = find_auth_token()
|
|
if not token:
|
|
print(
|
|
"Error: No Sentry auth token found. Set SENTRY_AUTH_TOKEN or run sentry-cli login.",
|
|
file=sys.stderr,
|
|
)
|
|
return 1
|
|
|
|
try:
|
|
issues = fetch_issues(token, args.org, args.sample_size, args.query)
|
|
except FetchError as error:
|
|
print(f"Error fetching issues: {error}", file=sys.stderr)
|
|
return 1
|
|
|
|
candidates = []
|
|
for issue in issues:
|
|
issue_id = issue.get("id")
|
|
if not issue_id:
|
|
continue
|
|
|
|
try:
|
|
event = fetch_latest_event(token, str(issue_id))
|
|
except FetchError:
|
|
continue
|
|
|
|
candidates.append(candidate_payload(issue, event))
|
|
|
|
candidates.sort(key=lambda candidate: candidate["score"], reverse=True)
|
|
selected = candidates[: max(1, args.top)]
|
|
|
|
output = {
|
|
"organization": args.org,
|
|
"query": args.query,
|
|
"sample_size": args.sample_size,
|
|
"top": args.top,
|
|
"selected": selected,
|
|
}
|
|
|
|
with open(args.output, "w", encoding="utf-8") as file:
|
|
json.dump(output, file, indent=2)
|
|
|
|
print(json.dumps(output, indent=2))
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|