Make SyncEvent layer canonical

This commit is contained in:
Kit Langton 2026-04-30 16:28:30 -04:00
parent ec98b656fd
commit bdefdc2306
2 changed files with 136 additions and 126 deletions

View file

@ -59,13 +59,103 @@ export interface Interface {
export class Service extends Context.Service<Service, Interface>()("@opencode/SyncEvent") {}
export const layer = Layer.succeed(
Service,
Service.of({
run: (def, data, options) => Effect.sync(() => run(def, data, options)),
replay: (event, options) => Effect.sync(() => replay(event, options)),
replayAll: (events, options) => Effect.sync(() => replayAll(events, options)),
remove: (aggregateID) => Effect.sync(() => remove(aggregateID)),
export const layer = Layer.effect(Service)(
Effect.gen(function* () {
const replay: Interface["replay"] = Effect.fn("SyncEvent.replay")(function* (event, options) {
const def = registry.get(event.type)
if (!def) {
throw new Error(`Unknown event type: ${event.type}`)
}
const row = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
.get(),
)
const latest = row?.seq ?? -1
if (event.seq <= latest) return
const expected = latest + 1
if (event.seq !== expected) {
throw new Error(
`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`,
)
}
process(def, event, { publish: !!options?.publish })
})
const replayAll: Interface["replayAll"] = Effect.fn("SyncEvent.replayAll")(function* (events, options) {
const source = events[0]?.aggregateID
if (!source) return undefined
if (events.some((item) => item.aggregateID !== source)) {
throw new Error("Replay events must belong to the same session")
}
const start = events[0].seq
for (const [i, item] of events.entries()) {
const seq = start + i
if (item.seq !== seq) {
throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`)
}
}
for (const item of events) {
yield* replay(item, options)
}
return source
})
const run: Interface["run"] = Effect.fn("SyncEvent.run")(function* (def, data, options) {
const agg = (data as Record<string, string>)[def.aggregate]
// This should never happen: we've enforced it via typescript in
// the definition
if (agg == null) {
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
}
if (def.version !== versions.get(def.type)) {
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
}
const { publish = true } = options || {}
// Note that this is an "immediate" transaction which is critical.
// We need to make sure we can safely read and write with nothing
// else changing the data from under us
Database.transaction(
(tx) => {
const id = EventID.ascending()
const row = tx
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, agg))
.get()
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { publish })
},
{
behavior: "immediate",
},
)
})
const remove: Interface["remove"] = Effect.fn("SyncEvent.remove")(function* (aggregateID) {
Database.transaction((tx) => {
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
})
})
return Service.of({
run,
replay,
replayAll,
remove,
})
}),
)
@ -211,92 +301,19 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
}
export function replay(event: SerializedEvent, options?: { publish: boolean }) {
const def = registry.get(event.type)
if (!def) {
throw new Error(`Unknown event type: ${event.type}`)
}
const row = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
.get(),
)
const latest = row?.seq ?? -1
if (event.seq <= latest) {
return
}
const expected = latest + 1
if (event.seq !== expected) {
throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
}
process(def, event, { publish: !!options?.publish })
return Effect.runSync(Service.use((sync) => sync.replay(event, options)).pipe(Effect.provide(defaultLayer)))
}
export function replayAll(events: SerializedEvent[], options?: { publish: boolean }) {
const source = events[0]?.aggregateID
if (!source) return
if (events.some((item) => item.aggregateID !== source)) {
throw new Error("Replay events must belong to the same session")
}
const start = events[0].seq
for (const [i, item] of events.entries()) {
const seq = start + i
if (item.seq !== seq) {
throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`)
}
}
for (const item of events) {
replay(item, options)
}
return source
return Effect.runSync(Service.use((sync) => sync.replayAll(events, options)).pipe(Effect.provide(defaultLayer)))
}
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
const agg = (data as Record<string, string>)[def.aggregate]
// This should never happen: we've enforced it via typescript in
// the definition
if (agg == null) {
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
}
if (def.version !== versions.get(def.type)) {
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
}
const { publish = true } = options || {}
// Note that this is an "immediate" transaction which is critical.
// We need to make sure we can safely read and write with nothing
// else changing the data from under us
Database.transaction(
(tx) => {
const id = EventID.ascending()
const row = tx
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, agg))
.get()
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { publish })
},
{
behavior: "immediate",
},
)
return Effect.runSync(Service.use((sync) => sync.run(def, data, options)).pipe(Effect.provide(defaultLayer)))
}
export function remove(aggregateID: string) {
Database.transaction((tx) => {
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
})
return Effect.runSync(Service.use((sync) => sync.remove(aggregateID)).pipe(Effect.provide(defaultLayer)))
}
export function payloads() {

View file

@ -1,4 +1,4 @@
import { afterEach, beforeEach, describe, expect, mock, spyOn, test } from "bun:test"
import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"
import fs from "node:fs/promises"
import Http from "node:http"
import path from "node:path"
@ -1426,48 +1426,41 @@ describe("workspace-old sessionRestore", () => {
})
})
test("local restore replays batches without fetch and emits progress", async () => {
await withInstance(async (dir) => {
const captured = captureGlobalEvents()
let fetchCallCount = 0
const replayAll = spyOn(SyncEvent, "replayAll")
try {
using server = Bun.serve({
port: 0,
fetch() {
fetchCallCount++
return Response.json({ ok: true })
},
})
const type = unique("restore-local")
const info = workspaceInfo(Instance.project.id, type, { directory: dir })
insertWorkspace(info)
registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor)
const session = await AppRuntime.runPromise(
SessionNs.Service.use((svc) => svc.create({ title: "restore local" })),
)
replaceSessionEvents(session.id, 20)
it.live("local restore replays batches and emits progress", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const workspace = yield* WorkspaceOld.Service
const sessionSvc = yield* SessionNs.Service
const captured = captureGlobalEvents()
try {
const type = unique("restore-local")
const info = workspaceInfo(Instance.project.id, type, { directory: dir })
insertWorkspace(info)
registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor)
const session = yield* sessionSvc.create({ title: "restore local" })
replaceSessionEvents(session.id, 20)
expect(await restoreWorkspaceSession({ workspaceID: info.id, sessionID: session.id })).toEqual({ total: 3 })
expect(fetchCallCount).toBe(0)
expect(replayAll).toHaveBeenCalledTimes(3)
expect(replayAll.mock.calls.map((call) => call[0].length)).toEqual([10, 10, 1])
expect((await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(session.id)))).workspaceID).toBe(
info.id,
)
expect(eventRows(session.id).map((row) => row.seq)).toEqual(Array.from({ length: 21 }, (_, i) => i))
expect(
captured.events
.filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type)
.map((event) => event.payload.properties.step),
).toEqual([0, 1, 2, 3])
await removeWorkspace(info.id)
} finally {
captured.dispose()
}
})
})
expect(yield* workspace.sessionRestore({ workspaceID: info.id, sessionID: session.id })).toEqual({
total: 3,
})
expect((yield* sessionSvc.get(session.id)).workspaceID).toBe(info.id)
expect(eventRows(session.id).map((row) => row.seq)).toEqual(Array.from({ length: 21 }, (_, i) => i))
expect(
captured.events
.filter(
(event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type,
)
.map((event) => event.payload.properties.step),
).toEqual([0, 1, 2, 3])
yield* workspace.remove(info.id)
} finally {
captured.dispose()
}
}),
{ git: true },
),
)
it.live("session restore includes real message and part events in sequence order", () => {
const replay: FetchCall[] = []