Add instance-aware Effect test helper (#25442)

This commit is contained in:
Kit Langton 2026-05-02 13:13:13 -04:00 committed by GitHub
parent 50d5c57193
commit 913321e4b6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 118 additions and 89 deletions

View file

@ -2,9 +2,8 @@ import { describe, expect } from "bun:test"
import { Deferred, Effect, Layer, Schema, Stream } from "effect"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { Instance } from "../../src/project/instance"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { disposeAllInstances, provideInstance, provideTmpdirInstance, tmpdirScoped } from "../fixture/fixture"
import { disposeAllInstances, provideInstance, tmpdirScoped } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
const TestEvent = {
@ -19,111 +18,103 @@ const live = Layer.mergeAll(Bus.layer, node)
const it = testEffect(live)
describe("Bus (Effect-native)", () => {
it.live("publish + subscribe stream delivers events", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const received: number[] = []
const done = yield* Deferred.make<void>()
it.instance("publish + subscribe stream delivers events", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const received: number[] = []
const done = yield* Deferred.make<void>()
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
received.push(evt.properties.value)
if (received.length === 2) Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
received.push(evt.properties.value)
if (received.length === 2) Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* bus.publish(TestEvent.Ping, { value: 2 })
yield* Deferred.await(done)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* bus.publish(TestEvent.Ping, { value: 2 })
yield* Deferred.await(done)
expect(received).toEqual([1, 2])
}),
),
expect(received).toEqual([1, 2])
}),
)
it.live("subscribe filters by event type", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const pings: number[] = []
const done = yield* Deferred.make<void>()
it.instance("subscribe filters by event type", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const pings: number[] = []
const done = yield* Deferred.make<void>()
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
pings.push(evt.properties.value)
Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
pings.push(evt.properties.value)
Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Pong, { message: "ignored" })
yield* bus.publish(TestEvent.Ping, { value: 42 })
yield* Deferred.await(done)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Pong, { message: "ignored" })
yield* bus.publish(TestEvent.Ping, { value: 42 })
yield* Deferred.await(done)
expect(pings).toEqual([42])
}),
),
expect(pings).toEqual([42])
}),
)
it.live("subscribeAll receives all types", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const types: string[] = []
const done = yield* Deferred.make<void>()
it.instance("subscribeAll receives all types", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const types: string[] = []
const done = yield* Deferred.make<void>()
yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
Effect.sync(() => {
types.push(evt.type)
if (types.length === 2) Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
Effect.sync(() => {
types.push(evt.type)
if (types.length === 2) Deferred.doneUnsafe(done, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* bus.publish(TestEvent.Pong, { message: "hi" })
yield* Deferred.await(done)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Ping, { value: 1 })
yield* bus.publish(TestEvent.Pong, { message: "hi" })
yield* Deferred.await(done)
expect(types).toContain("test.effect.ping")
expect(types).toContain("test.effect.pong")
}),
),
expect(types).toContain("test.effect.ping")
expect(types).toContain("test.effect.pong")
}),
)
it.live("multiple subscribers each receive the event", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const a: number[] = []
const b: number[] = []
const doneA = yield* Deferred.make<void>()
const doneB = yield* Deferred.make<void>()
it.instance("multiple subscribers each receive the event", () =>
Effect.gen(function* () {
const bus = yield* Bus.Service
const a: number[] = []
const b: number[] = []
const doneA = yield* Deferred.make<void>()
const doneB = yield* Deferred.make<void>()
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
a.push(evt.properties.value)
Deferred.doneUnsafe(doneA, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
a.push(evt.properties.value)
Deferred.doneUnsafe(doneA, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
b.push(evt.properties.value)
Deferred.doneUnsafe(doneB, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
Effect.sync(() => {
b.push(evt.properties.value)
Deferred.doneUnsafe(doneB, Effect.void)
}),
).pipe(Effect.forkScoped)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Ping, { value: 99 })
yield* Deferred.await(doneA)
yield* Deferred.await(doneB)
yield* Effect.sleep("10 millis")
yield* bus.publish(TestEvent.Ping, { value: 99 })
yield* Deferred.await(doneA)
yield* Deferred.await(doneB)
expect(a).toEqual([99])
expect(b).toEqual([99])
}),
),
expect(a).toEqual([99])
expect(b).toEqual([99])
}),
)
it.live("subscribeAll stream sees InstanceDisposed on disposal", () =>

View file

@ -5,10 +5,12 @@ import path from "path"
import { Effect, Context } from "effect"
import type * as PlatformError from "effect/PlatformError"
import type * as Scope from "effect/Scope"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import type { Config } from "@/config/config"
import { InstanceRef } from "../../src/effect/instance-ref"
import { Instance } from "../../src/project/instance"
import { InstanceStore } from "../../src/project/instance-store"
import { TestLLMServer } from "../lib/llm-server"
// Re-export for test ergonomics. The implementation lives next to the runtime
@ -160,6 +162,18 @@ export function provideTmpdirInstance<A, E, R>(
})
}
export class TestInstance extends Context.Service<TestInstance, { readonly directory: string }>()("@test/Instance") {}
export const withTmpdirInstance =
(options?: { git?: boolean; config?: Partial<Config.Info> }) =>
<A, E, R>(self: Effect.Effect<A, E, R>) =>
Effect.gen(function* () {
const directory = yield* tmpdirScoped(options)
return yield* InstanceStore.Service.use((store) =>
store.provide({ directory }, self.pipe(Effect.provideService(TestInstance, { directory }))),
)
}).pipe(Effect.provide(InstanceStore.defaultLayer), Effect.provide(CrossSpawnSpawner.defaultLayer))
export function provideTmpdirServer<A, E, R>(
self: (input: { dir: string; llm: TestLLMServer["Service"] }) => Effect.Effect<A, E, R>,
options?: { git?: boolean; config?: (url: string) => Partial<Config.Info> },

View file

@ -3,8 +3,11 @@ import { Cause, Effect, Exit, Layer } from "effect"
import type * as Scope from "effect/Scope"
import * as TestClock from "effect/testing/TestClock"
import * as TestConsole from "effect/testing/TestConsole"
import type { Config } from "@/config/config"
import { TestInstance, withTmpdirInstance } from "../fixture/fixture"
type Body<A, E, R> = Effect.Effect<A, E, R> | (() => Effect.Effect<A, E, R>)
type InstanceOptions = { git?: boolean; config?: Partial<Config.Info> }
const body = <A, E, R>(value: Body<A, E, R>) => Effect.suspend(() => (typeof value === "function" ? value() : value))
@ -38,7 +41,28 @@ const make = <R, E>(testLayer: Layer.Layer<R, E>, liveLayer: Layer.Layer<R, E>)
live.skip = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
test.skip(name, () => run(value, liveLayer), opts)
return { effect, live }
const instance = <A, E2>(
name: string,
value: Body<A, E2, R | TestInstance | Scope.Scope>,
instanceOptions?: InstanceOptions,
opts?: number | TestOptions,
) => test(name, () => run(body(value).pipe(withTmpdirInstance(instanceOptions)), liveLayer), opts)
instance.only = <A, E2>(
name: string,
value: Body<A, E2, R | TestInstance | Scope.Scope>,
instanceOptions?: InstanceOptions,
opts?: number | TestOptions,
) => test.only(name, () => run(body(value).pipe(withTmpdirInstance(instanceOptions)), liveLayer), opts)
instance.skip = <A, E2>(
name: string,
value: Body<A, E2, R | TestInstance | Scope.Scope>,
instanceOptions?: InstanceOptions,
opts?: number | TestOptions,
) => test.skip(name, () => run(body(value).pipe(withTmpdirInstance(instanceOptions)), liveLayer), opts)
return { effect, live, instance }
}
// Test environment with TestClock and TestConsole