fix(session): harden shell cancellation

This commit is contained in:
Kit Langton 2026-04-26 21:08:33 -04:00
parent 7a1c8465f5
commit 4e218610b2
2 changed files with 152 additions and 142 deletions

View file

@ -18,6 +18,7 @@ interface RunHandle<A, E> {
interface ShellHandle<A, E> {
id: number
cancelled: Deferred.Deferred<void>
fiber: Fiber.Fiber<A, E>
}
@ -59,6 +60,9 @@ export const make = <A, E = never>(
? Deferred.fail(done, new Cancelled()).pipe(Effect.asVoid)
: Deferred.done(done, exit).pipe(Effect.asVoid)
const awaitDone = (done: Deferred.Deferred<A, E | Cancelled>) =>
Deferred.await(done).pipe(Effect.catchTag("RunnerCancelled", (e) => onInterrupt ?? Effect.die(e)))
const idleIfCurrent = () =>
SynchronizedRef.modify(ref, (st) => [st._tag === "Idle" ? idle : Effect.void, st] as const).pipe(Effect.flatten)
@ -89,7 +93,9 @@ export const make = <A, E = never>(
SynchronizedRef.modifyEffect(
ref,
Effect.fnUntraced(function* (st) {
if (st._tag === "Shell" && st.shell.id === id) return [idle, { _tag: "Idle" }] as const
if (st._tag === "Shell" && st.shell.id === id) {
return [idle, { _tag: "Idle" }] as const
}
if (st._tag === "ShellThenRun" && st.shell.id === id) {
const run = yield* startRun(st.run.work, st.run.done)
return [Effect.void, { _tag: "Running", run }] as const
@ -98,7 +104,11 @@ export const make = <A, E = never>(
}),
).pipe(Effect.flatten)
const stopShell = (shell: ShellHandle<A, E>) => Fiber.interrupt(shell.fiber)
const stopShell = (shell: ShellHandle<A, E>) =>
Effect.gen(function* () {
yield* Deferred.succeed(shell.cancelled, undefined).pipe(Effect.asVoid)
yield* Fiber.interrupt(shell.fiber)
})
const ensureRunning = (work: Effect.Effect<A, E>) =>
SynchronizedRef.modifyEffect(
@ -107,28 +117,23 @@ export const make = <A, E = never>(
switch (st._tag) {
case "Running":
case "ShellThenRun":
return [Deferred.await(st.run.done), st] as const
return [awaitDone(st.run.done), st] as const
case "Shell": {
const run = {
id: next(),
done: yield* Deferred.make<A, E | Cancelled>(),
work,
} satisfies PendingHandle<A, E>
return [Deferred.await(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const
return [awaitDone(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const
}
case "Idle": {
const done = yield* Deferred.make<A, E | Cancelled>()
const run = yield* startRun(work, done)
return [Deferred.await(done), { _tag: "Running", run }] as const
return [awaitDone(done), { _tag: "Running", run }] as const
}
}
}),
).pipe(
Effect.flatten,
Effect.catch(
(e): Effect.Effect<A, E> => (e instanceof Cancelled ? (onInterrupt ?? Effect.die(e)) : Effect.fail(e as E)),
),
)
).pipe(Effect.flatten)
const startShell = (work: Effect.Effect<A, E>) =>
SynchronizedRef.modifyEffect(
@ -145,13 +150,17 @@ export const make = <A, E = never>(
}
yield* busy
const id = next()
const cancelled = yield* Deferred.make<void>()
const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild)
const shell = { id, fiber } satisfies ShellHandle<A, E>
const shell = { id, cancelled, fiber } satisfies ShellHandle<A, E>
return [
Effect.gen(function* () {
const exit = yield* Fiber.await(fiber)
if (Exit.isSuccess(exit)) return exit.value
if (Cause.hasInterruptsOnly(exit.cause) && onInterrupt) return yield* onInterrupt
if ((yield* Deferred.isDone(cancelled)) || Cause.hasInterruptsOnly(exit.cause)) {
if (onInterrupt) return yield* onInterrupt
return yield* Effect.die(new Cancelled())
}
return yield* Effect.failCause(exit.cause)
}),
{ _tag: "Shell", shell },

View file

@ -721,142 +721,143 @@ NOTE: At any point in time through this workflow you should feel free to ask the
})
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput) {
const ctx = yield* InstanceState.context
const run = yield* runner()
const session = yield* sessions.get(input.sessionID)
if (session.revert) {
yield* revert.cleanup(session)
}
const agent = yield* agents.get(input.agent)
if (!agent) {
const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name)
const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
const error = new NamedError.Unknown({ message: `Agent not found: "${input.agent}".${hint}` })
yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() })
throw error
}
const model = input.model ?? agent.model ?? (yield* lastModel(input.sessionID))
const userMsg: MessageV2.User = {
id: input.messageID ?? MessageID.ascending(),
sessionID: input.sessionID,
time: { created: Date.now() },
role: "user",
agent: input.agent,
model: { providerID: model.providerID, modelID: model.modelID },
}
yield* sessions.updateMessage(userMsg)
const userPart: MessageV2.Part = {
type: "text",
id: PartID.ascending(),
messageID: userMsg.id,
sessionID: input.sessionID,
text: "The following tool was executed by the user",
synthetic: true,
}
yield* sessions.updatePart(userPart)
const msg: MessageV2.Assistant = {
id: MessageID.ascending(),
sessionID: input.sessionID,
parentID: userMsg.id,
mode: input.agent,
agent: input.agent,
cost: 0,
path: { cwd: ctx.directory, root: ctx.worktree },
time: { created: Date.now() },
role: "assistant",
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
modelID: model.modelID,
providerID: model.providerID,
}
yield* sessions.updateMessage(msg)
const part: MessageV2.ToolPart = {
type: "tool",
id: PartID.ascending(),
messageID: msg.id,
sessionID: input.sessionID,
tool: "bash",
callID: ulid(),
state: {
status: "running",
time: { start: Date.now() },
input: { command: input.command },
},
}
yield* sessions.updatePart(part)
const cfg = yield* config.get()
const sh = Shell.preferred(cfg.shell)
const cwd = ctx.directory
const args = Shell.args(sh, input.command, cwd)
const shellEnv = yield* plugin.trigger(
"shell.env",
{ cwd, sessionID: input.sessionID, callID: part.callID },
{ env: {} },
)
const cmd = ChildProcess.make(sh, args, {
cwd,
extendEnv: true,
env: { ...shellEnv.env, TERM: "dumb" },
stdin: "ignore",
forceKillAfter: "3 seconds",
})
let output = ""
let aborted = false
const finish = Effect.uninterruptible(
return yield* Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
if (aborted) {
output += "\n\n" + ["<metadata>", "User aborted the command", "</metadata>"].join("\n")
}
if (!msg.time.completed) {
msg.time.completed = Date.now()
const { msg, part, cwd } = yield* Effect.gen(function* () {
const ctx = yield* InstanceState.context
const session = yield* sessions.get(input.sessionID)
if (session.revert) {
yield* revert.cleanup(session)
}
const agent = yield* agents.get(input.agent)
if (!agent) {
const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name)
const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
const error = new NamedError.Unknown({ message: `Agent not found: "${input.agent}".${hint}` })
yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() })
throw error
}
const model = input.model ?? agent.model ?? (yield* lastModel(input.sessionID))
const userMsg: MessageV2.User = {
id: input.messageID ?? MessageID.ascending(),
sessionID: input.sessionID,
time: { created: Date.now() },
role: "user",
agent: input.agent,
model: { providerID: model.providerID, modelID: model.modelID },
}
yield* sessions.updateMessage(userMsg)
const userPart: MessageV2.Part = {
type: "text",
id: PartID.ascending(),
messageID: userMsg.id,
sessionID: input.sessionID,
text: "The following tool was executed by the user",
synthetic: true,
}
yield* sessions.updatePart(userPart)
const msg: MessageV2.Assistant = {
id: MessageID.ascending(),
sessionID: input.sessionID,
parentID: userMsg.id,
mode: input.agent,
agent: input.agent,
cost: 0,
path: { cwd: ctx.directory, root: ctx.worktree },
time: { created: Date.now() },
role: "assistant",
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
modelID: model.modelID,
providerID: model.providerID,
}
yield* sessions.updateMessage(msg)
}
if (part.state.status === "running") {
part.state = {
status: "completed",
time: { ...part.state.time, end: Date.now() },
input: part.state.input,
title: "",
metadata: { output, description: "" },
output,
const part: MessageV2.ToolPart = {
type: "tool",
id: PartID.ascending(),
messageID: msg.id,
sessionID: input.sessionID,
tool: "bash",
callID: ulid(),
state: {
status: "running",
time: { start: Date.now() },
input: { command: input.command },
},
}
yield* sessions.updatePart(part)
return { msg, part, cwd: ctx.directory }
})
const cfg = yield* config.get()
const sh = Shell.preferred(cfg.shell)
const args = Shell.args(sh, input.command, cwd)
let output = ""
let aborted = false
const finish = Effect.uninterruptible(
Effect.gen(function* () {
if (aborted) {
output += "\n\n" + ["<metadata>", "User aborted the command", "</metadata>"].join("\n")
}
if (!msg.time.completed) {
msg.time.completed = Date.now()
yield* sessions.updateMessage(msg)
}
if (part.state.status === "running") {
part.state = {
status: "completed",
time: { ...part.state.time, end: Date.now() },
input: part.state.input,
title: "",
metadata: { output, description: "" },
output,
}
yield* sessions.updatePart(part)
}
}),
)
const exit = yield* restore(
Effect.gen(function* () {
const shellEnv = yield* plugin.trigger(
"shell.env",
{ cwd, sessionID: input.sessionID, callID: part.callID },
{ env: {} },
)
const cmd = ChildProcess.make(sh, args, {
cwd,
extendEnv: true,
env: { ...shellEnv.env, TERM: "dumb" },
stdin: "ignore",
forceKillAfter: "3 seconds",
})
const handle = yield* spawner.spawn(cmd)
yield* Stream.runForEach(Stream.decodeText(handle.all), (chunk) =>
Effect.gen(function* () {
output += chunk
if (part.state.status === "running") {
part.state.metadata = { output, description: "" }
yield* sessions.updatePart(part)
}
}),
)
yield* handle.exitCode
}).pipe(Effect.scoped, Effect.orDie),
).pipe(Effect.exit)
if (Exit.isFailure(exit) && Cause.hasInterrupts(exit.cause)) {
aborted = true
}
yield* finish
if (Exit.isFailure(exit) && !aborted && !Cause.hasInterruptsOnly(exit.cause)) {
return yield* Effect.failCause(exit.cause)
}
return { info: msg, parts: [part] }
}),
)
const exit = yield* Effect.gen(function* () {
const handle = yield* spawner.spawn(cmd)
yield* Stream.runForEach(Stream.decodeText(handle.all), (chunk) =>
Effect.sync(() => {
output += chunk
if (part.state.status === "running") {
part.state.metadata = { output, description: "" }
void run.fork(sessions.updatePart(part))
}
}),
)
yield* handle.exitCode
}).pipe(
Effect.scoped,
Effect.onInterrupt(() =>
Effect.sync(() => {
aborted = true
}),
),
Effect.orDie,
Effect.ensuring(finish),
Effect.exit,
)
if (Exit.isFailure(exit) && !Cause.hasInterruptsOnly(exit.cause)) {
return yield* Effect.failCause(exit.cause)
}
return { info: msg, parts: [part] }
})
const getModel = Effect.fn("SessionPrompt.getModel")(function* (