mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-05-22 19:47:15 +00:00
Add user-configurable timezone and 12/24-hour preferences, then wire them through settings, runtime snapshots, scheduler payloads, wait handling, notifications, backups, memory, plugin metadata, and frontend formatters. Keep UTC as the boundary for absolute instants while serializing user-facing dates in the configured or browser-resolved timezone. Preserve scheduler wall-clock inputs in the selected timezone, propagate TZ into desktop/runtime process environments, and restart active desktop sessions when the runtime timezone changes. Cover the risky paths with timezone regression tests for settings normalization, auto and fixed timezone resolution, scheduler round-trips, memory timestamp conversion, and desktop timezone sync.
744 lines
26 KiB
Python
744 lines
26 KiB
Python
import base64
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
from typing import Any, Literal, TypedDict, cast, TypeVar
|
|
|
|
import models
|
|
import pytz # type: ignore
|
|
from helpers import runtime, defer, git, subagents
|
|
from . import files, dotenv
|
|
from helpers.print_style import PrintStyle
|
|
from helpers.providers import get_providers, FieldOption as ProvidersFO
|
|
from helpers.secrets import get_default_secrets_manager
|
|
from helpers import dirty_json
|
|
from helpers.notification import NotificationManager, NotificationType, NotificationPriority
|
|
|
|
|
|
# Generic type of a setting's default value; get_default_value() returns the same type it is given.
T = TypeVar('T')
|
|
|
|
def get_default_value(name: str, value: T) -> T:
    """
    Resolve a setting default, letting an ``A0_SET_<name>`` .env entry override it.

    The exact-case variable is looked up first; the upper-cased variable name
    is used as its fallback.

    Args:
        name: Setting name (will be prefixed with A0_SET_)
        value: Default value; its type decides how the env text is converted

    Returns:
        The converted environment value, or ``value`` when the variable is
        unset or cannot be converted.
    """
    override = dotenv.get_dotenv_value(
        f"A0_SET_{name}",
        dotenv.get_dotenv_value(f"A0_SET_{name.upper()}", None),
    )
    if override is None:
        return value

    # Convert the raw text to the type of the supplied default.
    try:
        if isinstance(value, bool):
            # bool must be tested before the generic constructor branch:
            # bool("false") would be True.
            converted = override.strip().lower() in ('true', '1', 'yes', 'on')
        elif isinstance(value, dict):
            converted = json.loads(override.strip())
        elif isinstance(value, str):
            converted = str(override).strip()
        else:
            converted = type(value)(override.strip())
    except (ValueError, TypeError, json.JSONDecodeError) as e:
        PrintStyle(background_color="yellow", font_color="black").print(
            f"Warning: Invalid value for A0_SET_{name}='{override}': {e}. Using default: {value}"
        )
        return value
    return converted  # type: ignore
|
|
|
|
class Settings(TypedDict):
    """Complete set of application settings handled by this module."""

    # Settings schema version; compared against the default in normalize_settings().
    version: str

    # Agent profile, knowledge folder and date/time presentation.
    agent_profile: str
    agent_knowledge_subdir: str
    timezone: str  # TIMEZONE_AUTO or a timezone name accepted by pytz
    time_format: str  # TIME_FORMAT_12H or TIME_FORMAT_24H

    # Working-directory options.
    workdir_path: str
    workdir_show: bool
    workdir_max_depth: int
    workdir_max_files: int
    workdir_max_folders: int
    workdir_max_lines: int
    workdir_gitignore: str

    # Provider name -> API key; blanked before the settings file is written.
    api_keys: dict[str, str]

    # Credentials; loaded from .env rather than from the settings file.
    auth_login: str
    auth_password: str
    root_password: str

    # RFC connection options.
    rfc_auto_docker: bool
    rfc_url: str
    rfc_password: str
    rfc_port_http: int

    # Web server options.
    websocket_server_restart_enabled: bool
    uvicorn_access_logs_enabled: bool

    # MCP client/server options; the token is regenerated in normalize_settings().
    mcp_servers: str
    mcp_client_init_timeout: int
    mcp_client_tool_timeout: int
    mcp_server_enabled: bool
    mcp_server_token: str

    a2a_server_enabled: bool

    variables: str
    secrets: str

    # LiteLLM global kwargs applied to all model calls
    litellm_global_kwargs: dict[str, Any]

    update_check_enabled: bool
    chat_inherit_project: bool
|
|
|
|
|
|
class PartialSettings(Settings, total=False):
    """Settings variant intended for partial updates.

    NOTE(review): ``total=False`` only applies to keys declared in this class
    body; keys inherited from ``Settings`` stay required for type checkers.
    """

    pass
