fixup(serve): address PR 14 review round 8 (#4247 wenshao seventh pass)

Addresses @wenshao's seventh review pass on PR #4247 (gpt-5.5 + DeepSeek/deepseek-v4-pro via Qwen Code /review). One critical transport leak + three soundness/consistency fixes; one optional clarity refactor explicitly deferred.

Critical:
- R8 #1 line 532 (4 duplicate threads): bulk-path transport leak. Mirrors the R7 #3 fix but in `discoverAllMcpTools` instead of the per-server path. Pre-fix: when `connect()` succeeded (transport established) and `discover()` later threw, the bulk catch deleted the client reference without calling `client.disconnect()`, leaking the stdio child / WebSocket / HTTP socket for the rest of the daemon's lifetime (`stop()` can't see what we just removed from `this.clients`). Best-effort `await client.disconnect()` added before `clients.delete` + `reservedSlots.delete`. Updated the doc comment that misleadingly claimed `stop()` was the lifecycle owner — true only for slot bookkeeping, not transports.

Soundness:
- R8 #2 line 431: tighten `readBudgetFromEnv` mode-without-budget downgrade. Originally only `enforce` got downgraded to `off` when no budget was set; `warn` mode without a budget threshold reached `emitBudgetTelemetry` with `clientBudget: undefined`, contradicting the JSDoc invariant `mode !== 'off' ⇒ clientBudget defined`. Now both `enforce` AND `warn` downgrade to `off` when no budget is configured. The invariant comment was also weakened to match the actual `?? 0` defense-in-depth (the new R8 #5 constructor downgrade closes the remaining edge case).

- R8 #5 line 302: constructor mirrors the `readBudgetFromEnv` downgrade for the direct `budgetConfig` parameter. All production callers (CLI, `runQwenServe`, env-var fallback) validate upfront, but a future code path that injects `budgetConfig` directly without re-validating would re-introduce the silent fail-open. Defense in depth.

- R8 #4 line 1221: distinguish fresh vs `'already_held'` reservations in `runWithDiscoveryTimeout`'s timeout handler. New private `freshReservations: Set<string>` field marked when `weReservedSlot === true` inside `discoverMcpToolsForServerInternal` and cleared in finally / catch / success. Timeout handler now releases the slot ONLY when `freshReservations.has(serverName)` — meaning the slot was freshly reserved by THIS in-flight call. `'already_held'` reconnect timeouts (a previously-healthy server's transient hiccup) keep the slot so health-monitor retry doesn't have to compete for capacity with new servers admitted during the timeout window. Aligns the timeout handler with the connect-failure catch's `weReservedSlot` semantics — closes the asymmetry wenshao R8 #4 caught.

Deferred:
- R8 #3 line 332 (`tryReserveSlot` `'observed'` return value clarity): optional, non-blocking style improvement that ripples through 3 call sites + many tests for zero behavior change. Worth doing in a focused refactor PR; flagged as deferred polish, not in this fixup.

+ 3 new core regression tests (bulk discover-throw disconnects, warn-no-budget downgrade, constructor enforce downgrade). 679/679 focused tests pass; typecheck + lint clean.
This commit is contained in:
doudouOUC 2026-05-18 10:34:20 +08:00
parent f80629ec88
commit 7228813c58
2 changed files with 196 additions and 37 deletions

View file

@ -1609,6 +1609,18 @@ describe('McpClientManager — PR 14 guardrails', () => {
).toBe(false);
});
// R8 #4 (line 1221): the `freshReservations` Set distinguishes
// fresh-reservation timeouts (release) from `'already_held'`
// reconnect timeouts (keep slot). Verified by code inspection +
// the R5 release-on-fresh test below; a dedicated already_held
// timeout test requires either driving the health-monitor flow
// end-to-end (which needs autoReconnect timer interleaving with
// fake timers — interferes with the sibling R5 test in the same
// file) or piercing the private `runWithDiscoveryTimeout`
// helper. The invariant is small enough that the fresh-release
// test below is sufficient regression coverage; an integration
// test in a separate file can add the already_held variant
// without the timer interleave problem.
it('runWithDiscoveryTimeout timeout handler releases the budget slot (wenshao R5 line 956)', async () => {
vi.useFakeTimers();
// McpClient.connect never resolves → timeout fires.
@ -1849,6 +1861,70 @@ describe('McpClientManager — PR 14 guardrails', () => {
);
});
it('discoverAllMcpTools disconnects on discover() failure (wenshao R8 #1 line 532)', async () => {
// Bulk-path mirror of R7 #3 (per-server path). Pre-fix:
// connect() success + discover() throw → catch deleted client
// without disconnect() → stdio child / WebSocket / HTTP socket
// leaked for the rest of the daemon's lifetime (stop() can't
// see the entry it just removed from this.clients).
let disconnectCalls = 0;
vi.mocked(McpClient).mockImplementation(
() =>
({
connect: vi.fn().mockResolvedValue(undefined),
discover: vi.fn().mockRejectedValue(new Error('discover failed')),
disconnect: vi.fn().mockImplementation(() => {
disconnectCalls += 1;
return Promise.resolve();
}),
getStatus: vi.fn(),
}) as unknown as McpClient,
);
const config = configWithServers({ a: { command: 'node' } });
const manager = new McpClientManager(
config,
{} as ToolRegistry,
undefined,
undefined,
undefined,
{ clientBudget: 1, budgetMode: 'enforce' },
);
await manager.discoverAllMcpTools(config);
// Transport closed before client reference dropped + slot released.
expect(disconnectCalls).toBeGreaterThanOrEqual(1);
expect(manager.getMcpClientAccounting().reservedSlots).toEqual([]);
});
it('readBudgetFromEnv downgrades warn-without-budget to off (wenshao R8 #2)', async () => {
process.env['QWEN_SERVE_MCP_BUDGET_MODE'] = 'warn';
// No budget — pre-fix this passed through with mode='warn',
// reaching emitBudgetTelemetry with clientBudget=undefined.
const config = configWithServers({});
const manager = new McpClientManager(config, {} as ToolRegistry);
expect(manager.getMcpClientBudget()).toBeUndefined();
expect(manager.getMcpBudgetMode()).toBe('off');
});
it('constructor downgrades enforce-without-budget when budgetConfig passed directly (wenshao R8 #5)', async () => {
// Direct-budgetConfig path is test-/embedded-only — production
// callers (CLI, runQwenServe, env-var fallback) all validate
// upfront. Defense-in-depth: constructor mirrors the env-var
// path's downgrade so a future caller that bypasses validation
// can't silently fail-open.
const config = configWithServers({});
const manager = new McpClientManager(
config,
{} as ToolRegistry,
undefined,
undefined,
undefined,
// Invalid combination: enforce mode without a budget.
{ budgetMode: 'enforce' },
);
// Downgraded to off so tryReserveSlot doesn't masquerade as enforce.
expect(manager.getMcpBudgetMode()).toBe('off');
});
it('discoverMcpToolsForServer reconnect-attempt connect-failure KEEPS slot (wenshao R4 C2 already_held)', async () => {
// Distinguish from the previous test: same call signature, but
// here the slot is already-held (from a prior successful connect

View file

@ -229,15 +229,19 @@ function readBudgetFromEnv(): McpBudgetConfig {
}
budgetMode = clientBudget === undefined ? 'off' : 'warn';
}
// PR 14 fix (review #4247 wenshao S4): enforce-without-budget would
// silently fail-open — `tryReserveSlot` returns 'reserved' when
// `clientBudget === undefined`, so a daemon spawned with the env-var
// pair `MODE=enforce` + `BUDGET=<unset>` would advertise enforcement
// but allow unlimited servers. The CLI handler + `runQwenServe`
// both reject this combination; the env-var fallback path
// (e.g. child manually launched with stale env) now mirrors that
// invariant: downgrade to `off` rather than masquerade as enforce.
if (budgetMode === 'enforce' && clientBudget === undefined) {
// PR 14 fix (review #4247 wenshao S4 + R8 #2): mode-without-budget
// downgrade. Originally only `enforce` got downgraded — but `warn`
// mode without a budget threshold is equally meaningless: nothing
// actionable can ever fire (no `liveCount >= 0.75 * budget`
// comparison can be true when budget is undefined). Downgrading
// BOTH to `off` removes the comment-vs-code mismatch in
// `emitBudgetTelemetry` (which previously claimed
// `mode !== 'off' ⇒ clientBudget defined` — true for enforce,
// false for warn until this fix).
if (
(budgetMode === 'enforce' || budgetMode === 'warn') &&
clientBudget === undefined
) {
budgetMode = 'off';
}
return { clientBudget, budgetMode };
@ -272,6 +276,22 @@ export class McpClientManager {
private readonly reservedSlots = new Set<string>();
private readonly clientBudget?: number;
private readonly budgetMode: McpBudgetMode;
/**
* PR 14 fix (review #4247 wenshao R8 #4 line 1221): names whose
* slot was freshly reserved (not `'already_held'`) by an
* in-flight `discoverMcpToolsForServerInternal` call. Read by
* `runWithDiscoveryTimeout`'s timeout handler to decide whether
* to release the slot on hard timeout fresh reservations
* release (server never connected, slot shouldn't permanently
* block other servers); `'already_held'` reconnects keep their
* slot (operator's previously-healthy server shouldn't be
* permanently demoted by a transient timeout).
*
* Lifetime: `add` after `tryReserveSlot` returns `'reserved'`
* with the `.has` guard, `delete` in success / catch / finally
* cleanup. Idempotent multiple deletes are no-ops.
*/
private readonly freshReservations = new Set<string>();
/**
* Servers refused during the most recent `discoverAllMcpTools*` pass.
* Reset at the start of each pass; survives between passes so a
@ -300,8 +320,22 @@ export class McpClientManager {
// when spawning the ACP child. Standalone `qwen` invocations
// leave both unset and get `mode: 'off'` — the pre-PR-14 default.
const resolved = budgetConfig ?? readBudgetFromEnv();
let resolvedMode = resolved.budgetMode;
// PR 14 fix (review #4247 wenshao R8 #5): mirror
// `readBudgetFromEnv`'s `(enforce|warn)`-without-budget
// downgrade for the direct-`budgetConfig` path too. All
// production callers (CLI handler, `runQwenServe`, env-var
// fallback) validate upfront, but a future code path that
// injects `budgetConfig` without running the validation
// would re-introduce the silent fail-open. Defense in depth.
if (
(resolvedMode === 'enforce' || resolvedMode === 'warn') &&
resolved.clientBudget === undefined
) {
resolvedMode = 'off';
}
this.clientBudget = resolved.clientBudget;
this.budgetMode = resolved.budgetMode;
this.budgetMode = resolvedMode;
}
/**
@ -428,10 +462,12 @@ export class McpClientManager {
* `off` mode is a no-op operators who never set a budget don't
* pollute the startup-event sink.
*
* Invariant: `mode !== 'off'` `clientBudget` was resolved (both
* `readBudgetFromEnv` and the CLI handler downgrade enforce-without-
* budget to `off`). `clientBudget ?? 0` is defense-in-depth for
* future code paths that might set `mode='warn'` programmatically.
* Invariant (post R8 #2): `mode !== 'off'` `clientBudget` was
* resolved. Both `readBudgetFromEnv` AND the constructor downgrade
* `enforce`/`warn`-without-budget to `off` so neither call site can
* leave a budgetless mode reaching this telemetry path.
* `clientBudget ?? 0` is kept as belt-and-suspenders against future
* call sites that might bypass both validations.
*/
private emitBudgetTelemetry(configuredCount: number): void {
if (this.budgetMode === 'off') return;
@ -516,18 +552,38 @@ export class McpClientManager {
// full discovery restart. Release both so the budget cap
// reflects actual usable capacity.
//
// Note (wenshao R2 P3 follow-up): release-here is correct
// for the LEGACY BULK PATH because `discoverAllMcpTools`
// begins with `await this.stop()` (line ~320) which clears
// all slots — `stop()` owns the lifecycle, this catch is
// defense-in-depth for any caller that bypasses stop. The
// LONG-LIVED per-server reconnect path in
// `discoverMcpToolsForServerInternal` intentionally does
// NOT release on connect failure: the client stays in
// `this.clients` with `DISCONNECTED` status so the health
// monitor + next incremental pass can retry without
// needing to re-reserve. Different lifecycle, different
// contract.
// Slot bookkeeping in this bulk path is partially redundant
// with `await this.stop()` at the top of
// `discoverAllMcpTools` (line ~320) — the next bulk run
// wipes `reservedSlots` regardless. But the SAME catch
// ALSO needs to handle the transport (see below): the
// client object held by `clients.delete(name)` only had
// its tracking reference removed, not its underlying
// transport closed. Leaving the orphan transport alive
// would leak the stdio child / WebSocket / HTTP socket
// for the rest of the process — `stop()` can't clean it
// because we just removed it from the map.
//
// The per-server reconnect path
// (`discoverMcpToolsForServerInternal`) keeps the slot
// when `weReservedSlot === false` so health-monitor retry
// doesn't have to compete for capacity — different
// lifecycle, different contract. Bulk path always releases
// because every server is "fresh" here (preceded by
// stop()).
//
// PR 14 fix (review #4247 wenshao R8 #1 line 532): also
// call `await client.disconnect()` BEFORE dropping the
// reference. R7 #3 fixed the analogous leak in the
// per-server path; this is the bulk-path mirror. Errors
// intentionally swallowed (we're already in a discovery-
// failure catch; double-throwing would lose the original
// error context).
try {
await client.disconnect();
} catch {
// best-effort transport cleanup
}
this.reservedSlots.delete(name);
this.clients.delete(name);
this.eventEmitter?.emit('mcp-client-update', this.clients);
@ -640,6 +696,18 @@ export class McpClientManager {
// rediscovery").
const weReservedSlot =
reservation === 'reserved' && this.reservedSlots.has(serverName);
// PR 14 fix (review #4247 wenshao R8 #4): mark this name in
// `freshReservations` so the `runWithDiscoveryTimeout` timeout
// handler can distinguish fresh-reservation timeouts (release
// the slot — never connected, shouldn't block others) from
// `'already_held'` reconnect timeouts (keep the slot — operator's
// previously-healthy server shouldn't be demoted by a transient
// timeout). Cleared in success / catch / finally below so the
// marker only spans the current discoverMcpToolsForServerInternal
// invocation.
if (weReservedSlot) {
this.freshReservations.add(serverName);
}
this.stopHealthCheck(serverName);
@ -741,6 +809,11 @@ export class McpClientManager {
} finally {
this.startHealthCheck(serverName);
this.eventEmitter?.emit('mcp-client-update', this.clients);
// R8 #4: clear the fresh-reservation marker — this in-flight
// call has settled (success, catch, OR a timeout that already
// ran its handler). Idempotent on the timeout-already-deleted
// case.
this.freshReservations.delete(serverName);
}
}
@ -774,6 +847,7 @@ export class McpClientManager {
// its own body line 90, which awaits `this.stop()` first) starts
// from an empty reservation set.
this.reservedSlots.clear();
this.freshReservations.clear();
this.lastRefusedServerNames = [];
}
@ -1208,17 +1282,26 @@ export class McpClientManager {
// absent, so the trailing `finally`-block call becomes a no-op.
this.stopHealthCheck(serverName);
this.clients.delete(serverName);
// PR 14 fix (review #4247 wenshao R5 line 956): the timeout
// handler must release the budget slot too. Pre-fix only
// `clients.delete` ran here, so a timed-out server in
// `enforce` mode permanently held its reservation — N
// consecutive timeouts permanently degraded daemon capacity.
// The slot is released because the operator's "this server
// should run" intent is invalidated by a hard timeout
// (`runWithDiscoveryTimeout` rejection IS the operator-
// visible "server unreachable" signal); a later manual
// `/mcp reconnect` will re-reserve.
this.reservedSlots.delete(serverName);
// PR 14 fix (review #4247 wenshao R5 line 956 + R8 #4 line
// 1221): release the budget slot ONLY if THIS in-flight
// discoverMcpToolsForServerInternal call freshly reserved
// it. `freshReservations.has(serverName)` distinguishes:
//
// - Fresh reservation (never connected): release — a server
// that never connected shouldn't permanently consume a
// slot under enforce mode.
// - `'already_held'` reconnect (server was previously
// healthy, now flaky): KEEP the slot. Health-monitor
// retry doesn't have to compete for capacity with new
// servers admitted during the timeout window.
//
// R5 originally treated all timeouts as "release"; wenshao
// R8 #4 caught the asymmetry with the connect-failure
// path's `weReservedSlot` guard. Now they match.
if (this.freshReservations.has(serverName)) {
this.reservedSlots.delete(serverName);
this.freshReservations.delete(serverName);
}
// And drop any stale refusal entry — operator intent shifts
// when a slot becomes free again, and snapshot consumers
// shouldn't keep tagging a now-slotless server as