feat(dx): Setup Wizard + doctor command — closes #2030 (#2034)

DanielWalnut 2026-04-10 17:43:39 +08:00 committed by GitHub
parent b107444878
commit eef0a6e2da
25 changed files with 2809 additions and 68 deletions


@@ -6,7 +6,6 @@ from __future__ import annotations
import shutil
import subprocess
import sys
from typing import Optional
def configure_stdio() -> None:
@@ -20,7 +19,7 @@ def configure_stdio() -> None:
continue
def run_command(command: list[str]) -> Optional[str]:
def run_command(command: list[str]) -> str | None:
"""Run a command and return trimmed stdout, or None on failure."""
try:
result = subprocess.run(command, capture_output=True, text=True, check=True, shell=False)
@@ -29,7 +28,7 @@ def run_command(command: list[str]) -> Optional[str]:
return result.stdout.strip() or result.stderr.strip()
def find_pnpm_command() -> Optional[list[str]]:
def find_pnpm_command() -> list[str] | None:
"""Return a pnpm-compatible command that exists on this machine."""
candidates = [["pnpm"], ["pnpm.cmd"]]
if shutil.which("corepack"):
@@ -41,7 +40,7 @@ def find_pnpm_command() -> Optional[list[str]]:
return None
def parse_node_major(version_text: str) -> Optional[int]:
def parse_node_major(version_text: str) -> int | None:
version = version_text.strip()
if version.startswith("v"):
version = version[1:]
@@ -145,7 +144,9 @@ def main() -> int:
print()
print("You can now run:")
print(" make install - Install project dependencies")
print(" make config - Generate local config files")
print(" make setup - Create a minimal working config (recommended)")
print(" make config - Copy the full config template (manual setup)")
print(" make doctor - Verify config and dependency health")
print(" make dev - Start development server")
print(" make start - Start production server")
return 0


@@ -70,7 +70,9 @@ if [ "$FAILED" -eq 0 ]; then
echo ""
echo "You can now run:"
echo " make install - Install project dependencies"
echo " make config - Generate local config files"
echo " make setup - Create a minimal working config (recommended)"
echo " make config - Copy the full config template (manual setup)"
echo " make doctor - Verify config and dependency health"
echo " make dev - Start development server"
echo " make start - Start production server"
else


@@ -91,11 +91,11 @@ if [ ! -f "$DEER_FLOW_CONFIG_PATH" ]; then
cp "$REPO_ROOT/config.example.yaml" "$DEER_FLOW_CONFIG_PATH"
echo -e "${GREEN}✓ Seeded config.example.yaml → $DEER_FLOW_CONFIG_PATH${NC}"
echo -e "${YELLOW}⚠ config.yaml was seeded from the example template.${NC}"
echo " Edit $DEER_FLOW_CONFIG_PATH and set your model API keys before use."
echo " Run 'make setup' to generate a minimal config, or edit $DEER_FLOW_CONFIG_PATH manually before use."
else
echo -e "${RED}✗ No config.yaml found.${NC}"
echo " Run 'make config' from the repo root to generate one,"
echo " then set the required model API keys."
echo " Run 'make setup' from the repo root (recommended),"
echo " or 'make config' for the full template, then set the required model API keys."
exit 1
fi
else


@@ -209,6 +209,7 @@ start() {
echo -e "${YELLOW} configuration before starting DeerFlow. ${NC}"
echo -e "${YELLOW}============================================================${NC}"
echo ""
echo -e "${YELLOW} Recommended: run 'make setup' before starting Docker. ${NC}"
echo -e "${YELLOW} Edit the file: $PROJECT_ROOT/config.yaml${NC}"
echo -e "${YELLOW} Then run: make docker-start${NC}"
echo ""

scripts/doctor.py Normal file (721 lines added)

@@ -0,0 +1,721 @@
#!/usr/bin/env python3
"""DeerFlow Health Check (make doctor).
Checks system requirements, configuration, LLM provider, and optional
components, then prints an actionable report.
Exit codes:
0 all required checks passed (warnings allowed)
1 one or more required checks failed
"""
from __future__ import annotations
import os
import shutil
import subprocess
import sys
from importlib import import_module
from pathlib import Path
from typing import Literal
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
Status = Literal["ok", "warn", "fail", "skip"]
def _supports_color() -> bool:
return hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
def _c(text: str, code: str) -> str:
if _supports_color():
return f"\033[{code}m{text}\033[0m"
return text
def green(t: str) -> str:
return _c(t, "32")
def red(t: str) -> str:
return _c(t, "31")
def yellow(t: str) -> str:
return _c(t, "33")
def cyan(t: str) -> str:
return _c(t, "36")
def bold(t: str) -> str:
return _c(t, "1")
def _icon(status: Status) -> str:
icons = {"ok": green("✓"), "warn": yellow("!"), "fail": red("✗"), "skip": ""}
return icons[status]
def _run(cmd: list[str]) -> str | None:
try:
r = subprocess.run(cmd, capture_output=True, text=True, check=True)
return (r.stdout or r.stderr).strip()
except Exception:
return None
def _parse_major(version_text: str) -> int | None:
v = version_text.lstrip("v").split(".", 1)[0]
return int(v) if v.isdigit() else None
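# Illustrative behaviour: _parse_major("v22.14.0") -> 22, _parse_major("22") -> 22,
# _parse_major("latest") -> None (non-numeric major component).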
def _load_yaml_file(path: Path) -> dict:
import yaml
with open(path, encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
if not isinstance(data, dict):
raise ValueError("top-level config must be a YAML mapping")
return data
def _load_app_config(config_path: Path) -> object:
from deerflow.config.app_config import AppConfig
return AppConfig.from_file(str(config_path))
def _split_use_path(use: str) -> tuple[str, str] | None:
if ":" not in use:
return None
module_name, attr_name = use.split(":", 1)
if not module_name or not attr_name:
return None
return module_name, attr_name
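# Illustrative: _split_use_path("langchain_openai:ChatOpenAI") returns
# ("langchain_openai", "ChatOpenAI"); "no-colon" or ":attr" returns None.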
# ---------------------------------------------------------------------------
# Check result container
# ---------------------------------------------------------------------------
class CheckResult:
def __init__(
self,
label: str,
status: Status,
detail: str = "",
fix: str | None = None,
) -> None:
self.label = label
self.status = status
self.detail = detail
self.fix = fix
def print(self) -> None:
icon = _icon(self.status)
detail_str = f" ({self.detail})" if self.detail else ""
print(f" {icon} {self.label}{detail_str}")
if self.fix:
for line in self.fix.splitlines():
print(f" {cyan('')} {line}")
# ---------------------------------------------------------------------------
# Individual checks
# ---------------------------------------------------------------------------
def check_python() -> CheckResult:
v = sys.version_info
version_str = f"{v.major}.{v.minor}.{v.micro}"
if v >= (3, 12):
return CheckResult("Python", "ok", version_str)
return CheckResult(
"Python",
"fail",
version_str,
fix="Python 3.12+ required. Install from https://www.python.org/",
)
def check_node() -> CheckResult:
node = shutil.which("node")
if not node:
return CheckResult(
"Node.js",
"fail",
fix="Install Node.js 22+: https://nodejs.org/",
)
out = _run(["node", "-v"]) or ""
major = _parse_major(out)
if major is None or major < 22:
return CheckResult(
"Node.js",
"fail",
out or "unknown version",
fix="Node.js 22+ required. Install from https://nodejs.org/",
)
return CheckResult("Node.js", "ok", out.lstrip("v"))
def check_pnpm() -> CheckResult:
candidates = [["pnpm"], ["pnpm.cmd"]]
if shutil.which("corepack"):
candidates.append(["corepack", "pnpm"])
for cmd in candidates:
if shutil.which(cmd[0]):
out = _run([*cmd, "-v"]) or ""
return CheckResult("pnpm", "ok", out)
return CheckResult(
"pnpm",
"fail",
fix="npm install -g pnpm (or: corepack enable)",
)
def check_uv() -> CheckResult:
if not shutil.which("uv"):
return CheckResult(
"uv",
"fail",
fix="curl -LsSf https://astral.sh/uv/install.sh | sh",
)
out = _run(["uv", "--version"]) or ""
parts = out.split()
version = parts[1] if len(parts) > 1 else out
return CheckResult("uv", "ok", version)
def check_nginx() -> CheckResult:
if shutil.which("nginx"):
out = _run(["nginx", "-v"]) or ""
version = out.split("/", 1)[-1] if "/" in out else out
return CheckResult("nginx", "ok", version)
return CheckResult(
"nginx",
"fail",
fix=(
"macOS: brew install nginx\n"
"Ubuntu: sudo apt install nginx\n"
"Windows: use WSL or Docker mode"
),
)
def check_config_exists(config_path: Path) -> CheckResult:
if config_path.exists():
return CheckResult("config.yaml found", "ok")
return CheckResult(
"config.yaml found",
"fail",
fix="Run 'make setup' to create it",
)
def check_config_version(config_path: Path, project_root: Path) -> CheckResult:
if not config_path.exists():
return CheckResult("config.yaml version", "skip")
try:
import yaml
with open(config_path, encoding="utf-8") as f:
user_data = yaml.safe_load(f) or {}
user_ver = int(user_data.get("config_version", 0))
except Exception as exc:
return CheckResult("config.yaml version", "fail", str(exc))
example_path = project_root / "config.example.yaml"
if not example_path.exists():
return CheckResult("config.yaml version", "skip", "config.example.yaml not found")
try:
import yaml
with open(example_path, encoding="utf-8") as f:
example_data = yaml.safe_load(f) or {}
example_ver = int(example_data.get("config_version", 0))
except Exception:
return CheckResult("config.yaml version", "skip")
if user_ver < example_ver:
return CheckResult(
"config.yaml version",
"warn",
f"v{user_ver} < v{example_ver} (latest)",
fix="make config-upgrade",
)
return CheckResult("config.yaml version", "ok", f"v{user_ver}")
def check_models_configured(config_path: Path) -> CheckResult:
if not config_path.exists():
return CheckResult("models configured", "skip")
try:
data = _load_yaml_file(config_path)
models = data.get("models", [])
if models:
return CheckResult("models configured", "ok", f"{len(models)} model(s)")
return CheckResult(
"models configured",
"fail",
"no models found",
fix="Run 'make setup' to configure an LLM provider",
)
except Exception as exc:
return CheckResult("models configured", "fail", str(exc))
def check_config_loadable(config_path: Path) -> CheckResult:
if not config_path.exists():
return CheckResult("config.yaml loadable", "skip")
try:
_load_app_config(config_path)
return CheckResult("config.yaml loadable", "ok")
except Exception as exc:
return CheckResult(
"config.yaml loadable",
"fail",
str(exc),
fix="Run 'make setup' again, or compare with config.example.yaml",
)
def check_llm_api_key(config_path: Path) -> list[CheckResult]:
"""Check that each model's env var is set in the environment."""
if not config_path.exists():
return []
results: list[CheckResult] = []
try:
import yaml
from dotenv import load_dotenv
env_path = config_path.parent / ".env"
if env_path.exists():
load_dotenv(env_path, override=False)
with open(config_path, encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
for model in data.get("models", []):
# Collect all values that look like $ENV_VAR references
def _collect_env_refs(obj: object) -> list[str]:
refs: list[str] = []
if isinstance(obj, str) and obj.startswith("$"):
refs.append(obj[1:])
elif isinstance(obj, dict):
for v in obj.values():
refs.extend(_collect_env_refs(v))
elif isinstance(obj, list):
for item in obj:
refs.extend(_collect_env_refs(item))
return refs
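# Illustrative: a model entry {"api_key": "$OPENAI_API_KEY", "model": "gpt-4o"}
# yields ["OPENAI_API_KEY"]; nested dicts and lists are walked recursively.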
env_refs = _collect_env_refs(model)
model_name = model.get("name", "default")
for var in env_refs:
label = f"{var} set (model: {model_name})"
if os.environ.get(var):
results.append(CheckResult(label, "ok"))
else:
results.append(
CheckResult(
label,
"fail",
fix=f"Add {var}=<your-key> to your .env file",
)
)
except Exception as exc:
results.append(CheckResult("LLM API key check", "fail", str(exc)))
return results
def check_llm_package(config_path: Path) -> list[CheckResult]:
"""Check that the LangChain provider package is installed."""
if not config_path.exists():
return []
results: list[CheckResult] = []
try:
import yaml
with open(config_path, encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
seen_packages: set[str] = set()
for model in data.get("models", []):
use = model.get("use", "")
if ":" in use:
package_path = use.split(":")[0]
# e.g. langchain_openai → langchain-openai
top_level = package_path.split(".")[0]
pip_name = top_level.replace("_", "-")
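# Illustrative: use="langchain_google_genai:ChatGoogleGenerativeAI" gives
# top_level="langchain_google_genai", hence pip_name="langchain-google-genai".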
if pip_name in seen_packages:
continue
seen_packages.add(pip_name)
label = f"{pip_name} installed"
try:
__import__(top_level)
results.append(CheckResult(label, "ok"))
except ImportError:
results.append(
CheckResult(
label,
"fail",
fix=f"cd backend && uv add {pip_name}",
)
)
except Exception as exc:
results.append(CheckResult("LLM package check", "fail", str(exc)))
return results
def check_llm_auth(config_path: Path) -> list[CheckResult]:
if not config_path.exists():
return []
results: list[CheckResult] = []
try:
data = _load_yaml_file(config_path)
for model in data.get("models", []):
use = model.get("use", "")
model_name = model.get("name", "default")
if use == "deerflow.models.openai_codex_provider:CodexChatModel":
auth_path = Path(os.environ.get("CODEX_AUTH_PATH", "~/.codex/auth.json")).expanduser()
if auth_path.exists():
results.append(CheckResult(f"Codex CLI auth available (model: {model_name})", "ok", str(auth_path)))
else:
results.append(
CheckResult(
f"Codex CLI auth available (model: {model_name})",
"fail",
str(auth_path),
fix="Run `codex login`, or set CODEX_AUTH_PATH to a valid auth.json",
)
)
if use == "deerflow.models.claude_provider:ClaudeChatModel":
credential_paths = [
Path(os.environ["CLAUDE_CODE_CREDENTIALS_PATH"]).expanduser()
for env_name in ("CLAUDE_CODE_CREDENTIALS_PATH",)
if os.environ.get(env_name)
]
credential_paths.append(Path("~/.claude/.credentials.json").expanduser())
has_oauth_env = any(
os.environ.get(name)
for name in (
"ANTHROPIC_API_KEY",
"CLAUDE_CODE_OAUTH_TOKEN",
"ANTHROPIC_AUTH_TOKEN",
"CLAUDE_CODE_OAUTH_TOKEN_FILE_DESCRIPTOR",
)
)
existing_path = next((path for path in credential_paths if path.exists()), None)
if has_oauth_env or existing_path is not None:
detail = "env var set" if has_oauth_env else str(existing_path)
results.append(CheckResult(f"Claude auth available (model: {model_name})", "ok", detail))
else:
results.append(
CheckResult(
f"Claude auth available (model: {model_name})",
"fail",
fix=(
"Set ANTHROPIC_API_KEY / CLAUDE_CODE_OAUTH_TOKEN, "
"or place credentials at ~/.claude/.credentials.json"
),
)
)
except Exception as exc:
results.append(CheckResult("LLM auth check", "fail", str(exc)))
return results
def check_web_search(config_path: Path) -> CheckResult:
return check_web_tool(config_path, tool_name="web_search", label="web search configured")
def check_web_tool(config_path: Path, *, tool_name: str, label: str) -> CheckResult:
"""Warn (not fail) if a web capability is not configured."""
if not config_path.exists():
return CheckResult(label, "skip")
try:
from dotenv import load_dotenv
env_path = config_path.parent / ".env"
if env_path.exists():
load_dotenv(env_path, override=False)
data = _load_yaml_file(config_path)
tool_uses = [t.get("use", "") for t in data.get("tools", []) if t.get("name") == tool_name]
if not tool_uses:
return CheckResult(
label,
"warn",
f"no {tool_name} tool in config",
fix=f"Run 'make setup' to configure {tool_name}",
)
free_providers = {
"web_search": {"ddg_search": "DuckDuckGo (no key needed)"},
"web_fetch": {"jina_ai": "Jina AI Reader (no key needed)"},
}
key_providers = {
"web_search": {
"tavily": "TAVILY_API_KEY",
"infoquest": "INFOQUEST_API_KEY",
"exa": "EXA_API_KEY",
"firecrawl": "FIRECRAWL_API_KEY",
},
"web_fetch": {
"infoquest": "INFOQUEST_API_KEY",
"exa": "EXA_API_KEY",
"firecrawl": "FIRECRAWL_API_KEY",
},
}
for use in tool_uses:
for provider, detail in free_providers.get(tool_name, {}).items():
if provider in use:
return CheckResult(label, "ok", detail)
for use in tool_uses:
for provider, var in key_providers.get(tool_name, {}).items():
if provider in use:
val = os.environ.get(var)
if val:
return CheckResult(label, "ok", f"{provider} ({var} set)")
return CheckResult(
label,
"warn",
f"{provider} configured but {var} not set",
fix=f"Add {var}=<your-key> to .env, or run 'make setup'",
)
for use in tool_uses:
split = _split_use_path(use)
if split is None:
return CheckResult(
label,
"fail",
f"invalid use path: {use}",
fix="Use a valid module:path provider from config.example.yaml",
)
module_name, attr_name = split
try:
module = import_module(module_name)
getattr(module, attr_name)
except Exception as exc:
return CheckResult(
label,
"fail",
f"provider import failed: {use} ({exc})",
fix="Install the provider dependency or pick a valid provider in `make setup`",
)
return CheckResult(label, "ok")
except Exception as exc:
return CheckResult(label, "warn", str(exc))
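# Resolution order above (for reference): a keyless provider such as ddg_search
# or jina_ai passes outright; a key-based provider such as tavily passes only
# when its env var is set; any other provider must at least import as module:attr.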
def check_web_fetch(config_path: Path) -> CheckResult:
return check_web_tool(config_path, tool_name="web_fetch", label="web fetch configured")
def check_frontend_env(project_root: Path) -> CheckResult:
env_path = project_root / "frontend" / ".env"
if env_path.exists():
return CheckResult("frontend/.env found", "ok")
return CheckResult(
"frontend/.env found",
"warn",
fix="Run 'make setup' or copy frontend/.env.example to frontend/.env",
)
def check_sandbox(config_path: Path) -> list[CheckResult]:
if not config_path.exists():
return [CheckResult("sandbox configured", "skip")]
try:
data = _load_yaml_file(config_path)
sandbox = data.get("sandbox")
if not isinstance(sandbox, dict):
return [
CheckResult(
"sandbox configured",
"fail",
"missing sandbox section",
fix="Run 'make setup' to choose an execution mode",
)
]
sandbox_use = sandbox.get("use", "")
tools = data.get("tools", [])
tool_names = {tool.get("name") for tool in tools if isinstance(tool, dict)}
results: list[CheckResult] = []
if "LocalSandboxProvider" in sandbox_use:
results.append(CheckResult("sandbox configured", "ok", "Local sandbox"))
has_bash_tool = "bash" in tool_names
allow_host_bash = bool(sandbox.get("allow_host_bash", False))
if has_bash_tool and not allow_host_bash:
results.append(
CheckResult(
"bash compatibility",
"warn",
"bash tool configured but host bash is disabled",
fix="Enable host bash only in a fully trusted environment, or switch to container sandbox",
)
)
elif allow_host_bash:
results.append(
CheckResult(
"bash compatibility",
"warn",
"host bash enabled on LocalSandboxProvider",
fix="Use container sandbox for stronger isolation when bash is required",
)
)
elif "AioSandboxProvider" in sandbox_use:
results.append(CheckResult("sandbox configured", "ok", "Container sandbox"))
if not sandbox.get("provisioner_url") and not (shutil.which("docker") or shutil.which("container")):
results.append(
CheckResult(
"container runtime available",
"warn",
"no Docker/Apple Container runtime detected",
fix="Install Docker Desktop / Apple Container, or switch to local sandbox",
)
)
elif sandbox_use:
results.append(CheckResult("sandbox configured", "ok", sandbox_use))
else:
results.append(
CheckResult(
"sandbox configured",
"fail",
"sandbox.use is empty",
fix="Run 'make setup' to choose an execution mode",
)
)
return results
except Exception as exc:
return [CheckResult("sandbox configured", "fail", str(exc))]
def check_env_file(project_root: Path) -> CheckResult:
env_path = project_root / ".env"
if env_path.exists():
return CheckResult(".env found", "ok")
return CheckResult(
".env found",
"warn",
fix="Run 'make setup' or copy .env.example to .env",
)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> int:
project_root = Path(__file__).resolve().parents[1]
config_path = project_root / "config.yaml"
# Load .env early so key checks work
try:
from dotenv import load_dotenv
env_path = project_root / ".env"
if env_path.exists():
load_dotenv(env_path, override=False)
except ImportError:
pass
print()
print(bold("DeerFlow Health Check"))
print("" * 40)
sections: list[tuple[str, list[CheckResult]]] = []
# ── System Requirements ────────────────────────────────────────────────────
sys_checks = [
check_python(),
check_node(),
check_pnpm(),
check_uv(),
check_nginx(),
]
sections.append(("System Requirements", sys_checks))
# ── Configuration ─────────────────────────────────────────────────────────
cfg_checks: list[CheckResult] = [
check_env_file(project_root),
check_frontend_env(project_root),
check_config_exists(config_path),
check_config_version(config_path, project_root),
check_config_loadable(config_path),
check_models_configured(config_path),
]
sections.append(("Configuration", cfg_checks))
# ── LLM Provider ──────────────────────────────────────────────────────────
llm_checks: list[CheckResult] = [
*check_llm_api_key(config_path),
*check_llm_auth(config_path),
*check_llm_package(config_path),
]
sections.append(("LLM Provider", llm_checks))
# ── Web Capabilities ─────────────────────────────────────────────────────
search_checks = [check_web_search(config_path), check_web_fetch(config_path)]
sections.append(("Web Capabilities", search_checks))
# ── Sandbox ──────────────────────────────────────────────────────────────
sandbox_checks = check_sandbox(config_path)
sections.append(("Sandbox", sandbox_checks))
# ── Render ────────────────────────────────────────────────────────────────
total_fails = 0
total_warns = 0
for section_title, checks in sections:
print()
print(bold(section_title))
for cr in checks:
cr.print()
if cr.status == "fail":
total_fails += 1
elif cr.status == "warn":
total_warns += 1
# ── Summary ───────────────────────────────────────────────────────────────
print()
print("" * 40)
if total_fails == 0 and total_warns == 0:
print(f"Status: {green('Ready')}")
print(f"Run {cyan('make dev')} to start DeerFlow")
elif total_fails == 0:
print(f"Status: {yellow(f'Ready ({total_warns} warning(s))')}")
print(f"Run {cyan('make dev')} to start DeerFlow")
else:
print(f"Status: {red(f'{total_fails} error(s), {total_warns} warning(s)')}")
print("Fix the errors above, then run 'make doctor' again.")
print()
return 0 if total_fails == 0 else 1
if __name__ == "__main__":
sys.exit(main())


@@ -68,6 +68,15 @@ done
# ── Stop helper ──────────────────────────────────────────────────────────────
_kill_port() {
local port=$1
local pid
pid=$(lsof -ti :"$port" 2>/dev/null) || true
if [ -n "$pid" ]; then
kill -9 $pid 2>/dev/null || true
fi
}
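# Example (illustrative): `_kill_port 3000` frees the frontend dev-server port
# if a stray process survived the graceful shutdown in stop_all below.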
stop_all() {
echo "Stopping all services..."
pkill -f "langgraph dev" 2>/dev/null || true
@@ -78,6 +87,10 @@ stop_all() {
nginx -c "$REPO_ROOT/docker/nginx/nginx.local.conf" -p "$REPO_ROOT" -s quit 2>/dev/null || true
sleep 1
pkill -9 nginx 2>/dev/null || true
# Force-kill any survivors still holding the service ports
_kill_port 2024
_kill_port 8001
_kill_port 3000
./scripts/cleanup-containers.sh deer-flow-sandbox 2>/dev/null || true
echo "✓ All services stopped"
}
@@ -155,7 +168,7 @@ if ! { \
[ -f config.yaml ]; \
}; then
echo "✗ No DeerFlow config file found."
echo " Run 'make config' to generate config.yaml."
echo " Run 'make setup' (recommended) or 'make config' to generate config.yaml."
exit 1
fi

scripts/setup_wizard.py Normal file (165 lines added)

@@ -0,0 +1,165 @@
#!/usr/bin/env python3
"""DeerFlow Interactive Setup Wizard.
Usage:
uv run python scripts/setup_wizard.py
"""
from __future__ import annotations
import sys
from pathlib import Path
# Make the scripts/ directory importable so wizard.* works
sys.path.insert(0, str(Path(__file__).resolve().parent))
def _is_interactive() -> bool:
return sys.stdin.isatty() and sys.stdout.isatty()
def main() -> int:
try:
if not _is_interactive():
print(
"Non-interactive environment detected.\n"
"Please edit config.yaml and .env directly, or run 'make setup' in a terminal."
)
return 1
from wizard.ui import (
ask_yes_no,
bold,
cyan,
green,
print_header,
print_info,
print_success,
yellow,
)
from wizard.writer import write_config_yaml, write_env_file
project_root = Path(__file__).resolve().parents[1]
config_path = project_root / "config.yaml"
env_path = project_root / ".env"
print()
print(bold("Welcome to DeerFlow Setup!"))
print("This wizard will help you configure DeerFlow in a few minutes.")
print()
if config_path.exists():
print(yellow("Existing configuration detected."))
print()
should_reconfigure = ask_yes_no("Do you want to reconfigure?", default=False)
if not should_reconfigure:
print()
print_info("Keeping existing config. Run 'make doctor' to verify your setup.")
return 0
print()
total_steps = 4
from wizard.steps.llm import run_llm_step
llm = run_llm_step(f"Step 1/{total_steps}")
from wizard.steps.search import run_search_step
search = run_search_step(f"Step 2/{total_steps}")
search_provider = search.search_provider
search_api_key = search.search_api_key
fetch_provider = search.fetch_provider
fetch_api_key = search.fetch_api_key
from wizard.steps.execution import run_execution_step
execution = run_execution_step(f"Step 3/{total_steps}")
print_header(f"Step {total_steps}/{total_steps} · Writing configuration")
write_config_yaml(
config_path,
provider_use=llm.provider.use,
model_name=llm.model_name,
display_name=f"{llm.provider.display_name} / {llm.model_name}",
api_key_field=llm.provider.api_key_field,
env_var=llm.provider.env_var,
extra_model_config=llm.provider.extra_config or None,
base_url=llm.base_url,
search_use=search_provider.use if search_provider else None,
search_tool_name=search_provider.tool_name if search_provider else "web_search",
search_extra_config=search_provider.extra_config if search_provider else None,
web_fetch_use=fetch_provider.use if fetch_provider else None,
web_fetch_tool_name=fetch_provider.tool_name if fetch_provider else "web_fetch",
web_fetch_extra_config=fetch_provider.extra_config if fetch_provider else None,
sandbox_use=execution.sandbox_use,
allow_host_bash=execution.allow_host_bash,
include_bash_tool=execution.include_bash_tool,
include_write_tools=execution.include_write_tools,
)
print_success(f"Config written to: {config_path.relative_to(project_root)}")
if not env_path.exists():
env_example = project_root / ".env.example"
if env_example.exists():
import shutil
shutil.copyfile(env_example, env_path)
env_pairs: dict[str, str] = {}
if llm.api_key:
env_pairs[llm.provider.env_var] = llm.api_key
if search_api_key and search_provider and search_provider.env_var:
env_pairs[search_provider.env_var] = search_api_key
if fetch_api_key and fetch_provider and fetch_provider.env_var:
env_pairs[fetch_provider.env_var] = fetch_api_key
if env_pairs:
write_env_file(env_path, env_pairs)
print_success(f"API keys written to: {env_path.relative_to(project_root)}")
frontend_env = project_root / "frontend" / ".env"
frontend_env_example = project_root / "frontend" / ".env.example"
if not frontend_env.exists() and frontend_env_example.exists():
import shutil
shutil.copyfile(frontend_env_example, frontend_env)
print_success("frontend/.env created from example")
print_header("Setup complete!")
print(f" {green('')} LLM: {llm.provider.display_name} / {llm.model_name}")
if search_provider:
print(f" {green('')} Web search: {search_provider.display_name}")
else:
print(f" {'':>3} Web search: not configured")
if fetch_provider:
print(f" {green('')} Web fetch: {fetch_provider.display_name}")
else:
print(f" {'':>3} Web fetch: not configured")
sandbox_label = "Local sandbox" if execution.sandbox_use.endswith("LocalSandboxProvider") else "Container sandbox"
print(f" {green('')} Execution: {sandbox_label}")
if execution.include_bash_tool:
bash_label = "enabled"
if execution.allow_host_bash:
bash_label += " (host bash)"
print(f" {green('')} Bash: {bash_label}")
else:
print(f" {'':>3} Bash: disabled")
if execution.include_write_tools:
print(f" {green('')} File write: enabled")
else:
print(f" {'':>3} File write: disabled")
print()
print("Next steps:")
print(f" {cyan('make install')} # Install dependencies (first time only)")
print(f" {cyan('make dev')} # Start DeerFlow")
print()
print(f"Run {cyan('make doctor')} to verify your setup at any time.")
print()
return 0
except KeyboardInterrupt:
print("\n\nSetup cancelled.")
return 130
if __name__ == "__main__":
sys.exit(main())

scripts/wizard/__init__.py Normal file

@@ -0,0 +1 @@
# DeerFlow Setup Wizard package

scripts/wizard/providers.py Normal file (251 lines added)

@@ -0,0 +1,251 @@
"""LLM and search provider definitions for the Setup Wizard."""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class LLMProvider:
name: str
display_name: str
description: str
use: str
models: list[str]
default_model: str
env_var: str | None
package: str | None
# Optional: some providers use a different field name for the API key in YAML
api_key_field: str = "api_key"
# Extra config fields beyond the common ones (merged into YAML)
extra_config: dict = field(default_factory=dict)
auth_hint: str | None = None
@dataclass
class WebProvider:
name: str
display_name: str
description: str
use: str
env_var: str | None # None = no API key required
tool_name: str
extra_config: dict = field(default_factory=dict)
@dataclass
class SearchProvider:
name: str
display_name: str
description: str
use: str
env_var: str | None # None = no API key required
tool_name: str = "web_search"
extra_config: dict = field(default_factory=dict)
LLM_PROVIDERS: list[LLMProvider] = [
LLMProvider(
name="openai",
display_name="OpenAI",
description="GPT-4o, GPT-4.1, o3",
use="langchain_openai:ChatOpenAI",
models=["gpt-4o", "gpt-4.1", "o3"],
default_model="gpt-4o",
env_var="OPENAI_API_KEY",
package="langchain-openai",
),
LLMProvider(
name="anthropic",
display_name="Anthropic",
description="Claude Opus 4, Sonnet 4",
use="langchain_anthropic:ChatAnthropic",
models=["claude-opus-4-5", "claude-sonnet-4-5"],
default_model="claude-sonnet-4-5",
env_var="ANTHROPIC_API_KEY",
package="langchain-anthropic",
extra_config={"max_tokens": 8192},
),
LLMProvider(
name="deepseek",
display_name="DeepSeek",
description="V3, R1",
use="langchain_deepseek:ChatDeepSeek",
models=["deepseek-chat", "deepseek-reasoner"],
default_model="deepseek-chat",
env_var="DEEPSEEK_API_KEY",
package="langchain-deepseek",
),
LLMProvider(
name="google",
display_name="Google Gemini",
description="2.0 Flash, 2.5 Pro",
use="langchain_google_genai:ChatGoogleGenerativeAI",
models=["gemini-2.0-flash", "gemini-2.5-pro"],
default_model="gemini-2.0-flash",
env_var="GEMINI_API_KEY",
package="langchain-google-genai",
api_key_field="gemini_api_key",
),
LLMProvider(
name="openrouter",
display_name="OpenRouter",
description="OpenAI-compatible gateway with broad model catalog",
use="langchain_openai:ChatOpenAI",
models=["google/gemini-2.5-flash-preview", "openai/gpt-5-mini", "anthropic/claude-sonnet-4"],
default_model="google/gemini-2.5-flash-preview",
env_var="OPENROUTER_API_KEY",
package="langchain-openai",
extra_config={
"base_url": "https://openrouter.ai/api/v1",
"request_timeout": 600.0,
"max_retries": 2,
"max_tokens": 8192,
"temperature": 0.7,
},
),
LLMProvider(
name="vllm",
display_name="vLLM",
description="Self-hosted OpenAI-compatible serving",
use="deerflow.models.vllm_provider:VllmChatModel",
models=["Qwen/Qwen3-32B", "Qwen/Qwen2.5-Coder-32B-Instruct"],
default_model="Qwen/Qwen3-32B",
env_var="VLLM_API_KEY",
package=None,
extra_config={
"base_url": "http://localhost:8000/v1",
"request_timeout": 600.0,
"max_retries": 2,
"max_tokens": 8192,
"supports_thinking": True,
"supports_vision": False,
"when_thinking_enabled": {
"extra_body": {
"chat_template_kwargs": {
"enable_thinking": True,
}
}
},
},
),
LLMProvider(
name="codex",
display_name="Codex CLI",
description="Uses Codex CLI local auth (~/.codex/auth.json)",
use="deerflow.models.openai_codex_provider:CodexChatModel",
models=["gpt-5.4", "gpt-5-mini"],
default_model="gpt-5.4",
env_var=None,
package=None,
api_key_field="api_key",
extra_config={"supports_thinking": True, "supports_reasoning_effort": True},
auth_hint="Uses existing Codex CLI auth from ~/.codex/auth.json",
),
LLMProvider(
name="claude_code",
display_name="Claude Code OAuth",
description="Uses Claude Code local OAuth credentials",
use="deerflow.models.claude_provider:ClaudeChatModel",
models=["claude-sonnet-4-6", "claude-opus-4-1"],
default_model="claude-sonnet-4-6",
env_var=None,
package=None,
extra_config={"max_tokens": 4096, "supports_thinking": True},
auth_hint="Uses Claude Code OAuth credentials from your local machine",
),
LLMProvider(
name="other",
display_name="Other OpenAI-compatible",
description="Custom gateway with base_url and model name",
use="langchain_openai:ChatOpenAI",
models=["gpt-4o"],
default_model="gpt-4o",
env_var="OPENAI_API_KEY",
package="langchain-openai",
),
]
SEARCH_PROVIDERS: list[SearchProvider] = [
SearchProvider(
name="ddg",
display_name="DuckDuckGo (free, no key needed)",
description="No API key required",
use="deerflow.community.ddg_search.tools:web_search_tool",
env_var=None,
extra_config={"max_results": 5},
),
SearchProvider(
name="tavily",
display_name="Tavily",
description="Recommended, free tier available",
use="deerflow.community.tavily.tools:web_search_tool",
env_var="TAVILY_API_KEY",
extra_config={"max_results": 5},
),
SearchProvider(
name="infoquest",
display_name="InfoQuest",
description="Higher quality vertical search, API key required",
use="deerflow.community.infoquest.tools:web_search_tool",
env_var="INFOQUEST_API_KEY",
extra_config={"search_time_range": 10},
),
SearchProvider(
name="exa",
display_name="Exa",
description="Neural + keyword web search, API key required",
use="deerflow.community.exa.tools:web_search_tool",
env_var="EXA_API_KEY",
extra_config={
"max_results": 5,
"search_type": "auto",
"contents_max_characters": 1000,
},
),
SearchProvider(
name="firecrawl",
display_name="Firecrawl",
description="Search + crawl via Firecrawl API",
use="deerflow.community.firecrawl.tools:web_search_tool",
env_var="FIRECRAWL_API_KEY",
extra_config={"max_results": 5},
),
]
WEB_FETCH_PROVIDERS: list[WebProvider] = [
WebProvider(
name="jina_ai",
display_name="Jina AI Reader",
description="Good default reader, no API key required",
use="deerflow.community.jina_ai.tools:web_fetch_tool",
env_var=None,
tool_name="web_fetch",
extra_config={"timeout": 10},
),
WebProvider(
name="exa",
display_name="Exa",
description="API key required",
use="deerflow.community.exa.tools:web_fetch_tool",
env_var="EXA_API_KEY",
tool_name="web_fetch",
),
WebProvider(
name="infoquest",
display_name="InfoQuest",
description="API key required",
use="deerflow.community.infoquest.tools:web_fetch_tool",
env_var="INFOQUEST_API_KEY",
tool_name="web_fetch",
extra_config={"timeout": 10, "fetch_time": 10, "navigation_timeout": 30},
),
WebProvider(
name="firecrawl",
display_name="Firecrawl",
description="Search-grade crawl with markdown output, API key required",
use="deerflow.community.firecrawl.tools:web_fetch_tool",
env_var="FIRECRAWL_API_KEY",
tool_name="web_fetch",
),
]
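# Illustrative mapping (see scripts/wizard/writer.py): picking the "openai"
# provider above yields a config.yaml model entry roughly like
#   name: gpt-4o
#   display_name: OpenAI / gpt-4o
#   use: langchain_openai:ChatOpenAI
#   model: gpt-4o
#   api_key: $OPENAI_API_KEY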

scripts/wizard/steps/__init__.py Normal file

@@ -0,0 +1 @@
# Setup Wizard steps

scripts/wizard/steps/execution.py Normal file

@@ -0,0 +1,51 @@
"""Step: execution mode and safety-related capabilities."""
from __future__ import annotations
from dataclasses import dataclass
from wizard.ui import ask_choice, ask_yes_no, print_header, print_info, print_warning
LOCAL_SANDBOX = "deerflow.sandbox.local:LocalSandboxProvider"
CONTAINER_SANDBOX = "deerflow.community.aio_sandbox:AioSandboxProvider"
@dataclass
class ExecutionStepResult:
sandbox_use: str
allow_host_bash: bool
include_bash_tool: bool
include_write_tools: bool
def run_execution_step(step_label: str = "Step 3/4") -> ExecutionStepResult:
print_header(f"{step_label} · Execution & Safety")
print_info("Choose how much execution power DeerFlow should have in this workspace.")
options = [
"Local sandbox — fastest, uses host filesystem paths",
"Container sandbox — more isolated, requires Docker or Apple Container",
]
sandbox_idx = ask_choice("Execution mode", options, default=0)
sandbox_use = LOCAL_SANDBOX if sandbox_idx == 0 else CONTAINER_SANDBOX
print()
if sandbox_use == LOCAL_SANDBOX:
print_warning(
"Local sandbox is convenient but not a secure shell isolation boundary."
)
print_info("Keep host bash disabled unless this is a fully trusted local workflow.")
else:
print_info("Container sandbox isolates shell execution better than host-local mode.")
include_bash_tool = ask_yes_no("Enable bash command execution?", default=False)
include_write_tools = ask_yes_no(
"Enable file write tools (write_file, str_replace)?", default=True
)
return ExecutionStepResult(
sandbox_use=sandbox_use,
allow_host_bash=sandbox_use == LOCAL_SANDBOX and include_bash_tool,
include_bash_tool=include_bash_tool,
include_write_tools=include_write_tools,
)

scripts/wizard/steps/llm.py Normal file

@@ -0,0 +1,76 @@
"""Step 1: LLM provider selection."""
from __future__ import annotations
from dataclasses import dataclass
from wizard.providers import LLM_PROVIDERS, LLMProvider
from wizard.ui import (
ask_choice,
ask_secret,
ask_text,
print_header,
print_info,
print_success,
)
@dataclass
class LLMStepResult:
provider: LLMProvider
model_name: str
api_key: str | None
base_url: str | None = None
def run_llm_step(step_label: str = "Step 1/3") -> LLMStepResult:
print_header(f"{step_label} · Choose your LLM provider")
options = [f"{p.display_name} ({p.description})" for p in LLM_PROVIDERS]
idx = ask_choice("Enter choice", options)
provider = LLM_PROVIDERS[idx]
print()
# Model selection (show list, default to first)
if len(provider.models) > 1:
print_info(f"Available models for {provider.display_name}:")
model_idx = ask_choice("Select model", provider.models, default=0)
model_name = provider.models[model_idx]
else:
model_name = provider.models[0]
print()
base_url: str | None = None
if provider.name in {"openrouter", "vllm"}:
base_url = provider.extra_config.get("base_url")
if provider.name == "other":
print_header(f"{step_label} · Connection details")
base_url = ask_text("Base URL (e.g. https://api.openai.com/v1)", required=True)
model_name = ask_text("Model name", default=provider.default_model)
elif provider.auth_hint:
print_header(f"{step_label} · Authentication")
print_info(provider.auth_hint)
api_key = None
return LLMStepResult(
provider=provider,
model_name=model_name,
api_key=api_key,
base_url=base_url,
)
print_header(f"{step_label} · Enter your API Key")
if provider.env_var:
api_key = ask_secret(f"{provider.env_var}")
else:
api_key = None
if api_key:
print_success(f"Key will be saved to .env as {provider.env_var}")
return LLMStepResult(
provider=provider,
model_name=model_name,
api_key=api_key,
base_url=base_url,
)

scripts/wizard/steps/search.py Normal file

@@ -0,0 +1,66 @@
"""Step: Web search configuration."""
from __future__ import annotations
from dataclasses import dataclass
from wizard.providers import SEARCH_PROVIDERS, WEB_FETCH_PROVIDERS, SearchProvider, WebProvider
from wizard.ui import ask_choice, ask_secret, print_header, print_info, print_success
@dataclass
class SearchStepResult:
search_provider: SearchProvider | None # None = skip
search_api_key: str | None
fetch_provider: WebProvider | None # None = skip
fetch_api_key: str | None
def run_search_step(step_label: str = "Step 3/3") -> SearchStepResult:
print_header(f"{step_label} · Web Search & Fetch (optional)")
provided_keys: dict[str, str] = {}
search_options = [f"{p.display_name} · {p.description}" for p in SEARCH_PROVIDERS]
search_options.append("Skip for now (agent still works without web search)")
idx = ask_choice("Choose a web search provider", search_options, default=0)
search_provider: SearchProvider | None = None
search_api_key: str | None = None
if idx >= len(SEARCH_PROVIDERS):
search_provider = None
else:
search_provider = SEARCH_PROVIDERS[idx]
if search_provider.env_var:
print()
search_api_key = ask_secret(f"{search_provider.env_var}")
provided_keys[search_provider.env_var] = search_api_key
print_success(f"Key will be saved to .env as {search_provider.env_var}")
print()
fetch_options = [f"{p.display_name} · {p.description}" for p in WEB_FETCH_PROVIDERS]
fetch_options.append("Skip for now (agent can still answer without web fetch)")
idx = ask_choice("Choose a web fetch provider", fetch_options, default=0)
fetch_provider: WebProvider | None = None
fetch_api_key: str | None = None
if idx < len(WEB_FETCH_PROVIDERS):
fetch_provider = WEB_FETCH_PROVIDERS[idx]
if fetch_provider.env_var:
if fetch_provider.env_var in provided_keys:
fetch_api_key = provided_keys[fetch_provider.env_var]
print()
print_info(f"Reusing {fetch_provider.env_var} from web search provider")
else:
print()
fetch_api_key = ask_secret(f"{fetch_provider.env_var}")
provided_keys[fetch_provider.env_var] = fetch_api_key
print_success(f"Key will be saved to .env as {fetch_provider.env_var}")
return SearchStepResult(
search_provider=search_provider,
search_api_key=search_api_key,
fetch_provider=fetch_provider,
fetch_api_key=fetch_api_key,
)

scripts/wizard/ui.py Normal file (261 lines added)

@@ -0,0 +1,261 @@
"""Terminal UI helpers for the Setup Wizard."""
from __future__ import annotations
import getpass
import shutil
import sys
try:
import termios
import tty
except ImportError: # pragma: no cover - non-Unix fallback
termios = None
tty = None
# ── ANSI colours ──────────────────────────────────────────────────────────────
def _supports_color() -> bool:
return hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
def _c(text: str, code: str) -> str:
if _supports_color():
return f"\033[{code}m{text}\033[0m"
return text
def green(text: str) -> str:
return _c(text, "32")
def red(text: str) -> str:
return _c(text, "31")
def yellow(text: str) -> str:
return _c(text, "33")
def cyan(text: str) -> str:
return _c(text, "36")
def bold(text: str) -> str:
return _c(text, "1")
def inverse(text: str) -> str:
return _c(text, "7")
# ── UI primitives ─────────────────────────────────────────────────────────────
def print_header(title: str) -> None:
width = max(len(title) + 4, 44)
bar = "" * width
print()
print(f"{bar}")
print(f"{title.ljust(width - 2)}")
print(f"{bar}")
print()
def print_section(title: str) -> None:
print()
print(bold(f"── {title} ──"))
print()
def print_success(message: str) -> None:
print(f" {green('')} {message}")
def print_warning(message: str) -> None:
print(f" {yellow('!')} {message}")
def print_error(message: str) -> None:
print(f" {red('')} {message}")
def print_info(message: str) -> None:
print(f" {cyan('')} {message}")
def _ask_choice_with_numbers(prompt: str, options: list[str], default: int | None = None) -> int:
for i, opt in enumerate(options, 1):
marker = f" {green('*')}" if default is not None and i - 1 == default else " "
print(f"{marker} {i}. {opt}")
print()
while True:
suffix = f" [{default + 1}]" if default is not None else ""
raw = input(f"{prompt}{suffix}: ").strip()
if raw == "" and default is not None:
return default
if raw.isdigit():
idx = int(raw) - 1
if 0 <= idx < len(options):
return idx
print(f" Please enter a number between 1 and {len(options)}.")
def _supports_arrow_menu() -> bool:
return (
termios is not None
and tty is not None
and hasattr(sys.stdin, "isatty")
and hasattr(sys.stdout, "isatty")
and sys.stdin.isatty()
and sys.stdout.isatty()
and sys.stderr.isatty()
)
def _clear_rendered_lines(count: int) -> None:
if count <= 0:
return
sys.stdout.write("\x1b[2K\r")
for _ in range(count):
sys.stdout.write("\x1b[1A\x1b[2K\r")
def _read_key(fd: int) -> str:
first = sys.stdin.read(1)
if first != "\x1b":
return first
second = sys.stdin.read(1)
if second != "[":
return first
third = sys.stdin.read(1)
return f"\x1b[{third}"
def _terminal_width() -> int:
return max(shutil.get_terminal_size(fallback=(80, 24)).columns, 40)
def _truncate_line(text: str, max_width: int) -> str:
if len(text) <= max_width:
return text
if max_width <= 1:
return text[:max_width]
return f"{text[: max_width - 1]}"
def _render_choice_menu(options: list[str], selected: int) -> int:
number_width = len(str(len(options)))
menu_width = _terminal_width()
content_width = max(menu_width - 3, 20)
for i, opt in enumerate(options, 1):
line = _truncate_line(f"{i:>{number_width}}. {opt}", content_width)
if i - 1 == selected:
print(f"{green('')} {inverse(bold(line))}")
else:
print(f" {line}")
sys.stdout.flush()
return len(options)
def _ask_choice_with_arrows(prompt: str, options: list[str], default: int | None = None) -> int:
selected = default if default is not None else 0
typed = ""
fd = sys.stdin.fileno()
original_settings = termios.tcgetattr(fd)
rendered_lines = 0
try:
sys.stdout.write("\x1b[?25l")
sys.stdout.flush()
tty.setcbreak(fd)
prompt_help = f"{prompt} (↑/↓ move, Enter confirm, number quick-select)"
print(cyan(_truncate_line(prompt_help, max(_terminal_width() - 2, 20))))
while True:
if rendered_lines:
_clear_rendered_lines(rendered_lines)
rendered_lines = _render_choice_menu(options, selected)
key = _read_key(fd)
if key == "\x03":
raise KeyboardInterrupt
if key in ("\r", "\n"):
if typed:
idx = int(typed) - 1
if 0 <= idx < len(options):
selected = idx
typed = ""
break
if key == "\x1b[A":
selected = (selected - 1) % len(options)
typed = ""
continue
if key == "\x1b[B":
selected = (selected + 1) % len(options)
typed = ""
continue
if key in ("\x7f", "\b"):
typed = typed[:-1]
continue
if key.isdigit():
typed += key
continue
if rendered_lines:
_clear_rendered_lines(rendered_lines)
print(f"{prompt}: {options[selected]}")
return selected
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, original_settings)
sys.stdout.write("\x1b[?25h")
sys.stdout.flush()
def ask_choice(prompt: str, options: list[str], default: int | None = None) -> int:
"""Present a menu and return the 0-based index of the selected option."""
if _supports_arrow_menu():
return _ask_choice_with_arrows(prompt, options, default=default)
return _ask_choice_with_numbers(prompt, options, default=default)
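# Illustrative: ask_choice("Execution mode", ["Local", "Container"], default=0)
# renders the arrow-key menu on a Unix TTY and falls back to the numbered
# prompt elsewhere (no termios, or stdin/stdout not attached to a terminal).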
def ask_text(prompt: str, default: str = "", required: bool = False) -> str:
"""Ask for a text value, returning default if the user presses Enter."""
suffix = f" [{default}]" if default else ""
while True:
value = input(f"{prompt}{suffix}: ").strip()
if value:
return value
if default:
return default
if not required:
return ""
print(" This field is required.")
def ask_secret(prompt: str) -> str:
"""Ask for a secret value (hidden input)."""
while True:
value = getpass.getpass(f"{prompt}: ").strip()
if value:
return value
print(" API key cannot be empty.")
def ask_yes_no(prompt: str, default: bool = True) -> bool:
"""Ask a yes/no question."""
suffix = "[Y/N]"
while True:
raw = input(f"{prompt} {suffix}: ").strip().lower()
if raw == "":
return default
if raw in ("y", "yes"):
return True
if raw in ("n", "no"):
return False
print(" Please enter y or n.")

scripts/wizard/writer.py Normal file (290 lines added)

@@ -0,0 +1,290 @@
"""Config file writer for the Setup Wizard.
Writes config.yaml as a minimal working configuration and updates .env
without wiping existing user customisations where possible.
"""
from __future__ import annotations
from copy import deepcopy
from pathlib import Path
from typing import Any
import yaml
def _project_root() -> Path:
return Path(__file__).resolve().parents[2]
# ── .env helpers ──────────────────────────────────────────────────────────────
def read_env_file(env_path: Path) -> dict[str, str]:
"""Parse a .env file into a dict (ignores comments and blank lines)."""
result: dict[str, str] = {}
if not env_path.exists():
return result
for line in env_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, _, value = line.partition("=")
result[key.strip()] = value.strip()
return result
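# Illustrative: a file containing "# comment", "FOO=bar" and "BAZ = qux "
# parses to {"FOO": "bar", "BAZ": "qux"}.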
def write_env_file(env_path: Path, pairs: dict[str, str]) -> None:
"""Merge *pairs* into an existing (or new) .env file.
Existing keys are updated in place; new keys are appended.
Lines with comments and other formatting are preserved.
"""
lines: list[str] = []
if env_path.exists():
lines = env_path.read_text(encoding="utf-8").splitlines()
updated: set[str] = set()
new_lines: list[str] = []
for line in lines:
stripped = line.strip()
if stripped and not stripped.startswith("#") and "=" in stripped:
key = stripped.split("=", 1)[0].strip()
if key in pairs:
new_lines.append(f"{key}={pairs[key]}")
updated.add(key)
continue
new_lines.append(line)
for key, value in pairs.items():
if key not in updated:
new_lines.append(f"{key}={value}")
env_path.write_text("\n".join(new_lines) + "\n", encoding="utf-8")
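# Illustrative: given an existing .env of "OPENAI_API_KEY=old" plus comment
# lines, write_env_file(path, {"OPENAI_API_KEY": "new", "TAVILY_API_KEY": "t"})
# updates the first key in place, keeps the comments, and appends the new key.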
# ── config.yaml helpers ───────────────────────────────────────────────────────
def _yaml_dump(data: Any) -> str:
return yaml.safe_dump(data, default_flow_style=False, allow_unicode=True, sort_keys=False)
def _default_tools() -> list[dict[str, Any]]:
return [
{"name": "image_search", "use": "deerflow.community.image_search.tools:image_search_tool", "group": "web", "max_results": 5},
{"name": "ls", "use": "deerflow.sandbox.tools:ls_tool", "group": "file:read"},
{"name": "read_file", "use": "deerflow.sandbox.tools:read_file_tool", "group": "file:read"},
{"name": "glob", "use": "deerflow.sandbox.tools:glob_tool", "group": "file:read"},
{"name": "grep", "use": "deerflow.sandbox.tools:grep_tool", "group": "file:read"},
{"name": "write_file", "use": "deerflow.sandbox.tools:write_file_tool", "group": "file:write"},
{"name": "str_replace", "use": "deerflow.sandbox.tools:str_replace_tool", "group": "file:write"},
{"name": "bash", "use": "deerflow.sandbox.tools:bash_tool", "group": "bash"},
]
def _build_tools(
*,
base_tools: list[dict[str, Any]] | None,
search_use: str | None,
search_tool_name: str,
search_extra_config: dict | None,
web_fetch_use: str | None,
web_fetch_tool_name: str,
web_fetch_extra_config: dict | None,
include_bash_tool: bool,
include_write_tools: bool,
) -> list[dict[str, Any]]:
tools = deepcopy(base_tools if base_tools is not None else _default_tools())
tools = [
tool
for tool in tools
if tool.get("name") not in {search_tool_name, web_fetch_tool_name, "write_file", "str_replace", "bash"}
]
web_group = "web"
if search_use:
search_tool: dict[str, Any] = {
"name": search_tool_name,
"use": search_use,
"group": web_group,
}
if search_extra_config:
search_tool.update(search_extra_config)
tools.insert(0, search_tool)
if web_fetch_use:
fetch_tool: dict[str, Any] = {
"name": web_fetch_tool_name,
"use": web_fetch_use,
"group": web_group,
}
if web_fetch_extra_config:
fetch_tool.update(web_fetch_extra_config)
insert_idx = 1 if search_use else 0
tools.insert(insert_idx, fetch_tool)
if include_write_tools:
tools.extend(
[
{"name": "write_file", "use": "deerflow.sandbox.tools:write_file_tool", "group": "file:write"},
{"name": "str_replace", "use": "deerflow.sandbox.tools:str_replace_tool", "group": "file:write"},
]
)
if include_bash_tool:
tools.append({"name": "bash", "use": "deerflow.sandbox.tools:bash_tool", "group": "bash"})
return tools
def _make_model_config_name(model_name: str) -> str:
"""Derive a meaningful config model name from the provider model identifier.
Replaces path separators and dots with hyphens so the result is a clean
YAML-friendly identifier (e.g. "google/gemini-2.5-pro" → "gemini-2-5-pro",
"gpt-5.4" → "gpt-5-4", "deepseek-chat" → "deepseek-chat").
"""
# Take only the last path component for namespaced models (e.g. "org/model-name")
base = model_name.split("/")[-1]
# Replace dots with hyphens so "gpt-5.4" → "gpt-5-4"
return base.replace(".", "-")
def build_minimal_config(
*,
provider_use: str,
model_name: str,
display_name: str,
api_key_field: str,
env_var: str | None,
extra_model_config: dict | None = None,
base_url: str | None = None,
search_use: str | None = None,
search_tool_name: str = "web_search",
search_extra_config: dict | None = None,
web_fetch_use: str | None = None,
web_fetch_tool_name: str = "web_fetch",
web_fetch_extra_config: dict | None = None,
sandbox_use: str = "deerflow.sandbox.local:LocalSandboxProvider",
allow_host_bash: bool = False,
include_bash_tool: bool = False,
include_write_tools: bool = True,
config_version: int = 5,
base_config: dict[str, Any] | None = None,
) -> str:
"""Build the content of a minimal config.yaml."""
from datetime import date
today = date.today().isoformat()
model_entry: dict[str, Any] = {
"name": _make_model_config_name(model_name),
"display_name": display_name,
"use": provider_use,
"model": model_name,
}
if env_var:
model_entry[api_key_field] = f"${env_var}"
extra_model_fields = dict(extra_model_config or {})
if "base_url" in extra_model_fields and not base_url:
base_url = extra_model_fields.pop("base_url")
if base_url:
model_entry["base_url"] = base_url
if extra_model_fields:
model_entry.update(extra_model_fields)
data: dict[str, Any] = deepcopy(base_config or {})
data["config_version"] = config_version
data["models"] = [model_entry]
base_tools = data.get("tools")
if not isinstance(base_tools, list):
base_tools = None
tools = _build_tools(
base_tools=base_tools,
search_use=search_use,
search_tool_name=search_tool_name,
search_extra_config=search_extra_config,
web_fetch_use=web_fetch_use,
web_fetch_tool_name=web_fetch_tool_name,
web_fetch_extra_config=web_fetch_extra_config,
include_bash_tool=include_bash_tool,
include_write_tools=include_write_tools,
)
data["tools"] = tools
sandbox_config = deepcopy(data.get("sandbox") if isinstance(data.get("sandbox"), dict) else {})
sandbox_config["use"] = sandbox_use
if sandbox_use == "deerflow.sandbox.local:LocalSandboxProvider":
sandbox_config["allow_host_bash"] = allow_host_bash
else:
sandbox_config.pop("allow_host_bash", None)
data["sandbox"] = sandbox_config
header = (
f"# DeerFlow Configuration\n"
f"# Generated by 'make setup' on {today}\n"
f"# Run 'make setup' to reconfigure, or edit this file for advanced options.\n"
f"# Full reference: config.example.yaml\n\n"
)
return header + _yaml_dump(data)
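# Illustrative output (a sketch; exact content depends on the chosen providers
# and on defaults inherited from config.example.yaml):
#   config_version: 5
#   models:
#   - name: gpt-4o
#     display_name: OpenAI / gpt-4o
#     use: langchain_openai:ChatOpenAI
#     model: gpt-4o
#     api_key: $OPENAI_API_KEY
#   tools:
#   - name: web_search
#     use: deerflow.community.tavily.tools:web_search_tool
#     group: web
#     max_results: 5
#   sandbox:
#     use: deerflow.sandbox.local:LocalSandboxProvider
#     allow_host_bash: false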
def write_config_yaml(
config_path: Path,
*,
provider_use: str,
model_name: str,
display_name: str,
api_key_field: str,
env_var: str | None,
extra_model_config: dict | None = None,
base_url: str | None = None,
search_use: str | None = None,
search_tool_name: str = "web_search",
search_extra_config: dict | None = None,
web_fetch_use: str | None = None,
web_fetch_tool_name: str = "web_fetch",
web_fetch_extra_config: dict | None = None,
sandbox_use: str = "deerflow.sandbox.local:LocalSandboxProvider",
allow_host_bash: bool = False,
include_bash_tool: bool = False,
include_write_tools: bool = True,
) -> None:
"""Write (or overwrite) config.yaml with a minimal working configuration."""
# Read config_version from config.example.yaml if present
config_version = 5
example_path = config_path.parent / "config.example.yaml"
if example_path.exists():
try:
import yaml as _yaml
raw = _yaml.safe_load(example_path.read_text(encoding="utf-8")) or {}
config_version = int(raw.get("config_version", 5))
example_defaults = raw
except Exception:
example_defaults = None
else:
example_defaults = None
content = build_minimal_config(
provider_use=provider_use,
model_name=model_name,
display_name=display_name,
api_key_field=api_key_field,
env_var=env_var,
extra_model_config=extra_model_config,
base_url=base_url,
search_use=search_use,
search_tool_name=search_tool_name,
search_extra_config=search_extra_config,
web_fetch_use=web_fetch_use,
web_fetch_tool_name=web_fetch_tool_name,
web_fetch_extra_config=web_fetch_extra_config,
sandbox_use=sandbox_use,
allow_host_bash=allow_host_bash,
include_bash_tool=include_bash_tool,
include_write_tools=include_write_tools,
config_version=config_version,
base_config=example_defaults,
)
config_path.write_text(content, encoding="utf-8")