refactor(sync): publish via EffectBridge.fork for codebase consistency (#28187)

This commit is contained in:
Kit Langton 2026-05-18 13:30:41 -04:00 committed by GitHub
parent 762850dfe5
commit 159d271e1e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 79 additions and 55 deletions

View file

@ -7,9 +7,7 @@ import { eq } from "drizzle-orm"
import { GlobalBus } from "@/bus/global"
import { Bus as ProjectBus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import type { InstanceContext } from "@/project/instance-context"
import { EventSequenceTable, EventTable } from "./event.sql"
import type { WorkspaceID } from "@/control-plane/schema"
import { EventID } from "./schema"
import { Context, Effect, Layer, Schema as EffectSchema } from "effect"
import type { DeepMutable } from "@opencode-ai/core/schema"
@ -17,7 +15,7 @@ import { EventV2 } from "@opencode-ai/core/event"
import { serviceUse } from "@/effect/service-use"
import { InstanceState } from "@/effect/instance-state"
import { RuntimeFlags } from "@/effect/runtime-flags"
import { attachWith } from "@/effect/run-service"
import { EffectBridge } from "@/effect/bridge"
// Keep `Event["data"]` mutable because projectors mutate the persisted shape
// when writing to the database. Bus payloads (`Properties`) stay readonly —
@ -51,10 +49,6 @@ export type SerializedEvent<Def extends Definition = Definition> = Event<Def> &
type ProjectorFunc = (db: Database.TxOrDb, data: unknown, event: Event) => void
type ConvertEvent = (type: string, data: Event["data"]) => unknown | Promise<unknown>
type PublishContext = {
instance?: InstanceContext
workspace?: WorkspaceID
}
export interface Interface {
readonly run: <Def extends Definition>(
@ -107,16 +101,14 @@ export const layer = Layer.effect(Service)(
}
const publish = !!options?.publish
const context = publish
? {
instance: yield* InstanceState.context,
workspace: yield* InstanceState.workspaceID,
}
: undefined
// Bridge captures handler-fiber refs (InstanceRef/WorkspaceRef) and the
// full Effect context, so the forked publish + GlobalBus emit run with
// the right state without a per-call attachWith.
const bridge = yield* EffectBridge.make()
process(def, event, {
bus,
bridge,
publish,
context,
ownerID: options?.ownerID,
experimentalWorkspaces: flags.experimentalWorkspaces,
})
@ -154,12 +146,7 @@ export const layer = Layer.effect(Service)(
}
const { publish = true } = options || {}
const context = publish
? {
instance: yield* InstanceState.context,
workspace: yield* InstanceState.workspaceID,
}
: undefined
const bridge = yield* EffectBridge.make()
// Note that this is an "immediate" transaction which is critical.
// We need to make sure we can safely read and write with nothing
@ -175,7 +162,7 @@ export const layer = Layer.effect(Service)(
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { bus, publish, context, experimentalWorkspaces: flags.experimentalWorkspaces })
process(def, event, { bus, bridge, publish, experimentalWorkspaces: flags.experimentalWorkspaces })
},
{
behavior: "immediate",
@ -308,8 +295,8 @@ function process<Def extends Definition>(
event: Event<Def>,
options: {
bus: ProjectBus.Interface
bridge: EffectBridge.Shape
publish: boolean
context?: PublishContext
ownerID?: string
experimentalWorkspaces: boolean
},
@ -351,37 +338,36 @@ function process<Def extends Definition>(
}
Database.effect(() => {
if (options?.publish) {
if (!options.context?.instance) {
throw new Error("SyncEvent.process: publish requires instance context")
}
const result = convertEvent(def.type, event.data)
const publish = (data: unknown) =>
Effect.runPromise(
attachWith(options.bus.publish(def, data as Properties<Def>, { id: event.id }), {
instance: options.context?.instance,
workspace: options.context?.workspace,
}),
)
if (result instanceof Promise) {
void result.then(publish)
} else {
void publish(result)
}
GlobalBus.emit("event", {
directory: options.context.instance.directory,
project: options.context.instance.project.id,
workspace: options.context.workspace,
payload: {
type: "sync",
syncEvent: {
type: versionedType(def.type, def.version),
...event,
},
},
})
if (!options.publish) return
const result = convertEvent(def.type, event.data)
// The bridge was built inside the caller's fiber so it already carries
// InstanceRef/WorkspaceRef and the full Effect context. Both the bus
// publish and the GlobalBus emit run inside the forked Effect so they
// share the same instance/workspace lookup.
const publish = (data: unknown) =>
options.bridge.fork(
Effect.gen(function* () {
yield* options.bus.publish(def, data as Properties<Def>, { id: event.id })
const instance = yield* InstanceState.context
const workspace = yield* InstanceState.workspaceID
GlobalBus.emit("event", {
directory: instance.directory,
project: instance.project.id,
workspace,
payload: {
type: "sync",
syncEvent: {
type: versionedType(def.type, def.version),
...event,
},
},
})
}),
)
if (result instanceof Promise) {
void result.then(publish)
} else {
publish(result)
}
})
})

View file

@ -1,14 +1,15 @@
import { describe, expect, beforeEach, afterAll } from "bun:test"
import { provideTmpdirInstance } from "../fixture/fixture"
import { Effect, Layer, Schema } from "effect"
import { Deferred, Effect, Layer, Schema } from "effect"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { Bus } from "../../src/bus"
import { GlobalBus, type GlobalEvent } from "../../src/bus/global"
import { SyncEvent } from "../../src/sync"
import { Database, eq } from "@/storage/db"
import { EventSequenceTable, EventTable } from "../../src/sync/event.sql"
import { MessageID } from "../../src/session/schema"
import { initProjectors } from "../../src/server/projectors"
import { testEffect } from "../lib/effect"
import { awaitWithTimeout, testEffect } from "../lib/effect"
import { RuntimeFlags } from "@/effect/runtime-flags"
const it = testEffect(
@ -139,6 +140,43 @@ describe("SyncEvent", () => {
}),
),
)
// Regression for the EffectBridge migration. GlobalBus.emit used to fire
// synchronously inside the Database.effect post-commit callback. After the
// migration it fires inside the forked publish Effect, AFTER bus.publish
// completes. Consumers don't care about microsecond-level ordering, but
// we still need to prove the emit actually fires.
it.live(
"emits sync events to GlobalBus after publishing to ProjectBus",
provideTmpdirInstance(() =>
Effect.gen(function* () {
const { Created } = setup()
// Filter for OUR specific event in the handler so we ignore any
// stray sync events from other tests' lingering forks.
const received = yield* Deferred.make<GlobalEvent>()
const handler = (evt: GlobalEvent) => {
if (evt.payload?.type === "sync" && evt.payload?.syncEvent?.type === "item.created.1") {
Deferred.doneUnsafe(received, Effect.succeed(evt))
}
}
GlobalBus.on("event", handler)
try {
yield* SyncEvent.use.run(Created, { id: "evt_global_1", name: "global" })
const event = yield* awaitWithTimeout(
Deferred.await(received),
"timed out waiting for sync event on GlobalBus",
"2 seconds",
)
expect(event.payload).toMatchObject({
type: "sync",
syncEvent: { type: "item.created.1", data: { id: "evt_global_1", name: "global" } },
})
} finally {
GlobalBus.off("event", handler)
}
}),
),
)
})
describe("replay", () => {