fixup(serve): address PR #4255 wenshao round-5 review feedback

Five small adopt items from the round-5 review pass; one stale thread
already addressed in b5b77ee90 (fold-in 5).

#2 — `as const` + derived type for DEVICE_FLOW_SUPPORTED_PROVIDERS so
adding/removing a provider id requires touching exactly ONE site.
Mirrors `SERVE_ERROR_KINDS` / `ServeErrorKind` in `status.ts`.

#3 — Clarify `DEVICE_FLOW_EXPIRY_GRACE_MS` JSDoc to distinguish the
daemon's 30s SWEEP cadence (what the grace tracks) from the 5-min
TERMINAL_GRACE_MS reconnect window (which awaitCompletion does NOT
need to wait through).

#4 — Add `@remarks` block on `DeviceFlowProvider.poll()` warning
future provider authors that thrown `err.message` flows verbatim
into the SSE-broadcast `auth_device_flow_failed` hint, and must be
sanitized. Two equally-correct paths documented (typed `error`
result vs sanitized thrown message).

#5 — Truncate raw IdP detail in `qwenDeviceFlowProvider.ts` stderr
audit lines to 2 KiB. WAFs / reverse proxies can return MB-sized
HTML error pages, and container log aggregators (Loki, Fluent Bit,
Stackdriver) typically truncate or drop lines past 4-32 KiB —
losing the useful prefix downstream. 2 KiB retains structured JSON
envelopes while staying well below every aggregator's per-line cap.

#6 — Track latest `originatorClientId` on per-provider singleton
take-over via new `entry.lastOriginatorClientId` field +
`recordTakeover()` helper. When a second SDK client posts
`POST /workspace/auth/device-flow` for an already-pending provider
(or one being created in `inFlightStarts`) with a different
`initiatorClientId`, an audit breadcrumb records the take-over so
incident response can correlate "client A started, client B took
over at 12:34". Event-routing intentionally still uses the original
`initiatorClientId` (events are workspace-broadcast and changing
the originator field mid-flow would break SDK reducers that key on
it). Two new tests cover the differing-id audit + same-id no-op.

