diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index 530c2c4112..a8858255f2 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -7,9 +7,7 @@ import { eq } from "drizzle-orm" import { GlobalBus } from "@/bus/global" import { Bus as ProjectBus } from "@/bus" import { BusEvent } from "@/bus/bus-event" -import type { InstanceContext } from "@/project/instance-context" import { EventSequenceTable, EventTable } from "./event.sql" -import type { WorkspaceID } from "@/control-plane/schema" import { EventID } from "./schema" import { Context, Effect, Layer, Schema as EffectSchema } from "effect" import type { DeepMutable } from "@opencode-ai/core/schema" @@ -17,7 +15,7 @@ import { EventV2 } from "@opencode-ai/core/event" import { serviceUse } from "@/effect/service-use" import { InstanceState } from "@/effect/instance-state" import { RuntimeFlags } from "@/effect/runtime-flags" -import { attachWith } from "@/effect/run-service" +import { EffectBridge } from "@/effect/bridge" // Keep `Event["data"]` mutable because projectors mutate the persisted shape // when writing to the database. Bus payloads (`Properties`) stay readonly — @@ -51,10 +49,6 @@ export type SerializedEvent = Event & type ProjectorFunc = (db: Database.TxOrDb, data: unknown, event: Event) => void type ConvertEvent = (type: string, data: Event["data"]) => unknown | Promise -type PublishContext = { - instance?: InstanceContext - workspace?: WorkspaceID -} export interface Interface { readonly run: ( @@ -107,16 +101,14 @@ export const layer = Layer.effect(Service)( } const publish = !!options?.publish - const context = publish - ? { - instance: yield* InstanceState.context, - workspace: yield* InstanceState.workspaceID, - } - : undefined + // Bridge captures handler-fiber refs (InstanceRef/WorkspaceRef) and the + // full Effect context, so the forked publish + GlobalBus emit run with + // the right state without a per-call attachWith. + const bridge = yield* EffectBridge.make() process(def, event, { bus, + bridge, publish, - context, ownerID: options?.ownerID, experimentalWorkspaces: flags.experimentalWorkspaces, }) @@ -154,12 +146,7 @@ export const layer = Layer.effect(Service)( } const { publish = true } = options || {} - const context = publish - ? { - instance: yield* InstanceState.context, - workspace: yield* InstanceState.workspaceID, - } - : undefined + const bridge = yield* EffectBridge.make() // Note that this is an "immediate" transaction which is critical. // We need to make sure we can safely read and write with nothing @@ -175,7 +162,7 @@ export const layer = Layer.effect(Service)( const seq = row?.seq != null ? row.seq + 1 : 0 const event = { id, seq, aggregateID: agg, data } - process(def, event, { bus, publish, context, experimentalWorkspaces: flags.experimentalWorkspaces }) + process(def, event, { bus, bridge, publish, experimentalWorkspaces: flags.experimentalWorkspaces }) }, { behavior: "immediate", @@ -308,8 +295,8 @@ function process( event: Event, options: { bus: ProjectBus.Interface + bridge: EffectBridge.Shape publish: boolean - context?: PublishContext ownerID?: string experimentalWorkspaces: boolean }, @@ -351,37 +338,36 @@ function process( } Database.effect(() => { - if (options?.publish) { - if (!options.context?.instance) { - throw new Error("SyncEvent.process: publish requires instance context") - } - - const result = convertEvent(def.type, event.data) - const publish = (data: unknown) => - Effect.runPromise( - attachWith(options.bus.publish(def, data as Properties, { id: event.id }), { - instance: options.context?.instance, - workspace: options.context?.workspace, - }), - ) - if (result instanceof Promise) { - void result.then(publish) - } else { - void publish(result) - } - - GlobalBus.emit("event", { - directory: options.context.instance.directory, - project: options.context.instance.project.id, - workspace: options.context.workspace, - payload: { - type: "sync", - syncEvent: { - type: versionedType(def.type, def.version), - ...event, - }, - }, - }) + if (!options.publish) return + const result = convertEvent(def.type, event.data) + // The bridge was built inside the caller's fiber so it already carries + // InstanceRef/WorkspaceRef and the full Effect context. Both the bus + // publish and the GlobalBus emit run inside the forked Effect so they + // share the same instance/workspace lookup. + const publish = (data: unknown) => + options.bridge.fork( + Effect.gen(function* () { + yield* options.bus.publish(def, data as Properties, { id: event.id }) + const instance = yield* InstanceState.context + const workspace = yield* InstanceState.workspaceID + GlobalBus.emit("event", { + directory: instance.directory, + project: instance.project.id, + workspace, + payload: { + type: "sync", + syncEvent: { + type: versionedType(def.type, def.version), + ...event, + }, + }, + }) + }), + ) + if (result instanceof Promise) { + void result.then(publish) + } else { + publish(result) } }) }) diff --git a/packages/opencode/test/sync/index.test.ts b/packages/opencode/test/sync/index.test.ts index 2fbfc9b94c..e3307d2aec 100644 --- a/packages/opencode/test/sync/index.test.ts +++ b/packages/opencode/test/sync/index.test.ts @@ -1,14 +1,15 @@ import { describe, expect, beforeEach, afterAll } from "bun:test" import { provideTmpdirInstance } from "../fixture/fixture" -import { Effect, Layer, Schema } from "effect" +import { Deferred, Effect, Layer, Schema } from "effect" import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" import { Bus } from "../../src/bus" +import { GlobalBus, type GlobalEvent } from "../../src/bus/global" import { SyncEvent } from "../../src/sync" import { Database, eq } from "@/storage/db" import { EventSequenceTable, EventTable } from "../../src/sync/event.sql" import { MessageID } from "../../src/session/schema" import { initProjectors } from "../../src/server/projectors" -import { testEffect } from "../lib/effect" +import { awaitWithTimeout, testEffect } from "../lib/effect" import { RuntimeFlags } from "@/effect/runtime-flags" const it = testEffect( @@ -139,6 +140,43 @@ describe("SyncEvent", () => { }), ), ) + + // Regression for the EffectBridge migration. GlobalBus.emit used to fire + // synchronously inside the Database.effect post-commit callback. After the + // migration it fires inside the forked publish Effect, AFTER bus.publish + // completes. Consumers don't care about microsecond-level ordering, but + // we still need to prove the emit actually fires. + it.live( + "emits sync events to GlobalBus after publishing to ProjectBus", + provideTmpdirInstance(() => + Effect.gen(function* () { + const { Created } = setup() + // Filter for OUR specific event in the handler so we ignore any + // stray sync events from other tests' lingering forks. + const received = yield* Deferred.make() + const handler = (evt: GlobalEvent) => { + if (evt.payload?.type === "sync" && evt.payload?.syncEvent?.type === "item.created.1") { + Deferred.doneUnsafe(received, Effect.succeed(evt)) + } + } + GlobalBus.on("event", handler) + try { + yield* SyncEvent.use.run(Created, { id: "evt_global_1", name: "global" }) + const event = yield* awaitWithTimeout( + Deferred.await(received), + "timed out waiting for sync event on GlobalBus", + "2 seconds", + ) + expect(event.payload).toMatchObject({ + type: "sync", + syncEvent: { type: "item.created.1", data: { id: "evt_global_1", name: "global" } }, + }) + } finally { + GlobalBus.off("event", handler) + } + }), + ), + ) }) describe("replay", () => {