Add OAuth disconnect and remaining quota visibility

Allow users to disconnect their OpenAI account by clearing stored ChatGPT OAuth tokens while preserving unrelated auth data.

Fetch and normalize Codex usage windows, then show remaining percentage and reset timing in the OAuth settings UI.

Add focused tests for usage parsing and disconnect cleanup.
This commit is contained in:
Alessandro 2026-05-02 20:14:04 +02:00
parent e63173d812
commit 0da8f3dc2b
5 changed files with 666 additions and 1 deletions

View file

@ -0,0 +1,17 @@
from __future__ import annotations
from helpers.api import ApiHandler, Request
from plugins._oauth.helpers import codex
class Disconnect(ApiHandler):
async def process(self, input: dict, request: Request) -> dict:
try:
result = codex.disconnect_auth()
return {
"ok": True,
**result,
"codex": codex.status(),
}
except Exception as exc:
return {"ok": False, "error": str(exc)}

View file

@ -10,7 +10,7 @@ import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Iterable
from typing import Any, Iterable, Mapping
from urllib.parse import parse_qs, urlencode, urljoin, urlparse
import requests
@ -25,6 +25,11 @@ REFRESH_INTERVAL = timedelta(minutes=55)
FALLBACK_CODEX_VERSION = "0.124.0"
OAUTH_ERROR_KEYS = {"error", "error_description"}
DEVICE_CODE_TIMEOUT_SECONDS = 15 * 60
USAGE_ENDPOINT_PATHS = (
"/backend-api/codex/usage",
"/backend-api/wham/usage",
"/api/codex/usage",
)
@dataclass(frozen=True)
@ -306,9 +311,160 @@ def status() -> dict[str, Any]:
"last_refresh": auth.last_refresh,
}
)
try:
result["usage"] = fetch_usage()
except Exception as exc:
result["usage"] = {"available": False, "error": str(exc)}
return result
def disconnect_auth() -> dict[str, Any]:
cleared_paths: list[str] = []
removed_paths: list[str] = []
preserved_paths: list[str] = []
for path in resolve_auth_file_candidates():
if not path.is_file():
continue
try:
with path.open("r", encoding="utf-8") as handle:
data = json.load(handle)
except Exception:
continue
if not isinstance(data, dict) or not _contains_chatgpt_auth(data):
continue
cleaned = dict(data)
cleaned.pop("tokens", None)
cleaned.pop("last_refresh", None)
if _string(cleaned.get("auth_mode")).lower() == "chatgpt":
cleaned.pop("auth_mode", None)
cleared_paths.append(str(path))
if _has_meaningful_auth_data(cleaned):
write_auth_file(path, cleaned)
preserved_paths.append(str(path))
continue
path.unlink(missing_ok=True)
removed_paths.append(str(path))
return {
"disconnected": bool(cleared_paths),
"cleared_auth_files": cleared_paths,
"removed_auth_files": removed_paths,
"preserved_auth_files": preserved_paths,
}
def fetch_usage() -> dict[str, Any]:
cfg = codex_config()
auth = load_auth()
errors: list[str] = []
headers = {
"Authorization": f"Bearer {auth.access_token}",
"ChatGPT-Account-Id": auth.account_id,
"Accept": "application/json",
"User-Agent": "codex-cli",
}
for url in usage_endpoint_candidates(cfg["upstream_base_url"]):
try:
response = requests.get(
url,
headers=headers,
timeout=max(5, min(cfg["request_timeout_seconds"], 30)),
)
except Exception as exc:
errors.append(str(exc))
continue
if not response.ok:
errors.append(upstream_error_message(response, "Failed to load Codex usage."))
continue
try:
payload = response.json()
except Exception:
payload = {}
usage = normalize_usage_payload(payload, response.headers)
if usage["available"]:
usage["endpoint_path"] = urlparse(url).path
return usage
errors.append("Usage endpoint returned no rate-limit data.")
suffix = f" {' '.join(errors[-2:])}" if errors else ""
raise RuntimeError(f"Failed to load Codex usage.{suffix}")
def usage_endpoint_candidates(upstream_base_url: str) -> list[str]:
parsed = urlparse(upstream_base_url)
if not parsed.scheme or not parsed.netloc:
return []
root = f"{parsed.scheme}://{parsed.netloc}"
paths = list(USAGE_ENDPOINT_PATHS)
upstream_path = parsed.path.rstrip("/")
if upstream_path and upstream_path.endswith("/codex"):
paths.insert(0, f"{upstream_path}/usage")
result: list[str] = []
seen: set[str] = set()
for path in paths:
url = urljoin(root.rstrip("/") + "/", path.lstrip("/"))
if url in seen:
continue
seen.add(url)
result.append(url)
return result
def normalize_usage_payload(
payload: Mapping[str, Any] | None,
headers: Mapping[str, Any] | None = None,
) -> dict[str, Any]:
body = payload if isinstance(payload, Mapping) else {}
rate_limit = _record(body.get("rate_limit")) or _record(body.get("rateLimits"))
header_usage = _normalize_usage_headers(headers or {})
primary = (
_normalize_usage_window(rate_limit.get("primary_window"))
or _normalize_usage_window(rate_limit.get("primary"))
or _normalize_usage_window(body.get("primary_window"))
or header_usage.get("primary")
)
secondary = (
_normalize_usage_window(rate_limit.get("secondary_window"))
or _normalize_usage_window(rate_limit.get("secondary"))
or _normalize_usage_window(body.get("secondary_window"))
or header_usage.get("secondary")
)
code_review = _normalize_code_review_usage(body.get("code_review_rate_limit"))
additional = _normalize_additional_rate_limits(rate_limit.get("additional_rate_limits"))
credits = _normalize_credits(body.get("credits"))
plan_type = (
_string(body.get("plan_type"))
or _string(body.get("planType"))
or _string(header_usage.get("plan_type"))
)
return {
"available": bool(primary or secondary or code_review or additional),
"plan_type": plan_type,
"primary": primary,
"secondary": secondary,
"code_review": code_review,
"additional": additional,
"credits": credits,
"rate_limit_reached_type": _string(
rate_limit.get("rate_limit_reached_type")
or rate_limit.get("rateLimitReachedType")
or body.get("rate_limit_reached_type")
or body.get("rateLimitReachedType")
),
}
def refresh_tokens(refresh_token: str) -> dict[str, str]:
cfg = codex_config()
response = requests.post(
@ -867,3 +1023,227 @@ def _unique_paths(paths: list[Path]) -> list[Path]:
seen.add(key)
result.append(path)
return result
def _contains_chatgpt_auth(data: dict[str, Any]) -> bool:
tokens = _record(data.get("tokens"))
if _string(data.get("auth_mode")).lower() == "chatgpt":
return True
return any(
_string(tokens.get(key))
for key in ("access_token", "refresh_token", "id_token", "account_id")
)
def _has_meaningful_auth_data(data: dict[str, Any]) -> bool:
for value in data.values():
if value is None:
continue
if isinstance(value, str) and not value.strip():
continue
if isinstance(value, (dict, list, tuple, set)) and not value:
continue
return True
return False
def _normalize_usage_headers(headers: Mapping[str, Any]) -> dict[str, Any]:
lowered = {str(key).lower(): value for key, value in headers.items()}
primary = _normalize_usage_window(
{
"used_percent": lowered.get("x-codex-primary-used-percent"),
"window_minutes": lowered.get("x-codex-primary-window-minutes"),
"reset_at": lowered.get("x-codex-primary-resets-at")
or lowered.get("x-codex-primary-reset-at"),
}
)
secondary = _normalize_usage_window(
{
"used_percent": lowered.get("x-codex-secondary-used-percent"),
"window_minutes": lowered.get("x-codex-secondary-window-minutes"),
"reset_at": lowered.get("x-codex-secondary-resets-at")
or lowered.get("x-codex-secondary-reset-at"),
}
)
return {
"primary": primary,
"secondary": secondary,
"plan_type": _string(lowered.get("x-codex-plan-type")),
}
def _normalize_code_review_usage(value: Any) -> dict[str, Any] | None:
data = _record(value)
if not data:
return None
window = (
_normalize_usage_window(data.get("primary_window"))
or _normalize_usage_window(data.get("primary"))
or _normalize_usage_window(data)
)
if window:
window["name"] = _string(data.get("name")) or "Code review"
return window
def _normalize_additional_rate_limits(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
result: list[dict[str, Any]] = []
for item in value:
data = _record(item)
if not data:
continue
window = (
_normalize_usage_window(data.get("primary_window"))
or _normalize_usage_window(data.get("primary"))
or _normalize_usage_window(data)
)
if not window:
continue
name = (
_string(data.get("name"))
or _string(data.get("model"))
or _string(data.get("limit_name"))
or _string(data.get("limitName"))
or _string(data.get("id"))
)
if name:
window["name"] = name
result.append(window)
return result
def _normalize_usage_window(value: Any) -> dict[str, Any] | None:
data = _record(value)
used_percent = _number(
_first_present(
data.get("used_percent"),
data.get("usedPercent"),
data.get("utilization"),
data.get("usage_percent"),
)
)
if used_percent is None:
return None
window_seconds = _number(
_first_present(
data.get("limit_window_seconds"),
data.get("window_seconds"),
data.get("windowSeconds"),
data.get("windowDurationSeconds"),
)
)
window_minutes = _number(
_first_present(
data.get("window_minutes"),
data.get("windowMinutes"),
data.get("windowDurationMins"),
)
)
if window_seconds is None and window_minutes is not None:
window_seconds = window_minutes * 60
if window_minutes is None and window_seconds is not None:
window_minutes = window_seconds / 60
reset_at = _epoch_seconds(
_first_present(
data.get("reset_at"),
data.get("resets_at"),
data.get("resetsAt"),
data.get("resetAt"),
)
)
used = max(0.0, min(100.0, used_percent))
return {
"used_percent": _clean_number(used),
"remaining_percent": _clean_number(max(0.0, 100.0 - used)),
"reset_at": reset_at,
"resets_at_iso": _epoch_iso(reset_at),
"window_seconds": _clean_number(window_seconds) if window_seconds is not None else None,
"window_minutes": _clean_number(window_minutes) if window_minutes is not None else None,
"label": _usage_window_label(window_seconds, window_minutes),
}
def _normalize_credits(value: Any) -> dict[str, Any] | None:
data = _record(value)
if not data:
return None
balance = _number(data.get("balance"))
return {
"has_credits": bool(data.get("has_credits") or data.get("hasCredits")),
"unlimited": bool(data.get("unlimited")),
"balance": _clean_number(balance) if balance is not None else None,
}
def _usage_window_label(seconds: float | None, minutes: float | None) -> str:
if seconds is None and minutes is not None:
seconds = minutes * 60
if seconds is None:
return ""
if 17_940 <= seconds <= 18_060:
return "5h"
if 604_000 <= seconds <= 605_000:
return "7d"
if seconds >= 86_400 and seconds % 86_400 == 0:
return f"{int(seconds // 86_400)}d"
if seconds >= 3_600 and seconds % 3_600 == 0:
return f"{int(seconds // 3_600)}h"
if seconds >= 60 and seconds % 60 == 0:
return f"{int(seconds // 60)}m"
return ""
def _number(value: Any) -> float | None:
if isinstance(value, bool) or value is None:
return None
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
text = value.strip().rstrip("%")
if not text:
return None
try:
return float(text)
except ValueError:
return None
return None
def _first_present(*values: Any) -> Any:
for value in values:
if value is None:
continue
if isinstance(value, str) and value == "":
continue
return value
return None
def _epoch_seconds(value: Any) -> float | None:
number = _number(value)
if number is None or number <= 0:
return None
if number > 1_000_000_000_000:
number = number / 1000
return number
def _epoch_iso(value: float | None) -> str:
if value is None:
return ""
try:
return datetime.fromtimestamp(value, tz=timezone.utc).isoformat()
except (OSError, ValueError):
return ""
def _clean_number(value: float | None) -> int | float | None:
if value is None:
return None
if float(value).is_integer():
return int(value)
return round(float(value), 2)

View file

@ -54,6 +54,16 @@
<span class="material-symbols-outlined" x-text="$store.oauthConfig.loadingModels ? 'progress_activity' : 'view_list'"></span>
<span>Check Models</span>
</button>
<button
class="oauth-connect danger"
type="button"
@click="$store.oauthConfig.disconnectCodex()"
:disabled="$store.oauthConfig.disconnecting"
x-show="$store.oauthConfig.connected()"
>
<span class="material-symbols-outlined" x-text="$store.oauthConfig.disconnecting ? 'progress_activity' : 'link_off'"></span>
<span x-text="$store.oauthConfig.disconnecting ? 'Disconnecting' : 'Disconnect'"></span>
</button>
</div>
</section>
@ -66,6 +76,24 @@
</button>
</section>
<section class="oauth-usage" x-show="$store.oauthConfig.connected() && $store.oauthConfig.usageWindows().length">
<template x-for="window in $store.oauthConfig.usageWindows()" :key="window.key">
<div class="oauth-usage-window">
<div class="oauth-usage-head">
<span>
<span x-text="window.title"></span>
<small x-show="$store.oauthConfig.formatWindowLabel(window)" x-text="$store.oauthConfig.formatWindowLabel(window)"></small>
</span>
<strong x-text="$store.oauthConfig.formatRemainingPercent(window)"></strong>
</div>
<div class="oauth-usage-bar" aria-hidden="true">
<i :style="{ width: $store.oauthConfig.usageWidth(window) }"></i>
</div>
<p x-show="$store.oauthConfig.formatReset(window)" x-text="`Resets in ${$store.oauthConfig.formatReset(window)}`"></p>
</div>
</template>
</section>
<section class="oauth-status-row">
<div>
<span>Status</span>
@ -190,6 +218,13 @@
line-height: 1.4;
}
.oauth-primary {
display: flex;
flex-wrap: wrap;
justify-content: flex-end;
gap: 8px;
}
.oauth-connect {
display: inline-flex;
align-items: center;
@ -213,6 +248,12 @@
color: var(--color-text);
}
.oauth-connect.danger {
border: 1px solid color-mix(in srgb, #f06464 40%, var(--color-border));
background: color-mix(in srgb, #f06464 16%, var(--color-panel));
color: var(--color-text);
}
.oauth-connect:disabled {
cursor: default;
opacity: .65;
@ -240,6 +281,74 @@
letter-spacing: 0;
}
.oauth-usage {
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr));
gap: 10px;
padding: 12px 14px;
border: 1px solid var(--color-border);
border-radius: 8px;
}
.oauth-usage-window {
display: grid;
min-width: 0;
gap: 8px;
}
.oauth-usage-head {
display: flex;
align-items: center;
justify-content: space-between;
gap: 12px;
}
.oauth-usage-head span {
display: inline-flex;
min-width: 0;
align-items: center;
gap: 6px;
color: var(--color-text-secondary);
font-size: 0.78rem;
font-weight: 750;
}
.oauth-usage-head small {
padding: 2px 6px;
border-radius: 999px;
background: color-mix(in srgb, var(--color-border) 55%, transparent);
color: var(--color-text-secondary);
font-size: 0.7rem;
line-height: 1;
}
.oauth-usage-head strong {
font-size: 0.92rem;
white-space: nowrap;
}
.oauth-usage-bar {
overflow: hidden;
height: 8px;
border-radius: 999px;
background: color-mix(in srgb, var(--color-border) 54%, transparent);
}
.oauth-usage-bar i {
display: block;
width: 0;
height: 100%;
border-radius: inherit;
background: #35d07f;
transition: width .22s ease;
}
.oauth-usage-window p {
margin: 0;
color: var(--color-text-secondary);
font-size: 0.74rem;
}
.oauth-status-row {
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr)) auto;
@ -372,6 +481,7 @@
@media (max-width: 720px) {
.oauth-hero,
.oauth-device,
.oauth-usage,
.oauth-status-row,
.oauth-grid,
.oauth-details div {

View file

@ -10,6 +10,7 @@ const STATUS_API = "/plugins/_oauth/status";
const START_DEVICE_LOGIN_API = "/plugins/_oauth/start_device_login";
const POLL_DEVICE_LOGIN_API = "/plugins/_oauth/poll_device_login";
const MODELS_API = "/plugins/_oauth/models";
const DISCONNECT_API = "/plugins/_oauth/disconnect";
const MAX_POLL_MS = 120000;
function ensureConfig(config) {
@ -40,6 +41,7 @@ export const store = createStore("oauthConfig", {
status: null,
loadingStatus: false,
connecting: false,
disconnecting: false,
loadingModels: false,
models: [],
device: null,
@ -79,6 +81,53 @@ export const store = createStore("oauthConfig", {
return this.connected() ? "Connected" : "Not connected";
},
usage() {
return this.status?.codex?.usage || null;
},
usageWindows() {
const usage = this.usage();
if (!usage?.available) return [];
return [
{ key: "primary", title: "Session", ...(usage.primary || {}) },
{ key: "secondary", title: "Week", ...(usage.secondary || {}) },
].filter((window) => Number.isFinite(this.remainingPercent(window)));
},
usageWidth(window) {
const value = Math.max(0, Math.min(100, this.remainingPercent(window)));
return `${value}%`;
},
remainingPercent(window) {
const remaining = Number(window?.remaining_percent);
if (Number.isFinite(remaining)) return remaining;
const used = Number(window?.used_percent);
if (Number.isFinite(used)) return 100 - used;
return Number.NaN;
},
formatRemainingPercent(window) {
const number = this.remainingPercent(window);
if (!Number.isFinite(number)) return "0%";
return `${Math.round(number * 10) / 10}% left`;
},
formatWindowLabel(window) {
return window?.label || "";
},
formatReset(window) {
const seconds = Number(window?.reset_at || 0);
if (!Number.isFinite(seconds) || seconds <= 0) return "";
const remainingMs = Math.max(0, seconds * 1000 - Date.now());
const minutes = Math.round(remainingMs / 60000);
if (minutes < 60) return `${minutes}m`;
const hours = Math.round(minutes / 60);
if (hours < 48) return `${hours}h`;
return `${Math.round(hours / 24)}d`;
},
endpointUrl() {
const base = this.codex().proxy_base_path || "/oauth/codex";
return `${window.location.origin}${base}/v1`;
@ -184,6 +233,29 @@ export const store = createStore("oauthConfig", {
}
},
async disconnectCodex() {
if (this.disconnecting || !this.connected()) return;
const confirmed = window.confirm("Disconnect this OpenAI account and remove stored OAuth tokens?");
if (!confirmed) return;
this.disconnecting = true;
try {
const response = await callJsonApi(DISCONNECT_API, {});
if (!response?.ok) throw new Error(response?.error || "Could not disconnect the account.");
this.status = response.codex ? { ok: true, codex: response.codex } : this.status;
this.models = [];
this.device = null;
this.connecting = false;
this.stopPolling();
void toastFrontendSuccess("OpenAI account disconnected.", "OAuth Connections");
await this.loadStatus();
} catch (error) {
void toastFrontendError(messageOf(error), "OAuth Connections");
} finally {
this.disconnecting = false;
}
},
cancelConnect() {
this.connecting = false;
this.device = null;

View file

@ -132,6 +132,92 @@ def test_collect_completed_response_falls_back_to_text_deltas():
assert codex.collect_completed_response(FakeResponse()) == {"output": [], "output_text": "Hello"}
def test_normalize_usage_payload_reads_codex_windows():
usage = codex.normalize_usage_payload(
{
"plan_type": "plus",
"rate_limit": {
"primary_window": {
"used_percent": 39,
"reset_at": 1_738_300_000,
"limit_window_seconds": 18_000,
},
"secondary_window": {
"used_percent": 15,
"reset_at": 1_738_900_000,
"limit_window_seconds": 604_800,
},
},
"credits": {"has_credits": True, "unlimited": False, "balance": 5.39},
}
)
assert usage["available"] is True
assert usage["plan_type"] == "plus"
assert usage["primary"]["used_percent"] == 39
assert usage["primary"]["remaining_percent"] == 61
assert usage["primary"]["label"] == "5h"
assert usage["secondary"]["used_percent"] == 15
assert usage["secondary"]["label"] == "7d"
assert usage["credits"]["balance"] == 5.39
def test_normalize_usage_payload_accepts_zero_percent_headers():
usage = codex.normalize_usage_payload(
{},
{
"x-codex-primary-used-percent": "0",
"x-codex-primary-window-minutes": "300",
},
)
assert usage["available"] is True
assert usage["primary"]["used_percent"] == 0
assert usage["primary"]["remaining_percent"] == 100
assert usage["primary"]["label"] == "5h"
def test_disconnect_auth_clears_chatgpt_tokens_and_preserves_api_key(tmp_path, monkeypatch):
private_auth = tmp_path / "private-auth.json"
shared_auth = tmp_path / "shared-auth.json"
private_auth.write_text(
json.dumps(
{
"auth_mode": "chatgpt",
"OPENAI_API_KEY": None,
"tokens": {
"access_token": "access",
"refresh_token": "refresh",
"id_token": "id",
"account_id": "account",
},
"last_refresh": "2026-01-01T00:00:00Z",
}
),
encoding="utf-8",
)
shared_auth.write_text(
json.dumps(
{
"auth_mode": "chatgpt",
"OPENAI_API_KEY": "sk-keep",
"tokens": {"access_token": "access", "account_id": "account"},
"last_refresh": "2026-01-01T00:00:00Z",
}
),
encoding="utf-8",
)
monkeypatch.setattr(codex, "resolve_auth_file_candidates", lambda: [private_auth, shared_auth])
result = codex.disconnect_auth()
assert result["disconnected"] is True
assert str(private_auth) in result["removed_auth_files"]
assert not private_auth.exists()
preserved = json.loads(shared_auth.read_text(encoding="utf-8"))
assert preserved == {"OPENAI_API_KEY": "sk-keep"}
def test_provider_config_uses_container_local_agent_zero_origin():
provider_path = Path(__file__).resolve().parents[1] / "plugins/_oauth/conf/model_providers.yaml"
provider_config = yaml.safe_load(provider_path.read_text(encoding="utf-8"))