🤖 Generated with [Qwen Code](https://github.com/QwenLM/qwen-code)
This commit is contained in:
doudouOUC 2026-05-18 14:03:42 +08:00
parent b5b77ee90f
commit 95a2944cf0
4 changed files with 186 additions and 14 deletions

View file

@ -259,12 +259,14 @@ describe('DeviceFlowRegistry — start / public view', () => {
let provider: FakeProvider;
let registry: DeviceFlowRegistry;
let events: ReturnType<typeof buildRegistry>['events'];
let auditLines: ReturnType<typeof buildRegistry>['auditLines'];
beforeEach(() => {
provider = new FakeProvider();
const built = buildRegistry(provider);
registry = built.registry;
events = built.events;
auditLines = built.auditLines;
});
afterEach(() => {
@ -301,6 +303,46 @@ describe('DeviceFlowRegistry — start / public view', () => {
expect(provider.startCount).toBe(1);
});
it('take-over by a different clientId emits a take-over audit (fold-in 6 #6)', async () => {
await registry.start({
providerId: 'qwen-oauth',
initiatorClientId: 'sdk-client-A',
});
auditLines.length = 0;
await registry.start({
providerId: 'qwen-oauth',
initiatorClientId: 'sdk-client-B',
});
const takeoverAudit = auditLines.find(
(line) =>
line['status'] === 'started' &&
line['clientId'] === 'sdk-client-B' &&
typeof line['hint'] === 'string' &&
(line['hint'] as string).startsWith('take-over'),
);
expect(takeoverAudit).toBeDefined();
expect(takeoverAudit?.['hint']).toContain('sdk-client-A');
});
it('take-over by the SAME clientId does not emit a take-over audit', async () => {
await registry.start({
providerId: 'qwen-oauth',
initiatorClientId: 'sdk-client-A',
});
auditLines.length = 0;
await registry.start({
providerId: 'qwen-oauth',
initiatorClientId: 'sdk-client-A',
});
expect(
auditLines.some(
(line) =>
typeof line['hint'] === 'string' &&
(line['hint'] as string).startsWith('take-over'),
),
).toBe(false);
});
it('concurrent start() for the same providerId coalesces — provider.start fires once', async () => {
// Without the in-flight Promise map, both concurrent callers would
// pass the "no existing pending entry" check, both would call

View file

@ -51,9 +51,15 @@ export const DEVICE_FLOW_PERSIST_TIMEOUT_MS = 30_000;
*/
export const DEVICE_FLOW_START_TIMEOUT_MS = 30_000;
export type DeviceFlowProviderId = 'qwen-oauth';
export const DEVICE_FLOW_SUPPORTED_PROVIDERS: readonly DeviceFlowProviderId[] =
['qwen-oauth'];
// PR #4255 fold-in 6 review thread #2: derive the type from the
// supported-providers tuple so adding/removing a provider id
// requires touching exactly ONE site. The earlier shape (standalone
// union + `readonly DeviceFlowProviderId[]` annotation) let the
// type and the array drift apart silently. Mirrors the codebase's
// `SERVE_ERROR_KINDS` / `ServeErrorKind` pattern in `status.ts`.
export const DEVICE_FLOW_SUPPORTED_PROVIDERS = ['qwen-oauth'] as const;
export type DeviceFlowProviderId =
(typeof DEVICE_FLOW_SUPPORTED_PROVIDERS)[number];
export type DeviceFlowStatus =
| 'pending'
@ -228,6 +234,11 @@ export type DeviceFlowPollResult =
export interface DeviceFlowProvider {
readonly providerId: DeviceFlowProviderId;
/**
* Begin a device-authorization grant against the IdP. Same SSE-leak
* sanitization rule as `poll()` applies to thrown error messages
* see `poll()` `@remarks` below.
*/
start(opts: { signal: AbortSignal }): Promise<DeviceFlowStartResult>;
/**
* Poll the upstream IdP for the user's authorization decision. The
@ -236,6 +247,35 @@ export interface DeviceFlowProvider {
* quota after it's logically given up. Providers that pass `signal`
* to their `fetch` get cleanest tear-down; those that ignore it
* still see the post-`await` guard suppress the resolved frame.
*
* @remarks
* **Provider-author contract sanitize before throwing.** The
* registry's `runPollTick` catch block forwards `err.message`
* verbatim into the `auth_device_flow_failed` event's `hint`
* field, which is workspace-broadcast over SSE to every subscriber
* (and durably stored in the registry's terminal entry). A naive
* provider that re-throws a `fetch` failure or upstream payload
* untouched will leak: (a) full IdP response bodies (HTML error
* pages from a reverse proxy / WAF can run into hundreds of
* kilobytes), (b) infrastructure detail (internal hostnames, proxy
* banners), (c) ANY embedded secret material the upstream
* accidentally echoed.
*
* Two equally-correct paths for new providers:
* 1. **Resolve to a typed `error` result** return
* `{ kind: 'error', errorKind, hint }` with a *bounded
* static-or-pattern hint*. This is the preferred path; it
* keeps full structured-error fidelity and drops nothing.
* 2. **Throw, but only with a sanitized `Error.message`** if
* the implementation finds it more natural to throw,
* construct the thrown `Error` with a *short bounded sentence
* that contains no IdP body / banner / secret*. Send the raw
* detail through `writeStderrLine` for operator audit; the
* thrown `message` is the SSE-visible surface.
*
* `qwenDeviceFlowProvider` is the canonical example see PR #4255
* review S2 + fold-in 3 #9 + fold-in 5 #4 for the historical
* regressions this contract prevents.
*/
poll(
state: {
@ -333,6 +373,20 @@ interface DeviceFlowEntry {
errorKind?: DeviceFlowErrorKind;
hint?: string;
initiatorClientId?: string;
/**
* Most-recent client id observed on a take-over POST (per-provider
* singleton). Initially `undefined`; populated only when a second
* caller's `initiatorClientId` differs from `entry.initiatorClientId`.
* Surfaced through the audit trail so incident response can see
* "client A started this flow, client B took it over at 12:34"
* useful when two SDK processes race on the same Qwen account
* across hosts. Event-routing still uses the original
* `initiatorClientId` (events are workspace-broadcast; the
* originator field is metadata, and changing it mid-flow would
* break SDK reducers that key on it). PR #4255 fold-in 6 review
* thread #6.
*/
lastOriginatorClientId?: string;
lastPolledAt?: number;
createdAt: number;
terminalAt?: number;
@ -556,6 +610,7 @@ export class DeviceFlowRegistry {
// Fast-path: an existing pending entry → idempotent take-over.
const existing = this.byProvider.get(params.providerId);
if (existing && existing.status === 'pending') {
this.recordTakeover(existing, params.initiatorClientId);
return { view: toPublicView(existing), attached: true };
}
// Coalesce concurrent fresh starts for the same providerId.
@ -564,7 +619,13 @@ export class DeviceFlowRegistry {
const result = await inFlight;
// The first start created an entry; this caller is a take-over of
// the just-created flow (NOT a fresh IdP request). Recompute the
// shape so the second caller's `attached: true` is honest.
// shape so the second caller's `attached: true` is honest. PR
// #4255 fold-in 6 #6: also stamp the second caller's id on the
// entry's `lastOriginatorClientId` so audit shows the take-over.
const justCreated = this.byProvider.get(params.providerId);
if (justCreated) {
this.recordTakeover(justCreated, params.initiatorClientId);
}
return { view: result.view, attached: true };
}
if (this.countActive() >= DEVICE_FLOW_MAX_CONCURRENT) {
@ -1005,6 +1066,35 @@ export class DeviceFlowRegistry {
}
}
/**
* Record a take-over: a second SDK client posted
* `POST /workspace/auth/device-flow` for a provider that already has
* a pending entry (or one being created in `inFlightStarts`). When
* the second caller's `initiatorClientId` differs from the entry's,
* stamp it on `entry.lastOriginatorClientId` and emit an audit
* breadcrumb. No event publish the per-provider singleton's
* `started` event was already broadcast workspace-wide, and emitting
* a second `started` would confuse SDK reducers (the `attached:
* true` HTTP response is the second caller's signal). PR #4255
* fold-in 6 review thread #6.
*/
private recordTakeover(
entry: DeviceFlowEntry,
takeoverClientId: string | undefined,
): void {
if (!takeoverClientId) return;
if (takeoverClientId === entry.initiatorClientId) return;
if (takeoverClientId === entry.lastOriginatorClientId) return;
entry.lastOriginatorClientId = takeoverClientId;
this.deps.audit?.record({
deviceFlowId: entry.deviceFlowId,
providerId: entry.providerId,
clientId: takeoverClientId,
status: 'started',
hint: `take-over (per-provider singleton; original initiator=${entry.initiatorClientId ?? '(none)'})`,
});
}
/**
* Move a pending entry to terminal state. Returns **`true` exactly once**
* the call site that successfully drove the transition. Subsequent

View file

@ -31,6 +31,27 @@ import {
const QWEN_OAUTH_SCOPE = 'openid profile email model.completion';
/**
* Maximum length of raw IdP detail written to stderr for operator
* audit. PR #4255 fold-in 6 review thread #5: the raw `err.message`
* from `QwenOAuth2Client` embeds the full upstream response body,
* which on a misbehaving reverse proxy / WAF can be megabytes of
* HTML and container log-aggregation pipelines (Loki, Fluent Bit,
* Stackdriver) typically truncate or reject lines past 432 KiB,
* meaning the *useful* prefix is lost downstream. Truncate here so
* the kept prefix is the part with the actual IdP error code /
* description, with a `[+N more]` tail so the reader knows how much
* was dropped. 2 KiB is comfortably below every aggregator's per-line
* cap and large enough to retain a structured JSON error envelope.
*/
const STDERR_DETAIL_MAX = 2_048;
function truncateForStderr(detail: string): string {
if (detail.length <= STDERR_DETAIL_MAX) return detail;
const dropped = detail.length - STDERR_DETAIL_MAX;
return `${detail.slice(0, STDERR_DETAIL_MAX)}…[+${dropped} bytes truncated]`;
}
/**
* Qwen-OAuth implementation of `DeviceFlowProvider` for `qwen serve`.
*
@ -79,7 +100,9 @@ export class QwenOAuthDeviceFlowProvider implements DeviceFlowProvider {
// standard error path (qwenOAuth2.ts logs via `debugLogger`
// when needed).
const detail = err instanceof Error ? err.message : String(err);
writeStderrLine(`[serve] qwen device-flow start failed (raw): ${detail}`);
writeStderrLine(
`[serve] qwen device-flow start failed (raw): ${truncateForStderr(detail)}`,
);
throw new UpstreamDeviceFlowError(
'Qwen IdP device authorization request failed',
);
@ -94,9 +117,11 @@ export class QwenOAuthDeviceFlowProvider implements DeviceFlowProvider {
// SDK-visible 502 hint. Static message; raw envelope to stderr.
const errorData = auth as { error?: string; error_description?: string };
writeStderrLine(
`[serve] qwen device-flow start error envelope (raw): error=${
errorData?.error ?? 'unknown'
} description=${errorData?.error_description ?? '(none)'}`,
truncateForStderr(
`[serve] qwen device-flow start error envelope (raw): error=${
errorData?.error ?? 'unknown'
} description=${errorData?.error_description ?? '(none)'}`,
),
);
throw new UpstreamDeviceFlowError(
'Qwen IdP rejected the device authorization request',

View file

@ -10,12 +10,27 @@ import type { DaemonAuthProviderId, DaemonDeviceFlowState } from './types.js';
/**
* Grace period added past the daemon-stated `expiresAt` before
* `awaitCompletion` gives up. Covers (a) clock skew between SDK and
* daemon, (b) the daemon's own ~30s sweeper interval (so we don't
* bail one tick before the daemon would surface a synthetic `expired`
* terminal), and (c) per-poll network latency. Matches the registry's
* `DEVICE_FLOW_SWEEP_INTERVAL_MS` so an `awaitCompletion` caller
* observes the daemon's authoritative final state rather than timing
* out client-side ahead of the sweeper.
* daemon, (b) the daemon's own sweep interval (so we don't bail one
* tick before the daemon would surface a synthetic `expired`
* terminal), and (c) per-poll network latency.
*
* **Why 30 s, and which daemon constant it relates to.** The relevant
* daemon-side constant is `DEVICE_FLOW_SWEEP_INTERVAL_MS` (the
* interval at which the registry's sweeper RUNS currently 30 s),
* NOT `DEVICE_FLOW_TERMINAL_GRACE_MS` (the 5-minute window during
* which terminal entries remain GET-able before eviction). One sweep
* cycle past `expiresAt` is enough to flip the entry to a synthetic
* `expired`/`expired_token` terminal state; once that happens the
* SDK's GET poll will return it immediately. Waiting any longer
* client-side just delays the inevitable. PR #4255 fold-in 6 review
* thread #3.
*
* **Not** to be confused with `TERMINAL_GRACE_MS` terminal entries
* remain queryable for 5 minutes after they go terminal, but that's
* a reconnect-affordance for SDK clients that want to *re-read* a
* settled state, not a window `awaitCompletion` needs to wait
* through. Keep this aligned with `SWEEP_INTERVAL_MS`; if the daemon
* ever raises its sweep cadence, raise this in lockstep.
*/
export const DEVICE_FLOW_EXPIRY_GRACE_MS = 30_000;