refactor(sync): capture instance context for publish (#25206)

This commit is contained in:
Kit Langton 2026-04-30 21:45:02 -04:00 committed by GitHub
parent ce3b0988c4
commit a083c88e87
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -4,9 +4,9 @@ import { eq } from "drizzle-orm"
import { GlobalBus } from "@/bus/global"
import { Bus as ProjectBus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { Instance } from "@/project/instance"
import type { InstanceContext } from "@/project/instance"
import { EventSequenceTable, EventTable } from "./event.sql"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import type { WorkspaceID } from "@/control-plane/schema"
import { EventID } from "./schema"
import { Flag } from "@opencode-ai/core/flag/flag"
import { Context, Effect, Layer, Schema as EffectSchema } from "effect"
@ -14,6 +14,7 @@ import { zodObject } from "@/util/effect-zod"
import type { DeepMutable } from "@/util/schema"
import { makeRuntime } from "@/effect/run-service"
import { serviceUse } from "@/effect/service-use"
import { InstanceState } from "@/effect/instance-state"
// Keep `Event["data"]` mutable because projectors mutate the persisted shape
// when writing to the database. Bus payloads (`Properties`) stay readonly —
@ -47,6 +48,10 @@ export type SerializedEvent<Def extends Definition = Definition> = Event<Def> &
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
type ConvertEvent = (type: string, data: Event["data"]) => unknown | Promise<unknown>
type PublishContext = {
instance?: InstanceContext
workspace?: WorkspaceID
}
export interface Interface {
readonly run: <Def extends Definition>(
@ -87,7 +92,14 @@ export const layer = Layer.effect(Service)(
)
}
process(def, event, { publish: !!options?.publish })
const publish = !!options?.publish
const context = publish
? {
instance: yield* InstanceState.context,
workspace: yield* InstanceState.workspaceID,
}
: undefined
process(def, event, { publish, context })
})
const replayAll: Interface["replayAll"] = Effect.fn("SyncEvent.replayAll")(function* (events, options) {
@ -122,6 +134,12 @@ export const layer = Layer.effect(Service)(
}
const { publish = true } = options || {}
const context = publish
? {
instance: yield* InstanceState.context,
workspace: yield* InstanceState.workspaceID,
}
: undefined
// Note that this is an "immediate" transaction which is critical.
// We need to make sure we can safely read and write with nothing
@ -137,7 +155,7 @@ export const layer = Layer.effect(Service)(
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { publish })
process(def, event, { publish, context })
},
{
behavior: "immediate",
@ -242,7 +260,11 @@ export function project<Def extends Definition>(
return [def, func as ProjectorFunc]
}
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
function process<Def extends Definition>(
def: Def,
event: Event<Def>,
options: { publish: boolean; context?: PublishContext },
) {
if (projectors == null) {
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
}
@ -281,6 +303,10 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
Database.effect(() => {
if (options?.publish) {
if (!options.context?.instance) {
throw new Error("SyncEvent.process: publish requires instance context")
}
const result = convertEvent(def.type, event.data)
const publish = (data: unknown) => ProjectBus.publish(def, data as Properties<Def>)
if (result instanceof Promise) {
@ -290,9 +316,9 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
}
GlobalBus.emit("event", {
directory: Instance.directory,
project: Instance.project.id,
workspace: WorkspaceContext.workspaceID,
directory: options.context.instance.directory,
project: options.context.instance.project.id,
workspace: options.context.workspace,
payload: {
type: "sync",
syncEvent: {