refactor(session): use latch for shell readiness

This commit is contained in:
Kit Langton 2026-04-27 16:39:13 -04:00
parent 8df9f54c1d
commit 9a362bd06d
3 changed files with 12 additions and 12 deletions

View file

@ -1,10 +1,10 @@
import { Cause, Deferred, Effect, Exit, Fiber, Schema, Scope, SynchronizedRef } from "effect"
import { Cause, Deferred, Effect, Exit, Fiber, Latch, Schema, Scope, SynchronizedRef } from "effect"
export interface Runner<A, E = never> {
readonly state: State<A, E>
readonly busy: boolean
readonly ensureRunning: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
readonly startShell: (work: Effect.Effect<A, E>, ready?: Deferred.Deferred<void>) => Effect.Effect<A, E>
readonly startShell: (work: Effect.Effect<A, E>, ready?: Latch.Latch) => Effect.Effect<A, E>
readonly cancel: Effect.Effect<void>
}
@ -19,7 +19,7 @@ interface RunHandle<A, E> {
interface ShellHandle<A, E> {
id: number
cancelled: Deferred.Deferred<void>
ready?: Deferred.Deferred<void>
ready?: Latch.Latch
fiber: Fiber.Fiber<A, E>
}
@ -107,7 +107,7 @@ export const make = <A, E = never>(
const stopShell = (shell: ShellHandle<A, E>) =>
Effect.gen(function* () {
if (shell.ready) yield* Deferred.await(shell.ready).pipe(Effect.exit, Effect.asVoid)
if (shell.ready) yield* shell.ready.await.pipe(Effect.exit, Effect.asVoid)
yield* Deferred.succeed(shell.cancelled, undefined).pipe(Effect.asVoid)
yield* Fiber.interrupt(shell.fiber)
})
@ -137,7 +137,7 @@ export const make = <A, E = never>(
}),
).pipe(Effect.flatten)
const startShell = (work: Effect.Effect<A, E>, ready?: Deferred.Deferred<void>) =>
const startShell = (work: Effect.Effect<A, E>, ready?: Latch.Latch) =>
SynchronizedRef.modifyEffect(
ref,
Effect.fnUntraced(function* (st) {

View file

@ -45,7 +45,7 @@ import { AppFileSystem } from "@opencode-ai/core/filesystem"
import { Truncate } from "@/tool/truncate"
import { decodeDataUrl } from "@/util/data-url"
import { Process } from "@/util/process"
import { Cause, Deferred, Effect, Exit, Layer, Option, Scope, Context, Schema } from "effect"
import { Cause, Effect, Exit, Latch, Layer, Option, Scope, Context, Schema } from "effect"
import { zod } from "@/util/effect-zod"
import { withStatics } from "@/util/schema"
import * as EffectLogger from "@opencode-ai/core/effect/logger"
@ -720,10 +720,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the
} satisfies MessageV2.TextPart)
})
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, ready?: Deferred.Deferred<void>) {
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, ready?: Latch.Latch) {
return yield* Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
const markReady = ready ? Deferred.succeed(ready, undefined).pipe(Effect.asVoid) : Effect.void
const markReady = ready ? ready.open.pipe(Effect.asVoid) : Effect.void
const { msg, part, cwd } = yield* Effect.gen(function* () {
const ctx = yield* InstanceState.context
const session = yield* sessions.get(input.sessionID)
@ -1509,7 +1509,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
const shell: (input: ShellInput) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.shell")(
function* (input: ShellInput) {
const ready = yield* Deferred.make<void>()
const ready = yield* Latch.make()
return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input, ready), ready)
},
)

View file

@ -1,6 +1,6 @@
import { InstanceState } from "@/effect/instance-state"
import { Runner } from "@/effect/runner"
import { Deferred, Effect, Layer, Scope, Context } from "effect"
import { Effect, Latch, Layer, Scope, Context } from "effect"
import * as Session from "./session"
import { MessageV2 } from "./message-v2"
import { SessionID } from "./schema"
@ -18,7 +18,7 @@ export interface Interface {
sessionID: SessionID,
onInterrupt: Effect.Effect<MessageV2.WithParts>,
work: Effect.Effect<MessageV2.WithParts>,
ready?: Deferred.Deferred<void>,
ready?: Latch.Latch,
) => Effect.Effect<MessageV2.WithParts>
}
@ -96,7 +96,7 @@ export const layer = Layer.effect(
sessionID: SessionID,
onInterrupt: Effect.Effect<MessageV2.WithParts>,
work: Effect.Effect<MessageV2.WithParts>,
ready?: Deferred.Deferred<void>,
ready?: Latch.Latch,
) {
return yield* (yield* runner(sessionID, onInterrupt)).startShell(work, ready)
})