From 4e218610b26f88d844dd30aa7f3603fb9e8f4b17 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Sun, 26 Apr 2026 21:08:33 -0400 Subject: [PATCH] fix(session): harden shell cancellation --- packages/opencode/src/effect/runner.ts | 35 ++-- packages/opencode/src/session/prompt.ts | 259 ++++++++++++------------ 2 files changed, 152 insertions(+), 142 deletions(-) diff --git a/packages/opencode/src/effect/runner.ts b/packages/opencode/src/effect/runner.ts index 0e923b1194..e03cbffb11 100644 --- a/packages/opencode/src/effect/runner.ts +++ b/packages/opencode/src/effect/runner.ts @@ -18,6 +18,7 @@ interface RunHandle { interface ShellHandle { id: number + cancelled: Deferred.Deferred fiber: Fiber.Fiber } @@ -59,6 +60,9 @@ export const make = ( ? Deferred.fail(done, new Cancelled()).pipe(Effect.asVoid) : Deferred.done(done, exit).pipe(Effect.asVoid) + const awaitDone = (done: Deferred.Deferred) => + Deferred.await(done).pipe(Effect.catchTag("RunnerCancelled", (e) => onInterrupt ?? Effect.die(e))) + const idleIfCurrent = () => SynchronizedRef.modify(ref, (st) => [st._tag === "Idle" ? idle : Effect.void, st] as const).pipe(Effect.flatten) @@ -89,7 +93,9 @@ export const make = ( SynchronizedRef.modifyEffect( ref, Effect.fnUntraced(function* (st) { - if (st._tag === "Shell" && st.shell.id === id) return [idle, { _tag: "Idle" }] as const + if (st._tag === "Shell" && st.shell.id === id) { + return [idle, { _tag: "Idle" }] as const + } if (st._tag === "ShellThenRun" && st.shell.id === id) { const run = yield* startRun(st.run.work, st.run.done) return [Effect.void, { _tag: "Running", run }] as const @@ -98,7 +104,11 @@ export const make = ( }), ).pipe(Effect.flatten) - const stopShell = (shell: ShellHandle) => Fiber.interrupt(shell.fiber) + const stopShell = (shell: ShellHandle) => + Effect.gen(function* () { + yield* Deferred.succeed(shell.cancelled, undefined).pipe(Effect.asVoid) + yield* Fiber.interrupt(shell.fiber) + }) const ensureRunning = (work: Effect.Effect) => SynchronizedRef.modifyEffect( @@ -107,28 +117,23 @@ export const make = ( switch (st._tag) { case "Running": case "ShellThenRun": - return [Deferred.await(st.run.done), st] as const + return [awaitDone(st.run.done), st] as const case "Shell": { const run = { id: next(), done: yield* Deferred.make(), work, } satisfies PendingHandle - return [Deferred.await(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const + return [awaitDone(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const } case "Idle": { const done = yield* Deferred.make() const run = yield* startRun(work, done) - return [Deferred.await(done), { _tag: "Running", run }] as const + return [awaitDone(done), { _tag: "Running", run }] as const } } }), - ).pipe( - Effect.flatten, - Effect.catch( - (e): Effect.Effect => (e instanceof Cancelled ? (onInterrupt ?? Effect.die(e)) : Effect.fail(e as E)), - ), - ) + ).pipe(Effect.flatten) const startShell = (work: Effect.Effect) => SynchronizedRef.modifyEffect( @@ -145,13 +150,17 @@ export const make = ( } yield* busy const id = next() + const cancelled = yield* Deferred.make() const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild) - const shell = { id, fiber } satisfies ShellHandle + const shell = { id, cancelled, fiber } satisfies ShellHandle return [ Effect.gen(function* () { const exit = yield* Fiber.await(fiber) if (Exit.isSuccess(exit)) return exit.value - if (Cause.hasInterruptsOnly(exit.cause) && onInterrupt) return yield* onInterrupt + if ((yield* Deferred.isDone(cancelled)) || Cause.hasInterruptsOnly(exit.cause)) { + if (onInterrupt) return yield* onInterrupt + return yield* Effect.die(new Cancelled()) + } return yield* Effect.failCause(exit.cause) }), { _tag: "Shell", shell }, diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index f7306280fe..a94f520c30 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -721,142 +721,143 @@ NOTE: At any point in time through this workflow you should feel free to ask the }) const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput) { - const ctx = yield* InstanceState.context - const run = yield* runner() - const session = yield* sessions.get(input.sessionID) - if (session.revert) { - yield* revert.cleanup(session) - } - const agent = yield* agents.get(input.agent) - if (!agent) { - const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name) - const hint = available.length ? ` Available agents: ${available.join(", ")}` : "" - const error = new NamedError.Unknown({ message: `Agent not found: "${input.agent}".${hint}` }) - yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() }) - throw error - } - const model = input.model ?? agent.model ?? (yield* lastModel(input.sessionID)) - const userMsg: MessageV2.User = { - id: input.messageID ?? MessageID.ascending(), - sessionID: input.sessionID, - time: { created: Date.now() }, - role: "user", - agent: input.agent, - model: { providerID: model.providerID, modelID: model.modelID }, - } - yield* sessions.updateMessage(userMsg) - const userPart: MessageV2.Part = { - type: "text", - id: PartID.ascending(), - messageID: userMsg.id, - sessionID: input.sessionID, - text: "The following tool was executed by the user", - synthetic: true, - } - yield* sessions.updatePart(userPart) - - const msg: MessageV2.Assistant = { - id: MessageID.ascending(), - sessionID: input.sessionID, - parentID: userMsg.id, - mode: input.agent, - agent: input.agent, - cost: 0, - path: { cwd: ctx.directory, root: ctx.worktree }, - time: { created: Date.now() }, - role: "assistant", - tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, - modelID: model.modelID, - providerID: model.providerID, - } - yield* sessions.updateMessage(msg) - const part: MessageV2.ToolPart = { - type: "tool", - id: PartID.ascending(), - messageID: msg.id, - sessionID: input.sessionID, - tool: "bash", - callID: ulid(), - state: { - status: "running", - time: { start: Date.now() }, - input: { command: input.command }, - }, - } - yield* sessions.updatePart(part) - - const cfg = yield* config.get() - const sh = Shell.preferred(cfg.shell) - const cwd = ctx.directory - const args = Shell.args(sh, input.command, cwd) - const shellEnv = yield* plugin.trigger( - "shell.env", - { cwd, sessionID: input.sessionID, callID: part.callID }, - { env: {} }, - ) - - const cmd = ChildProcess.make(sh, args, { - cwd, - extendEnv: true, - env: { ...shellEnv.env, TERM: "dumb" }, - stdin: "ignore", - forceKillAfter: "3 seconds", - }) - - let output = "" - let aborted = false - const finish = Effect.uninterruptible( + return yield* Effect.uninterruptibleMask((restore) => Effect.gen(function* () { - if (aborted) { - output += "\n\n" + ["", "User aborted the command", ""].join("\n") - } - if (!msg.time.completed) { - msg.time.completed = Date.now() + const { msg, part, cwd } = yield* Effect.gen(function* () { + const ctx = yield* InstanceState.context + const session = yield* sessions.get(input.sessionID) + if (session.revert) { + yield* revert.cleanup(session) + } + const agent = yield* agents.get(input.agent) + if (!agent) { + const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name) + const hint = available.length ? ` Available agents: ${available.join(", ")}` : "" + const error = new NamedError.Unknown({ message: `Agent not found: "${input.agent}".${hint}` }) + yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() }) + throw error + } + const model = input.model ?? agent.model ?? (yield* lastModel(input.sessionID)) + const userMsg: MessageV2.User = { + id: input.messageID ?? MessageID.ascending(), + sessionID: input.sessionID, + time: { created: Date.now() }, + role: "user", + agent: input.agent, + model: { providerID: model.providerID, modelID: model.modelID }, + } + yield* sessions.updateMessage(userMsg) + const userPart: MessageV2.Part = { + type: "text", + id: PartID.ascending(), + messageID: userMsg.id, + sessionID: input.sessionID, + text: "The following tool was executed by the user", + synthetic: true, + } + yield* sessions.updatePart(userPart) + + const msg: MessageV2.Assistant = { + id: MessageID.ascending(), + sessionID: input.sessionID, + parentID: userMsg.id, + mode: input.agent, + agent: input.agent, + cost: 0, + path: { cwd: ctx.directory, root: ctx.worktree }, + time: { created: Date.now() }, + role: "assistant", + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + modelID: model.modelID, + providerID: model.providerID, + } yield* sessions.updateMessage(msg) - } - if (part.state.status === "running") { - part.state = { - status: "completed", - time: { ...part.state.time, end: Date.now() }, - input: part.state.input, - title: "", - metadata: { output, description: "" }, - output, + const part: MessageV2.ToolPart = { + type: "tool", + id: PartID.ascending(), + messageID: msg.id, + sessionID: input.sessionID, + tool: "bash", + callID: ulid(), + state: { + status: "running", + time: { start: Date.now() }, + input: { command: input.command }, + }, } yield* sessions.updatePart(part) + return { msg, part, cwd: ctx.directory } + }) + + const cfg = yield* config.get() + const sh = Shell.preferred(cfg.shell) + const args = Shell.args(sh, input.command, cwd) + let output = "" + let aborted = false + + const finish = Effect.uninterruptible( + Effect.gen(function* () { + if (aborted) { + output += "\n\n" + ["", "User aborted the command", ""].join("\n") + } + if (!msg.time.completed) { + msg.time.completed = Date.now() + yield* sessions.updateMessage(msg) + } + if (part.state.status === "running") { + part.state = { + status: "completed", + time: { ...part.state.time, end: Date.now() }, + input: part.state.input, + title: "", + metadata: { output, description: "" }, + output, + } + yield* sessions.updatePart(part) + } + }), + ) + + const exit = yield* restore( + Effect.gen(function* () { + const shellEnv = yield* plugin.trigger( + "shell.env", + { cwd, sessionID: input.sessionID, callID: part.callID }, + { env: {} }, + ) + const cmd = ChildProcess.make(sh, args, { + cwd, + extendEnv: true, + env: { ...shellEnv.env, TERM: "dumb" }, + stdin: "ignore", + forceKillAfter: "3 seconds", + }) + const handle = yield* spawner.spawn(cmd) + yield* Stream.runForEach(Stream.decodeText(handle.all), (chunk) => + Effect.gen(function* () { + output += chunk + if (part.state.status === "running") { + part.state.metadata = { output, description: "" } + yield* sessions.updatePart(part) + } + }), + ) + yield* handle.exitCode + }).pipe(Effect.scoped, Effect.orDie), + ).pipe(Effect.exit) + + if (Exit.isFailure(exit) && Cause.hasInterrupts(exit.cause)) { + aborted = true } + yield* finish + + if (Exit.isFailure(exit) && !aborted && !Cause.hasInterruptsOnly(exit.cause)) { + return yield* Effect.failCause(exit.cause) + } + + return { info: msg, parts: [part] } }), ) - - const exit = yield* Effect.gen(function* () { - const handle = yield* spawner.spawn(cmd) - yield* Stream.runForEach(Stream.decodeText(handle.all), (chunk) => - Effect.sync(() => { - output += chunk - if (part.state.status === "running") { - part.state.metadata = { output, description: "" } - void run.fork(sessions.updatePart(part)) - } - }), - ) - yield* handle.exitCode - }).pipe( - Effect.scoped, - Effect.onInterrupt(() => - Effect.sync(() => { - aborted = true - }), - ), - Effect.orDie, - Effect.ensuring(finish), - Effect.exit, - ) - - if (Exit.isFailure(exit) && !Cause.hasInterruptsOnly(exit.cause)) { - return yield* Effect.failCause(exit.cause) - } - - return { info: msg, parts: [part] } }) const getModel = Effect.fn("SessionPrompt.getModel")(function* (