|
|
|
|
|
|
class FieldOption(TypedDict):
    """One entry of a dropdown options list."""

    value: str  # value stored in the settings
    label: str  # text for the option
|
|
|
|
class SettingsField(TypedDict, total=False):
    """Description of a single settings form field; every key is optional."""

    id: str
    title: str
    description: str
    # Kind of input control for the field.
    type: Literal[
        "text",
        "number",
        "select",
        "range",
        "textarea",
        "password",
        "switch",
        "button",
        "html",
    ]
    value: Any
    # Numeric bounds and increment — presumably for "number"/"range" fields; confirm against the frontend.
    min: float
    max: float
    step: float
    hidden: bool
    options: list[FieldOption]
    style: str
|
|
|
|
|
|
class SettingsSection(TypedDict, total=False):
    """A titled group of settings fields; every key is optional."""

    id: str
    title: str
    description: str
    fields: list[SettingsField]
    tab: str  # Indicates which tab this section belongs to
|
|
|
|
class ModelProvider(ProvidersFO):
    """Provider option; adds nothing to the FieldOption type from helpers.providers."""

    pass
|
|
|
|
class SettingsOutputAdditional(TypedDict):
    """Supporting data returned next to the settings by convert_out()."""

    chat_providers: list[ModelProvider]
    embedding_providers: list[ModelProvider]
    agent_subdirs: list[FieldOption]
    knowledge_subdirs: list[FieldOption]
    timezones: list[FieldOption]
    # Timezone name produced by _resolve_runtime_timezone() for the current setting.
    resolved_timezone: str
    is_dockerized: bool
    # Values taken from the runtime snapshot (or the passed settings when no snapshot exists).
    runtime_settings: dict[str, Any]
|
|
|
|
|
|
class SettingsOutput(TypedDict):
    """Result of convert_out(): masked settings plus supporting data."""

    settings: Settings
    additional: SettingsOutputAdditional
|
|
|
|
|
|
# Stand-ins used by convert_out() in place of stored secrets; a submitted value
# equal to the placeholder is not written back by _write_sensitive_settings().
PASSWORD_PLACEHOLDER = "****PSWD****"
API_KEY_PLACEHOLDER = "************"
# Accepted values of the "timezone" (besides timezone names) and "time_format" settings.
TIMEZONE_AUTO = "auto"
TIME_FORMAT_12H = "12h"
TIME_FORMAT_24H = "24h"

# Location of the persisted (non-sensitive) settings.
SETTINGS_FILE = files.get_abs_path("usr/settings.json")
# Cache of the current settings; filled by get_settings()/set_settings(), cleared by reload_settings().
_settings: Settings | None = None
# Copy of the settings stored through set_runtime_settings_snapshot().
_runtime_settings_snapshot: Settings | None = None

# Option type preserved by _ensure_option_present().
OptionT = TypeVar("OptionT", bound=FieldOption)
|
|
|
|
def _ensure_option_present(options: list[OptionT] | None, current_value: str | None) -> list[OptionT]:
    """
    Return a copy of ``options`` that is guaranteed to contain ``current_value``.

    When the value is non-empty and no option carries it, a
    {value: current_value, label: current_value} entry is put at the front.
    The input list itself is never modified.
    """
    choices = list(options or [])
    already_listed = any(choice.get("value") == current_value for choice in choices)
    if current_value and not already_listed:
        choices.insert(0, cast(OptionT, {"value": current_value, "label": current_value}))
    return choices
|
|
|
|
|
|
def _is_valid_timezone(value: str) -> bool:
    """Return True when pytz can resolve ``value`` to a timezone."""
    try:
        pytz.timezone(value)
    except pytz.exceptions.UnknownTimeZoneError:
        return False
    else:
        return True
|
|
|
|
|
|
def _normalize_timezone_setting(value: Any, default: str = TIMEZONE_AUTO) -> str:
    """
    Coerce a raw timezone setting to TIMEZONE_AUTO or a valid timezone name.

    Args:
        value: Raw setting; falsy values are treated as an empty string.
        default: Result for an unusable ``value``; it is itself replaced by
            TIMEZONE_AUTO when it is neither "auto" nor a valid timezone.
    """
    candidate = str(value or "").strip()

    # "auto" is matched case-insensitively and returned in canonical form.
    if candidate.lower() == TIMEZONE_AUTO:
        return TIMEZONE_AUTO
    if _is_valid_timezone(candidate):
        return candidate

    if default == TIMEZONE_AUTO or _is_valid_timezone(default):
        return default
    return TIMEZONE_AUTO
