Add Codex/ChatGPT account OAuth provider

Create a generic OAuth Connections plugin with Codex/ChatGPT Account as the first provider, using OpenAI's device-code flow to persist Codex-compatible account tokens.

Expose a loopback OpenAI-compatible wrapper for models, responses, and chat completions, and point LiteLLM at the container-local Agent Zero origin.

Add a dummy API-key extension and focused tests so the account-backed provider appears configured without requiring a user-entered key.

docs: add Codex plan OAuth callout

Highlight that Agent Zero can use an existing OpenAI Codex plan through the new OAuth flow.

Add the account-backed LLM plans image and surface the section from the README navigation, while pointing toward future Gemini CLI and Claude Code integrations.

Handle Codex account SSE chat chunks

Teach the Codex/ChatGPT account bridge to extract text from OpenAI-style SSE chat completion deltas and fall back to a normal output_text response when upstream only streams chunks.

Strip user-supplied stream kwargs before LiteLLM calls so Agent Zero owns streaming mode and custom parameters cannot pass stream twice.

Add targeted tests for streamed delta extraction and reconstructed responses.

update README.md with LLM plans mention
This commit is contained in:
Alessandro 2026-04-28 15:11:37 +02:00
parent ff828e294e
commit f67564a8ae
23 changed files with 2511 additions and 5 deletions

View file

