diff --git a/packages/opencode/src/acp-next/service.ts b/packages/opencode/src/acp-next/service.ts index 76adf85e6e..2373df3ac5 100644 --- a/packages/opencode/src/acp-next/service.ts +++ b/packages/opencode/src/acp-next/service.ts @@ -35,9 +35,11 @@ import type { Message, OpencodeClient, SessionMessageResponse } from "@opencode- import { Context, Effect, Layer, ManagedRuntime } from "effect" import * as ACPNextError from "./error" import { buildConfigOptions, parseModelSelection } from "./config-option" +import { promptContentToParts } from "./content" import { Directory } from "./directory" import { ACPNextEvent } from "./event" import { ACPNextSession } from "./session" +import { UsageService } from "./usage" import { ModelID, ProviderID } from "@/provider/schema" import { Provider } from "@/provider/provider" import type { Command } from "@/command" @@ -46,6 +48,8 @@ export const AuthMethodID = "opencode-login" const log = Log.create({ service: "acp-next-service" }) export type Error = ACPNextError.Error +type ServiceConnection = Pick & + Partial> export type Interface = { readonly initialize: (input: InitializeRequest) => Effect.Effect @@ -69,10 +73,10 @@ export class Service extends Context.Service()("@opencode/AC export function make(input: { sdk: OpencodeClient - connection?: Pick & - Partial> + connection?: ServiceConnection directory?: Directory.Interface session?: ACPNextSession.Interface + usage?: UsageService.Interface eventSubscription?: (subscription: ACPNextEvent.Subscription) => void }): Interface { const session = input.session ?? makeSessionService() @@ -444,8 +448,81 @@ export function make(input: { setSessionConfigOption, setSessionMode, setSessionModel, - prompt: Effect.fn("ACPNext.prompt")(function* (_input: PromptRequest) { - return yield* new ACPNextError.UnsupportedOperationError({ method: "session/prompt" }) + prompt: Effect.fn("ACPNext.prompt")(function* (params: PromptRequest) { + const current = yield* session.get(params.sessionId) + const snapshot = yield* directorySnapshot(current.cwd) + const selected = current.model ?? selectDefaultModel(snapshot) + if (!current.model) { + yield* session.setModel(params.sessionId, selected) + } + const variant = current.variant ?? selectVariant(snapshot, selected) + const modeId = current.modeId ?? (snapshot.availableModes.length > 0 ? snapshot.defaultModeID : undefined) + const parts = promptContentToParts(params.prompt) + const command = detectSlashCommand(parts) + + if (!command) { + const response = yield* request( + () => + input.sdk.session.prompt( + { + sessionID: current.id, + model: { + providerID: selected.providerID, + modelID: selected.modelID, + }, + ...(variant ? { variant } : {}), + parts, + ...(modeId ? { agent: modeId } : {}), + directory: current.cwd, + }, + { throwOnError: true }, + ), + "session", + ) + yield* sendUsageUpdate(input.usage, input.sdk, input.connection, current.id, current.cwd) + return promptResponse(response.info, params.messageId) + } + + const known = snapshot.availableCommands.find((item) => item.name === command.name) + if (known) { + const response = yield* request( + () => + input.sdk.session.command( + { + sessionID: current.id, + command: known.name, + arguments: command.args, + model: `${selected.providerID}/${selected.modelID}`, + ...(variant ? { variant } : {}), + ...(modeId ? { agent: modeId } : {}), + directory: current.cwd, + }, + { throwOnError: true }, + ), + "session", + ) + yield* sendUsageUpdate(input.usage, input.sdk, input.connection, current.id, current.cwd) + return promptResponse(response.info, params.messageId) + } + + if (command.name === "compact") { + yield* request( + () => + input.sdk.session.summarize( + { + sessionID: current.id, + directory: current.cwd, + providerID: selected.providerID, + modelID: selected.modelID, + }, + { throwOnError: true }, + ), + "session", + ) + } + + yield* sendUsageUpdate(input.usage, input.sdk, input.connection, current.id, current.cwd) + return promptResponse(undefined, params.messageId) }), cancel: Effect.fn("ACPNext.cancel")(function* (_input: CancelNotification) { return yield* new ACPNextError.UnsupportedOperationError({ method: "session/cancel" }) @@ -474,6 +551,91 @@ function makeDirectoryService(sdk: OpencodeClient) { ).runSync(Directory.Service.use((service) => Effect.succeed(service))) } +function makeUsageService(sdk: OpencodeClient) { + const limits = new Map>() + const contextLimit: UsageService.Interface["contextLimit"] = Effect.fn("ACPNext.promptUsage.contextLimit")(function* ( + params, + ) { + const key = `${params.directory}\u0000${params.providerID}\u0000${params.modelID}` + const current = limits.get(key) + if (current) return yield* Effect.promise(() => current) + + const next = sdk.config + .providers({ directory: params.directory }, { throwOnError: true }) + .then((response) => { + const providers = Object.fromEntries( + (response.data?.providers ?? []).map((provider) => [provider.id, provider]), + ) as Record + return UsageService.findContextLimit(providers, params.providerID, params.modelID) + }) + .catch((error: unknown) => { + log.error("failed to get providers for usage context limit", { error }) + return undefined + }) + limits.set(key, next) + return yield* Effect.promise(() => next) + }) + + const sendUpdate: UsageService.Interface["sendUpdate"] = Effect.fn("ACPNext.promptUsage.sendUpdate")(function* ( + params, + ) { + const messages = yield* request( + () => + sdk.session.messages( + { + sessionID: params.sessionID, + directory: params.directory, + }, + { throwOnError: true }, + ), + "session", + ).pipe( + Effect.map((messages) => messages as readonly UsageService.SessionMessage[]), + Effect.catch((error) => + Effect.sync(() => { + log.error("failed to fetch messages for usage update", { error }) + return undefined + }), + ), + ) + if (!messages) return + + const message = UsageService.latestAssistantMessage(messages) + if (!message?.providerID || !message.modelID) return + + const size = yield* contextLimit({ + directory: params.directory, + providerID: ProviderID.make(message.providerID), + modelID: ModelID.make(message.modelID), + }) + if (!size) return + + yield* Effect.promise(() => + params.connection + .sessionUpdate({ + sessionId: params.sessionID, + update: { + sessionUpdate: "usage_update", + used: message.tokens.input + message.tokens.cache.read, + size, + cost: { amount: UsageService.totalSessionCost(messages), currency: "USD" }, + }, + }) + .catch((error) => { + log.error("failed to send usage update", { error }) + }), + ) + }) + + return UsageService.Service.of({ + buildUsage: UsageService.buildUsage, + latestAssistantMessage: UsageService.latestAssistantMessage, + totalSessionCost: UsageService.totalSessionCost, + contextLimit, + sendUpdate, + }) +} + function replayMessages(subscription: ACPNextEvent.Subscription | undefined, messages: SessionMessageResponse[]) { if (!subscription) return Effect.void return Effect.promise(async () => { @@ -506,6 +668,8 @@ type MessageInfo = { readonly agent?: Message["agent"] } +type AssistantInfo = UsageService.AssistantTokenCost | undefined + function request(fn: () => Promise>, service?: string) { return Effect.tryPromise({ try: async () => { @@ -620,6 +784,43 @@ function selectDefaultModel(snapshot: Directory.Snapshot) { return { providerID: "unknown" as ProviderID, modelID: "unknown" as ModelID } } +function detectSlashCommand(parts: ReturnType) { + const text = parts + .filter((part): part is Extract<(typeof parts)[number], { type: "text" }> => part.type === "text") + .map((part) => part.text) + .join("") + .trim() + if (!text.startsWith("/")) return + + const [name, ...rest] = text.slice(1).split(/\s+/) + if (!name) return + return { name, args: rest.join(" ").trim() } +} + +function promptResponse(info: AssistantInfo, messageId: string | null | undefined): PromptResponse { + return { + stopReason: "end_turn", + ...(info ? { usage: UsageService.buildUsage(info) } : {}), + ...(messageId ? { userMessageId: messageId } : {}), + _meta: {}, + } +} + +function sendUsageUpdate( + usage: UsageService.Interface | undefined, + sdk: OpencodeClient, + connection: ServiceConnection | undefined, + sessionID: string, + directory: string, +) { + if (!connection) return Effect.void + return (usage ?? makeUsageService(sdk)).sendUpdate({ + connection, + sessionID, + directory, + }) +} + function selectVariant(snapshot: Directory.Snapshot, model: Directory.DefaultModel) { const variants = Directory.variants(snapshot, model) if (!variants) return diff --git a/packages/opencode/test/acp-next/service-session.test.ts b/packages/opencode/test/acp-next/service-session.test.ts index 96d4066f7e..4c0563b63e 100644 --- a/packages/opencode/test/acp-next/service-session.test.ts +++ b/packages/opencode/test/acp-next/service-session.test.ts @@ -4,6 +4,7 @@ import type { ForkSessionResponse, LoadSessionResponse, NewSessionResponse, + SessionNotification, ResumeSessionResponse, SessionConfigOption, SessionConfigSelectOption, @@ -14,6 +15,7 @@ import { Effect, ManagedRuntime } from "effect" import * as ACPNextService from "@/acp-next/service" import * as ACPNextError from "@/acp-next/error" import { ACPNextSession } from "@/acp-next/session" +import { UsageService } from "@/acp-next/usage" import { ModelID, ProviderID } from "@/provider/schema" import type { Provider } from "@/provider/provider" @@ -141,10 +143,14 @@ const provider: Provider.Info = { describe("ACP next service sessions", () => { const makeService = (messages: readonly { info: unknown; parts: readonly unknown[] }[] = []) => { - const updates: unknown[] = [] + const updates: SessionNotification[] = [] const mcpAdds: string[] = [] const aborts: string[] = [] const forks: string[] = [] + const prompts: unknown[] = [] + const commands: unknown[] = [] + const summarizes: unknown[] = [] + const usageUpdates: string[] = [] const sessions = Array.from({ length: 102 }, (_, index) => ({ id: `ses_${index + 1}`, directory: index % 2 === 0 ? "/workspace" : "/other", @@ -184,6 +190,36 @@ describe("ACP next service sessions", () => { data: input.directory ? sessions.filter((session) => session.directory === input.directory) : sessions, }), messages: () => Promise.resolve({ data: messages }), + prompt: (input: unknown) => { + prompts.push(input) + return Promise.resolve({ + data: { + info: assistantInfo({ + input: 100, + output: 40, + reasoning: 7, + cache: { read: 11, write: 13 }, + }), + }, + }) + }, + command: (input: unknown) => { + commands.push(input) + return Promise.resolve({ + data: { + info: assistantInfo({ + input: 3, + output: 4, + reasoning: 0, + cache: { read: 0, write: 0 }, + }), + }, + }) + }, + summarize: (input: unknown) => { + summarizes.push(input) + return Promise.resolve({ data: true }) + }, abort: (input: { sessionID: string }) => { aborts.push(input.sessionID) return Promise.resolve({ data: true }) @@ -201,13 +237,33 @@ describe("ACP next service sessions", () => { }, } as unknown as OpencodeClient const connection = { - sessionUpdate: (update: unknown) => { + sessionUpdate: (update: SessionNotification) => { updates.push(update) return Promise.resolve() }, } as Pick + const usage = UsageService.Service.of({ + buildUsage: UsageService.buildUsage, + latestAssistantMessage: UsageService.latestAssistantMessage, + totalSessionCost: UsageService.totalSessionCost, + contextLimit: () => Effect.succeed(128000), + sendUpdate: (input) => + Effect.sync(() => { + usageUpdates.push(input.sessionID) + }), + }) - return { service: ACPNextService.make({ sdk, connection }), updates, mcpAdds, aborts, forks } + return { + service: ACPNextService.make({ sdk, connection, usage }), + updates, + mcpAdds, + aborts, + forks, + prompts, + commands, + summarizes, + usageUpdates, + } } it("creates a backed session with config options and command update", async () => { @@ -657,8 +713,220 @@ describe("ACP next service sessions", () => { expect(providersCalls).toBe(1) expect(commandCalls).toBe(1) }) + + it("normal text prompt sends model variant mode and converted parts", async () => { + const { service, prompts, usageUpdates } = makeService() + const session = await Effect.runPromise(service.newSession({ cwd: "/workspace", mcpServers: [] })) + await Effect.runPromise( + service.setSessionConfigOption({ + sessionId: session.sessionId, + configId: "effort", + value: "high", + }), + ) + await Effect.runPromise( + service.setSessionConfigOption({ + sessionId: session.sessionId, + configId: "mode", + value: "plan", + }), + ) + + const result = await Effect.runPromise( + service.prompt({ + sessionId: session.sessionId, + messageId: "00000000-0000-4000-8000-000000000001", + prompt: [{ type: "text", text: "hello" }], + }), + ) + + expect(prompts).toEqual([ + { + sessionID: session.sessionId, + model: { providerID, modelID }, + variant: "high", + parts: [{ type: "text", text: "hello" }], + agent: "plan", + directory: "/workspace", + }, + ]) + expect(result).toEqual({ + stopReason: "end_turn", + usage: { + inputTokens: 100, + outputTokens: 40, + thoughtTokens: 7, + cachedReadTokens: 11, + cachedWriteTokens: 13, + totalTokens: 171, + }, + userMessageId: "00000000-0000-4000-8000-000000000001", + _meta: {}, + }) + expect(usageUpdates).toEqual([session.sessionId]) + }) + + it("prompt maps assistant and user audience annotations", async () => { + const { service, prompts } = makeService() + const session = await Effect.runPromise(service.newSession({ cwd: "/workspace", mcpServers: [] })) + + await Effect.runPromise( + service.prompt({ + sessionId: session.sessionId, + prompt: [ + { type: "text", text: "assistant context", annotations: { audience: ["assistant"] } }, + { type: "text", text: "user context", annotations: { audience: ["user"] } }, + ], + }), + ) + + expect(prompts).toContainEqual({ + sessionID: session.sessionId, + model: { providerID, modelID }, + variant: "default", + parts: [ + { type: "text", text: "assistant context", synthetic: true }, + { type: "text", text: "user context", ignored: true }, + ], + agent: "build", + directory: "/workspace", + }) + }) + + it("prompt sends image and resource parts", async () => { + const { service, prompts } = makeService() + const session = await Effect.runPromise(service.newSession({ cwd: "/workspace", mcpServers: [] })) + + await Effect.runPromise( + service.prompt({ + sessionId: session.sessionId, + prompt: [ + { type: "image", data: "AAAA", mimeType: "image/png", uri: "file:///tmp/screenshot.png" }, + { + type: "resource", + resource: { + uri: "file:///tmp/report.pdf", + mimeType: "application/pdf", + blob: "JVBERg==", + }, + }, + ], + }), + ) + + expect((prompts[0] as { parts?: unknown }).parts).toEqual([ + { + type: "file", + url: "data:image/png;base64,AAAA", + filename: "screenshot.png", + mime: "image/png", + }, + { + type: "file", + url: "data:application/pdf;base64,JVBERg==", + filename: "report.pdf", + mime: "application/pdf", + }, + ]) + }) + + it("slash command prompt calls session command", async () => { + const { service, prompts, commands } = makeService() + const session = await Effect.runPromise(service.newSession({ cwd: "/workspace", mcpServers: [] })) + + const result = await Effect.runPromise( + service.prompt({ sessionId: session.sessionId, prompt: [{ type: "text", text: "/init now" }] }), + ) + + expect(prompts).toEqual([]) + expect(commands).toEqual([ + { + sessionID: session.sessionId, + command: "init", + arguments: "now", + model: "test/test-model", + variant: "default", + agent: "build", + directory: "/workspace", + }, + ]) + expect(result.usage).toEqual({ inputTokens: 3, outputTokens: 4, totalTokens: 7 }) + }) + + it("compact slash command calls summarize path", async () => { + const { service, prompts, commands, summarizes } = makeService() + const session = await Effect.runPromise(service.newSession({ cwd: "/workspace", mcpServers: [] })) + + await Effect.runPromise( + service.prompt({ sessionId: session.sessionId, prompt: [{ type: "text", text: "/compact" }] }), + ) + + expect(prompts).toEqual([]) + expect(commands).toEqual([]) + expect(summarizes).toEqual([ + { + sessionID: session.sessionId, + directory: "/workspace", + providerID, + modelID, + }, + ]) + }) + + it("maps prompt auth failures to auth-required request errors", async () => { + const { service } = makeService() + const session = await Effect.runPromise(service.newSession({ cwd: "/workspace", mcpServers: [] })) + const failing = ACPNextService.make({ + sdk: { + config: { + providers: () => Promise.resolve({ data: { providers: [provider], default: { test: modelID } } }), + get: () => Promise.resolve({ data: {} }), + }, + app: { + agents: () => Promise.resolve({ data: [{ name: "build", mode: "primary", permission: [], options: {} }] }), + skills: () => Promise.resolve({ data: [] }), + }, + command: { + list: () => Promise.resolve({ data: [] }), + }, + session: { + create: () => Promise.resolve({ data: { id: session.sessionId } }), + list: () => Promise.resolve({ data: [] }), + prompt: () => Promise.reject({ name: "ProviderAuthError", data: { providerID: "test" } }), + }, + mcp: { + add: () => Promise.resolve({ data: {} }), + }, + } as unknown as OpencodeClient, + usage: UsageService.Service.of({ + buildUsage: UsageService.buildUsage, + latestAssistantMessage: UsageService.latestAssistantMessage, + totalSessionCost: UsageService.totalSessionCost, + contextLimit: () => Effect.succeed(128000), + sendUpdate: () => Effect.void, + }), + }) + await Effect.runPromise(failing.newSession({ cwd: "/workspace", mcpServers: [] })) + const error = await Effect.runPromise( + failing + .prompt({ sessionId: session.sessionId, prompt: [{ type: "text", text: "hello" }] }) + .pipe(Effect.mapError(ACPNextError.toRequestError), Effect.flip), + ) + + expect(error.code).toBe(-32000) + }) }) +function assistantInfo(tokens: UsageService.AssistantTokenCost["tokens"]): UsageService.AssistantMessage { + return { + role: "assistant", + providerID: "test", + modelID: "test-model", + cost: 0, + tokens, + } +} + function categories(result: NewSessionResponse | LoadSessionResponse) { return result.configOptions?.map((option) => option.category) ?? [] } diff --git a/packages/opencode/test/cli/acp-next/acp-next-process.test.ts b/packages/opencode/test/cli/acp-next/acp-next-process.test.ts index 95b905543b..5c01929d95 100644 --- a/packages/opencode/test/cli/acp-next/acp-next-process.test.ts +++ b/packages/opencode/test/cli/acp-next/acp-next-process.test.ts @@ -5,6 +5,7 @@ import type { InitializeResponse, LoadSessionResponse, NewSessionResponse, + PromptResponse, ResumeSessionResponse, SessionNotification, SetSessionConfigOptionResponse, @@ -95,11 +96,21 @@ describe("opencode acp-next (subprocess)", () => { ) expect(selectConfigOption(loaded.configOptions, "model")?.category).toBe("model") - const prompt = yield* acp.request("session/prompt", { + yield* llm.text("hello from acp-next", { usage: { input: 11, output: 7 } }) + const prompted = expectOk( + yield* acp.request("session/prompt", { + sessionId: session.sessionId, + prompt: [{ type: "text", text: "hello" }], + }), + ) + expect(prompted.stopReason).toBe("end_turn") + expect(prompted.usage?.totalTokens).toBeGreaterThan(0) + + const missing = yield* acp.request("session/prompt", { sessionId: "ses_missing", prompt: [{ type: "text", text: "hello" }], }) - expect(errorCode(prompt.error)).toBe(-32601) + expect(errorCode(missing.error)).toBe(-32602) }), 60_000, )