|
|
|
|
|
|
def _normalize_time_format(value: Any, default: str = TIME_FORMAT_12H) -> str:
    """
    Coerce a raw time-format setting to TIME_FORMAT_12H or TIME_FORMAT_24H.

    The value is compared case-insensitively after trimming; an unusable value
    yields ``default``, and an unusable ``default`` yields TIME_FORMAT_12H.
    """
    allowed = {TIME_FORMAT_12H, TIME_FORMAT_24H}
    candidate = str(value or "").strip().lower()
    if candidate in allowed:
        return candidate
    if default in allowed:
        return default
    return TIME_FORMAT_12H
|
|
|
|
|
|
def _resolve_runtime_timezone(setting_value: str, browser_timezone: str | None = None) -> str:
    """
    Turn the timezone setting into a concrete timezone name.

    Args:
        setting_value: The "timezone" setting (TIMEZONE_AUTO or a timezone name).
        browser_timezone: Timezone reported by the browser; only consulted
            when the setting is "auto".

    Returns:
        For a fixed setting: that timezone, or "UTC" when it is not valid.
        For "auto": the browser timezone when valid, otherwise the timezone
        reported by Localization, otherwise "UTC".
    """
    # Normalize before testing for "auto": the normalizer matches "auto"
    # case-insensitively after trimming, so comparing the raw value would let
    # "AUTO" / " auto " come back as the literal "auto" — not a timezone name.
    normalized = _normalize_timezone_setting(setting_value, default="UTC")
    if normalized != TIMEZONE_AUTO:
        return normalized

    candidate = str(browser_timezone or "").strip()
    if _is_valid_timezone(candidate):
        return candidate

    try:
        # Imported lazily, as in the rest of this module.
        from helpers.localization import Localization

        return Localization.get().get_timezone()
    except Exception:
        # Best effort: never fail timezone resolution, fall back to UTC.
        return "UTC"
|
|
|
|
|
|
def _timezone_options() -> list[FieldOption]:
    """Build dropdown options (value == label) for every entry of pytz.common_timezones."""
    choices: list[FieldOption] = []
    for zone_name in pytz.common_timezones:
        choices.append({"value": zone_name, "label": zone_name})
    return choices
|
|
|
|
|
|
def convert_out(settings: Settings) -> SettingsOutput:
    """
    Build the settings payload handed to the frontend.

    Secrets are replaced by placeholders (API keys, passwords) or by their
    masked form (secrets text), dropdown option lists are completed with the
    currently selected values, and ``*_kwargs`` dicts are rendered as .env text.

    Args:
        settings: Settings to present. This mapping is not modified.

    Returns:
        ``SettingsOutput`` with the masked settings and the supporting data.
    """
    out = SettingsOutput(
        settings=settings.copy(),
        additional=SettingsOutputAdditional(
            chat_providers=get_providers("chat"),
            embedding_providers=get_providers("embedding"),
            is_dockerized=runtime.is_dockerized(),
            agent_subdirs=[
                {"value": item["key"], "label": item["label"]}
                for item in subagents.get_all_agents_list()
                if item["key"] != "_example"
            ],
            knowledge_subdirs=[
                {"value": subdir, "label": subdir}
                for subdir in files.get_subdirectories("knowledge", exclude="default")
            ],
            timezones=_timezone_options(),
            resolved_timezone="UTC",
            runtime_settings={},
        ),
    )

    additional = out["additional"]
    current = out["settings"]

    # settings.copy() is shallow, so the nested api_keys dict would be shared
    # with the caller. Give the output its own dict before masking; otherwise
    # the caller's real keys would be overwritten with placeholders.
    api_keys: dict[str, str] = dict(settings.get("api_keys") or {})
    current["api_keys"] = api_keys

    default_settings = get_default_settings()
    runtime_settings = _runtime_settings_snapshot or settings
    additional["runtime_settings"] = {
        "uvicorn_access_logs_enabled": bool(
            runtime_settings.get(
                "uvicorn_access_logs_enabled",
                default_settings["uvicorn_access_logs_enabled"],
            )
        ),
    }

    # ensure dropdown options include currently selected values
    additional["agent_subdirs"] = _ensure_option_present(
        additional.get("agent_subdirs"), current.get("agent_profile")
    )
    additional["knowledge_subdirs"] = _ensure_option_present(
        additional.get("knowledge_subdirs"), current.get("agent_knowledge_subdir")
    )
    if current.get("timezone") != TIMEZONE_AUTO:
        additional["timezones"] = _ensure_option_present(
            additional.get("timezones"), current.get("timezone")
        )
    additional["resolved_timezone"] = _resolve_runtime_timezone(current.get("timezone", TIMEZONE_AUTO))

    # mask api keys: every known provider gets an entry (placeholder when a
    # key exists in the settings or via models.get_api_key, "" otherwise) ...
    for provider in get_providers("chat") + get_providers("embedding"):
        provider_name = provider["value"]
        api_key = api_keys.get(provider_name, models.get_api_key(provider_name))
        api_keys[provider_name] = API_KEY_PLACEHOLDER if api_key and api_key != "None" else ""
    # ... and any remaining non-empty entry is masked as well
    for provider_name, value in list(api_keys.items()):
        if value:
            api_keys[provider_name] = API_KEY_PLACEHOLDER

    # load auth from dotenv; passwords are only signalled through a placeholder
    current["auth_login"] = dotenv.get_dotenv_value(dotenv.KEY_AUTH_LOGIN) or ""
    current["auth_password"] = (
        PASSWORD_PLACEHOLDER if dotenv.get_dotenv_value(dotenv.KEY_AUTH_PASSWORD) else ""
    )
    current["rfc_password"] = (
        PASSWORD_PLACEHOLDER if dotenv.get_dotenv_value(dotenv.KEY_RFC_PASSWORD) else ""
    )
    current["root_password"] = (
        PASSWORD_PLACEHOLDER if dotenv.get_dotenv_value(dotenv.KEY_ROOT_PASSWORD) else ""
    )

    # secrets are sent masked; an unreadable secrets store yields an empty text
    secrets_manager = get_default_secrets_manager()
    try:
        current["secrets"] = secrets_manager.get_masked_secrets()
    except Exception:
        current["secrets"] = ""

    # normalize certain fields
    for key, value in list(current.items()):
        # convert kwargs dicts to .env format
        if key.endswith("_kwargs") and isinstance(value, dict):
            current[key] = _dict_to_env(value)  # type: ignore
    return out
