From c50d65b4d6b956cad44663d9d31b7b5eb01c8e57 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 23 Apr 2026 12:43:08 -0400 Subject: [PATCH] refactor(sync): make session events schema-first (#24019) --- packages/opencode/src/bus/index.ts | 24 ++++++-- packages/opencode/src/server/projectors.ts | 3 +- packages/opencode/src/session/message-v2.ts | 44 ++++++++------ packages/opencode/src/session/projectors.ts | 2 +- packages/opencode/src/session/session.ts | 65 ++++++++++++++------- packages/opencode/src/share/share-next.ts | 2 +- packages/opencode/src/sync/index.ts | 44 ++++++++++---- packages/opencode/test/sync/index.test.ts | 6 +- packages/sdk/js/src/v2/gen/types.gen.ts | 32 +++++----- 9 files changed, 141 insertions(+), 81 deletions(-) diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 8a9579b599..a2f9f5ccba 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -1,5 +1,5 @@ import z from "zod" -import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect" +import { Effect, Exit, Layer, PubSub, Scope, Context, Stream, Schema as EffectSchema, Types } from "effect" import { EffectBridge } from "@/effect" import { Log } from "../util" import { BusEvent } from "./bus-event" @@ -9,6 +9,12 @@ import { makeRuntime } from "@/effect/run-service" const log = Log.create({ service: "bus" }) +type BusProperties = D extends { + effectProperties: infer Properties extends EffectSchema.Top +} + ? Types.DeepMutable> + : z.infer + export const InstanceDisposed = BusEvent.define( "server.instance.disposed", z.object({ @@ -18,7 +24,7 @@ export const InstanceDisposed = BusEvent.define( type Payload = { type: D["type"] - properties: z.infer + properties: BusProperties } type State = { @@ -29,7 +35,7 @@ type State = { export interface Interface { readonly publish: ( def: D, - properties: z.output, + properties: BusProperties, ) => Effect.Effect readonly subscribe: (def: D) => Stream.Stream> readonly subscribeAll: () => Stream.Stream @@ -79,7 +85,10 @@ export const layer = Layer.effect( }) } - function publish(def: D, properties: z.output) { + function publish( + def: D, + properties: BusProperties, + ) { return Effect.gen(function* () { const s = yield* InstanceState.get(state) const payload: Payload = { type: def.type, properties } @@ -175,13 +184,16 @@ const { runPromise, runSync } = makeRuntime(Service, layer) // runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe, // Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw. -export async function publish(def: D, properties: z.output) { +export async function publish( + def: D, + properties: BusProperties, +) { return runPromise((svc) => svc.publish(def, properties)) } export function subscribe( def: D, - callback: (event: { type: D["type"]; properties: z.infer }) => unknown, + callback: (event: Payload) => unknown, ) { return runSync((svc) => svc.subscribeCallback(def, callback)) } diff --git a/packages/opencode/src/server/projectors.ts b/packages/opencode/src/server/projectors.ts index cfecce5265..18c273d587 100644 --- a/packages/opencode/src/server/projectors.ts +++ b/packages/opencode/src/server/projectors.ts @@ -1,4 +1,3 @@ -import z from "zod" import sessionProjectors from "../session/projectors" import { SyncEvent } from "@/sync" import { Session } from "@/session" @@ -10,7 +9,7 @@ export function initProjectors() { projectors: sessionProjectors, convertEvent: (type, data) => { if (type === "session.updated") { - const id = (data as z.infer).sessionID + const id = (data as SyncEvent.Event["data"]).sessionID const row = Database.use((db) => db.select().from(SessionTable).where(eq(SessionTable.id, id)).get()) if (!row) return data diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 980dd4da84..a4b25d95ea 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -576,34 +576,46 @@ export const Info = Object.assign(_Info, { }) export type Info = User | Assistant +const UpdatedEventSchema = Schema.Struct({ + sessionID: SessionID, + info: _Info, +}) + +const RemovedEventSchema = Schema.Struct({ + sessionID: SessionID, + messageID: MessageID, +}) + +const PartUpdatedEventSchema = Schema.Struct({ + sessionID: SessionID, + part: _Part, + time: Schema.Number, +}) + +const PartRemovedEventSchema = Schema.Struct({ + sessionID: SessionID, + messageID: MessageID, + partID: PartID, +}) + export const Event = { Updated: SyncEvent.define({ type: "message.updated", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - info: Info.zod, - }), + schema: UpdatedEventSchema, }), Removed: SyncEvent.define({ type: "message.removed", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - messageID: MessageID.zod, - }), + schema: RemovedEventSchema, }), PartUpdated: SyncEvent.define({ type: "message.part.updated", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - part: Part.zod, - time: z.number(), - }), + schema: PartUpdatedEventSchema, }), PartDelta: BusEvent.define( "message.part.delta", @@ -619,11 +631,7 @@ export const Event = { type: "message.part.removed", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - messageID: MessageID.zod, - partID: PartID.zod, - }), + schema: PartRemovedEventSchema, }), } diff --git a/packages/opencode/src/session/projectors.ts b/packages/opencode/src/session/projectors.ts index 7b3b0a7f21..3a5fd0d8c9 100644 --- a/packages/opencode/src/session/projectors.ts +++ b/packages/opencode/src/session/projectors.ts @@ -71,7 +71,7 @@ export default [ const info = data.info const row = db .update(SessionTable) - .set(toPartialRow(info)) + .set(toPartialRow(info as Session.Patch)) .where(eq(SessionTable.id, data.sessionID)) .returning() .get() diff --git a/packages/opencode/src/session/session.ts b/packages/opencode/src/session/session.ts index d2bdbccb7d..1e046fdf79 100644 --- a/packages/opencode/src/session/session.ts +++ b/packages/opencode/src/session/session.ts @@ -15,7 +15,6 @@ import { PartTable, SessionTable } from "./session.sql" import { ProjectTable } from "../project/project.sql" import { Storage } from "@/storage" import { Log } from "../util" -import { updateSchema } from "../util/update-schema" import { MessageV2 } from "./message-v2" import { Instance } from "../project/instance" import { InstanceState } from "@/effect" @@ -28,7 +27,7 @@ import type { Provider } from "@/provider" import { Permission } from "@/permission" import { Global } from "@/global" import { Effect, Layer, Option, Context, Schema, Types } from "effect" -import { zod, zodObject } from "@/util/effect-zod" +import { zod } from "@/util/effect-zod" import { withStatics } from "@/util/schema" const log = Log.create({ service: "session" }) @@ -215,40 +214,62 @@ export const MessagesInput = Schema.Struct({ limit: Schema.optional(Schema.Number), }).pipe(withStatics((s) => ({ zod: zod(s) }))) +const CreatedEventSchema = Schema.Struct({ + sessionID: SessionID, + info: Info, +}) + +const UpdatedShare = Schema.Struct({ + url: Schema.optional(Schema.NullOr(Schema.String)), +}) + +const UpdatedTime = Schema.Struct({ + created: Schema.optional(Schema.NullOr(Schema.Number)), + updated: Schema.optional(Schema.NullOr(Schema.Number)), + compacting: Schema.optional(Schema.NullOr(Schema.Number)), + archived: Schema.optional(Schema.NullOr(Schema.Number)), +}) + +const UpdatedInfo = Schema.Struct({ + id: Schema.optional(Schema.NullOr(SessionID)), + slug: Schema.optional(Schema.NullOr(Schema.String)), + projectID: Schema.optional(Schema.NullOr(ProjectID)), + workspaceID: Schema.optional(Schema.NullOr(WorkspaceID)), + directory: Schema.optional(Schema.NullOr(Schema.String)), + parentID: Schema.optional(Schema.NullOr(SessionID)), + summary: Schema.optional(Schema.NullOr(Summary)), + share: Schema.optional(UpdatedShare), + title: Schema.optional(Schema.NullOr(Schema.String)), + version: Schema.optional(Schema.NullOr(Schema.String)), + time: Schema.optional(UpdatedTime), + permission: Schema.optional(Schema.NullOr(Permission.Ruleset)), + revert: Schema.optional(Schema.NullOr(Revert)), +}) + +const UpdatedEventSchema = Schema.Struct({ + sessionID: SessionID, + info: UpdatedInfo, +}) + export const Event = { Created: SyncEvent.define({ type: "session.created", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - info: Info.zod, - }), + schema: CreatedEventSchema, }), Updated: SyncEvent.define({ type: "session.updated", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - info: updateSchema(zodObject(Info)).extend({ - share: updateSchema(zodObject(Share)).optional(), - time: updateSchema(zodObject(Time)).optional(), - }), - }), - busSchema: z.object({ - sessionID: SessionID.zod, - info: Info.zod, - }), + schema: UpdatedEventSchema, + busSchema: CreatedEventSchema, }), Deleted: SyncEvent.define({ type: "session.deleted", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - info: Info.zod, - }), + schema: CreatedEventSchema, }), Diff: BusEvent.define( "session.diff", @@ -394,7 +415,7 @@ export interface Interface { export class Service extends Context.Service()("@opencode/Session") {} -type Patch = z.infer["info"] +export type Patch = Types.DeepMutable["data"]["info"]> const db = (fn: (d: Parameters[0] extends (trx: infer D) => any ? D : never) => T) => Effect.sync(() => Database.use(fn)) diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 2622f4f7f0..f26a085c22 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -181,7 +181,7 @@ export const layer = Layer.effect( yield* watch(Session.Event.Updated, (evt) => Effect.gen(function* () { - const info = yield* session.get(evt.properties.sessionID) + const info = evt.properties.info yield* sync(info.id, [{ type: "session", data: info }]) }), ) diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index 125d8c9550..5a4078402d 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -1,5 +1,4 @@ import z from "zod" -import type { ZodObject } from "zod" import { Database, eq } from "@/storage" import { GlobalBus } from "@/bus/global" import { Bus as ProjectBus } from "@/bus" @@ -9,11 +8,16 @@ import { EventSequenceTable, EventTable } from "./event.sql" import { WorkspaceContext } from "@/control-plane/workspace-context" import { EventID } from "./schema" import { Flag } from "@/flag/flag" +import { Schema as EffectSchema, Types } from "effect" +import { zodObject } from "@/util/effect-zod" +import { isRecord } from "@/util/record" -export type Definition = { +export type Definition = { type: string version: number aggregate: string + effectSchema: Schema + effectProperties: BusSchema schema: z.ZodObject // This is temporary and only exists for compatibility with bus @@ -25,9 +29,13 @@ export type Event = { id: string seq: number aggregateID: string - data: z.infer + data: Types.DeepMutable> } +export type Properties = Types.DeepMutable< + EffectSchema.Schema.Type +> + export type SerializedEvent = Event & { type: string } type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void @@ -36,7 +44,12 @@ export const registry = new Map() let projectors: Map | undefined const versions = new Map() let frozen = false -let convertEvent: (type: string, event: Event["data"]) => Promise> | Record +let convertEvent: (type: string, event: Event["data"]) => Promise | unknown + +function asRecord(input: unknown) { + if (isRecord(input)) return input + throw new Error(`SyncEvent.convertEvent must return an object, got: ${JSON.stringify(input)}`) +} export function reset() { frozen = false @@ -54,7 +67,7 @@ export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; co for (let [type, version] of versions.entries()) { let def = registry.get(versionedType(type, version))! - BusEvent.define(def.type, def.properties || def.schema) + BusEvent.define(def.type, def.properties) } // Freeze the system so it clearly errors if events are defined @@ -72,19 +85,26 @@ export function versionedType(type: string, version?: number) { export function define< Type extends string, Agg extends string, - Schema extends ZodObject>>, - BusSchema extends ZodObject = Schema, ->(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }) { + Schema extends EffectSchema.Top, + BusSchema extends EffectSchema.Top = Schema, +>(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }): Definition< + Schema, + BusSchema +> { if (frozen) { throw new Error("Error defining sync event: sync system has been frozen") } + const effectProperties = (input.busSchema ?? input.schema) as BusSchema + const def = { type: input.type, version: input.version, aggregate: input.aggregate, - schema: input.schema, - properties: input.busSchema ? input.busSchema : input.schema, + effectSchema: input.schema, + effectProperties, + schema: zodObject(input.schema), + properties: zodObject(effectProperties), } versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0)) @@ -143,10 +163,10 @@ function process(def: Def, event: Event, options: { const result = convertEvent(def.type, event.data) if (result instanceof Promise) { void result.then((data) => { - void ProjectBus.publish({ type: def.type, properties: def.schema }, data) + void ProjectBus.publish({ type: def.type, properties: def.properties }, asRecord(data)) }) } else { - void ProjectBus.publish({ type: def.type, properties: def.schema }, result) + void ProjectBus.publish({ type: def.type, properties: def.properties }, asRecord(result)) } GlobalBus.emit("event", { diff --git a/packages/opencode/test/sync/index.test.ts b/packages/opencode/test/sync/index.test.ts index 866bcaa31a..d50f0d7c94 100644 --- a/packages/opencode/test/sync/index.test.ts +++ b/packages/opencode/test/sync/index.test.ts @@ -1,6 +1,6 @@ import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test" import { tmpdir } from "../fixture/fixture" -import z from "zod" +import { Schema } from "effect" import { Bus } from "../../src/bus" import { Instance } from "../../src/project/instance" import { SyncEvent } from "../../src/sync" @@ -43,13 +43,13 @@ describe("SyncEvent", () => { type: "item.created", version: 1, aggregate: "id", - schema: z.object({ id: z.string(), name: z.string() }), + schema: Schema.Struct({ id: Schema.String, name: Schema.String }), }) const Sent = SyncEvent.define({ type: "item.sent", version: 1, aggregate: "item_id", - schema: z.object({ item_id: z.string(), to: z.string() }), + schema: Schema.Struct({ item_id: Schema.String, to: Schema.String }), }) SyncEvent.init({ diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index 1fcab2eda6..d28ce25794 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -1058,31 +1058,31 @@ export type SyncEventSessionUpdated = { data: { sessionID: string info: { - id: string | null - slug: string | null - projectID: string | null - workspaceID: string | null - directory: string | null - parentID: string | null - summary: { + id?: string | null + slug?: string | null + projectID?: string | null + workspaceID?: string | null + directory?: string | null + parentID?: string | null + summary?: { additions: number deletions: number files: number diffs?: Array } | null share?: { - url: string | null + url?: string | null } - title: string | null - version: string | null + title?: string | null + version?: string | null time?: { - created: number | null - updated: number | null - compacting: number | null - archived: number | null + created?: number | null + updated?: number | null + compacting?: number | null + archived?: number | null } - permission: PermissionRuleset | null - revert: { + permission?: PermissionRuleset | null + revert?: { messageID: string partID?: string snapshot?: string