diff --git a/packages/opencode/src/cli/cmd/tui/context/editor-zed.ts b/packages/opencode/src/cli/cmd/tui/context/editor-zed.ts index bae88f28d0..9a776f2506 100644 --- a/packages/opencode/src/cli/cmd/tui/context/editor-zed.ts +++ b/packages/opencode/src/cli/cmd/tui/context/editor-zed.ts @@ -38,7 +38,9 @@ export async function resolveZedSelection(dbPath: string, cwd = process.cwd()): const text = contents.type === "contents" && contents.contents != null ? contents.contents - : await Bun.file(row.buffer_path).text().catch(() => undefined) + : await Bun.file(row.buffer_path) + .text() + .catch(() => undefined) if (text == null) return { type: "unavailable" } const startOffset = Math.min(row.selection_start, row.selection_end) @@ -79,11 +81,10 @@ function queryZedActiveEditor(dbPath: string, cwd: string) { ) .all() - const rows = raw - .flatMap((row) => { - const parsed = ZedEditorRowSchema.safeParse(row) - return parsed.success ? [parsed.data] : [] - }) + const rows = raw.flatMap((row) => { + const parsed = ZedEditorRowSchema.safeParse(row) + return parsed.success ? [parsed.data] : [] + }) if (raw.length > 0 && rows.length === 0) return { type: "unavailable" as const } diff --git a/packages/opencode/src/effect/runner.ts b/packages/opencode/src/effect/runner.ts index 0e923b1194..bbc85309e8 100644 --- a/packages/opencode/src/effect/runner.ts +++ b/packages/opencode/src/effect/runner.ts @@ -4,7 +4,7 @@ export interface Runner { readonly state: State readonly busy: boolean readonly ensureRunning: (work: Effect.Effect) => Effect.Effect - readonly startShell: (work: Effect.Effect) => Effect.Effect + readonly startShell: (work: Effect.Effect, ready?: Deferred.Deferred) => Effect.Effect readonly cancel: Effect.Effect } @@ -18,6 +18,8 @@ interface RunHandle { interface ShellHandle { id: number + cancelled: Deferred.Deferred + ready?: Deferred.Deferred fiber: Fiber.Fiber } @@ -59,6 +61,9 @@ export const make = ( ? Deferred.fail(done, new Cancelled()).pipe(Effect.asVoid) : Deferred.done(done, exit).pipe(Effect.asVoid) + const awaitDone = (done: Deferred.Deferred) => + Deferred.await(done).pipe(Effect.catchTag("RunnerCancelled", (e) => onInterrupt ?? Effect.die(e))) + const idleIfCurrent = () => SynchronizedRef.modify(ref, (st) => [st._tag === "Idle" ? idle : Effect.void, st] as const).pipe(Effect.flatten) @@ -89,7 +94,9 @@ export const make = ( SynchronizedRef.modifyEffect( ref, Effect.fnUntraced(function* (st) { - if (st._tag === "Shell" && st.shell.id === id) return [idle, { _tag: "Idle" }] as const + if (st._tag === "Shell" && st.shell.id === id) { + return [idle, { _tag: "Idle" }] as const + } if (st._tag === "ShellThenRun" && st.shell.id === id) { const run = yield* startRun(st.run.work, st.run.done) return [Effect.void, { _tag: "Running", run }] as const @@ -98,7 +105,12 @@ export const make = ( }), ).pipe(Effect.flatten) - const stopShell = (shell: ShellHandle) => Fiber.interrupt(shell.fiber) + const stopShell = (shell: ShellHandle) => + Effect.gen(function* () { + if (shell.ready) yield* Deferred.await(shell.ready).pipe(Effect.exit, Effect.asVoid) + yield* Deferred.succeed(shell.cancelled, undefined).pipe(Effect.asVoid) + yield* Fiber.interrupt(shell.fiber) + }) const ensureRunning = (work: Effect.Effect) => SynchronizedRef.modifyEffect( @@ -107,30 +119,25 @@ export const make = ( switch (st._tag) { case "Running": case "ShellThenRun": - return [Deferred.await(st.run.done), st] as const + return [awaitDone(st.run.done), st] as const case "Shell": { const run = { id: next(), done: yield* Deferred.make(), work, } satisfies PendingHandle - return [Deferred.await(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const + return [awaitDone(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const } case "Idle": { const done = yield* Deferred.make() const run = yield* startRun(work, done) - return [Deferred.await(done), { _tag: "Running", run }] as const + return [awaitDone(done), { _tag: "Running", run }] as const } } }), - ).pipe( - Effect.flatten, - Effect.catch( - (e): Effect.Effect => (e instanceof Cancelled ? (onInterrupt ?? Effect.die(e)) : Effect.fail(e as E)), - ), - ) + ).pipe(Effect.flatten) - const startShell = (work: Effect.Effect) => + const startShell = (work: Effect.Effect, ready?: Deferred.Deferred) => SynchronizedRef.modifyEffect( ref, Effect.fnUntraced(function* (st) { @@ -145,13 +152,17 @@ export const make = ( } yield* busy const id = next() + const cancelled = yield* Deferred.make() const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild) - const shell = { id, fiber } satisfies ShellHandle + const shell = { id, cancelled, ready, fiber } satisfies ShellHandle return [ Effect.gen(function* () { const exit = yield* Fiber.await(fiber) if (Exit.isSuccess(exit)) return exit.value - if (Cause.hasInterruptsOnly(exit.cause) && onInterrupt) return yield* onInterrupt + if (Cause.hasInterruptsOnly(exit.cause) || ((yield* Deferred.isDone(cancelled)) && !Cause.hasDies(exit.cause))) { + if (onInterrupt) return yield* onInterrupt + return yield* Effect.die(new Cancelled()) + } return yield* Effect.failCause(exit.cause) }), { _tag: "Shell", shell }, @@ -183,8 +194,8 @@ export const make = ( case "ShellThenRun": return [ Effect.gen(function* () { - yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid) yield* stopShell(st.shell) + yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid) yield* idleIfCurrent() }), { _tag: "Idle" } as const, diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 229039e3a6..bc2da37896 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -46,7 +46,7 @@ import { AppFileSystem } from "@opencode-ai/core/filesystem" import { Truncate } from "@/tool/truncate" import { decodeDataUrl } from "@/util/data-url" import { Process } from "@/util/process" -import { Cause, Effect, Exit, Layer, Option, Scope, Context, Schema } from "effect" +import { Cause, Deferred, Effect, Exit, Layer, Option, Scope, Context, Schema } from "effect" import { zod } from "@/util/effect-zod" import { withStatics } from "@/util/schema" import * as EffectLogger from "@opencode-ai/core/effect/logger" @@ -725,143 +725,146 @@ NOTE: At any point in time through this workflow you should feel free to ask the } satisfies MessageV2.TextPart) }) - const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput) { - const ctx = yield* InstanceState.context - const run = yield* runner() - const session = yield* sessions.get(input.sessionID) - if (session.revert) { - yield* revert.cleanup(session) - } - const agent = yield* agents.get(input.agent) - if (!agent) { - const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name) - const hint = available.length ? ` Available agents: ${available.join(", ")}` : "" - const error = new NamedError.Unknown({ message: `Agent not found: "${input.agent}".${hint}` }) - yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() }) - throw error - } - const model = input.model ?? agent.model ?? (yield* lastModel(input.sessionID)) - const userMsg: MessageV2.User = { - id: input.messageID ?? MessageID.ascending(), - sessionID: input.sessionID, - time: { created: Date.now() }, - role: "user", - agent: input.agent, - model: { providerID: model.providerID, modelID: model.modelID }, - } - yield* sessions.updateMessage(userMsg) - const userPart: MessageV2.Part = { - type: "text", - id: PartID.ascending(), - messageID: userMsg.id, - sessionID: input.sessionID, - text: "The following tool was executed by the user", - synthetic: true, - } - yield* sessions.updatePart(userPart) - - const msg: MessageV2.Assistant = { - id: MessageID.ascending(), - sessionID: input.sessionID, - parentID: userMsg.id, - mode: input.agent, - agent: input.agent, - cost: 0, - path: { cwd: ctx.directory, root: ctx.worktree }, - time: { created: Date.now() }, - role: "assistant", - tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, - modelID: model.modelID, - providerID: model.providerID, - } - yield* sessions.updateMessage(msg) - const part: MessageV2.ToolPart = { - type: "tool", - id: PartID.ascending(), - messageID: msg.id, - sessionID: input.sessionID, - tool: ShellToolID.id, - callID: ulid(), - state: { - status: "running", - time: { start: Date.now() }, - input: { command: input.command }, - }, - } - yield* sessions.updatePart(part) - - const cfg = yield* config.get() - const sh = Shell.preferred(cfg.shell) - const cwd = ctx.directory - const args = Shell.args(sh, input.command, cwd) - const shellEnv = yield* plugin.trigger( - "shell.env", - { cwd, sessionID: input.sessionID, callID: part.callID }, - { env: {} }, - ) - - const cmd = ChildProcess.make(sh, args, { - cwd, - extendEnv: true, - env: { ...shellEnv.env, TERM: "dumb" }, - stdin: "ignore", - forceKillAfter: "3 seconds", - }) - - let output = "" - let aborted = false - const finish = Effect.uninterruptible( + const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, ready?: Deferred.Deferred) { + return yield* Effect.uninterruptibleMask((restore) => Effect.gen(function* () { - if (aborted) { - output += "\n\n" + ["", "User aborted the command", ""].join("\n") - } - if (!msg.time.completed) { - msg.time.completed = Date.now() + const markReady = ready ? Deferred.succeed(ready, undefined).pipe(Effect.asVoid) : Effect.void + const { msg, part, cwd } = yield* Effect.gen(function* () { + const ctx = yield* InstanceState.context + const session = yield* sessions.get(input.sessionID) + if (session.revert) { + yield* revert.cleanup(session) + } + const agent = yield* agents.get(input.agent) + if (!agent) { + const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name) + const hint = available.length ? ` Available agents: ${available.join(", ")}` : "" + const error = new NamedError.Unknown({ message: `Agent not found: "${input.agent}".${hint}` }) + yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() }) + throw error + } + const model = input.model ?? agent.model ?? (yield* lastModel(input.sessionID)) + const userMsg: MessageV2.User = { + id: input.messageID ?? MessageID.ascending(), + sessionID: input.sessionID, + time: { created: Date.now() }, + role: "user", + agent: input.agent, + model: { providerID: model.providerID, modelID: model.modelID }, + } + yield* sessions.updateMessage(userMsg) + const userPart: MessageV2.Part = { + type: "text", + id: PartID.ascending(), + messageID: userMsg.id, + sessionID: input.sessionID, + text: "The following tool was executed by the user", + synthetic: true, + } + yield* sessions.updatePart(userPart) + + const msg: MessageV2.Assistant = { + id: MessageID.ascending(), + sessionID: input.sessionID, + parentID: userMsg.id, + mode: input.agent, + agent: input.agent, + cost: 0, + path: { cwd: ctx.directory, root: ctx.worktree }, + time: { created: Date.now() }, + role: "assistant", + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + modelID: model.modelID, + providerID: model.providerID, + } yield* sessions.updateMessage(msg) - } - if (part.state.status === "running") { - part.state = { - status: "completed", - time: { ...part.state.time, end: Date.now() }, - input: part.state.input, - title: "", - metadata: { output, description: "" }, - output, + const part: MessageV2.ToolPart = { + type: "tool", + id: PartID.ascending(), + messageID: msg.id, + sessionID: input.sessionID, + tool: ShellToolID.id, + callID: ulid(), + state: { + status: "running", + time: { start: Date.now() }, + input: { command: input.command }, + }, } yield* sessions.updatePart(part) + yield* markReady + return { msg, part, cwd: ctx.directory } + }).pipe(Effect.onExit(() => markReady)) + + const cfg = yield* config.get() + const sh = Shell.preferred(cfg.shell) + const args = Shell.args(sh, input.command, cwd) + let output = "" + let aborted = false + + const finish = Effect.uninterruptible( + Effect.gen(function* () { + if (aborted) { + output += "\n\n" + ["", "User aborted the command", ""].join("\n") + } + if (!msg.time.completed) { + msg.time.completed = Date.now() + yield* sessions.updateMessage(msg) + } + if (part.state.status === "running") { + part.state = { + status: "completed", + time: { ...part.state.time, end: Date.now() }, + input: part.state.input, + title: "", + metadata: { output, description: "" }, + output, + } + yield* sessions.updatePart(part) + } + }), + ) + + const exit = yield* restore( + Effect.gen(function* () { + const shellEnv = yield* plugin.trigger( + "shell.env", + { cwd, sessionID: input.sessionID, callID: part.callID }, + { env: {} }, + ) + const cmd = ChildProcess.make(sh, args, { + cwd, + extendEnv: true, + env: { ...shellEnv.env, TERM: "dumb" }, + stdin: "ignore", + forceKillAfter: "3 seconds", + }) + const handle = yield* spawner.spawn(cmd) + yield* Stream.runForEach(Stream.decodeText(handle.all), (chunk) => + Effect.gen(function* () { + output += chunk + if (part.state.status === "running") { + part.state.metadata = { output, description: "" } + yield* sessions.updatePart(part) + } + }), + ) + yield* handle.exitCode + }).pipe(Effect.scoped, Effect.orDie), + ).pipe(Effect.exit) + + if (Exit.isFailure(exit) && Cause.hasInterrupts(exit.cause)) { + aborted = true } + yield* finish + + if (Exit.isFailure(exit) && !aborted && !Cause.hasInterruptsOnly(exit.cause)) { + return yield* Effect.failCause(exit.cause) + } + + return { info: msg, parts: [part] } }), ) - - const exit = yield* Effect.gen(function* () { - const handle = yield* spawner.spawn(cmd) - yield* Stream.runForEach(Stream.decodeText(handle.all), (chunk) => - Effect.sync(() => { - output += chunk - if (part.state.status === "running") { - part.state.metadata = { output, description: "" } - void run.fork(sessions.updatePart(part)) - } - }), - ) - yield* handle.exitCode - }).pipe( - Effect.scoped, - Effect.onInterrupt(() => - Effect.sync(() => { - aborted = true - }), - ), - Effect.orDie, - Effect.ensuring(finish), - Effect.exit, - ) - - if (Exit.isFailure(exit) && !Cause.hasInterruptsOnly(exit.cause)) { - return yield* Effect.failCause(exit.cause) - } - - return { info: msg, parts: [part] } }) const getModel = Effect.fn("SessionPrompt.getModel")(function* ( @@ -1512,7 +1515,8 @@ NOTE: At any point in time through this workflow you should feel free to ask the const shell: (input: ShellInput) => Effect.Effect = Effect.fn("SessionPrompt.shell")( function* (input: ShellInput) { - return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input)) + const ready = yield* Deferred.make() + return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input, ready), ready) }, ) diff --git a/packages/opencode/src/session/run-state.ts b/packages/opencode/src/session/run-state.ts index 5d37a69f12..fc931fad4a 100644 --- a/packages/opencode/src/session/run-state.ts +++ b/packages/opencode/src/session/run-state.ts @@ -1,6 +1,6 @@ import { InstanceState } from "@/effect/instance-state" import { Runner } from "@/effect/runner" -import { Effect, Layer, Scope, Context } from "effect" +import { Deferred, Effect, Layer, Scope, Context } from "effect" import * as Session from "./session" import { MessageV2 } from "./message-v2" import { SessionID } from "./schema" @@ -18,6 +18,7 @@ export interface Interface { sessionID: SessionID, onInterrupt: Effect.Effect, work: Effect.Effect, + ready?: Deferred.Deferred, ) => Effect.Effect } @@ -98,8 +99,9 @@ export const layer = Layer.effect( sessionID: SessionID, onInterrupt: Effect.Effect, work: Effect.Effect, + ready?: Deferred.Deferred, ) { - return yield* (yield* runner(sessionID, onInterrupt)).startShell(work) + return yield* (yield* runner(sessionID, onInterrupt)).startShell(work, ready) }) return Service.of({ assertNotBusy, cancel, ensureRunning, startShell }) diff --git a/packages/opencode/test/cli/tui/editor-context.test.ts b/packages/opencode/test/cli/tui/editor-context.test.ts index 60a1d58341..770e850d2d 100644 --- a/packages/opencode/test/cli/tui/editor-context.test.ts +++ b/packages/opencode/test/cli/tui/editor-context.test.ts @@ -25,13 +25,10 @@ async function writeZedFixture(dir: string, options: ZedFixtureOptions = {}) { db.run("insert into panes values (1, 1, 1)") db.run("insert into items values (1, 1, 1, 1, 'Editor')") db.run("insert into editors values (1, 1, ?, ?)", [filePath, "one\ntwo\nthree"]) - db.run( - "insert into editor_selections values (1, 1, ?, ?)", - [ - options.selectionStart === undefined ? 4 : options.selectionStart, - options.selectionEnd === undefined ? 7 : options.selectionEnd, - ], - ) + db.run("insert into editor_selections values (1, 1, ?, ?)", [ + options.selectionStart === undefined ? 4 : options.selectionStart, + options.selectionEnd === undefined ? 7 : options.selectionEnd, + ]) db.close() return { dbPath, filePath } diff --git a/packages/opencode/test/effect/runner.test.ts b/packages/opencode/test/effect/runner.test.ts index 4b0fbc1b51..ee99050a8c 100644 --- a/packages/opencode/test/effect/runner.test.ts +++ b/packages/opencode/test/effect/runner.test.ts @@ -334,6 +334,22 @@ describe("Runner", () => { }), ) + it.live( + "cancel does not mask shell defects", + Effect.gen(function* () { + const s = yield* Scope.Scope + const runner = Runner.make(s, { onInterrupt: Effect.succeed("interrupted") }) + + const sh = yield* runner + .startShell(Effect.never.pipe(Effect.ensuring(Effect.die("boom")), Effect.as("ignored"))) + .pipe(Effect.forkChild) + yield* Effect.sleep("10 millis") + + yield* runner.cancel + expect(Exit.isFailure(yield* Fiber.await(sh))).toBe(true) + }), + ) + // --- shell→run handoff --- it.live( diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index 98019cd160..385f48c57b 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -1472,6 +1472,10 @@ unix( const exit = yield* Fiber.await(loop) expect(Exit.isSuccess(exit)).toBe(true) + if (Exit.isSuccess(exit)) { + const tool = completedTool(exit.value.parts) + expect(tool?.state.output).toContain("User aborted the command") + } yield* Fiber.await(sh) }),