diff --git a/packages/opencode/src/server/routes/instance/sync.ts b/packages/opencode/src/server/routes/instance/sync.ts index 636739a2c8..b7bf413d4e 100644 --- a/packages/opencode/src/server/routes/instance/sync.ts +++ b/packages/opencode/src/server/routes/instance/sync.ts @@ -94,7 +94,7 @@ export const SyncRoutes = lazy(() => last: events.at(-1)?.seq, directory: body.directory, }) - await AppRuntime.runPromise(SyncEvent.Service.use((sync) => sync.replayAll(events))) + await AppRuntime.runPromise(SyncEvent.use.replayAll(events)) log.info("sync replay complete", { sessionID: source, diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index c70fc27843..ebf7543af1 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -12,6 +12,8 @@ import { Flag } from "@opencode-ai/core/flag/flag" import { Context, Effect, Layer, Schema as EffectSchema } from "effect" import { zodObject } from "@/util/effect-zod" import type { DeepMutable } from "@/util/schema" +import { makeRuntime } from "@/effect/run-service" +import { serviceUse } from "@/effect/service-use" // Keep `Event["data"]` mutable because projectors mutate the persisted shape // when writing to the database. Bus payloads (`Properties`) stay readonly — @@ -161,6 +163,10 @@ export const layer = Layer.effect(Service)( export const defaultLayer = layer +export const use = serviceUse(Service) + +const runtime = makeRuntime(Service, defaultLayer) + export const registry = new Map() let projectors: Map | undefined const versions = new Map() @@ -301,19 +307,19 @@ function process(def: Def, event: Event, options: { } export function replay(event: SerializedEvent, options?: { publish: boolean }) { - return Effect.runSync(Service.use((sync) => sync.replay(event, options)).pipe(Effect.provide(defaultLayer))) + return runtime.runSync((sync) => sync.replay(event, options)) } export function replayAll(events: SerializedEvent[], options?: { publish: boolean }) { - return Effect.runSync(Service.use((sync) => sync.replayAll(events, options)).pipe(Effect.provide(defaultLayer))) + return runtime.runSync((sync) => sync.replayAll(events, options)) } export function run(def: Def, data: Event["data"], options?: { publish?: boolean }) { - return Effect.runSync(Service.use((sync) => sync.run(def, data, options)).pipe(Effect.provide(defaultLayer))) + return runtime.runSync((sync) => sync.run(def, data, options)) } export function remove(aggregateID: string) { - return Effect.runSync(Service.use((sync) => sync.remove(aggregateID)).pipe(Effect.provide(defaultLayer))) + return runtime.runSync((sync) => sync.remove(aggregateID)) } export function payloads() { diff --git a/packages/opencode/test/sync/index.test.ts b/packages/opencode/test/sync/index.test.ts index 160d7b02dc..0afbb18317 100644 --- a/packages/opencode/test/sync/index.test.ts +++ b/packages/opencode/test/sync/index.test.ts @@ -1,16 +1,18 @@ -import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test" -import { tmpdir } from "../fixture/fixture" -import { Effect, Schema } from "effect" +import { describe, expect, beforeEach, afterEach, afterAll } from "bun:test" +import { provideTmpdirInstance } from "../fixture/fixture" +import { Effect, Layer, Schema } from "effect" +import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" import { Bus } from "../../src/bus" -import { Instance } from "../../src/project/instance" import { SyncEvent } from "../../src/sync" import { Database } from "@/storage/db" import { EventTable } from "../../src/sync/event.sql" -import { Identifier } from "../../src/id/id" +import { MessageID } from "../../src/session/schema" import { Flag } from "@opencode-ai/core/flag/flag" import { initProjectors } from "../../src/server/projectors" +import { testEffect } from "../lib/effect" const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES +const it = testEffect(Layer.mergeAll(SyncEvent.defaultLayer, CrossSpawnSpawner.defaultLayer)) beforeEach(() => { Database.close() @@ -22,34 +24,6 @@ afterEach(() => { Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = original }) -function withInstance(fn: () => void | Promise) { - return async () => { - await using tmp = await tmpdir() - - await Instance.provide({ - directory: tmp.path, - fn: async () => { - await fn() - }, - }) - } -} - -function runSyncEvent(fn: (sync: SyncEvent.Interface) => Effect.Effect) { - return Effect.runPromise(SyncEvent.Service.use(fn).pipe(Effect.provide(SyncEvent.defaultLayer))) -} - -async function expectRejects(input: Promise, pattern: RegExp) { - try { - await input - } catch (error) { - if (!(error instanceof Error)) throw error - expect(error.message).toMatch(pattern) - return - } - throw new Error("Expected promise to reject") -} - describe("SyncEvent", () => { function setup() { SyncEvent.reset() @@ -74,151 +48,169 @@ describe("SyncEvent", () => { return { Created, Sent } } + function expectDefect(effect: Effect.Effect, pattern: RegExp) { + return Effect.gen(function* () { + const exit = yield* Effect.exit(effect) + if (exit._tag === "Success") throw new Error("Expected effect to fail") + expect(String(exit.cause)).toMatch(pattern) + }) + } + afterAll(() => { SyncEvent.reset() initProjectors() }) describe("run", () => { - test( + it.live( "inserts event row", - withInstance(async () => { - const { Created } = setup() - await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "first" })) - const rows = Database.use((db) => db.select().from(EventTable).all()) - expect(rows).toHaveLength(1) - expect(rows[0].type).toBe("item.created.1") - expect(rows[0].aggregate_id).toBe("evt_1") - }), + provideTmpdirInstance(() => + Effect.gen(function* () { + const { Created } = setup() + yield* SyncEvent.use.run(Created, { id: "evt_1", name: "first" }) + const rows = Database.use((db) => db.select().from(EventTable).all()) + expect(rows).toHaveLength(1) + expect(rows[0].type).toBe("item.created.1") + expect(rows[0].aggregate_id).toBe("evt_1") + }), + ), ) - test( + it.live( "increments seq per aggregate", - withInstance(async () => { - const { Created } = setup() - await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "first" })) - await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "second" })) - const rows = Database.use((db) => db.select().from(EventTable).all()) - expect(rows).toHaveLength(2) - expect(rows[1].seq).toBe(rows[0].seq + 1) - }), + provideTmpdirInstance(() => + Effect.gen(function* () { + const { Created } = setup() + yield* SyncEvent.use.run(Created, { id: "evt_1", name: "first" }) + yield* SyncEvent.use.run(Created, { id: "evt_1", name: "second" }) + const rows = Database.use((db) => db.select().from(EventTable).all()) + expect(rows).toHaveLength(2) + expect(rows[1].seq).toBe(rows[0].seq + 1) + }), + ), ) - test( + it.live( "uses custom aggregate field from agg()", - withInstance(async () => { - const { Sent } = setup() - await runSyncEvent((sync) => sync.run(Sent, { item_id: "evt_1", to: "james" })) - const rows = Database.use((db) => db.select().from(EventTable).all()) - expect(rows).toHaveLength(1) - expect(rows[0].aggregate_id).toBe("evt_1") - }), + provideTmpdirInstance(() => + Effect.gen(function* () { + const { Sent } = setup() + yield* SyncEvent.use.run(Sent, { item_id: "evt_1", to: "james" }) + const rows = Database.use((db) => db.select().from(EventTable).all()) + expect(rows).toHaveLength(1) + expect(rows[0].aggregate_id).toBe("evt_1") + }), + ), ) - test( + it.live( "emits events", - withInstance(async () => { - const { Created } = setup() - const events: Array<{ - type: string - properties: { id: string; name: string } - }> = [] - const received = new Promise((resolve) => { - Bus.subscribeAll((event) => { + provideTmpdirInstance(() => + Effect.gen(function* () { + const { Created } = setup() + const events: Array<{ + type: string + properties: { id: string; name: string } + }> = [] + let resolve = () => {} + const received = new Promise((done) => { + resolve = done + }) + const dispose = Bus.subscribeAll((event) => { events.push(event) resolve() }) - }) - - await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "test" })) - - await received - expect(events).toHaveLength(1) - expect(events[0]).toEqual({ - type: "item.created", - properties: { - id: "evt_1", - name: "test", - }, - }) - }), + try { + yield* SyncEvent.use.run(Created, { id: "evt_1", name: "test" }) + yield* Effect.promise(() => received) + expect(events).toHaveLength(1) + expect(events[0]).toEqual({ + type: "item.created", + properties: { + id: "evt_1", + name: "test", + }, + }) + } finally { + dispose() + } + }), + ), ) }) describe("replay", () => { - test( + it.live( "inserts event from external payload", - withInstance(async () => { - const id = Identifier.descending("message") - await runSyncEvent((sync) => - sync.replay({ + provideTmpdirInstance(() => + Effect.gen(function* () { + const id = MessageID.ascending() + yield* SyncEvent.use.replay({ id: "evt_1", type: "item.created.1", seq: 0, aggregateID: id, data: { id, name: "replayed" }, - }), - ) - const rows = Database.use((db) => db.select().from(EventTable).all()) - expect(rows).toHaveLength(1) - expect(rows[0].aggregate_id).toBe(id) - }), + }) + const rows = Database.use((db) => db.select().from(EventTable).all()) + expect(rows).toHaveLength(1) + expect(rows[0].aggregate_id).toBe(id) + }), + ), ) - test( + it.live( "throws on sequence mismatch", - withInstance(async () => { - const id = Identifier.descending("message") - await runSyncEvent((sync) => - sync.replay({ + provideTmpdirInstance(() => + Effect.gen(function* () { + const id = MessageID.ascending() + yield* SyncEvent.use.replay({ id: "evt_1", type: "item.created.1", seq: 0, aggregateID: id, data: { id, name: "first" }, - }), - ) - await expectRejects( - runSyncEvent((sync) => - sync.replay({ + }) + yield* expectDefect( + SyncEvent.use.replay({ id: "evt_1", type: "item.created.1", seq: 5, aggregateID: id, data: { id, name: "bad" }, }), - ), - /Sequence mismatch/, - ) - }), + /Sequence mismatch/, + ) + }), + ), ) - test( + it.live( "throws on unknown event type", - withInstance(async () => { - await expectRejects( - runSyncEvent((sync) => - sync.replay({ + provideTmpdirInstance(() => + Effect.gen(function* () { + yield* expectDefect( + SyncEvent.use.replay({ id: "evt_1", type: "unknown.event.1", seq: 0, aggregateID: "x", data: {}, }), - ), - /Unknown event type/, - ) - }), + /Unknown event type/, + ) + }), + ), ) - test( + it.live( "replayAll accepts later chunks after the first batch", - withInstance(async () => { - const { Created } = setup() - const id = Identifier.descending("message") + provideTmpdirInstance(() => + Effect.gen(function* () { + const { Created } = setup() + const id = MessageID.ascending() - const one = await runSyncEvent((sync) => - sync.replayAll([ + const one = yield* SyncEvent.use.replayAll([ { id: "evt_1", type: SyncEvent.versionedType(Created.type, Created.version), @@ -233,11 +225,9 @@ describe("SyncEvent", () => { aggregateID: id, data: { id, name: "second" }, }, - ]), - ) + ]) - const two = await runSyncEvent((sync) => - sync.replayAll([ + const two = yield* SyncEvent.use.replayAll([ { id: "evt_3", type: SyncEvent.versionedType(Created.type, Created.version), @@ -252,15 +242,15 @@ describe("SyncEvent", () => { aggregateID: id, data: { id, name: "fourth" }, }, - ]), - ) + ]) - expect(one).toBe(id) - expect(two).toBe(id) + expect(one).toBe(id) + expect(two).toBe(id) - const rows = Database.use((db) => db.select().from(EventTable).all()) - expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3]) - }), + const rows = Database.use((db) => db.select().from(EventTable).all()) + expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3]) + }), + ), ) }) })