mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-05-17 21:19:46 +00:00
Create a generic OAuth Connections plugin with Codex/ChatGPT Account as the first provider, using OpenAI's device-code flow to persist Codex-compatible account tokens. Expose a loopback OpenAI-compatible wrapper for models, responses, and chat completions, and point LiteLLM at the container-local Agent Zero origin. Add a dummy API-key extension and focused tests so the account-backed provider appears configured without requiring a user-entered key. docs: add Codex plan OAuth callout Highlight that Agent Zero can use an existing OpenAI Codex plan through the new OAuth flow. Add the account-backed LLM plans image and surface the section from the README navigation, while pointing toward future Gemini CLI and Claude Code integrations. Handle Codex account SSE chat chunks Teach the Codex/ChatGPT account bridge to extract text from OpenAI-style SSE chat completion deltas and fall back to a normal output_text response when upstream only streams chunks. Strip user-supplied stream kwargs before LiteLLM calls so Agent Zero owns streaming mode and custom parameters cannot pass stream twice. Add targeted tests for streamed delta extraction and reconstructed responses. update README.md with LLM plans mention
112 lines
3.6 KiB
Python
112 lines
3.6 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from helpers import plugins
|
|
|
|
|
|
PLUGIN_NAME = "_oauth"
|
|
|
|
DEFAULT_CODEX_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
|
|
DEFAULT_CODEX_ISSUER = "https://auth.openai.com"
|
|
DEFAULT_CODEX_TOKEN_URL = "https://auth.openai.com/oauth/token"
|
|
DEFAULT_CODEX_BASE_URL = "https://chatgpt.com/backend-api/codex"
|
|
DEFAULT_CODEX_SCOPES = [
|
|
"openid",
|
|
"profile",
|
|
"email",
|
|
"offline_access",
|
|
"api.connectors.read",
|
|
"api.connectors.invoke",
|
|
]
|
|
|
|
|
|
def oauth_config() -> dict[str, Any]:
|
|
value = plugins.get_plugin_config(PLUGIN_NAME) or {}
|
|
return value if isinstance(value, dict) else {}
|
|
|
|
|
|
def codex_config(config: dict[str, Any] | None = None) -> dict[str, Any]:
|
|
source = config if isinstance(config, dict) else oauth_config()
|
|
raw = source.get("codex", {}) if isinstance(source, dict) else {}
|
|
raw = raw if isinstance(raw, dict) else {}
|
|
|
|
return {
|
|
"enabled": _as_bool(raw.get("enabled"), True),
|
|
"auth_file_path": _as_str(raw.get("auth_file_path")),
|
|
"issuer": _trim_url(raw.get("issuer"), DEFAULT_CODEX_ISSUER),
|
|
"token_url": _as_str(raw.get("token_url")) or DEFAULT_CODEX_TOKEN_URL,
|
|
"client_id": _as_str(raw.get("client_id")) or DEFAULT_CODEX_CLIENT_ID,
|
|
"scopes": _as_str_list(raw.get("scopes")) or DEFAULT_CODEX_SCOPES,
|
|
"open_browser_from_server": _as_bool(raw.get("open_browser_from_server"), False),
|
|
"forced_workspace_id": _as_str(raw.get("forced_workspace_id")),
|
|
"upstream_base_url": _trim_url(raw.get("upstream_base_url"), DEFAULT_CODEX_BASE_URL),
|
|
"codex_version": _as_str(raw.get("codex_version")),
|
|
"models": _as_str_list(raw.get("models")),
|
|
"request_timeout_seconds": _as_int(raw.get("request_timeout_seconds"), 120),
|
|
"proxy_base_path": _normalize_base_path(raw.get("proxy_base_path"), "/oauth/codex"),
|
|
"callback_path": _normalize_base_path(raw.get("callback_path"), "/auth/callback"),
|
|
"require_proxy_token": _as_bool(raw.get("require_proxy_token"), False),
|
|
"proxy_token": _as_str(raw.get("proxy_token")),
|
|
}
|
|
|
|
|
|
def _as_str(value: Any) -> str:
|
|
if value is None:
|
|
return ""
|
|
return str(value).strip()
|
|
|
|
|
|
def _as_int(value: Any, default: int) -> int:
|
|
try:
|
|
return int(value)
|
|
except (TypeError, ValueError):
|
|
return default
|
|
|
|
|
|
def _as_bool(value: Any, default: bool) -> bool:
|
|
if value is None or value == "":
|
|
return default
|
|
if isinstance(value, bool):
|
|
return value
|
|
if isinstance(value, (int, float)):
|
|
return bool(value)
|
|
normalized = str(value).strip().lower()
|
|
if normalized in {"1", "true", "yes", "on", "enabled"}:
|
|
return True
|
|
if normalized in {"0", "false", "no", "off", "disabled"}:
|
|
return False
|
|
return default
|
|
|
|
|
|
def _as_str_list(value: Any) -> list[str]:
|
|
if value is None:
|
|
return []
|
|
if isinstance(value, str):
|
|
values = value.replace(",", "\n").splitlines()
|
|
elif isinstance(value, (list, tuple, set)):
|
|
values = list(value)
|
|
else:
|
|
values = [value]
|
|
|
|
result: list[str] = []
|
|
seen: set[str] = set()
|
|
for item in values:
|
|
text = _as_str(item)
|
|
if not text or text in seen:
|
|
continue
|
|
seen.add(text)
|
|
result.append(text)
|
|
return result
|
|
|
|
|
|
def _trim_url(value: Any, default: str) -> str:
|
|
text = _as_str(value) or default
|
|
return text.rstrip("/")
|
|
|
|
|
|
def _normalize_base_path(value: Any, default: str) -> str:
|
|
text = _as_str(value) or default
|
|
if not text.startswith("/"):
|
|
text = "/" + text
|
|
return text.rstrip("/") or default
|