Simplify SyncEvent service access

This commit is contained in:
Kit Langton 2026-04-30 16:39:15 -04:00
parent bdefdc2306
commit 4f5af93e44
3 changed files with 134 additions and 138 deletions

View file

@ -94,7 +94,7 @@ export const SyncRoutes = lazy(() =>
last: events.at(-1)?.seq,
directory: body.directory,
})
await AppRuntime.runPromise(SyncEvent.Service.use((sync) => sync.replayAll(events)))
await AppRuntime.runPromise(SyncEvent.use.replayAll(events))
log.info("sync replay complete", {
sessionID: source,

View file

@ -12,6 +12,8 @@ import { Flag } from "@opencode-ai/core/flag/flag"
import { Context, Effect, Layer, Schema as EffectSchema } from "effect"
import { zodObject } from "@/util/effect-zod"
import type { DeepMutable } from "@/util/schema"
import { makeRuntime } from "@/effect/run-service"
import { serviceUse } from "@/effect/service-use"
// Keep `Event["data"]` mutable because projectors mutate the persisted shape
// when writing to the database. Bus payloads (`Properties`) stay readonly —
@ -161,6 +163,10 @@ export const layer = Layer.effect(Service)(
export const defaultLayer = layer
export const use = serviceUse(Service)
const runtime = makeRuntime(Service, defaultLayer)
export const registry = new Map<string, Definition>()
let projectors: Map<Definition, ProjectorFunc> | undefined
const versions = new Map<string, number>()
@ -301,19 +307,19 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
}
export function replay(event: SerializedEvent, options?: { publish: boolean }) {
return Effect.runSync(Service.use((sync) => sync.replay(event, options)).pipe(Effect.provide(defaultLayer)))
return runtime.runSync((sync) => sync.replay(event, options))
}
export function replayAll(events: SerializedEvent[], options?: { publish: boolean }) {
return Effect.runSync(Service.use((sync) => sync.replayAll(events, options)).pipe(Effect.provide(defaultLayer)))
return runtime.runSync((sync) => sync.replayAll(events, options))
}
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
return Effect.runSync(Service.use((sync) => sync.run(def, data, options)).pipe(Effect.provide(defaultLayer)))
return runtime.runSync((sync) => sync.run(def, data, options))
}
export function remove(aggregateID: string) {
return Effect.runSync(Service.use((sync) => sync.remove(aggregateID)).pipe(Effect.provide(defaultLayer)))
return runtime.runSync((sync) => sync.remove(aggregateID))
}
export function payloads() {

View file

@ -1,16 +1,18 @@
import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test"
import { tmpdir } from "../fixture/fixture"
import { Effect, Schema } from "effect"
import { describe, expect, beforeEach, afterEach, afterAll } from "bun:test"
import { provideTmpdirInstance } from "../fixture/fixture"
import { Effect, Layer, Schema } from "effect"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { Bus } from "../../src/bus"
import { Instance } from "../../src/project/instance"
import { SyncEvent } from "../../src/sync"
import { Database } from "@/storage/db"
import { EventTable } from "../../src/sync/event.sql"
import { Identifier } from "../../src/id/id"
import { MessageID } from "../../src/session/schema"
import { Flag } from "@opencode-ai/core/flag/flag"
import { initProjectors } from "../../src/server/projectors"
import { testEffect } from "../lib/effect"
const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
const it = testEffect(Layer.mergeAll(SyncEvent.defaultLayer, CrossSpawnSpawner.defaultLayer))
beforeEach(() => {
Database.close()
@ -22,34 +24,6 @@ afterEach(() => {
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = original
})
function withInstance(fn: () => void | Promise<void>) {
return async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
await fn()
},
})
}
}
function runSyncEvent<A>(fn: (sync: SyncEvent.Interface) => Effect.Effect<A>) {
return Effect.runPromise(SyncEvent.Service.use(fn).pipe(Effect.provide(SyncEvent.defaultLayer)))
}
async function expectRejects(input: Promise<unknown>, pattern: RegExp) {
try {
await input
} catch (error) {
if (!(error instanceof Error)) throw error
expect(error.message).toMatch(pattern)
return
}
throw new Error("Expected promise to reject")
}
describe("SyncEvent", () => {
function setup() {
SyncEvent.reset()
@ -74,151 +48,169 @@ describe("SyncEvent", () => {
return { Created, Sent }
}
function expectDefect<A, E, R>(effect: Effect.Effect<A, E, R>, pattern: RegExp) {
return Effect.gen(function* () {
const exit = yield* Effect.exit(effect)
if (exit._tag === "Success") throw new Error("Expected effect to fail")
expect(String(exit.cause)).toMatch(pattern)
})
}
afterAll(() => {
SyncEvent.reset()
initProjectors()
})
describe("run", () => {
test(
it.live(
"inserts event row",
withInstance(async () => {
const { Created } = setup()
await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "first" }))
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].type).toBe("item.created.1")
expect(rows[0].aggregate_id).toBe("evt_1")
}),
provideTmpdirInstance(() =>
Effect.gen(function* () {
const { Created } = setup()
yield* SyncEvent.use.run(Created, { id: "evt_1", name: "first" })
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].type).toBe("item.created.1")
expect(rows[0].aggregate_id).toBe("evt_1")
}),
),
)
test(
it.live(
"increments seq per aggregate",
withInstance(async () => {
const { Created } = setup()
await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "first" }))
await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "second" }))
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(2)
expect(rows[1].seq).toBe(rows[0].seq + 1)
}),
provideTmpdirInstance(() =>
Effect.gen(function* () {
const { Created } = setup()
yield* SyncEvent.use.run(Created, { id: "evt_1", name: "first" })
yield* SyncEvent.use.run(Created, { id: "evt_1", name: "second" })
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(2)
expect(rows[1].seq).toBe(rows[0].seq + 1)
}),
),
)
test(
it.live(
"uses custom aggregate field from agg()",
withInstance(async () => {
const { Sent } = setup()
await runSyncEvent((sync) => sync.run(Sent, { item_id: "evt_1", to: "james" }))
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].aggregate_id).toBe("evt_1")
}),
provideTmpdirInstance(() =>
Effect.gen(function* () {
const { Sent } = setup()
yield* SyncEvent.use.run(Sent, { item_id: "evt_1", to: "james" })
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].aggregate_id).toBe("evt_1")
}),
),
)
test(
it.live(
"emits events",
withInstance(async () => {
const { Created } = setup()
const events: Array<{
type: string
properties: { id: string; name: string }
}> = []
const received = new Promise<void>((resolve) => {
Bus.subscribeAll((event) => {
provideTmpdirInstance(() =>
Effect.gen(function* () {
const { Created } = setup()
const events: Array<{
type: string
properties: { id: string; name: string }
}> = []
let resolve = () => {}
const received = new Promise<void>((done) => {
resolve = done
})
const dispose = Bus.subscribeAll((event) => {
events.push(event)
resolve()
})
})
await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "test" }))
await received
expect(events).toHaveLength(1)
expect(events[0]).toEqual({
type: "item.created",
properties: {
id: "evt_1",
name: "test",
},
})
}),
try {
yield* SyncEvent.use.run(Created, { id: "evt_1", name: "test" })
yield* Effect.promise(() => received)
expect(events).toHaveLength(1)
expect(events[0]).toEqual({
type: "item.created",
properties: {
id: "evt_1",
name: "test",
},
})
} finally {
dispose()
}
}),
),
)
})
describe("replay", () => {
test(
it.live(
"inserts event from external payload",
withInstance(async () => {
const id = Identifier.descending("message")
await runSyncEvent((sync) =>
sync.replay({
provideTmpdirInstance(() =>
Effect.gen(function* () {
const id = MessageID.ascending()
yield* SyncEvent.use.replay({
id: "evt_1",
type: "item.created.1",
seq: 0,
aggregateID: id,
data: { id, name: "replayed" },
}),
)
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].aggregate_id).toBe(id)
}),
})
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].aggregate_id).toBe(id)
}),
),
)
test(
it.live(
"throws on sequence mismatch",
withInstance(async () => {
const id = Identifier.descending("message")
await runSyncEvent((sync) =>
sync.replay({
provideTmpdirInstance(() =>
Effect.gen(function* () {
const id = MessageID.ascending()
yield* SyncEvent.use.replay({
id: "evt_1",
type: "item.created.1",
seq: 0,
aggregateID: id,
data: { id, name: "first" },
}),
)
await expectRejects(
runSyncEvent((sync) =>
sync.replay({
})
yield* expectDefect(
SyncEvent.use.replay({
id: "evt_1",
type: "item.created.1",
seq: 5,
aggregateID: id,
data: { id, name: "bad" },
}),
),
/Sequence mismatch/,
)
}),
/Sequence mismatch/,
)
}),
),
)
test(
it.live(
"throws on unknown event type",
withInstance(async () => {
await expectRejects(
runSyncEvent((sync) =>
sync.replay({
provideTmpdirInstance(() =>
Effect.gen(function* () {
yield* expectDefect(
SyncEvent.use.replay({
id: "evt_1",
type: "unknown.event.1",
seq: 0,
aggregateID: "x",
data: {},
}),
),
/Unknown event type/,
)
}),
/Unknown event type/,
)
}),
),
)
test(
it.live(
"replayAll accepts later chunks after the first batch",
withInstance(async () => {
const { Created } = setup()
const id = Identifier.descending("message")
provideTmpdirInstance(() =>
Effect.gen(function* () {
const { Created } = setup()
const id = MessageID.ascending()
const one = await runSyncEvent((sync) =>
sync.replayAll([
const one = yield* SyncEvent.use.replayAll([
{
id: "evt_1",
type: SyncEvent.versionedType(Created.type, Created.version),
@ -233,11 +225,9 @@ describe("SyncEvent", () => {
aggregateID: id,
data: { id, name: "second" },
},
]),
)
])
const two = await runSyncEvent((sync) =>
sync.replayAll([
const two = yield* SyncEvent.use.replayAll([
{
id: "evt_3",
type: SyncEvent.versionedType(Created.type, Created.version),
@ -252,15 +242,15 @@ describe("SyncEvent", () => {
aggregateID: id,
data: { id, name: "fourth" },
},
]),
)
])
expect(one).toBe(id)
expect(two).toBe(id)
expect(one).toBe(id)
expect(two).toBe(id)
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3])
}),
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3])
}),
),
)
})
})