|
|
|
|
def _get_api_key_field(settings: Settings, provider: str, title: str) -> SettingsField:
    """
    Describe the text field used to edit one provider's API key.

    The real key is never exposed: an existing key (from the settings or from
    models.get_api_key) is shown as API_KEY_PLACEHOLDER, a missing one as "".
    """
    stored_key = settings["api_keys"].get(provider, models.get_api_key(provider))
    has_key = bool(stored_key) and stored_key != "None"
    field: SettingsField = {
        "id": f"api_key_{provider}",
        "title": title,
        "type": "text",
        "value": API_KEY_PLACEHOLDER if has_key else "",
    }
    return field
|
|
|
|
|
|
def convert_in(settings: Settings) -> Settings:
    """
    Overlay submitted settings on the current ones.

    ``*_kwargs`` entries that arrive as text are parsed from .env format back
    into dicts; every other entry is taken over unchanged.
    """
    merged = get_settings()
    for key, value in settings.items():
        # *_kwargs values are edited as .env text
        arrives_as_env_text = key.endswith("_kwargs") and isinstance(value, str)
        merged[key] = _env_to_dict(value) if arrives_as_env_text else value
    return merged
|
|
|
|
|
|
def get_settings() -> Settings:
    """
    Return a normalized copy of the current settings including sensitive values.

    The cached settings are used when present; otherwise the settings file is
    read, and the defaults are used when that yields nothing.
    """
    global _settings
    _settings = _settings or _read_settings_file() or get_default_settings()

    result = normalize_settings(_settings)
    _load_sensitive_settings(result)
    return result
|
|
|
|
|
|
def reload_settings() -> Settings:
    """Drop the cached settings and return freshly loaded ones via get_settings()."""
    global _settings
    # Clearing the cache makes get_settings() read the settings file again.
    _settings = None
    return get_settings()
|
|
|
|
|
|
def set_runtime_settings_snapshot(settings: Settings) -> None:
    """Store a shallow copy of ``settings`` as the runtime snapshot read by convert_out()."""
    global _runtime_settings_snapshot
    _runtime_settings_snapshot = settings.copy()
|
|
|
|
|
|
def set_settings(settings: Settings, apply: bool = True, browser_timezone: str | None = None):
    """
    Normalize, persist and optionally apply new settings.

    Args:
        settings: The new settings.
        apply: When True, _apply_settings() is run with the previously cached settings.
        browser_timezone: Forwarded to _apply_settings() for "auto" timezone resolution.

    Returns:
        The settings as returned by reload_settings() after saving.
    """
    global _settings
    # Keep the old cache so the apply step can tell what changed.
    previous, _settings = _settings, normalize_settings(settings)
    _write_settings_file(_settings)
    if apply:
        _apply_settings(previous, browser_timezone)
    return reload_settings()
