mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-27 08:58:10 +00:00
feat(acp): stream acp-next tool updates (#29333)
This commit is contained in:
parent
b5632ea700
commit
717e74f3e5
4 changed files with 591 additions and 7 deletions
|
|
@ -7,9 +7,18 @@ import type {
|
|||
OpencodeClient,
|
||||
Part,
|
||||
SessionMessageResponse,
|
||||
ToolPart,
|
||||
} from "@opencode-ai/sdk/v2"
|
||||
import { Effect } from "effect"
|
||||
import { ACPNextSession } from "./session"
|
||||
import {
|
||||
duplicateRunningToolUpdate,
|
||||
errorToolUpdate,
|
||||
pendingToolCall,
|
||||
runningToolUpdate,
|
||||
shellOutputSnapshot,
|
||||
completedToolUpdate,
|
||||
} from "./tool"
|
||||
|
||||
const log = Log.create({ service: "acp-next-event" })
|
||||
|
||||
|
|
@ -29,6 +38,8 @@ export function start(input: { sdk: OpencodeClient; connection: Connection; sess
|
|||
|
||||
export class Subscription {
|
||||
private readonly abort = new AbortController()
|
||||
private readonly shellSnapshots = new Map<string, string>()
|
||||
private readonly toolStarts = new Set<string>()
|
||||
private started = false
|
||||
|
||||
constructor(
|
||||
|
|
@ -61,6 +72,17 @@ export class Subscription {
|
|||
}
|
||||
}
|
||||
|
||||
async replayMessage(message: SessionMessageResponse) {
|
||||
if (message.info.role !== "assistant" && message.info.role !== "user") return
|
||||
|
||||
for (const part of message.parts) {
|
||||
await this.recordFetchedPart(message.info.sessionID, message, part)
|
||||
if (part.type === "tool") {
|
||||
await this.handleToolPart(message.info.sessionID, part)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async run() {
|
||||
while (!this.abort.signal.aborted) {
|
||||
const events = (await this.input.sdk.global.event({
|
||||
|
|
@ -96,6 +118,9 @@ export class Subscription {
|
|||
metadata: "metadata" in part ? part.metadata : undefined,
|
||||
}),
|
||||
)
|
||||
if (part.type === "tool") {
|
||||
await this.handleToolPart(session.id, part)
|
||||
}
|
||||
}
|
||||
|
||||
private async handlePartDelta(event: EventMessagePartDelta) {
|
||||
|
|
@ -181,6 +206,106 @@ export class Subscription {
|
|||
}),
|
||||
)
|
||||
}
|
||||
|
||||
private async handleToolPart(sessionId: string, part: ToolPart) {
|
||||
await this.toolStart(sessionId, part)
|
||||
|
||||
switch (part.state.status) {
|
||||
case "pending":
|
||||
this.shellSnapshots.delete(part.callID)
|
||||
return
|
||||
|
||||
case "running":
|
||||
await this.runningTool(sessionId, part)
|
||||
return
|
||||
|
||||
case "completed":
|
||||
this.clearTool(part.callID)
|
||||
await this.input.connection.sessionUpdate({
|
||||
sessionId,
|
||||
update: {
|
||||
sessionUpdate: "tool_call_update",
|
||||
...completedToolUpdate({
|
||||
toolCallId: part.callID,
|
||||
toolName: part.tool,
|
||||
state: part.state,
|
||||
}),
|
||||
},
|
||||
})
|
||||
return
|
||||
|
||||
case "error":
|
||||
this.clearTool(part.callID)
|
||||
await this.input.connection.sessionUpdate({
|
||||
sessionId,
|
||||
update: {
|
||||
sessionUpdate: "tool_call_update",
|
||||
...errorToolUpdate({
|
||||
toolCallId: part.callID,
|
||||
toolName: part.tool,
|
||||
state: part.state,
|
||||
}),
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
private async runningTool(sessionId: string, part: ToolPart) {
|
||||
if (part.state.status !== "running") return
|
||||
|
||||
const output = part.tool === "bash" ? shellOutputSnapshot(part.state) : undefined
|
||||
if (output !== undefined) {
|
||||
if (this.shellSnapshots.get(part.callID) === output) {
|
||||
await this.input.connection.sessionUpdate({
|
||||
sessionId,
|
||||
update: {
|
||||
sessionUpdate: "tool_call_update",
|
||||
...duplicateRunningToolUpdate({
|
||||
toolCallId: part.callID,
|
||||
toolName: part.tool,
|
||||
state: part.state,
|
||||
}),
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
this.shellSnapshots.set(part.callID, output)
|
||||
}
|
||||
|
||||
await this.input.connection.sessionUpdate({
|
||||
sessionId,
|
||||
update: {
|
||||
sessionUpdate: "tool_call_update",
|
||||
...runningToolUpdate({
|
||||
toolCallId: part.callID,
|
||||
toolName: part.tool,
|
||||
state: part.state,
|
||||
output,
|
||||
}),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
private async toolStart(sessionId: string, part: ToolPart) {
|
||||
if (this.toolStarts.has(part.callID)) return
|
||||
this.toolStarts.add(part.callID)
|
||||
await this.input.connection.sessionUpdate({
|
||||
sessionId,
|
||||
update: {
|
||||
sessionUpdate: "tool_call",
|
||||
...pendingToolCall({
|
||||
toolCallId: part.callID,
|
||||
toolName: part.tool,
|
||||
}),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
private clearTool(toolCallId: string) {
|
||||
this.toolStarts.delete(toolCallId)
|
||||
this.shellSnapshots.delete(toolCallId)
|
||||
}
|
||||
}
|
||||
|
||||
export * as ACPNextEvent from "./event"
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ import {
|
|||
} from "@agentclientprotocol/sdk"
|
||||
import { InstallationVersion } from "@opencode-ai/core/installation/version"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import type { Message, OpencodeClient } from "@opencode-ai/sdk/v2"
|
||||
import type { Message, OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2"
|
||||
import { Context, Effect, Layer, ManagedRuntime } from "effect"
|
||||
import * as ACPNextError from "./error"
|
||||
import { buildConfigOptions, parseModelSelection } from "./config-option"
|
||||
|
|
@ -77,10 +77,10 @@ export function make(input: {
|
|||
const session = input.session ?? makeSessionService()
|
||||
const directoryService = input.directory ?? makeDirectoryService(input.sdk)
|
||||
const registeredMcp = new Map<string, Set<string>>()
|
||||
if (input.connection) {
|
||||
const subscription = ACPNextEvent.start({ sdk: input.sdk, connection: input.connection, session })
|
||||
input.eventSubscription?.(subscription)
|
||||
}
|
||||
const events = input.connection
|
||||
? ACPNextEvent.start({ sdk: input.sdk, connection: input.connection, session })
|
||||
: undefined
|
||||
if (events) input.eventSubscription?.(events)
|
||||
|
||||
const initialize = Effect.fn("ACPNext.initialize")(function* (params: InitializeRequest) {
|
||||
const authMethod: AuthMethod = {
|
||||
|
|
@ -207,6 +207,7 @@ export function make(input: {
|
|||
|
||||
yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers)
|
||||
yield* sendAvailableCommands(input.connection, state.id, snapshot)
|
||||
yield* replayMessages(events, messages)
|
||||
|
||||
return {
|
||||
configOptions: configOptions(snapshot, {
|
||||
|
|
@ -276,6 +277,7 @@ export function make(input: {
|
|||
|
||||
yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers ?? [])
|
||||
yield* sendAvailableCommands(input.connection, state.id, snapshot)
|
||||
yield* replayMessages(events, messages)
|
||||
|
||||
return {
|
||||
configOptions: configOptions(snapshot, {
|
||||
|
|
@ -335,6 +337,7 @@ export function make(input: {
|
|||
|
||||
yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers ?? [])
|
||||
yield* sendAvailableCommands(input.connection, state.id, snapshot)
|
||||
yield* replayMessages(events, messages)
|
||||
|
||||
return {
|
||||
sessionId: state.id,
|
||||
|
|
@ -470,6 +473,17 @@ function makeDirectoryService(sdk: OpencodeClient) {
|
|||
).runSync(Directory.Service.use((service) => Effect.succeed(service)))
|
||||
}
|
||||
|
||||
function replayMessages(subscription: ACPNextEvent.Subscription | undefined, messages: SessionMessageResponse[]) {
|
||||
if (!subscription) return Effect.void
|
||||
return Effect.promise(async () => {
|
||||
for (const message of messages) {
|
||||
await subscription.replayMessage(message).catch((error: unknown) => {
|
||||
log.error("failed to replay ACP message", { error, messageID: message.info.id })
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type ConfigState = {
|
||||
readonly model: Directory.DefaultModel
|
||||
readonly variant?: string
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
import type { ToolCallContent, ToolCallLocation, ToolKind } from "@agentclientprotocol/sdk"
|
||||
import type { ToolCall, ToolCallContent, ToolCallLocation, ToolCallUpdate, ToolKind } from "@agentclientprotocol/sdk"
|
||||
|
||||
export type ToolInput = Record<string, unknown>
|
||||
|
||||
|
|
@ -16,6 +16,19 @@ export type CompletedToolState = {
|
|||
readonly attachments?: ReadonlyArray<ToolAttachment>
|
||||
}
|
||||
|
||||
export type RunningToolState = {
|
||||
readonly status: "running"
|
||||
readonly input: ToolInput
|
||||
readonly title?: string
|
||||
}
|
||||
|
||||
export type ErrorToolState = {
|
||||
readonly status: "error"
|
||||
readonly input: ToolInput
|
||||
readonly error: string
|
||||
readonly metadata?: unknown
|
||||
}
|
||||
|
||||
export type ImageAttachment = {
|
||||
readonly mimeType: string
|
||||
readonly data: string
|
||||
|
|
@ -100,6 +113,104 @@ export function completedToolContent(toolName: string, state: CompletedToolState
|
|||
return content
|
||||
}
|
||||
|
||||
export function pendingToolCall(input: { readonly toolCallId: string; readonly toolName: string }): ToolCall {
|
||||
return {
|
||||
toolCallId: input.toolCallId,
|
||||
title: input.toolName,
|
||||
kind: toToolKind(input.toolName),
|
||||
status: "pending",
|
||||
locations: [],
|
||||
rawInput: {},
|
||||
}
|
||||
}
|
||||
|
||||
export function runningToolUpdate(input: {
|
||||
readonly toolCallId: string
|
||||
readonly toolName: string
|
||||
readonly state: RunningToolState
|
||||
readonly output?: string
|
||||
}): ToolCallUpdate {
|
||||
const content = input.output
|
||||
? [
|
||||
{
|
||||
type: "content" as const,
|
||||
content: {
|
||||
type: "text" as const,
|
||||
text: input.output,
|
||||
},
|
||||
},
|
||||
]
|
||||
: undefined
|
||||
|
||||
return {
|
||||
toolCallId: input.toolCallId,
|
||||
status: "in_progress",
|
||||
kind: toToolKind(input.toolName),
|
||||
title: input.state.title ?? input.toolName,
|
||||
locations: toLocations(input.toolName, input.state.input),
|
||||
rawInput: input.state.input,
|
||||
...(content ? { content } : {}),
|
||||
}
|
||||
}
|
||||
|
||||
export function duplicateRunningToolUpdate(input: {
|
||||
readonly toolCallId: string
|
||||
readonly toolName: string
|
||||
readonly state: RunningToolState
|
||||
}): ToolCallUpdate {
|
||||
return {
|
||||
toolCallId: input.toolCallId,
|
||||
status: "in_progress",
|
||||
kind: toToolKind(input.toolName),
|
||||
title: input.state.title ?? input.toolName,
|
||||
locations: toLocations(input.toolName, input.state.input),
|
||||
rawInput: input.state.input,
|
||||
}
|
||||
}
|
||||
|
||||
export function completedToolUpdate(input: {
|
||||
readonly toolCallId: string
|
||||
readonly toolName: string
|
||||
readonly state: CompletedToolState & { readonly title: string }
|
||||
}): ToolCallUpdate {
|
||||
return {
|
||||
toolCallId: input.toolCallId,
|
||||
status: "completed",
|
||||
kind: toToolKind(input.toolName),
|
||||
title: input.state.title,
|
||||
content: completedToolContent(input.toolName, input.state),
|
||||
rawInput: input.state.input,
|
||||
rawOutput: completedToolRawOutput(input.state),
|
||||
}
|
||||
}
|
||||
|
||||
export function errorToolUpdate(input: {
|
||||
readonly toolCallId: string
|
||||
readonly toolName: string
|
||||
readonly state: ErrorToolState
|
||||
}): ToolCallUpdate {
|
||||
return {
|
||||
toolCallId: input.toolCallId,
|
||||
status: "failed",
|
||||
kind: toToolKind(input.toolName),
|
||||
title: input.toolName,
|
||||
rawInput: input.state.input,
|
||||
content: [
|
||||
{
|
||||
type: "content",
|
||||
content: {
|
||||
type: "text",
|
||||
text: input.state.error,
|
||||
},
|
||||
},
|
||||
],
|
||||
rawOutput: {
|
||||
error: input.state.error,
|
||||
metadata: input.state.metadata,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
export function completedToolRawOutput(state: CompletedToolState) {
|
||||
return {
|
||||
output: state.output,
|
||||
|
|
@ -138,6 +249,11 @@ export const extractLocations = toLocations
|
|||
export const buildCompletedToolContent = completedToolContent
|
||||
export const buildCompletedRawOutput = completedToolRawOutput
|
||||
export const extractShellOutputSnapshot = shellOutputSnapshot
|
||||
export const buildPendingToolCall = pendingToolCall
|
||||
export const buildRunningToolUpdate = runningToolUpdate
|
||||
export const buildDuplicateRunningToolUpdate = duplicateRunningToolUpdate
|
||||
export const buildCompletedToolUpdate = completedToolUpdate
|
||||
export const buildErrorToolUpdate = errorToolUpdate
|
||||
|
||||
function locationFrom(value: unknown): ToolCallLocation[] {
|
||||
const path = stringValue(value)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import { describe, expect, it } from "bun:test"
|
||||
import type { AgentSideConnection } from "@agentclientprotocol/sdk"
|
||||
import type { Event, Message, OpencodeClient, Part, SessionMessageResponse } from "@opencode-ai/sdk/v2"
|
||||
import type { Event, Message, OpencodeClient, Part, SessionMessageResponse, ToolPart } from "@opencode-ai/sdk/v2"
|
||||
import { Effect, ManagedRuntime } from "effect"
|
||||
import { ACPNextEvent } from "@/acp-next/event"
|
||||
import * as ACPNextService from "@/acp-next/service"
|
||||
|
|
@ -8,6 +8,9 @@ import { Directory } from "@/acp-next/directory"
|
|||
import { ACPNextSession } from "@/acp-next/session"
|
||||
|
||||
type SessionUpdateParams = Parameters<AgentSideConnection["sessionUpdate"]>[0]
|
||||
type ToolSessionUpdateParams = SessionUpdateParams & {
|
||||
update: Extract<SessionUpdateParams["update"], { sessionUpdate: "tool_call" | "tool_call_update" }>
|
||||
}
|
||||
type GlobalEventEnvelope = {
|
||||
payload?: Event
|
||||
}
|
||||
|
|
@ -151,6 +154,18 @@ function partUpdated(sessionID: string, messageID: string, partID: string, type:
|
|||
}
|
||||
}
|
||||
|
||||
function toolUpdated(part: ToolPart): Event {
|
||||
return {
|
||||
id: `evt_${part.sessionID}_${part.messageID}_${part.id}_${part.state.status}`,
|
||||
type: "message.part.updated",
|
||||
properties: {
|
||||
sessionID: part.sessionID,
|
||||
time: Date.now(),
|
||||
part,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
function assistantMessage(sessionID: string, messageID: string, partID: string, type: DeltaPartType) {
|
||||
return {
|
||||
info: {
|
||||
|
|
@ -188,6 +203,98 @@ function assistantMessage(sessionID: string, messageID: string, partID: string,
|
|||
} satisfies SessionMessageResponse
|
||||
}
|
||||
|
||||
function assistantToolMessage(part: ToolPart) {
|
||||
return {
|
||||
info: {
|
||||
id: part.messageID,
|
||||
sessionID: part.sessionID,
|
||||
role: "assistant",
|
||||
time: { created: Date.now() },
|
||||
parentID: "msg_parent",
|
||||
modelID: "model",
|
||||
providerID: "provider",
|
||||
mode: "build",
|
||||
agent: "build",
|
||||
path: { cwd: "/workspace", root: "/workspace" },
|
||||
cost: 0,
|
||||
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
|
||||
},
|
||||
parts: [part],
|
||||
} satisfies SessionMessageResponse
|
||||
}
|
||||
|
||||
function runningTool(
|
||||
sessionID: string,
|
||||
callID: string,
|
||||
output?: string,
|
||||
input: Record<string, unknown> = { cmd: "printf hello" },
|
||||
) {
|
||||
return {
|
||||
id: `part_${callID}`,
|
||||
sessionID,
|
||||
messageID: `msg_${callID}`,
|
||||
type: "tool",
|
||||
callID,
|
||||
tool: "bash",
|
||||
state: {
|
||||
status: "running",
|
||||
input,
|
||||
title: "bash",
|
||||
...(output !== undefined ? { metadata: { output } } : {}),
|
||||
time: { start: Date.now() },
|
||||
},
|
||||
} satisfies ToolPart
|
||||
}
|
||||
|
||||
function completedTool(
|
||||
sessionID: string,
|
||||
callID: string,
|
||||
output = "done",
|
||||
attachments: Extract<ToolPart["state"], { status: "completed" }>["attachments"] = [],
|
||||
) {
|
||||
return {
|
||||
id: `part_${callID}`,
|
||||
sessionID,
|
||||
messageID: `msg_${callID}`,
|
||||
type: "tool",
|
||||
callID,
|
||||
tool: "bash",
|
||||
state: {
|
||||
status: "completed",
|
||||
input: { cmd: "printf done" },
|
||||
output,
|
||||
title: "bash",
|
||||
metadata: { exit: 0 },
|
||||
time: { start: Date.now() - 1, end: Date.now() },
|
||||
...(attachments.length ? { attachments } : {}),
|
||||
},
|
||||
} satisfies ToolPart
|
||||
}
|
||||
|
||||
function errorTool(sessionID: string, callID: string) {
|
||||
return {
|
||||
id: `part_${callID}`,
|
||||
sessionID,
|
||||
messageID: `msg_${callID}`,
|
||||
type: "tool",
|
||||
callID,
|
||||
tool: "bash",
|
||||
state: {
|
||||
status: "error",
|
||||
input: { cmd: "exit 1" },
|
||||
error: "failed hard",
|
||||
metadata: { exit: 1 },
|
||||
time: { start: Date.now() - 1, end: Date.now() },
|
||||
},
|
||||
} satisfies ToolPart
|
||||
}
|
||||
|
||||
function toolUpdates(updates: SessionUpdateParams[]) {
|
||||
return updates.filter((item): item is ToolSessionUpdateParams => {
|
||||
return item.update.sessionUpdate === "tool_call" || item.update.sessionUpdate === "tool_call_update"
|
||||
})
|
||||
}
|
||||
|
||||
async function createKnownSession(
|
||||
session: ACPNextSession.Interface,
|
||||
sessionId: string,
|
||||
|
|
@ -310,6 +417,85 @@ describe("acp-next event routing", () => {
|
|||
expect(harness.updates).toHaveLength(2)
|
||||
})
|
||||
|
||||
it("replays loaded session messages sequentially and continues after update failures", async () => {
|
||||
const events = createEventStream()
|
||||
const updates: SessionUpdateParams[] = []
|
||||
const connection = {
|
||||
sessionUpdate: (params: SessionUpdateParams) => {
|
||||
if (params.update.sessionUpdate === "tool_call" && params.update.toolCallId === "call_slow") {
|
||||
return new Promise<void>((resolve) => {
|
||||
setTimeout(() => {
|
||||
updates.push(params)
|
||||
resolve()
|
||||
}, 20)
|
||||
})
|
||||
}
|
||||
|
||||
if (params.update.sessionUpdate === "tool_call_update" && params.update.toolCallId === "call_slow") {
|
||||
return Promise.reject(new Error("replay send failed"))
|
||||
}
|
||||
|
||||
updates.push(params)
|
||||
return Promise.resolve()
|
||||
},
|
||||
} satisfies Pick<AgentSideConnection, "sessionUpdate">
|
||||
let subscription: ACPNextEvent.Subscription | undefined
|
||||
const service = ACPNextService.make({
|
||||
sdk: {
|
||||
global: {
|
||||
event: (options?: { signal?: AbortSignal }) => Promise.resolve({ stream: events.stream(options?.signal) }),
|
||||
},
|
||||
session: {
|
||||
get: () => Promise.resolve({ data: { id: "ses_loaded" } }),
|
||||
messages: () =>
|
||||
Promise.resolve({
|
||||
data: [
|
||||
assistantToolMessage(completedTool("ses_loaded", "call_slow", "slow")),
|
||||
assistantToolMessage(completedTool("ses_loaded", "call_after", "after")),
|
||||
],
|
||||
}),
|
||||
},
|
||||
} as unknown as OpencodeClient,
|
||||
connection,
|
||||
directory: {
|
||||
get: () =>
|
||||
Effect.succeed(
|
||||
Directory.build({
|
||||
directory: "/workspace",
|
||||
providers: {},
|
||||
modes: [],
|
||||
defaultModeID: "build",
|
||||
commands: [],
|
||||
}),
|
||||
),
|
||||
refresh: () =>
|
||||
Effect.succeed(
|
||||
Directory.build({
|
||||
directory: "/workspace",
|
||||
providers: {},
|
||||
modes: [],
|
||||
defaultModeID: "build",
|
||||
commands: [],
|
||||
}),
|
||||
),
|
||||
variants: Directory.variants,
|
||||
},
|
||||
eventSubscription: (started) => {
|
||||
subscription = started
|
||||
},
|
||||
})
|
||||
|
||||
await Effect.runPromise(service.loadSession({ cwd: "/workspace", sessionId: "ses_loaded", mcpServers: [] }))
|
||||
|
||||
expect(toolUpdates(updates).map((item) => item.update.toolCallId)).toEqual([
|
||||
"call_slow",
|
||||
"call_after",
|
||||
"call_after",
|
||||
])
|
||||
subscription?.stop()
|
||||
events.close()
|
||||
})
|
||||
|
||||
it("ignores unknown sessions and live user parts without user_message_chunk duplication", async () => {
|
||||
const harness = createHarness()
|
||||
await createKnownSession(harness.session, "ses_user", {
|
||||
|
|
@ -325,4 +511,147 @@ describe("acp-next event routing", () => {
|
|||
|
||||
expect(harness.updates).toHaveLength(0)
|
||||
})
|
||||
|
||||
it("emits synthetic pending before the first running tool update", async () => {
|
||||
const harness = createHarness()
|
||||
await Effect.runPromise(harness.session.create({ id: "ses_tool", cwd: "/workspace" }))
|
||||
|
||||
await harness.subscription.handle(toolUpdated(runningTool("ses_tool", "call_1", "hello")))
|
||||
|
||||
expect(toolUpdates(harness.updates).map((item) => item.update.sessionUpdate)).toEqual([
|
||||
"tool_call",
|
||||
"tool_call_update",
|
||||
])
|
||||
expect(harness.updates[0]?.update).toMatchObject({ status: "pending", toolCallId: "call_1" })
|
||||
expect(harness.updates[1]?.update).toMatchObject({ status: "in_progress", toolCallId: "call_1" })
|
||||
})
|
||||
|
||||
it("does not emit duplicate synthetic pending after a replayed running tool", async () => {
|
||||
const harness = createHarness()
|
||||
await Effect.runPromise(harness.session.create({ id: "ses_replay", cwd: "/workspace" }))
|
||||
|
||||
await harness.subscription.replayMessage(assistantToolMessage(runningTool("ses_replay", "call_replay", "first")))
|
||||
await harness.subscription.handle(toolUpdated(runningTool("ses_replay", "call_replay", "second")))
|
||||
|
||||
expect(toolUpdates(harness.updates).filter((item) => item.update.sessionUpdate === "tool_call")).toHaveLength(1)
|
||||
expect(toolUpdates(harness.updates).map((item) => item.update.sessionUpdate)).toEqual([
|
||||
"tool_call",
|
||||
"tool_call_update",
|
||||
"tool_call_update",
|
||||
])
|
||||
})
|
||||
|
||||
it("dedupes shell output snapshots while still sending status-only running updates", async () => {
|
||||
const harness = createHarness()
|
||||
await Effect.runPromise(harness.session.create({ id: "ses_shell", cwd: "/workspace" }))
|
||||
|
||||
await harness.subscription.handle(toolUpdated(runningTool("ses_shell", "call_shell", "same")))
|
||||
await harness.subscription.handle(toolUpdated(runningTool("ses_shell", "call_shell", "same")))
|
||||
|
||||
const updates = toolUpdates(harness.updates)
|
||||
expect(updates).toHaveLength(3)
|
||||
expect(updates[1]?.update).toMatchObject({
|
||||
sessionUpdate: "tool_call_update",
|
||||
content: [{ type: "content", content: { type: "text", text: "same" } }],
|
||||
})
|
||||
expect(updates[2]?.update).toMatchObject({ sessionUpdate: "tool_call_update", status: "in_progress" })
|
||||
expect("content" in updates[2]!.update).toBe(false)
|
||||
})
|
||||
|
||||
it("clears shell snapshot marker when a tool returns to pending", async () => {
|
||||
const harness = createHarness()
|
||||
await Effect.runPromise(harness.session.create({ id: "ses_pending", cwd: "/workspace" }))
|
||||
|
||||
await harness.subscription.handle(toolUpdated(runningTool("ses_pending", "call_pending", "repeat")))
|
||||
await harness.subscription.handle(
|
||||
toolUpdated({
|
||||
id: "part_call_pending",
|
||||
sessionID: "ses_pending",
|
||||
messageID: "msg_call_pending",
|
||||
type: "tool",
|
||||
callID: "call_pending",
|
||||
tool: "bash",
|
||||
state: {
|
||||
status: "pending",
|
||||
input: { cmd: "printf repeat" },
|
||||
raw: '{"cmd":"printf repeat"}',
|
||||
},
|
||||
}),
|
||||
)
|
||||
await harness.subscription.handle(toolUpdated(runningTool("ses_pending", "call_pending", "repeat")))
|
||||
|
||||
expect(
|
||||
toolUpdates(harness.updates)
|
||||
.filter((item) => item.update.sessionUpdate === "tool_call_update")
|
||||
.map((item) => ("content" in item.update ? item.update.content : undefined)),
|
||||
).toEqual([
|
||||
[{ type: "content", content: { type: "text", text: "repeat" } }],
|
||||
[{ type: "content", content: { type: "text", text: "repeat" } }],
|
||||
])
|
||||
})
|
||||
|
||||
it("emits completed tool output and rawOutput", async () => {
|
||||
const harness = createHarness()
|
||||
await Effect.runPromise(harness.session.create({ id: "ses_done", cwd: "/workspace" }))
|
||||
|
||||
await harness.subscription.handle(toolUpdated(completedTool("ses_done", "call_done", "finished")))
|
||||
|
||||
expect(harness.updates.at(-1)?.update).toMatchObject({
|
||||
sessionUpdate: "tool_call_update",
|
||||
toolCallId: "call_done",
|
||||
status: "completed",
|
||||
content: [{ type: "content", content: { type: "text", text: "finished" } }],
|
||||
rawOutput: { output: "finished", metadata: { exit: 0 } },
|
||||
})
|
||||
})
|
||||
|
||||
it("emits error tool output", async () => {
|
||||
const harness = createHarness()
|
||||
await Effect.runPromise(harness.session.create({ id: "ses_error", cwd: "/workspace" }))
|
||||
|
||||
await harness.subscription.handle(toolUpdated(errorTool("ses_error", "call_error")))
|
||||
|
||||
expect(harness.updates.at(-1)?.update).toMatchObject({
|
||||
sessionUpdate: "tool_call_update",
|
||||
toolCallId: "call_error",
|
||||
status: "failed",
|
||||
content: [{ type: "content", content: { type: "text", text: "failed hard" } }],
|
||||
rawOutput: { error: "failed hard", metadata: { exit: 1 } },
|
||||
})
|
||||
})
|
||||
|
||||
it("emits image attachments as ACP image content for live and replayed completed tool updates", async () => {
|
||||
const harness = createHarness()
|
||||
const image = Buffer.from("image-data").toString("base64")
|
||||
const attachment = {
|
||||
id: "file_image",
|
||||
sessionID: "ses_image",
|
||||
messageID: "msg_image",
|
||||
type: "file",
|
||||
mime: "image/png",
|
||||
filename: "image.png",
|
||||
url: `data:image/png;base64,${image}`,
|
||||
} as const
|
||||
await Effect.runPromise(harness.session.create({ id: "ses_image", cwd: "/workspace" }))
|
||||
|
||||
await harness.subscription.handle(toolUpdated(completedTool("ses_image", "call_live", "live", [attachment])))
|
||||
await harness.subscription.replayMessage(
|
||||
assistantToolMessage(completedTool("ses_image", "call_replayed", "replayed", [attachment])),
|
||||
)
|
||||
|
||||
expect(
|
||||
toolUpdates(harness.updates)
|
||||
.filter((item) => item.update.sessionUpdate === "tool_call_update" && item.update.status === "completed")
|
||||
.map((item) => ("content" in item.update ? item.update.content : [])),
|
||||
).toEqual([
|
||||
[
|
||||
{ type: "content", content: { type: "text", text: "live" } },
|
||||
{ type: "content", content: { type: "image", mimeType: "image/png", data: image } },
|
||||
],
|
||||
[
|
||||
{ type: "content", content: { type: "text", text: "replayed" } },
|
||||
{ type: "content", content: { type: "image", mimeType: "image/png", data: image } },
|
||||
],
|
||||
])
|
||||
})
|
||||
})
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue