mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-05-17 04:01:13 +00:00
Rename high-impact skills to task-oriented names and move plugin-owned skills into their owning plugin folders.\n\nAlign renamed skill frontmatter with the official SKILL.md standard by keeping trigger language in name/description metadata, replacing the old create-skill wizard with build-skill, and updating browser, A0 connector, computer-use, CLI setup, and scheduler skill references.\n\nTighten the recurring cross-provider guidance gaps surfaced by the evidence sweeps: memory requests now avoid promptinclude-file routing, scheduler prompts distinguish cron schedules from planned ISO dates, document questions prefer document_query, skills_tool search/read_file usage is clearer, normal notifications set info/priority 10, and local/host text editors preserve patch intent.\n\nUpdate regression tests for the renamed skills, plugin ownership, prompt budget reality, and standard frontmatter shape.
535 lines
18 KiB
Python
535 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
import urllib.error
|
|
import urllib.request
|
|
import zipfile
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from helpers import files, plugins
|
|
from plugins._browser.helpers.config import PLUGIN_NAME, get_browser_config
|
|
|
|
|
|
EXTENSIONS_ROOT_DIR = ("usr", "_browser", "extensions")
|
|
EXTENSION_ID_RE = re.compile(r"^[a-p]{32}$")
|
|
WEB_STORE_ID_RE = re.compile(r"(?<![a-p])([a-p]{32})(?![a-p])")
|
|
CHROME_VERSION_RE = re.compile(r"(\d+(?:\.\d+){0,3})")
|
|
CHROME_I18N_MESSAGE_RE = re.compile(r"__MSG_([A-Za-z0-9_@.-]+)__")
|
|
DEFAULT_CHROME_PRODVERSION = "140.0.0.0"
|
|
EXTENSION_ID_ALPHABET = "abcdefghijklmnop"
|
|
CHROME_VERSION_COMMANDS = (
|
|
("google-chrome", "--version"),
|
|
("chromium", "--version"),
|
|
("chromium-browser", "--version"),
|
|
)
|
|
WEB_STORE_DOWNLOAD_URL = (
|
|
"https://clients2.google.com/service/update2/crx"
|
|
"?response=redirect"
|
|
"&prod=chromecrx"
|
|
"&prodversion={prodversion}"
|
|
"&acceptformat=crx2,crx3"
|
|
"&x=id%3D{extension_id}%26installsource%3Dondemand%26uc"
|
|
)
|
|
|
|
|
|
def get_extensions_root() -> Path:
|
|
return Path(files.get_abs_path(*EXTENSIONS_ROOT_DIR))
|
|
|
|
|
|
def parse_chrome_web_store_extension_id(value: str) -> str:
|
|
source = str(value or "").strip()
|
|
if EXTENSION_ID_RE.fullmatch(source):
|
|
return source
|
|
|
|
match = WEB_STORE_ID_RE.search(source)
|
|
if match:
|
|
return match.group(1)
|
|
|
|
raise ValueError("Enter a Chrome Web Store URL or a 32-character extension id.")
|
|
|
|
|
|
def list_browser_extensions() -> list[dict[str, Any]]:
|
|
config = get_browser_config()
|
|
enabled_paths = {str(Path(path).expanduser()) for path in config["extension_paths"]}
|
|
entries: list[dict[str, Any]] = []
|
|
seen: set[str] = set()
|
|
|
|
root = get_extensions_root()
|
|
if root.exists():
|
|
for manifest_path in sorted(root.glob("**/manifest.json")):
|
|
entry = _extension_entry(manifest_path.parent, enabled_paths)
|
|
seen.add(entry["path"])
|
|
entries.append(entry)
|
|
|
|
for configured_path in config["extension_paths"]:
|
|
extension_dir = Path(configured_path).expanduser()
|
|
extension_path = str(extension_dir)
|
|
if extension_path in seen or not (extension_dir / "manifest.json").is_file():
|
|
continue
|
|
entries.append(_extension_entry(extension_dir, enabled_paths))
|
|
seen.add(extension_path)
|
|
|
|
return entries
|
|
|
|
|
|
def install_chrome_web_store_extension(source: str) -> dict[str, Any]:
|
|
extension_id = parse_chrome_web_store_extension_id(source)
|
|
target = get_extensions_root() / "chrome-web-store" / extension_id
|
|
|
|
with tempfile.TemporaryDirectory(prefix="browser-extension-control-") as tmp:
|
|
archive_path = Path(tmp) / f"{extension_id}.crx"
|
|
_download_crx(extension_id, archive_path)
|
|
payload_path = Path(tmp) / f"{extension_id}.zip"
|
|
payload_path.write_bytes(_crx_zip_payload(archive_path.read_bytes()))
|
|
extracted_path = Path(tmp) / "extracted"
|
|
_safe_extract_zip(payload_path, extracted_path)
|
|
|
|
if not (extracted_path / "manifest.json").is_file():
|
|
raise ValueError("Downloaded extension did not contain a manifest.json file.")
|
|
|
|
if target.exists():
|
|
shutil.rmtree(target)
|
|
target.parent.mkdir(parents=True, exist_ok=True)
|
|
shutil.copytree(extracted_path, target)
|
|
|
|
config = _enable_extension_path(target)
|
|
manifest = _read_manifest(target)
|
|
return {
|
|
"ok": True,
|
|
"id": extension_id,
|
|
"name": _manifest_label(target, manifest, "name") or extension_id,
|
|
"version": manifest.get("version") or "",
|
|
"path": str(target),
|
|
"extension_paths": config["extension_paths"],
|
|
}
|
|
|
|
|
|
def set_browser_extension_enabled(extension_path: str, enabled: bool) -> dict[str, Any]:
|
|
raw_path = str(extension_path or "").strip()
|
|
if not raw_path:
|
|
raise ValueError("Choose an extension first.")
|
|
|
|
path = str(Path(raw_path).expanduser())
|
|
directory = Path(path)
|
|
if enabled and not (directory / "manifest.json").is_file():
|
|
raise ValueError("Extension folder must contain a manifest.json file.")
|
|
|
|
config = get_browser_config()
|
|
paths = list(config["extension_paths"])
|
|
if enabled:
|
|
if path not in paths:
|
|
paths.append(path)
|
|
else:
|
|
paths = [item for item in paths if str(Path(item).expanduser()) != path]
|
|
|
|
config["extension_paths"] = paths
|
|
plugins.save_plugin_config(PLUGIN_NAME, "", "", config)
|
|
return config
|
|
|
|
|
|
def uninstall_browser_extension(extension_path: str) -> dict[str, Any]:
|
|
raw_path = str(extension_path or "").strip()
|
|
if not raw_path:
|
|
raise ValueError("Choose an extension first.")
|
|
|
|
root = get_extensions_root().resolve()
|
|
extension_dir = Path(raw_path).expanduser().resolve()
|
|
if extension_dir == root or not extension_dir.is_relative_to(root):
|
|
raise ValueError("Only Browser-managed extension folders can be deleted.")
|
|
if not extension_dir.is_dir():
|
|
raise ValueError("Extension folder was not found.")
|
|
|
|
manifest = _read_manifest(extension_dir)
|
|
name = (
|
|
_manifest_label(extension_dir, manifest, "name")
|
|
or _manifest_label(extension_dir, manifest, "short_name")
|
|
or extension_dir.name
|
|
)
|
|
config = get_browser_config()
|
|
config["extension_paths"] = [
|
|
path
|
|
for path in config["extension_paths"]
|
|
if Path(path).expanduser().resolve() != extension_dir
|
|
]
|
|
|
|
try:
|
|
shutil.rmtree(extension_dir)
|
|
except OSError as exc:
|
|
raise ValueError(f"Could not delete extension folder: {exc}") from exc
|
|
|
|
plugins.save_plugin_config(PLUGIN_NAME, "", "", config)
|
|
return {
|
|
"ok": True,
|
|
"name": name,
|
|
"path": str(extension_dir),
|
|
"extension_paths": config["extension_paths"],
|
|
}
|
|
|
|
|
|
def _download_crx(extension_id: str, archive_path: Path) -> None:
|
|
prodversion = _detect_chrome_prodversion()
|
|
url = _build_web_store_download_url(extension_id, prodversion=prodversion)
|
|
request = urllib.request.Request(
|
|
url,
|
|
headers={
|
|
"User-Agent": (
|
|
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
|
f"(KHTML, like Gecko) Chrome/{prodversion} Safari/537.36"
|
|
)
|
|
},
|
|
)
|
|
try:
|
|
response = urllib.request.urlopen(request, timeout=120)
|
|
except urllib.error.HTTPError as exc:
|
|
raise ValueError(
|
|
f"Chrome Web Store download failed with HTTP {exc.code} for Chrome {prodversion}."
|
|
) from exc
|
|
except urllib.error.URLError as exc:
|
|
reason = getattr(exc, "reason", exc)
|
|
raise ValueError(f"Chrome Web Store download failed: {reason}.") from exc
|
|
|
|
with response:
|
|
status = response.getcode()
|
|
data = response.read()
|
|
if not data:
|
|
raise ValueError(
|
|
"Chrome Web Store did not return an extension package "
|
|
f"(HTTP {status}, Chrome {prodversion})."
|
|
)
|
|
archive_path.write_bytes(data)
|
|
|
|
|
|
def _build_web_store_download_url(extension_id: str, *, prodversion: str | None = None) -> str:
|
|
return WEB_STORE_DOWNLOAD_URL.format(
|
|
extension_id=extension_id,
|
|
prodversion=_normalize_chrome_prodversion(prodversion or "") or DEFAULT_CHROME_PRODVERSION,
|
|
)
|
|
|
|
|
|
def _detect_chrome_prodversion() -> str:
|
|
env_version = _normalize_chrome_prodversion(os.environ.get("A0_BROWSER_EXTENSION_PRODVERSION", ""))
|
|
if env_version:
|
|
return env_version
|
|
|
|
for command in CHROME_VERSION_COMMANDS:
|
|
try:
|
|
completed = subprocess.run(
|
|
command,
|
|
check=False,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
continue
|
|
|
|
version = _normalize_chrome_prodversion(
|
|
" ".join(part for part in (completed.stdout, completed.stderr) if part)
|
|
)
|
|
if version:
|
|
return version
|
|
|
|
return DEFAULT_CHROME_PRODVERSION
|
|
|
|
|
|
def _normalize_chrome_prodversion(value: str) -> str:
|
|
match = CHROME_VERSION_RE.search(str(value or ""))
|
|
if not match:
|
|
return ""
|
|
parts = match.group(1).split(".")
|
|
return ".".join((parts + ["0", "0", "0", "0"])[:4])
|
|
|
|
|
|
def _crx_zip_payload(data: bytes) -> bytes:
|
|
if data.startswith(b"PK"):
|
|
return data
|
|
if data[:4] != b"Cr24":
|
|
raise ValueError("Downloaded package is not a CRX or ZIP archive.")
|
|
|
|
version = int.from_bytes(data[4:8], "little")
|
|
if version == 2:
|
|
public_key_len = int.from_bytes(data[8:12], "little")
|
|
signature_len = int.from_bytes(data[12:16], "little")
|
|
offset = 16 + public_key_len + signature_len
|
|
elif version == 3:
|
|
header_len = int.from_bytes(data[8:12], "little")
|
|
offset = 12 + header_len
|
|
else:
|
|
raise ValueError(f"Unsupported CRX version: {version}.")
|
|
|
|
payload = data[offset:]
|
|
if not payload.startswith(b"PK"):
|
|
raise ValueError("CRX payload did not contain a ZIP archive.")
|
|
return payload
|
|
|
|
|
|
def _safe_extract_zip(archive_path: Path, target_dir: Path) -> None:
|
|
target_dir.mkdir(parents=True, exist_ok=True)
|
|
root = target_dir.resolve()
|
|
with zipfile.ZipFile(archive_path) as archive:
|
|
for member in archive.infolist():
|
|
destination = (target_dir / member.filename).resolve()
|
|
if not destination.is_relative_to(root):
|
|
raise ValueError("Extension archive contains an unsafe path.")
|
|
if member.is_dir():
|
|
destination.mkdir(parents=True, exist_ok=True)
|
|
continue
|
|
destination.parent.mkdir(parents=True, exist_ok=True)
|
|
with archive.open(member) as source, destination.open("wb") as output:
|
|
shutil.copyfileobj(source, output)
|
|
|
|
|
|
def _enable_extension_path(extension_path: Path) -> dict[str, Any]:
|
|
config = get_browser_config()
|
|
path = str(extension_path)
|
|
paths = list(config["extension_paths"])
|
|
if path not in paths:
|
|
paths.append(path)
|
|
config["extension_paths"] = paths
|
|
plugins.save_plugin_config(PLUGIN_NAME, "", "", config)
|
|
return config
|
|
|
|
|
|
def _extension_entry(extension_dir: Path, enabled_paths: set[str]) -> dict[str, Any]:
|
|
manifest = _read_manifest(extension_dir)
|
|
extension_path = str(extension_dir)
|
|
can_delete = _is_managed_extension_dir(extension_dir)
|
|
extension_id = _extension_runtime_id(extension_dir, manifest)
|
|
source_id = _extension_source_id(extension_dir)
|
|
ui = _extension_ui(extension_id, manifest)
|
|
name = (
|
|
_manifest_label(extension_dir, manifest, "name")
|
|
or _manifest_label(extension_dir, manifest, "short_name")
|
|
or extension_dir.name
|
|
)
|
|
return {
|
|
"id": extension_id,
|
|
"source_id": source_id,
|
|
"name": name,
|
|
"raw_name": manifest.get("name") or "",
|
|
"description": _manifest_label(extension_dir, manifest, "description"),
|
|
"version": manifest.get("version") or "",
|
|
"path": extension_path,
|
|
"enabled": extension_path in enabled_paths,
|
|
"managed": can_delete,
|
|
"can_delete": can_delete,
|
|
"has_ui": bool(ui["open_url"]),
|
|
"open_url": ui["open_url"],
|
|
"open_label": ui["open_label"],
|
|
"ui": ui,
|
|
}
|
|
|
|
|
|
def _is_managed_extension_dir(extension_dir: Path) -> bool:
|
|
try:
|
|
root = get_extensions_root().resolve()
|
|
path = extension_dir.expanduser().resolve()
|
|
except OSError:
|
|
return False
|
|
return path != root and path.is_relative_to(root)
|
|
|
|
|
|
def _read_manifest(extension_path: Path) -> dict[str, Any]:
|
|
manifest_path = extension_path / "manifest.json"
|
|
try:
|
|
return json.loads(manifest_path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def _extension_runtime_id(extension_dir: Path, manifest: dict[str, Any]) -> str:
|
|
key_id = _extension_id_from_manifest_key(str(manifest.get("key") or ""))
|
|
if key_id:
|
|
return key_id
|
|
return _extension_id_from_path(extension_dir)
|
|
|
|
|
|
def _extension_source_id(extension_dir: Path) -> str:
|
|
name = extension_dir.name
|
|
if not EXTENSION_ID_RE.fullmatch(name):
|
|
return ""
|
|
try:
|
|
if extension_dir.parent.name == "chrome-web-store":
|
|
return name
|
|
except OSError:
|
|
return ""
|
|
return ""
|
|
|
|
|
|
def _extension_id_from_manifest_key(key: str) -> str:
|
|
raw_key = str(key or "").strip()
|
|
if not raw_key:
|
|
return ""
|
|
try:
|
|
padding = "=" * (-len(raw_key) % 4)
|
|
public_key = base64.b64decode(raw_key + padding, validate=True)
|
|
except Exception:
|
|
return ""
|
|
return _extension_id_from_hash_input(public_key)
|
|
|
|
|
|
def _extension_id_from_path(extension_dir: Path) -> str:
|
|
return _extension_id_from_hash_input(str(extension_dir).encode("utf-8"))
|
|
|
|
|
|
def _extension_id_from_hash_input(value: bytes) -> str:
|
|
digest = hashlib.sha256(value).hexdigest()[:32]
|
|
return "".join(EXTENSION_ID_ALPHABET[int(char, 16)] for char in digest)
|
|
|
|
|
|
def _extension_ui(extension_id: str, manifest: dict[str, Any]) -> dict[str, Any]:
|
|
targets = _extension_ui_targets(extension_id, manifest)
|
|
primary = targets[0] if targets else {}
|
|
return {
|
|
"open_url": primary.get("url", ""),
|
|
"open_label": primary.get("label", ""),
|
|
"targets": targets,
|
|
}
|
|
|
|
|
|
def _extension_ui_targets(extension_id: str, manifest: dict[str, Any]) -> list[dict[str, str]]:
|
|
candidates: list[tuple[str, str, Any]] = [
|
|
("options", "Options", _manifest_nested_value(manifest, "options_ui", "page")),
|
|
("options", "Options", manifest.get("options_page")),
|
|
("popup", "Popup", _manifest_nested_value(manifest, "action", "default_popup")),
|
|
("popup", "Popup", _manifest_nested_value(manifest, "browser_action", "default_popup")),
|
|
("popup", "Popup", _manifest_nested_value(manifest, "page_action", "default_popup")),
|
|
("side_panel", "Side panel", _manifest_nested_value(manifest, "side_panel", "default_path")),
|
|
("devtools", "DevTools", manifest.get("devtools_page")),
|
|
]
|
|
|
|
chrome_url_overrides = manifest.get("chrome_url_overrides")
|
|
if isinstance(chrome_url_overrides, dict):
|
|
candidates.extend(
|
|
(
|
|
(f"chrome_url_override_{name}", _extension_override_label(name), page)
|
|
for name, page in chrome_url_overrides.items()
|
|
)
|
|
)
|
|
|
|
targets: list[dict[str, str]] = []
|
|
seen_urls: set[str] = set()
|
|
for kind, label, page in candidates:
|
|
url = _extension_page_url(extension_id, page)
|
|
if not url or url in seen_urls:
|
|
continue
|
|
seen_urls.add(url)
|
|
targets.append(
|
|
{
|
|
"kind": kind,
|
|
"label": label,
|
|
"page": str(page or "").strip(),
|
|
"url": url,
|
|
}
|
|
)
|
|
return targets
|
|
|
|
|
|
def _manifest_nested_value(manifest: dict[str, Any], key: str, nested_key: str) -> Any:
|
|
value = manifest.get(key)
|
|
if not isinstance(value, dict):
|
|
return ""
|
|
return value.get(nested_key)
|
|
|
|
|
|
def _extension_override_label(name: str) -> str:
|
|
normalized = str(name or "").replace("_", " ").strip()
|
|
return normalized[:1].upper() + normalized[1:] if normalized else "Extension page"
|
|
|
|
|
|
def _extension_page_url(extension_id: str, page: Any) -> str:
|
|
page_path = str(page or "").strip()
|
|
if not extension_id or not page_path:
|
|
return ""
|
|
if re.match(r"^[a-z][a-z0-9+.-]*:", page_path, flags=re.IGNORECASE):
|
|
return ""
|
|
page_path = page_path.lstrip("/")
|
|
if not page_path:
|
|
return ""
|
|
return f"chrome-extension://{extension_id}/{page_path}"
|
|
|
|
|
|
def _manifest_label(extension_dir: Path, manifest: dict[str, Any], key: str) -> str:
|
|
value = str(manifest.get(key) or "").strip()
|
|
if not value:
|
|
return ""
|
|
|
|
messages = _load_locale_messages(extension_dir, str(manifest.get("default_locale") or ""))
|
|
if not messages:
|
|
return "" if CHROME_I18N_MESSAGE_RE.fullmatch(value) else value
|
|
|
|
def replace_message(match: re.Match[str]) -> str:
|
|
message_key = match.group(1)
|
|
message = _resolve_locale_message(messages, message_key)
|
|
return message if message is not None else match.group(0)
|
|
|
|
resolved = CHROME_I18N_MESSAGE_RE.sub(replace_message, value).strip()
|
|
if CHROME_I18N_MESSAGE_RE.fullmatch(resolved):
|
|
return ""
|
|
return resolved
|
|
|
|
|
|
def _load_locale_messages(extension_dir: Path, default_locale: str) -> dict[str, Any]:
|
|
locale_root = extension_dir / "_locales"
|
|
if not locale_root.is_dir():
|
|
return {}
|
|
|
|
preferred_locales = [
|
|
default_locale,
|
|
default_locale.split("_", 1)[0] if default_locale else "",
|
|
"en_US",
|
|
"en",
|
|
]
|
|
for locale in [item for item in preferred_locales if item]:
|
|
messages = _read_locale_file(locale_root / locale / "messages.json")
|
|
if messages:
|
|
return messages
|
|
|
|
for messages_path in sorted(locale_root.glob("*/messages.json")):
|
|
messages = _read_locale_file(messages_path)
|
|
if messages:
|
|
return messages
|
|
return {}
|
|
|
|
|
|
def _read_locale_file(messages_path: Path) -> dict[str, Any]:
|
|
if not messages_path.is_file():
|
|
return {}
|
|
try:
|
|
data = json.loads(messages_path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def _resolve_locale_message(messages: dict[str, Any], key: str) -> str | None:
|
|
entry = messages.get(key)
|
|
if not isinstance(entry, dict):
|
|
return None
|
|
message = str(entry.get("message") or "")
|
|
if not message:
|
|
return None
|
|
|
|
placeholders = entry.get("placeholders")
|
|
if isinstance(placeholders, dict):
|
|
for name, placeholder in placeholders.items():
|
|
if not isinstance(placeholder, dict):
|
|
continue
|
|
content = str(placeholder.get("content") or "")
|
|
if not content:
|
|
continue
|
|
message = re.sub(
|
|
rf"\${re.escape(str(name))}\$",
|
|
content,
|
|
message,
|
|
flags=re.IGNORECASE,
|
|
)
|
|
return message
|