|
|
|
|
|
|
def set_settings_delta(delta: dict, apply: bool = True):
    """Save the current settings with the entries of ``delta`` laid over them; see set_settings()."""
    combined = dict(get_settings())
    combined.update(delta)
    return set_settings(combined, apply)  # type: ignore
|
|
|
|
|
|
def merge_settings(original: Settings, delta: dict) -> Settings:
    """Return a new mapping holding ``original`` overridden by ``delta``; neither input is modified."""
    return cast("Settings", {**original, **delta})
|
|
|
|
|
|
def normalize_settings(settings: Settings) -> Settings:
    """
    Return a copy of ``settings`` shaped like the default settings.

    Steps: migrate values from an older version, drop keys the defaults do not
    know, add missing keys, coerce each value to the type of its default
    (falling back to the default when that fails), regenerate the MCP server
    token and validate the timezone / time-format values.

    Args:
        settings: Settings to normalize; this mapping is not modified at the top level.

    Returns:
        The normalized copy.
    """
    copy = settings.copy()
    default = get_default_settings()

    # adjust settings values to match current version if needed
    if "version" not in copy or copy["version"] != default["version"]:
        _adjust_to_version(copy, default)
        copy["version"] = default["version"]  # sync version

    # remove keys that are not in default
    for key in [key for key in copy if key not in default]:
        del copy[key]

    # add missing keys and normalize types
    for key, value in default.items():
        if key not in copy:
            copy[key] = value
            continue

        current = copy[key]
        try:
            if isinstance(value, bool) and isinstance(current, str):
                # bool("false") is True; read textual booleans the same way
                # get_default_value() does instead of by truthiness.
                copy[key] = current.strip().lower() in ("true", "1", "yes", "on")  # type: ignore
            else:
                copy[key] = type(value)(current)  # type: ignore
            if isinstance(copy[key], str):
                copy[key] = copy[key].strip()  # strip strings
        except (ValueError, TypeError):
            copy[key] = value  # make default instead

    # mcp server token is set automatically
    copy["mcp_server_token"] = create_auth_token()
    copy["timezone"] = _normalize_timezone_setting(copy.get("timezone"), default["timezone"])
    copy["time_format"] = _normalize_time_format(copy.get("time_format"), default["time_format"])

    return copy
|
|
|
|
|
|
def _adjust_to_version(settings: Settings, default: Settings):
|
|
# starting with 0.9, the default prompt subfolder for agent no. 0 is agent0
|
|
# switch to agent0 if the old default is used from v0.8
|
|
if "version" not in settings or settings["version"].startswith("v0.8"):
|
|
if "agent_profile" not in settings or settings["agent_profile"] == "default":
|
|
settings["agent_profile"] = "agent0"
|
|
|
|
|
|
|
|
def _load_sensitive_settings(settings: Settings):
    """
    Fill the sensitive entries of ``settings`` in place.

    API keys come from the settings themselves or models.get_api_key(), the
    auth fields from .env, and the secrets text from the secrets manager
    (empty when it cannot be read).
    """
    # load api keys from .env
    for provider in get_providers("chat") + get_providers("embedding"):
        name = provider["value"]
        found_key = settings["api_keys"].get(name) or models.get_api_key(name)
        if not found_key or found_key == "None":
            continue
        settings["api_keys"][name] = found_key

    # load auth fields from .env
    auth_sources = (
        ("auth_login", dotenv.KEY_AUTH_LOGIN),
        ("auth_password", dotenv.KEY_AUTH_PASSWORD),
        ("rfc_password", dotenv.KEY_RFC_PASSWORD),
        ("root_password", dotenv.KEY_ROOT_PASSWORD),
    )
    for field, env_key in auth_sources:
        settings[field] = dotenv.get_dotenv_value(env_key) or ""

    # load secrets raw content
    secrets_manager = get_default_secrets_manager()
    try:
        raw_secrets = secrets_manager.read_secrets_raw()
    except Exception:
        raw_secrets = ""
    settings["secrets"] = raw_secrets
|
|
|
|
|
|
def _read_settings_file() -> Settings | None:
    """Load and normalize the settings stored in SETTINGS_FILE; None when the file does not exist."""
    if not os.path.exists(SETTINGS_FILE):
        return None

    raw_text = files.read_file(SETTINGS_FILE)
    return normalize_settings(json.loads(raw_text))
