diff --git a/packages/opencode/src/bus/bus-event.ts b/packages/opencode/src/bus/bus-event.ts index efaed94406..bb9b3f497f 100644 --- a/packages/opencode/src/bus/bus-event.ts +++ b/packages/opencode/src/bus/bus-event.ts @@ -1,19 +1,38 @@ import z from "zod" import type { ZodType } from "zod" +import { Schema, Types } from "effect" +import { zod } from "@/util/effect-zod" export type Definition = ReturnType const registry = new Map() -export function define(type: Type, properties: Properties) { - const result = { - type, - properties, - } - registry.set(type, result) +/** + * Define a bus event type with a payload schema. + * + * Accepts either a Zod schema or an Effect Schema. Effect Schemas are + * converted to Zod internally via the effect-zod walker so that the bus + * continues to use Zod as the lingua franca for serialization/validation. + */ +export function define( + type: Type, + properties: P, +): { type: Type; properties: z.ZodType>> } +export function define( + type: Type, + properties: P, +): { type: Type; properties: P } +export function define(type: string, properties: unknown) { + const zodProperties = isEffectSchema(properties) ? zod(properties) : (properties as ZodType) + const result = { type, properties: zodProperties } + registry.set(type, result as Definition) return result } +function isEffectSchema(value: unknown): value is Schema.Top { + return typeof value === "object" && value !== null && "ast" in value +} + export function payloads() { return registry .entries() diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 980dd4da84..7664b09ada 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -581,48 +581,48 @@ export const Event = { type: "message.updated", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - info: Info.zod, + schema: Schema.Struct({ + sessionID: SessionID, + info: _Info, }), }), Removed: SyncEvent.define({ type: "message.removed", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - messageID: MessageID.zod, + schema: Schema.Struct({ + sessionID: SessionID, + messageID: MessageID, }), }), PartUpdated: SyncEvent.define({ type: "message.part.updated", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - part: Part.zod, - time: z.number(), + schema: Schema.Struct({ + sessionID: SessionID, + part: _Part, + time: Schema.Number, }), }), PartDelta: BusEvent.define( "message.part.delta", - z.object({ - sessionID: SessionID.zod, - messageID: MessageID.zod, - partID: PartID.zod, - field: z.string(), - delta: z.string(), + Schema.Struct({ + sessionID: SessionID, + messageID: MessageID, + partID: PartID, + field: Schema.String, + delta: Schema.String, }), ), PartRemoved: SyncEvent.define({ type: "message.part.removed", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - messageID: MessageID.zod, - partID: PartID.zod, + schema: Schema.Struct({ + sessionID: SessionID, + messageID: MessageID, + partID: PartID, }), }), } diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index 125d8c9550..6fdad16621 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -1,5 +1,6 @@ import z from "zod" import type { ZodObject } from "zod" +import { Schema, Types } from "effect" import { Database, eq } from "@/storage" import { GlobalBus } from "@/bus/global" import { Bus as ProjectBus } from "@/bus" @@ -9,6 +10,7 @@ import { EventSequenceTable, EventTable } from "./event.sql" import { WorkspaceContext } from "@/control-plane/workspace-context" import { EventID } from "./schema" import { Flag } from "@/flag/flag" +import { zod } from "@/util/effect-zod" export type Definition = { type: string @@ -69,31 +71,58 @@ export function versionedType(type: string, version?: number) { return version ? `${type}.${version}` : type } +type SchemaLike = + | ZodObject>> + | Schema.Struct> + +type BusSchemaLike = ZodObject | Schema.Struct + +type Mutable = Types.DeepMutable +type ToZodObject = S extends Schema.Top + ? z.ZodObject<{ [K in keyof Mutable>]: z.ZodType>[K]> }> + : S + +/** + * Define a sync event. Accepts either a Zod schema or an Effect Schema for + * both `schema` and `busSchema`. Effect Schemas are converted to Zod via the + * `effect-zod` walker since the sync pipeline uses Zod for validation and + * JSON Schema generation. + */ export function define< Type extends string, Agg extends string, - Schema extends ZodObject>>, - BusSchema extends ZodObject = Schema, ->(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }) { + S extends SchemaLike, + B extends BusSchemaLike = S, +>(input: { type: Type; version: number; aggregate: Agg; schema: S; busSchema?: B }) { if (frozen) { throw new Error("Error defining sync event: sync system has been frozen") } + const schema = toZodObject(input.schema) as ToZodObject + const properties = (input.busSchema ? toZodObject(input.busSchema) : schema) as ToZodObject + const def = { type: input.type, version: input.version, aggregate: input.aggregate, - schema: input.schema, - properties: input.busSchema ? input.busSchema : input.schema, + schema, + properties, } versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0)) - registry.set(versionedType(def.type, def.version), def) + registry.set(versionedType(def.type, def.version), def as unknown as Definition) return def } +function toZodObject(value: ZodObject | Schema.Top): z.ZodObject { + if (typeof value === "object" && value !== null && "ast" in value) { + return zod(value as Schema.Top) as unknown as z.ZodObject + } + return value as z.ZodObject +} + export function project( def: Def, func: (db: Database.TxOrDb, data: Event["data"]) => void,