mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-05-23 12:44:31 +00:00
Add user-configurable timezone and 12/24-hour preferences, then wire them through settings, runtime snapshots, scheduler payloads, wait handling, notifications, backups, memory, plugin metadata, and frontend formatters. Keep UTC as the boundary for absolute instants while serializing user-facing dates in the configured or browser-resolved timezone. Preserve scheduler wall-clock inputs in the selected timezone, propagate TZ into desktop/runtime process environments, and restart active desktop sessions when the runtime timezone changes. Cover the risky paths with timezone regression tests for settings normalization, auto and fixed timezone resolution, scheduler round-trips, memory timestamp conversion, and desktop timezone sync.
360 lines
13 KiB
Python
360 lines
13 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
import json
|
|
import os
|
|
import time
|
|
import urllib.request
|
|
import uuid
|
|
import zipfile
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from helpers import files, print_style, plugins, git
|
|
from helpers.localization import Localization
|
|
from helpers import yaml as yaml_helper
|
|
from helpers.plugins import (
|
|
META_FILE_NAME,
|
|
PluginMetadata,
|
|
get_plugins_list,
|
|
after_plugin_change,
|
|
)
|
|
from werkzeug.datastructures import FileStorage
|
|
from werkzeug.utils import secure_filename
|
|
|
|
|
|
def _get_user_plugins_dir() -> str:
|
|
"""Return absolute path to usr/plugins/."""
|
|
return files.get_abs_path(files.USER_DIR, files.PLUGINS_DIR)
|
|
|
|
|
|
def _get_plugin_name(meta: PluginMetadata) -> str:
|
|
plugin_name = (meta.name or "").strip()
|
|
if not plugin_name:
|
|
raise ValueError(f"{META_FILE_NAME} is missing required field 'name'")
|
|
return plugin_name
|
|
|
|
|
|
def validate_plugin_dir(path: str, plugin_name: str = "") -> PluginMetadata:
|
|
"""Check directory contains plugin.yaml and return parsed metadata.
|
|
Raises ValueError if plugin.yaml is missing or invalid."""
|
|
meta_path = os.path.join(path, META_FILE_NAME)
|
|
if not os.path.isfile(meta_path):
|
|
raise ValueError(f"No {META_FILE_NAME} found in {os.path.basename(path)}")
|
|
with open(meta_path, "r", encoding="utf-8") as f:
|
|
content = f.read()
|
|
data = yaml_helper.loads(content)
|
|
model = PluginMetadata.model_validate(data)
|
|
if plugin_name and plugin_name != model.name:
|
|
raise ValueError(
|
|
f"Plugin name is incorrect: expected '{plugin_name}', got '{model.name}'. The author needs to correct this in the plugin.yaml file."
|
|
)
|
|
return model
|
|
|
|
|
|
def check_plugin_conflict(name: str) -> None:
|
|
"""Raise ValueError if a plugin with this name already exists in usr/plugins/."""
|
|
dest = os.path.join(_get_user_plugins_dir(), name)
|
|
if os.path.exists(dest):
|
|
raise ValueError(f"Plugin '{name}' is already installed")
|
|
|
|
|
|
def _find_plugin_root(extracted_dir: str) -> str:
|
|
"""Walk extracted directory to find the parent of plugin.yaml.
|
|
Returns absolute path to the plugin root directory."""
|
|
for root, dirs, dir_files in os.walk(extracted_dir):
|
|
if META_FILE_NAME in dir_files:
|
|
return root
|
|
raise ValueError(f"No {META_FILE_NAME} found in the uploaded archive")
|
|
|
|
|
|
def install_uploaded_zip(plugin_file: FileStorage) -> dict:
|
|
"""Persist an uploaded ZIP temporarily and install it."""
|
|
original_filename = Path((plugin_file.filename or "").strip()).name
|
|
if not original_filename:
|
|
raise ValueError("No file selected")
|
|
|
|
tmp_dir = Path(files.get_abs_path("tmp", "plugin_uploads"))
|
|
tmp_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
temp_name = secure_filename(original_filename) or "plugin.zip"
|
|
if not temp_name.lower().endswith(".zip"):
|
|
temp_name = f"{temp_name}.zip"
|
|
|
|
unique = uuid.uuid4().hex[:8]
|
|
stamp = time.strftime("%Y%m%d_%H%M%S")
|
|
tmp_path = str(tmp_dir / f"plugin_{stamp}_{unique}_{temp_name}")
|
|
plugin_file.save(tmp_path)
|
|
|
|
return install_from_zip(tmp_path, original_filename=original_filename)
|
|
|
|
|
|
def install_from_zip(zip_path: str, original_filename: str | None = None) -> dict:
|
|
"""Extract ZIP, find plugin.yaml, move its parent to usr/plugins/.
|
|
Returns dict with plugin name and metadata.
|
|
Cleans up tmp files regardless of outcome."""
|
|
temp_name = f"tmp_plugin_{time.strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
|
|
extract_dir = files.get_abs_path(files.TEMP_DIR, "plugin_installs", temp_name)
|
|
extract_dir = files.create_dir_safe(extract_dir)
|
|
dest = ""
|
|
|
|
try:
|
|
try:
|
|
# Extract with path traversal protection
|
|
with zipfile.ZipFile(zip_path, "r") as z:
|
|
for member in z.namelist():
|
|
member_path = os.path.realpath(os.path.join(extract_dir, member))
|
|
if not (files.is_in_dir(member_path, extract_dir)):
|
|
raise ValueError(f"Unsafe path in archive: {member}")
|
|
z.extractall(extract_dir)
|
|
|
|
# Find plugin.yaml
|
|
plugin_root = _find_plugin_root(extract_dir)
|
|
meta = validate_plugin_dir(plugin_root)
|
|
plugin_name = _get_plugin_name(meta)
|
|
|
|
check_plugin_conflict(plugin_name)
|
|
|
|
# Move to usr/plugins/
|
|
dest = os.path.join(_get_user_plugins_dir(), plugin_name)
|
|
files.create_dir(os.path.dirname(dest))
|
|
files.move_dir(plugin_root, dest)
|
|
except Exception as e:
|
|
print_style.PrintStyle.error(f"Failed to validate plugin: {e}")
|
|
files.delete_dir(extract_dir)
|
|
raise
|
|
|
|
# run installation hook
|
|
try:
|
|
run_install_hook(plugin_name)
|
|
except Exception as e:
|
|
print_style.PrintStyle.error(
|
|
f"Failed to run installation hook for {plugin_name}: {e}"
|
|
)
|
|
files.delete_dir(dest)
|
|
raise
|
|
|
|
|
|
# does it have python files?
|
|
python_change = bool(files.find_existing_paths_by_pattern(dest+"/**/*.py"))
|
|
|
|
after_plugin_change([plugin_name], python_change=python_change)
|
|
|
|
return {
|
|
"success": True,
|
|
"plugin_name": plugin_name,
|
|
"title": meta.title or plugin_name,
|
|
"path": files.deabsolute_path(dest),
|
|
}
|
|
finally:
|
|
# Cleanup: extracted files and the archive
|
|
try:
|
|
files.delete_dir(extract_dir)
|
|
files.delete_file(zip_path)
|
|
except Exception as e:
|
|
pass
|
|
|
|
|
|
def _download_thumbnail(thumbnail_url: str, plugin_dir: str) -> None:
|
|
"""Download thumbnail from URL to plugin_dir/webui/thumbnail.<ext>. Non-fatal."""
|
|
try:
|
|
if not thumbnail_url:
|
|
return
|
|
from urllib.parse import urlparse
|
|
parsed = urlparse(thumbnail_url)
|
|
if parsed.scheme not in ("http", "https"):
|
|
return
|
|
_allowed_exts = {"png", "jpg", "jpeg", "gif", "webp"}
|
|
url_path = parsed.path.lower()
|
|
ext = url_path.rsplit(".", 1)[-1] if "." in url_path else ""
|
|
if ext not in _allowed_exts:
|
|
ext = "png"
|
|
webui_dir = Path(plugin_dir) / "webui"
|
|
webui_dir.mkdir(parents=True, exist_ok=True)
|
|
dest = webui_dir / f"thumbnail.{ext}"
|
|
req = urllib.request.Request(thumbnail_url, headers={"User-Agent": "AgentZero"})
|
|
with urllib.request.urlopen(req, timeout=10) as resp:
|
|
dest.write_bytes(resp.read())
|
|
except Exception as e:
|
|
print_style.PrintStyle.warning(f"Failed to download plugin thumbnail: {e}")
|
|
|
|
|
|
def install_from_git(url: str, token: str | None = None, plugin_name: str = "", thumbnail_url: str = "") -> dict:
|
|
"""Clone git repo into usr/plugins/, validate plugin.yaml.
|
|
Returns dict with plugin name and metadata."""
|
|
from helpers.git import clone_repo
|
|
|
|
temp_name = f"tmp_plugin_{time.strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
|
|
git_dir = files.get_abs_path(files.TEMP_DIR, "plugins_installer", temp_name)
|
|
|
|
try:
|
|
files.create_dir_safe(git_dir)
|
|
clone_repo(url, git_dir, token=token or None)
|
|
meta = validate_plugin_dir(git_dir, plugin_name=plugin_name)
|
|
plugin_name = _get_plugin_name(meta)
|
|
check_plugin_conflict(plugin_name)
|
|
final_dir = os.path.join(_get_user_plugins_dir(), plugin_name)
|
|
files.move_dir(git_dir, final_dir)
|
|
except Exception as e:
|
|
# No plugin.yaml — remove cloned repo
|
|
print_style.PrintStyle.error(f"Failed to validate plugin: {e}")
|
|
files.delete_dir(git_dir)
|
|
raise
|
|
|
|
_download_thumbnail(thumbnail_url, final_dir)
|
|
|
|
# run installation hook
|
|
try:
|
|
run_install_hook(plugin_name)
|
|
except Exception as e:
|
|
print_style.PrintStyle.error(
|
|
f"Failed to run installation hook for {plugin_name}: {e}"
|
|
)
|
|
files.delete_dir(final_dir)
|
|
raise
|
|
|
|
# does it have python files?
|
|
python_change = bool(files.find_existing_paths_by_pattern(final_dir+"/**/*.py"))
|
|
|
|
after_plugin_change([plugin_name],python_change=python_change)
|
|
|
|
return {
|
|
"success": True,
|
|
"plugin_name": plugin_name,
|
|
"title": meta.title or plugin_name,
|
|
"path": files.deabsolute_path(final_dir),
|
|
}
|
|
|
|
|
|
def update_from_git(plugin_name: str) -> dict:
|
|
plugin_name = (plugin_name or "").strip()
|
|
if not plugin_name:
|
|
raise ValueError("Missing plugin_name")
|
|
|
|
plugin_dir = plugins.find_plugin_dir(plugin_name)
|
|
if not plugin_dir:
|
|
raise ValueError("Plugin not found")
|
|
|
|
custom_plugins_dir = files.get_abs_path(files.USER_DIR, files.PLUGINS_DIR)
|
|
if not files.is_in_dir(plugin_dir, custom_plugins_dir):
|
|
raise ValueError("Only custom plugins can be updated")
|
|
|
|
try:
|
|
run_pre_update_hook(plugin_name)
|
|
except Exception as e:
|
|
print_style.PrintStyle.error(
|
|
f"Failed to run pre-update hook for {plugin_name}: {e}"
|
|
)
|
|
raise
|
|
|
|
try:
|
|
repo = git.update_repo(plugin_dir)
|
|
meta = plugins.get_plugin_meta(plugin_name)
|
|
except Exception as e:
|
|
print_style.PrintStyle.error(f"Failed to update plugin: {e}")
|
|
raise
|
|
|
|
try:
|
|
run_install_hook(plugin_name)
|
|
except Exception as e:
|
|
print_style.PrintStyle.error(
|
|
f"Failed to run installation hook for {plugin_name}: {e}"
|
|
)
|
|
raise
|
|
|
|
after_plugin_change([plugin_name])
|
|
head = repo.head.commit
|
|
|
|
return {
|
|
"ok": True,
|
|
"success": True,
|
|
"plugin_name": plugin_name,
|
|
"title": meta.title if meta else plugin_name,
|
|
"path": files.deabsolute_path(plugin_dir),
|
|
"current_commit": head.hexsha,
|
|
"current_commit_timestamp": datetime.fromtimestamp(
|
|
head.committed_date,
|
|
tz=Localization.get().get_tzinfo(),
|
|
).strftime("%Y-%m-%d %H:%M:%S %Z"),
|
|
"version": getattr(meta, "version", "") or "",
|
|
"branch": repo.active_branch.name if not repo.head.is_detached else "",
|
|
"remote_url": git.strip_auth_from_url(repo.remotes.origin.url) if repo.remotes else "",
|
|
"directory_name": Path(plugin_dir).name,
|
|
}
|
|
|
|
|
|
def run_install_hook(plugin_name: str):
|
|
return plugins.call_plugin_hook(plugin_name, "install")
|
|
|
|
def run_pre_update_hook(plugin_name: str):
|
|
return plugins.call_plugin_hook(plugin_name, "pre_update")
|
|
|
|
def get_plugin_hub_index(force: bool = False) -> dict[str, Any]:
|
|
"""Return the plugin index plus installed Plugin Hub keys."""
|
|
index_data = fetch_plugin_index(force=force)
|
|
if not isinstance(index_data, dict):
|
|
raise ValueError("Plugin index response was not a JSON object")
|
|
|
|
plugins = index_data.get("plugins")
|
|
if not isinstance(plugins, dict):
|
|
raise ValueError("Plugin index payload is missing a valid 'plugins' map")
|
|
|
|
from helpers.plugins import find_plugin_dir
|
|
|
|
installed_dirs = set(get_plugins_list())
|
|
installed_keys: list[str] = []
|
|
_thumb_exts = ("png", "jpg", "jpeg", "gif", "webp")
|
|
|
|
for key, plugin_data in plugins.items():
|
|
if not isinstance(plugin_data, dict):
|
|
continue
|
|
if key not in installed_dirs:
|
|
continue
|
|
installed_keys.append(key)
|
|
|
|
# Backfill thumbnail for plugins installed before this feature existed
|
|
plugin_dir = find_plugin_dir(key)
|
|
if not plugin_dir:
|
|
continue
|
|
webui_dir = Path(plugin_dir) / "webui"
|
|
has_thumb = any((webui_dir / f"thumbnail.{ext}").is_file() for ext in _thumb_exts)
|
|
if has_thumb:
|
|
continue
|
|
thumb_url = plugin_data.get("thumbnail") or ""
|
|
if not thumb_url:
|
|
from urllib.parse import urlparse
|
|
raw_base = None
|
|
github = plugin_data.get("github") or ""
|
|
if github:
|
|
try:
|
|
parsed = urlparse(github)
|
|
if parsed.netloc == "github.com":
|
|
parts = parsed.path.strip("/").split("/")
|
|
if len(parts) >= 2:
|
|
raw_base = f"https://raw.githubusercontent.com/{parts[0]}/{parts[1]}"
|
|
except Exception:
|
|
pass
|
|
if raw_base:
|
|
thumb_url = f"{raw_base}/main/thumbnail.png"
|
|
if thumb_url:
|
|
_download_thumbnail(thumb_url, plugin_dir)
|
|
|
|
return {"index": index_data, "installed_plugins": installed_keys}
|
|
|
|
|
|
def fetch_plugin_index(force: bool = False) -> dict:
|
|
"""Download the plugin index from GitHub releases."""
|
|
index_url = "https://github.com/agent0ai/a0-plugins/releases/download/generated-index/index.json"
|
|
if force:
|
|
separator = "&" if "?" in index_url else "?"
|
|
index_url = f"{index_url}{separator}ts={time.time_ns()}"
|
|
|
|
headers = {"User-Agent": "AgentZero"}
|
|
if force:
|
|
headers["Cache-Control"] = "no-cache"
|
|
headers["Pragma"] = "no-cache"
|
|
|
|
req = urllib.request.Request(index_url, headers=headers)
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
|
data = json.loads(resp.read().decode())
|
|
return data
|