agent-zero/plugins/_oauth/helpers/state.py
Alessandro f67564a8ae Add Codex/ChatGPT account OAuth provider
Create a generic OAuth Connections plugin with Codex/ChatGPT Account as the first provider, using OpenAI's device-code flow to persist Codex-compatible account tokens.

Expose a loopback OpenAI-compatible wrapper for models, responses, and chat completions, and point LiteLLM at the container-local Agent Zero origin.

Add a dummy API-key extension and focused tests so the account-backed provider appears configured without requiring a user-entered key.

docs: add Codex plan OAuth callout

Highlight that Agent Zero can use an existing OpenAI Codex plan through the new OAuth flow.

Add the account-backed LLM plans image and surface the section from the README navigation, while pointing toward future Gemini CLI and Claude Code integrations.

Handle Codex account SSE chat chunks

Teach the Codex/ChatGPT account bridge to extract text from OpenAI-style SSE chat completion deltas and fall back to a normal output_text response when upstream only streams chunks.

Strip user-supplied stream kwargs before LiteLLM calls so Agent Zero owns streaming mode and custom parameters cannot pass stream twice.

Add targeted tests for streamed delta extraction and reconstructed responses.

update README.md with LLM plans mention
2026-04-28 16:14:53 +02:00

118 lines
2.7 KiB
Python

from __future__ import annotations
import threading
import time
from dataclasses import dataclass
# Lifetime of a LoginAttempt in seconds (10 minutes). It is added to
# ``LoginAttempt.created_at`` to derive ``LoginAttempt.expires_at``.
LOGIN_TTL_SECONDS = 10 * 60
@dataclass(frozen=True)
class LoginAttempt:
    """Immutable record of one pending login attempt.

    Instances are created by ``put_attempt`` and live for
    ``LOGIN_TTL_SECONDS`` seconds counted from ``created_at``.
    """

    # Key under which the attempt is stored in the registry.
    state: str
    # Opaque verifier string kept alongside the attempt
    # (presumably a PKCE code verifier -- confirm against the caller).
    verifier: str
    # Redirect URI recorded when the attempt was created.
    redirect_uri: str
    # ``time.time()`` value captured when the attempt was created.
    created_at: float

    @property
    def expires_at(self) -> float:
        """Return the ``time.time()``-style timestamp at which the attempt lapses."""
        lifetime = LOGIN_TTL_SECONDS
        return self.created_at + lifetime

    def expired(self) -> bool:
        """Return True when the current time is strictly later than ``expires_at``."""
        deadline = self.expires_at
        return time.time() > deadline
@dataclass(frozen=True)
class DeviceAttempt:
    """Immutable record of one pending device-code attempt.

    Unlike ``LoginAttempt``, the expiry timestamp is supplied by the caller
    and stored verbatim in ``expires_at_value``.
    """

    # Key under which the attempt is stored in the registry.
    attempt_id: str
    # Identifier of the device authorization this attempt belongs to.
    device_auth_id: str
    # Code associated with the attempt (presumably shown to the user -- confirm).
    user_code: str
    # Interval value kept with the attempt
    # (presumably the polling interval in seconds -- confirm against the caller).
    interval: int
    # Absolute expiry timestamp, compared against ``time.time()``.
    expires_at_value: float

    @property
    def expires_at(self) -> float:
        """Return the stored expiry timestamp (same accessor name as ``LoginAttempt``)."""
        return self.expires_at_value

    def expired(self) -> bool:
        """Return True when the current time is strictly later than ``expires_at``."""
        deadline = self.expires_at
        return time.time() > deadline