|
|
|
|
|
|
def _write_settings_file(settings: Settings):
    """
    Persist ``settings``.

    Sensitive values are first handed to _write_sensitive_settings() and then
    blanked, so only non-sensitive data reaches SETTINGS_FILE. Works on a
    shallow copy; the caller's mapping keeps its values.
    """
    persisted = settings.copy()
    _write_sensitive_settings(persisted)
    _remove_sensitive_settings(persisted)

    # write settings
    files.write_file(SETTINGS_FILE, json.dumps(persisted, indent=4))
|
|
|
|
|
|
def _remove_sensitive_settings(settings: Settings):
|
|
settings["api_keys"] = {}
|
|
settings["auth_login"] = ""
|
|
settings["auth_password"] = ""
|
|
settings["rfc_password"] = ""
|
|
settings["root_password"] = ""
|
|
settings["mcp_server_token"] = ""
|
|
settings["secrets"] = ""
|
|
|
|
|
|
def _write_sensitive_settings(settings: Settings):
    """
    Store the sensitive parts of ``settings`` outside the settings file.

    API keys and auth values go to .env; values still equal to their
    placeholder are skipped so masked input never overwrites a stored value.
    The root password is only handled in dockerized environments. The secrets
    text is passed to the secrets manager.
    """
    for provider, api_key in settings["api_keys"].items():
        if api_key == API_KEY_PLACEHOLDER:
            continue
        dotenv.save_dotenv_value(f"API_KEY_{provider.upper()}", api_key)

    dotenv.save_dotenv_value(dotenv.KEY_AUTH_LOGIN, settings["auth_login"])

    password_targets = (
        ("auth_password", dotenv.KEY_AUTH_PASSWORD),
        ("rfc_password", dotenv.KEY_RFC_PASSWORD),
    )
    for field, env_key in password_targets:
        if settings[field] != PASSWORD_PLACEHOLDER:
            dotenv.save_dotenv_value(env_key, settings[field])

    root_password = settings["root_password"]
    if root_password != PASSWORD_PLACEHOLDER and runtime.is_dockerized():
        dotenv.save_dotenv_value(dotenv.KEY_ROOT_PASSWORD, root_password)
        set_root_password(root_password)

    # Handle secrets separately - merge with existing preserving comments/order and support deletions
    get_default_secrets_manager().save_secrets_with_merge(settings["secrets"])
|
|
|
|
|
|
|
|
def get_default_settings() -> Settings:
    """
    Build the default settings.

    Most values can be overridden through ``A0_SET_<name>`` entries in .env
    (see get_default_value()); credentials, variables and secrets always start
    empty and the MCP server token is generated by create_auth_token().
    """
    from_env = get_default_value
    gitignore = files.read_file(files.get_abs_path("conf/workdir.gitignore"))

    return Settings(
        version=_get_version(),
        # sensitive values never come from A0_SET_ overrides
        api_keys={},
        auth_login="",
        auth_password="",
        root_password="",
        # agent and presentation
        agent_profile=from_env("agent_profile", "agent0"),
        agent_knowledge_subdir=from_env("agent_knowledge_subdir", "custom"),
        timezone=_normalize_timezone_setting(from_env("timezone", TIMEZONE_AUTO)),
        time_format=_normalize_time_format(from_env("time_format", TIME_FORMAT_12H)),
        # working directory
        workdir_path=from_env("workdir_path", files.get_abs_path_dockerized("usr/workdir")),
        workdir_show=from_env("workdir_show", True),
        workdir_max_depth=from_env("workdir_max_depth", 5),
        workdir_max_files=from_env("workdir_max_files", 20),
        workdir_max_folders=from_env("workdir_max_folders", 20),
        workdir_max_lines=from_env("workdir_max_lines", 250),
        workdir_gitignore=from_env("workdir_gitignore", gitignore),
        # rfc
        rfc_auto_docker=from_env("rfc_auto_docker", True),
        rfc_url=from_env("rfc_url", "localhost"),
        rfc_password="",
        rfc_port_http=from_env("rfc_port_http", 55080),
        # web server
        websocket_server_restart_enabled=from_env("websocket_server_restart_enabled", True),
        uvicorn_access_logs_enabled=from_env("uvicorn_access_logs_enabled", False),
        # mcp / a2a
        mcp_servers=from_env("mcp_servers", '{\n "mcpServers": {}\n}'),
        mcp_client_init_timeout=from_env("mcp_client_init_timeout", 10),
        mcp_client_tool_timeout=from_env("mcp_client_tool_timeout", 120),
        mcp_server_enabled=from_env("mcp_server_enabled", False),
        mcp_server_token=create_auth_token(),
        a2a_server_enabled=from_env("a2a_server_enabled", False),
        variables="",
        secrets="",
        litellm_global_kwargs=from_env("litellm_global_kwargs", {}),
        update_check_enabled=from_env("update_check_enabled", True),
        chat_inherit_project=from_env("chat_inherit_project", True),
    )
