From ec51fd3138daef040edb5f3ce6e626398faf304e Mon Sep 17 00:00:00 2001 From: Rayan Salhab Date: Tue, 5 May 2026 13:31:26 +0300 Subject: [PATCH] fix(core): coalesce MCP server rediscovery (#3818) * fix(core): coalesce MCP server rediscovery * test(core): assert MCP rediscovery cleanup * fix(core): address MCP rediscovery review feedback * fix(core): preserve MCP rediscovery health checks --------- Co-authored-by: cyphercodes --- .../core/src/tools/mcp-client-manager.test.ts | 208 ++++++++++++++++++ packages/core/src/tools/mcp-client-manager.ts | 33 ++- 2 files changed, 239 insertions(+), 2 deletions(-) diff --git a/packages/core/src/tools/mcp-client-manager.test.ts b/packages/core/src/tools/mcp-client-manager.test.ts index 140b78324..cbc927699 100644 --- a/packages/core/src/tools/mcp-client-manager.test.ts +++ b/packages/core/src/tools/mcp-client-manager.test.ts @@ -227,6 +227,214 @@ describe('McpClientManager', () => { expect(secondClient.disconnect).toHaveBeenCalledOnce(); }); + it('should coalesce concurrent discovery for the same server', async () => { + let resolveDisconnect!: () => void; + const disconnectPromise = new Promise((resolve) => { + resolveDisconnect = resolve; + }); + const firstClient = { + connect: vi.fn().mockResolvedValue(undefined), + discover: vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn(() => disconnectPromise), + getStatus: vi.fn(), + }; + const replacementClients: Array<{ + connect: ReturnType; + discover: ReturnType; + disconnect: ReturnType; + getStatus: ReturnType; + }> = []; + + vi.mocked(McpClient).mockImplementation(() => { + if (vi.mocked(McpClient).mock.calls.length === 1) { + return firstClient as unknown as McpClient; + } + + const replacementClient = { + connect: vi.fn().mockResolvedValue(undefined), + discover: vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn().mockResolvedValue(undefined), + getStatus: vi.fn(), + }; + replacementClients.push(replacementClient); + return replacementClient as unknown as 
McpClient; + }); + + const mockConfig = { + isTrustedFolder: () => true, + getMcpServers: () => ({ 'test-server': {} }), + getMcpServerCommand: () => undefined, + getPromptRegistry: () => ({}) as PromptRegistry, + getWorkspaceContext: () => ({}) as WorkspaceContext, + getDebugMode: () => false, + } as unknown as Config; + const manager = new McpClientManager(mockConfig, {} as ToolRegistry); + + await manager.discoverMcpToolsForServer( + 'test-server', + {} as unknown as Config, + ); + + const firstRediscovery = manager.discoverMcpToolsForServer( + 'test-server', + {} as unknown as Config, + ); + await Promise.resolve(); + + const secondRediscovery = manager.discoverMcpToolsForServer( + 'test-server', + {} as unknown as Config, + ); + const disconnectCallsBeforeResolve = + firstClient.disconnect.mock.calls.length; + + resolveDisconnect(); + await Promise.all([firstRediscovery, secondRediscovery]); + + expect(disconnectCallsBeforeResolve).toBe(1); + expect(vi.mocked(McpClient)).toHaveBeenCalledTimes(2); + expect(replacementClients).toHaveLength(1); + expect(replacementClients[0].connect).toHaveBeenCalledOnce(); + expect(replacementClients[0].discover).toHaveBeenCalledOnce(); + + // Verify map was cleaned up: a third call should do real work, + // not get coalesced into a stale promise. 
+ await manager.discoverMcpToolsForServer( + 'test-server', + {} as unknown as Config, + ); + + expect(vi.mocked(McpClient)).toHaveBeenCalledTimes(3); + expect(replacementClients).toHaveLength(2); + expect(replacementClients[1].connect).toHaveBeenCalledOnce(); + expect(replacementClients[1].discover).toHaveBeenCalledOnce(); + }); + + it('should restore health checks after failed server rediscovery', async () => { + vi.useFakeTimers(); + + const firstClient = { + connect: vi.fn().mockResolvedValue(undefined), + discover: vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn().mockResolvedValue(undefined), + getStatus: vi.fn(), + }; + const failedClient = { + connect: vi.fn().mockRejectedValue(new Error('transient failure')), + discover: vi.fn(), + disconnect: vi.fn().mockResolvedValue(undefined), + getStatus: vi.fn(), + }; + vi.mocked(McpClient) + .mockReturnValueOnce(firstClient as unknown as McpClient) + .mockReturnValueOnce(failedClient as unknown as McpClient); + + const mockConfig = { + isTrustedFolder: () => true, + getMcpServers: () => ({ 'test-server': {} }), + getMcpServerCommand: () => undefined, + getPromptRegistry: () => ({}) as PromptRegistry, + getWorkspaceContext: () => ({}) as WorkspaceContext, + getDebugMode: () => false, + } as unknown as Config; + const manager = new McpClientManager( + mockConfig, + {} as ToolRegistry, + undefined, + undefined, + { + autoReconnect: true, + checkIntervalMs: 10, + maxConsecutiveFailures: 1, + reconnectDelayMs: 10, + }, + ); + + try { + await manager.discoverMcpToolsForServer( + 'test-server', + {} as unknown as Config, + ); + expect( + ( + manager as unknown as { + healthCheckTimers: Map; + } + ).healthCheckTimers.has('test-server'), + ).toBe(true); + + await manager.discoverMcpToolsForServer( + 'test-server', + {} as unknown as Config, + ); + + expect(failedClient.connect).toHaveBeenCalledOnce(); + expect( + ( + manager as unknown as { + healthCheckTimers: Map; + } + ).healthCheckTimers.has('test-server'), + 
).toBe(true); + } finally { + await manager.stop(); + vi.useRealTimers(); + } + }); + + it('should clear in-flight discovery tracking when stopping', async () => { + let resolveConnect!: () => void; + const connectPromise = new Promise((resolve) => { + resolveConnect = resolve; + }); + const mockedMcpClient = { + connect: vi.fn(() => connectPromise), + discover: vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn().mockResolvedValue(undefined), + getStatus: vi.fn(), + }; + vi.mocked(McpClient).mockReturnValue( + mockedMcpClient as unknown as McpClient, + ); + + const mockConfig = { + isTrustedFolder: () => true, + getMcpServers: () => ({ 'test-server': {} }), + getMcpServerCommand: () => undefined, + getPromptRegistry: () => ({}) as PromptRegistry, + getWorkspaceContext: () => ({}) as WorkspaceContext, + getDebugMode: () => false, + } as unknown as Config; + const manager = new McpClientManager(mockConfig, {} as ToolRegistry); + + const discovery = manager.discoverMcpToolsForServer( + 'test-server', + {} as unknown as Config, + ); + await Promise.resolve(); + + expect( + ( + manager as unknown as { + serverDiscoveryPromises: Map>; + } + ).serverDiscoveryPromises.has('test-server'), + ).toBe(true); + + await manager.stop(); + + expect( + ( + manager as unknown as { + serverDiscoveryPromises: Map>; + } + ).serverDiscoveryPromises.has('test-server'), + ).toBe(false); + + resolveConnect(); + await discovery; + }); + it('should no-op when discovering an unknown server', async () => { const mockedMcpClient = { connect: vi.fn(), diff --git a/packages/core/src/tools/mcp-client-manager.ts b/packages/core/src/tools/mcp-client-manager.ts index ecc700739..885700abb 100644 --- a/packages/core/src/tools/mcp-client-manager.ts +++ b/packages/core/src/tools/mcp-client-manager.ts @@ -58,6 +58,7 @@ export class McpClientManager { private healthCheckTimers: Map = new Map(); private consecutiveFailures: Map = new Map(); private isReconnecting: Map = new Map(); + private 
serverDiscoveryPromises: Map<string, Promise<void>> = new Map(); constructor( config: Config, @@ -147,6 +148,31 @@ export class McpClientManager { async discoverMcpToolsForServer( serverName: string, cliConfig: Config, + ): Promise<void> { + const inProgressDiscovery = this.serverDiscoveryPromises.get(serverName); + if (inProgressDiscovery) { + await inProgressDiscovery; + return; + } + + const discoveryPromise = this.discoverMcpToolsForServerInternal( + serverName, + cliConfig, + ); + this.serverDiscoveryPromises.set(serverName, discoveryPromise); + + try { + await discoveryPromise; + } finally { + if (this.serverDiscoveryPromises.get(serverName) === discoveryPromise) { + this.serverDiscoveryPromises.delete(serverName); + } + } + } + + private async discoverMcpToolsForServerInternal( + serverName: string, + cliConfig: Config, ): Promise<void> { const servers = populateMcpServerCommand( this.cliConfig.getMcpServers() || {}, @@ -157,6 +183,8 @@ export class McpClientManager { return; } + this.stopHealthCheck(serverName); + // Ensure we don't leak an existing connection for this server. const existingClient = this.clients.get(serverName); if (existingClient) { @@ -193,8 +221,6 @@ export class McpClientManager { try { await client.connect(); await client.discover(cliConfig); - // Start health check for this server after successful discovery - this.startHealthCheck(serverName); } catch (error) { // Log the error but don't throw: callers expect best-effort discovery. 
debugLogger.error( @@ -203,6 +229,7 @@ export class McpClientManager { )}`, ); } finally { + this.startHealthCheck(serverName); this.eventEmitter?.emit('mcp-client-update', this.clients); } } @@ -231,6 +258,7 @@ export class McpClientManager { this.clients.clear(); this.consecutiveFailures.clear(); this.isReconnecting.clear(); + this.serverDiscoveryPromises.clear(); } /** @@ -253,6 +281,7 @@ export class McpClientManager { this.clients.delete(serverName); this.consecutiveFailures.delete(serverName); this.isReconnecting.delete(serverName); + this.serverDiscoveryPromises.delete(serverName); this.eventEmitter?.emit('mcp-client-update', this.clients); } }