diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index e57eec4e82..c70fc27843 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -59,13 +59,103 @@ export interface Interface { export class Service extends Context.Service()("@opencode/SyncEvent") {} -export const layer = Layer.succeed( - Service, - Service.of({ - run: (def, data, options) => Effect.sync(() => run(def, data, options)), - replay: (event, options) => Effect.sync(() => replay(event, options)), - replayAll: (events, options) => Effect.sync(() => replayAll(events, options)), - remove: (aggregateID) => Effect.sync(() => remove(aggregateID)), +export const layer = Layer.effect(Service)( + Effect.gen(function* () { + const replay: Interface["replay"] = Effect.fn("SyncEvent.replay")(function* (event, options) { + const def = registry.get(event.type) + if (!def) { + throw new Error(`Unknown event type: ${event.type}`) + } + + const row = Database.use((db) => + db + .select({ seq: EventSequenceTable.seq }) + .from(EventSequenceTable) + .where(eq(EventSequenceTable.aggregate_id, event.aggregateID)) + .get(), + ) + + const latest = row?.seq ?? -1 + if (event.seq <= latest) return + + const expected = latest + 1 + if (event.seq !== expected) { + throw new Error( + `Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`, + ) + } + + process(def, event, { publish: !!options?.publish }) + }) + + const replayAll: Interface["replayAll"] = Effect.fn("SyncEvent.replayAll")(function* (events, options) { + const source = events[0]?.aggregateID + if (!source) return undefined + if (events.some((item) => item.aggregateID !== source)) { + throw new Error("Replay events must belong to the same session") + } + const start = events[0].seq + for (const [i, item] of events.entries()) { + const seq = start + i + if (item.seq !== seq) { + throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`) + } + } + for (const item of events) { + yield* replay(item, options) + } + return source + }) + + const run: Interface["run"] = Effect.fn("SyncEvent.run")(function* (def, data, options) { + const agg = (data as Record)[def.aggregate] + // This should never happen: we've enforced it via typescript in + // the definition + if (agg == null) { + throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`) + } + + if (def.version !== versions.get(def.type)) { + throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`) + } + + const { publish = true } = options || {} + + // Note that this is an "immediate" transaction which is critical. + // We need to make sure we can safely read and write with nothing + // else changing the data from under us + Database.transaction( + (tx) => { + const id = EventID.ascending() + const row = tx + .select({ seq: EventSequenceTable.seq }) + .from(EventSequenceTable) + .where(eq(EventSequenceTable.aggregate_id, agg)) + .get() + const seq = row?.seq != null ? row.seq + 1 : 0 + + const event = { id, seq, aggregateID: agg, data } + process(def, event, { publish }) + }, + { + behavior: "immediate", + }, + ) + }) + + const remove: Interface["remove"] = Effect.fn("SyncEvent.remove")(function* (aggregateID) { + Database.transaction((tx) => { + tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run() + tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run() + }) + }) + + return Service.of({ + run, + replay, + replayAll, + remove, + }) }), ) @@ -211,92 +301,19 @@ function process(def: Def, event: Event, options: { } export function replay(event: SerializedEvent, options?: { publish: boolean }) { - const def = registry.get(event.type) - if (!def) { - throw new Error(`Unknown event type: ${event.type}`) - } - - const row = Database.use((db) => - db - .select({ seq: EventSequenceTable.seq }) - .from(EventSequenceTable) - .where(eq(EventSequenceTable.aggregate_id, event.aggregateID)) - .get(), - ) - - const latest = row?.seq ?? -1 - if (event.seq <= latest) { - return - } - - const expected = latest + 1 - if (event.seq !== expected) { - throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`) - } - - process(def, event, { publish: !!options?.publish }) + return Effect.runSync(Service.use((sync) => sync.replay(event, options)).pipe(Effect.provide(defaultLayer))) } export function replayAll(events: SerializedEvent[], options?: { publish: boolean }) { - const source = events[0]?.aggregateID - if (!source) return - if (events.some((item) => item.aggregateID !== source)) { - throw new Error("Replay events must belong to the same session") - } - const start = events[0].seq - for (const [i, item] of events.entries()) { - const seq = start + i - if (item.seq !== seq) { - throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`) - } - } - for (const item of events) { - replay(item, options) - } - return source + return Effect.runSync(Service.use((sync) => sync.replayAll(events, options)).pipe(Effect.provide(defaultLayer))) } export function run(def: Def, data: Event["data"], options?: { publish?: boolean }) { - const agg = (data as Record)[def.aggregate] - // This should never happen: we've enforced it via typescript in - // the definition - if (agg == null) { - throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`) - } - - if (def.version !== versions.get(def.type)) { - throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`) - } - - const { publish = true } = options || {} - - // Note that this is an "immediate" transaction which is critical. - // We need to make sure we can safely read and write with nothing - // else changing the data from under us - Database.transaction( - (tx) => { - const id = EventID.ascending() - const row = tx - .select({ seq: EventSequenceTable.seq }) - .from(EventSequenceTable) - .where(eq(EventSequenceTable.aggregate_id, agg)) - .get() - const seq = row?.seq != null ? row.seq + 1 : 0 - - const event = { id, seq, aggregateID: agg, data } - process(def, event, { publish }) - }, - { - behavior: "immediate", - }, - ) + return Effect.runSync(Service.use((sync) => sync.run(def, data, options)).pipe(Effect.provide(defaultLayer))) } export function remove(aggregateID: string) { - Database.transaction((tx) => { - tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run() - tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run() - }) + return Effect.runSync(Service.use((sync) => sync.remove(aggregateID)).pipe(Effect.provide(defaultLayer))) } export function payloads() { diff --git a/packages/opencode/test/control-plane/workspace.test.ts b/packages/opencode/test/control-plane/workspace.test.ts index 6e68730a90..594789b207 100644 --- a/packages/opencode/test/control-plane/workspace.test.ts +++ b/packages/opencode/test/control-plane/workspace.test.ts @@ -1,4 +1,4 @@ -import { afterEach, beforeEach, describe, expect, mock, spyOn, test } from "bun:test" +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test" import fs from "node:fs/promises" import Http from "node:http" import path from "node:path" @@ -1426,48 +1426,41 @@ describe("workspace-old sessionRestore", () => { }) }) - test("local restore replays batches without fetch and emits progress", async () => { - await withInstance(async (dir) => { - const captured = captureGlobalEvents() - let fetchCallCount = 0 - const replayAll = spyOn(SyncEvent, "replayAll") - try { - using server = Bun.serve({ - port: 0, - fetch() { - fetchCallCount++ - return Response.json({ ok: true }) - }, - }) - const type = unique("restore-local") - const info = workspaceInfo(Instance.project.id, type, { directory: dir }) - insertWorkspace(info) - registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor) - const session = await AppRuntime.runPromise( - SessionNs.Service.use((svc) => svc.create({ title: "restore local" })), - ) - replaceSessionEvents(session.id, 20) + it.live("local restore replays batches and emits progress", () => + provideTmpdirInstance( + (dir) => + Effect.gen(function* () { + const workspace = yield* WorkspaceOld.Service + const sessionSvc = yield* SessionNs.Service + const captured = captureGlobalEvents() + try { + const type = unique("restore-local") + const info = workspaceInfo(Instance.project.id, type, { directory: dir }) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor) + const session = yield* sessionSvc.create({ title: "restore local" }) + replaceSessionEvents(session.id, 20) - expect(await restoreWorkspaceSession({ workspaceID: info.id, sessionID: session.id })).toEqual({ total: 3 }) - - expect(fetchCallCount).toBe(0) - expect(replayAll).toHaveBeenCalledTimes(3) - expect(replayAll.mock.calls.map((call) => call[0].length)).toEqual([10, 10, 1]) - expect((await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(session.id)))).workspaceID).toBe( - info.id, - ) - expect(eventRows(session.id).map((row) => row.seq)).toEqual(Array.from({ length: 21 }, (_, i) => i)) - expect( - captured.events - .filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type) - .map((event) => event.payload.properties.step), - ).toEqual([0, 1, 2, 3]) - await removeWorkspace(info.id) - } finally { - captured.dispose() - } - }) - }) + expect(yield* workspace.sessionRestore({ workspaceID: info.id, sessionID: session.id })).toEqual({ + total: 3, + }) + expect((yield* sessionSvc.get(session.id)).workspaceID).toBe(info.id) + expect(eventRows(session.id).map((row) => row.seq)).toEqual(Array.from({ length: 21 }, (_, i) => i)) + expect( + captured.events + .filter( + (event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type, + ) + .map((event) => event.payload.properties.step), + ).toEqual([0, 1, 2, 3]) + yield* workspace.remove(info.id) + } finally { + captured.dispose() + } + }), + { git: true }, + ), + ) it.live("session restore includes real message and part events in sequence order", () => { const replay: FetchCall[] = []