diff --git a/packages/opencode/src/acp-next/event.ts b/packages/opencode/src/acp-next/event.ts index 86c902d46e..25aa5c08ef 100644 --- a/packages/opencode/src/acp-next/event.ts +++ b/packages/opencode/src/acp-next/event.ts @@ -7,9 +7,18 @@ import type { OpencodeClient, Part, SessionMessageResponse, + ToolPart, } from "@opencode-ai/sdk/v2" import { Effect } from "effect" import { ACPNextSession } from "./session" +import { + duplicateRunningToolUpdate, + errorToolUpdate, + pendingToolCall, + runningToolUpdate, + shellOutputSnapshot, + completedToolUpdate, +} from "./tool" const log = Log.create({ service: "acp-next-event" }) @@ -29,6 +38,8 @@ export function start(input: { sdk: OpencodeClient; connection: Connection; sess export class Subscription { private readonly abort = new AbortController() + private readonly shellSnapshots = new Map() + private readonly toolStarts = new Set() private started = false constructor( @@ -61,6 +72,17 @@ export class Subscription { } } + async replayMessage(message: SessionMessageResponse) { + if (message.info.role !== "assistant" && message.info.role !== "user") return + + for (const part of message.parts) { + await this.recordFetchedPart(message.info.sessionID, message, part) + if (part.type === "tool") { + await this.handleToolPart(message.info.sessionID, part) + } + } + } + private async run() { while (!this.abort.signal.aborted) { const events = (await this.input.sdk.global.event({ @@ -96,6 +118,9 @@ export class Subscription { metadata: "metadata" in part ? part.metadata : undefined, }), ) + if (part.type === "tool") { + await this.handleToolPart(session.id, part) + } } private async handlePartDelta(event: EventMessagePartDelta) { @@ -181,6 +206,106 @@ export class Subscription { }), ) } + + private async handleToolPart(sessionId: string, part: ToolPart) { + await this.toolStart(sessionId, part) + + switch (part.state.status) { + case "pending": + this.shellSnapshots.delete(part.callID) + return + + case "running": + await this.runningTool(sessionId, part) + return + + case "completed": + this.clearTool(part.callID) + await this.input.connection.sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + ...completedToolUpdate({ + toolCallId: part.callID, + toolName: part.tool, + state: part.state, + }), + }, + }) + return + + case "error": + this.clearTool(part.callID) + await this.input.connection.sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + ...errorToolUpdate({ + toolCallId: part.callID, + toolName: part.tool, + state: part.state, + }), + }, + }) + return + } + } + + private async runningTool(sessionId: string, part: ToolPart) { + if (part.state.status !== "running") return + + const output = part.tool === "bash" ? shellOutputSnapshot(part.state) : undefined + if (output !== undefined) { + if (this.shellSnapshots.get(part.callID) === output) { + await this.input.connection.sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + ...duplicateRunningToolUpdate({ + toolCallId: part.callID, + toolName: part.tool, + state: part.state, + }), + }, + }) + return + } + this.shellSnapshots.set(part.callID, output) + } + + await this.input.connection.sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + ...runningToolUpdate({ + toolCallId: part.callID, + toolName: part.tool, + state: part.state, + output, + }), + }, + }) + } + + private async toolStart(sessionId: string, part: ToolPart) { + if (this.toolStarts.has(part.callID)) return + this.toolStarts.add(part.callID) + await this.input.connection.sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call", + ...pendingToolCall({ + toolCallId: part.callID, + toolName: part.tool, + }), + }, + }) + } + + private clearTool(toolCallId: string) { + this.toolStarts.delete(toolCallId) + this.shellSnapshots.delete(toolCallId) + } } export * as ACPNextEvent from "./event" diff --git a/packages/opencode/src/acp-next/service.ts b/packages/opencode/src/acp-next/service.ts index a465264d45..be368da623 100644 --- a/packages/opencode/src/acp-next/service.ts +++ b/packages/opencode/src/acp-next/service.ts @@ -31,7 +31,7 @@ import { } from "@agentclientprotocol/sdk" import { InstallationVersion } from "@opencode-ai/core/installation/version" import * as Log from "@opencode-ai/core/util/log" -import type { Message, OpencodeClient } from "@opencode-ai/sdk/v2" +import type { Message, OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2" import { Context, Effect, Layer, ManagedRuntime } from "effect" import * as ACPNextError from "./error" import { buildConfigOptions, parseModelSelection } from "./config-option" @@ -77,10 +77,10 @@ export function make(input: { const session = input.session ?? makeSessionService() const directoryService = input.directory ?? makeDirectoryService(input.sdk) const registeredMcp = new Map>() - if (input.connection) { - const subscription = ACPNextEvent.start({ sdk: input.sdk, connection: input.connection, session }) - input.eventSubscription?.(subscription) - } + const events = input.connection + ? ACPNextEvent.start({ sdk: input.sdk, connection: input.connection, session }) + : undefined + if (events) input.eventSubscription?.(events) const initialize = Effect.fn("ACPNext.initialize")(function* (params: InitializeRequest) { const authMethod: AuthMethod = { @@ -207,6 +207,7 @@ export function make(input: { yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers) yield* sendAvailableCommands(input.connection, state.id, snapshot) + yield* replayMessages(events, messages) return { configOptions: configOptions(snapshot, { @@ -276,6 +277,7 @@ export function make(input: { yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers ?? []) yield* sendAvailableCommands(input.connection, state.id, snapshot) + yield* replayMessages(events, messages) return { configOptions: configOptions(snapshot, { @@ -335,6 +337,7 @@ export function make(input: { yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers ?? []) yield* sendAvailableCommands(input.connection, state.id, snapshot) + yield* replayMessages(events, messages) return { sessionId: state.id, @@ -470,6 +473,17 @@ function makeDirectoryService(sdk: OpencodeClient) { ).runSync(Directory.Service.use((service) => Effect.succeed(service))) } +function replayMessages(subscription: ACPNextEvent.Subscription | undefined, messages: SessionMessageResponse[]) { + if (!subscription) return Effect.void + return Effect.promise(async () => { + for (const message of messages) { + await subscription.replayMessage(message).catch((error: unknown) => { + log.error("failed to replay ACP message", { error, messageID: message.info.id }) + }) + } + }) +} + type ConfigState = { readonly model: Directory.DefaultModel readonly variant?: string diff --git a/packages/opencode/src/acp-next/tool.ts b/packages/opencode/src/acp-next/tool.ts index 128c4c9c85..288d24e31e 100644 --- a/packages/opencode/src/acp-next/tool.ts +++ b/packages/opencode/src/acp-next/tool.ts @@ -1,4 +1,4 @@ -import type { ToolCallContent, ToolCallLocation, ToolKind } from "@agentclientprotocol/sdk" +import type { ToolCall, ToolCallContent, ToolCallLocation, ToolCallUpdate, ToolKind } from "@agentclientprotocol/sdk" export type ToolInput = Record @@ -16,6 +16,19 @@ export type CompletedToolState = { readonly attachments?: ReadonlyArray } +export type RunningToolState = { + readonly status: "running" + readonly input: ToolInput + readonly title?: string +} + +export type ErrorToolState = { + readonly status: "error" + readonly input: ToolInput + readonly error: string + readonly metadata?: unknown +} + export type ImageAttachment = { readonly mimeType: string readonly data: string @@ -100,6 +113,104 @@ export function completedToolContent(toolName: string, state: CompletedToolState return content } +export function pendingToolCall(input: { readonly toolCallId: string; readonly toolName: string }): ToolCall { + return { + toolCallId: input.toolCallId, + title: input.toolName, + kind: toToolKind(input.toolName), + status: "pending", + locations: [], + rawInput: {}, + } +} + +export function runningToolUpdate(input: { + readonly toolCallId: string + readonly toolName: string + readonly state: RunningToolState + readonly output?: string +}): ToolCallUpdate { + const content = input.output + ? [ + { + type: "content" as const, + content: { + type: "text" as const, + text: input.output, + }, + }, + ] + : undefined + + return { + toolCallId: input.toolCallId, + status: "in_progress", + kind: toToolKind(input.toolName), + title: input.state.title ?? input.toolName, + locations: toLocations(input.toolName, input.state.input), + rawInput: input.state.input, + ...(content ? { content } : {}), + } +} + +export function duplicateRunningToolUpdate(input: { + readonly toolCallId: string + readonly toolName: string + readonly state: RunningToolState +}): ToolCallUpdate { + return { + toolCallId: input.toolCallId, + status: "in_progress", + kind: toToolKind(input.toolName), + title: input.state.title ?? input.toolName, + locations: toLocations(input.toolName, input.state.input), + rawInput: input.state.input, + } +} + +export function completedToolUpdate(input: { + readonly toolCallId: string + readonly toolName: string + readonly state: CompletedToolState & { readonly title: string } +}): ToolCallUpdate { + return { + toolCallId: input.toolCallId, + status: "completed", + kind: toToolKind(input.toolName), + title: input.state.title, + content: completedToolContent(input.toolName, input.state), + rawInput: input.state.input, + rawOutput: completedToolRawOutput(input.state), + } +} + +export function errorToolUpdate(input: { + readonly toolCallId: string + readonly toolName: string + readonly state: ErrorToolState +}): ToolCallUpdate { + return { + toolCallId: input.toolCallId, + status: "failed", + kind: toToolKind(input.toolName), + title: input.toolName, + rawInput: input.state.input, + content: [ + { + type: "content", + content: { + type: "text", + text: input.state.error, + }, + }, + ], + rawOutput: { + error: input.state.error, + metadata: input.state.metadata, + }, + } +} + export function completedToolRawOutput(state: CompletedToolState) { return { output: state.output, @@ -138,6 +249,11 @@ export const extractLocations = toLocations export const buildCompletedToolContent = completedToolContent export const buildCompletedRawOutput = completedToolRawOutput export const extractShellOutputSnapshot = shellOutputSnapshot +export const buildPendingToolCall = pendingToolCall +export const buildRunningToolUpdate = runningToolUpdate +export const buildDuplicateRunningToolUpdate = duplicateRunningToolUpdate +export const buildCompletedToolUpdate = completedToolUpdate +export const buildErrorToolUpdate = errorToolUpdate function locationFrom(value: unknown): ToolCallLocation[] { const path = stringValue(value) diff --git a/packages/opencode/test/acp-next/event.test.ts b/packages/opencode/test/acp-next/event.test.ts index a101d2f347..92b65b8881 100644 --- a/packages/opencode/test/acp-next/event.test.ts +++ b/packages/opencode/test/acp-next/event.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it } from "bun:test" import type { AgentSideConnection } from "@agentclientprotocol/sdk" -import type { Event, Message, OpencodeClient, Part, SessionMessageResponse } from "@opencode-ai/sdk/v2" +import type { Event, Message, OpencodeClient, Part, SessionMessageResponse, ToolPart } from "@opencode-ai/sdk/v2" import { Effect, ManagedRuntime } from "effect" import { ACPNextEvent } from "@/acp-next/event" import * as ACPNextService from "@/acp-next/service" @@ -8,6 +8,9 @@ import { Directory } from "@/acp-next/directory" import { ACPNextSession } from "@/acp-next/session" type SessionUpdateParams = Parameters[0] +type ToolSessionUpdateParams = SessionUpdateParams & { + update: Extract +} type GlobalEventEnvelope = { payload?: Event } @@ -151,6 +154,18 @@ function partUpdated(sessionID: string, messageID: string, partID: string, type: } } +function toolUpdated(part: ToolPart): Event { + return { + id: `evt_${part.sessionID}_${part.messageID}_${part.id}_${part.state.status}`, + type: "message.part.updated", + properties: { + sessionID: part.sessionID, + time: Date.now(), + part, + }, + } +} + function assistantMessage(sessionID: string, messageID: string, partID: string, type: DeltaPartType) { return { info: { @@ -188,6 +203,98 @@ function assistantMessage(sessionID: string, messageID: string, partID: string, } satisfies SessionMessageResponse } +function assistantToolMessage(part: ToolPart) { + return { + info: { + id: part.messageID, + sessionID: part.sessionID, + role: "assistant", + time: { created: Date.now() }, + parentID: "msg_parent", + modelID: "model", + providerID: "provider", + mode: "build", + agent: "build", + path: { cwd: "/workspace", root: "/workspace" }, + cost: 0, + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + }, + parts: [part], + } satisfies SessionMessageResponse +} + +function runningTool( + sessionID: string, + callID: string, + output?: string, + input: Record = { cmd: "printf hello" }, +) { + return { + id: `part_${callID}`, + sessionID, + messageID: `msg_${callID}`, + type: "tool", + callID, + tool: "bash", + state: { + status: "running", + input, + title: "bash", + ...(output !== undefined ? { metadata: { output } } : {}), + time: { start: Date.now() }, + }, + } satisfies ToolPart +} + +function completedTool( + sessionID: string, + callID: string, + output = "done", + attachments: Extract["attachments"] = [], +) { + return { + id: `part_${callID}`, + sessionID, + messageID: `msg_${callID}`, + type: "tool", + callID, + tool: "bash", + state: { + status: "completed", + input: { cmd: "printf done" }, + output, + title: "bash", + metadata: { exit: 0 }, + time: { start: Date.now() - 1, end: Date.now() }, + ...(attachments.length ? { attachments } : {}), + }, + } satisfies ToolPart +} + +function errorTool(sessionID: string, callID: string) { + return { + id: `part_${callID}`, + sessionID, + messageID: `msg_${callID}`, + type: "tool", + callID, + tool: "bash", + state: { + status: "error", + input: { cmd: "exit 1" }, + error: "failed hard", + metadata: { exit: 1 }, + time: { start: Date.now() - 1, end: Date.now() }, + }, + } satisfies ToolPart +} + +function toolUpdates(updates: SessionUpdateParams[]) { + return updates.filter((item): item is ToolSessionUpdateParams => { + return item.update.sessionUpdate === "tool_call" || item.update.sessionUpdate === "tool_call_update" + }) +} + async function createKnownSession( session: ACPNextSession.Interface, sessionId: string, @@ -310,6 +417,85 @@ describe("acp-next event routing", () => { expect(harness.updates).toHaveLength(2) }) + it("replays loaded session messages sequentially and continues after update failures", async () => { + const events = createEventStream() + const updates: SessionUpdateParams[] = [] + const connection = { + sessionUpdate: (params: SessionUpdateParams) => { + if (params.update.sessionUpdate === "tool_call" && params.update.toolCallId === "call_slow") { + return new Promise((resolve) => { + setTimeout(() => { + updates.push(params) + resolve() + }, 20) + }) + } + + if (params.update.sessionUpdate === "tool_call_update" && params.update.toolCallId === "call_slow") { + return Promise.reject(new Error("replay send failed")) + } + + updates.push(params) + return Promise.resolve() + }, + } satisfies Pick + let subscription: ACPNextEvent.Subscription | undefined + const service = ACPNextService.make({ + sdk: { + global: { + event: (options?: { signal?: AbortSignal }) => Promise.resolve({ stream: events.stream(options?.signal) }), + }, + session: { + get: () => Promise.resolve({ data: { id: "ses_loaded" } }), + messages: () => + Promise.resolve({ + data: [ + assistantToolMessage(completedTool("ses_loaded", "call_slow", "slow")), + assistantToolMessage(completedTool("ses_loaded", "call_after", "after")), + ], + }), + }, + } as unknown as OpencodeClient, + connection, + directory: { + get: () => + Effect.succeed( + Directory.build({ + directory: "/workspace", + providers: {}, + modes: [], + defaultModeID: "build", + commands: [], + }), + ), + refresh: () => + Effect.succeed( + Directory.build({ + directory: "/workspace", + providers: {}, + modes: [], + defaultModeID: "build", + commands: [], + }), + ), + variants: Directory.variants, + }, + eventSubscription: (started) => { + subscription = started + }, + }) + + await Effect.runPromise(service.loadSession({ cwd: "/workspace", sessionId: "ses_loaded", mcpServers: [] })) + + expect(toolUpdates(updates).map((item) => item.update.toolCallId)).toEqual([ + "call_slow", + "call_after", + "call_after", + ]) + subscription?.stop() + events.close() + }) + it("ignores unknown sessions and live user parts without user_message_chunk duplication", async () => { const harness = createHarness() await createKnownSession(harness.session, "ses_user", { @@ -325,4 +511,147 @@ describe("acp-next event routing", () => { expect(harness.updates).toHaveLength(0) }) + + it("emits synthetic pending before the first running tool update", async () => { + const harness = createHarness() + await Effect.runPromise(harness.session.create({ id: "ses_tool", cwd: "/workspace" })) + + await harness.subscription.handle(toolUpdated(runningTool("ses_tool", "call_1", "hello"))) + + expect(toolUpdates(harness.updates).map((item) => item.update.sessionUpdate)).toEqual([ + "tool_call", + "tool_call_update", + ]) + expect(harness.updates[0]?.update).toMatchObject({ status: "pending", toolCallId: "call_1" }) + expect(harness.updates[1]?.update).toMatchObject({ status: "in_progress", toolCallId: "call_1" }) + }) + + it("does not emit duplicate synthetic pending after a replayed running tool", async () => { + const harness = createHarness() + await Effect.runPromise(harness.session.create({ id: "ses_replay", cwd: "/workspace" })) + + await harness.subscription.replayMessage(assistantToolMessage(runningTool("ses_replay", "call_replay", "first"))) + await harness.subscription.handle(toolUpdated(runningTool("ses_replay", "call_replay", "second"))) + + expect(toolUpdates(harness.updates).filter((item) => item.update.sessionUpdate === "tool_call")).toHaveLength(1) + expect(toolUpdates(harness.updates).map((item) => item.update.sessionUpdate)).toEqual([ + "tool_call", + "tool_call_update", + "tool_call_update", + ]) + }) + + it("dedupes shell output snapshots while still sending status-only running updates", async () => { + const harness = createHarness() + await Effect.runPromise(harness.session.create({ id: "ses_shell", cwd: "/workspace" })) + + await harness.subscription.handle(toolUpdated(runningTool("ses_shell", "call_shell", "same"))) + await harness.subscription.handle(toolUpdated(runningTool("ses_shell", "call_shell", "same"))) + + const updates = toolUpdates(harness.updates) + expect(updates).toHaveLength(3) + expect(updates[1]?.update).toMatchObject({ + sessionUpdate: "tool_call_update", + content: [{ type: "content", content: { type: "text", text: "same" } }], + }) + expect(updates[2]?.update).toMatchObject({ sessionUpdate: "tool_call_update", status: "in_progress" }) + expect("content" in updates[2]!.update).toBe(false) + }) + + it("clears shell snapshot marker when a tool returns to pending", async () => { + const harness = createHarness() + await Effect.runPromise(harness.session.create({ id: "ses_pending", cwd: "/workspace" })) + + await harness.subscription.handle(toolUpdated(runningTool("ses_pending", "call_pending", "repeat"))) + await harness.subscription.handle( + toolUpdated({ + id: "part_call_pending", + sessionID: "ses_pending", + messageID: "msg_call_pending", + type: "tool", + callID: "call_pending", + tool: "bash", + state: { + status: "pending", + input: { cmd: "printf repeat" }, + raw: '{"cmd":"printf repeat"}', + }, + }), + ) + await harness.subscription.handle(toolUpdated(runningTool("ses_pending", "call_pending", "repeat"))) + + expect( + toolUpdates(harness.updates) + .filter((item) => item.update.sessionUpdate === "tool_call_update") + .map((item) => ("content" in item.update ? item.update.content : undefined)), + ).toEqual([ + [{ type: "content", content: { type: "text", text: "repeat" } }], + [{ type: "content", content: { type: "text", text: "repeat" } }], + ]) + }) + + it("emits completed tool output and rawOutput", async () => { + const harness = createHarness() + await Effect.runPromise(harness.session.create({ id: "ses_done", cwd: "/workspace" })) + + await harness.subscription.handle(toolUpdated(completedTool("ses_done", "call_done", "finished"))) + + expect(harness.updates.at(-1)?.update).toMatchObject({ + sessionUpdate: "tool_call_update", + toolCallId: "call_done", + status: "completed", + content: [{ type: "content", content: { type: "text", text: "finished" } }], + rawOutput: { output: "finished", metadata: { exit: 0 } }, + }) + }) + + it("emits error tool output", async () => { + const harness = createHarness() + await Effect.runPromise(harness.session.create({ id: "ses_error", cwd: "/workspace" })) + + await harness.subscription.handle(toolUpdated(errorTool("ses_error", "call_error"))) + + expect(harness.updates.at(-1)?.update).toMatchObject({ + sessionUpdate: "tool_call_update", + toolCallId: "call_error", + status: "failed", + content: [{ type: "content", content: { type: "text", text: "failed hard" } }], + rawOutput: { error: "failed hard", metadata: { exit: 1 } }, + }) + }) + + it("emits image attachments as ACP image content for live and replayed completed tool updates", async () => { + const harness = createHarness() + const image = Buffer.from("image-data").toString("base64") + const attachment = { + id: "file_image", + sessionID: "ses_image", + messageID: "msg_image", + type: "file", + mime: "image/png", + filename: "image.png", + url: `data:image/png;base64,${image}`, + } as const + await Effect.runPromise(harness.session.create({ id: "ses_image", cwd: "/workspace" })) + + await harness.subscription.handle(toolUpdated(completedTool("ses_image", "call_live", "live", [attachment]))) + await harness.subscription.replayMessage( + assistantToolMessage(completedTool("ses_image", "call_replayed", "replayed", [attachment])), + ) + + expect( + toolUpdates(harness.updates) + .filter((item) => item.update.sessionUpdate === "tool_call_update" && item.update.status === "completed") + .map((item) => ("content" in item.update ? item.update.content : [])), + ).toEqual([ + [ + { type: "content", content: { type: "text", text: "live" } }, + { type: "content", content: { type: "image", mimeType: "image/png", data: image } }, + ], + [ + { type: "content", content: { type: "text", text: "replayed" } }, + { type: "content", content: { type: "image", mimeType: "image/png", data: image } }, + ], + ]) + }) })