# Re-entrant lock acquired by the functions in this module around their
# accesses to the two registries below.
_lock = threading.RLock()
# Pending login attempts, keyed by their ``state`` value.
_attempts: dict[str, LoginAttempt] = {}
# Pending device attempts, keyed by their ``attempt_id``.
_device_attempts: dict[str, DeviceAttempt] = {}
def put_attempt(state: str, verifier: str, redirect_uri: str) -> LoginAttempt:
    """Register a new login attempt and return it.

    Expired entries are swept first. The new attempt is stamped with the
    current ``time.time()`` and stored under ``state``, replacing any entry
    already stored under that key.

    Args:
        state: Key under which the attempt is stored.
        verifier: Verifier string to keep with the attempt.
        redirect_uri: Redirect URI to keep with the attempt.

    Returns:
        The newly created ``LoginAttempt``.
    """
    cleanup_expired()
    record = LoginAttempt(state, verifier, redirect_uri, time.time())
    with _lock:
        _attempts[record.state] = record
    return record
def pop_attempt(state: str) -> LoginAttempt | None:
    """Remove the login attempt stored under ``state`` and return it.

    Expired entries are swept first. Whatever is stored under ``state`` is
    removed from the registry regardless of the outcome.

    Args:
        state: Key of the attempt to consume.

    Returns:
        The stored ``LoginAttempt`` if one exists and has not expired,
        otherwise ``None``.
    """
    cleanup_expired()
    with _lock:
        found = _attempts.pop(state, None)
    # Re-check expiry: the attempt may have lapsed after the sweep above.
    if found is not None and not found.expired():
        return found
    return None
def put_device_attempt(
    attempt_id: str,
    device_auth_id: str,
    user_code: str,
    interval: int,
    expires_at: float,
) -> DeviceAttempt:
    """Register a new device attempt and return it.

    Expired entries are swept first. The attempt is stored under
    ``attempt_id``, replacing any entry already stored under that key.

    Args:
        attempt_id: Key under which the attempt is stored.
        device_auth_id: Device authorization identifier to keep with it.
        user_code: User code to keep with it.
        interval: Interval value to keep with it.
        expires_at: Absolute expiry timestamp, compared against
            ``time.time()``; stored as ``expires_at_value``.

    Returns:
        The newly created ``DeviceAttempt``.
    """
    cleanup_expired()
    record = DeviceAttempt(attempt_id, device_auth_id, user_code, interval, expires_at)
    with _lock:
        _device_attempts[record.attempt_id] = record
    return record
def get_device_attempt(attempt_id: str) -> DeviceAttempt | None:
    """Look up a device attempt without removing it.

    Expired entries are swept first.

    Args:
        attempt_id: Key of the attempt to look up.

    Returns:
        The stored ``DeviceAttempt`` if one exists and has not expired,
        otherwise ``None``.
    """
    cleanup_expired()
    with _lock:
        found = _device_attempts.get(attempt_id)
    # Re-check expiry: the attempt may have lapsed after the sweep above.
    if found is not None and not found.expired():
        return found
    return None
def pop_device_attempt(attempt_id: str) -> DeviceAttempt | None:
    """Remove the device attempt stored under ``attempt_id`` and return it.

    Expired entries are swept first. Whatever is stored under ``attempt_id``
    is removed from the registry regardless of the outcome.

    Args:
        attempt_id: Key of the attempt to consume.

    Returns:
        The stored ``DeviceAttempt`` if one exists and has not expired,
        otherwise ``None``.
    """
    cleanup_expired()
    with _lock:
        attempt = _device_attempts.pop(attempt_id, None)
    # Mirror pop_attempt / get_device_attempt: an attempt that lapsed between
    # the sweep above and this pop must not be handed back as still valid.
    if attempt is None or attempt.expired():
        return None
    return attempt
def cleanup_expired() -> None:
    """Drop every login and device attempt whose expiry time has passed.

    A single ``time.time()`` reading is taken up front and used for both
    registries; an entry is dropped when that reading is strictly greater
    than its ``expires_at``. Both sweeps run while holding ``_lock``.
    """
    now = time.time()
    with _lock:
        # Login attempts first, then device attempts.
        for registry in (_attempts, _device_attempts):
            # Collect the keys before removing: a dict must not change size
            # while it is being iterated.
            stale_keys = [
                key for key, entry in registry.items() if now > entry.expires_at
            ]
            for key in stale_keys:
                registry.pop(key, None)