diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index c1bd8d647f..530c2c4112 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -17,6 +17,7 @@ import { EventV2 } from "@opencode-ai/core/event" import { serviceUse } from "@/effect/service-use" import { InstanceState } from "@/effect/instance-state" import { RuntimeFlags } from "@/effect/runtime-flags" +import { attachWith } from "@/effect/run-service" // Keep `Event["data"]` mutable because projectors mutate the persisted shape // when writing to the database. Bus payloads (`Properties`) stay readonly — @@ -75,6 +76,7 @@ export class Service extends Context.Service()("@opencode/Sy export const layer = Layer.effect(Service)( Effect.gen(function* () { const flags = yield* RuntimeFlags.Service + const bus = yield* ProjectBus.Service const replay: Interface["replay"] = Effect.fn("SyncEvent.replay")(function* (event, options) { const def = registry.get(event.type) @@ -112,6 +114,7 @@ export const layer = Layer.effect(Service)( } : undefined process(def, event, { + bus, publish, context, ownerID: options?.ownerID, @@ -172,7 +175,7 @@ export const layer = Layer.effect(Service)( const seq = row?.seq != null ? row.seq + 1 : 0 const event = { id, seq, aggregateID: agg, data } - process(def, event, { publish, context, experimentalWorkspaces: flags.experimentalWorkspaces }) + process(def, event, { bus, publish, context, experimentalWorkspaces: flags.experimentalWorkspaces }) }, { behavior: "immediate", @@ -209,7 +212,7 @@ export const layer = Layer.effect(Service)( }), ) -export const defaultLayer = layer.pipe(Layer.provide(RuntimeFlags.defaultLayer)) +export const defaultLayer = layer.pipe(Layer.provide([ProjectBus.defaultLayer, RuntimeFlags.defaultLayer])) export const use = serviceUse(Service) @@ -303,7 +306,13 @@ function register(def: Definition) { function process( def: Def, event: Event, - options: { publish: boolean; context?: PublishContext; ownerID?: string; experimentalWorkspaces: boolean }, + options: { + bus: ProjectBus.Interface + publish: boolean + context?: PublishContext + ownerID?: string + experimentalWorkspaces: boolean + }, ) { if (projectors == null) { throw new Error("No projectors available. Call `SyncEvent.init` to install projectors") @@ -348,7 +357,13 @@ function process( } const result = convertEvent(def.type, event.data) - const publish = (data: unknown) => ProjectBus.publish(def, data as Properties, { id: event.id }) + const publish = (data: unknown) => + Effect.runPromise( + attachWith(options.bus.publish(def, data as Properties, { id: event.id }), { + instance: options.context?.instance, + workspace: options.context?.workspace, + }), + ) if (result instanceof Promise) { void result.then(publish) } else { diff --git a/packages/opencode/test/sync/index.test.ts b/packages/opencode/test/sync/index.test.ts index c4e5b86062..2fbfc9b94c 100644 --- a/packages/opencode/test/sync/index.test.ts +++ b/packages/opencode/test/sync/index.test.ts @@ -13,7 +13,10 @@ import { RuntimeFlags } from "@/effect/runtime-flags" const it = testEffect( Layer.mergeAll( - SyncEvent.layer.pipe(Layer.provide(RuntimeFlags.layer({ experimentalWorkspaces: true }))), + SyncEvent.layer.pipe( + Layer.provide(RuntimeFlags.layer({ experimentalWorkspaces: true })), + Layer.provideMerge(Bus.layer), + ), CrossSpawnSpawner.defaultLayer, ), ) @@ -114,7 +117,8 @@ describe("SyncEvent", () => { const received = new Promise((done) => { resolve = done }) - const dispose = Bus.subscribeAll((event) => { + const bus = yield* Bus.Service + const dispose = yield* bus.subscribeAllCallback((event) => { events.push(event) resolve() })