mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-06 08:21:50 +00:00
test: use testEffect for instance state (#25115)
This commit is contained in:
parent
92e80b4660
commit
79e23b7eb9
1 changed files with 361 additions and 450 deletions
|
|
@ -1,482 +1,393 @@
|
|||
import { afterEach, expect, test } from "bun:test"
|
||||
import { Deferred, Duration, Effect, Exit, Fiber, Layer, ManagedRuntime, Context } from "effect"
|
||||
import { afterEach, expect } from "bun:test"
|
||||
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
|
||||
import { $ } from "bun"
|
||||
import { Context, Deferred, Duration, Effect, Exit, Fiber, Layer } from "effect"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { InstanceRef } from "../../src/effect/instance-ref"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
import { provideInstance, tmpdirScoped } from "../fixture/fixture"
|
||||
import { testEffect } from "../lib/effect"
|
||||
|
||||
async function access<A, E>(state: InstanceState.InstanceState<A, E>, dir: string) {
|
||||
return Instance.provide({
|
||||
directory: dir,
|
||||
fn: () => Effect.runPromise(InstanceState.get(state)),
|
||||
})
|
||||
}
|
||||
const it = testEffect(CrossSpawnSpawner.defaultLayer)
|
||||
|
||||
const access = <A, E>(state: InstanceState.InstanceState<A, E>, dir: string) =>
|
||||
InstanceState.get(state).pipe(provideInstance(dir))
|
||||
|
||||
const tmpdirGitScoped = Effect.gen(function* () {
|
||||
const dir = yield* tmpdirScoped({ git: true })
|
||||
yield* Effect.promise(() => $`git commit --allow-empty --amend -m ${`root commit ${dir}`}`.cwd(dir).quiet())
|
||||
return dir
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
await Instance.disposeAll()
|
||||
})
|
||||
|
||||
test("InstanceState caches values per directory", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
let n = 0
|
||||
it.live("InstanceState caches values per directory", () =>
|
||||
Effect.gen(function* () {
|
||||
const dir = yield* tmpdirScoped()
|
||||
let n = 0
|
||||
const state = yield* InstanceState.make(() => Effect.sync(() => ({ n: ++n })))
|
||||
|
||||
await Effect.runPromise(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make(() => Effect.sync(() => ({ n: ++n })))
|
||||
const a = yield* access(state, dir)
|
||||
const b = yield* access(state, dir)
|
||||
|
||||
const a = yield* Effect.promise(() => access(state, tmp.path))
|
||||
const b = yield* Effect.promise(() => access(state, tmp.path))
|
||||
expect(a).toBe(b)
|
||||
expect(n).toBe(1)
|
||||
}),
|
||||
)
|
||||
|
||||
expect(a).toBe(b)
|
||||
expect(n).toBe(1)
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
it.live("InstanceState isolates directories", () =>
|
||||
Effect.gen(function* () {
|
||||
const one = yield* tmpdirScoped()
|
||||
const two = yield* tmpdirScoped()
|
||||
let n = 0
|
||||
const state = yield* InstanceState.make((dir) => Effect.sync(() => ({ dir, n: ++n })))
|
||||
|
||||
test("InstanceState isolates directories", async () => {
|
||||
await using one = await tmpdir()
|
||||
await using two = await tmpdir()
|
||||
let n = 0
|
||||
const a = yield* access(state, one)
|
||||
const b = yield* access(state, two)
|
||||
const c = yield* access(state, one)
|
||||
|
||||
await Effect.runPromise(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((dir) => Effect.sync(() => ({ dir, n: ++n })))
|
||||
expect(a).toBe(c)
|
||||
expect(a).not.toBe(b)
|
||||
expect(n).toBe(2)
|
||||
}),
|
||||
)
|
||||
|
||||
const a = yield* Effect.promise(() => access(state, one.path))
|
||||
const b = yield* Effect.promise(() => access(state, two.path))
|
||||
const c = yield* Effect.promise(() => access(state, one.path))
|
||||
|
||||
expect(a).toBe(c)
|
||||
expect(a).not.toBe(b)
|
||||
expect(n).toBe(2)
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
test("InstanceState invalidates on reload", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const seen: string[] = []
|
||||
let n = 0
|
||||
|
||||
await Effect.runPromise(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make(() =>
|
||||
Effect.acquireRelease(
|
||||
Effect.sync(() => ({ n: ++n })),
|
||||
(value) =>
|
||||
Effect.sync(() => {
|
||||
seen.push(String(value.n))
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
const a = yield* Effect.promise(() => access(state, tmp.path))
|
||||
yield* Effect.promise(() => Instance.reload({ directory: tmp.path }))
|
||||
const b = yield* Effect.promise(() => access(state, tmp.path))
|
||||
|
||||
expect(a).not.toBe(b)
|
||||
expect(seen).toEqual(["1"])
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
test("InstanceState invalidates on disposeAll", async () => {
|
||||
await using one = await tmpdir()
|
||||
await using two = await tmpdir()
|
||||
const seen: string[] = []
|
||||
|
||||
await Effect.runPromise(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) =>
|
||||
Effect.acquireRelease(
|
||||
Effect.sync(() => ({ dir: ctx.directory })),
|
||||
(value) =>
|
||||
Effect.sync(() => {
|
||||
seen.push(value.dir)
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
yield* Effect.promise(() => access(state, one.path))
|
||||
yield* Effect.promise(() => access(state, two.path))
|
||||
yield* Effect.promise(() => Instance.disposeAll())
|
||||
|
||||
expect(seen.sort()).toEqual([one.path, two.path].sort())
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
test("InstanceState.get reads the current directory lazily", async () => {
|
||||
await using one = await tmpdir()
|
||||
await using two = await tmpdir()
|
||||
|
||||
interface Api {
|
||||
readonly get: () => Effect.Effect<string>
|
||||
}
|
||||
|
||||
class Test extends Context.Service<Test, Api>()("@test/InstanceStateLazy") {
|
||||
static readonly layer = Layer.effect(
|
||||
Test,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
|
||||
const get = InstanceState.get(state)
|
||||
|
||||
return Test.of({
|
||||
get: Effect.fn("Test.get")(function* () {
|
||||
return yield* get
|
||||
it.live("InstanceState invalidates on reload", () =>
|
||||
Effect.gen(function* () {
|
||||
const dir = yield* tmpdirScoped()
|
||||
const seen: string[] = []
|
||||
let n = 0
|
||||
const state = yield* InstanceState.make(() =>
|
||||
Effect.acquireRelease(
|
||||
Effect.sync(() => ({ n: ++n })),
|
||||
(value) =>
|
||||
Effect.sync(() => {
|
||||
seen.push(String(value.n))
|
||||
}),
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
const rt = ManagedRuntime.make(Test.layer)
|
||||
|
||||
try {
|
||||
const a = await Instance.provide({
|
||||
directory: one.path,
|
||||
fn: () => rt.runPromise(Test.use((svc) => svc.get())),
|
||||
})
|
||||
const b = await Instance.provide({
|
||||
directory: two.path,
|
||||
fn: () => rt.runPromise(Test.use((svc) => svc.get())),
|
||||
})
|
||||
|
||||
expect(a).toBe(one.path)
|
||||
expect(b).toBe(two.path)
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
})
|
||||
|
||||
test("InstanceState preserves directory across async boundaries", async () => {
|
||||
await using one = await tmpdir({ git: true })
|
||||
await using two = await tmpdir({ git: true })
|
||||
await using three = await tmpdir({ git: true })
|
||||
|
||||
interface Api {
|
||||
readonly get: () => Effect.Effect<{ directory: string; worktree: string; project: string }>
|
||||
}
|
||||
|
||||
class Test extends Context.Service<Test, Api>()("@test/InstanceStateAsync") {
|
||||
static readonly layer = Layer.effect(
|
||||
Test,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) =>
|
||||
Effect.sync(() => ({
|
||||
directory: ctx.directory,
|
||||
worktree: ctx.worktree,
|
||||
project: ctx.project.id,
|
||||
})),
|
||||
)
|
||||
|
||||
return Test.of({
|
||||
get: Effect.fn("Test.get")(function* () {
|
||||
yield* Effect.promise(() => Bun.sleep(1))
|
||||
yield* Effect.sleep(Duration.millis(1))
|
||||
for (let i = 0; i < 100; i++) {
|
||||
yield* Effect.yieldNow
|
||||
}
|
||||
for (let i = 0; i < 100; i++) {
|
||||
yield* Effect.promise(() => Promise.resolve())
|
||||
}
|
||||
yield* Effect.sleep(Duration.millis(2))
|
||||
yield* Effect.promise(() => Bun.sleep(1))
|
||||
return yield* InstanceState.get(state)
|
||||
}),
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
const rt = ManagedRuntime.make(Test.layer)
|
||||
|
||||
try {
|
||||
const [a, b, c] = await Promise.all([
|
||||
Instance.provide({
|
||||
directory: one.path,
|
||||
fn: () => rt.runPromise(Test.use((svc) => svc.get())),
|
||||
}),
|
||||
Instance.provide({
|
||||
directory: two.path,
|
||||
fn: () => rt.runPromise(Test.use((svc) => svc.get())),
|
||||
}),
|
||||
Instance.provide({
|
||||
directory: three.path,
|
||||
fn: () => rt.runPromise(Test.use((svc) => svc.get())),
|
||||
}),
|
||||
])
|
||||
|
||||
expect(a).toEqual({ directory: one.path, worktree: one.path, project: a.project })
|
||||
expect(b).toEqual({ directory: two.path, worktree: two.path, project: b.project })
|
||||
expect(c).toEqual({ directory: three.path, worktree: three.path, project: c.project })
|
||||
expect(a.project).not.toBe(b.project)
|
||||
expect(a.project).not.toBe(c.project)
|
||||
expect(b.project).not.toBe(c.project)
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
})
|
||||
|
||||
test("InstanceState survives high-contention concurrent access", async () => {
|
||||
const N = 20
|
||||
const dirs = await Promise.all(Array.from({ length: N }, () => tmpdir()))
|
||||
|
||||
interface Api {
|
||||
readonly get: () => Effect.Effect<string>
|
||||
}
|
||||
|
||||
class Test extends Context.Service<Test, Api>()("@test/HighContention") {
|
||||
static readonly layer = Layer.effect(
|
||||
Test,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
|
||||
|
||||
return Test.of({
|
||||
get: Effect.fn("Test.get")(function* () {
|
||||
// Interleave many async hops to maximize chance of ALS corruption
|
||||
for (let i = 0; i < 10; i++) {
|
||||
yield* Effect.promise(() => Bun.sleep(Math.random() * 3))
|
||||
yield* Effect.yieldNow
|
||||
yield* Effect.promise(() => Promise.resolve())
|
||||
}
|
||||
return yield* InstanceState.get(state)
|
||||
}),
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
const rt = ManagedRuntime.make(Test.layer)
|
||||
|
||||
try {
|
||||
const results = await Promise.all(
|
||||
dirs.map((d) =>
|
||||
Instance.provide({
|
||||
directory: d.path,
|
||||
fn: () => rt.runPromise(Test.use((svc) => svc.get())),
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
for (let i = 0; i < N; i++) {
|
||||
expect(results[i]).toBe(dirs[i].path)
|
||||
const a = yield* access(state, dir)
|
||||
yield* Effect.promise(() => Instance.reload({ directory: dir }))
|
||||
const b = yield* access(state, dir)
|
||||
|
||||
expect(a).not.toBe(b)
|
||||
expect(seen).toEqual(["1"])
|
||||
}),
|
||||
)
|
||||
|
||||
it.live("InstanceState invalidates on disposeAll", () =>
|
||||
Effect.gen(function* () {
|
||||
const one = yield* tmpdirScoped()
|
||||
const two = yield* tmpdirScoped()
|
||||
const seen: string[] = []
|
||||
const state = yield* InstanceState.make((ctx) =>
|
||||
Effect.acquireRelease(
|
||||
Effect.sync(() => ({ dir: ctx.directory })),
|
||||
(value) =>
|
||||
Effect.sync(() => {
|
||||
seen.push(value.dir)
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
yield* access(state, one)
|
||||
yield* access(state, two)
|
||||
yield* Effect.promise(() => Instance.disposeAll())
|
||||
|
||||
expect(seen.sort()).toEqual([one, two].sort())
|
||||
}),
|
||||
)
|
||||
|
||||
it.live("InstanceState.get reads the current directory lazily", () =>
|
||||
Effect.gen(function* () {
|
||||
const one = yield* tmpdirScoped()
|
||||
const two = yield* tmpdirScoped()
|
||||
|
||||
interface Api {
|
||||
readonly get: () => Effect.Effect<string>
|
||||
}
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
for (const d of dirs) await d[Symbol.asyncDispose]()
|
||||
}
|
||||
})
|
||||
|
||||
test("InstanceState correct after interleaved init and dispose", async () => {
|
||||
await using one = await tmpdir()
|
||||
await using two = await tmpdir()
|
||||
class Test extends Context.Service<Test, Api>()("@test/InstanceStateLazy") {
|
||||
static readonly layer = Layer.effect(
|
||||
Test,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
|
||||
const get = InstanceState.get(state)
|
||||
|
||||
interface Api {
|
||||
readonly get: () => Effect.Effect<string>
|
||||
}
|
||||
return Test.of({
|
||||
get: Effect.fn("Test.get")(function* () {
|
||||
return yield* get
|
||||
}),
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
class Test extends Context.Service<Test, Api>()("@test/InterleavedDispose") {
|
||||
static readonly layer = Layer.effect(
|
||||
Test,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) =>
|
||||
Effect.promise(async () => {
|
||||
await Bun.sleep(5) // slow init
|
||||
return ctx.directory
|
||||
}),
|
||||
)
|
||||
yield* Effect.gen(function* () {
|
||||
const a = yield* Test.use((svc) => svc.get()).pipe(provideInstance(one))
|
||||
const b = yield* Test.use((svc) => svc.get()).pipe(provideInstance(two))
|
||||
|
||||
return Test.of({
|
||||
get: Effect.fn("Test.get")(function* () {
|
||||
return yield* InstanceState.get(state)
|
||||
}),
|
||||
})
|
||||
expect(a).toBe(one)
|
||||
expect(b).toBe(two)
|
||||
}).pipe(Effect.provide(Test.layer))
|
||||
}),
|
||||
)
|
||||
|
||||
it.live("InstanceState preserves directory across async boundaries", () =>
|
||||
Effect.gen(function* () {
|
||||
const one = yield* tmpdirGitScoped
|
||||
const two = yield* tmpdirGitScoped
|
||||
const three = yield* tmpdirGitScoped
|
||||
|
||||
interface Api {
|
||||
readonly get: () => Effect.Effect<{ directory: string; worktree: string; project: string }>
|
||||
}
|
||||
|
||||
class Test extends Context.Service<Test, Api>()("@test/InstanceStateAsync") {
|
||||
static readonly layer = Layer.effect(
|
||||
Test,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) =>
|
||||
Effect.sync(() => ({
|
||||
directory: ctx.directory,
|
||||
worktree: ctx.worktree,
|
||||
project: ctx.project.id,
|
||||
})),
|
||||
)
|
||||
|
||||
return Test.of({
|
||||
get: Effect.fn("Test.get")(function* () {
|
||||
yield* Effect.promise(() => Bun.sleep(1))
|
||||
yield* Effect.sleep(Duration.millis(1))
|
||||
for (let i = 0; i < 100; i++) {
|
||||
yield* Effect.yieldNow
|
||||
}
|
||||
for (let i = 0; i < 100; i++) {
|
||||
yield* Effect.promise(() => Promise.resolve())
|
||||
}
|
||||
yield* Effect.sleep(Duration.millis(2))
|
||||
yield* Effect.promise(() => Bun.sleep(1))
|
||||
return yield* InstanceState.get(state)
|
||||
}),
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
yield* Effect.gen(function* () {
|
||||
const [a, b, c] = yield* Effect.all(
|
||||
[one, two, three].map((dir) => Test.use((svc) => svc.get()).pipe(provideInstance(dir))),
|
||||
{ concurrency: "unbounded" },
|
||||
)
|
||||
|
||||
expect(a).toEqual({ directory: one, worktree: one, project: a.project })
|
||||
expect(b).toEqual({ directory: two, worktree: two, project: b.project })
|
||||
expect(c).toEqual({ directory: three, worktree: three, project: c.project })
|
||||
expect(a.project).not.toBe(b.project)
|
||||
expect(a.project).not.toBe(c.project)
|
||||
expect(b.project).not.toBe(c.project)
|
||||
}).pipe(Effect.provide(Test.layer))
|
||||
}),
|
||||
)
|
||||
|
||||
it.live("InstanceState survives high-contention concurrent access", () =>
|
||||
Effect.gen(function* () {
|
||||
const dirs = yield* Effect.all(
|
||||
Array.from({ length: 20 }, () => tmpdirScoped()),
|
||||
{ concurrency: "unbounded" },
|
||||
)
|
||||
|
||||
interface Api {
|
||||
readonly get: () => Effect.Effect<string>
|
||||
}
|
||||
|
||||
class Test extends Context.Service<Test, Api>()("@test/HighContention") {
|
||||
static readonly layer = Layer.effect(
|
||||
Test,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
|
||||
|
||||
return Test.of({
|
||||
get: Effect.fn("Test.get")(function* () {
|
||||
for (let i = 0; i < 10; i++) {
|
||||
yield* Effect.promise(() => Bun.sleep(Math.random() * 3))
|
||||
yield* Effect.yieldNow
|
||||
yield* Effect.promise(() => Promise.resolve())
|
||||
}
|
||||
return yield* InstanceState.get(state)
|
||||
}),
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
yield* Effect.gen(function* () {
|
||||
const results = yield* Effect.all(
|
||||
dirs.map((dir) => Test.use((svc) => svc.get()).pipe(provideInstance(dir))),
|
||||
{ concurrency: "unbounded" },
|
||||
)
|
||||
|
||||
expect(results).toEqual(dirs)
|
||||
}).pipe(Effect.provide(Test.layer))
|
||||
}),
|
||||
)
|
||||
|
||||
it.live("InstanceState correct after interleaved init and dispose", () =>
|
||||
Effect.gen(function* () {
|
||||
const one = yield* tmpdirScoped()
|
||||
const two = yield* tmpdirScoped()
|
||||
|
||||
interface Api {
|
||||
readonly get: () => Effect.Effect<string>
|
||||
}
|
||||
|
||||
class Test extends Context.Service<Test, Api>()("@test/InterleavedDispose") {
|
||||
static readonly layer = Layer.effect(
|
||||
Test,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) =>
|
||||
Effect.promise(async () => {
|
||||
await Bun.sleep(5)
|
||||
return ctx.directory
|
||||
}),
|
||||
)
|
||||
|
||||
return Test.of({
|
||||
get: Effect.fn("Test.get")(function* () {
|
||||
return yield* InstanceState.get(state)
|
||||
}),
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
yield* Effect.gen(function* () {
|
||||
const a = yield* Test.use((svc) => svc.get()).pipe(provideInstance(one))
|
||||
expect(a).toBe(one)
|
||||
|
||||
const [, b] = yield* Effect.all(
|
||||
[
|
||||
Effect.promise(() => Instance.reload({ directory: one })),
|
||||
Test.use((svc) => svc.get()).pipe(provideInstance(two)),
|
||||
],
|
||||
{ concurrency: "unbounded" },
|
||||
)
|
||||
expect(b).toBe(two)
|
||||
|
||||
const c = yield* Test.use((svc) => svc.get()).pipe(provideInstance(one))
|
||||
expect(c).toBe(one)
|
||||
}).pipe(Effect.provide(Test.layer))
|
||||
}),
|
||||
)
|
||||
|
||||
it.live("InstanceState mutation in one directory does not leak to another", () =>
|
||||
Effect.gen(function* () {
|
||||
const one = yield* tmpdirScoped()
|
||||
const two = yield* tmpdirScoped()
|
||||
const state = yield* InstanceState.make(() => Effect.sync(() => ({ count: 0 })))
|
||||
|
||||
const s1 = yield* access(state, one)
|
||||
s1.count = 42
|
||||
|
||||
const s2 = yield* access(state, two)
|
||||
expect(s2.count).toBe(0)
|
||||
|
||||
const s1again = yield* access(state, one)
|
||||
expect(s1again.count).toBe(42)
|
||||
expect(s1again).toBe(s1)
|
||||
}),
|
||||
)
|
||||
|
||||
it.live("InstanceState dedupes concurrent lookups", () =>
|
||||
Effect.gen(function* () {
|
||||
const dir = yield* tmpdirScoped()
|
||||
let n = 0
|
||||
const state = yield* InstanceState.make(() =>
|
||||
Effect.promise(async () => {
|
||||
n += 1
|
||||
await Bun.sleep(10)
|
||||
return { n }
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
const rt = ManagedRuntime.make(Test.layer)
|
||||
const [a, b] = yield* Effect.all([access(state, dir), access(state, dir)], { concurrency: "unbounded" })
|
||||
expect(a).toBe(b)
|
||||
expect(n).toBe(1)
|
||||
}),
|
||||
)
|
||||
|
||||
try {
|
||||
// Init both directories
|
||||
const a = await Instance.provide({
|
||||
directory: one.path,
|
||||
fn: () => rt.runPromise(Test.use((svc) => svc.get())),
|
||||
})
|
||||
expect(a).toBe(one.path)
|
||||
it.live("InstanceState survives deferred resume from the same instance context", () =>
|
||||
Effect.gen(function* () {
|
||||
const dir = yield* tmpdirScoped({ git: true })
|
||||
|
||||
// Dispose one directory, access the other concurrently
|
||||
const [, b] = await Promise.all([
|
||||
Instance.reload({ directory: one.path }),
|
||||
Instance.provide({
|
||||
directory: two.path,
|
||||
fn: () => rt.runPromise(Test.use((svc) => svc.get())),
|
||||
}),
|
||||
])
|
||||
expect(b).toBe(two.path)
|
||||
|
||||
// Re-access disposed directory - should get fresh state
|
||||
const c = await Instance.provide({
|
||||
directory: one.path,
|
||||
fn: () => rt.runPromise(Test.use((svc) => svc.get())),
|
||||
})
|
||||
expect(c).toBe(one.path)
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
})
|
||||
|
||||
test("InstanceState mutation in one directory does not leak to another", async () => {
|
||||
await using one = await tmpdir()
|
||||
await using two = await tmpdir()
|
||||
|
||||
await Effect.runPromise(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make(() => Effect.sync(() => ({ count: 0 })))
|
||||
|
||||
// Mutate state in directory one
|
||||
const s1 = yield* Effect.promise(() => access(state, one.path))
|
||||
s1.count = 42
|
||||
|
||||
// Access directory two — should be independent
|
||||
const s2 = yield* Effect.promise(() => access(state, two.path))
|
||||
expect(s2.count).toBe(0)
|
||||
|
||||
// Confirm directory one still has the mutation
|
||||
const s1again = yield* Effect.promise(() => access(state, one.path))
|
||||
expect(s1again.count).toBe(42)
|
||||
expect(s1again).toBe(s1) // same reference
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
test("InstanceState dedupes concurrent lookups", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
let n = 0
|
||||
|
||||
await Effect.runPromise(
|
||||
Effect.scoped(
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make(() =>
|
||||
Effect.promise(async () => {
|
||||
n += 1
|
||||
await Bun.sleep(10)
|
||||
return { n }
|
||||
}),
|
||||
)
|
||||
|
||||
const [a, b] = yield* Effect.promise(() => Promise.all([access(state, tmp.path), access(state, tmp.path)]))
|
||||
expect(a).toBe(b)
|
||||
expect(n).toBe(1)
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
test("InstanceState survives deferred resume from the same instance context", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
|
||||
interface Api {
|
||||
readonly get: (gate: Deferred.Deferred<void>) => Effect.Effect<string>
|
||||
}
|
||||
|
||||
class Test extends Context.Service<Test, Api>()("@test/DeferredResume") {
|
||||
static readonly layer = Layer.effect(
|
||||
Test,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
|
||||
|
||||
return Test.of({
|
||||
get: Effect.fn("Test.get")(function* (gate: Deferred.Deferred<void>) {
|
||||
yield* Deferred.await(gate)
|
||||
return yield* InstanceState.get(state)
|
||||
}),
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
const rt = ManagedRuntime.make(Test.layer)
|
||||
|
||||
try {
|
||||
const gate = await Effect.runPromise(Deferred.make<void>())
|
||||
const fiber = await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: () => Promise.resolve(rt.runFork(Test.use((svc) => svc.get(gate)))),
|
||||
})
|
||||
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: () => Effect.runPromise(Deferred.succeed(gate, void 0)),
|
||||
})
|
||||
const exit = await Effect.runPromise(Fiber.await(fiber))
|
||||
|
||||
expect(Exit.isSuccess(exit)).toBe(true)
|
||||
if (Exit.isSuccess(exit)) {
|
||||
expect(exit.value).toBe(tmp.path)
|
||||
interface Api {
|
||||
readonly get: (gate: Deferred.Deferred<void>) => Effect.Effect<string>
|
||||
}
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
})
|
||||
|
||||
test("InstanceState survives deferred resume outside ALS when InstanceRef is set", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
class Test extends Context.Service<Test, Api>()("@test/DeferredResume") {
|
||||
static readonly layer = Layer.effect(
|
||||
Test,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
|
||||
|
||||
interface Api {
|
||||
readonly get: (gate: Deferred.Deferred<void>) => Effect.Effect<string>
|
||||
}
|
||||
|
||||
class Test extends Context.Service<Test, Api>()("@test/DeferredResumeOutside") {
|
||||
static readonly layer = Layer.effect(
|
||||
Test,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
|
||||
|
||||
return Test.of({
|
||||
get: Effect.fn("Test.get")(function* (gate: Deferred.Deferred<void>) {
|
||||
yield* Deferred.await(gate)
|
||||
return yield* InstanceState.get(state)
|
||||
}),
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
const rt = ManagedRuntime.make(Test.layer)
|
||||
|
||||
try {
|
||||
const gate = await Effect.runPromise(Deferred.make<void>())
|
||||
// Provide InstanceRef so the fiber carries the context even when
|
||||
// the deferred is resolved from outside Instance.provide ALS.
|
||||
const fiber = await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: () =>
|
||||
Promise.resolve(
|
||||
rt.runFork(Test.use((svc) => svc.get(gate)).pipe(Effect.provideService(InstanceRef, Instance.current))),
|
||||
),
|
||||
})
|
||||
|
||||
// Resume from outside any Instance.provide — ALS is NOT set here
|
||||
await Effect.runPromise(Deferred.succeed(gate, void 0))
|
||||
const exit = await Effect.runPromise(Fiber.await(fiber))
|
||||
|
||||
expect(Exit.isSuccess(exit)).toBe(true)
|
||||
if (Exit.isSuccess(exit)) {
|
||||
expect(exit.value).toBe(tmp.path)
|
||||
return Test.of({
|
||||
get: Effect.fn("Test.get")(function* (gate: Deferred.Deferred<void>) {
|
||||
yield* Deferred.await(gate)
|
||||
return yield* InstanceState.get(state)
|
||||
}),
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
})
|
||||
|
||||
yield* Effect.gen(function* () {
|
||||
const gate = yield* Deferred.make<void>()
|
||||
const fiber = yield* Test.use((svc) => svc.get(gate)).pipe(provideInstance(dir), Effect.forkScoped)
|
||||
|
||||
yield* Deferred.succeed(gate, undefined).pipe(provideInstance(dir))
|
||||
const exit = yield* Fiber.await(fiber)
|
||||
|
||||
expect(Exit.isSuccess(exit)).toBe(true)
|
||||
if (Exit.isSuccess(exit)) expect(exit.value).toBe(dir)
|
||||
}).pipe(Effect.provide(Test.layer))
|
||||
}),
|
||||
)
|
||||
|
||||
it.live("InstanceState survives deferred resume outside ALS when InstanceRef is set", () =>
|
||||
Effect.gen(function* () {
|
||||
const dir = yield* tmpdirScoped({ git: true })
|
||||
|
||||
interface Api {
|
||||
readonly get: (gate: Deferred.Deferred<void>) => Effect.Effect<string>
|
||||
}
|
||||
|
||||
class Test extends Context.Service<Test, Api>()("@test/DeferredResumeOutside") {
|
||||
static readonly layer = Layer.effect(
|
||||
Test,
|
||||
Effect.gen(function* () {
|
||||
const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
|
||||
|
||||
return Test.of({
|
||||
get: Effect.fn("Test.get")(function* (gate: Deferred.Deferred<void>) {
|
||||
yield* Deferred.await(gate)
|
||||
return yield* InstanceState.get(state)
|
||||
}),
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
yield* Effect.gen(function* () {
|
||||
const gate = yield* Deferred.make<void>()
|
||||
const fiber = yield* Test.use((svc) => svc.get(gate)).pipe(provideInstance(dir), Effect.forkScoped)
|
||||
|
||||
yield* Deferred.succeed(gate, undefined)
|
||||
const exit = yield* Fiber.await(fiber)
|
||||
|
||||
expect(Exit.isSuccess(exit)).toBe(true)
|
||||
if (Exit.isSuccess(exit)) expect(exit.value).toBe(dir)
|
||||
}).pipe(Effect.provide(Test.layer))
|
||||
}),
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue