mirror of
https://github.com/QwenLM/qwen-code.git
synced 2026-05-16 02:39:08 +00:00
fix(core): coalesce MCP server rediscovery (#3818)
* fix(core): coalesce MCP server rediscovery * test(core): assert MCP rediscovery cleanup * fix(core): address MCP rediscovery review feedback * fix(core): preserve MCP rediscovery health checks --------- Co-authored-by: cyphercodes <cyphercodes@users.noreply.github.com>
This commit is contained in:
parent
095a39a8d5
commit
ec51fd3138
2 changed files with 239 additions and 2 deletions
|
|
@ -227,6 +227,214 @@ describe('McpClientManager', () => {
|
|||
expect(secondClient.disconnect).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it('should coalesce concurrent discovery for the same server', async () => {
|
||||
let resolveDisconnect!: () => void;
|
||||
const disconnectPromise = new Promise<void>((resolve) => {
|
||||
resolveDisconnect = resolve;
|
||||
});
|
||||
const firstClient = {
|
||||
connect: vi.fn().mockResolvedValue(undefined),
|
||||
discover: vi.fn().mockResolvedValue(undefined),
|
||||
disconnect: vi.fn(() => disconnectPromise),
|
||||
getStatus: vi.fn(),
|
||||
};
|
||||
const replacementClients: Array<{
|
||||
connect: ReturnType<typeof vi.fn>;
|
||||
discover: ReturnType<typeof vi.fn>;
|
||||
disconnect: ReturnType<typeof vi.fn>;
|
||||
getStatus: ReturnType<typeof vi.fn>;
|
||||
}> = [];
|
||||
|
||||
vi.mocked(McpClient).mockImplementation(() => {
|
||||
if (vi.mocked(McpClient).mock.calls.length === 1) {
|
||||
return firstClient as unknown as McpClient;
|
||||
}
|
||||
|
||||
const replacementClient = {
|
||||
connect: vi.fn().mockResolvedValue(undefined),
|
||||
discover: vi.fn().mockResolvedValue(undefined),
|
||||
disconnect: vi.fn().mockResolvedValue(undefined),
|
||||
getStatus: vi.fn(),
|
||||
};
|
||||
replacementClients.push(replacementClient);
|
||||
return replacementClient as unknown as McpClient;
|
||||
});
|
||||
|
||||
const mockConfig = {
|
||||
isTrustedFolder: () => true,
|
||||
getMcpServers: () => ({ 'test-server': {} }),
|
||||
getMcpServerCommand: () => undefined,
|
||||
getPromptRegistry: () => ({}) as PromptRegistry,
|
||||
getWorkspaceContext: () => ({}) as WorkspaceContext,
|
||||
getDebugMode: () => false,
|
||||
} as unknown as Config;
|
||||
const manager = new McpClientManager(mockConfig, {} as ToolRegistry);
|
||||
|
||||
await manager.discoverMcpToolsForServer(
|
||||
'test-server',
|
||||
{} as unknown as Config,
|
||||
);
|
||||
|
||||
const firstRediscovery = manager.discoverMcpToolsForServer(
|
||||
'test-server',
|
||||
{} as unknown as Config,
|
||||
);
|
||||
await Promise.resolve();
|
||||
|
||||
const secondRediscovery = manager.discoverMcpToolsForServer(
|
||||
'test-server',
|
||||
{} as unknown as Config,
|
||||
);
|
||||
const disconnectCallsBeforeResolve =
|
||||
firstClient.disconnect.mock.calls.length;
|
||||
|
||||
resolveDisconnect();
|
||||
await Promise.all([firstRediscovery, secondRediscovery]);
|
||||
|
||||
expect(disconnectCallsBeforeResolve).toBe(1);
|
||||
expect(vi.mocked(McpClient)).toHaveBeenCalledTimes(2);
|
||||
expect(replacementClients).toHaveLength(1);
|
||||
expect(replacementClients[0].connect).toHaveBeenCalledOnce();
|
||||
expect(replacementClients[0].discover).toHaveBeenCalledOnce();
|
||||
|
||||
// Verify map was cleaned up: a third call should do real work,
|
||||
// not get coalesced into a stale promise.
|
||||
await manager.discoverMcpToolsForServer(
|
||||
'test-server',
|
||||
{} as unknown as Config,
|
||||
);
|
||||
|
||||
expect(vi.mocked(McpClient)).toHaveBeenCalledTimes(3);
|
||||
expect(replacementClients).toHaveLength(2);
|
||||
expect(replacementClients[1].connect).toHaveBeenCalledOnce();
|
||||
expect(replacementClients[1].discover).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it('should restore health checks after failed server rediscovery', async () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
const firstClient = {
|
||||
connect: vi.fn().mockResolvedValue(undefined),
|
||||
discover: vi.fn().mockResolvedValue(undefined),
|
||||
disconnect: vi.fn().mockResolvedValue(undefined),
|
||||
getStatus: vi.fn(),
|
||||
};
|
||||
const failedClient = {
|
||||
connect: vi.fn().mockRejectedValue(new Error('transient failure')),
|
||||
discover: vi.fn(),
|
||||
disconnect: vi.fn().mockResolvedValue(undefined),
|
||||
getStatus: vi.fn(),
|
||||
};
|
||||
vi.mocked(McpClient)
|
||||
.mockReturnValueOnce(firstClient as unknown as McpClient)
|
||||
.mockReturnValueOnce(failedClient as unknown as McpClient);
|
||||
|
||||
const mockConfig = {
|
||||
isTrustedFolder: () => true,
|
||||
getMcpServers: () => ({ 'test-server': {} }),
|
||||
getMcpServerCommand: () => undefined,
|
||||
getPromptRegistry: () => ({}) as PromptRegistry,
|
||||
getWorkspaceContext: () => ({}) as WorkspaceContext,
|
||||
getDebugMode: () => false,
|
||||
} as unknown as Config;
|
||||
const manager = new McpClientManager(
|
||||
mockConfig,
|
||||
{} as ToolRegistry,
|
||||
undefined,
|
||||
undefined,
|
||||
{
|
||||
autoReconnect: true,
|
||||
checkIntervalMs: 10,
|
||||
maxConsecutiveFailures: 1,
|
||||
reconnectDelayMs: 10,
|
||||
},
|
||||
);
|
||||
|
||||
try {
|
||||
await manager.discoverMcpToolsForServer(
|
||||
'test-server',
|
||||
{} as unknown as Config,
|
||||
);
|
||||
expect(
|
||||
(
|
||||
manager as unknown as {
|
||||
healthCheckTimers: Map<string, NodeJS.Timeout>;
|
||||
}
|
||||
).healthCheckTimers.has('test-server'),
|
||||
).toBe(true);
|
||||
|
||||
await manager.discoverMcpToolsForServer(
|
||||
'test-server',
|
||||
{} as unknown as Config,
|
||||
);
|
||||
|
||||
expect(failedClient.connect).toHaveBeenCalledOnce();
|
||||
expect(
|
||||
(
|
||||
manager as unknown as {
|
||||
healthCheckTimers: Map<string, NodeJS.Timeout>;
|
||||
}
|
||||
).healthCheckTimers.has('test-server'),
|
||||
).toBe(true);
|
||||
} finally {
|
||||
await manager.stop();
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it('should clear in-flight discovery tracking when stopping', async () => {
|
||||
let resolveConnect!: () => void;
|
||||
const connectPromise = new Promise<void>((resolve) => {
|
||||
resolveConnect = resolve;
|
||||
});
|
||||
const mockedMcpClient = {
|
||||
connect: vi.fn(() => connectPromise),
|
||||
discover: vi.fn().mockResolvedValue(undefined),
|
||||
disconnect: vi.fn().mockResolvedValue(undefined),
|
||||
getStatus: vi.fn(),
|
||||
};
|
||||
vi.mocked(McpClient).mockReturnValue(
|
||||
mockedMcpClient as unknown as McpClient,
|
||||
);
|
||||
|
||||
const mockConfig = {
|
||||
isTrustedFolder: () => true,
|
||||
getMcpServers: () => ({ 'test-server': {} }),
|
||||
getMcpServerCommand: () => undefined,
|
||||
getPromptRegistry: () => ({}) as PromptRegistry,
|
||||
getWorkspaceContext: () => ({}) as WorkspaceContext,
|
||||
getDebugMode: () => false,
|
||||
} as unknown as Config;
|
||||
const manager = new McpClientManager(mockConfig, {} as ToolRegistry);
|
||||
|
||||
const discovery = manager.discoverMcpToolsForServer(
|
||||
'test-server',
|
||||
{} as unknown as Config,
|
||||
);
|
||||
await Promise.resolve();
|
||||
|
||||
expect(
|
||||
(
|
||||
manager as unknown as {
|
||||
serverDiscoveryPromises: Map<string, Promise<void>>;
|
||||
}
|
||||
).serverDiscoveryPromises.has('test-server'),
|
||||
).toBe(true);
|
||||
|
||||
await manager.stop();
|
||||
|
||||
expect(
|
||||
(
|
||||
manager as unknown as {
|
||||
serverDiscoveryPromises: Map<string, Promise<void>>;
|
||||
}
|
||||
).serverDiscoveryPromises.has('test-server'),
|
||||
).toBe(false);
|
||||
|
||||
resolveConnect();
|
||||
await discovery;
|
||||
});
|
||||
|
||||
it('should no-op when discovering an unknown server', async () => {
|
||||
const mockedMcpClient = {
|
||||
connect: vi.fn(),
|
||||
|
|
|
|||
|
|
@ -58,6 +58,7 @@ export class McpClientManager {
|
|||
private healthCheckTimers: Map<string, NodeJS.Timeout> = new Map();
|
||||
private consecutiveFailures: Map<string, number> = new Map();
|
||||
private isReconnecting: Map<string, boolean> = new Map();
|
||||
private serverDiscoveryPromises: Map<string, Promise<void>> = new Map();
|
||||
|
||||
constructor(
|
||||
config: Config,
|
||||
|
|
@ -147,6 +148,31 @@ export class McpClientManager {
|
|||
async discoverMcpToolsForServer(
|
||||
serverName: string,
|
||||
cliConfig: Config,
|
||||
): Promise<void> {
|
||||
const inProgressDiscovery = this.serverDiscoveryPromises.get(serverName);
|
||||
if (inProgressDiscovery) {
|
||||
await inProgressDiscovery;
|
||||
return;
|
||||
}
|
||||
|
||||
const discoveryPromise = this.discoverMcpToolsForServerInternal(
|
||||
serverName,
|
||||
cliConfig,
|
||||
);
|
||||
this.serverDiscoveryPromises.set(serverName, discoveryPromise);
|
||||
|
||||
try {
|
||||
await discoveryPromise;
|
||||
} finally {
|
||||
if (this.serverDiscoveryPromises.get(serverName) === discoveryPromise) {
|
||||
this.serverDiscoveryPromises.delete(serverName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async discoverMcpToolsForServerInternal(
|
||||
serverName: string,
|
||||
cliConfig: Config,
|
||||
): Promise<void> {
|
||||
const servers = populateMcpServerCommand(
|
||||
this.cliConfig.getMcpServers() || {},
|
||||
|
|
@ -157,6 +183,8 @@ export class McpClientManager {
|
|||
return;
|
||||
}
|
||||
|
||||
this.stopHealthCheck(serverName);
|
||||
|
||||
// Ensure we don't leak an existing connection for this server.
|
||||
const existingClient = this.clients.get(serverName);
|
||||
if (existingClient) {
|
||||
|
|
@ -193,8 +221,6 @@ export class McpClientManager {
|
|||
try {
|
||||
await client.connect();
|
||||
await client.discover(cliConfig);
|
||||
// Start health check for this server after successful discovery
|
||||
this.startHealthCheck(serverName);
|
||||
} catch (error) {
|
||||
// Log the error but don't throw: callers expect best-effort discovery.
|
||||
debugLogger.error(
|
||||
|
|
@ -203,6 +229,7 @@ export class McpClientManager {
|
|||
)}`,
|
||||
);
|
||||
} finally {
|
||||
this.startHealthCheck(serverName);
|
||||
this.eventEmitter?.emit('mcp-client-update', this.clients);
|
||||
}
|
||||
}
|
||||
|
|
@ -231,6 +258,7 @@ export class McpClientManager {
|
|||
this.clients.clear();
|
||||
this.consecutiveFailures.clear();
|
||||
this.isReconnecting.clear();
|
||||
this.serverDiscoveryPromises.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -253,6 +281,7 @@ export class McpClientManager {
|
|||
this.clients.delete(serverName);
|
||||
this.consecutiveFailures.delete(serverName);
|
||||
this.isReconnecting.delete(serverName);
|
||||
this.serverDiscoveryPromises.delete(serverName);
|
||||
this.eventEmitter?.emit('mcp-client-update', this.clients);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue