mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-08 10:10:58 +00:00
fix(session): close shell cancellation races
This commit is contained in:
parent
4e218610b2
commit
042748a1b1
5 changed files with 38 additions and 11 deletions
|
|
@ -4,7 +4,7 @@ export interface Runner<A, E = never> {
|
|||
readonly state: State<A, E>
|
||||
readonly busy: boolean
|
||||
readonly ensureRunning: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
|
||||
readonly startShell: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
|
||||
readonly startShell: (work: Effect.Effect<A, E>, ready?: Deferred.Deferred<void>) => Effect.Effect<A, E>
|
||||
readonly cancel: Effect.Effect<void>
|
||||
}
|
||||
|
||||
|
|
@ -19,6 +19,7 @@ interface RunHandle<A, E> {
|
|||
interface ShellHandle<A, E> {
|
||||
id: number
|
||||
cancelled: Deferred.Deferred<void>
|
||||
ready?: Deferred.Deferred<void>
|
||||
fiber: Fiber.Fiber<A, E>
|
||||
}
|
||||
|
||||
|
|
@ -106,6 +107,7 @@ export const make = <A, E = never>(
|
|||
|
||||
const stopShell = (shell: ShellHandle<A, E>) =>
|
||||
Effect.gen(function* () {
|
||||
if (shell.ready) yield* Deferred.await(shell.ready).pipe(Effect.exit, Effect.asVoid)
|
||||
yield* Deferred.succeed(shell.cancelled, undefined).pipe(Effect.asVoid)
|
||||
yield* Fiber.interrupt(shell.fiber)
|
||||
})
|
||||
|
|
@ -135,7 +137,7 @@ export const make = <A, E = never>(
|
|||
}),
|
||||
).pipe(Effect.flatten)
|
||||
|
||||
const startShell = (work: Effect.Effect<A, E>) =>
|
||||
const startShell = (work: Effect.Effect<A, E>, ready?: Deferred.Deferred<void>) =>
|
||||
SynchronizedRef.modifyEffect(
|
||||
ref,
|
||||
Effect.fnUntraced(function* (st) {
|
||||
|
|
@ -152,12 +154,12 @@ export const make = <A, E = never>(
|
|||
const id = next()
|
||||
const cancelled = yield* Deferred.make<void>()
|
||||
const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild)
|
||||
const shell = { id, cancelled, fiber } satisfies ShellHandle<A, E>
|
||||
const shell = { id, cancelled, ready, fiber } satisfies ShellHandle<A, E>
|
||||
return [
|
||||
Effect.gen(function* () {
|
||||
const exit = yield* Fiber.await(fiber)
|
||||
if (Exit.isSuccess(exit)) return exit.value
|
||||
if ((yield* Deferred.isDone(cancelled)) || Cause.hasInterruptsOnly(exit.cause)) {
|
||||
if (Cause.hasInterruptsOnly(exit.cause) || ((yield* Deferred.isDone(cancelled)) && !Cause.hasDies(exit.cause))) {
|
||||
if (onInterrupt) return yield* onInterrupt
|
||||
return yield* Effect.die(new Cancelled())
|
||||
}
|
||||
|
|
@ -192,8 +194,8 @@ export const make = <A, E = never>(
|
|||
case "ShellThenRun":
|
||||
return [
|
||||
Effect.gen(function* () {
|
||||
yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid)
|
||||
yield* stopShell(st.shell)
|
||||
yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid)
|
||||
yield* idleIfCurrent()
|
||||
}),
|
||||
{ _tag: "Idle" } as const,
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ import { AppFileSystem } from "@opencode-ai/core/filesystem"
|
|||
import { Truncate } from "@/tool/truncate"
|
||||
import { decodeDataUrl } from "@/util/data-url"
|
||||
import { Process } from "@/util/process"
|
||||
import { Cause, Effect, Exit, Layer, Option, Scope, Context, Schema } from "effect"
|
||||
import { Cause, Deferred, Effect, Exit, Layer, Option, Scope, Context, Schema } from "effect"
|
||||
import { zod } from "@/util/effect-zod"
|
||||
import { withStatics } from "@/util/schema"
|
||||
import * as EffectLogger from "@opencode-ai/core/effect/logger"
|
||||
|
|
@ -720,9 +720,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
|||
} satisfies MessageV2.TextPart)
|
||||
})
|
||||
|
||||
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput) {
|
||||
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, ready?: Deferred.Deferred<void>) {
|
||||
return yield* Effect.uninterruptibleMask((restore) =>
|
||||
Effect.gen(function* () {
|
||||
const markReady = ready ? Deferred.succeed(ready, undefined).pipe(Effect.asVoid) : Effect.void
|
||||
const { msg, part, cwd } = yield* Effect.gen(function* () {
|
||||
const ctx = yield* InstanceState.context
|
||||
const session = yield* sessions.get(input.sessionID)
|
||||
|
|
@ -786,8 +787,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
|||
},
|
||||
}
|
||||
yield* sessions.updatePart(part)
|
||||
yield* markReady
|
||||
return { msg, part, cwd: ctx.directory }
|
||||
})
|
||||
}).pipe(Effect.onExit(() => markReady))
|
||||
|
||||
const cfg = yield* config.get()
|
||||
const sh = Shell.preferred(cfg.shell)
|
||||
|
|
@ -1508,7 +1510,8 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
|||
|
||||
const shell: (input: ShellInput) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.shell")(
|
||||
function* (input: ShellInput) {
|
||||
return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input))
|
||||
const ready = yield* Deferred.make<void>()
|
||||
return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input, ready), ready)
|
||||
},
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { Runner } from "@/effect/runner"
|
||||
import { Effect, Layer, Scope, Context } from "effect"
|
||||
import { Deferred, Effect, Layer, Scope, Context } from "effect"
|
||||
import * as Session from "./session"
|
||||
import { MessageV2 } from "./message-v2"
|
||||
import { SessionID } from "./schema"
|
||||
|
|
@ -18,6 +18,7 @@ export interface Interface {
|
|||
sessionID: SessionID,
|
||||
onInterrupt: Effect.Effect<MessageV2.WithParts>,
|
||||
work: Effect.Effect<MessageV2.WithParts>,
|
||||
ready?: Deferred.Deferred<void>,
|
||||
) => Effect.Effect<MessageV2.WithParts>
|
||||
}
|
||||
|
||||
|
|
@ -95,8 +96,9 @@ export const layer = Layer.effect(
|
|||
sessionID: SessionID,
|
||||
onInterrupt: Effect.Effect<MessageV2.WithParts>,
|
||||
work: Effect.Effect<MessageV2.WithParts>,
|
||||
ready?: Deferred.Deferred<void>,
|
||||
) {
|
||||
return yield* (yield* runner(sessionID, onInterrupt)).startShell(work)
|
||||
return yield* (yield* runner(sessionID, onInterrupt)).startShell(work, ready)
|
||||
})
|
||||
|
||||
return Service.of({ assertNotBusy, cancel, ensureRunning, startShell })
|
||||
|
|
|
|||
|
|
@ -334,6 +334,22 @@ describe("Runner", () => {
|
|||
}),
|
||||
)
|
||||
|
||||
it.live(
|
||||
"cancel does not mask shell defects",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
const runner = Runner.make<string>(s, { onInterrupt: Effect.succeed("interrupted") })
|
||||
|
||||
const sh = yield* runner
|
||||
.startShell(Effect.never.pipe(Effect.ensuring(Effect.die("boom")), Effect.as("ignored")))
|
||||
.pipe(Effect.forkChild)
|
||||
yield* Effect.sleep("10 millis")
|
||||
|
||||
yield* runner.cancel
|
||||
expect(Exit.isFailure(yield* Fiber.await(sh))).toBe(true)
|
||||
}),
|
||||
)
|
||||
|
||||
// --- shell→run handoff ---
|
||||
|
||||
it.live(
|
||||
|
|
|
|||
|
|
@ -1470,6 +1470,10 @@ unix(
|
|||
|
||||
const exit = yield* Fiber.await(loop)
|
||||
expect(Exit.isSuccess(exit)).toBe(true)
|
||||
if (Exit.isSuccess(exit)) {
|
||||
const tool = completedTool(exit.value.parts)
|
||||
expect(tool?.state.output).toContain("User aborted the command")
|
||||
}
|
||||
|
||||
yield* Fiber.await(sh)
|
||||
}),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue