diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts index fe8046ba9c..7f9d078bb7 100644 --- a/packages/opencode/src/control-plane/workspace.ts +++ b/packages/opencode/src/control-plane/workspace.ts @@ -169,6 +169,7 @@ export const layer = Layer.effect( const auth = yield* Auth.Service const session = yield* Session.Service const http = yield* HttpClient.HttpClient + const sync = yield* SyncEvent.Service const connections = new Map() const syncFibers = yield* FiberMap.make() @@ -307,25 +308,30 @@ export const layer = Layer.effect( events: events.length, }) - yield* Effect.sync(() => - WorkspaceContext.provide({ + yield* Effect.promise(async () => { + await WorkspaceContext.provide({ workspaceID: space.id, - fn: () => { - for (const event of events) { - SyncEvent.replay( - { - id: event.id, - aggregateID: event.aggregate_id, - seq: event.seq, - type: event.type, - data: event.data, - }, - { publish: true }, - ) - } + async fn() { + await Effect.runPromise( + Effect.forEach( + events, + (event) => + sync.replay( + { + id: event.id, + aggregateID: event.aggregate_id, + seq: event.seq, + type: event.type, + data: event.data, + }, + { publish: true }, + ), + { discard: true }, + ), + ) }, - }), - ) + }) + }) }) const syncWorkspaceLoop = Effect.fn("Workspace.syncWorkspaceLoop")(function* (space: Info) { @@ -361,16 +367,28 @@ export const layer = Layer.effect( setStatus(space.id, "connected") yield* parseSSE(stream, (evt) => - Effect.sync(() => { + Effect.gen(function* () { + if (!evt || typeof evt !== "object" || !("payload" in evt)) return + const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent } + if (payload.type === "server.heartbeat") return + + if (payload.type === "sync" && payload.syncEvent) { + const failed = yield* sync.replay(payload.syncEvent).pipe( + Effect.as(false), + Effect.catchCause((error) => + Effect.sync(() => { + log.info("failed to replay global event", { + workspaceID: space.id, + error, + }) + return true + }), + ), + ) + if (failed) return + } + try { - if (!evt || typeof evt !== "object" || !("payload" in evt)) return - const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent } - if (payload.type === "server.heartbeat") return - - if (payload.type === "sync" && payload.syncEvent) { - SyncEvent.replay(payload.syncEvent) - } - const event = evt as { directory?: string; project?: string; payload: unknown } GlobalBus.emit("event", { directory: event.directory, @@ -378,10 +396,10 @@ export const layer = Layer.effect( workspace: space.id, payload: event.payload, }) - } catch (err) { + } catch (error) { log.info("failed to replay global event", { workspaceID: space.id, - error: err, + error, }) } }), @@ -516,14 +534,12 @@ export const layer = Layer.effect( const adaptor = getAdaptor(space.projectID, space.type) const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space))) - yield* Effect.sync(() => - SyncEvent.run(Session.Event.Updated, { - sessionID: input.sessionID, - info: { - workspaceID: input.workspaceID, - }, - }), - ) + yield* sync.run(Session.Event.Updated, { + sessionID: input.sessionID, + info: { + workspaceID: input.workspaceID, + }, + }) const rows = yield* db((db) => db @@ -593,7 +609,7 @@ export const layer = Layer.effect( }) if (target.type === "local") { - SyncEvent.replayAll(events) + yield* sync.replayAll(events) log.info("session restore batch replayed locally", { workspaceID: input.workspaceID, sessionID: input.sessionID, @@ -812,6 +828,7 @@ export const layer = Layer.effect( export const defaultLayer = layer.pipe( Layer.provide(Auth.defaultLayer), Layer.provide(Session.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), Layer.provide(FetchHttpClient.layer), ) diff --git a/packages/opencode/src/effect/app-runtime.ts b/packages/opencode/src/effect/app-runtime.ts index 84be170688..06969ff9d1 100644 --- a/packages/opencode/src/effect/app-runtime.ts +++ b/packages/opencode/src/effect/app-runtime.ts @@ -47,6 +47,7 @@ import { Pty } from "@/pty" import { Installation } from "@/installation" import { ShareNext } from "@/share/share-next" import { SessionShare } from "@/share/session" +import { SyncEvent } from "@/sync" import { Npm } from "@opencode-ai/core/npm" import { memoMap } from "@opencode-ai/core/effect/memo-map" @@ -97,6 +98,7 @@ export const AppLayer = Layer.mergeAll( Installation.defaultLayer, ShareNext.defaultLayer, SessionShare.defaultLayer, + SyncEvent.defaultLayer, ).pipe(Layer.provideMerge(Observability.layer)) const rt = ManagedRuntime.make(AppLayer, { memoMap }) diff --git a/packages/opencode/src/server/routes/instance/httpapi/handlers/sync.ts b/packages/opencode/src/server/routes/instance/httpapi/handlers/sync.ts index fbe1249939..f4a2f315cd 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/handlers/sync.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/handlers/sync.ts @@ -21,6 +21,7 @@ export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handl Effect.gen(function* () { const workspace = yield* Workspace.Service const scope = yield* Scope.Scope + const sync = yield* SyncEvent.Service const start = Effect.fn("SyncHttpApi.start")(function* () { yield* workspace @@ -45,7 +46,7 @@ export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handl last: events.at(-1)?.seq, directory: ctx.payload.directory, }) - SyncEvent.replayAll(events) + yield* sync.replayAll(events) log.info("sync replay complete", { sessionID: source, events: events.length, diff --git a/packages/opencode/src/server/routes/instance/httpapi/server.ts b/packages/opencode/src/server/routes/instance/httpapi/server.ts index 62fa18743a..f53ddb3ec5 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/server.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/server.ts @@ -31,6 +31,7 @@ import { SessionSummary } from "@/session/summary" import { Todo } from "@/session/todo" import { SessionShare } from "@/share/session" import { Skill } from "@/skill" +import { SyncEvent } from "@/sync" import { ToolRegistry } from "@/tool/registry" import { lazy } from "@/util/lazy" import { Vcs } from "@/project/vcs" @@ -147,6 +148,7 @@ export const routes = Layer.mergeAll(rootApiRoutes, instanceRoutes).pipe( SessionRunState.defaultLayer, SessionStatus.defaultLayer, SessionSummary.defaultLayer, + SyncEvent.defaultLayer, Skill.defaultLayer, Todo.defaultLayer, ToolRegistry.defaultLayer, diff --git a/packages/opencode/src/server/routes/instance/sync.ts b/packages/opencode/src/server/routes/instance/sync.ts index bb816ecc42..b7bf413d4e 100644 --- a/packages/opencode/src/server/routes/instance/sync.ts +++ b/packages/opencode/src/server/routes/instance/sync.ts @@ -94,7 +94,7 @@ export const SyncRoutes = lazy(() => last: events.at(-1)?.seq, directory: body.directory, }) - SyncEvent.replayAll(events) + await AppRuntime.runPromise(SyncEvent.use.replayAll(events)) log.info("sync replay complete", { sessionID: source, diff --git a/packages/opencode/src/session/revert.ts b/packages/opencode/src/session/revert.ts index da9952ccb2..58d69a2040 100644 --- a/packages/opencode/src/session/revert.ts +++ b/packages/opencode/src/session/revert.ts @@ -38,6 +38,7 @@ export const layer = Layer.effect( const bus = yield* Bus.Service const summary = yield* SessionSummary.Service const state = yield* SessionRunState.Service + const sync = yield* SyncEvent.Service const revert = Effect.fn("SessionRevert.revert")(function* (input: RevertInput) { yield* state.assertNotBusy(input.sessionID) @@ -121,7 +122,7 @@ export const layer = Layer.effect( remove.push(msg) } for (const msg of remove) { - SyncEvent.run(MessageV2.Event.Removed, { + yield* sync.run(MessageV2.Event.Removed, { sessionID, messageID: msg.info.id, }) @@ -133,7 +134,7 @@ export const layer = Layer.effect( const removeParts = target.parts.slice(idx) target.parts = target.parts.slice(0, idx) for (const part of removeParts) { - SyncEvent.run(MessageV2.Event.PartRemoved, { + yield* sync.run(MessageV2.Event.PartRemoved, { sessionID, messageID: target.info.id, partID: part.id, @@ -156,6 +157,7 @@ export const defaultLayer = Layer.suspend(() => Layer.provide(Storage.defaultLayer), Layer.provide(Bus.layer), Layer.provide(SessionSummary.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), ), ) diff --git a/packages/opencode/src/session/session.ts b/packages/opencode/src/session/session.ts index 72c4d241eb..5534976e39 100644 --- a/packages/opencode/src/session/session.ts +++ b/packages/opencode/src/session/session.ts @@ -443,11 +443,12 @@ export type Patch = Types.DeepMutable["dat const db = (fn: (d: Parameters[0] extends (trx: infer D) => any ? D : never) => T) => Effect.sync(() => Database.use(fn)) -export const layer: Layer.Layer = Layer.effect( +export const layer: Layer.Layer = Layer.effect( Service, Effect.gen(function* () { const bus = yield* Bus.Service const storage = yield* Storage.Service + const sync = yield* SyncEvent.Service const createNext = Effect.fn("Session.createNext")(function* (input: { id?: SessionID @@ -477,7 +478,7 @@ export const layer: Layer.Layer = } log.info("created", result) - yield* Effect.sync(() => SyncEvent.run(Event.Created, { sessionID: result.id, info: result })) + yield* sync.run(Event.Created, { sessionID: result.id, info: result }) if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) { // This only exist for backwards compatibility. We should not be @@ -525,10 +526,8 @@ export const layer: Layer.Layer = Effect.catchCause(() => Effect.succeed(false)), ) - yield* Effect.sync(() => { - SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance }) - SyncEvent.remove(sessionID) - }) + yield* sync.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance }) + yield* sync.remove(sessionID) } catch (e) { log.error(e) } @@ -536,19 +535,17 @@ export const layer: Layer.Layer = const updateMessage = (msg: T): Effect.Effect => Effect.gen(function* () { - yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg })) + yield* sync.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg }) return msg }).pipe(Effect.withSpan("Session.updateMessage")) const updatePart = (part: T): Effect.Effect => Effect.gen(function* () { - yield* Effect.sync(() => - SyncEvent.run(MessageV2.Event.PartUpdated, { - sessionID: part.sessionID, - part: structuredClone(part), - time: Date.now(), - }), - ) + yield* sync.run(MessageV2.Event.PartUpdated, { + sessionID: part.sessionID, + part: structuredClone(part), + time: Date.now(), + }) return part }).pipe(Effect.withSpan("Session.updatePart")) @@ -635,8 +632,7 @@ export const layer: Layer.Layer = return session }) - const patch = (sessionID: SessionID, info: Patch) => - Effect.sync(() => SyncEvent.run(Event.Updated, { sessionID, info })) + const patch = (sessionID: SessionID, info: Patch) => sync.run(Event.Updated, { sessionID, info }) const touch = Effect.fn("Session.touch")(function* (sessionID: SessionID) { yield* patch(sessionID, { time: { updated: Date.now() } }) @@ -693,12 +689,10 @@ export const layer: Layer.Layer = sessionID: SessionID messageID: MessageID }) { - yield* Effect.sync(() => - SyncEvent.run(MessageV2.Event.Removed, { - sessionID: input.sessionID, - messageID: input.messageID, - }), - ) + yield* sync.run(MessageV2.Event.Removed, { + sessionID: input.sessionID, + messageID: input.messageID, + }) return input.messageID }) @@ -707,13 +701,11 @@ export const layer: Layer.Layer = messageID: MessageID partID: PartID }) { - yield* Effect.sync(() => - SyncEvent.run(MessageV2.Event.PartRemoved, { - sessionID: input.sessionID, - messageID: input.messageID, - partID: input.partID, - }), - ) + yield* sync.run(MessageV2.Event.PartRemoved, { + sessionID: input.sessionID, + messageID: input.messageID, + partID: input.partID, + }) return input.partID }) @@ -764,7 +756,11 @@ export const layer: Layer.Layer = }), ) -export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Storage.defaultLayer)) +export const defaultLayer = layer.pipe( + Layer.provide(Bus.layer), + Layer.provide(Storage.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), +) export function* list(input?: { directory?: string diff --git a/packages/opencode/src/share/session.ts b/packages/opencode/src/share/session.ts index 99e46a0092..7e4de204ed 100644 --- a/packages/opencode/src/share/session.ts +++ b/packages/opencode/src/share/session.ts @@ -21,20 +21,19 @@ export const layer = Layer.effect( const session = yield* Session.Service const shareNext = yield* ShareNext.Service const scope = yield* Scope.Scope + const sync = yield* SyncEvent.Service const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) { const conf = yield* cfg.get() if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration") const result = yield* shareNext.create(sessionID) - yield* Effect.sync(() => - SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }), - ) + yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }) return result }) const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) { yield* shareNext.remove(sessionID) - yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } })) + yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } }) }) const create = Effect.fn("SessionShare.create")(function* (input?: Session.CreateInput) { @@ -54,6 +53,7 @@ export const defaultLayer = layer.pipe( Layer.provide(ShareNext.defaultLayer), Layer.provide(Session.defaultLayer), Layer.provide(Config.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), ) export * as SessionShare from "./session" diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index 67bc9b9e7c..ebf7543af1 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -9,9 +9,11 @@ import { EventSequenceTable, EventTable } from "./event.sql" import { WorkspaceContext } from "@/control-plane/workspace-context" import { EventID } from "./schema" import { Flag } from "@opencode-ai/core/flag/flag" -import { Schema as EffectSchema } from "effect" +import { Context, Effect, Layer, Schema as EffectSchema } from "effect" import { zodObject } from "@/util/effect-zod" import type { DeepMutable } from "@/util/schema" +import { makeRuntime } from "@/effect/run-service" +import { serviceUse } from "@/effect/service-use" // Keep `Event["data"]` mutable because projectors mutate the persisted shape // when writing to the database. Bus payloads (`Properties`) stay readonly — @@ -46,6 +48,125 @@ export type SerializedEvent = Event & type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void type ConvertEvent = (type: string, data: Event["data"]) => unknown | Promise +export interface Interface { + readonly run: ( + def: Def, + data: Event["data"], + options?: { publish?: boolean }, + ) => Effect.Effect + readonly replay: (event: SerializedEvent, options?: { publish: boolean }) => Effect.Effect + readonly replayAll: (events: SerializedEvent[], options?: { publish: boolean }) => Effect.Effect + readonly remove: (aggregateID: string) => Effect.Effect +} + +export class Service extends Context.Service()("@opencode/SyncEvent") {} + +export const layer = Layer.effect(Service)( + Effect.gen(function* () { + const replay: Interface["replay"] = Effect.fn("SyncEvent.replay")(function* (event, options) { + const def = registry.get(event.type) + if (!def) { + throw new Error(`Unknown event type: ${event.type}`) + } + + const row = Database.use((db) => + db + .select({ seq: EventSequenceTable.seq }) + .from(EventSequenceTable) + .where(eq(EventSequenceTable.aggregate_id, event.aggregateID)) + .get(), + ) + + const latest = row?.seq ?? -1 + if (event.seq <= latest) return + + const expected = latest + 1 + if (event.seq !== expected) { + throw new Error( + `Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`, + ) + } + + process(def, event, { publish: !!options?.publish }) + }) + + const replayAll: Interface["replayAll"] = Effect.fn("SyncEvent.replayAll")(function* (events, options) { + const source = events[0]?.aggregateID + if (!source) return undefined + if (events.some((item) => item.aggregateID !== source)) { + throw new Error("Replay events must belong to the same session") + } + const start = events[0].seq + for (const [i, item] of events.entries()) { + const seq = start + i + if (item.seq !== seq) { + throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`) + } + } + for (const item of events) { + yield* replay(item, options) + } + return source + }) + + const run: Interface["run"] = Effect.fn("SyncEvent.run")(function* (def, data, options) { + const agg = (data as Record)[def.aggregate] + // This should never happen: we've enforced it via typescript in + // the definition + if (agg == null) { + throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`) + } + + if (def.version !== versions.get(def.type)) { + throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`) + } + + const { publish = true } = options || {} + + // Note that this is an "immediate" transaction which is critical. + // We need to make sure we can safely read and write with nothing + // else changing the data from under us + Database.transaction( + (tx) => { + const id = EventID.ascending() + const row = tx + .select({ seq: EventSequenceTable.seq }) + .from(EventSequenceTable) + .where(eq(EventSequenceTable.aggregate_id, agg)) + .get() + const seq = row?.seq != null ? row.seq + 1 : 0 + + const event = { id, seq, aggregateID: agg, data } + process(def, event, { publish }) + }, + { + behavior: "immediate", + }, + ) + }) + + const remove: Interface["remove"] = Effect.fn("SyncEvent.remove")(function* (aggregateID) { + Database.transaction((tx) => { + tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run() + tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run() + }) + }) + + return Service.of({ + run, + replay, + replayAll, + remove, + }) + }), +) + +export const defaultLayer = layer + +export const use = serviceUse(Service) + +const runtime = makeRuntime(Service, defaultLayer) + export const registry = new Map() let projectors: Map | undefined const versions = new Map() @@ -186,92 +307,19 @@ function process(def: Def, event: Event, options: { } export function replay(event: SerializedEvent, options?: { publish: boolean }) { - const def = registry.get(event.type) - if (!def) { - throw new Error(`Unknown event type: ${event.type}`) - } - - const row = Database.use((db) => - db - .select({ seq: EventSequenceTable.seq }) - .from(EventSequenceTable) - .where(eq(EventSequenceTable.aggregate_id, event.aggregateID)) - .get(), - ) - - const latest = row?.seq ?? -1 - if (event.seq <= latest) { - return - } - - const expected = latest + 1 - if (event.seq !== expected) { - throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`) - } - - process(def, event, { publish: !!options?.publish }) + return runtime.runSync((sync) => sync.replay(event, options)) } export function replayAll(events: SerializedEvent[], options?: { publish: boolean }) { - const source = events[0]?.aggregateID - if (!source) return - if (events.some((item) => item.aggregateID !== source)) { - throw new Error("Replay events must belong to the same session") - } - const start = events[0].seq - for (const [i, item] of events.entries()) { - const seq = start + i - if (item.seq !== seq) { - throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`) - } - } - for (const item of events) { - replay(item, options) - } - return source + return runtime.runSync((sync) => sync.replayAll(events, options)) } export function run(def: Def, data: Event["data"], options?: { publish?: boolean }) { - const agg = (data as Record)[def.aggregate] - // This should never happen: we've enforced it via typescript in - // the definition - if (agg == null) { - throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`) - } - - if (def.version !== versions.get(def.type)) { - throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`) - } - - const { publish = true } = options || {} - - // Note that this is an "immediate" transaction which is critical. - // We need to make sure we can safely read and write with nothing - // else changing the data from under us - Database.transaction( - (tx) => { - const id = EventID.ascending() - const row = tx - .select({ seq: EventSequenceTable.seq }) - .from(EventSequenceTable) - .where(eq(EventSequenceTable.aggregate_id, agg)) - .get() - const seq = row?.seq != null ? row.seq + 1 : 0 - - const event = { id, seq, aggregateID: agg, data } - process(def, event, { publish }) - }, - { - behavior: "immediate", - }, - ) + return runtime.runSync((sync) => sync.run(def, data, options)) } export function remove(aggregateID: string) { - Database.transaction((tx) => { - tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run() - tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run() - }) + return runtime.runSync((sync) => sync.remove(aggregateID)) } export function payloads() { diff --git a/packages/opencode/test/control-plane/workspace.test.ts b/packages/opencode/test/control-plane/workspace.test.ts index 6e68730a90..594789b207 100644 --- a/packages/opencode/test/control-plane/workspace.test.ts +++ b/packages/opencode/test/control-plane/workspace.test.ts @@ -1,4 +1,4 @@ -import { afterEach, beforeEach, describe, expect, mock, spyOn, test } from "bun:test" +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test" import fs from "node:fs/promises" import Http from "node:http" import path from "node:path" @@ -1426,48 +1426,41 @@ describe("workspace-old sessionRestore", () => { }) }) - test("local restore replays batches without fetch and emits progress", async () => { - await withInstance(async (dir) => { - const captured = captureGlobalEvents() - let fetchCallCount = 0 - const replayAll = spyOn(SyncEvent, "replayAll") - try { - using server = Bun.serve({ - port: 0, - fetch() { - fetchCallCount++ - return Response.json({ ok: true }) - }, - }) - const type = unique("restore-local") - const info = workspaceInfo(Instance.project.id, type, { directory: dir }) - insertWorkspace(info) - registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor) - const session = await AppRuntime.runPromise( - SessionNs.Service.use((svc) => svc.create({ title: "restore local" })), - ) - replaceSessionEvents(session.id, 20) + it.live("local restore replays batches and emits progress", () => + provideTmpdirInstance( + (dir) => + Effect.gen(function* () { + const workspace = yield* WorkspaceOld.Service + const sessionSvc = yield* SessionNs.Service + const captured = captureGlobalEvents() + try { + const type = unique("restore-local") + const info = workspaceInfo(Instance.project.id, type, { directory: dir }) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor) + const session = yield* sessionSvc.create({ title: "restore local" }) + replaceSessionEvents(session.id, 20) - expect(await restoreWorkspaceSession({ workspaceID: info.id, sessionID: session.id })).toEqual({ total: 3 }) - - expect(fetchCallCount).toBe(0) - expect(replayAll).toHaveBeenCalledTimes(3) - expect(replayAll.mock.calls.map((call) => call[0].length)).toEqual([10, 10, 1]) - expect((await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(session.id)))).workspaceID).toBe( - info.id, - ) - expect(eventRows(session.id).map((row) => row.seq)).toEqual(Array.from({ length: 21 }, (_, i) => i)) - expect( - captured.events - .filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type) - .map((event) => event.payload.properties.step), - ).toEqual([0, 1, 2, 3]) - await removeWorkspace(info.id) - } finally { - captured.dispose() - } - }) - }) + expect(yield* workspace.sessionRestore({ workspaceID: info.id, sessionID: session.id })).toEqual({ + total: 3, + }) + expect((yield* sessionSvc.get(session.id)).workspaceID).toBe(info.id) + expect(eventRows(session.id).map((row) => row.seq)).toEqual(Array.from({ length: 21 }, (_, i) => i)) + expect( + captured.events + .filter( + (event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type, + ) + .map((event) => event.payload.properties.step), + ).toEqual([0, 1, 2, 3]) + yield* workspace.remove(info.id) + } finally { + captured.dispose() + } + }), + { git: true }, + ), + ) it.live("session restore includes real message and part events in sequence order", () => { const replay: FetchCall[] = [] diff --git a/packages/opencode/test/sync/index.test.ts b/packages/opencode/test/sync/index.test.ts index 32a08715ca..0afbb18317 100644 --- a/packages/opencode/test/sync/index.test.ts +++ b/packages/opencode/test/sync/index.test.ts @@ -1,16 +1,18 @@ -import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test" -import { tmpdir } from "../fixture/fixture" -import { Schema } from "effect" +import { describe, expect, beforeEach, afterEach, afterAll } from "bun:test" +import { provideTmpdirInstance } from "../fixture/fixture" +import { Effect, Layer, Schema } from "effect" +import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" import { Bus } from "../../src/bus" -import { Instance } from "../../src/project/instance" import { SyncEvent } from "../../src/sync" import { Database } from "@/storage/db" import { EventTable } from "../../src/sync/event.sql" -import { Identifier } from "../../src/id/id" +import { MessageID } from "../../src/session/schema" import { Flag } from "@opencode-ai/core/flag/flag" import { initProjectors } from "../../src/server/projectors" +import { testEffect } from "../lib/effect" const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES +const it = testEffect(Layer.mergeAll(SyncEvent.defaultLayer, CrossSpawnSpawner.defaultLayer)) beforeEach(() => { Database.close() @@ -22,19 +24,6 @@ afterEach(() => { Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = original }) -function withInstance(fn: () => void | Promise) { - return async () => { - await using tmp = await tmpdir() - - await Instance.provide({ - directory: tmp.path, - fn: async () => { - await fn() - }, - }) - } -} - describe("SyncEvent", () => { function setup() { SyncEvent.reset() @@ -59,179 +48,209 @@ describe("SyncEvent", () => { return { Created, Sent } } + function expectDefect(effect: Effect.Effect, pattern: RegExp) { + return Effect.gen(function* () { + const exit = yield* Effect.exit(effect) + if (exit._tag === "Success") throw new Error("Expected effect to fail") + expect(String(exit.cause)).toMatch(pattern) + }) + } + afterAll(() => { SyncEvent.reset() initProjectors() }) describe("run", () => { - test( + it.live( "inserts event row", - withInstance(() => { - const { Created } = setup() - SyncEvent.run(Created, { id: "evt_1", name: "first" }) - const rows = Database.use((db) => db.select().from(EventTable).all()) - expect(rows).toHaveLength(1) - expect(rows[0].type).toBe("item.created.1") - expect(rows[0].aggregate_id).toBe("evt_1") - }), + provideTmpdirInstance(() => + Effect.gen(function* () { + const { Created } = setup() + yield* SyncEvent.use.run(Created, { id: "evt_1", name: "first" }) + const rows = Database.use((db) => db.select().from(EventTable).all()) + expect(rows).toHaveLength(1) + expect(rows[0].type).toBe("item.created.1") + expect(rows[0].aggregate_id).toBe("evt_1") + }), + ), ) - test( + it.live( "increments seq per aggregate", - withInstance(() => { - const { Created } = setup() - SyncEvent.run(Created, { id: "evt_1", name: "first" }) - SyncEvent.run(Created, { id: "evt_1", name: "second" }) - const rows = Database.use((db) => db.select().from(EventTable).all()) - expect(rows).toHaveLength(2) - expect(rows[1].seq).toBe(rows[0].seq + 1) - }), + provideTmpdirInstance(() => + Effect.gen(function* () { + const { Created } = setup() + yield* SyncEvent.use.run(Created, { id: "evt_1", name: "first" }) + yield* SyncEvent.use.run(Created, { id: "evt_1", name: "second" }) + const rows = Database.use((db) => db.select().from(EventTable).all()) + expect(rows).toHaveLength(2) + expect(rows[1].seq).toBe(rows[0].seq + 1) + }), + ), ) - test( + it.live( "uses custom aggregate field from agg()", - withInstance(() => { - const { Sent } = setup() - SyncEvent.run(Sent, { item_id: "evt_1", to: "james" }) - const rows = Database.use((db) => db.select().from(EventTable).all()) - expect(rows).toHaveLength(1) - expect(rows[0].aggregate_id).toBe("evt_1") - }), + provideTmpdirInstance(() => + Effect.gen(function* () { + const { Sent } = setup() + yield* SyncEvent.use.run(Sent, { item_id: "evt_1", to: "james" }) + const rows = Database.use((db) => db.select().from(EventTable).all()) + expect(rows).toHaveLength(1) + expect(rows[0].aggregate_id).toBe("evt_1") + }), + ), ) - test( + it.live( "emits events", - withInstance(async () => { - const { Created } = setup() - const events: Array<{ - type: string - properties: { id: string; name: string } - }> = [] - const received = new Promise((resolve) => { - Bus.subscribeAll((event) => { + provideTmpdirInstance(() => + Effect.gen(function* () { + const { Created } = setup() + const events: Array<{ + type: string + properties: { id: string; name: string } + }> = [] + let resolve = () => {} + const received = new Promise((done) => { + resolve = done + }) + const dispose = Bus.subscribeAll((event) => { events.push(event) resolve() }) - }) - - SyncEvent.run(Created, { id: "evt_1", name: "test" }) - - await received - expect(events).toHaveLength(1) - expect(events[0]).toEqual({ - type: "item.created", - properties: { - id: "evt_1", - name: "test", - }, - }) - }), + try { + yield* SyncEvent.use.run(Created, { id: "evt_1", name: "test" }) + yield* Effect.promise(() => received) + expect(events).toHaveLength(1) + expect(events[0]).toEqual({ + type: "item.created", + properties: { + id: "evt_1", + name: "test", + }, + }) + } finally { + dispose() + } + }), + ), ) }) describe("replay", () => { - test( + it.live( "inserts event from external payload", - withInstance(() => { - const id = Identifier.descending("message") - SyncEvent.replay({ - id: "evt_1", - type: "item.created.1", - seq: 0, - aggregateID: id, - data: { id, name: "replayed" }, - }) - const rows = Database.use((db) => db.select().from(EventTable).all()) - expect(rows).toHaveLength(1) - expect(rows[0].aggregate_id).toBe(id) - }), - ) - - test( - "throws on sequence mismatch", - withInstance(() => { - const id = Identifier.descending("message") - SyncEvent.replay({ - id: "evt_1", - type: "item.created.1", - seq: 0, - aggregateID: id, - data: { id, name: "first" }, - }) - expect(() => - SyncEvent.replay({ + provideTmpdirInstance(() => + Effect.gen(function* () { + const id = MessageID.ascending() + yield* SyncEvent.use.replay({ id: "evt_1", type: "item.created.1", - seq: 5, - aggregateID: id, - data: { id, name: "bad" }, - }), - ).toThrow(/Sequence mismatch/) - }), - ) - - test( - "throws on unknown event type", - withInstance(() => { - expect(() => - SyncEvent.replay({ - id: "evt_1", - type: "unknown.event.1", seq: 0, - aggregateID: "x", - data: {}, - }), - ).toThrow(/Unknown event type/) - }), + aggregateID: id, + data: { id, name: "replayed" }, + }) + const rows = Database.use((db) => db.select().from(EventTable).all()) + expect(rows).toHaveLength(1) + expect(rows[0].aggregate_id).toBe(id) + }), + ), ) - test( - "replayAll accepts later chunks after the first batch", - withInstance(() => { - const { Created } = setup() - const id = Identifier.descending("message") - - const one = SyncEvent.replayAll([ - { + it.live( + "throws on sequence mismatch", + provideTmpdirInstance(() => + Effect.gen(function* () { + const id = MessageID.ascending() + yield* SyncEvent.use.replay({ id: "evt_1", - type: SyncEvent.versionedType(Created.type, Created.version), + type: "item.created.1", seq: 0, aggregateID: id, data: { id, name: "first" }, - }, - { - id: "evt_2", - type: SyncEvent.versionedType(Created.type, Created.version), - seq: 1, - aggregateID: id, - data: { id, name: "second" }, - }, - ]) + }) + yield* expectDefect( + SyncEvent.use.replay({ + id: "evt_1", + type: "item.created.1", + seq: 5, + aggregateID: id, + data: { id, name: "bad" }, + }), + /Sequence mismatch/, + ) + }), + ), + ) - const two = SyncEvent.replayAll([ - { - id: "evt_3", - type: SyncEvent.versionedType(Created.type, Created.version), - seq: 2, - aggregateID: id, - data: { id, name: "third" }, - }, - { - id: "evt_4", - type: SyncEvent.versionedType(Created.type, Created.version), - seq: 3, - aggregateID: id, - data: { id, name: "fourth" }, - }, - ]) + it.live( + "throws on unknown event type", + provideTmpdirInstance(() => + Effect.gen(function* () { + yield* expectDefect( + SyncEvent.use.replay({ + id: "evt_1", + type: "unknown.event.1", + seq: 0, + aggregateID: "x", + data: {}, + }), + /Unknown event type/, + ) + }), + ), + ) - expect(one).toBe(id) - expect(two).toBe(id) + it.live( + "replayAll accepts later chunks after the first batch", + provideTmpdirInstance(() => + Effect.gen(function* () { + const { Created } = setup() + const id = MessageID.ascending() - const rows = Database.use((db) => db.select().from(EventTable).all()) - expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3]) - }), + const one = yield* SyncEvent.use.replayAll([ + { + id: "evt_1", + type: SyncEvent.versionedType(Created.type, Created.version), + seq: 0, + aggregateID: id, + data: { id, name: "first" }, + }, + { + id: "evt_2", + type: SyncEvent.versionedType(Created.type, Created.version), + seq: 1, + aggregateID: id, + data: { id, name: "second" }, + }, + ]) + + const two = yield* SyncEvent.use.replayAll([ + { + id: "evt_3", + type: SyncEvent.versionedType(Created.type, Created.version), + seq: 2, + aggregateID: id, + data: { id, name: "third" }, + }, + { + id: "evt_4", + type: SyncEvent.versionedType(Created.type, Created.version), + seq: 3, + aggregateID: id, + data: { id, name: "fourth" }, + }, + ]) + + expect(one).toBe(id) + expect(two).toBe(id) + + const rows = Database.use((db) => db.select().from(EventTable).all()) + expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3]) + }), + ), ) }) })