fix(acp): share acp-next session state (#29253)

This commit is contained in:
Shoubhit Dash 2026-05-25 23:09:41 +05:30 committed by GitHub
parent 00ea47a502
commit 56743dcf04
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 150 additions and 46 deletions

View file

@ -5,6 +5,7 @@ import { InstanceStore } from "@/project/instance-store"
import { ModelID, ProviderID } from "@/provider/schema"
import { Provider } from "@/provider/provider"
import { Context, Effect, Layer, SynchronizedRef } from "effect"
import type * as ACPNextError from "./error"
export type ModelOption = {
readonly providerID: ProviderID
@ -38,12 +39,12 @@ export type Snapshot = {
}
export interface LoaderInterface {
readonly load: (directory: string) => Effect.Effect<Snapshot>
readonly load: (directory: string) => Effect.Effect<Snapshot, ACPNextError.Error>
}
export interface Interface {
readonly get: (directory: string) => Effect.Effect<Snapshot>
readonly refresh: (directory: string) => Effect.Effect<Snapshot>
readonly get: (directory: string) => Effect.Effect<Snapshot, ACPNextError.Error>
readonly refresh: (directory: string) => Effect.Effect<Snapshot, ACPNextError.Error>
readonly variants: (snapshot: Snapshot, model: DefaultModel) => ModelVariants | undefined
}
@ -141,7 +142,7 @@ export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const loader = yield* Loader
const snapshots = yield* SynchronizedRef.make(new Map<string, Effect.Effect<Snapshot>>())
const snapshots = yield* SynchronizedRef.make(new Map<string, Effect.Effect<Snapshot, ACPNextError.Error>>())
const cached = Effect.fnUntraced(function* (directory: string) {
return yield* SynchronizedRef.modifyEffect(
@ -149,7 +150,17 @@ export const layer = Layer.effect(
Effect.fnUntraced(function* (items) {
const current = items.get(directory)
if (current) return [current, items] as const
const next = yield* Effect.cached(loader.load(directory))
const next = yield* Effect.cached(
loader.load(directory).pipe(
Effect.tapError(() =>
SynchronizedRef.update(snapshots, (state) => {
const next = new Map(state)
next.delete(directory)
return next
}),
),
),
)
return [next, new Map(items).set(directory, next)] as const
}),
)
@ -163,7 +174,17 @@ export const layer = Layer.effect(
return yield* SynchronizedRef.modifyEffect(
snapshots,
Effect.fnUntraced(function* (items) {
const next = yield* Effect.cached(loader.load(directory))
const next = yield* Effect.cached(
loader.load(directory).pipe(
Effect.tapError(() =>
SynchronizedRef.update(snapshots, (state) => {
const next = new Map(state)
next.delete(directory)
return next
}),
),
),
)
return [next, new Map(items).set(directory, next)] as const
}),
).pipe(Effect.flatten)

View file

@ -16,10 +16,11 @@ import {
} from "@agentclientprotocol/sdk"
import { InstallationVersion } from "@opencode-ai/core/installation/version"
import type { OpencodeClient } from "@opencode-ai/sdk/v2"
import { Context, Effect } from "effect"
import { Context, Effect, Layer, ManagedRuntime } from "effect"
import * as ACPNextError from "./error"
import { buildConfigOptions } from "./config-option"
import { Directory } from "./directory"
import { ACPNextSession } from "./session"
import { ModelID, ProviderID } from "@/provider/schema"
import { Provider } from "@/provider/provider"
import type { Command } from "@/command"
@ -42,9 +43,11 @@ export class Service extends Context.Service<Service, Interface>()("@opencode/AC
export function make(input: {
sdk: OpencodeClient
connection?: Pick<AgentSideConnection, "sessionUpdate">
directory?: Directory.Interface
session?: ACPNextSession.Interface
}): Interface {
const sessions = new Map<string, SessionState>()
const directories = new Map<string, Promise<Directory.Snapshot>>()
const session = input.session ?? makeSessionService()
const directoryService = input.directory ?? makeDirectoryService(input.sdk)
const registeredMcp = new Map<string, Set<string>>()
const initialize = Effect.fn("ACPNext.initialize")(function* (params: InitializeRequest) {
@ -92,16 +95,8 @@ export function make(input: {
return {}
})
const directorySnapshot = Effect.fn("ACPNext.directorySnapshot")(function* (directory: string) {
const cached = directories.get(directory)
if (cached) return yield* request(() => cached, "directory")
const promise = loadDirectorySnapshot(input.sdk, directory).catch((error: unknown) => {
directories.delete(directory)
throw fromUnknownError(error, "directory")
})
directories.set(directory, promise)
return yield* request(() => promise, "directory")
const directorySnapshot = Effect.fn("ACPNext.directorySnapshot")(function* (cwd: string) {
return yield* directoryService.get(cwd)
})
const newSession = Effect.fn("ACPNext.newSession")(function* (params: NewSessionRequest) {
@ -125,7 +120,7 @@ export function make(input: {
),
"session",
)
const state = storeSession(sessions, {
const state = yield* session.create({
id: created.id,
cwd: params.cwd,
mcpServers: params.mcpServers,
@ -134,12 +129,16 @@ export function make(input: {
modeId,
})
yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, params.mcpServers)
yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers)
yield* sendAvailableCommands(input.connection, state.id, snapshot)
return {
sessionId: state.id,
configOptions: configOptions(snapshot, state),
configOptions: configOptions(snapshot, {
model: state.model ?? selected,
variant: state.variant,
modeId: state.modeId,
}),
}
})
@ -159,7 +158,7 @@ export function make(input: {
)
const restored = restoreFromMessages(messages.map((item) => item.info))
const model = restored.model ?? selectDefaultModel(snapshot)
const state = storeSession(sessions, {
const state = yield* session.load({
id: params.sessionId,
cwd: params.cwd,
mcpServers: params.mcpServers,
@ -168,12 +167,16 @@ export function make(input: {
modeId: restored.modeId ?? (snapshot.availableModes.length > 0 ? snapshot.defaultModeID : undefined),
})
yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, params.mcpServers)
yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers)
yield* sendAvailableCommands(input.connection, state.id, snapshot)
return {
sessionId: state.id,
configOptions: configOptions(snapshot, state),
configOptions: configOptions(snapshot, {
model: state.model ?? model,
variant: state.variant,
modeId: state.modeId,
}),
}
})
@ -191,10 +194,28 @@ export function make(input: {
}
}
type SessionState = {
readonly id: string
readonly cwd: string
readonly mcpServers: readonly McpServer[]
function makeSessionService() {
return ManagedRuntime.make(ACPNextSession.defaultLayer).runSync(
ACPNextSession.Service.use((service) => Effect.succeed(service)),
)
}
function makeDirectoryService(sdk: OpencodeClient) {
return ManagedRuntime.make(
Directory.layer.pipe(
Layer.provide(
Layer.succeed(
Directory.Loader,
Directory.Loader.of({
load: (directory) => request(() => loadDirectorySnapshot(sdk, directory), "directory"),
}),
),
),
),
).runSync(Directory.Service.use((service) => Effect.succeed(service)))
}
type ConfigState = {
readonly model: Directory.DefaultModel
readonly variant?: string
readonly modeId?: string
@ -340,15 +361,7 @@ function selectVariant(snapshot: Directory.Snapshot, model: Directory.DefaultMod
return Object.keys(variants)[0]
}
function storeSession(sessions: Map<string, SessionState>, state: SessionState) {
sessions.set(state.id, {
...state,
mcpServers: [...state.mcpServers],
})
return sessions.get(state.id)!
}
function configOptions(snapshot: Directory.Snapshot, session: SessionState) {
function configOptions(snapshot: Directory.Snapshot, session: ConfigState) {
return buildConfigOptions({
providers: Object.values(snapshot.providers),
currentModel: session.model,
@ -384,28 +397,36 @@ function registerMcpServers(
sdk: OpencodeClient,
registered: Map<string, Set<string>>,
directory: string,
sessionId: string,
servers: readonly McpServer[],
) {
const current = registered.get(directory) ?? new Set<string>()
registered.set(directory, current)
const current = registered.get(sessionId) ?? new Set<string>()
registered.set(sessionId, current)
const pending = new Set<string>()
return Effect.all(
Array.from(new Map(servers.map((server) => [server.name, server])).values())
.filter((server) => !current.has(server.name))
.map((server) =>
servers
.map((server) => ({ server, config: mcpConfig(server) }))
.filter((entry) => {
const key = mcpRegistrationKey(entry.server.name, entry.config)
if (current.has(key) || pending.has(key)) return false
pending.add(key)
return true
})
.map((entry) =>
request(
() =>
sdk.mcp.add(
{
directory,
name: server.name,
config: mcpConfig(server),
name: entry.server.name,
config: entry.config,
},
{ throwOnError: true },
),
"mcp",
).pipe(
Effect.tap(() => Effect.sync(() => current.add(server.name))),
Effect.tap(() => Effect.sync(() => current.add(mcpRegistrationKey(entry.server.name, entry.config)))),
Effect.ignore,
),
),
@ -413,6 +434,10 @@ function registerMcpServers(
).pipe(Effect.asVoid)
}
function mcpRegistrationKey(name: string, config: ReturnType<typeof mcpConfig>) {
return `${name}:${stableStringify(config)}`
}
function mcpConfig(server: McpServer) {
if ("type" in server) {
return {
@ -428,6 +453,15 @@ function mcpConfig(server: McpServer) {
}
}
function stableStringify(value: unknown): string {
if (Array.isArray(value)) return `[${value.map(stableStringify).join(",")}]`
if (!value || typeof value !== "object") return JSON.stringify(value)
return `{${Object.entries(value)
.toSorted(([a], [b]) => a.localeCompare(b))
.map(([key, item]) => `${JSON.stringify(key)}:${stableStringify(item)}`)
.join(",")}}`
}
function restoreFromMessages(messages: readonly MessageInfo[]) {
const user = messages.findLast(
(message) => message.role === "user" && message.model?.providerID && message.model.modelID,

View file

@ -282,6 +282,55 @@ describe("ACP next service sessions", () => {
expect(providersCalls).toBe(2)
})
it("registers same-name MCP servers again for different sessions or configs", async () => {
const adds: unknown[] = []
let nextSession = 0
const sdk = {
config: {
providers: () => Promise.resolve({ data: { providers: [provider], default: { test: modelID } } }),
get: () => Promise.resolve({ data: {} }),
},
app: {
agents: () => Promise.resolve({ data: [{ name: "build", mode: "primary", permission: [], options: {} }] }),
skills: () => Promise.resolve({ data: [] }),
},
command: {
list: () => Promise.resolve({ data: [] }),
},
session: {
create: () => {
nextSession++
return Promise.resolve({ data: { id: `ses_${nextSession}` } })
},
list: () => Promise.resolve({ data: [] }),
},
mcp: {
add: (input: unknown) => {
adds.push(input)
return Promise.resolve({ data: {} })
},
},
} as unknown as OpencodeClient
const service = ACPNextService.make({ sdk })
await Effect.runPromise(
service.newSession({
cwd: "/workspace",
mcpServers: [{ name: "tools", command: "node", args: ["one.js"], env: [] }],
}),
)
await Effect.runPromise(
service.newSession({
cwd: "/workspace",
mcpServers: [{ name: "tools", command: "node", args: ["two.js"], env: [] }],
}),
)
expect(adds).toHaveLength(2)
expect(JSON.stringify(adds[0])).toContain("one.js")
expect(JSON.stringify(adds[1])).toContain("two.js")
})
it("uses the configured model as the new session default", async () => {
const sdk = {
config: {