|
|
|
|
|
|
def _apply_timezone_setting(previous: Settings | None, browser_timezone: str | None = None) -> None:
    """
    Push the configured timezone into Localization and announce a change.

    Args:
        previous: Settings that were cached before the update, if any.
        browser_timezone: Browser-reported timezone used when the setting is "auto".

    The "timezone_changed" hook of the "_office" plugin is called only when the
    timezone reported by Localization actually differs afterwards; any error
    raised while calling the hook is ignored.
    """
    if not _settings:
        return

    from helpers.localization import Localization

    localization = Localization.get()
    configured = _settings["timezone"]
    previous_timezone = localization.get_timezone()
    target_timezone = _resolve_runtime_timezone(configured, browser_timezone)

    # A fixed (non-auto) timezone that neither changed in the settings nor
    # differs from what Localization already uses needs no work.
    if previous and configured != TIMEZONE_AUTO:
        if configured == previous.get("timezone") and previous_timezone == target_timezone:
            return

    localization.set_timezone(target_timezone)
    current_timezone = localization.get_timezone()
    if current_timezone == previous_timezone:
        return

    try:
        from helpers import plugins

        plugins.call_plugin_hook(
            "_office",
            "timezone_changed",
            None,
            previous_timezone=previous_timezone,
            timezone=current_timezone,
        )
    except Exception:
        # best effort: a failing hook must not break saving settings
        return
|
|
|
|
|
|
def _apply_settings(previous: Settings | None, browser_timezone: str | None = None):
    """
    Propagate the cached settings to the running application.

    Applies the timezone, re-initializes the config of every agent context
    (and of each agent in its subordinate chain), and schedules deferred
    updates of the MCP client configuration and of the MCP / A2A server
    tokens when they changed.

    Args:
        previous: Settings that were cached before the update; ``None`` forces
            the MCP configuration and token updates.
        browser_timezone: Browser-reported timezone for "auto" resolution.
    """
    global _settings
    if not _settings:
        return

    _apply_timezone_setting(previous, browser_timezone)

    from agent import AgentContext
    from initialize import initialize_agent

    # Stays None when there is no agent context; without this binding the MCP
    # update below raised UnboundLocalError in that situation.
    config = None
    for ctx in AgentContext.all():
        profile = str(getattr(ctx.config, "profile", "") or _settings["agent_profile"])
        config = initialize_agent(override_settings={"agent_profile": profile})
        ctx.config = config  # reinitialize context config with new settings
        # apply config to agents
        agent = ctx.agent0
        while agent:
            agent.config = ctx.config
            agent = agent.get_data(agent.DATA_NAME_SUBORDINATE)

    # update mcp settings if necessary
    if not previous or _settings["mcp_servers"] != previous["mcp_servers"]:
        from helpers.mcp_handler import MCPConfig

        async def update_mcp_settings(mcp_servers: str):
            PrintStyle(
                background_color="black", font_color="white", padding=True
            ).print("Updating MCP config...")
            NotificationManager.send_notification(
                type=NotificationType.INFO,
                priority=NotificationPriority.NORMAL,
                message="Updating MCP settings...",
                display_time=999,
                group="settings-mcp"
            )

            mcp_config = MCPConfig.get_instance()
            try:
                MCPConfig.update(mcp_servers)
            except Exception as e:
                NotificationManager.send_notification(
                    type=NotificationType.ERROR,
                    priority=NotificationPriority.HIGH,
                    message="Failed to update MCP settings",
                    detail=str(e),
                )
                (
                    PrintStyle(
                        background_color="red", font_color="black", padding=True
                    ).print("Failed to update MCP settings")
                )
                (
                    PrintStyle(
                        background_color="black", font_color="red", padding=True
                    ).print(f"{e}")
                )

            PrintStyle(
                background_color="#6734C3", font_color="white", padding=True
            ).print("Parsed MCP config:")
            (
                PrintStyle(
                    background_color="#334455", font_color="white", padding=False
                ).print(mcp_config.model_dump_json())
            )
            NotificationManager.send_notification(
                type=NotificationType.INFO,
                priority=NotificationPriority.NORMAL,
                message="Finished updating MCP settings.",
                group="settings-mcp"
            )

        # Same value as before whenever a context exists; the settings value
        # is only the fallback for the no-context case.
        mcp_servers = config.mcp_servers if config is not None else _settings["mcp_servers"]
        task2 = defer.DeferredTask().start_task(
            update_mcp_settings, mcp_servers
        )  # TODO overkill, replace with background task

    # update token in mcp server
    current_token = (
        create_auth_token()
    )  # TODO - ugly, token in settings is generated from dotenv and does not always correspond
    if not previous or current_token != previous["mcp_server_token"]:

        async def update_mcp_token(token: str):
            from helpers.mcp_server import DynamicMcpProxy

            DynamicMcpProxy.get_instance().reconfigure(token=token)

        task3 = defer.DeferredTask().start_task(
            update_mcp_token, current_token
        )  # TODO overkill, replace with background task

    # update token in a2a server
    if not previous or current_token != previous["mcp_server_token"]:

        async def update_a2a_token(token: str):
            from helpers.fasta2a_server import DynamicA2AProxy

            DynamicA2AProxy.get_instance().reconfigure(token=token)

        task4 = defer.DeferredTask().start_task(
            update_a2a_token, current_token
        )  # TODO overkill, replace with background task
|
|
|
|
|
|
def _env_to_dict(data: str):
|
|
result = {}
|
|
for line in data.splitlines():
|
|
line = line.strip()
|
|
if not line or line.startswith('#'):
|
|
continue
|
|
|
|
if '=' not in line:
|
|
continue
|
|
|
|
key, value = line.split('=', 1)
|
|
key = key.strip()
|
|
value = value.strip()
|
|
|
|
# If quoted, treat as string
|
|
if value.startswith('"') and value.endswith('"'):
|
|
result[key] = value[1:-1].replace('\\"', '"') # Unescape quotes
|
|
elif value.startswith("'") and value.endswith("'"):
|
|
result[key] = value[1:-1].replace("\\'", "'") # Unescape quotes
|
|
else:
|
|
# Not quoted, try JSON parse
|
|
try:
|
|
result[key] = json.loads(value)
|
|
except (json.JSONDecodeError, ValueError):
|
|
result[key] = value
|
|
|
|
return result
|
|
|
|
|
|
def _dict_to_env(data_dict):
|
|
lines = []
|
|
for key, value in data_dict.items():
|
|
if isinstance(value, str):
|
|
# Quote strings and escape internal quotes
|
|
escaped_value = value.replace('"', '\\"')
|
|
lines.append(f'{key}="{escaped_value}"')
|
|
elif isinstance(value, (dict, list, bool)) or value is None:
|
|
# Serialize as unquoted JSON
|
|
lines.append(f'{key}={json.dumps(value, separators=(",", ":"))}')
|
|
else:
|
|
# Numbers and other types as unquoted strings
|
|
lines.append(f'{key}={value}')
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def set_root_password(password: str):
    """
    Set the root account password through ``chpasswd`` and record it in .env.

    Raises:
        Exception: when not running in a dockerized environment.
        subprocess.CalledProcessError: when ``chpasswd`` exits with an error.
    """
    if not runtime.is_dockerized():
        raise Exception("root password can only be set in dockerized environments")

    # chpasswd reads "user:password" from stdin; check=True turns a failure into an exception
    credentials = f"root:{password}".encode()
    subprocess.run(["chpasswd"], input=credentials, capture_output=True, check=True)
    dotenv.save_dotenv_value(dotenv.KEY_ROOT_PASSWORD, password)
|
|
|
|
|
|
def get_runtime_config(set: Settings):
    """
    Return the runtime configuration for the given settings — always empty.

    SSH config is now managed by the code_execution plugin; this function is
    kept for backward compatibility only.
    """
    return dict()
|
|
|
|
|
|
def create_auth_token() -> str:
    """
    Derive the 16-character auth token from the runtime id and the login credentials.

    The token is a prefix of the URL-safe base64 form of a SHA-256 digest, so
    it consists of letters, digits, ``-`` and ``_``.
    """
    login = dotenv.get_dotenv_value(dotenv.KEY_AUTH_LOGIN) or ""
    secret = dotenv.get_dotenv_value(dotenv.KEY_AUTH_PASSWORD) or ""
    seed = f"{runtime.get_persistent_id()}:{login}:{secret}"

    digest = hashlib.sha256(seed.encode()).digest()
    # base64 keeps the token compact; only the "=" padding is removed
    encoded = base64.urlsafe_b64encode(digest).decode().replace("=", "")
    return encoded[:16]
|
|
|
|
|
|
def _get_version():
    """Return the version reported by git.get_version(); used as the settings "version" default."""
    return git.get_version()
|