mirror of
https://github.com/QwenLM/qwen-code.git
synced 2026-05-18 23:42:43 +00:00
fixup(serve): address PR 14 review round 8 (#4247 wenshao seventh pass)
Addresses @wenshao's seventh review pass on PR #4247 (gpt-5.5 + DeepSeek/deepseek-v4-pro via Qwen Code /review). One critical transport leak + three soundness/consistency fixes; one optional clarity refactor explicitly deferred. Critical: - R8 #1 line 532 (4 duplicate threads): bulk-path transport leak. Mirrors the R7 #3 fix but in `discoverAllMcpTools` instead of the per-server path. Pre-fix: when `connect()` succeeded (transport established) and `discover()` later threw, the bulk catch deleted the client reference without calling `client.disconnect()`, leaking the stdio child / WebSocket / HTTP socket for the rest of the daemon's lifetime (`stop()` can't see what we just removed from `this.clients`). Best-effort `await client.disconnect()` added before `clients.delete` + `reservedSlots.delete`. Updated the doc comment that misleadingly claimed `stop()` was the lifecycle owner — true only for slot bookkeeping, not transports. Soundness: - R8 #2 line 431: tighten `readBudgetFromEnv` mode-without-budget downgrade. Originally only `enforce` got downgraded to `off` when no budget was set; `warn` mode without a budget threshold reached `emitBudgetTelemetry` with `clientBudget: undefined`, contradicting the JSDoc invariant `mode !== 'off' ⇒ clientBudget defined`. Now both `enforce` AND `warn` downgrade to `off` when no budget is configured. The invariant comment was also weakened to match the actual `?? 0` defense-in-depth (the new R8 #5 constructor downgrade closes the remaining edge case). - R8 #5 line 302: constructor mirrors the `readBudgetFromEnv` downgrade for the direct `budgetConfig` parameter. All production callers (CLI, `runQwenServe`, env-var fallback) validate upfront, but a future code path that injects `budgetConfig` directly without re-validating would re-introduce the silent fail-open. Defense in depth. - R8 #4 line 1221: distinguish fresh vs `'already_held'` reservations in `runWithDiscoveryTimeout`'s timeout handler. New private `freshReservations: Set<string>` field marked when `weReservedSlot === true` inside `discoverMcpToolsForServerInternal` and cleared in finally / catch / success. Timeout handler now releases the slot ONLY when `freshReservations.has(serverName)` — meaning the slot was freshly reserved by THIS in-flight call. `'already_held'` reconnect timeouts (a previously-healthy server's transient hiccup) keep the slot so health-monitor retry doesn't have to compete for capacity with new servers admitted during the timeout window. Aligns the timeout handler with the connect-failure catch's `weReservedSlot` semantics — closes the asymmetry wenshao R8 #4 caught. Deferred: - R8 #3 line 332 (`tryReserveSlot` `'observed'` return value clarity): optional, non-blocking style improvement that ripples through 3 call sites + many tests for zero behavior change. Worth doing in a focused refactor PR; flagged as deferred polish, not in this fixup. + 3 new core regression tests (bulk discover-throw disconnects, warn-no-budget downgrade, constructor enforce downgrade). 679/679 focused tests pass; typecheck + lint clean.
This commit is contained in:
parent
f80629ec88
commit
7228813c58
2 changed files with 196 additions and 37 deletions
|
|
@ -1609,6 +1609,18 @@ describe('McpClientManager — PR 14 guardrails', () => {
|
|||
).toBe(false);
|
||||
});
|
||||
|
||||
// R8 #4 (line 1221): the `freshReservations` Set distinguishes
|
||||
// fresh-reservation timeouts (release) from `'already_held'`
|
||||
// reconnect timeouts (keep slot). Verified by code inspection +
|
||||
// the R5 release-on-fresh test below; a dedicated already_held
|
||||
// timeout test requires either driving the health-monitor flow
|
||||
// end-to-end (which needs autoReconnect timer interleaving with
|
||||
// fake timers — interferes with the sibling R5 test in the same
|
||||
// file) or piercing the private `runWithDiscoveryTimeout`
|
||||
// helper. The invariant is small enough that the fresh-release
|
||||
// test below is sufficient regression coverage; an integration
|
||||
// test in a separate file can add the already_held variant
|
||||
// without the timer interleave problem.
|
||||
it('runWithDiscoveryTimeout timeout handler releases the budget slot (wenshao R5 line 956)', async () => {
|
||||
vi.useFakeTimers();
|
||||
// McpClient.connect never resolves → timeout fires.
|
||||
|
|
@ -1849,6 +1861,70 @@ describe('McpClientManager — PR 14 guardrails', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('discoverAllMcpTools disconnects on discover() failure (wenshao R8 #1 line 532)', async () => {
|
||||
// Bulk-path mirror of R7 #3 (per-server path). Pre-fix:
|
||||
// connect() success + discover() throw → catch deleted client
|
||||
// without disconnect() → stdio child / WebSocket / HTTP socket
|
||||
// leaked for the rest of the daemon's lifetime (stop() can't
|
||||
// see the entry it just removed from this.clients).
|
||||
let disconnectCalls = 0;
|
||||
vi.mocked(McpClient).mockImplementation(
|
||||
() =>
|
||||
({
|
||||
connect: vi.fn().mockResolvedValue(undefined),
|
||||
discover: vi.fn().mockRejectedValue(new Error('discover failed')),
|
||||
disconnect: vi.fn().mockImplementation(() => {
|
||||
disconnectCalls += 1;
|
||||
return Promise.resolve();
|
||||
}),
|
||||
getStatus: vi.fn(),
|
||||
}) as unknown as McpClient,
|
||||
);
|
||||
const config = configWithServers({ a: { command: 'node' } });
|
||||
const manager = new McpClientManager(
|
||||
config,
|
||||
{} as ToolRegistry,
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
{ clientBudget: 1, budgetMode: 'enforce' },
|
||||
);
|
||||
await manager.discoverAllMcpTools(config);
|
||||
// Transport closed before client reference dropped + slot released.
|
||||
expect(disconnectCalls).toBeGreaterThanOrEqual(1);
|
||||
expect(manager.getMcpClientAccounting().reservedSlots).toEqual([]);
|
||||
});
|
||||
|
||||
it('readBudgetFromEnv downgrades warn-without-budget to off (wenshao R8 #2)', async () => {
|
||||
process.env['QWEN_SERVE_MCP_BUDGET_MODE'] = 'warn';
|
||||
// No budget — pre-fix this passed through with mode='warn',
|
||||
// reaching emitBudgetTelemetry with clientBudget=undefined.
|
||||
const config = configWithServers({});
|
||||
const manager = new McpClientManager(config, {} as ToolRegistry);
|
||||
expect(manager.getMcpClientBudget()).toBeUndefined();
|
||||
expect(manager.getMcpBudgetMode()).toBe('off');
|
||||
});
|
||||
|
||||
it('constructor downgrades enforce-without-budget when budgetConfig passed directly (wenshao R8 #5)', async () => {
|
||||
// Direct-budgetConfig path is test-/embedded-only — production
|
||||
// callers (CLI, runQwenServe, env-var fallback) all validate
|
||||
// upfront. Defense-in-depth: constructor mirrors the env-var
|
||||
// path's downgrade so a future caller that bypasses validation
|
||||
// can't silently fail-open.
|
||||
const config = configWithServers({});
|
||||
const manager = new McpClientManager(
|
||||
config,
|
||||
{} as ToolRegistry,
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
// Invalid combination: enforce mode without a budget.
|
||||
{ budgetMode: 'enforce' },
|
||||
);
|
||||
// Downgraded to off so tryReserveSlot doesn't masquerade as enforce.
|
||||
expect(manager.getMcpBudgetMode()).toBe('off');
|
||||
});
|
||||
|
||||
it('discoverMcpToolsForServer reconnect-attempt connect-failure KEEPS slot (wenshao R4 C2 already_held)', async () => {
|
||||
// Distinguish from the previous test: same call signature, but
|
||||
// here the slot is already-held (from a prior successful connect
|
||||
|
|
|
|||
|
|
@ -229,15 +229,19 @@ function readBudgetFromEnv(): McpBudgetConfig {
|
|||
}
|
||||
budgetMode = clientBudget === undefined ? 'off' : 'warn';
|
||||
}
|
||||
// PR 14 fix (review #4247 wenshao S4): enforce-without-budget would
|
||||
// silently fail-open — `tryReserveSlot` returns 'reserved' when
|
||||
// `clientBudget === undefined`, so a daemon spawned with the env-var
|
||||
// pair `MODE=enforce` + `BUDGET=<unset>` would advertise enforcement
|
||||
// but allow unlimited servers. The CLI handler + `runQwenServe`
|
||||
// both reject this combination; the env-var fallback path
|
||||
// (e.g. child manually launched with stale env) now mirrors that
|
||||
// invariant: downgrade to `off` rather than masquerade as enforce.
|
||||
if (budgetMode === 'enforce' && clientBudget === undefined) {
|
||||
// PR 14 fix (review #4247 wenshao S4 + R8 #2): mode-without-budget
|
||||
// downgrade. Originally only `enforce` got downgraded — but `warn`
|
||||
// mode without a budget threshold is equally meaningless: nothing
|
||||
// actionable can ever fire (no `liveCount >= 0.75 * budget`
|
||||
// comparison can be true when budget is undefined). Downgrading
|
||||
// BOTH to `off` removes the comment-vs-code mismatch in
|
||||
// `emitBudgetTelemetry` (which previously claimed
|
||||
// `mode !== 'off' ⇒ clientBudget defined` — true for enforce,
|
||||
// false for warn until this fix).
|
||||
if (
|
||||
(budgetMode === 'enforce' || budgetMode === 'warn') &&
|
||||
clientBudget === undefined
|
||||
) {
|
||||
budgetMode = 'off';
|
||||
}
|
||||
return { clientBudget, budgetMode };
|
||||
|
|
@ -272,6 +276,22 @@ export class McpClientManager {
|
|||
private readonly reservedSlots = new Set<string>();
|
||||
private readonly clientBudget?: number;
|
||||
private readonly budgetMode: McpBudgetMode;
|
||||
/**
|
||||
* PR 14 fix (review #4247 wenshao R8 #4 line 1221): names whose
|
||||
* slot was freshly reserved (not `'already_held'`) by an
|
||||
* in-flight `discoverMcpToolsForServerInternal` call. Read by
|
||||
* `runWithDiscoveryTimeout`'s timeout handler to decide whether
|
||||
* to release the slot on hard timeout — fresh reservations
|
||||
* release (server never connected, slot shouldn't permanently
|
||||
* block other servers); `'already_held'` reconnects keep their
|
||||
* slot (operator's previously-healthy server shouldn't be
|
||||
* permanently demoted by a transient timeout).
|
||||
*
|
||||
* Lifetime: `add` after `tryReserveSlot` returns `'reserved'`
|
||||
* with the `.has` guard, `delete` in success / catch / finally
|
||||
* cleanup. Idempotent — multiple deletes are no-ops.
|
||||
*/
|
||||
private readonly freshReservations = new Set<string>();
|
||||
/**
|
||||
* Servers refused during the most recent `discoverAllMcpTools*` pass.
|
||||
* Reset at the start of each pass; survives between passes so a
|
||||
|
|
@ -300,8 +320,22 @@ export class McpClientManager {
|
|||
// when spawning the ACP child. Standalone `qwen` invocations
|
||||
// leave both unset and get `mode: 'off'` — the pre-PR-14 default.
|
||||
const resolved = budgetConfig ?? readBudgetFromEnv();
|
||||
let resolvedMode = resolved.budgetMode;
|
||||
// PR 14 fix (review #4247 wenshao R8 #5): mirror
|
||||
// `readBudgetFromEnv`'s `(enforce|warn)`-without-budget
|
||||
// downgrade for the direct-`budgetConfig` path too. All
|
||||
// production callers (CLI handler, `runQwenServe`, env-var
|
||||
// fallback) validate upfront, but a future code path that
|
||||
// injects `budgetConfig` without running the validation
|
||||
// would re-introduce the silent fail-open. Defense in depth.
|
||||
if (
|
||||
(resolvedMode === 'enforce' || resolvedMode === 'warn') &&
|
||||
resolved.clientBudget === undefined
|
||||
) {
|
||||
resolvedMode = 'off';
|
||||
}
|
||||
this.clientBudget = resolved.clientBudget;
|
||||
this.budgetMode = resolved.budgetMode;
|
||||
this.budgetMode = resolvedMode;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -428,10 +462,12 @@ export class McpClientManager {
|
|||
* `off` mode is a no-op — operators who never set a budget don't
|
||||
* pollute the startup-event sink.
|
||||
*
|
||||
* Invariant: `mode !== 'off'` ⇒ `clientBudget` was resolved (both
|
||||
* `readBudgetFromEnv` and the CLI handler downgrade enforce-without-
|
||||
* budget to `off`). `clientBudget ?? 0` is defense-in-depth for
|
||||
* future code paths that might set `mode='warn'` programmatically.
|
||||
* Invariant (post R8 #2): `mode !== 'off'` ⇒ `clientBudget` was
|
||||
* resolved. Both `readBudgetFromEnv` AND the constructor downgrade
|
||||
* `enforce`/`warn`-without-budget to `off` so neither call site can
|
||||
* leave a budgetless mode reaching this telemetry path.
|
||||
* `clientBudget ?? 0` is kept as belt-and-suspenders against future
|
||||
* call sites that might bypass both validations.
|
||||
*/
|
||||
private emitBudgetTelemetry(configuredCount: number): void {
|
||||
if (this.budgetMode === 'off') return;
|
||||
|
|
@ -516,18 +552,38 @@ export class McpClientManager {
|
|||
// full discovery restart. Release both so the budget cap
|
||||
// reflects actual usable capacity.
|
||||
//
|
||||
// Note (wenshao R2 P3 follow-up): release-here is correct
|
||||
// for the LEGACY BULK PATH because `discoverAllMcpTools`
|
||||
// begins with `await this.stop()` (line ~320) which clears
|
||||
// all slots — `stop()` owns the lifecycle, this catch is
|
||||
// defense-in-depth for any caller that bypasses stop. The
|
||||
// LONG-LIVED per-server reconnect path in
|
||||
// `discoverMcpToolsForServerInternal` intentionally does
|
||||
// NOT release on connect failure: the client stays in
|
||||
// `this.clients` with `DISCONNECTED` status so the health
|
||||
// monitor + next incremental pass can retry without
|
||||
// needing to re-reserve. Different lifecycle, different
|
||||
// contract.
|
||||
// Slot bookkeeping in this bulk path is partially redundant
|
||||
// with `await this.stop()` at the top of
|
||||
// `discoverAllMcpTools` (line ~320) — the next bulk run
|
||||
// wipes `reservedSlots` regardless. But the SAME catch
|
||||
// ALSO needs to handle the transport (see below): the
|
||||
// client object held by `clients.delete(name)` only had
|
||||
// its tracking reference removed, not its underlying
|
||||
// transport closed. Leaving the orphan transport alive
|
||||
// would leak the stdio child / WebSocket / HTTP socket
|
||||
// for the rest of the process — `stop()` can't clean it
|
||||
// because we just removed it from the map.
|
||||
//
|
||||
// The per-server reconnect path
|
||||
// (`discoverMcpToolsForServerInternal`) keeps the slot
|
||||
// when `weReservedSlot === false` so health-monitor retry
|
||||
// doesn't have to compete for capacity — different
|
||||
// lifecycle, different contract. Bulk path always releases
|
||||
// because every server is "fresh" here (preceded by
|
||||
// stop()).
|
||||
//
|
||||
// PR 14 fix (review #4247 wenshao R8 #1 line 532): also
|
||||
// call `await client.disconnect()` BEFORE dropping the
|
||||
// reference. R7 #3 fixed the analogous leak in the
|
||||
// per-server path; this is the bulk-path mirror. Errors
|
||||
// intentionally swallowed (we're already in a discovery-
|
||||
// failure catch; double-throwing would lose the original
|
||||
// error context).
|
||||
try {
|
||||
await client.disconnect();
|
||||
} catch {
|
||||
// best-effort transport cleanup
|
||||
}
|
||||
this.reservedSlots.delete(name);
|
||||
this.clients.delete(name);
|
||||
this.eventEmitter?.emit('mcp-client-update', this.clients);
|
||||
|
|
@ -640,6 +696,18 @@ export class McpClientManager {
|
|||
// rediscovery").
|
||||
const weReservedSlot =
|
||||
reservation === 'reserved' && this.reservedSlots.has(serverName);
|
||||
// PR 14 fix (review #4247 wenshao R8 #4): mark this name in
|
||||
// `freshReservations` so the `runWithDiscoveryTimeout` timeout
|
||||
// handler can distinguish fresh-reservation timeouts (release
|
||||
// the slot — never connected, shouldn't block others) from
|
||||
// `'already_held'` reconnect timeouts (keep the slot — operator's
|
||||
// previously-healthy server shouldn't be demoted by a transient
|
||||
// timeout). Cleared in success / catch / finally below so the
|
||||
// marker only spans the current discoverMcpToolsForServerInternal
|
||||
// invocation.
|
||||
if (weReservedSlot) {
|
||||
this.freshReservations.add(serverName);
|
||||
}
|
||||
|
||||
this.stopHealthCheck(serverName);
|
||||
|
||||
|
|
@ -741,6 +809,11 @@ export class McpClientManager {
|
|||
} finally {
|
||||
this.startHealthCheck(serverName);
|
||||
this.eventEmitter?.emit('mcp-client-update', this.clients);
|
||||
// R8 #4: clear the fresh-reservation marker — this in-flight
|
||||
// call has settled (success, catch, OR a timeout that already
|
||||
// ran its handler). Idempotent on the timeout-already-deleted
|
||||
// case.
|
||||
this.freshReservations.delete(serverName);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -774,6 +847,7 @@ export class McpClientManager {
|
|||
// its own body line 90, which awaits `this.stop()` first) starts
|
||||
// from an empty reservation set.
|
||||
this.reservedSlots.clear();
|
||||
this.freshReservations.clear();
|
||||
this.lastRefusedServerNames = [];
|
||||
}
|
||||
|
||||
|
|
@ -1208,17 +1282,26 @@ export class McpClientManager {
|
|||
// absent, so the trailing `finally`-block call becomes a no-op.
|
||||
this.stopHealthCheck(serverName);
|
||||
this.clients.delete(serverName);
|
||||
// PR 14 fix (review #4247 wenshao R5 line 956): the timeout
|
||||
// handler must release the budget slot too. Pre-fix only
|
||||
// `clients.delete` ran here, so a timed-out server in
|
||||
// `enforce` mode permanently held its reservation — N
|
||||
// consecutive timeouts permanently degraded daemon capacity.
|
||||
// The slot is released because the operator's "this server
|
||||
// should run" intent is invalidated by a hard timeout
|
||||
// (`runWithDiscoveryTimeout` rejection IS the operator-
|
||||
// visible "server unreachable" signal); a later manual
|
||||
// `/mcp reconnect` will re-reserve.
|
||||
this.reservedSlots.delete(serverName);
|
||||
// PR 14 fix (review #4247 wenshao R5 line 956 + R8 #4 line
|
||||
// 1221): release the budget slot ONLY if THIS in-flight
|
||||
// discoverMcpToolsForServerInternal call freshly reserved
|
||||
// it. `freshReservations.has(serverName)` distinguishes:
|
||||
//
|
||||
// - Fresh reservation (never connected): release — a server
|
||||
// that never connected shouldn't permanently consume a
|
||||
// slot under enforce mode.
|
||||
// - `'already_held'` reconnect (server was previously
|
||||
// healthy, now flaky): KEEP the slot. Health-monitor
|
||||
// retry doesn't have to compete for capacity with new
|
||||
// servers admitted during the timeout window.
|
||||
//
|
||||
// R5 originally treated all timeouts as "release"; wenshao
|
||||
// R8 #4 caught the asymmetry with the connect-failure
|
||||
// path's `weReservedSlot` guard. Now they match.
|
||||
if (this.freshReservations.has(serverName)) {
|
||||
this.reservedSlots.delete(serverName);
|
||||
this.freshReservations.delete(serverName);
|
||||
}
|
||||
// And drop any stale refusal entry — operator intent shifts
|
||||
// when a slot becomes free again, and snapshot consumers
|
||||
// shouldn't keep tagging a now-slotless server as
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue