mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-06 16:31:50 +00:00
523 lines
17 KiB
TypeScript
523 lines
17 KiB
TypeScript
import { describe, expect } from "bun:test"
|
|
import { Deferred, Effect, Exit, Fiber, Ref, Scope } from "effect"
|
|
import { Runner } from "@/effect/runner"
|
|
import { it } from "../lib/effect"
|
|
|
|
describe("Runner", () => {
|
|
// --- ensureRunning semantics ---
|
|
|
|
it.live(
|
|
"ensureRunning starts work and returns result",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
const result = yield* runner.ensureRunning(Effect.succeed("hello"))
|
|
expect(result).toBe("hello")
|
|
expect(runner.state._tag).toBe("Idle")
|
|
expect(runner.busy).toBe(false)
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"ensureRunning propagates work failures",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string, string>(s)
|
|
const exit = yield* runner.ensureRunning(Effect.fail("boom")).pipe(Effect.exit)
|
|
expect(Exit.isFailure(exit)).toBe(true)
|
|
expect(runner.state._tag).toBe("Idle")
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"concurrent callers share the same run",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
const calls = yield* Ref.make(0)
|
|
const work = Effect.gen(function* () {
|
|
yield* Ref.update(calls, (n) => n + 1)
|
|
yield* Effect.sleep("10 millis")
|
|
return "shared"
|
|
})
|
|
|
|
const [a, b] = yield* Effect.all([runner.ensureRunning(work), runner.ensureRunning(work)], {
|
|
concurrency: "unbounded",
|
|
})
|
|
|
|
expect(a).toBe("shared")
|
|
expect(b).toBe("shared")
|
|
expect(yield* Ref.get(calls)).toBe(1)
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"concurrent callers all receive same error",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string, string>(s)
|
|
const work = Effect.gen(function* () {
|
|
yield* Effect.sleep("10 millis")
|
|
return yield* Effect.fail("boom")
|
|
})
|
|
|
|
const [a, b] = yield* Effect.all(
|
|
[runner.ensureRunning(work).pipe(Effect.exit), runner.ensureRunning(work).pipe(Effect.exit)],
|
|
{ concurrency: "unbounded" },
|
|
)
|
|
|
|
expect(Exit.isFailure(a)).toBe(true)
|
|
expect(Exit.isFailure(b)).toBe(true)
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"ensureRunning can be called again after previous run completes",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
expect(yield* runner.ensureRunning(Effect.succeed("first"))).toBe("first")
|
|
expect(yield* runner.ensureRunning(Effect.succeed("second"))).toBe("second")
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"second ensureRunning ignores new work if already running",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
const ran = yield* Ref.make<string[]>([])
|
|
|
|
const first = Effect.gen(function* () {
|
|
yield* Ref.update(ran, (a) => [...a, "first"])
|
|
yield* Effect.sleep("50 millis")
|
|
return "first-result"
|
|
})
|
|
const second = Effect.gen(function* () {
|
|
yield* Ref.update(ran, (a) => [...a, "second"])
|
|
return "second-result"
|
|
})
|
|
|
|
const [a, b] = yield* Effect.all([runner.ensureRunning(first), runner.ensureRunning(second)], {
|
|
concurrency: "unbounded",
|
|
})
|
|
|
|
expect(a).toBe("first-result")
|
|
expect(b).toBe("first-result")
|
|
expect(yield* Ref.get(ran)).toEqual(["first"])
|
|
}),
|
|
)
|
|
|
|
// --- cancel semantics ---
|
|
|
|
it.live(
|
|
"cancel interrupts running work",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
const started = yield* Deferred.make<void>()
|
|
const fiber = yield* runner
|
|
.ensureRunning(
|
|
Effect.gen(function* () {
|
|
yield* Deferred.succeed(started, void 0)
|
|
return yield* Effect.never.pipe(Effect.as("never"))
|
|
}),
|
|
)
|
|
.pipe(Effect.forkChild)
|
|
yield* Deferred.await(started)
|
|
expect(runner.busy).toBe(true)
|
|
expect(runner.state._tag).toBe("Running")
|
|
|
|
yield* runner.cancel
|
|
expect(runner.busy).toBe(false)
|
|
|
|
const exit = yield* Fiber.await(fiber)
|
|
expect(Exit.isFailure(exit)).toBe(true)
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"cancel on idle is a no-op",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
yield* runner.cancel
|
|
expect(runner.busy).toBe(false)
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"cancel with onInterrupt resolves callers gracefully",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s, { onInterrupt: Effect.succeed("fallback") })
|
|
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
|
|
yield* runner.cancel
|
|
|
|
const exit = yield* Fiber.await(fiber)
|
|
expect(Exit.isSuccess(exit)).toBe(true)
|
|
if (Exit.isSuccess(exit)) expect(exit.value).toBe("fallback")
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"cancel with queued callers resolves all",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s, { onInterrupt: Effect.succeed("fallback") })
|
|
|
|
const a = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
const b = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
|
|
yield* runner.cancel
|
|
|
|
const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
|
|
expect(Exit.isSuccess(exitA)).toBe(true)
|
|
expect(Exit.isSuccess(exitB)).toBe(true)
|
|
if (Exit.isSuccess(exitA)) expect(exitA.value).toBe("fallback")
|
|
if (Exit.isSuccess(exitB)) expect(exitB.value).toBe("fallback")
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"work can be started after cancel",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
yield* runner.cancel
|
|
yield* Fiber.await(fiber)
|
|
|
|
const result = yield* runner.ensureRunning(Effect.succeed("after-cancel"))
|
|
expect(result).toBe("after-cancel")
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"cancel does not deadlock when replacement work starts before interrupted run exits",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const hit = yield* Deferred.make<void>()
|
|
const hold = yield* Deferred.make<void>()
|
|
const done = yield* Deferred.make<void>()
|
|
|
|
yield* Effect.gen(function* () {
|
|
const runner = Runner.make<string>(s)
|
|
const first = Effect.never.pipe(
|
|
Effect.onInterrupt(() => Deferred.succeed(hit, undefined)),
|
|
Effect.ensuring(Deferred.await(hold)),
|
|
Effect.as("first"),
|
|
)
|
|
|
|
const a = yield* runner.ensureRunning(first).pipe(Effect.exit, Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
|
|
const stop = yield* runner.cancel.pipe(Effect.forkChild)
|
|
yield* Deferred.await(hit).pipe(Effect.timeout("250 millis"))
|
|
|
|
const b = yield* runner.ensureRunning(Deferred.await(done).pipe(Effect.as("second"))).pipe(Effect.forkChild)
|
|
yield* Effect.yieldNow
|
|
expect(runner.busy).toBe(true)
|
|
|
|
yield* Deferred.succeed(hold, undefined)
|
|
const stopExit = yield* Fiber.await(stop).pipe(Effect.timeout("250 millis"))
|
|
expect(Exit.isSuccess(stopExit)).toBe(true)
|
|
|
|
expect(runner.busy).toBe(true)
|
|
yield* Deferred.succeed(done, undefined)
|
|
expect(yield* Fiber.join(b).pipe(Effect.timeout("250 millis"))).toBe("second")
|
|
expect(runner.busy).toBe(false)
|
|
|
|
const exit = yield* Fiber.join(a)
|
|
expect(Exit.isFailure(exit)).toBe(true)
|
|
}).pipe(
|
|
Effect.ensuring(
|
|
Effect.all([Deferred.succeed(hold, undefined), Deferred.succeed(done, undefined)], { discard: true }).pipe(
|
|
Effect.ignore,
|
|
),
|
|
),
|
|
)
|
|
}),
|
|
)
|
|
|
|
// --- shell semantics ---
|
|
|
|
it.live(
|
|
"shell runs exclusively",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
const result = yield* runner.startShell(Effect.succeed("shell-done"))
|
|
expect(result).toBe("shell-done")
|
|
expect(runner.busy).toBe(false)
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"shell rejects when run is active",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
const started = yield* Deferred.make<void>()
|
|
const fiber = yield* runner
|
|
.ensureRunning(
|
|
Effect.gen(function* () {
|
|
yield* Deferred.succeed(started, undefined)
|
|
return yield* Effect.never.pipe(Effect.as("x"))
|
|
}),
|
|
)
|
|
.pipe(Effect.forkChild)
|
|
yield* Deferred.await(started).pipe(Effect.timeout("250 millis"))
|
|
yield* Effect.gen(function* () {
|
|
while (runner.state._tag !== "Running") yield* Effect.yieldNow
|
|
}).pipe(Effect.timeout("250 millis"))
|
|
|
|
const exit = yield* runner.startShell(Effect.succeed("nope")).pipe(Effect.exit)
|
|
expect(Exit.isFailure(exit)).toBe(true)
|
|
|
|
yield* runner.cancel
|
|
yield* Fiber.await(fiber).pipe(Effect.timeout("250 millis"))
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"shell rejects when another shell is running",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
const gate = yield* Deferred.make<void>()
|
|
|
|
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("first"))).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
|
|
const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
|
|
expect(Exit.isFailure(exit)).toBe(true)
|
|
|
|
yield* Deferred.succeed(gate, undefined)
|
|
yield* Fiber.await(sh)
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"shell rejects via busy callback and cancel still stops the first shell",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s, {
|
|
busy: () => {
|
|
throw new Error("busy")
|
|
},
|
|
})
|
|
|
|
const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
|
|
const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
|
|
expect(Exit.isFailure(exit)).toBe(true)
|
|
|
|
yield* runner.cancel
|
|
const done = yield* Fiber.await(sh)
|
|
expect(Exit.isFailure(done)).toBe(true)
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"cancel interrupts shell",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
const gate = yield* Deferred.make<void>()
|
|
|
|
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ignored"))).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
|
|
const stop = yield* runner.cancel.pipe(Effect.forkChild)
|
|
const stopExit = yield* Fiber.await(stop).pipe(Effect.timeout("250 millis"))
|
|
expect(Exit.isSuccess(stopExit)).toBe(true)
|
|
expect(runner.busy).toBe(false)
|
|
|
|
const shellExit = yield* Fiber.await(sh)
|
|
expect(Exit.isFailure(shellExit)).toBe(true)
|
|
|
|
yield* Deferred.succeed(gate, undefined).pipe(Effect.ignore)
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"cancel does not mask shell defects",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s, { onInterrupt: Effect.succeed("interrupted") })
|
|
|
|
const sh = yield* runner
|
|
.startShell(Effect.never.pipe(Effect.ensuring(Effect.die("boom")), Effect.as("ignored")))
|
|
.pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
|
|
yield* runner.cancel
|
|
expect(Exit.isFailure(yield* Fiber.await(sh))).toBe(true)
|
|
}),
|
|
)
|
|
|
|
// --- shell→run handoff ---
|
|
|
|
it.live(
|
|
"ensureRunning queues behind shell then runs after",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
const gate = yield* Deferred.make<void>()
|
|
|
|
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell-result"))).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
expect(runner.state._tag).toBe("Shell")
|
|
|
|
const run = yield* runner.ensureRunning(Effect.succeed("run-result")).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
expect(runner.state._tag).toBe("ShellThenRun")
|
|
|
|
yield* Deferred.succeed(gate, undefined)
|
|
yield* Fiber.await(sh)
|
|
|
|
const exit = yield* Fiber.await(run)
|
|
expect(Exit.isSuccess(exit)).toBe(true)
|
|
if (Exit.isSuccess(exit)) expect(exit.value).toBe("run-result")
|
|
expect(runner.state._tag).toBe("Idle")
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"multiple ensureRunning callers share the queued run behind shell",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
const calls = yield* Ref.make(0)
|
|
const gate = yield* Deferred.make<void>()
|
|
|
|
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell"))).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
|
|
const work = Effect.gen(function* () {
|
|
yield* Ref.update(calls, (n) => n + 1)
|
|
return "run"
|
|
})
|
|
const a = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
|
|
const b = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
|
|
yield* Deferred.succeed(gate, undefined)
|
|
yield* Fiber.await(sh)
|
|
|
|
const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
|
|
expect(Exit.isSuccess(exitA)).toBe(true)
|
|
expect(Exit.isSuccess(exitB)).toBe(true)
|
|
expect(yield* Ref.get(calls)).toBe(1)
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"cancel during shell_then_run cancels both",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
|
|
const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
|
|
const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
expect(runner.state._tag).toBe("ShellThenRun")
|
|
|
|
yield* runner.cancel
|
|
expect(runner.busy).toBe(false)
|
|
|
|
yield* Fiber.await(sh)
|
|
const exit = yield* Fiber.await(run)
|
|
expect(Exit.isFailure(exit)).toBe(true)
|
|
}),
|
|
)
|
|
|
|
// --- lifecycle callbacks ---
|
|
|
|
it.live(
|
|
"onIdle fires when returning to idle from running",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const count = yield* Ref.make(0)
|
|
const runner = Runner.make<string>(s, {
|
|
onIdle: Ref.update(count, (n) => n + 1),
|
|
})
|
|
yield* runner.ensureRunning(Effect.succeed("ok"))
|
|
expect(yield* Ref.get(count)).toBe(1)
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"onIdle fires on cancel",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const count = yield* Ref.make(0)
|
|
const runner = Runner.make<string>(s, {
|
|
onIdle: Ref.update(count, (n) => n + 1),
|
|
})
|
|
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
yield* runner.cancel
|
|
yield* Fiber.await(fiber)
|
|
expect(yield* Ref.get(count)).toBeGreaterThanOrEqual(1)
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"onBusy fires when shell starts",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const count = yield* Ref.make(0)
|
|
const runner = Runner.make<string>(s, {
|
|
onBusy: Ref.update(count, (n) => n + 1),
|
|
})
|
|
yield* runner.startShell(Effect.succeed("done"))
|
|
expect(yield* Ref.get(count)).toBe(1)
|
|
}),
|
|
)
|
|
|
|
// --- busy flag ---
|
|
|
|
it.live(
|
|
"busy is true during run",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
const gate = yield* Deferred.make<void>()
|
|
|
|
const fiber = yield* runner.ensureRunning(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
expect(runner.busy).toBe(true)
|
|
|
|
yield* Deferred.succeed(gate, undefined)
|
|
yield* Fiber.await(fiber)
|
|
expect(runner.busy).toBe(false)
|
|
}),
|
|
)
|
|
|
|
it.live(
|
|
"busy is true during shell",
|
|
Effect.gen(function* () {
|
|
const s = yield* Scope.Scope
|
|
const runner = Runner.make<string>(s)
|
|
const gate = yield* Deferred.make<void>()
|
|
|
|
const fiber = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
|
|
yield* Effect.sleep("10 millis")
|
|
expect(runner.busy).toBe(true)
|
|
|
|
yield* Deferred.succeed(gate, undefined)
|
|
yield* Fiber.await(fiber)
|
|
expect(runner.busy).toBe(false)
|
|
}),
|
|
)
|
|
})
|