From 3085279724c1a7d620b633ded1302d03f06de874 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Tue, 21 Apr 2026 19:06:00 -0400 Subject: [PATCH] refactor(core): allow SyncEvent.define and BusEvent.define to accept Effect Schema Overloads BusEvent.define and SyncEvent.define so payload schemas can be passed as Effect Schema values directly. Effect Schemas are converted to Zod via the effect-zod walker since the sync/bus pipelines still use Zod internally. Migrates MessageV2.Event.* to use Schema.Struct directly instead of z.object with .zod references. --- packages/opencode/src/bus/bus-event.ts | 31 +++++++++++++--- packages/opencode/src/session/message-v2.ts | 40 ++++++++++---------- packages/opencode/src/sync/index.ts | 41 ++++++++++++++++++--- 3 files changed, 80 insertions(+), 32 deletions(-) diff --git a/packages/opencode/src/bus/bus-event.ts b/packages/opencode/src/bus/bus-event.ts index efaed94406..bb9b3f497f 100644 --- a/packages/opencode/src/bus/bus-event.ts +++ b/packages/opencode/src/bus/bus-event.ts @@ -1,19 +1,38 @@ import z from "zod" import type { ZodType } from "zod" +import { Schema, Types } from "effect" +import { zod } from "@/util/effect-zod" export type Definition = ReturnType const registry = new Map() -export function define(type: Type, properties: Properties) { - const result = { - type, - properties, - } - registry.set(type, result) +/** + * Define a bus event type with a payload schema. + * + * Accepts either a Zod schema or an Effect Schema. Effect Schemas are + * converted to Zod internally via the effect-zod walker so that the bus + * continues to use Zod as the lingua franca for serialization/validation. + */ +export function define( + type: Type, + properties: P, +): { type: Type; properties: z.ZodType>> } +export function define( + type: Type, + properties: P, +): { type: Type; properties: P } +export function define(type: string, properties: unknown) { + const zodProperties = isEffectSchema(properties) ? zod(properties) : (properties as ZodType) + const result = { type, properties: zodProperties } + registry.set(type, result as Definition) return result } +function isEffectSchema(value: unknown): value is Schema.Top { + return typeof value === "object" && value !== null && "ast" in value +} + export function payloads() { return registry .entries() diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 980dd4da84..7664b09ada 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -581,48 +581,48 @@ export const Event = { type: "message.updated", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - info: Info.zod, + schema: Schema.Struct({ + sessionID: SessionID, + info: _Info, }), }), Removed: SyncEvent.define({ type: "message.removed", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - messageID: MessageID.zod, + schema: Schema.Struct({ + sessionID: SessionID, + messageID: MessageID, }), }), PartUpdated: SyncEvent.define({ type: "message.part.updated", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - part: Part.zod, - time: z.number(), + schema: Schema.Struct({ + sessionID: SessionID, + part: _Part, + time: Schema.Number, }), }), PartDelta: BusEvent.define( "message.part.delta", - z.object({ - sessionID: SessionID.zod, - messageID: MessageID.zod, - partID: PartID.zod, - field: z.string(), - delta: z.string(), + Schema.Struct({ + sessionID: SessionID, + messageID: MessageID, + partID: PartID, + field: Schema.String, + delta: Schema.String, }), ), PartRemoved: SyncEvent.define({ type: "message.part.removed", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - messageID: MessageID.zod, - partID: PartID.zod, + schema: Schema.Struct({ + sessionID: SessionID, + messageID: MessageID, + partID: PartID, }), }), } diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index 125d8c9550..6fdad16621 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -1,5 +1,6 @@ import z from "zod" import type { ZodObject } from "zod" +import { Schema, Types } from "effect" import { Database, eq } from "@/storage" import { GlobalBus } from "@/bus/global" import { Bus as ProjectBus } from "@/bus" @@ -9,6 +10,7 @@ import { EventSequenceTable, EventTable } from "./event.sql" import { WorkspaceContext } from "@/control-plane/workspace-context" import { EventID } from "./schema" import { Flag } from "@/flag/flag" +import { zod } from "@/util/effect-zod" export type Definition = { type: string @@ -69,31 +71,58 @@ export function versionedType(type: string, version?: number) { return version ? `${type}.${version}` : type } +type SchemaLike = + | ZodObject>> + | Schema.Struct> + +type BusSchemaLike = ZodObject | Schema.Struct + +type Mutable = Types.DeepMutable +type ToZodObject = S extends Schema.Top + ? z.ZodObject<{ [K in keyof Mutable>]: z.ZodType>[K]> }> + : S + +/** + * Define a sync event. Accepts either a Zod schema or an Effect Schema for + * both `schema` and `busSchema`. Effect Schemas are converted to Zod via the + * `effect-zod` walker since the sync pipeline uses Zod for validation and + * JSON Schema generation. + */ export function define< Type extends string, Agg extends string, - Schema extends ZodObject>>, - BusSchema extends ZodObject = Schema, ->(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }) { + S extends SchemaLike, + B extends BusSchemaLike = S, +>(input: { type: Type; version: number; aggregate: Agg; schema: S; busSchema?: B }) { if (frozen) { throw new Error("Error defining sync event: sync system has been frozen") } + const schema = toZodObject(input.schema) as ToZodObject + const properties = (input.busSchema ? toZodObject(input.busSchema) : schema) as ToZodObject + const def = { type: input.type, version: input.version, aggregate: input.aggregate, - schema: input.schema, - properties: input.busSchema ? input.busSchema : input.schema, + schema, + properties, } versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0)) - registry.set(versionedType(def.type, def.version), def) + registry.set(versionedType(def.type, def.version), def as unknown as Definition) return def } +function toZodObject(value: ZodObject | Schema.Top): z.ZodObject { + if (typeof value === "object" && value !== null && "ast" in value) { + return zod(value as Schema.Top) as unknown as z.ZodObject + } + return value as z.ZodObject +} + export function project( def: Def, func: (db: Database.TxOrDb, data: Event["data"]) => void,