diff --git a/docs/developers/qwen-serve-protocol.md b/docs/developers/qwen-serve-protocol.md index efacad522..d35a72adb 100644 --- a/docs/developers/qwen-serve-protocol.md +++ b/docs/developers/qwen-serve-protocol.md @@ -1093,7 +1093,7 @@ Five typed events (workspace-scoped, fanned out to every active session bus): - `auth_device_flow_started` `{deviceFlowId, providerId, expiresAt}` — POST succeeded; SDK should subscribe (no userCode here, fetch via GET if needed) - `auth_device_flow_throttled` `{deviceFlowId, intervalMs}` — daemon honored upstream `slow_down`; clients polling GET should bump their interval to match - `auth_device_flow_authorized` `{deviceFlowId, providerId, expiresAt?, accountAlias?}` — credentials persisted; `accountAlias` is a non-PII label (never email/phone) -- `auth_device_flow_failed` `{deviceFlowId, errorKind, hint?}` — terminal; `errorKind` is one of `expired_token | access_denied | invalid_grant | upstream_error` +- `auth_device_flow_failed` `{deviceFlowId, errorKind, hint?}` — terminal; `errorKind` is one of `expired_token | access_denied | invalid_grant | upstream_error | persist_failed`. `persist_failed` is daemon-internal: the IdP exchange succeeded but the daemon couldn't durably store credentials (EACCES / EROFS / ENOSPC). The user should retry once the underlying disk condition is fixed. - `auth_device_flow_cancelled` `{deviceFlowId}` — DELETE succeeded against a pending entry > **Not MCP-compatible.** The MCP authorization spec (2025-06-18) mandates OAuth 2.1 + PKCE auth-code with a redirect callback, which doesn't work for headless-pod daemons. Mode B's device-flow surface is daemon-private — clients targeting MCP-compliant servers should use a different auth path. diff --git a/packages/cli/src/serve/auth/deviceFlow.test.ts b/packages/cli/src/serve/auth/deviceFlow.test.ts index acbd5f0fe..cefe46db4 100644 --- a/packages/cli/src/serve/auth/deviceFlow.test.ts +++ b/packages/cli/src/serve/auth/deviceFlow.test.ts @@ -107,6 +107,11 @@ class FakeProvider implements DeviceFlowProvider { startError: Error | undefined; expiresIn = 600; // 10 minutes interval: number | undefined = undefined; + /** Most recent `opts.signal` observed by `poll`. Test hook for the + * abort-mid-poll assertion: after `registry.cancel(...)`, this + * signal MUST report `.aborted === true` so the upstream HTTP + * socket can be torn down. */ + lastPollSignal: AbortSignal | undefined; async start(): Promise<{ deviceCode: ReturnType; @@ -135,6 +140,7 @@ class FakeProvider implements DeviceFlowProvider { opts: { signal: AbortSignal }, ): Promise { this.pollCount += 1; + this.lastPollSignal = opts.signal; if (opts.signal.aborted) return { kind: 'pending' }; if (this.pollScript.length === 0) { return { kind: 'pending' }; @@ -518,6 +524,51 @@ describe('DeviceFlowRegistry — polling state machine', () => { }); }); +describe('DeviceFlowRegistry — abort propagation to provider.poll', () => { + it('cancel() aborts the signal observed by the in-flight provider.poll', async () => { + const provider = new FakeProvider(); + const built = buildRegistry(provider); + const { registry, env } = built; + try { + const { view: started } = await registry.start({ + providerId: 'qwen-oauth', + }); + // Drive one polling tick so the provider records its signal. + env.clock.tick(DEVICE_FLOW_DEFAULT_INTERVAL_MS + 1); + env.scheduler.flushDue(env.clock.now); + // Two microtask flushes so the poll handler resolves and + // `lastPollSignal` is populated. + await Promise.resolve(); + await Promise.resolve(); + expect(provider.lastPollSignal).toBeDefined(); + expect(provider.lastPollSignal!.aborted).toBe(false); + + // Cancel the flow — registry should abort the entry's + // cancelController, which is the SAME signal the provider's + // `poll` saw. A real Qwen provider passes this to `fetch`, so + // an in-flight HTTP socket gets torn down immediately. + registry.cancel(started.deviceFlowId); + expect(provider.lastPollSignal!.aborted).toBe(true); + } finally { + registry.dispose(); + } + }); + + it('dispose() also aborts the signal observed by every active flow', async () => { + const provider = new FakeProvider(); + const built = buildRegistry(provider); + const { registry, env } = built; + await registry.start({ providerId: 'qwen-oauth' }); + env.clock.tick(DEVICE_FLOW_DEFAULT_INTERVAL_MS + 1); + env.scheduler.flushDue(env.clock.now); + await Promise.resolve(); + await Promise.resolve(); + expect(provider.lastPollSignal!.aborted).toBe(false); + registry.dispose(); + expect(provider.lastPollSignal!.aborted).toBe(true); + }); +}); + describe('DeviceFlowRegistry — cancel', () => { it('cancels a pending flow, emits cancelled, idempotent on terminal', async () => { const provider = new FakeProvider(); diff --git a/packages/cli/src/serve/auth/deviceFlow.ts b/packages/cli/src/serve/auth/deviceFlow.ts index ad343d98c..c1cf45a93 100644 --- a/packages/cli/src/serve/auth/deviceFlow.ts +++ b/packages/cli/src/serve/auth/deviceFlow.ts @@ -42,16 +42,42 @@ export type DeviceFlowStatus = | 'error' | 'cancelled'; +/** + * Terminal error classifications surfaced on `auth_device_flow_failed`. + * + * RFC 8628 §3.5 defines the upstream error codes for the polling + * endpoint; the daemon adds one daemon-internal kind (`persist_failed`) + * for the disk-write phase. Keep these mutually exclusive — a + * mis-classification (e.g. routing a network error into + * `invalid_grant`) drives operators toward the wrong remediation. + */ export type DeviceFlowErrorKind = + /** RFC 8628: device_code has aged out (`expires_in` elapsed + * upstream) before user authorization. Recovery: re-issue + * `client.auth.start`; daemon also surfaces this kind on its own + * time-based sweep when the entry's `expiresAt` passes. */ | 'expired_token' + /** RFC 8628: user explicitly rejected the authorization at the + * IdP page. Recovery: re-issue with consent, or surface the + * refusal back to the human. */ | 'access_denied' + /** RFC 8628: protocol-level violation — `device_code` / + * `client_id` / PKCE verifier didn't validate. Treat as a + * programmer error in the daemon's flow construction (the user + * can't fix this themselves). */ | 'invalid_grant' + /** Catch-all for IdP-side failures that don't map to an RFC 8628 + * code: network errors, malformed JSON, 5xx responses, unknown + * error codes. Distinguished from `persist_failed` by the LOCATION + * of the failure (upstream HTTP vs daemon-local disk). */ | 'upstream_error' - // Disk-write / `provider.persist()` failure path. Distinguished from - // `upstream_error` because it's NOT an IdP fault — the token exchange - // succeeded; the daemon couldn't durably store the credentials - // (EACCES, EROFS, ENOSPC, etc.). Recovery: fix permissions and retry - // `client.auth.start`. + /** Daemon-local: the IdP exchange succeeded, but the daemon could + * not durably store the credentials (EACCES, EROFS, ENOSPC, etc.). + * Distinct from `upstream_error` so operators can route remediation + * to disk / permissions rather than chasing an IdP outage. The + * `device_code` was consumed upstream, so the user must + * `client.auth.start` again after fixing the underlying disk + * condition. */ | 'persist_failed'; /** @@ -119,8 +145,15 @@ export function brandSecret(value: T): BrandedSecret { export function revealSecret(secret: BrandedSecret): T { const value = SECRETS.get(secret); if (value === undefined) { + // The earlier message claimed "secret has been GC-evicted", but a + // `WeakMap` only evicts entries when the KEY object becomes + // unreachable — and if that happened, the caller couldn't hold a + // reference to pass in here. So the only path to `undefined` is + // an argument that was never registered (e.g. forged structural + // shape, mistakenly serialized + reparsed object that retained + // the public surface but lost the WeakMap binding). throw new Error( - 'revealSecret: argument is not a BrandedSecret (or secret has been GC-evicted)', + 'revealSecret: argument is not a BrandedSecret (was never registered, or its WeakMap binding was lost via serialization)', ); } return value as T; @@ -737,9 +770,15 @@ export class DeviceFlowRegistry { * * On a successful transition: * 1. clears any pending poll timer - * 2. wipes the secret material so a stale timer firing late cannot - * leak the device_code (defense in depth — the timer is also - * cleared, but registry state may outlive the timer reference) + * 2. wipes the secret material from `entry.deviceCode` / + * `entry.pkceVerifier`. The PRIMARY guard against secret leaks + * is the `entry.status !== 'pending'` check at the top of + * `runPollTick` — a stale timer that managed to fire post-clear + * bails out before touching the entry. Secret-clearing here is + * DEFENSE IN DEPTH: even if a future refactor weakens the + * status guard, the registry's in-memory state can no longer + * hand out the upstream `device_code` to a late-arriving + * logger / serializer. * 3. records `terminalAt` for the sweeper to evict after grace * 4. removes the per-provider singleton index so a new POST creates * a fresh flow instead of taking over the terminal one diff --git a/packages/cli/src/serve/auth/qwenDeviceFlowProvider.ts b/packages/cli/src/serve/auth/qwenDeviceFlowProvider.ts index 3f5887439..621c6a8d1 100644 --- a/packages/cli/src/serve/auth/qwenDeviceFlowProvider.ts +++ b/packages/cli/src/serve/auth/qwenDeviceFlowProvider.ts @@ -111,16 +111,19 @@ export class QwenOAuthDeviceFlowProvider implements DeviceFlowProvider { } let response: Awaited>; try { - // The class's `pollDeviceToken` doesn't accept a signal yet — see - // `qwenOAuth2.ts:333`. We honor the signal at the boundary - // (abort check before the call, abort check after) so that - // dispose / cancel during a slow IdP request still results in - // the registry suppressing the resolved frame. Threading signal - // INTO `pollDeviceToken`'s `fetch` is a Wave 5 follow-up. - response = await this.client.pollDeviceToken({ - device_code: revealSecret(state.deviceCode), - code_verifier: revealSecret(state.pkceVerifier), - }); + // Pass `signal` through to the IdP fetch so cancel / dispose + // during a slow upstream response aborts the in-flight socket + // immediately instead of waiting for the IdP's own timeout. + // The post-await abort check is still useful: an early cancel + // can land before fetch even starts, in which case the abort + // throws synchronously into our catch block below. + response = await this.client.pollDeviceToken( + { + device_code: revealSecret(state.deviceCode), + code_verifier: revealSecret(state.pkceVerifier), + }, + { signal: opts.signal }, + ); } catch (err: unknown) { // The class throws on non-OAuth error responses (network, malformed // upstream payloads) and on RFC 8628 terminal errors that aren't diff --git a/packages/core/src/qwen/qwenOAuth2.ts b/packages/core/src/qwen/qwenOAuth2.ts index bf8fe4bcd..c7589b083 100644 --- a/packages/core/src/qwen/qwenOAuth2.ts +++ b/packages/core/src/qwen/qwenOAuth2.ts @@ -242,10 +242,13 @@ export interface IQwenOAuth2Client { code_challenge: string; code_challenge_method: string; }): Promise; - pollDeviceToken(options: { - device_code: string; - code_verifier: string; - }): Promise; + pollDeviceToken( + options: { + device_code: string; + code_verifier: string; + }, + fetchOpts?: { signal?: AbortSignal }, + ): Promise; refreshAccessToken(): Promise; } @@ -330,10 +333,13 @@ export class QwenOAuth2Client implements IQwenOAuth2Client { return result; } - async pollDeviceToken(options: { - device_code: string; - code_verifier: string; - }): Promise { + async pollDeviceToken( + options: { + device_code: string; + code_verifier: string; + }, + fetchOpts?: { signal?: AbortSignal }, + ): Promise { const bodyData = { grant_type: QWEN_OAUTH_GRANT_TYPE, client_id: QWEN_OAUTH_CLIENT_ID, @@ -348,6 +354,11 @@ export class QwenOAuth2Client implements IQwenOAuth2Client { Accept: 'application/json', }, body: objectToUrlEncoded(bodyData), + // PR #4255 — daemon device-flow registry passes its per-entry + // `cancelController.signal` so cancel() / dispose() during a + // slow IdP response actually aborts the in-flight socket + // instead of waiting for the upstream timeout. + ...(fetchOpts?.signal ? { signal: fetchOpts.signal } : {}), }); if (!response.ok) { @@ -989,18 +1000,50 @@ export async function cacheQwenCredentials(credentials: QwenCredentials) { await fs.writeFile(filePath, credString, { mode: QWEN_CREDENTIAL_FILE_MODE, }); + // `fs.writeFile`'s `mode` only applies when the file is CREATED; an + // existing `oauth_creds.json` written under a broader umask (or by + // earlier code that didn't pass `mode`) keeps its old permissions. + // Explicit `chmod` after every write tightens it on existing files + // so upgrades actually fix the insecure-permissions case. + try { + await fs.chmod(filePath, QWEN_CREDENTIAL_FILE_MODE); + } catch (chmodErr) { + // chmod is a no-op on some Windows file systems and `EPERM`s + // otherwise. Surface as a warning rather than silently dropping + // the invariant — operators on hardened/exotic FSes need a + // breadcrumb if `oauth_creds.json` retains a wider mode. + debugLogger.warn( + `cacheQwenCredentials: chmod 0o${QWEN_CREDENTIAL_FILE_MODE.toString( + 8, + )} on ${filePath} failed: ${ + chmodErr instanceof Error ? chmodErr.message : String(chmodErr) + }`, + ); + } // SharedTokenManager throttles file checks and serves an in-memory cache; // without an explicit invalidation a follow-up `getValidCredentials` in // the same process can stay on the previous (often empty) cache and // re-trigger device auth despite the just-written file. The original - // device-flow site at L820+L829 paired write+clear; folding the clear - // here keeps every caller (PR 21 daemon device-flow registry included) + // device-flow site (L820+L829) paired write+clear; folding the clear + // here keeps every caller (#4255 daemon device-flow registry included) // correct without re-pairing the call. try { SharedTokenManager.getInstance().clearCache(); - } catch { - // Best-effort: unit tests sometimes stub SharedTokenManager with a - // minimal shape; cache invalidation is non-critical to the write. + } catch (clearErr) { + // In production, a failed cache clear means subsequent + // `getValidCredentials` reads in the same process may serve + // stale (pre-write) credentials until the SharedTokenManager + // mtime watcher catches up. That's a recoverable degradation + // (worst case: device auth re-prompts), but the silent swallow + // it used to be made the symptom invisible. Warn so logs show + // it. Unit tests stubbing `SharedTokenManager.getInstance()` + // with a minimal shape will also flow through here — acceptable + // noise for the production-visibility win. + debugLogger.warn( + `cacheQwenCredentials: SharedTokenManager.clearCache failed; in-process callers may serve stale credentials until the next mtime poll: ${ + clearErr instanceof Error ? clearErr.message : String(clearErr) + }`, + ); } } catch (error: unknown) { // Handle file system errors (e.g., EACCES permission denied) diff --git a/packages/sdk-typescript/src/daemon/DaemonAuthFlow.ts b/packages/sdk-typescript/src/daemon/DaemonAuthFlow.ts index b843e6c6c..5801bd225 100644 --- a/packages/sdk-typescript/src/daemon/DaemonAuthFlow.ts +++ b/packages/sdk-typescript/src/daemon/DaemonAuthFlow.ts @@ -7,6 +7,18 @@ import { DaemonHttpError, type DaemonClient } from './DaemonClient.js'; import type { DaemonAuthProviderId, DaemonDeviceFlowState } from './types.js'; +/** + * Grace period added past the daemon-stated `expiresAt` before + * `awaitCompletion` gives up. Covers (a) clock skew between SDK and + * daemon, (b) the daemon's own ~30s sweeper interval (so we don't + * bail one tick before the daemon would surface a synthetic `expired` + * terminal), and (c) per-poll network latency. Matches the registry's + * `DEVICE_FLOW_SWEEP_INTERVAL_MS` so an `awaitCompletion` caller + * observes the daemon's authoritative final state rather than timing + * out client-side ahead of the sweeper. + */ +export const DEVICE_FLOW_EXPIRY_GRACE_MS = 30_000; + /** * High-level convenience wrapper around the four `client.*DeviceFlow*` HTTP * helpers. SDK users should normally write: @@ -55,8 +67,13 @@ export interface AwaitCompletionOptions { * daemon-supplied `intervalMs` from `start(...)` and respects bumps * from `slow_down`. */ pollOverrideMs?: number; - /** Hard ceiling on `awaitCompletion`'s wall-clock duration. Defaults to - * the daemon's `expiresAt - Date.now()`. */ + /** Hard ceiling on `awaitCompletion`'s wall-clock duration, in ms. + * When omitted, `awaitCompletion` runs until the daemon-stated + * `expiresAt` plus `DEVICE_FLOW_EXPIRY_GRACE_MS` (default 30s), + * which lets the daemon's own sweeper surface the authoritative + * terminal state instead of timing out client-side. Set explicitly + * to clamp the wait shorter; values past `expiresAt` will still see + * the daemon return `expired` once its sweeper fires. */ timeoutMs?: number; } @@ -119,12 +136,13 @@ async function awaitCompletion( clientId: string | undefined, opts: AwaitCompletionOptions, ): Promise { - // The SSE stream is workspace-scoped today and only flows through - // session subscriptions; without a session id we have no stream to - // attach to. PR 21 §3 ships device-flow events on the session bus - // fan-out for now (PR 16's `bridge.publishWorkspaceEvent` lands the - // shared workspace topic). Until then, GET polling is the universal - // path. + // Workspace-scoped events fan out through whatever session buses + // happen to be live, but `awaitCompletion` is workspace-level (no + // session id) — so attaching to a single SSE stream isn't a stable + // contract here. GET polling against the daemon's authoritative + // device-flow state is the universal path; `auth_device_flow_*` + // events remain a real-time hint for clients that ARE already + // subscribed to a session stream. return await pollUntilTerminal(client, start, clientId, opts); } @@ -141,7 +159,7 @@ async function pollUntilTerminal( const signal = opts.signal; const ceiling = opts.timeoutMs ? Date.now() + opts.timeoutMs - : start.expiresAt + 30_000; + : start.expiresAt + DEVICE_FLOW_EXPIRY_GRACE_MS; let interval = Math.max( 1_000, opts.pollOverrideMs ?? start.intervalMs ?? 5_000, diff --git a/packages/sdk-typescript/src/daemon/events.ts b/packages/sdk-typescript/src/daemon/events.ts index 18961cbdc..abeb674eb 100644 --- a/packages/sdk-typescript/src/daemon/events.ts +++ b/packages/sdk-typescript/src/daemon/events.ts @@ -617,9 +617,11 @@ export interface DaemonDeviceFlowReducerState { hint?: string; /** Most recent `intervalMs` reported by `auth_device_flow_throttled`. */ intervalMs?: number; - /** Daemon-clock epoch ms, set on `started` and refreshed when the same - * flow's later events arrive. */ - lastSeenAt: number; + /** Most recent SSE event id observed for this flow (NOT a wall-clock + * timestamp). Used as a monotonic counter so out-of-order delivery + * doesn't let a stale frame overwrite a newer one. `0` if the + * underlying envelope omitted `id` (synthetic / terminal frames). */ + lastSeenEventId: number; /** Set on `authorized` to the credential's expiry, when known. */ authorizedExpiresAt?: number; /** Best-effort non-PII account label echoed from `authorized`. */ @@ -668,7 +670,8 @@ export function reduceDaemonAuthEvent( [providerId]: { deviceFlowId: event.data.deviceFlowId, status: 'pending', - lastSeenAt: rawEvent.id ?? state.flows[providerId]?.lastSeenAt ?? 0, + lastSeenEventId: + rawEvent.id ?? state.flows[providerId]?.lastSeenEventId ?? 0, }, }, }; @@ -680,7 +683,7 @@ export function reduceDaemonAuthEvent( (flow) => ({ ...flow, intervalMs: event.data.intervalMs, - lastSeenAt: rawEvent.id ?? flow.lastSeenAt, + lastSeenEventId: rawEvent.id ?? flow.lastSeenEventId, }), ); return updated ?? state; @@ -697,7 +700,7 @@ export function reduceDaemonAuthEvent( authorizedExpiresAt: event.data.expiresAt, accountAlias: event.data.accountAlias, errorKind: undefined, - lastSeenAt: rawEvent.id ?? existing.lastSeenAt, + lastSeenEventId: rawEvent.id ?? existing.lastSeenEventId, }; return { flows: { ...state.flows, [providerId]: next } }; } @@ -718,7 +721,7 @@ export function reduceDaemonAuthEvent( status: 'error', errorKind: event.data.errorKind, hint: event.data.hint, - lastSeenAt: rawEvent.id ?? flow.lastSeenAt, + lastSeenEventId: rawEvent.id ?? flow.lastSeenEventId, }), ); return updated ?? state; @@ -730,7 +733,7 @@ export function reduceDaemonAuthEvent( (flow) => ({ ...flow, status: 'cancelled', - lastSeenAt: rawEvent.id ?? flow.lastSeenAt, + lastSeenEventId: rawEvent.id ?? flow.lastSeenEventId, }), ); return updated ?? state;