refactor(core): allow SyncEvent.define and BusEvent.define to accept Effect Schema

Overloads BusEvent.define and SyncEvent.define so payload schemas can be passed as Effect Schema values directly. Effect Schemas are converted to Zod via the effect-zod walker since the sync/bus pipelines still use Zod internally. Migrates MessageV2.Event.* to use Schema.Struct directly instead of z.object with .zod references.
This commit is contained in:
Kit Langton 2026-04-21 19:06:00 -04:00
parent 2cd89d64e9
commit 3085279724
3 changed files with 80 additions and 32 deletions

View file

@ -1,19 +1,38 @@
import z from "zod"
import type { ZodType } from "zod"
import { Schema, Types } from "effect"
import { zod } from "@/util/effect-zod"
export type Definition = ReturnType<typeof define>
const registry = new Map<string, Definition>()
export function define<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
const result = {
type,
properties,
}
registry.set(type, result)
/**
* Define a bus event type with a payload schema.
*
* Accepts either a Zod schema or an Effect Schema. Effect Schemas are
* converted to Zod internally via the effect-zod walker so that the bus
* continues to use Zod as the lingua franca for serialization/validation.
*/
export function define<Type extends string, P extends Schema.Top>(
type: Type,
properties: P,
): { type: Type; properties: z.ZodType<Types.DeepMutable<Schema.Schema.Type<P>>> }
export function define<Type extends string, P extends ZodType>(
type: Type,
properties: P,
): { type: Type; properties: P }
export function define(type: string, properties: unknown) {
const zodProperties = isEffectSchema(properties) ? zod(properties) : (properties as ZodType)
const result = { type, properties: zodProperties }
registry.set(type, result as Definition)
return result
}
function isEffectSchema(value: unknown): value is Schema.Top {
return typeof value === "object" && value !== null && "ast" in value
}
export function payloads() {
return registry
.entries()

View file

@ -581,48 +581,48 @@ export const Event = {
type: "message.updated",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
info: Info.zod,
schema: Schema.Struct({
sessionID: SessionID,
info: _Info,
}),
}),
Removed: SyncEvent.define({
type: "message.removed",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod,
schema: Schema.Struct({
sessionID: SessionID,
messageID: MessageID,
}),
}),
PartUpdated: SyncEvent.define({
type: "message.part.updated",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
part: Part.zod,
time: z.number(),
schema: Schema.Struct({
sessionID: SessionID,
part: _Part,
time: Schema.Number,
}),
}),
PartDelta: BusEvent.define(
"message.part.delta",
z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod,
partID: PartID.zod,
field: z.string(),
delta: z.string(),
Schema.Struct({
sessionID: SessionID,
messageID: MessageID,
partID: PartID,
field: Schema.String,
delta: Schema.String,
}),
),
PartRemoved: SyncEvent.define({
type: "message.part.removed",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod,
partID: PartID.zod,
schema: Schema.Struct({
sessionID: SessionID,
messageID: MessageID,
partID: PartID,
}),
}),
}

View file

@ -1,5 +1,6 @@
import z from "zod"
import type { ZodObject } from "zod"
import { Schema, Types } from "effect"
import { Database, eq } from "@/storage"
import { GlobalBus } from "@/bus/global"
import { Bus as ProjectBus } from "@/bus"
@ -9,6 +10,7 @@ import { EventSequenceTable, EventTable } from "./event.sql"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import { EventID } from "./schema"
import { Flag } from "@/flag/flag"
import { zod } from "@/util/effect-zod"
export type Definition = {
type: string
@ -69,31 +71,58 @@ export function versionedType(type: string, version?: number) {
return version ? `${type}.${version}` : type
}
type SchemaLike<Agg extends string> =
| ZodObject<Record<Agg, z.ZodType<string>>>
| Schema.Struct<Record<Agg, Schema.Top>>
type BusSchemaLike = ZodObject | Schema.Struct<Schema.Struct.Fields>
type Mutable<T> = Types.DeepMutable<T>
type ToZodObject<S> = S extends Schema.Top
? z.ZodObject<{ [K in keyof Mutable<Schema.Schema.Type<S>>]: z.ZodType<Mutable<Schema.Schema.Type<S>>[K]> }>
: S
/**
* Define a sync event. Accepts either a Zod schema or an Effect Schema for
* both `schema` and `busSchema`. Effect Schemas are converted to Zod via the
* `effect-zod` walker since the sync pipeline uses Zod for validation and
* JSON Schema generation.
*/
export function define<
Type extends string,
Agg extends string,
Schema extends ZodObject<Record<Agg, z.ZodType<string>>>,
BusSchema extends ZodObject = Schema,
>(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }) {
S extends SchemaLike<Agg>,
B extends BusSchemaLike = S,
>(input: { type: Type; version: number; aggregate: Agg; schema: S; busSchema?: B }) {
if (frozen) {
throw new Error("Error defining sync event: sync system has been frozen")
}
const schema = toZodObject(input.schema) as ToZodObject<S>
const properties = (input.busSchema ? toZodObject(input.busSchema) : schema) as ToZodObject<B>
const def = {
type: input.type,
version: input.version,
aggregate: input.aggregate,
schema: input.schema,
properties: input.busSchema ? input.busSchema : input.schema,
schema,
properties,
}
versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0))
registry.set(versionedType(def.type, def.version), def)
registry.set(versionedType(def.type, def.version), def as unknown as Definition)
return def
}
function toZodObject(value: ZodObject | Schema.Top): z.ZodObject {
if (typeof value === "object" && value !== null && "ast" in value) {
return zod(value as Schema.Top) as unknown as z.ZodObject
}
return value as z.ZodObject
}
export function project<Def extends Definition>(
def: Def,
func: (db: Database.TxOrDb, data: Event<Def>["data"]) => void,