@ -14,7 +14,8 @@ Agent Zero is a dynamic, organic agentic framework for running AI agents that ca
[Introduction](#what-agent-zero-is) |
[Space Agent](#agent-zero-and-space-agent) |
[Quick Start](#quick-start) |
[Quick Start](#how-to-install) |
[LLM Plans](#use-your-openai-codex-plan) |
[CLI Connector](#a0-cli-connector-use-agent-zero-on-your-host-machine) |
[Features](#what-makes-agent-zero-different) |
[Examples](#try-these-first) |
@ -98,6 +99,16 @@ For web and mobile development, Annotate mode lets you click page elements or re
The Browser also supports Chrome extensions installed from the Chrome Web Store directly inside the Agent Zero browser environment, so workflows can use the same kind of browser capabilities real users depend on.
## Use Your OpenAI Codex Plan
Agent Zero can now connect to your OpenAI Codex plan through the new OAuth flow. Sign in with your account, pick the Codex-backed provider, and let Agent Zero use the plan you already have.
<img width="1184" height="604" alt="OAuth LLM plans in Agent Zero" src="docs/res/oauth_llm_plans-ok.png" />
<br>
Click "Connect", enter the device code in the OpenAI page. Choose your model after checking the list, and you're all set.
This is the first step toward account-backed LLM plans in Agent Zero. More integrations are coming, including Gemini CLI, Claude Code based on extra-usage, and more.
# A0 CLI Connector: Use Agent Zero on Your Host Machine

Binary file not shown.

After

Width:  |  Height:  |  Size: 53 KiB

View file

@ -396,8 +396,9 @@ class LiteLLMChatWrapper(SimpleChatModel):
apply_rate_limiter_sync(self.a0_model_conf, str(msgs))
# Call the model
call_kwargs = _without_stream_kwarg({**self.kwargs, **kwargs})
resp = completion(
model=self.model_name, messages=msgs, stop=stop, **{**self.kwargs, **kwargs}
model=self.model_name, messages=msgs, stop=stop, **call_kwargs
)
# Parse output
@ -420,13 +421,14 @@ class LiteLLMChatWrapper(SimpleChatModel):
apply_rate_limiter_sync(self.a0_model_conf, str(msgs))
result = ChatGenerationResult()
call_kwargs = _without_stream_kwarg({**self.kwargs, **kwargs})
for chunk in completion(
model=self.model_name,
messages=msgs,
stream=True,
stop=stop,
**{**self.kwargs, **kwargs},
**call_kwargs,
):
# parse chunk
parsed = _parse_chunk(chunk) # chunk parsing
@ -451,13 +453,14 @@ class LiteLLMChatWrapper(SimpleChatModel):
await apply_rate_limiter(self.a0_model_conf, str(msgs))
result = ChatGenerationResult()
call_kwargs = _without_stream_kwarg({**self.kwargs, **kwargs})
response = await acompletion(
model=self.model_name,
messages=msgs,
stream=True,
stop=stop,
**{**self.kwargs, **kwargs},
**call_kwargs,
)
async for chunk in response: # type: ignore
# parse chunk
@ -504,7 +507,7 @@ class LiteLLMChatWrapper(SimpleChatModel):
)
# Prepare call kwargs and retry config (strip A0-only params before calling LiteLLM)
call_kwargs: dict[str, Any] = {**self.kwargs, **kwargs}
call_kwargs: dict[str, Any] = _without_stream_kwarg({**self.kwargs, **kwargs})
max_retries: int = int(call_kwargs.pop("a0_retry_attempts", 2))
retry_delay_s: float = float(call_kwargs.pop("a0_retry_delay_seconds", 1.5))
stream = reasoning_callback is not None or response_callback is not None or tokens_callback is not None
@ -760,6 +763,11 @@ def _parse_chunk(chunk: Any) -> ChatChunk:
return ChatChunk(reasoning_delta=reasoning_delta, response_delta=response_delta)
def _without_stream_kwarg(kwargs: dict[str, Any]) -> dict[str, Any]:
kwargs.pop("stream", None)
return kwargs
def _adjust_call_args(provider_name: str, model_name: str, kwargs: dict):

12
plugins/_oauth/README.md Normal file
View file

@ -0,0 +1,12 @@
# OAuth Connections
Generic local OAuth bridge for Agent Zero.
The first provider is `Codex/ChatGPT Account`:
- signs in with OpenAI's Codex device-code flow
- writes Codex-compatible `auth.json` credentials
- refreshes local tokens when needed
- exposes a loopback OpenAI-compatible wrapper at `/oauth/codex/v1`
Tokens in `auth.json` are password-equivalent credentials. Keep this plugin on trusted local machines only.

View file

@ -0,0 +1,13 @@
from __future__ import annotations
from helpers.api import ApiHandler, Request
from plugins._oauth.helpers import codex
class Models(ApiHandler):
async def process(self, input: dict, request: Request) -> dict:
try:
models = codex.fetch_models()
return {"ok": True, "models": models}
except Exception as exc:
return {"ok": False, "error": str(exc), "models": []}

View file

@ -0,0 +1,35 @@
from __future__ import annotations
from helpers.api import ApiHandler, Request
from plugins._oauth.helpers import codex
from plugins._oauth.helpers.state import get_device_attempt, pop_device_attempt
class PollDeviceLogin(ApiHandler):
async def process(self, input: dict, request: Request) -> dict:
attempt_id = str(input.get("attempt_id") or "").strip()
if not attempt_id:
return {"ok": False, "error": "Missing device authorization attempt."}
attempt = get_device_attempt(attempt_id)
if attempt is None:
return {"ok": False, "expired": True, "error": "Device authorization expired."}
try:
result = codex.poll_device_authorization(
attempt.device_auth_id,
attempt.user_code,
)
except Exception as exc:
return {"ok": False, "error": str(exc)}
if result.get("completed"):
pop_device_attempt(attempt_id)
return {"ok": True, "completed": True, "account_id": result.get("account_id", "")}
return {
"ok": True,
"completed": False,
"interval": attempt.interval,
"expires_at": attempt.expires_at,
}

View file

@ -0,0 +1,36 @@
from __future__ import annotations
import secrets
from helpers.api import ApiHandler, Request
from plugins._oauth.helpers import codex
from plugins._oauth.helpers.config import codex_config
from plugins._oauth.helpers.state import put_device_attempt
class StartDeviceLogin(ApiHandler):
async def process(self, input: dict, request: Request) -> dict:
cfg = codex_config()
if not cfg["enabled"]:
return {"ok": False, "error": "Codex/ChatGPT account connection is disabled."}
try:
device = codex.request_device_code()
attempt_id = secrets.token_urlsafe(24)
attempt = put_device_attempt(
attempt_id,
device["device_auth_id"],
device["user_code"],
device["interval"],
device["expires_at"],
)
return {
"ok": True,
"attempt_id": attempt.attempt_id,
"verification_url": device["verification_url"],
"user_code": attempt.user_code,
"interval": attempt.interval,
"expires_at": attempt.expires_at,
}
except Exception as exc:
return {"ok": False, "error": str(exc)}

View file

@ -0,0 +1,54 @@
from __future__ import annotations
import webbrowser
from helpers.api import ApiHandler, Request
from plugins._oauth.helpers import codex
from plugins._oauth.helpers.config import codex_config
from plugins._oauth.helpers.state import put_attempt
class StartLogin(ApiHandler):
async def process(self, input: dict, request: Request) -> dict:
cfg = codex_config()
if not cfg["enabled"]:
return {"ok": False, "error": "Codex/ChatGPT account connection is disabled."}
redirect_uri = _redirect_uri(request, cfg["callback_path"])
pkce = codex.generate_pkce()
state = codex.generate_state()
attempt = put_attempt(state, pkce.verifier, redirect_uri)
auth_url = codex.build_authorize_url(redirect_uri, state, pkce)
if cfg["open_browser_from_server"]:
try:
webbrowser.open(auth_url)
except Exception:
pass
return {
"ok": True,
"auth_url": auth_url,
"redirect_uri": redirect_uri,
"expires_at": attempt.expires_at,
}
def _redirect_uri(request: Request, callback_path: str) -> str:
origin = (request.headers.get("Origin") or "").rstrip("/")
if not _is_local_origin(origin):
origin = request.url_root.rstrip("/")
return f"{origin}{callback_path}"
def _is_local_origin(origin: str) -> bool:
if not origin:
return False
return (
origin.startswith("http://localhost:")
or origin == "http://localhost"
or origin.startswith("http://127.0.0.1:")
or origin == "http://127.0.0.1"
or origin.startswith("http://[::1]:")
or origin == "http://[::1]"
)

View file

@ -0,0 +1,22 @@
from __future__ import annotations
from helpers.api import ApiHandler, Request
from plugins._oauth.helpers import codex
from plugins._oauth.helpers.config import codex_config
from plugins._oauth.helpers.route_bootstrap import is_installed
class Status(ApiHandler):
async def process(self, input: dict, request: Request) -> dict:
cfg = codex_config()
return {
"ok": True,
"routes_installed": is_installed(),
"codex": {
**codex.status(),
"enabled": cfg["enabled"],
"proxy_base_path": cfg["proxy_base_path"],
"callback_path": cfg["callback_path"],
"v1_base_path": f'{cfg["proxy_base_path"]}/v1',
},
}

View file

@ -0,0 +1,9 @@
chat:
codex_oauth:
name: Codex/ChatGPT Account
litellm_provider: openai
models_list:
endpoint_url: "/models"
kwargs:
api_base: "http://127.0.0.1/oauth/codex/v1"
api_key: "oauth"

View file

@ -0,0 +1,33 @@
# Generic OAuth connection settings. Only Codex is implemented for now.
codex:
enabled: true
# Empty means auto-discover CODEX_HOME/auth.json, ~/.codex/auth.json,
# CHATGPT_LOCAL_HOME/auth.json, then ~/.chatgpt-local/auth.json.
auth_file_path: ""
issuer: "https://auth.openai.com"
token_url: "https://auth.openai.com/oauth/token"
client_id: "app_EMoamEEZ73f0CkXaXp7hrann"
scopes:
- openid
- profile
- email
- offline_access
- api.connectors.read
- api.connectors.invoke
open_browser_from_server: false
forced_workspace_id: ""
upstream_base_url: "https://chatgpt.com/backend-api/codex"
codex_version: ""
models: []
request_timeout_seconds: 120
# The OpenAI-compatible wrapper is mounted at /oauth/codex/v1.
# It is loopback-only by default and does not emit CORS headers.
proxy_base_path: "/oauth/codex"
callback_path: "/auth/callback"
require_proxy_token: false
proxy_token: ""

View file

@ -0,0 +1,28 @@
from __future__ import annotations
from helpers.extension import Extension
DUMMY_API_KEY = "oauth"
PROVIDERS = {"codex_oauth"}
class CodexAccountDummyKey(Extension):
def execute(self, data: dict | None = None, **kwargs):
if not isinstance(data, dict):
return
args = data.get("args")
call_kwargs = data.get("kwargs")
service = ""
if isinstance(args, tuple) and args:
service = str(args[0] or "")
elif isinstance(call_kwargs, dict):
service = str(call_kwargs.get("service") or "")
if service.lower() not in PROVIDERS:
return
result = str(data.get("result") or "").strip()
if not result or result == "None":
data["result"] = DUMMY_API_KEY

View file

@ -0,0 +1,9 @@
from __future__ import annotations
from helpers.extension import Extension
from plugins._oauth.helpers.route_bootstrap import install_route_hooks
class OAuthRoutesStartup(Extension):
def execute(self, **kwargs):
install_route_hooks()

View file

@ -0,0 +1 @@
"""OAuth Connections plugin helpers."""

View file

@ -0,0 +1,859 @@
from __future__ import annotations
import base64
import hashlib
import json
import os
import secrets
import subprocess
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Iterable
from urllib.parse import parse_qs, urlencode, urljoin, urlparse
import requests
from helpers import files
from plugins._oauth.helpers.config import codex_config
AUTH_FILENAME = "auth.json"
ACCESS_EXPIRY_MARGIN = timedelta(minutes=5)
REFRESH_INTERVAL = timedelta(minutes=55)
FALLBACK_CODEX_VERSION = "0.124.0"
OAUTH_ERROR_KEYS = {"error", "error_description"}
DEVICE_CODE_TIMEOUT_SECONDS = 15 * 60
@dataclass(frozen=True)
class PkcePair:
verifier: str
challenge: str
@dataclass(frozen=True)
class EffectiveAuth:
access_token: str
account_id: str
id_token: str = ""
refresh_token: str = ""
source_path: str = ""
last_refresh: str = ""
def generate_pkce() -> PkcePair:
verifier = _base64url(secrets.token_bytes(64))
challenge = _base64url(hashlib.sha256(verifier.encode("utf-8")).digest())
return PkcePair(verifier=verifier, challenge=challenge)
def generate_state() -> str:
return _base64url(secrets.token_bytes(32))
def build_authorize_url(redirect_uri: str, state: str, pkce: PkcePair) -> str:
cfg = codex_config()
query = {
"response_type": "code",
"client_id": cfg["client_id"],
"redirect_uri": redirect_uri,
"scope": " ".join(cfg["scopes"]),
"code_challenge": pkce.challenge,
"code_challenge_method": "S256",
"id_token_add_organizations": "true",
"codex_cli_simplified_flow": "true",
"state": state,
"originator": "codex_cli_rs",
}
if cfg["forced_workspace_id"]:
query["allowed_workspace_id"] = cfg["forced_workspace_id"]
return f'{cfg["issuer"]}/oauth/authorize?{urlencode(query)}'
def exchange_code_for_tokens(
code: str,
redirect_uri: str,
verifier: str,
) -> dict[str, str]:
cfg = codex_config()
response = requests.post(
cfg["token_url"],
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_id": cfg["client_id"],
"code_verifier": verifier,
},
timeout=30,
)
if not response.ok:
raise RuntimeError(_token_error_message(response))
payload = response.json()
if not isinstance(payload, dict):
raise RuntimeError("OAuth token endpoint returned a malformed response.")
tokens = {
"id_token": str(payload.get("id_token") or ""),
"access_token": str(payload.get("access_token") or ""),
"refresh_token": str(payload.get("refresh_token") or ""),
}
missing = [key for key, value in tokens.items() if not value]
if missing:
raise RuntimeError(f"OAuth token response is missing: {', '.join(missing)}")
return tokens
def obtain_api_key(id_token: str) -> str:
cfg = codex_config()
response = requests.post(
cfg["token_url"],
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"client_id": cfg["client_id"],
"requested_token": "openai-api-key",
"subject_token": id_token,
"subject_token_type": "urn:ietf:params:oauth:token-type:id_token",
},
timeout=30,
)
if not response.ok:
raise RuntimeError(f"API-key token exchange failed with status {response.status_code}.")
payload = response.json()
if not isinstance(payload, dict) or not payload.get("access_token"):
raise RuntimeError("API-key token exchange returned a malformed response.")
return str(payload["access_token"])
def complete_login(code: str, redirect_uri: str, verifier: str) -> EffectiveAuth:
tokens = exchange_code_for_tokens(code, redirect_uri, verifier)
return persist_exchanged_tokens(tokens)
def persist_exchanged_tokens(tokens: dict[str, str]) -> EffectiveAuth:
id_token = tokens["id_token"]
account_id = derive_account_id(id_token)
if not account_id:
raise RuntimeError("OAuth ID token did not include a ChatGPT account id.")
cfg = codex_config()
if cfg["forced_workspace_id"] and account_id != cfg["forced_workspace_id"]:
raise RuntimeError(
f'Login is restricted to workspace id {cfg["forced_workspace_id"]}.'
)
try:
api_key = obtain_api_key(id_token)
except Exception:
api_key = ""
auth_data = {
"auth_mode": "chatgpt",
"OPENAI_API_KEY": api_key or None,
"tokens": {
"id_token": id_token,
"access_token": tokens["access_token"],
"refresh_token": tokens["refresh_token"],
"account_id": account_id,
},
"last_refresh": utc_now_iso(),
}
path = resolve_auth_write_path()
write_auth_file(path, auth_data)
return load_auth(ensure_fresh=False)
def request_device_code() -> dict[str, Any]:
cfg = codex_config()
base_url = cfg["issuer"].rstrip("/")
response = requests.post(
f"{base_url}/api/accounts/deviceauth/usercode",
headers={"Content-Type": "application/json"},
json={"client_id": cfg["client_id"]},
timeout=30,
)
if not response.ok:
raise RuntimeError(_token_error_message(response))
payload = response.json()
if not isinstance(payload, dict):
raise RuntimeError("Device authorization returned a malformed response.")
device_auth_id = _string(payload.get("device_auth_id"))
user_code = _string(payload.get("user_code") or payload.get("usercode"))
if not device_auth_id or not user_code:
raise RuntimeError("Device authorization response did not include a code.")
interval = _safe_int(payload.get("interval"), 5)
expires_at = _device_expires_at(payload.get("expires_at"))
return {
"device_auth_id": device_auth_id,
"user_code": user_code,
"interval": interval,
"expires_at": expires_at,
"verification_url": f"{base_url}/codex/device",
}
def poll_device_authorization(device_auth_id: str, user_code: str) -> dict[str, Any]:
cfg = codex_config()
base_url = cfg["issuer"].rstrip("/")
response = requests.post(
f"{base_url}/api/accounts/deviceauth/token",
headers={"Content-Type": "application/json"},
json={"device_auth_id": device_auth_id, "user_code": user_code},
timeout=30,
)
if response.status_code in {403, 404}:
return {"completed": False}
if not response.ok:
raise RuntimeError(_token_error_message(response))
payload = response.json()
if not isinstance(payload, dict):
raise RuntimeError("Device authorization token response was malformed.")
authorization_code = _string(payload.get("authorization_code"))
verifier = _string(payload.get("code_verifier"))
if not authorization_code or not verifier:
raise RuntimeError("Device authorization response was missing token exchange data.")
tokens = exchange_code_for_tokens(
authorization_code,
f"{base_url}/deviceauth/callback",
verifier,
)
auth = persist_exchanged_tokens(tokens)
return {"completed": True, "account_id": auth.account_id}
def load_auth(*, ensure_fresh: bool = True) -> EffectiveAuth:
path, data = read_auth_file()
tokens = data.get("tokens") if isinstance(data, dict) else {}
tokens = tokens if isinstance(tokens, dict) else {}
access_token = _string(tokens.get("access_token"))
id_token = _string(tokens.get("id_token"))
refresh_token = _string(tokens.get("refresh_token"))
account_id = _string(tokens.get("account_id")) or derive_account_id(id_token)
last_refresh = _string(data.get("last_refresh")) if isinstance(data, dict) else ""
if ensure_fresh and refresh_token and should_refresh(access_token, last_refresh):
refreshed = refresh_tokens(refresh_token)
access_token = refreshed.get("access_token") or access_token
id_token = refreshed.get("id_token") or id_token
refresh_token = refreshed.get("refresh_token") or refresh_token
account_id = derive_account_id(id_token) or account_id
last_refresh = utc_now_iso()
data["tokens"] = {
"id_token": id_token,
"access_token": access_token,
"refresh_token": refresh_token,
"account_id": account_id,
}
data["last_refresh"] = last_refresh
write_auth_file(path, data)
if not access_token:
raise RuntimeError("Codex/ChatGPT account access token not found. Connect the account first.")
if not account_id:
raise RuntimeError("Codex/ChatGPT account id not found. Connect the account again.")
return EffectiveAuth(
access_token=access_token,
account_id=account_id,
id_token=id_token,
refresh_token=refresh_token,
source_path=str(path),
last_refresh=last_refresh,
)
def status() -> dict[str, Any]:
candidates = resolve_auth_file_candidates()
existing = [str(path) for path in candidates if path.is_file()]
result: dict[str, Any] = {
"connected": False,
"auth_file_path": str(resolve_auth_write_path()),
"discovered_auth_files": existing,
}
try:
auth = load_auth(ensure_fresh=False)
except Exception as exc:
result["message"] = str(exc)
return result
id_claims = parse_jwt_claims(auth.id_token)
access_claims = parse_jwt_claims(auth.access_token)
auth_claims = _auth_claims(id_claims)
result.update(
{
"connected": True,
"auth_file_path": auth.source_path,
"account_id": auth.account_id,
"email": id_claims.get("email")
or _record(id_claims.get("https://api.openai.com/profile")).get("email"),
"plan_type": auth_claims.get("chatgpt_plan_type"),
"user_id": auth_claims.get("chatgpt_user_id") or auth_claims.get("user_id"),
"access_expires_at": _jwt_expiration_iso(access_claims),
"last_refresh": auth.last_refresh,
}
)
return result
def refresh_tokens(refresh_token: str) -> dict[str, str]:
cfg = codex_config()
response = requests.post(
cfg["token_url"],
headers={"Content-Type": "application/json"},
json={
"client_id": cfg["client_id"],
"grant_type": "refresh_token",
"refresh_token": refresh_token,
},
timeout=30,
)
if not response.ok:
raise RuntimeError(_token_error_message(response))
payload = response.json()
if not isinstance(payload, dict):
raise RuntimeError("OAuth refresh endpoint returned a malformed response.")
return {
"id_token": _string(payload.get("id_token")),
"access_token": _string(payload.get("access_token")),
"refresh_token": _string(payload.get("refresh_token")) or refresh_token,
}
def should_refresh(access_token: str, last_refresh: str) -> bool:
if not access_token:
return True
claims = parse_jwt_claims(access_token)
exp = claims.get("exp")
if isinstance(exp, (int, float)):
expires_at = datetime.fromtimestamp(float(exp), tz=timezone.utc)
if expires_at <= datetime.now(timezone.utc) + ACCESS_EXPIRY_MARGIN:
return True
refreshed_at = parse_iso(last_refresh)
if refreshed_at is not None:
return refreshed_at <= datetime.now(timezone.utc) - REFRESH_INTERVAL
return False
def request_codex(
path: str,
*,
method: str = "GET",
headers: dict[str, str] | None = None,
body: bytes | str | None = None,
stream: bool = False,
params: dict[str, str] | None = None,
) -> requests.Response:
cfg = codex_config()
auth = load_auth()
target = build_upstream_url(path, cfg["upstream_base_url"])
request_headers = sanitize_forward_headers(headers or {})
request_headers.update(
{
"Authorization": f"Bearer {auth.access_token}",
"chatgpt-account-id": auth.account_id,
"OpenAI-Beta": "responses=experimental",
}
)
return requests.request(
method,
target,
headers=request_headers,
data=body,
params=params,
timeout=max(5, cfg["request_timeout_seconds"]),
stream=stream,
)
def fetch_models() -> list[str]:
cfg = codex_config()
configured = cfg["models"]
if configured:
return configured
response = request_codex(
"/models",
params={"client_version": resolve_codex_version()},
)
if not response.ok:
raise RuntimeError(upstream_error_message(response, "Failed to load Codex models."))
payload = response.json()
raw_models = payload.get("models") if isinstance(payload, dict) else None
if not isinstance(raw_models, list):
raise RuntimeError("Codex returned a malformed models response.")
models: list[str] = []
seen: set[str] = set()
for item in raw_models:
slug = item.get("slug") if isinstance(item, dict) else None
if isinstance(slug, str) and slug and slug not in seen:
seen.add(slug)
models.append(slug)
if not models:
raise RuntimeError("Codex returned an empty models list.")
return models
def prepare_responses_body(body: dict[str, Any], *, force_stream: bool) -> dict[str, Any]:
normalized = dict(body)
normalized.setdefault("instructions", "")
normalized.setdefault("store", False)
if force_stream:
normalized["stream"] = True
normalized.pop("max_output_tokens", None)
return normalized
def collect_completed_response(response: requests.Response) -> dict[str, Any]:
latest_response: dict[str, Any] | None = None
latest_error: Any = None
text_pieces: list[str] = []
latest_usage: dict[str, Any] | None = None
for event in iter_sse_events(response):
data = event.get("data")
if not data:
continue
try:
parsed = json.loads(data)
except json.JSONDecodeError:
continue
if not isinstance(parsed, dict):
continue
if event.get("event") == "error":
latest_error = parsed
continue
text_pieces.extend(extract_sse_text_deltas(parsed, event.get("event", "")))
usage = parsed.get("usage")
if isinstance(usage, dict):
latest_usage = usage
candidate = parsed.get("response")
if isinstance(candidate, dict):
latest_response = candidate
if latest_response is not None:
return latest_response
if text_pieces:
result: dict[str, Any] = {"output_text": "".join(text_pieces)}
if latest_usage:
result["usage"] = latest_usage
return result
suffix = f" Last error: {json.dumps(latest_error)}" if latest_error else ""
raise RuntimeError(f"No completed response found in Codex SSE stream.{suffix}")
def iter_sse_events(response: requests.Response) -> Iterable[dict[str, str]]:
buffer = ""
for chunk in response.iter_content(chunk_size=8192, decode_unicode=True):
if not chunk:
continue
buffer += chunk
while "\n\n" in buffer or "\r\n\r\n" in buffer:
sep = "\r\n\r\n" if "\r\n\r\n" in buffer else "\n\n"
block, buffer = buffer.split(sep, 1)
event = parse_sse_block(block)
if event:
yield event
event = parse_sse_block(buffer)
if event:
yield event
def parse_sse_block(block: str) -> dict[str, str]:
event: dict[str, str] = {}
data_lines: list[str] = []
for line in block.splitlines():
if line.startswith("event:"):
event["event"] = line[6:].strip()
elif line.startswith("data:"):
data_lines.append(line[5:].lstrip())
if data_lines:
event["data"] = "\n".join(data_lines)
return event
def extract_sse_text_deltas(payload: dict[str, Any], event_type: str = "") -> list[str]:
pieces: list[str] = []
choices = payload.get("choices")
if isinstance(choices, list):
for choice in choices:
if not isinstance(choice, dict):
continue
delta = choice.get("delta")
if isinstance(delta, dict):
_append_text_value(pieces, delta.get("content"))
elif isinstance(delta, str):
pieces.append(delta)
message = choice.get("message")
if isinstance(message, dict):
_append_text_value(pieces, message.get("content"))
delta = payload.get("delta")
if isinstance(delta, str):
pieces.append(delta)
elif isinstance(delta, dict):
_append_text_value(pieces, delta.get("content"))
_append_text_value(pieces, delta.get("text"))
if (payload.get("type") or event_type) in {
"response.output_text.delta",
"response.text.delta",
}:
_append_text_value(pieces, payload.get("text"))
return [piece for piece in pieces if piece]
def _append_text_value(pieces: list[str], value: Any) -> None:
if isinstance(value, str):
pieces.append(value)
return
if isinstance(value, list):
for item in value:
if isinstance(item, str):
pieces.append(item)
elif isinstance(item, dict):
_append_text_value(pieces, item.get("text"))
_append_text_value(pieces, item.get("content"))
def chat_messages_to_response_body(body: dict[str, Any]) -> dict[str, Any]:
messages = body.get("messages")
if not isinstance(messages, list):
raise RuntimeError("`messages` must be an array.")
if body.get("tools"):
raise RuntimeError("Codex/ChatGPT account wrapper does not yet support tool calls.")
instructions: list[str] = []
response_input: list[dict[str, Any]] = []
for message in messages:
if not isinstance(message, dict):
continue
role = str(message.get("role") or "user")
content = message.get("content", "")
text = normalize_message_content(content)
if role in {"system", "developer"}:
if text:
instructions.append(text)
continue
response_input.append({"role": role, "content": text})
response_body: dict[str, Any] = {
"model": body.get("model") or "gpt-5.2",
"input": response_input,
"instructions": "\n\n".join(instructions),
"store": False,
}
if body.get("temperature") is not None:
response_body["temperature"] = body["temperature"]
if body.get("top_p") is not None:
response_body["top_p"] = body["top_p"]
if body.get("reasoning_effort") is not None:
response_body["reasoning"] = {"effort": body["reasoning_effort"]}
return response_body
def normalize_message_content(content: Any) -> str:
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, dict):
text = item.get("text")
if isinstance(text, str):
parts.append(text)
elif isinstance(item, str):
parts.append(item)
return "\n".join(parts)
if content is None:
return ""
return str(content)
def response_text(response: dict[str, Any]) -> str:
value = response.get("output_text")
if isinstance(value, str):
return value
pieces: list[str] = []
output = response.get("output")
if isinstance(output, list):
for item in output:
if not isinstance(item, dict):
continue
content = item.get("content")
if isinstance(content, list):
for block in content:
if isinstance(block, dict):
text = block.get("text")
if isinstance(text, str):
pieces.append(text)
return "".join(pieces)
def build_upstream_url(path: str, base_url: str) -> str:
if path.startswith("http://") or path.startswith("https://"):
parsed = urlparse(path)
path = parsed.path
if parsed.query:
path = f"{path}?{parsed.query}"
if path == "/v1":
path = "/"
elif path.startswith("/v1/"):
path = path[3:]
return urljoin(base_url.rstrip("/") + "/", path.lstrip("/"))
def sanitize_forward_headers(headers: dict[str, str]) -> dict[str, str]:
blocked = {
"authorization",
"chatgpt-account-id",
"host",
"openai-beta",
"content-length",
"connection",
}
return {
key: value
for key, value in headers.items()
if key.lower() not in blocked and value is not None
}
def response_headers(response: requests.Response) -> dict[str, str]:
blocked = {
"connection",
"content-encoding",
"content-length",
"transfer-encoding",
}
return {
key: value
for key, value in response.headers.items()
if key.lower() not in blocked
}
def upstream_error_message(response: requests.Response, fallback: str) -> str:
text = response.text
if not text:
return fallback
try:
payload = json.loads(text)
except json.JSONDecodeError:
return text
if isinstance(payload, dict):
detail = payload.get("detail")
if isinstance(detail, str):
return detail
error = payload.get("error")
if isinstance(error, dict) and isinstance(error.get("message"), str):
return error["message"]
if isinstance(error, str):
return error
return text
def resolve_codex_version() -> str:
configured = codex_config()["codex_version"]
if configured:
return configured
try:
result = subprocess.run(
["codex", "--version"],
check=False,
capture_output=True,
text=True,
timeout=2,
)
version = _extract_semver(result.stdout) or _extract_semver(result.stderr)
if version:
return version
except Exception:
pass
return FALLBACK_CODEX_VERSION
def resolve_auth_file_candidates() -> list[Path]:
cfg = codex_config()
explicit = cfg["auth_file_path"]
if explicit:
return [Path(explicit).expanduser()]
candidates: list[Path] = []
for env_name in ("CHATGPT_LOCAL_HOME", "CODEX_HOME"):
env_home = os.getenv(env_name)
if env_home:
candidates.append(Path(env_home).expanduser() / AUTH_FILENAME)
home = Path.home()
candidates.extend(
[
home / ".codex" / AUTH_FILENAME,
home / ".chatgpt-local" / AUTH_FILENAME,
Path(files.get_abs_path("usr", "plugins", "_oauth", "codex", AUTH_FILENAME)),
]
)
return _unique_paths(candidates)
def resolve_auth_write_path() -> Path:
for candidate in resolve_auth_file_candidates():
if candidate.is_file():
return candidate
return resolve_auth_file_candidates()[-1]
def read_auth_file() -> tuple[Path, dict[str, Any]]:
candidates = resolve_auth_file_candidates()
for candidate in candidates:
try:
with candidate.open("r", encoding="utf-8") as handle:
payload = json.load(handle)
if isinstance(payload, dict):
return candidate, payload
except FileNotFoundError:
continue
except Exception:
continue
return resolve_auth_write_path(), {}
def write_auth_file(path: Path, data: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")
try:
path.chmod(0o600)
except OSError:
pass
def parse_jwt_claims(token: str) -> dict[str, Any]:
if not token or token.count(".") != 2:
return {}
try:
payload = token.split(".")[1]
padding = "=" * ((4 - len(payload) % 4) % 4)
decoded = base64.urlsafe_b64decode((payload + padding).encode("ascii"))
value = json.loads(decoded)
return value if isinstance(value, dict) else {}
except Exception:
return {}
def derive_account_id(id_token: str) -> str:
return _string(_auth_claims(parse_jwt_claims(id_token)).get("chatgpt_account_id"))
def parse_iso(value: str) -> datetime | None:
if not value:
return None
normalized = value.replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def _auth_claims(claims: dict[str, Any]) -> dict[str, Any]:
return _record(claims.get("https://api.openai.com/auth"))
def _record(value: Any) -> dict[str, Any]:
return value if isinstance(value, dict) else {}
def _string(value: Any) -> str:
return value if isinstance(value, str) else ""
def _jwt_expiration_iso(claims: dict[str, Any]) -> str:
exp = claims.get("exp")
if not isinstance(exp, (int, float)):
return ""
return datetime.fromtimestamp(float(exp), tz=timezone.utc).isoformat()
def _base64url(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")
def _token_error_message(response: requests.Response) -> str:
try:
payload = response.json()
except Exception:
payload = None
if isinstance(payload, dict):
for key in OAUTH_ERROR_KEYS:
value = payload.get(key)
if isinstance(value, str) and value:
return value
error = payload.get("error")
if isinstance(error, dict) and isinstance(error.get("message"), str):
return error["message"]
if isinstance(error, str):
return error
return f"OAuth token endpoint returned status {response.status_code}: {response.text}"
def _extract_semver(value: str) -> str:
import re
match = re.search(r"\b\d+\.\d+\.\d+\b", value or "")
return match.group(0) if match else ""
def _safe_int(value: Any, default: int) -> int:
try:
return int(value)
except (TypeError, ValueError):
return default
def _device_expires_at(value: Any) -> float:
if isinstance(value, str):
parsed = parse_iso(value)
if parsed is not None:
return parsed.timestamp()
return time.time() + DEVICE_CODE_TIMEOUT_SECONDS
def _unique_paths(paths: list[Path]) -> list[Path]:
result: list[Path] = []
seen: set[str] = set()
for path in paths:
key = str(path)
if key in seen:
continue
seen.add(key)
result.append(path)
return result

View file

@ -0,0 +1,112 @@
from __future__ import annotations
from typing import Any
from helpers import plugins
PLUGIN_NAME = "_oauth"
DEFAULT_CODEX_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
DEFAULT_CODEX_ISSUER = "https://auth.openai.com"
DEFAULT_CODEX_TOKEN_URL = "https://auth.openai.com/oauth/token"
DEFAULT_CODEX_BASE_URL = "https://chatgpt.com/backend-api/codex"
DEFAULT_CODEX_SCOPES = [
"openid",
"profile",
"email",
"offline_access",
"api.connectors.read",
"api.connectors.invoke",
]
def oauth_config() -> dict[str, Any]:
value = plugins.get_plugin_config(PLUGIN_NAME) or {}
return value if isinstance(value, dict) else {}
def codex_config(config: dict[str, Any] | None = None) -> dict[str, Any]:
source = config if isinstance(config, dict) else oauth_config()
raw = source.get("codex", {}) if isinstance(source, dict) else {}
raw = raw if isinstance(raw, dict) else {}
return {
"enabled": _as_bool(raw.get("enabled"), True),
"auth_file_path": _as_str(raw.get("auth_file_path")),
"issuer": _trim_url(raw.get("issuer"), DEFAULT_CODEX_ISSUER),
"token_url": _as_str(raw.get("token_url")) or DEFAULT_CODEX_TOKEN_URL,
"client_id": _as_str(raw.get("client_id")) or DEFAULT_CODEX_CLIENT_ID,
"scopes": _as_str_list(raw.get("scopes")) or DEFAULT_CODEX_SCOPES,
"open_browser_from_server": _as_bool(raw.get("open_browser_from_server"), False),
"forced_workspace_id": _as_str(raw.get("forced_workspace_id")),
"upstream_base_url": _trim_url(raw.get("upstream_base_url"), DEFAULT_CODEX_BASE_URL),
"codex_version": _as_str(raw.get("codex_version")),
"models": _as_str_list(raw.get("models")),
"request_timeout_seconds": _as_int(raw.get("request_timeout_seconds"), 120),
"proxy_base_path": _normalize_base_path(raw.get("proxy_base_path"), "/oauth/codex"),
"callback_path": _normalize_base_path(raw.get("callback_path"), "/auth/callback"),
"require_proxy_token": _as_bool(raw.get("require_proxy_token"), False),
"proxy_token": _as_str(raw.get("proxy_token")),
}
def _as_str(value: Any) -> str:
if value is None:
return ""
return str(value).strip()
def _as_int(value: Any, default: int) -> int:
try:
return int(value)
except (TypeError, ValueError):
return default
def _as_bool(value: Any, default: bool) -> bool:
if value is None or value == "":
return default
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
normalized = str(value).strip().lower()
if normalized in {"1", "true", "yes", "on", "enabled"}:
return True
if normalized in {"0", "false", "no", "off", "disabled"}:
return False
return default
def _as_str_list(value: Any) -> list[str]:
if value is None:
return []
if isinstance(value, str):
values = value.replace(",", "\n").splitlines()
elif isinstance(value, (list, tuple, set)):
values = list(value)
else:
values = [value]
result: list[str] = []
seen: set[str] = set()
for item in values:
text = _as_str(item)
if not text or text in seen:
continue
seen.add(text)
result.append(text)
return result
def _trim_url(value: Any, default: str) -> str:
text = _as_str(value) or default
return text.rstrip("/")
def _normalize_base_path(value: Any, default: str) -> str:
text = _as_str(value) or default
if not text.startswith("/"):
text = "/" + text
return text.rstrip("/") or default

View file

@ -0,0 +1,26 @@
from __future__ import annotations
def install_route_hooks() -> None:
from helpers.ui_server import UiServerRuntime
if getattr(UiServerRuntime, "_a0_oauth_route_hooks_installed", False):
return
original_register_http_routes = UiServerRuntime.register_http_routes
def register_http_routes(self):
result = original_register_http_routes(self)
from plugins._oauth.helpers.routes import register_oauth_routes
register_oauth_routes(self.webapp)
return result
UiServerRuntime.register_http_routes = register_http_routes
UiServerRuntime._a0_oauth_route_hooks_installed = True
def is_installed() -> bool:
from helpers.ui_server import UiServerRuntime
return bool(getattr(UiServerRuntime, "_a0_oauth_route_hooks_installed", False))

View file

@ -0,0 +1,372 @@
from __future__ import annotations
import ipaddress
import json
import time
from typing import Any
from flask import Response, jsonify, request, stream_with_context
from plugins._oauth.helpers import codex
from plugins._oauth.helpers.config import codex_config
from plugins._oauth.helpers.state import pop_attempt
def register_oauth_routes(app) -> None:
cfg = codex_config()
base = cfg["proxy_base_path"]
routes = [
(f"{base}/health", "oauth_codex_health", codex_health, ["GET"]),
(f"{base}/callback", "oauth_codex_callback", codex_callback, ["GET"]),
(cfg["callback_path"], "oauth_codex_compat_callback", codex_callback, ["GET"]),
(f"{base}/v1/models", "oauth_codex_models", codex_models, ["GET", "OPTIONS"]),
(
f"{base}/v1/responses",
"oauth_codex_responses",
codex_responses,
["POST", "OPTIONS"],
),
(
f"{base}/v1/chat/completions",
"oauth_codex_chat_completions",
codex_chat_completions,
["POST", "OPTIONS"],
),
]
for rule, endpoint, view_func, methods in routes:
if endpoint in app.view_functions:
continue
app.add_url_rule(rule, endpoint, view_func, methods=methods)
def codex_health():
return jsonify({"ok": True, "provider": "codex", "base_path": codex_config()["proxy_base_path"]})
def codex_callback():
error = request.args.get("error")
if error:
description = request.args.get("error_description") or error
return _html_page("Codex Sign-In Failed", description), 400
state = request.args.get("state", "")
code = request.args.get("code", "")
attempt = pop_attempt(state)
if not attempt:
return _html_page("Codex Sign-In Expired", "Return to Agent Zero and start a new Codex connection."), 400
if not code:
return _html_page("Codex Sign-In Failed", "The OAuth callback did not include an authorization code."), 400
try:
auth = codex.complete_login(code, attempt.redirect_uri, attempt.verifier)
info = codex.status()
except Exception as exc:
return _html_page("Codex Sign-In Failed", str(exc)), 500
email = info.get("email") or "Connected"
detail = f"{email}\n{auth.account_id}"
return _html_page("Codex Connected", detail)
def codex_models():
if request.method == "OPTIONS":
return _options_response()
denied = _proxy_denied_response()
if denied:
return denied
try:
models = codex.fetch_models()
return jsonify(
{
"object": "list",
"data": [
{
"id": model,
"object": "model",
"created": 0,
"owned_by": "codex-oauth",
}
for model in models
],
}
)
except Exception as exc:
return _json_error(str(exc), status=502, code="upstream_error")
def codex_responses():
if request.method == "OPTIONS":
return _options_response()
denied = _proxy_denied_response()
if denied:
return denied
body = request.get_json(silent=True)
if not isinstance(body, dict):
return _json_error("Request body must be a JSON object.")
wants_stream = body.get("stream") is True
upstream_body = codex.prepare_responses_body(body, force_stream=True)
try:
upstream = codex.request_codex(
"/responses",
method="POST",
headers={"Content-Type": "application/json"},
body=json.dumps(upstream_body),
stream=True,
)
except Exception as exc:
return _json_error(str(exc), status=502, code="upstream_error")
if not upstream.ok:
return _copy_upstream_response(upstream)
if wants_stream:
return _stream_upstream_sse(upstream)
try:
completed = codex.collect_completed_response(upstream)
except Exception as exc:
return _json_error(str(exc), status=502, code="upstream_error")
return jsonify(completed)
def codex_chat_completions():
if request.method == "OPTIONS":
return _options_response()
denied = _proxy_denied_response()
if denied:
return denied
body = request.get_json(silent=True)
if not isinstance(body, dict):
return _json_error("Request body must be a JSON object.")
try:
response_body = codex.chat_messages_to_response_body(body)
except Exception as exc:
return _json_error(str(exc))
wants_stream = body.get("stream") is True
response_body["stream"] = True
try:
upstream = codex.request_codex(
"/responses",
method="POST",
headers={"Content-Type": "application/json"},
body=json.dumps(codex.prepare_responses_body(response_body, force_stream=True)),
stream=True,
)
except Exception as exc:
return _json_error(str(exc), status=502, code="upstream_error")
if not upstream.ok:
return _copy_upstream_response(upstream)
if wants_stream:
return _stream_chat_completion(upstream, str(body.get("model") or response_body["model"]))
try:
completed = codex.collect_completed_response(upstream)
except Exception as exc:
return _json_error(str(exc), status=502, code="upstream_error")
text = codex.response_text(completed)
return jsonify(
{
"id": f"chatcmpl_{int(time.time() * 1000)}",
"object": "chat.completion",
"created": int(time.time()),
"model": body.get("model") or response_body["model"],
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": text},
"finish_reason": "stop",
}
],
"usage": completed.get("usage") or {},
}
)
def _stream_upstream_sse(upstream):
headers = codex.response_headers(upstream)
headers.setdefault("Content-Type", "text/event-stream")
headers.setdefault("Cache-Control", "no-cache")
return Response(
stream_with_context(upstream.iter_content(chunk_size=8192)),
status=upstream.status_code,
headers=headers,
)
def _stream_chat_completion(upstream, model: str):
created = int(time.time())
chunk_id = f"chatcmpl_{int(time.time() * 1000)}"
def generate():
yield _sse_data(
{
"id": chunk_id,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}],
}
)
for event in codex.iter_sse_events(upstream):
data = event.get("data")
if not data:
continue
try:
parsed = json.loads(data)
except json.JSONDecodeError:
continue
if not isinstance(parsed, dict):
continue
for delta in codex.extract_sse_text_deltas(parsed, event.get("event", "")):
yield _sse_data(
{
"id": chunk_id,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": [
{
"index": 0,
"delta": {"content": delta},
"finish_reason": None,
}
],
}
)
yield _sse_data(
{
"id": chunk_id,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
}
)
yield "data: [DONE]\n\n"
return Response(
stream_with_context(generate()),
headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"},
)
def _copy_upstream_response(upstream):
return Response(
upstream.content,
status=upstream.status_code,
headers=codex.response_headers(upstream),
)
def _proxy_denied_response() -> Response | None:
if _proxy_authorized():
return None
return _json_error("Codex/ChatGPT account proxy access denied.", status=403, code="access_denied")
def _proxy_authorized() -> bool:
cfg = codex_config()
token = cfg["proxy_token"]
supplied = _supplied_proxy_token()
if token and supplied == token:
return True
if cfg["require_proxy_token"]:
return False
return _host_is_local(request.host) or _remote_is_loopback(request.remote_addr)
def _supplied_proxy_token() -> str:
auth = request.headers.get("Authorization", "")
if auth.lower().startswith("bearer "):
return auth[7:].strip()
return (
request.headers.get("X-API-Key")
or request.args.get("api_key")
or request.args.get("key")
or ""
).strip()
def _host_is_local(host: str) -> bool:
hostname = (host or "").split(":", 1)[0].strip("[]").lower()
if hostname in {"localhost", "127.0.0.1", "::1"}:
return True
try:
return ipaddress.ip_address(hostname).is_loopback
except ValueError:
return False
def _remote_is_loopback(addr: str | None) -> bool:
try:
return ipaddress.ip_address(addr or "").is_loopback
except ValueError:
return False
def _json_error(message: str, *, status: int = 400, code: str = "invalid_request") -> Response:
return jsonify({"error": {"message": message, "type": code, "code": code}}), status
def _options_response() -> Response:
return Response(status=204)
def _sse_data(payload: dict[str, Any]) -> str:
return f"data: {json.dumps(payload, separators=(',', ':'))}\n\n"
def _html_page(title: str, body: str) -> str:
return f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width,initial-scale=1">
<title>{_escape_html(title)}</title>
<style>
body {{
margin: 0;
min-height: 100vh;
display: grid;
place-items: center;
background: #101214;
color: #f2f5f7;
font-family: Inter, ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
}}
main {{
width: min(560px, calc(100vw - 32px));
border: 1px solid rgba(255,255,255,.14);
border-radius: 8px;
padding: 24px;
background: #171a1d;
box-shadow: 0 18px 70px rgba(0,0,0,.28);
}}
h1 {{ margin: 0 0 10px; font-size: 24px; }}
p {{ margin: 0; color: #b9c1c9; line-height: 1.5; white-space: pre-line; }}
span {{ color: #7f8b96; font-size: 13px; }}
</style>
</head>
<body>
<main>
<h1>{_escape_html(title)}</h1>
<p>{_escape_html(body)}</p>
</main>
</body>
</html>"""
def _escape_html(value: str) -> str:
return (
value.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;")
.replace("'", "&#39;")
)

View file

@ -0,0 +1,118 @@
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
LOGIN_TTL_SECONDS = 10 * 60
@dataclass(frozen=True)
class LoginAttempt:
state: str
verifier: str
redirect_uri: str
created_at: float
@property
def expires_at(self) -> float:
return self.created_at + LOGIN_TTL_SECONDS
def expired(self) -> bool:
return time.time() > self.expires_at
@dataclass(frozen=True)
class DeviceAttempt:
attempt_id: str
device_auth_id: str
user_code: str
interval: int
expires_at_value: float
@property
def expires_at(self) -> float:
return self.expires_at_value
def expired(self) -> bool:
return time.time() > self.expires_at
_lock = threading.RLock()
_attempts: dict[str, LoginAttempt] = {}
_device_attempts: dict[str, DeviceAttempt] = {}
def put_attempt(state: str, verifier: str, redirect_uri: str) -> LoginAttempt:
cleanup_expired()
attempt = LoginAttempt(
state=state,
verifier=verifier,
redirect_uri=redirect_uri,
created_at=time.time(),
)
with _lock:
_attempts[state] = attempt
return attempt
def pop_attempt(state: str) -> LoginAttempt | None:
cleanup_expired()
with _lock:
attempt = _attempts.pop(state, None)
if attempt is None or attempt.expired():
return None
return attempt
def put_device_attempt(
attempt_id: str,
device_auth_id: str,
user_code: str,
interval: int,
expires_at: float,
) -> DeviceAttempt:
cleanup_expired()
attempt = DeviceAttempt(
attempt_id=attempt_id,
device_auth_id=device_auth_id,
user_code=user_code,
interval=interval,
expires_at_value=expires_at,
)
with _lock:
_device_attempts[attempt_id] = attempt
return attempt
def get_device_attempt(attempt_id: str) -> DeviceAttempt | None:
cleanup_expired()
with _lock:
attempt = _device_attempts.get(attempt_id)
if attempt is None or attempt.expired():
return None
return attempt
def pop_device_attempt(attempt_id: str) -> DeviceAttempt | None:
cleanup_expired()
with _lock:
return _device_attempts.pop(attempt_id, None)
def cleanup_expired() -> None:
now = time.time()
with _lock:
expired = [
state for state, attempt in _attempts.items() if now > attempt.expires_at
]
for state in expired:
_attempts.pop(state, None)
expired_devices = [
attempt_id
for attempt_id, attempt in _device_attempts.items()
if now > attempt.expires_at
]
for attempt_id in expired_devices:
_device_attempts.pop(attempt_id, None)

View file

@ -0,0 +1,9 @@
name: _oauth
title: OAuth Connections
description: Generic local OAuth bridge for account-backed providers. Includes Codex/ChatGPT Account as the first provider.
version: 0.1.0
always_enabled: false
settings_sections:
- external
per_project_config: false
per_agent_config: false

View file

@ -0,0 +1,391 @@
<html>
<head>
<title>OAuth Connections</title>
<script type="module">
import { store } from "/plugins/_oauth/webui/oauth-config-store.js";
</script>
</head>
<body>
<div x-data>
<template x-if="$store.oauthConfig && config">
<div
class="oauth"
x-init="$store.oauthConfig.init(config)"
x-effect="$store.oauthConfig.bindConfig(config)"
x-destroy="$store.oauthConfig.cleanup()"
>
<section class="oauth-hero" :class="$store.oauthConfig.connected() ? 'is-connected' : ''">
<div class="oauth-mark">
<span class="material-symbols-outlined" x-text="$store.oauthConfig.connected() ? 'check' : 'key'"></span>
</div>
<div class="oauth-copy">
<h2>Codex/ChatGPT</h2>
<p x-show="!$store.oauthConfig.connected() && !$store.oauthConfig.connecting">
Connect your account to unlock Codex models locally.
</p>
<p x-show="$store.oauthConfig.connected()">
Connected and ready.
</p>
<p x-show="$store.oauthConfig.connecting">
Finish sign-in in the browser tab.
</p>
</div>
<div class="oauth-primary">
<button
class="oauth-connect"
type="button"
@click="$store.oauthConfig.connectCodex()"
:disabled="$store.oauthConfig.connecting"
x-show="!$store.oauthConfig.connected()"
>
<span class="material-symbols-outlined" x-text="$store.oauthConfig.connecting ? 'progress_activity' : 'login'"></span>
<span x-text="$store.oauthConfig.connecting ? 'Waiting' : 'Connect'"></span>
</button>
<button
class="oauth-connect secondary"
type="button"
@click="$store.oauthConfig.loadModels()"
:disabled="$store.oauthConfig.loadingModels"
x-show="$store.oauthConfig.connected()"
>
<span class="material-symbols-outlined" x-text="$store.oauthConfig.loadingModels ? 'progress_activity' : 'view_list'"></span>
<span>Check Models</span>
</button>
</div>
</section>
<section class="oauth-device" x-show="$store.oauthConfig.device">
<span>Enter this code</span>
<strong x-text="$store.oauthConfig.device?.user_code"></strong>
<button class="text-button" type="button" @click="$store.oauthConfig.cancelConnect()">
<span class="material-symbols-outlined">close</span>
<span>Cancel</span>
</button>
</section>
<section class="oauth-status-row">
<div>
<span>Status</span>
<strong x-text="$store.oauthConfig.statusLabel()"></strong>
</div>
<div x-show="$store.oauthConfig.status?.codex?.email">
<span>Account</span>
<strong x-text="$store.oauthConfig.status?.codex?.email"></strong>
</div>
<button class="oauth-icon-button" type="button" @click="$store.oauthConfig.loadStatus()" title="Refresh status" aria-label="Refresh status">
<span class="material-symbols-outlined">refresh</span>
</button>
</section>
<details class="oauth-advanced">
<summary>Advanced</summary>
<div class="oauth-details">
<div>
<span>Endpoint</span>
<code x-text="$store.oauthConfig.endpointUrl()"></code>
</div>
<div>
<span>Auth file</span>
<code x-text="$store.oauthConfig.status?.codex?.auth_file_path || 'Auto-discover'"></code>
</div>
</div>
<div class="oauth-grid">
<label>
<span>Auth file path</span>
<input type="text" x-model="$store.oauthConfig.codex().auth_file_path" placeholder="Auto-discover" />
</label>
<label>
<span>Issuer</span>
<input type="text" x-model="$store.oauthConfig.codex().issuer" />
</label>
<label>
<span>Token URL</span>
<input type="text" x-model="$store.oauthConfig.codex().token_url" />
</label>
<label>
<span>Base path</span>
<input type="text" x-model="$store.oauthConfig.codex().proxy_base_path" />
</label>
<label>
<span>Upstream URL</span>
<input type="text" x-model="$store.oauthConfig.codex().upstream_base_url" />
</label>
<label>
<span>Codex version</span>
<input type="text" x-model="$store.oauthConfig.codex().codex_version" placeholder="Auto" />
</label>
<label class="oauth-switch">
<span>Require proxy token</span>
<input type="checkbox" x-model="$store.oauthConfig.codex().require_proxy_token" />
</label>
<label>
<span>Proxy token</span>
<input type="password" x-model="$store.oauthConfig.codex().proxy_token" autocomplete="off" />
</label>
</div>
</details>
<div class="oauth-models" x-show="$store.oauthConfig.models.length">
<template x-for="model in $store.oauthConfig.models" :key="model">
<span x-text="model"></span>
</template>
</div>
</div>
</template>
</div>
<style>
.oauth {
display: flex;
flex-direction: column;
gap: 14px;
color: var(--color-text);
}
.oauth-hero {
display: grid;
grid-template-columns: 52px minmax(0, 1fr) auto;
align-items: center;
gap: 16px;
min-height: 118px;
padding: 18px;
border: 1px solid var(--color-border);
border-radius: 8px;
background: color-mix(in srgb, var(--color-panel) 86%, transparent);
}
.oauth-mark {
display: grid;
width: 52px;
height: 52px;
place-items: center;
border-radius: 50%;
background: color-mix(in srgb, var(--color-border) 52%, transparent);
}
.oauth-mark .material-symbols-outlined {
font-size: 28px;
}
.oauth-hero.is-connected .oauth-mark {
color: #08120c;
background: #35d07f;
}
.oauth-copy h2 {
margin: 0 0 4px;
font-size: 1.35rem;
letter-spacing: 0;
}
.oauth-copy p {
margin: 0;
color: var(--color-text-secondary);
font-size: 0.88rem;
line-height: 1.4;
}
.oauth-connect {
display: inline-flex;
align-items: center;
justify-content: center;
gap: 8px;
min-width: 124px;
min-height: 40px;
padding: 0 16px;
border: 0;
border-radius: 8px;
background: #f5f7fa;
color: #111418;
font: inherit;
font-size: 0.88rem;
font-weight: 750;
cursor: pointer;
}
.oauth-connect.secondary {
background: color-mix(in srgb, var(--color-border) 60%, transparent);
color: var(--color-text);
}
.oauth-connect:disabled {
cursor: default;
opacity: .65;
}
.oauth-device {
display: grid;
grid-template-columns: minmax(0, 1fr) auto auto;
align-items: center;
gap: 14px;
padding: 14px 16px;
border: 1px solid color-mix(in srgb, #f5f7fa 18%, var(--color-border));
border-radius: 8px;
background: color-mix(in srgb, #f5f7fa 7%, var(--color-panel));
}
.oauth-device span {
color: var(--color-text-secondary);
font-size: 0.84rem;
font-weight: 700;
}
.oauth-device strong {
font-size: 1.45rem;
letter-spacing: 0;
}
.oauth-status-row {
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr)) auto;
align-items: center;
gap: 10px;
padding: 12px 14px;
border: 1px solid var(--color-border);
border-radius: 8px;
}
.oauth-status-row div {
display: flex;
min-width: 0;
flex-direction: column;
gap: 3px;
}
.oauth-status-row span,
.oauth-details span,
.oauth-grid label span {
color: var(--color-text-secondary);
font-size: 0.76rem;
font-weight: 700;
}
.oauth-status-row strong {
overflow: hidden;
font-size: 0.88rem;
text-overflow: ellipsis;
white-space: nowrap;
}
.oauth-icon-button {
display: grid;
width: 34px;
height: 34px;
place-items: center;
border: 1px solid var(--color-border);
border-radius: 8px;
background: transparent;
color: var(--color-text);
cursor: pointer;
}
.oauth-advanced {
border: 1px solid var(--color-border);
border-radius: 8px;
padding: 0;
}
.oauth-advanced summary {
padding: 12px 14px;
color: var(--color-text-secondary);
font-size: 0.84rem;
font-weight: 750;
cursor: pointer;
}
.oauth-details {
display: grid;
gap: 8px;
padding: 0 14px 14px;
}
.oauth-details div {
display: grid;
grid-template-columns: 84px minmax(0, 1fr);
align-items: center;
gap: 10px;
}
.oauth-details code {
overflow: hidden;
font-size: 0.76rem;
text-overflow: ellipsis;
white-space: nowrap;
}
.oauth-grid {
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr));
gap: 12px;
padding: 0 14px 14px;
}
.oauth-grid label {
display: flex;
min-width: 0;
flex-direction: column;
gap: 6px;
}
.oauth-grid input[type="text"],
.oauth-grid input[type="password"] {
width: 100%;
min-height: 36px;
padding: 7px 10px;
border: 1px solid color-mix(in srgb, var(--color-border) 74%, transparent);
border-radius: 8px;
background: var(--color-input);
color: var(--color-text);
font: inherit;
font-size: 0.82rem;
}
.oauth-switch {
justify-content: center;
}
.oauth-switch input {
width: 18px;
height: 18px;
accent-color: #f5f7fa;
}
.oauth-models {
display: flex;
flex-wrap: wrap;
gap: 8px;
}
.oauth-models span {
padding: 5px 8px;
border: 1px solid color-mix(in srgb, var(--color-border) 70%, transparent);
border-radius: 999px;
color: var(--color-text-secondary);
font-size: 0.76rem;
}
@media (max-width: 720px) {
.oauth-hero,
.oauth-device,
.oauth-status-row,
.oauth-grid,
.oauth-details div {
grid-template-columns: 1fr;
}
.oauth-primary {
width: 100%;
}
.oauth-connect {
width: 100%;
}
}
</style>
</body>
</html>

View file

@ -0,0 +1,192 @@
import { createStore } from "/js/AlpineStore.js";
import { callJsonApi } from "/js/api.js";
import {
toastFrontendError,
toastFrontendInfo,
toastFrontendSuccess,
} from "/components/notifications/notification-store.js";
const STATUS_API = "/plugins/_oauth/status";
const START_DEVICE_LOGIN_API = "/plugins/_oauth/start_device_login";
const POLL_DEVICE_LOGIN_API = "/plugins/_oauth/poll_device_login";
const MODELS_API = "/plugins/_oauth/models";
const MAX_POLL_MS = 120000;
function ensureConfig(config) {
if (!config || typeof config !== "object") return null;
config.codex = config.codex && typeof config.codex === "object" ? config.codex : {};
const codex = config.codex;
codex.enabled = codex.enabled !== false;
codex.auth_file_path = String(codex.auth_file_path || "");
codex.issuer = String(codex.issuer || "https://auth.openai.com");
codex.token_url = String(codex.token_url || "https://auth.openai.com/oauth/token");
codex.client_id = String(codex.client_id || "app_EMoamEEZ73f0CkXaXp7hrann");
codex.upstream_base_url = String(codex.upstream_base_url || "https://chatgpt.com/backend-api/codex");
codex.proxy_base_path = String(codex.proxy_base_path || "/oauth/codex");
codex.callback_path = String(codex.callback_path || "/auth/callback");
codex.require_proxy_token = Boolean(codex.require_proxy_token);
codex.proxy_token = String(codex.proxy_token || "");
codex.codex_version = String(codex.codex_version || "");
codex.models = Array.isArray(codex.models) ? codex.models : [];
return config;
}
function messageOf(error) {
return error instanceof Error ? error.message : String(error);
}
export const store = createStore("oauthConfig", {
config: null,
status: null,
loadingStatus: false,
connecting: false,
loadingModels: false,
models: [],
device: null,
pollTimer: null,
pollStartedAt: 0,
async init(config) {
this.bindConfig(config);
await this.loadStatus();
},
cleanup() {
this.stopPolling();
this.config = null;
this.status = null;
this.models = [];
this.device = null;
},
bindConfig(config) {
const safeConfig = ensureConfig(config);
if (!safeConfig) return;
if (this.config === safeConfig) return;
this.config = safeConfig;
},
codex() {
return this.config?.codex || {};
},
connected() {
return Boolean(this.status?.codex?.connected);
},
statusLabel() {
if (this.loadingStatus) return "Checking";
return this.connected() ? "Connected" : "Not connected";
},
endpointUrl() {
const base = this.codex().proxy_base_path || "/oauth/codex";
return `${window.location.origin}${base}/v1`;
},
callbackUrl() {
const path = this.codex().callback_path || "/auth/callback";
return `${window.location.origin}${path}`;
},
async loadStatus() {
if (this.loadingStatus) return;
this.loadingStatus = true;
try {
const response = await callJsonApi(STATUS_API, {});
this.status = response;
} catch (error) {
void toastFrontendError(messageOf(error), "OAuth Connections");
} finally {
this.loadingStatus = false;
}
},
async connectCodex() {
if (this.connecting) return;
this.connecting = true;
try {
const response = await callJsonApi(START_DEVICE_LOGIN_API, {});
if (!response?.ok || !response.verification_url || !response.attempt_id) {
throw new Error(response?.error || "Could not start Codex sign-in.");
}
this.device = response;
window.open(response.verification_url, "_blank", "noopener,noreferrer");
void toastFrontendInfo("Enter the code shown here in the opened browser tab.", "OAuth Connections");
this.startPolling();
} catch (error) {
this.connecting = false;
void toastFrontendError(messageOf(error), "OAuth Connections");
}
},
startPolling() {
this.stopPolling();
this.pollStartedAt = Date.now();
const tick = async () => {
if (!this.device?.attempt_id) return;
try {
const response = await callJsonApi(POLL_DEVICE_LOGIN_API, {
attempt_id: this.device.attempt_id,
});
if (!response?.ok) {
if (response?.expired) {
this.connecting = false;
this.device = null;
this.stopPolling();
}
throw new Error(response?.error || "Could not finish Codex sign-in.");
}
if (response.completed) {
await this.loadStatus();
this.device = null;
this.connecting = false;
this.stopPolling();
void toastFrontendSuccess("Codex account connected.", "OAuth Connections");
return;
}
} catch (error) {
this.connecting = false;
this.stopPolling();
void toastFrontendError(messageOf(error), "OAuth Connections");
return;
}
if (Date.now() - this.pollStartedAt > MAX_POLL_MS) {
this.connecting = false;
this.device = null;
this.stopPolling();
return;
}
};
void tick();
const delay = Math.max(1500, Number(this.device.interval || 5) * 1000);
this.pollTimer = window.setInterval(tick, delay);
},
stopPolling() {
if (this.pollTimer) window.clearInterval(this.pollTimer);
this.pollTimer = null;
},
async loadModels() {
if (this.loadingModels) return;
this.loadingModels = true;
try {
const response = await callJsonApi(MODELS_API, {});
if (!response?.ok) throw new Error(response?.error || "Could not load Codex models.");
this.models = Array.isArray(response.models) ? response.models : [];
void toastFrontendSuccess("Codex models loaded.", "OAuth Connections");
} catch (error) {
this.models = [];
void toastFrontendError(messageOf(error), "OAuth Connections");
} finally {
this.loadingModels = false;
}
},
cancelConnect() {
this.connecting = false;
this.device = null;
this.stopPolling();
},
});

156
tests/test_oauth_codex.py Normal file
View file

@ -0,0 +1,156 @@
from __future__ import annotations
import json
import sys
from pathlib import Path
import yaml
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from plugins._oauth.helpers import codex
from plugins._oauth.extensions.python._functions.models.get_api_key.end._20_codex_account_dummy_key import (
CodexAccountDummyKey,
)
def test_generate_pkce_produces_urlsafe_verifier_and_challenge():
pair = codex.generate_pkce()
assert 43 <= len(pair.verifier) <= 128
assert pair.verifier
assert pair.challenge
assert "=" not in pair.verifier
assert "=" not in pair.challenge
def test_build_authorize_url_uses_existing_a0_origin_callback(monkeypatch):
monkeypatch.setattr(
codex,
"codex_config",
lambda: {
"issuer": "https://auth.openai.com",
"client_id": "app_EMoamEEZ73f0CkXaXp7hrann",
"scopes": [
"openid",
"profile",
"email",
"offline_access",
"api.connectors.read",
"api.connectors.invoke",
],
"forced_workspace_id": "",
},
)
pair = codex.PkcePair(verifier="verifier", challenge="challenge")
auth_url = codex.build_authorize_url(
"http://localhost:50001/auth/callback",
"state",
pair,
)
assert auth_url.startswith("https://auth.openai.com/oauth/authorize?")
assert "redirect_uri=http%3A%2F%2Flocalhost%3A50001%2Fauth%2Fcallback" in auth_url
assert "code_challenge=challenge" in auth_url
assert "originator=codex_cli_rs" in auth_url
def test_chat_messages_to_response_body_extracts_instructions():
body = codex.chat_messages_to_response_body(
{
"model": "gpt-5.2",
"messages": [
{"role": "system", "content": "Be precise."},
{"role": "user", "content": "Hello"},
],
"temperature": 0.2,
"reasoning_effort": "high",
}
)
assert body["model"] == "gpt-5.2"
assert body["instructions"] == "Be precise."
assert body["input"] == [{"role": "user", "content": "Hello"}]
assert body["temperature"] == 0.2
assert body["reasoning"] == {"effort": "high"}
def test_response_text_reads_output_text_or_output_blocks():
assert codex.response_text({"output_text": "direct"}) == "direct"
assert (
codex.response_text(
{
"output": [
{
"content": [
{"type": "output_text", "text": "a"},
{"type": "output_text", "text": "b"},
]
}
]
}
)
== "ab"
)
def test_parse_sse_block_joins_data_lines():
event = codex.parse_sse_block(
'event: response.completed\ndata: {"response":\ndata: {"id":"r"}}\n'
)
assert event["event"] == "response.completed"
assert json.loads(event["data"]) == {"response": {"id": "r"}}
def test_extract_sse_text_deltas_reads_chat_completion_chunks():
deltas = codex.extract_sse_text_deltas(
{
"id": "chatcmpl_test",
"choices": [
{"delta": {"role": "assistant"}},
{"delta": {"content": "Hel"}},
{"delta": {"content": "lo"}},
],
}
)
assert deltas == ["Hel", "lo"]
def test_collect_completed_response_falls_back_to_text_deltas():
class FakeResponse:
def iter_content(self, chunk_size=8192, decode_unicode=True):
del chunk_size, decode_unicode
yield 'data: {"choices":[{"delta":{"content":"Hel"}}]}\n\n'
yield 'data: {"choices":[{"delta":{"content":"lo"}}]}\n\n'
yield "data: [DONE]\n\n"
assert codex.collect_completed_response(FakeResponse()) == {"output_text": "Hello"}
def test_provider_config_uses_container_local_agent_zero_origin():
provider_path = Path(__file__).resolve().parents[1] / "plugins/_oauth/conf/model_providers.yaml"
provider_config = yaml.safe_load(provider_path.read_text(encoding="utf-8"))
codex_provider = provider_config["chat"]["codex_oauth"]
assert codex_provider["name"] == "Codex/ChatGPT Account"
assert codex_provider["models_list"]["endpoint_url"] == "/models"
assert codex_provider["kwargs"]["api_base"] == "http://127.0.0.1/oauth/codex/v1"
assert "50001" not in json.dumps(codex_provider)
def test_codex_provider_reports_dummy_api_key_when_missing():
data = {"args": ("codex_oauth",), "kwargs": {}, "result": "None"}
CodexAccountDummyKey(agent=None).execute(data=data)
assert data["result"] == "oauth"
def test_codex_provider_preserves_configured_api_key():
data = {"args": ("codex_oauth",), "kwargs": {}, "result": "configured"}
CodexAccountDummyKey(agent=None).execute(data=data)
assert data["result"] == "configured"