mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-24 05:35:15 +00:00
feat(mcp): start configured servers asynchronously
This commit is contained in:
parent
2c4ad9f405
commit
6a477c4cf5
12 changed files with 369 additions and 99 deletions
|
|
@ -11,6 +11,7 @@ import { pathKey } from "@/utils/path-key"
|
|||
|
||||
const statusLabels = {
|
||||
connected: "mcp.status.connected",
|
||||
connecting: "mcp.status.connecting",
|
||||
failed: "mcp.status.failed",
|
||||
needs_auth: "mcp.status.needs_auth",
|
||||
needs_client_registration: "mcp.status.needs_client_registration",
|
||||
|
|
@ -79,6 +80,7 @@ export const DialogSelectMcp: Component = () => {
|
|||
if (s?.status === "failed" || s?.status === "needs_client_registration") return s.error
|
||||
}
|
||||
const enabled = () => status() === "connected"
|
||||
const connecting = () => status() === "connecting"
|
||||
return (
|
||||
<div class="w-full flex items-center justify-between gap-x-3">
|
||||
<div class="flex flex-col gap-0.5 min-w-0">
|
||||
|
|
@ -95,8 +97,9 @@ export const DialogSelectMcp: Component = () => {
|
|||
<div onClick={(e) => e.stopPropagation()}>
|
||||
<Switch
|
||||
checked={enabled()}
|
||||
disabled={toggle.isPending && toggle.variables === i.name}
|
||||
disabled={connecting() || (toggle.isPending && toggle.variables === i.name)}
|
||||
onChange={() => {
|
||||
if (connecting()) return
|
||||
if (toggle.isPending) return
|
||||
toggle.mutate(i.name)
|
||||
}}
|
||||
|
|
|
|||
|
|
@ -321,15 +321,17 @@ export function StatusPopoverBody(props: { shown: Accessor<boolean> }) {
|
|||
{(name) => {
|
||||
const status = () => mcpStatus(name)
|
||||
const enabled = () => status() === "connected"
|
||||
const connecting = () => status() === "connecting"
|
||||
return (
|
||||
<button
|
||||
type="button"
|
||||
class="flex items-center gap-2 w-full min-h-8 pl-3 pr-2 py-1 rounded-md hover:bg-surface-raised-base-hover transition-colors text-left"
|
||||
onClick={() => {
|
||||
if (connecting()) return
|
||||
if (toggleMcp.isPending) return
|
||||
toggleMcp.mutate(name)
|
||||
}}
|
||||
disabled={toggleMcp.isPending && toggleMcp.variables === name}
|
||||
disabled={connecting() || (toggleMcp.isPending && toggleMcp.variables === name)}
|
||||
>
|
||||
<div
|
||||
classList={{
|
||||
|
|
@ -337,6 +339,7 @@ export function StatusPopoverBody(props: { shown: Accessor<boolean> }) {
|
|||
"bg-icon-success-base": status() === "connected",
|
||||
"bg-icon-critical-base": status() === "failed",
|
||||
"bg-border-weak-base": status() === "disabled",
|
||||
"bg-icon-warning-base animate-pulse": status() === "connecting",
|
||||
"bg-icon-warning-base":
|
||||
status() === "needs_auth" || status() === "needs_client_registration",
|
||||
}}
|
||||
|
|
@ -354,8 +357,9 @@ export function StatusPopoverBody(props: { shown: Accessor<boolean> }) {
|
|||
<div onClick={(event) => event.stopPropagation()}>
|
||||
<Switch
|
||||
checked={enabled()}
|
||||
disabled={toggleMcp.isPending && toggleMcp.variables === name}
|
||||
disabled={connecting() || (toggleMcp.isPending && toggleMcp.variables === name)}
|
||||
onChange={() => {
|
||||
if (connecting()) return
|
||||
if (toggleMcp.isPending) return
|
||||
toggleMcp.mutate(name)
|
||||
}}
|
||||
|
|
|
|||
|
|
@ -49,6 +49,8 @@ export const loadMcpQuery = (directory: string, sdk: OpencodeClient) =>
|
|||
queryOptions({
|
||||
queryKey: [directory, "mcp"] as const,
|
||||
queryFn: () => sdk.mcp.status().then((r) => r.data ?? {}),
|
||||
refetchInterval: (query) =>
|
||||
Object.values(query.state.data ?? {}).some((status) => status.status === "connecting") ? 1000 : false,
|
||||
})
|
||||
|
||||
export const loadLspQuery = (directory: string, sdk: OpencodeClient) =>
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ import { Binary } from "@opencode-ai/core/util/binary"
|
|||
import { produce, reconcile, type SetStoreFunction, type Store } from "solid-js/store"
|
||||
import type {
|
||||
Message,
|
||||
McpStatus,
|
||||
Part,
|
||||
PermissionRequest,
|
||||
Project,
|
||||
|
|
@ -182,6 +183,11 @@ export function applyDirectoryEvent(input: {
|
|||
input.setStore("session_status", props.sessionID, reconcile(props.status))
|
||||
break
|
||||
}
|
||||
case "mcp.status.changed": {
|
||||
const props = event.properties as { name: string; status: McpStatus }
|
||||
input.setStore("mcp", props.name, reconcile(props.status))
|
||||
break
|
||||
}
|
||||
case "message.updated": {
|
||||
const info = clean((event.properties as { info: Message }).info)
|
||||
const messages = input.store.message[info.sessionID]
|
||||
|
|
|
|||
|
|
@ -303,6 +303,7 @@ export const dict = {
|
|||
"dialog.plugins.empty": "Plugins configured in opencode.json",
|
||||
|
||||
"mcp.status.connected": "connected",
|
||||
"mcp.status.connecting": "connecting",
|
||||
"mcp.status.failed": "failed",
|
||||
"mcp.status.needs_auth": "needs auth",
|
||||
"mcp.status.disabled": "disabled",
|
||||
|
|
|
|||
|
|
@ -143,6 +143,9 @@ export const McpListCommand = effectCmd({
|
|||
} else if (status.status === "disabled") {
|
||||
statusIcon = "○"
|
||||
statusText = "disabled"
|
||||
} else if (status.status === "connecting") {
|
||||
statusIcon = "…"
|
||||
statusText = "connecting"
|
||||
} else if (status.status === "needs_auth") {
|
||||
statusIcon = "⚠"
|
||||
statusText = "needs authentication"
|
||||
|
|
|
|||
|
|
@ -363,6 +363,13 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
|
|||
break
|
||||
}
|
||||
|
||||
case "mcp.status.changed": {
|
||||
if (workspace === project.workspace.current()) {
|
||||
setStore("mcp", event.properties.name, reconcile(event.properties.status))
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
case "vcs.branch.updated": {
|
||||
if (workspace === project.workspace.current()) {
|
||||
setStore("vcs", { branch: event.properties.branch })
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ export const Local = Schema.Struct({
|
|||
description: "Enable or disable the MCP server on startup",
|
||||
}),
|
||||
timeout: Schema.optional(PositiveInt).annotate({
|
||||
description: "Timeout in ms for MCP server requests. Defaults to 5000 (5 seconds) if not specified.",
|
||||
description: "Timeout in ms for MCP server requests. Defaults to 30000 (30 seconds) if not specified.",
|
||||
}),
|
||||
}).annotate({ identifier: "McpLocalConfig" })
|
||||
export type Local = Schema.Schema.Type<typeof Local>
|
||||
|
|
@ -49,7 +49,7 @@ export const Remote = Schema.Struct({
|
|||
description: "OAuth authentication configuration for the MCP server. Set to false to disable OAuth auto-detection.",
|
||||
}),
|
||||
timeout: Schema.optional(PositiveInt).annotate({
|
||||
description: "Timeout in ms for MCP server requests. Defaults to 5000 (5 seconds) if not specified.",
|
||||
description: "Timeout in ms for MCP server requests. Defaults to 30000 (30 seconds) if not specified.",
|
||||
}),
|
||||
}).annotate({ identifier: "McpRemoteConfig" })
|
||||
export type Remote = Schema.Schema.Type<typeof Remote>
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ import { BusEvent } from "../bus/bus-event"
|
|||
import { Bus } from "@/bus"
|
||||
import { TuiEvent } from "@/cli/cmd/tui/event"
|
||||
import open from "open"
|
||||
import { Effect, Exit, Layer, Option, Context, Schema, Stream } from "effect"
|
||||
import { Effect, Exit, Layer, Option, Context, Schema, Stream, Scope, Semaphore } from "effect"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
|
||||
|
|
@ -79,6 +79,9 @@ const StatusConnected = Schema.Struct({ status: Schema.Literal("connected") }).a
|
|||
const StatusDisabled = Schema.Struct({ status: Schema.Literal("disabled") }).annotate({
|
||||
identifier: "MCPStatusDisabled",
|
||||
})
|
||||
const StatusConnecting = Schema.Struct({ status: Schema.Literal("connecting") }).annotate({
|
||||
identifier: "MCPStatusConnecting",
|
||||
})
|
||||
const StatusFailed = Schema.Struct({ status: Schema.Literal("failed"), error: Schema.String }).annotate({
|
||||
identifier: "MCPStatusFailed",
|
||||
})
|
||||
|
|
@ -93,12 +96,21 @@ const StatusNeedsClientRegistration = Schema.Struct({
|
|||
export const Status = Schema.Union([
|
||||
StatusConnected,
|
||||
StatusDisabled,
|
||||
StatusConnecting,
|
||||
StatusFailed,
|
||||
StatusNeedsAuth,
|
||||
StatusNeedsClientRegistration,
|
||||
]).annotate({ identifier: "MCPStatus", discriminator: "status" })
|
||||
export type Status = Schema.Schema.Type<typeof Status>
|
||||
|
||||
export const StatusChanged = BusEvent.define(
|
||||
"mcp.status.changed",
|
||||
Schema.Struct({
|
||||
name: Schema.String,
|
||||
status: Status,
|
||||
}),
|
||||
)
|
||||
|
||||
// Store transports for OAuth servers to allow finishing auth
|
||||
type TransportWithAuth = StreamableHTTPClientTransport | SSEClientTransport
|
||||
const pendingOAuthTransports = new Map<string, TransportWithAuth>()
|
||||
|
|
@ -237,6 +249,7 @@ interface State {
|
|||
status: Record<string, Status>
|
||||
clients: Record<string, MCPClient>
|
||||
defs: Record<string, MCPToolDef[]>
|
||||
revision: Record<string, number>
|
||||
}
|
||||
|
||||
export interface Interface {
|
||||
|
|
@ -480,6 +493,7 @@ export const layer = Layer.effect(
|
|||
return { mcpClient, status, defs: listed } satisfies CreateResult
|
||||
})
|
||||
const cfgSvc = yield* Config.Service
|
||||
const startupLock = Semaphore.makeUnsafe(1)
|
||||
|
||||
const descendants = Effect.fnUntraced(
|
||||
function* (pid: number) {
|
||||
|
|
@ -519,43 +533,133 @@ export const layer = Layer.effect(
|
|||
})
|
||||
}
|
||||
|
||||
function failedStatus(error: unknown): Status {
|
||||
return { status: "failed", error: error instanceof Error ? error.message : String(error) }
|
||||
}
|
||||
|
||||
function bump(s: State, name: string) {
|
||||
const next = (s.revision[name] ?? 0) + 1
|
||||
s.revision[name] = next
|
||||
return next
|
||||
}
|
||||
|
||||
function closeClient(s: State, name: string) {
|
||||
const client = s.clients[name]
|
||||
delete s.defs[name]
|
||||
if (!client) return Effect.void
|
||||
return Effect.tryPromise(() => client.close()).pipe(Effect.ignore)
|
||||
}
|
||||
|
||||
function closeCreateResult(result: CreateResult) {
|
||||
const client = result.mcpClient
|
||||
if (!client) return Effect.void
|
||||
return Effect.tryPromise(() => client.close()).pipe(Effect.ignore)
|
||||
}
|
||||
|
||||
const setStatus = Effect.fnUntraced(function* (s: State, name: string, status: Status) {
|
||||
s.status[name] = status
|
||||
yield* bus.publish(StatusChanged, { name, status }).pipe(Effect.ignore)
|
||||
return status
|
||||
})
|
||||
|
||||
const storeClient = Effect.fnUntraced(function* (
|
||||
s: State,
|
||||
name: string,
|
||||
client: MCPClient,
|
||||
listed: MCPToolDef[],
|
||||
timeout?: number,
|
||||
) {
|
||||
const bridge = yield* EffectBridge.make()
|
||||
yield* closeClient(s, name)
|
||||
s.clients[name] = client
|
||||
s.defs[name] = listed
|
||||
watch(s, name, client, bridge, timeout)
|
||||
return yield* setStatus(s, name, { status: "connected" })
|
||||
})
|
||||
|
||||
const applyCreateResult = Effect.fnUntraced(function* (
|
||||
s: State,
|
||||
name: string,
|
||||
result: CreateResult,
|
||||
timeout?: number,
|
||||
) {
|
||||
const client = result.mcpClient
|
||||
if (!client) {
|
||||
yield* closeClient(s, name)
|
||||
delete s.clients[name]
|
||||
return yield* setStatus(s, name, result.status)
|
||||
}
|
||||
|
||||
if (!result.defs) {
|
||||
yield* closeCreateResult(result)
|
||||
yield* closeClient(s, name)
|
||||
delete s.clients[name]
|
||||
return yield* setStatus(s, name, { status: "failed", error: "Failed to get tools" })
|
||||
}
|
||||
|
||||
return yield* storeClient(s, name, client, result.defs, timeout)
|
||||
})
|
||||
|
||||
const createSafely = (key: string, mcp: ConfigMCP.Info) =>
|
||||
create(key, mcp).pipe(
|
||||
Effect.catch((error) => {
|
||||
log.error("mcp startup failed", { key, error })
|
||||
return Effect.succeed({ status: failedStatus(error) } satisfies CreateResult)
|
||||
}),
|
||||
)
|
||||
|
||||
const startConfigured = Effect.fn("MCP.startConfigured")(function* (
|
||||
s: State,
|
||||
entries: ReadonlyArray<readonly [string, ConfigMCP.Info]>,
|
||||
) {
|
||||
yield* startupLock.withPermits(1)(
|
||||
Effect.forEach(
|
||||
entries,
|
||||
([key, mcp]) =>
|
||||
Effect.gen(function* () {
|
||||
const revision = s.revision[key] ?? 0
|
||||
const result = yield* createSafely(key, mcp)
|
||||
if ((s.revision[key] ?? 0) !== revision) {
|
||||
yield* closeCreateResult(result)
|
||||
return
|
||||
}
|
||||
yield* applyCreateResult(s, key, result, mcp.timeout)
|
||||
}),
|
||||
{ concurrency: "unbounded", discard: true },
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("MCP.state")(function* () {
|
||||
const cfg = yield* cfgSvc.get()
|
||||
const bridge = yield* EffectBridge.make()
|
||||
const scope = yield* Scope.Scope
|
||||
const config = cfg.mcp ?? {}
|
||||
const s: State = {
|
||||
status: {},
|
||||
clients: {},
|
||||
defs: {},
|
||||
revision: {},
|
||||
}
|
||||
|
||||
yield* Effect.forEach(
|
||||
Object.entries(config),
|
||||
([key, mcp]) =>
|
||||
Effect.gen(function* () {
|
||||
if (!isMcpConfigured(mcp)) {
|
||||
log.error("Ignoring MCP config entry without type", { key })
|
||||
return
|
||||
}
|
||||
const configured = Object.entries(config).flatMap(([key, mcp]) => {
|
||||
if (!isMcpConfigured(mcp)) {
|
||||
log.error("Ignoring MCP config entry without type", { key })
|
||||
return []
|
||||
}
|
||||
|
||||
if (mcp.enabled === false) {
|
||||
s.status[key] = { status: "disabled" }
|
||||
return
|
||||
}
|
||||
if (mcp.enabled === false) {
|
||||
s.status[key] = { status: "disabled" }
|
||||
return []
|
||||
}
|
||||
|
||||
const result = yield* create(key, mcp).pipe(Effect.catch(() => Effect.void))
|
||||
if (!result) return
|
||||
s.status[key] = { status: "connecting" }
|
||||
return [[key, mcp] as const]
|
||||
})
|
||||
|
||||
s.status[key] = result.status
|
||||
if (result.mcpClient) {
|
||||
s.clients[key] = result.mcpClient
|
||||
s.defs[key] = result.defs!
|
||||
watch(s, key, result.mcpClient, bridge, mcp.timeout)
|
||||
}
|
||||
}),
|
||||
{ concurrency: "unbounded" },
|
||||
)
|
||||
if (configured.length > 0) {
|
||||
yield* startConfigured(s, configured).pipe(Effect.ignore, Effect.forkIn(scope), Effect.asVoid)
|
||||
}
|
||||
|
||||
yield* Effect.addFinalizer(() =>
|
||||
Effect.gen(function* () {
|
||||
|
|
@ -584,29 +688,6 @@ export const layer = Layer.effect(
|
|||
}),
|
||||
)
|
||||
|
||||
function closeClient(s: State, name: string) {
|
||||
const client = s.clients[name]
|
||||
delete s.defs[name]
|
||||
if (!client) return Effect.void
|
||||
return Effect.tryPromise(() => client.close()).pipe(Effect.ignore)
|
||||
}
|
||||
|
||||
const storeClient = Effect.fnUntraced(function* (
|
||||
s: State,
|
||||
name: string,
|
||||
client: MCPClient,
|
||||
listed: MCPToolDef[],
|
||||
timeout?: number,
|
||||
) {
|
||||
const bridge = yield* EffectBridge.make()
|
||||
yield* closeClient(s, name)
|
||||
s.status[name] = { status: "connected" }
|
||||
s.clients[name] = client
|
||||
s.defs[name] = listed
|
||||
watch(s, name, client, bridge, timeout)
|
||||
return s.status[name]
|
||||
})
|
||||
|
||||
const status = Effect.fn("MCP.status")(function* () {
|
||||
const s = yield* InstanceState.get(state)
|
||||
|
||||
|
|
@ -629,16 +710,15 @@ export const layer = Layer.effect(
|
|||
|
||||
const createAndStore = Effect.fn("MCP.createAndStore")(function* (name: string, mcp: ConfigMCP.Info) {
|
||||
const s = yield* InstanceState.get(state)
|
||||
const result = yield* create(name, mcp)
|
||||
|
||||
s.status[name] = result.status
|
||||
if (!result.mcpClient) {
|
||||
yield* closeClient(s, name)
|
||||
delete s.clients[name]
|
||||
return result.status
|
||||
const revision = bump(s, name)
|
||||
yield* setStatus(s, name, mcp.enabled === false ? { status: "disabled" } : { status: "connecting" })
|
||||
const result = yield* createSafely(name, mcp)
|
||||
if ((s.revision[name] ?? 0) !== revision) {
|
||||
yield* closeCreateResult(result)
|
||||
return s.status[name] ?? result.status
|
||||
}
|
||||
|
||||
return yield* storeClient(s, name, result.mcpClient, result.defs!, mcp.timeout)
|
||||
return yield* applyCreateResult(s, name, result, mcp.timeout)
|
||||
})
|
||||
|
||||
const add = Effect.fn("MCP.add")(function* (name: string, mcp: ConfigMCP.Info) {
|
||||
|
|
@ -655,9 +735,10 @@ export const layer = Layer.effect(
|
|||
const disconnect = Effect.fn("MCP.disconnect")(function* (name: string) {
|
||||
yield* requireMcpConfig(name)
|
||||
const s = yield* InstanceState.get(state)
|
||||
bump(s, name)
|
||||
yield* closeClient(s, name)
|
||||
delete s.clients[name]
|
||||
s.status[name] = { status: "disabled" }
|
||||
yield* setStatus(s, name, { status: "disabled" })
|
||||
})
|
||||
|
||||
const tools = Effect.fn("MCP.tools")(function* () {
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
import { describe, expect, test } from "bun:test"
|
||||
import { Global } from "@opencode-ai/core/global"
|
||||
import { tmpdir } from "../../../fixture/fixture"
|
||||
import { mount, wait } from "./sync-fixture"
|
||||
import { json, mount, wait } from "./sync-fixture"
|
||||
import type { GlobalEvent } from "@opencode-ai/sdk/v2"
|
||||
|
||||
function branchEvent(branch: string, workspace?: string): GlobalEvent {
|
||||
|
|
@ -18,6 +18,19 @@ function branchEvent(branch: string, workspace?: string): GlobalEvent {
|
|||
}
|
||||
}
|
||||
|
||||
function mcpStatusEvent(status: "connecting" | "connected", workspace?: string): GlobalEvent {
|
||||
return {
|
||||
directory: "/tmp/other",
|
||||
project: "proj_test",
|
||||
workspace,
|
||||
payload: {
|
||||
id: `evt_mcp_${status}`,
|
||||
type: "mcp.status.changed",
|
||||
properties: { name: "playwright", status: { status } },
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
describe("tui sync", () => {
|
||||
test("refresh scopes sessions by default and lists project sessions when disabled", async () => {
|
||||
const previous = Global.Path.state
|
||||
|
|
@ -67,4 +80,33 @@ describe("tui sync", () => {
|
|||
Global.Path.state = previous
|
||||
}
|
||||
})
|
||||
|
||||
test("mcp status changes update the active workspace", async () => {
|
||||
const previous = Global.Path.state
|
||||
await using tmp = await tmpdir()
|
||||
Global.Path.state = tmp.path
|
||||
await Bun.write(`${tmp.path}/kv.json`, "{}")
|
||||
const { app, emit, project, sync } = await mount((url) => {
|
||||
if (url.pathname === "/mcp") return json({ playwright: { status: "connecting" } })
|
||||
return undefined
|
||||
})
|
||||
|
||||
try {
|
||||
expect(sync.data.mcp.playwright?.status).toBe("connecting")
|
||||
|
||||
project.workspace.set("ws_a")
|
||||
emit(mcpStatusEvent("connected", "ws_b"))
|
||||
await Bun.sleep(30)
|
||||
|
||||
expect(sync.data.mcp.playwright?.status).toBe("connecting")
|
||||
|
||||
emit(mcpStatusEvent("connected", "ws_a"))
|
||||
await wait(() => sync.data.mcp.playwright?.status === "connected")
|
||||
|
||||
expect(sync.data.mcp.playwright?.status).toBe("connected")
|
||||
} finally {
|
||||
app.renderer.destroy()
|
||||
Global.Path.state = previous
|
||||
}
|
||||
})
|
||||
})
|
||||
|
|
|
|||
|
|
@ -1,7 +1,9 @@
|
|||
import { expect, mock, beforeEach } from "bun:test"
|
||||
import { Cause, Effect, Exit } from "effect"
|
||||
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
|
||||
import type { MCP as MCPNS } from "../../src/mcp/index"
|
||||
import { testEffect } from "../lib/effect"
|
||||
import { provideInstance, tmpdirScoped } from "../fixture/fixture"
|
||||
import { awaitWithTimeout, pollWithTimeout, testEffect } from "../lib/effect"
|
||||
|
||||
// --- Mock infrastructure ---
|
||||
|
||||
|
|
@ -25,6 +27,10 @@ let lastCreatedClientName: string | undefined
|
|||
let connectShouldFail = false
|
||||
let connectShouldHang = false
|
||||
let connectError = "Mock transport cannot connect"
|
||||
let connectHook: (() => Promise<void>) | undefined
|
||||
let activeConnects = 0
|
||||
let maxActiveConnects = 0
|
||||
let connectStarts = 0
|
||||
// Tracks how many Client instances were created (detects leaks)
|
||||
let clientCreateCount = 0
|
||||
// Tracks how many times transport.close() is called across all mock transports
|
||||
|
|
@ -52,6 +58,19 @@ function getOrCreateClientState(name?: string): MockClientState {
|
|||
return state
|
||||
}
|
||||
|
||||
async function runMockConnect() {
|
||||
activeConnects++
|
||||
connectStarts++
|
||||
maxActiveConnects = Math.max(maxActiveConnects, activeConnects)
|
||||
try {
|
||||
await connectHook?.()
|
||||
if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
|
||||
if (connectShouldFail) throw new Error(connectError)
|
||||
} finally {
|
||||
activeConnects--
|
||||
}
|
||||
}
|
||||
|
||||
// Mock transport that succeeds or fails based on connectShouldFail / connectShouldHang
|
||||
class MockStdioTransport {
|
||||
stderr: null = null
|
||||
|
|
@ -59,8 +78,7 @@ class MockStdioTransport {
|
|||
// oxlint-disable-next-line no-useless-constructor
|
||||
constructor(_opts: any) {}
|
||||
async start() {
|
||||
if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
|
||||
if (connectShouldFail) throw new Error(connectError)
|
||||
return runMockConnect()
|
||||
}
|
||||
async close() {
|
||||
transportCloseCount++
|
||||
|
|
@ -71,8 +89,7 @@ class MockStreamableHTTP {
|
|||
// oxlint-disable-next-line no-useless-constructor
|
||||
constructor(_url: URL, _opts?: any) {}
|
||||
async start() {
|
||||
if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
|
||||
if (connectShouldFail) throw new Error(connectError)
|
||||
return runMockConnect()
|
||||
}
|
||||
async close() {
|
||||
transportCloseCount++
|
||||
|
|
@ -84,8 +101,7 @@ class MockSSE {
|
|||
// oxlint-disable-next-line no-useless-constructor
|
||||
constructor(_url: URL, _opts?: any) {}
|
||||
async start() {
|
||||
if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
|
||||
if (connectShouldFail) throw new Error(connectError)
|
||||
return runMockConnect()
|
||||
}
|
||||
async close() {
|
||||
transportCloseCount++
|
||||
|
|
@ -173,6 +189,10 @@ beforeEach(() => {
|
|||
connectShouldFail = false
|
||||
connectShouldHang = false
|
||||
connectError = "Mock transport cannot connect"
|
||||
connectHook = undefined
|
||||
activeConnects = 0
|
||||
maxActiveConnects = 0
|
||||
connectStarts = 0
|
||||
clientCreateCount = 0
|
||||
transportCloseCount = 0
|
||||
})
|
||||
|
|
@ -188,6 +208,91 @@ function statusName(status: Record<string, MCPNS.Status> | MCPNS.Status, server:
|
|||
return status[server]?.status
|
||||
}
|
||||
|
||||
function deferred() {
|
||||
let resolve = () => {}
|
||||
const promise = new Promise<void>((done) => {
|
||||
resolve = done
|
||||
})
|
||||
return { promise, resolve }
|
||||
}
|
||||
|
||||
it.instance(
|
||||
"status() returns connecting without waiting for configured startup",
|
||||
() =>
|
||||
MCP.Service.use((mcp: MCPNS.Interface) =>
|
||||
Effect.gen(function* () {
|
||||
const connect = deferred()
|
||||
connectHook = () => connect.promise
|
||||
|
||||
const status = yield* awaitWithTimeout(mcp.status(), "mcp status blocked on startup", "200 millis")
|
||||
expect(status["slow-server"]?.status).toBe("connecting")
|
||||
|
||||
yield* pollWithTimeout(
|
||||
Effect.sync(() => (connectStarts === 1 ? true : undefined)),
|
||||
"configured mcp startup did not begin",
|
||||
)
|
||||
|
||||
connect.resolve()
|
||||
|
||||
yield* pollWithTimeout(
|
||||
Effect.gen(function* () {
|
||||
const next = yield* mcp.status()
|
||||
return next["slow-server"]?.status === "connected" ? true : undefined
|
||||
}),
|
||||
"configured mcp startup did not complete",
|
||||
)
|
||||
}),
|
||||
),
|
||||
{
|
||||
config: {
|
||||
mcp: {
|
||||
"slow-server": {
|
||||
type: "local",
|
||||
command: ["echo", "test"],
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
it.live("configured MCP startup runs for one project at a time", () =>
|
||||
Effect.gen(function* () {
|
||||
const connect = deferred()
|
||||
connectHook = () => connect.promise
|
||||
const config = {
|
||||
mcp: {
|
||||
"slow-server": {
|
||||
type: "local" as const,
|
||||
command: ["echo", "test"],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
const first = yield* tmpdirScoped({ config })
|
||||
const second = yield* tmpdirScoped({ config })
|
||||
const mcp = yield* MCP.Service
|
||||
|
||||
yield* mcp.status().pipe(provideInstance(first))
|
||||
yield* pollWithTimeout(
|
||||
Effect.sync(() => (connectStarts === 1 ? true : undefined)),
|
||||
"first configured mcp startup did not begin",
|
||||
)
|
||||
|
||||
yield* mcp.status().pipe(provideInstance(second))
|
||||
yield* Effect.sleep("100 millis")
|
||||
expect(connectStarts).toBe(1)
|
||||
expect(maxActiveConnects).toBe(1)
|
||||
|
||||
connect.resolve()
|
||||
|
||||
yield* pollWithTimeout(
|
||||
Effect.sync(() => (connectStarts === 2 && activeConnects === 0 ? true : undefined)),
|
||||
"second configured mcp startup did not run after first completed",
|
||||
)
|
||||
expect(maxActiveConnects).toBe(1)
|
||||
}).pipe(Effect.provide(CrossSpawnSpawner.defaultLayer)),
|
||||
)
|
||||
|
||||
// ========================================================================
|
||||
// Test: tools() are cached after connect
|
||||
// ========================================================================
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ export type Event =
|
|||
| EventSessionIdle
|
||||
| EventMcpToolsChanged
|
||||
| EventMcpBrowserOpenFailed
|
||||
| EventMcpStatusChanged
|
||||
| EventCommandExecuted
|
||||
| EventProjectUpdated
|
||||
| EventSessionCompacted
|
||||
|
|
@ -352,6 +353,40 @@ export type SessionStatus =
|
|||
type: "busy"
|
||||
}
|
||||
|
||||
export type McpStatusConnected = {
|
||||
status: "connected"
|
||||
}
|
||||
|
||||
export type McpStatusDisabled = {
|
||||
status: "disabled"
|
||||
}
|
||||
|
||||
export type McpStatusConnecting = {
|
||||
status: "connecting"
|
||||
}
|
||||
|
||||
export type McpStatusFailed = {
|
||||
status: "failed"
|
||||
error: string
|
||||
}
|
||||
|
||||
export type McpStatusNeedsAuth = {
|
||||
status: "needs_auth"
|
||||
}
|
||||
|
||||
export type McpStatusNeedsClientRegistration = {
|
||||
status: "needs_client_registration"
|
||||
error: string
|
||||
}
|
||||
|
||||
export type McpStatus =
|
||||
| McpStatusConnected
|
||||
| McpStatusDisabled
|
||||
| McpStatusConnecting
|
||||
| McpStatusFailed
|
||||
| McpStatusNeedsAuth
|
||||
| McpStatusNeedsClientRegistration
|
||||
|
||||
export type Project = {
|
||||
id: string
|
||||
worktree: string
|
||||
|
|
@ -830,6 +865,7 @@ export type GlobalEvent = {
|
|||
| EventSessionIdle
|
||||
| EventMcpToolsChanged
|
||||
| EventMcpBrowserOpenFailed
|
||||
| EventMcpStatusChanged
|
||||
| EventCommandExecuted
|
||||
| EventProjectUpdated
|
||||
| EventSessionCompacted
|
||||
|
|
@ -1661,35 +1697,6 @@ export type FormatterStatus = {
|
|||
enabled: boolean
|
||||
}
|
||||
|
||||
export type McpStatusConnected = {
|
||||
status: "connected"
|
||||
}
|
||||
|
||||
export type McpStatusDisabled = {
|
||||
status: "disabled"
|
||||
}
|
||||
|
||||
export type McpStatusFailed = {
|
||||
status: "failed"
|
||||
error: string
|
||||
}
|
||||
|
||||
export type McpStatusNeedsAuth = {
|
||||
status: "needs_auth"
|
||||
}
|
||||
|
||||
export type McpStatusNeedsClientRegistration = {
|
||||
status: "needs_client_registration"
|
||||
error: string
|
||||
}
|
||||
|
||||
export type McpStatus =
|
||||
| McpStatusConnected
|
||||
| McpStatusDisabled
|
||||
| McpStatusFailed
|
||||
| McpStatusNeedsAuth
|
||||
| McpStatusNeedsClientRegistration
|
||||
|
||||
export type McpUnsupportedOAuthError = {
|
||||
error: string
|
||||
}
|
||||
|
|
@ -2671,6 +2678,15 @@ export type EventMcpBrowserOpenFailed = {
|
|||
}
|
||||
}
|
||||
|
||||
export type EventMcpStatusChanged = {
|
||||
id: string
|
||||
type: "mcp.status.changed"
|
||||
properties: {
|
||||
name: string
|
||||
status: McpStatus
|
||||
}
|
||||
}
|
||||
|
||||
export type EventCommandExecuted = {
|
||||
id: string
|
||||
type: "command.executed"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue