refactor(bus): migrate BusEvent to Effect Schema (#24040)

This commit is contained in:
Kit Langton 2026-04-23 15:37:44 -04:00 committed by GitHub
parent 0590452456
commit cd93533b1f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
37 changed files with 281 additions and 260 deletions

View file

@ -1,15 +1,19 @@
import z from "zod"
import type { ZodType } from "zod"
import { Schema } from "effect"
import { zodObject } from "@/util/effect-zod"
export type Definition = ReturnType<typeof define>
export type Definition<Type extends string = string, Properties extends Schema.Top = Schema.Top> = {
type: Type
properties: Properties
}
const registry = new Map<string, Definition>()
export function define<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
const result = {
type,
properties,
}
export function define<Type extends string, Properties extends Schema.Top>(
type: Type,
properties: Properties,
): Definition<Type, Properties> {
const result = { type, properties }
registry.set(type, result)
return result
}
@ -21,7 +25,7 @@ export function payloads() {
return z
.object({
type: z.literal(type),
properties: def.properties,
properties: zodObject(def.properties),
})
.meta({
ref: `Event.${def.type}`,

View file

@ -1,5 +1,4 @@
import z from "zod"
import { Effect, Exit, Layer, PubSub, Scope, Context, Stream, Schema as EffectSchema, Types } from "effect"
import { Effect, Exit, Layer, PubSub, Scope, Context, Stream, Schema } from "effect"
import { EffectBridge } from "@/effect"
import { Log } from "../util"
import { BusEvent } from "./bus-event"
@ -9,16 +8,12 @@ import { makeRuntime } from "@/effect/run-service"
const log = Log.create({ service: "bus" })
type BusProperties<D extends BusEvent.Definition = BusEvent.Definition> = D extends {
effectProperties: infer Properties extends EffectSchema.Top
}
? Types.DeepMutable<EffectSchema.Schema.Type<Properties>>
: z.infer<D["properties"]>
type BusProperties<D extends BusEvent.Definition<string, Schema.Top>> = Schema.Schema.Type<D["properties"]>
export const InstanceDisposed = BusEvent.define(
"server.instance.disposed",
z.object({
directory: z.string(),
Schema.Struct({
directory: Schema.String,
}),
)

View file

@ -1,14 +1,14 @@
import { BusEvent } from "@/bus/bus-event"
import { SessionID } from "@/session/schema"
import z from "zod"
import { Schema } from "effect"
export const TuiEvent = {
PromptAppend: BusEvent.define("tui.prompt.append", z.object({ text: z.string() })),
PromptAppend: BusEvent.define("tui.prompt.append", Schema.Struct({ text: Schema.String })),
CommandExecute: BusEvent.define(
"tui.command.execute",
z.object({
command: z.union([
z.enum([
Schema.Struct({
command: Schema.Union([
Schema.Literals([
"session.list",
"session.new",
"session.share",
@ -26,23 +26,23 @@ export const TuiEvent = {
"prompt.submit",
"agent.cycle",
]),
z.string(),
Schema.String,
]),
}),
),
ToastShow: BusEvent.define(
"tui.toast.show",
z.object({
title: z.string().optional(),
message: z.string(),
variant: z.enum(["info", "success", "warning", "error"]),
duration: z.number().default(5000).optional().describe("Duration in milliseconds"),
Schema.Struct({
title: Schema.optional(Schema.String),
message: Schema.String,
variant: Schema.Literals(["info", "success", "warning", "error"]),
duration: Schema.optional(Schema.Number).annotate({ description: "Duration in milliseconds" }),
}),
),
SessionSelect: BusEvent.define(
"tui.session.select",
z.object({
sessionID: SessionID.zod.describe("Session ID to navigate to"),
Schema.Struct({
sessionID: SessionID.annotate({ description: "Session ID to navigate to" }),
}),
),
}

View file

@ -4,10 +4,10 @@ import { useTheme } from "@tui/context/theme"
import { useTerminalDimensions } from "@opentui/solid"
import { SplitBorder } from "../component/border"
import { TextAttributes } from "@opentui/core"
import z from "zod"
import { Schema } from "effect"
import { type TuiEvent } from "../event"
export type ToastOptions = z.infer<typeof TuiEvent.ToastShow.properties>
export type ToastOptions = Schema.Schema.Type<typeof TuiEvent.ToastShow.properties>
export function Toast() {
const toast = useToast()

View file

@ -3,7 +3,7 @@ import { InstanceState } from "@/effect"
import { EffectBridge } from "@/effect"
import type { InstanceContext } from "@/project/instance"
import { SessionID, MessageID } from "@/session/schema"
import { Effect, Layer, Context } from "effect"
import { Effect, Layer, Context, Schema } from "effect"
import z from "zod"
import { Config } from "../config"
import { MCP } from "../mcp"
@ -18,11 +18,11 @@ type State = {
export const Event = {
Executed: BusEvent.define(
"command.executed",
z.object({
name: z.string(),
sessionID: SessionID.zod,
arguments: z.string(),
messageID: MessageID.zod,
Schema.Struct({
name: Schema.String,
sessionID: SessionID,
arguments: Schema.String,
messageID: MessageID,
}),
),
}

View file

@ -25,7 +25,7 @@ import { Context, Duration, Effect, Exit, Fiber, Layer, Option, Schema } from "e
import { EffectFlock } from "@opencode-ai/shared/util/effect-flock"
import { InstanceRef } from "@/effect/instance-ref"
import { zod, ZodOverride } from "@/util/effect-zod"
import { NonNegativeInt, PositiveInt, withStatics } from "@/util/schema"
import { NonNegativeInt, PositiveInt, withStatics, type DeepMutable } from "@/util/schema"
import { ConfigAgent } from "./agent"
import { ConfigCommand } from "./command"
import { ConfigFormatter } from "./formatter"
@ -249,26 +249,9 @@ export const Info = Schema.Struct({
})),
)
// Schema.Struct produces readonly types by default, but the service code
// below mutates Info objects directly (e.g. `config.mode = ...`). Strip the
// readonly recursively so callers get the same mutable shape zod inferred.
//
// `Types.DeepMutable` from effect-smol would be a drop-in, but its fallback
// branch `{ -readonly [K in keyof T]: ... }` collapses `unknown` to `{}`
// (since `keyof unknown = never`), which widens `Record<string, unknown>`
// fields like `ConfigPlugin.Options`. The local version gates on
// `extends object` so `unknown` passes through.
//
// Tuple branch preserves `ConfigPlugin.Spec`'s `readonly [string, Options]`
// shape (otherwise the general array branch widens it to an array).
type DeepMutable<T> = T extends readonly [unknown, ...unknown[]]
? { -readonly [K in keyof T]: DeepMutable<T[K]> }
: T extends readonly (infer U)[]
? DeepMutable<U>[]
: T extends object
? { -readonly [K in keyof T]: DeepMutable<T[K]> }
: T
// Uses the shared `DeepMutable` from `@/util/schema`. See the definition
// there for why the local variant is needed over `Types.DeepMutable` from
// effect-smol (the upstream version collapses `unknown` to `{}`).
export type Info = DeepMutable<Schema.Schema.Type<typeof Info>> & {
// plugin_origins is derived state, not a persisted config field. It keeps each winning plugin spec together
// with the file and scope it came from so later runtime code can make location-sensitive decisions.

View file

@ -1,4 +1,5 @@
import z from "zod"
import { Schema } from "effect"
import { setTimeout as sleep } from "node:timers/promises"
import { fn } from "@/util/fn"
import { Database, asc, eq, inArray } from "@/storage"
@ -25,36 +26,37 @@ import { errorData } from "@/util/error"
import { AppRuntime } from "@/effect/app-runtime"
import { waitEvent } from "./util"
import { WorkspaceContext } from "./workspace-context"
import { NonNegativeInt } from "@/util/schema"
export const Info = WorkspaceInfo.meta({
ref: "Workspace",
})
export type Info = z.infer<typeof Info>
export const ConnectionStatus = z.object({
workspaceID: WorkspaceID.zod,
status: z.enum(["connected", "connecting", "disconnected", "error"]),
export const ConnectionStatus = Schema.Struct({
workspaceID: WorkspaceID,
status: Schema.Literals(["connected", "connecting", "disconnected", "error"]),
})
export type ConnectionStatus = z.infer<typeof ConnectionStatus>
export type ConnectionStatus = Schema.Schema.Type<typeof ConnectionStatus>
const Restore = z.object({
workspaceID: WorkspaceID.zod,
sessionID: SessionID.zod,
total: z.number().int().min(0),
step: z.number().int().min(0),
const Restore = Schema.Struct({
workspaceID: WorkspaceID,
sessionID: SessionID,
total: NonNegativeInt,
step: NonNegativeInt,
})
export const Event = {
Ready: BusEvent.define(
"workspace.ready",
z.object({
name: z.string(),
Schema.Struct({
name: Schema.String,
}),
),
Failed: BusEvent.define(
"workspace.failed",
z.object({
message: z.string(),
Schema.Struct({
message: Schema.String,
}),
),
Restore: BusEvent.define("workspace.restore", Restore),

View file

@ -3,7 +3,7 @@ import { InstanceState } from "@/effect"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
import { Git } from "@/git"
import { Effect, Layer, Context, Scope } from "effect"
import { Effect, Layer, Context, Schema, Scope } from "effect"
import * as Stream from "effect/Stream"
import { formatPatch, structuredPatch } from "diff"
import fuzzysort from "fuzzysort"
@ -76,8 +76,8 @@ export type Content = z.infer<typeof Content>
export const Event = {
Edited: BusEvent.define(
"file.edited",
z.object({
file: z.string(),
Schema.Struct({
file: Schema.String,
}),
),
}

View file

@ -1,4 +1,4 @@
import { Cause, Effect, Layer, Context } from "effect"
import { Cause, Effect, Layer, Context, Schema } from "effect"
// @ts-ignore
import { createWrapper } from "@parcel/watcher/wrapper"
import type ParcelWatcher from "@parcel/watcher"
@ -25,9 +25,9 @@ const SUBSCRIBE_TIMEOUT_MS = 10_000
export const Event = {
Updated: BusEvent.define(
"file.watcher.updated",
z.object({
file: z.string(),
event: z.union([z.literal("add"), z.literal("change"), z.literal("unlink")]),
Schema.Struct({
file: Schema.String,
event: Schema.Literals(["add", "change", "unlink"]),
}),
),
}

View file

@ -1,5 +1,6 @@
import { BusEvent } from "@/bus/bus-event"
import z from "zod"
import { Schema } from "effect"
import { NamedError } from "@opencode-ai/shared/util/error"
import { Log } from "../util"
import { Process } from "@/util"
@ -17,8 +18,8 @@ const log = Log.create({ service: "ide" })
export const Event = {
Installed: BusEvent.define(
"ide.installed",
z.object({
ide: z.string(),
Schema.Struct({
ide: Schema.String,
}),
),
}

View file

@ -21,14 +21,14 @@ export type ReleaseType = "patch" | "minor" | "major"
export const Event = {
Updated: BusEvent.define(
"installation.updated",
z.object({
version: z.string(),
Schema.Struct({
version: Schema.String,
}),
),
UpdateAvailable: BusEvent.define(
"installation.update-available",
z.object({
version: z.string(),
Schema.Struct({
version: Schema.String,
}),
),
}

View file

@ -8,6 +8,7 @@ import { Log } from "../util"
import { Process } from "../util"
import { LANGUAGE_EXTENSIONS } from "./language"
import z from "zod"
import { Schema } from "effect"
import type * as LSPServer from "./server"
import { NamedError } from "@opencode-ai/shared/util/error"
import { withTimeout } from "../util/timeout"
@ -41,9 +42,9 @@ export const InitializeError = NamedError.create(
export const Event = {
Diagnostics: BusEvent.define(
"lsp.client.diagnostics",
z.object({
serverID: z.string(),
path: z.string(),
Schema.Struct({
serverID: Schema.String,
path: Schema.String,
}),
),
}

View file

@ -19,7 +19,7 @@ import { zod, ZodOverride } from "@/util/effect-zod"
const log = Log.create({ service: "lsp" })
export const Event = {
Updated: BusEvent.define("lsp.updated", z.object({})),
Updated: BusEvent.define("lsp.updated", Schema.Struct({})),
}
const Position = Schema.Struct({

View file

@ -25,7 +25,7 @@ import { BusEvent } from "../bus/bus-event"
import { Bus } from "@/bus"
import { TuiEvent } from "@/cli/cmd/tui/event"
import open from "open"
import { Effect, Exit, Layer, Option, Context, Stream } from "effect"
import { Effect, Exit, Layer, Option, Context, Schema, Stream } from "effect"
import { EffectBridge } from "@/effect"
import { InstanceState } from "@/effect"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
@ -47,16 +47,16 @@ export type Resource = z.infer<typeof Resource>
export const ToolsChanged = BusEvent.define(
"mcp.tools.changed",
z.object({
server: z.string(),
Schema.Struct({
server: Schema.String,
}),
)
export const BrowserOpenFailed = BusEvent.define(
"mcp.browser.open.failed",
z.object({
mcpName: z.string(),
url: z.string(),
Schema.Struct({
mcpName: Schema.String,
url: Schema.String,
}),
)

View file

@ -73,16 +73,14 @@ export class Approval extends Schema.Class<Approval>("PermissionApproval")({
}
export const Event = {
Asked: BusEvent.define("permission.asked", Request.zod),
Asked: BusEvent.define("permission.asked", Request),
Replied: BusEvent.define(
"permission.replied",
zod(
Schema.Struct({
sessionID: SessionID,
requestID: PermissionID,
reply: Reply,
}),
),
Schema.Struct({
sessionID: SessionID,
requestID: PermissionID,
reply: Reply,
}),
),
}

View file

@ -53,7 +53,7 @@ export const Info = Schema.Struct({
export type Info = Types.DeepMutable<Schema.Schema.Type<typeof Info>>
export const Event = {
Updated: BusEvent.define("project.updated", Info.zod),
Updated: BusEvent.define("project.updated", Info),
}
type Row = typeof ProjectTable.$inferSelect

View file

@ -1,4 +1,4 @@
import { Effect, Layer, Context, Stream, Scope } from "effect"
import { Effect, Layer, Context, Schema, Stream, Scope } from "effect"
import { formatPatch, structuredPatch } from "diff"
import path from "path"
import { Bus } from "@/bus"
@ -107,8 +107,8 @@ export type Mode = z.infer<typeof Mode>
export const Event = {
BranchUpdated: BusEvent.define(
"vcs.branch.updated",
z.object({
branch: z.string().optional(),
Schema.Struct({
branch: Schema.optional(Schema.String),
}),
),
}

View file

@ -3,13 +3,14 @@ import { Bus } from "@/bus"
import { InstanceState } from "@/effect"
import { Instance } from "@/project/instance"
import type { Proc } from "#pty"
import z from "zod"
import { Log } from "../util"
import { lazy } from "@opencode-ai/shared/util/lazy"
import { Shell } from "@/shell/shell"
import { Plugin } from "@/plugin"
import { PtyID } from "./schema"
import { Effect, Layer, Context } from "effect"
import { Effect, Layer, Context, Schema, Types } from "effect"
import { zod } from "@/util/effect-zod"
import { withStatics } from "@/util/schema"
import { EffectBridge } from "@/effect"
const log = Log.create({ service: "pty" })
@ -53,47 +54,47 @@ const meta = (cursor: number) => {
const pty = lazy(() => import("#pty"))
export const Info = z
.object({
id: PtyID.zod,
title: z.string(),
command: z.string(),
args: z.array(z.string()),
cwd: z.string(),
status: z.enum(["running", "exited"]),
pid: z.number(),
})
.meta({ ref: "Pty" })
export type Info = z.infer<typeof Info>
export const CreateInput = z.object({
command: z.string().optional(),
args: z.array(z.string()).optional(),
cwd: z.string().optional(),
title: z.string().optional(),
env: z.record(z.string(), z.string()).optional(),
export const Info = Schema.Struct({
id: PtyID,
title: Schema.String,
command: Schema.String,
args: Schema.Array(Schema.String),
cwd: Schema.String,
status: Schema.Literals(["running", "exited"]),
pid: Schema.Number,
})
.annotate({ identifier: "Pty" })
.pipe(withStatics((s) => ({ zod: zod(s) })))
export type CreateInput = z.infer<typeof CreateInput>
export type Info = Types.DeepMutable<Schema.Schema.Type<typeof Info>>
export const UpdateInput = z.object({
title: z.string().optional(),
size: z
.object({
rows: z.number(),
cols: z.number(),
})
.optional(),
})
export const CreateInput = Schema.Struct({
command: Schema.optional(Schema.String),
args: Schema.optional(Schema.Array(Schema.String)),
cwd: Schema.optional(Schema.String),
title: Schema.optional(Schema.String),
env: Schema.optional(Schema.Record(Schema.String, Schema.String)),
}).pipe(withStatics((s) => ({ zod: zod(s) })))
export type UpdateInput = z.infer<typeof UpdateInput>
export type CreateInput = Types.DeepMutable<Schema.Schema.Type<typeof CreateInput>>
export const UpdateInput = Schema.Struct({
title: Schema.optional(Schema.String),
size: Schema.optional(
Schema.Struct({
rows: Schema.Number,
cols: Schema.Number,
}),
),
}).pipe(withStatics((s) => ({ zod: zod(s) })))
export type UpdateInput = Types.DeepMutable<Schema.Schema.Type<typeof UpdateInput>>
export const Event = {
Created: BusEvent.define("pty.created", z.object({ info: Info })),
Updated: BusEvent.define("pty.updated", z.object({ info: Info })),
Exited: BusEvent.define("pty.exited", z.object({ id: PtyID.zod, exitCode: z.number() })),
Deleted: BusEvent.define("pty.deleted", z.object({ id: PtyID.zod })),
Created: BusEvent.define("pty.created", Schema.Struct({ info: Info })),
Updated: BusEvent.define("pty.updated", Schema.Struct({ info: Info })),
Exited: BusEvent.define("pty.exited", Schema.Struct({ id: PtyID, exitCode: Schema.Number })),
Deleted: BusEvent.define("pty.deleted", Schema.Struct({ id: PtyID })),
}
export interface Interface {

View file

@ -94,9 +94,9 @@ class Rejected extends Schema.Class<Rejected>("QuestionRejected")({
}) {}
export const Event = {
Asked: BusEvent.define("question.asked", Request.zod),
Replied: BusEvent.define("question.replied", zod(Replied)),
Rejected: BusEvent.define("question.rejected", zod(Rejected)),
Asked: BusEvent.define("question.asked", Request),
Replied: BusEvent.define("question.replied", Replied),
Rejected: BusEvent.define("question.rejected", Rejected),
}
export class RejectedError extends Schema.TaggedErrorClass<RejectedError>()("QuestionRejectedError", {}) {
@ -194,7 +194,7 @@ export const layer = Layer.effect(
yield* bus.publish(Event.Replied, {
sessionID: existing.info.sessionID,
requestID: existing.info.id,
answers: input.answers,
answers: input.answers.map((a) => [...a]),
})
yield* Deferred.succeed(existing.deferred, input.answers)
})

View file

@ -1,7 +1,7 @@
import { BusEvent } from "@/bus/bus-event"
import z from "zod"
import { Schema } from "effect"
export const Event = {
Connected: BusEvent.define("server.connected", z.object({})),
Disposed: BusEvent.define("global.disposed", z.object({})),
Connected: BusEvent.define("server.connected", Schema.Struct({})),
Disposed: BusEvent.define("global.disposed", Schema.Struct({})),
}

View file

@ -3,6 +3,7 @@ import { describeRoute, resolver, validator } from "hono-openapi"
import z from "zod"
import { listAdaptors } from "@/control-plane/adaptors"
import { Workspace } from "@/control-plane/workspace"
import { zodObject } from "@/util/effect-zod"
import { Instance } from "@/project/instance"
import { errors } from "../../error"
import { lazy } from "@/util/lazy"
@ -107,7 +108,7 @@ export const WorkspaceRoutes = lazy(() =>
description: "Workspace status",
content: {
"application/json": {
schema: resolver(z.array(Workspace.ConnectionStatus)),
schema: resolver(z.array(zodObject(Workspace.ConnectionStatus))),
},
},
},

View file

@ -1,7 +1,7 @@
import { Hono, type Context } from "hono"
import { describeRoute, resolver, validator } from "hono-openapi"
import { streamSSE } from "hono/streaming"
import { Effect } from "effect"
import { Effect, Schema } from "effect"
import z from "zod"
import { BusEvent } from "@/bus/bus-event"
import { SyncEvent } from "@/sync"
@ -18,7 +18,7 @@ import { errors } from "../error"
const log = Log.create({ service: "server" })
export const GlobalDisposedEvent = BusEvent.define("global.disposed", z.object({}))
export const GlobalDisposedEvent = BusEvent.define("global.disposed", Schema.Struct({}))
async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>) => () => void) {
return streamSSE(c, async (stream) => {

View file

@ -23,7 +23,7 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) {
description: "List of sessions",
content: {
"application/json": {
schema: resolver(Pty.Info.array()),
schema: resolver(Pty.Info.zod.array()),
},
},
},
@ -46,18 +46,18 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) {
description: "Created session",
content: {
"application/json": {
schema: resolver(Pty.Info),
schema: resolver(Pty.Info.zod),
},
},
},
...errors(400),
},
}),
validator("json", Pty.CreateInput),
validator("json", Pty.CreateInput.zod),
async (c) =>
jsonRequest("PtyRoutes.create", c, function* () {
const pty = yield* Pty.Service
return yield* pty.create(c.req.valid("json"))
return yield* pty.create(c.req.valid("json") as Pty.CreateInput)
}),
)
.get(
@ -71,7 +71,7 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) {
description: "Session info",
content: {
"application/json": {
schema: resolver(Pty.Info),
schema: resolver(Pty.Info.zod),
},
},
},
@ -105,7 +105,7 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) {
description: "Updated session",
content: {
"application/json": {
schema: resolver(Pty.Info),
schema: resolver(Pty.Info.zod),
},
},
},
@ -113,11 +113,11 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) {
},
}),
validator("param", z.object({ ptyID: PtyID.zod })),
validator("json", Pty.UpdateInput),
validator("json", Pty.UpdateInput.zod),
async (c) =>
jsonRequest("PtyRoutes.update", c, function* () {
const pty = yield* Pty.Service
return yield* pty.update(c.req.valid("param").ptyID, c.req.valid("json"))
return yield* pty.update(c.req.valid("param").ptyID, c.req.valid("json") as Pty.UpdateInput)
}),
)
.delete(

View file

@ -1,9 +1,12 @@
import { Hono, type Context } from "hono"
import { describeRoute, validator, resolver } from "hono-openapi"
import { Schema } from "effect"
import z from "zod"
import { Bus } from "@/bus"
import { Session } from "@/session"
import type { SessionID } from "@/session/schema"
import { TuiEvent } from "@/cli/cmd/tui/event"
import { zodObject } from "@/util/effect-zod"
import { AsyncQueue } from "@/util/queue"
import { errors } from "../../error"
import { lazy } from "@/util/lazy"
@ -96,9 +99,9 @@ export const TuiRoutes = lazy(() =>
...errors(400),
},
}),
validator("json", TuiEvent.PromptAppend.properties),
validator("json", zodObject(TuiEvent.PromptAppend.properties)),
async (c) => {
await Bus.publish(TuiEvent.PromptAppend, c.req.valid("json"))
await Bus.publish(TuiEvent.PromptAppend, c.req.valid("json") as { text: string })
return c.json(true)
},
)
@ -305,9 +308,9 @@ export const TuiRoutes = lazy(() =>
},
},
}),
validator("json", TuiEvent.ToastShow.properties),
validator("json", zodObject(TuiEvent.ToastShow.properties)),
async (c) => {
await Bus.publish(TuiEvent.ToastShow, c.req.valid("json"))
await Bus.publish(TuiEvent.ToastShow, c.req.valid("json") as Schema.Schema.Type<typeof TuiEvent.ToastShow.properties>)
return c.json(true)
},
)
@ -336,7 +339,7 @@ export const TuiRoutes = lazy(() =>
return z
.object({
type: z.literal(def.type),
properties: def.properties,
properties: zodObject(def.properties),
})
.meta({
ref: `Event.${def.type}`,
@ -345,8 +348,9 @@ export const TuiRoutes = lazy(() =>
),
),
async (c) => {
const evt = c.req.valid("json")
await Bus.publish(Object.values(TuiEvent).find((def) => def.type === evt.type)!, evt.properties)
const evt = c.req.valid("json") as { type: string; properties: Record<string, unknown> }
// eslint-disable-next-line @typescript-eslint/no-explicit-any
await Bus.publish(Object.values(TuiEvent).find((def) => def.type === evt.type)! as any, evt.properties as any)
return c.json(true)
},
)
@ -368,9 +372,9 @@ export const TuiRoutes = lazy(() =>
...errors(400, 404),
},
}),
validator("json", TuiEvent.SessionSelect.properties),
validator("json", zodObject(TuiEvent.SessionSelect.properties)),
async (c) => {
const { sessionID } = c.req.valid("json")
const { sessionID } = c.req.valid("json") as { sessionID: SessionID }
await runRequest(
"TuiRoutes.sessionSelect",
c,

View file

@ -13,7 +13,7 @@ import { Plugin } from "@/plugin"
import { Config } from "@/config"
import { NotFoundError } from "@/storage"
import { ModelID, ProviderID } from "@/provider/schema"
import { Effect, Layer, Context } from "effect"
import { Effect, Layer, Context, Schema } from "effect"
import { InstanceState } from "@/effect"
import { isOverflow as overflow, usable } from "./overflow"
import { makeRuntime } from "@/effect/run-service"
@ -24,8 +24,8 @@ const log = Log.create({ service: "session.compaction" })
export const Event = {
Compacted: BusEvent.define(
"session.compacted",
z.object({
sessionID: SessionID.zod,
Schema.Struct({
sessionID: SessionID,
}),
),
}

View file

@ -617,12 +617,12 @@ export const Event = {
}),
PartDelta: BusEvent.define(
"message.part.delta",
z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod,
partID: PartID.zod,
field: z.string(),
delta: z.string(),
Schema.Struct({
sessionID: SessionID,
messageID: MessageID,
partID: PartID,
field: Schema.String,
delta: Schema.String,
}),
),
PartRemoved: SyncEvent.define({

View file

@ -273,17 +273,18 @@ export const Event = {
}),
Diff: BusEvent.define(
"session.diff",
z.object({
sessionID: SessionID.zod,
diff: Snapshot.FileDiff.zod.array(),
Schema.Struct({
sessionID: SessionID,
diff: Schema.Array(Snapshot.FileDiff),
}),
),
Error: BusEvent.define(
"session.error",
z.object({
sessionID: SessionID.zod.optional(),
// z.lazy defers access to break circular dep: session → message-v2 → provider → plugin → session
error: z.lazy(() => (MessageV2.Assistant.zod as unknown as z.ZodObject<any>).shape.error),
Schema.Struct({
sessionID: Schema.optional(SessionID),
// Reuses MessageV2.Assistant.fields.error (already Schema.optional) so
// the derived zod keeps the same discriminated-union shape on the bus.
error: MessageV2.Assistant.fields.error,
}),
),
}

View file

@ -28,16 +28,16 @@ export type Info = Schema.Schema.Type<typeof Info>
export const Event = {
Status: BusEvent.define(
"session.status",
z.object({
sessionID: SessionID.zod,
status: Info.zod,
Schema.Struct({
sessionID: SessionID,
status: Info,
}),
),
// deprecated
Idle: BusEvent.define(
"session.idle",
z.object({
sessionID: SessionID.zod,
Schema.Struct({
sessionID: SessionID,
}),
),
}

View file

@ -22,9 +22,9 @@ export type Info = Schema.Schema.Type<typeof Info>
export const Event = {
Updated: BusEvent.define(
"todo.updated",
z.object({
sessionID: SessionID.zod,
todos: z.array(Info.zod),
Schema.Struct({
sessionID: SessionID,
todos: Schema.Array(Info),
}),
),
}

View file

@ -8,51 +8,48 @@ import { EventSequenceTable, EventTable } from "./event.sql"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import { EventID } from "./schema"
import { Flag } from "@/flag/flag"
import { Schema as EffectSchema, Types } from "effect"
import { Schema as EffectSchema } from "effect"
import { zodObject } from "@/util/effect-zod"
import { isRecord } from "@/util/record"
import type { DeepMutable } from "@/util/schema"
// Keep `Event["data"]` mutable because projectors mutate the persisted shape
// when writing to the database. Bus payloads (`Properties`) stay readonly —
// subscribers only read.
export type Definition<
Type extends string = string,
Schema extends EffectSchema.Top = EffectSchema.Top,
BusSchema extends EffectSchema.Top = Schema,
> = {
type: string
type: Type
version: number
aggregate: string
effectSchema: Schema
effectProperties: BusSchema
schema: z.ZodObject
// This is temporary and only exists for compatibility with bus
// event definitions
properties: z.ZodObject
schema: Schema
// Bus event payload schema. Defaults to `schema` unless `busSchema` was
// passed at definition time (see `session.updated`, whose projector
// expands the persisted data to a `{ sessionID, info }` bus payload).
properties: BusSchema
}
export type Event<Def extends Definition = Definition> = {
id: string
seq: number
aggregateID: string
data: Types.DeepMutable<EffectSchema.Schema.Type<Def["effectSchema"]>>
data: DeepMutable<EffectSchema.Schema.Type<Def["schema"]>>
}
export type Properties<Def extends Definition = Definition> = Types.DeepMutable<
EffectSchema.Schema.Type<Def["effectProperties"]>
>
export type Properties<Def extends Definition = Definition> = EffectSchema.Schema.Type<Def["properties"]>
export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
type ConvertEvent = (type: string, data: Event["data"]) => unknown | Promise<unknown>
export const registry = new Map<string, Definition>()
let projectors: Map<Definition, ProjectorFunc> | undefined
const versions = new Map<string, number>()
let frozen = false
let convertEvent: (type: string, event: Event["data"]) => Promise<unknown> | unknown
function asRecord(input: unknown) {
if (isRecord(input)) return input
throw new Error(`SyncEvent.convertEvent must return an object, got: ${JSON.stringify(input)}`)
}
let convertEvent: ConvertEvent
export function reset() {
frozen = false
@ -60,7 +57,7 @@ export function reset() {
convertEvent = (_, data) => data
}
export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: typeof convertEvent }) {
export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: ConvertEvent }) {
projectors = new Map(input.projectors)
// Install all the latest event defs to the bus. We only ever emit
@ -76,7 +73,7 @@ export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; co
// Freeze the system so it clearly errors if events are defined
// after `init` which would cause bugs
frozen = true
convertEvent = input.convertEvent || ((_, data) => data)
convertEvent = input.convertEvent ?? ((_, data) => data)
}
export function versionedType<A extends string>(type: A): A
@ -96,21 +93,17 @@ export function define<
aggregate: Agg
schema: Schema
busSchema?: BusSchema
}): Definition<Schema, BusSchema> {
}): Definition<Type, Schema, BusSchema> {
if (frozen) {
throw new Error("Error defining sync event: sync system has been frozen")
}
const effectProperties = (input.busSchema ?? input.schema) as BusSchema
const def = {
type: input.type,
version: input.version,
aggregate: input.aggregate,
effectSchema: input.schema,
effectProperties,
schema: zodObject(input.schema),
properties: zodObject(effectProperties),
schema: input.schema,
properties: (input.busSchema ?? input.schema) as BusSchema,
}
versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0))
@ -167,12 +160,11 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
Database.effect(() => {
if (options?.publish) {
const result = convertEvent(def.type, event.data)
const publish = (data: unknown) => ProjectBus.publish(def, data as Properties<Def>)
if (result instanceof Promise) {
void result.then((data) => {
void ProjectBus.publish({ type: def.type, properties: def.properties }, asRecord(data))
})
void result.then(publish)
} else {
void ProjectBus.publish({ type: def.type, properties: def.properties }, asRecord(result))
void publish(result)
}
GlobalBus.emit("event", {
@ -292,7 +284,7 @@ export function payloads() {
id: z.string(),
seq: z.number(),
aggregateID: z.literal(def.aggregate),
data: def.schema,
data: zodObject(def.schema),
})
.meta({
ref: `SyncEvent.${def.type}`,

View file

@ -59,8 +59,17 @@ function walk(ast: SchemaAST.AST): z.ZodTypeAny {
function walkUncached(ast: SchemaAST.AST): z.ZodTypeAny {
const override = (ast.annotations as any)?.[ZodOverride] as z.ZodTypeAny | undefined
if (override) return override
// `description` annotations layer on top of an override so callers can
// reuse a shared override schema (e.g. `SessionID`) and still add a
// per-field description on the outer wrapper.
const base = override ?? bodyWithChecks(ast)
const desc = SchemaAST.resolveDescription(ast)
const ref = SchemaAST.resolveIdentifier(ast)
const described = desc ? base.describe(desc) : base
return ref ? described.meta({ ref }) : described
}
function bodyWithChecks(ast: SchemaAST.AST): z.ZodTypeAny {
// Schema.Class wraps its fields in a Declaration AST plus an encoding that
// constructs the class instance. For the Zod derivation we want the plain
// field shape (the decoded/consumer view), not the class instance — so
@ -74,11 +83,7 @@ function walkUncached(ast: SchemaAST.AST): z.ZodTypeAny {
const hasEncoding = ast.encoding?.length && ast._tag !== "Declaration"
const hasTransform = hasEncoding && !(SchemaAST.isOptional(ast) && extractDefault(ast) !== undefined)
const base = hasTransform ? encoded(ast) : body(ast)
const checked = ast.checks?.length ? applyChecks(base, ast.checks, ast) : base
const desc = SchemaAST.resolveDescription(ast)
const ref = SchemaAST.resolveIdentifier(ast)
const described = desc ? checked.describe(desc) : checked
return ref ? described.meta({ ref }) : described
return ast.checks?.length ? applyChecks(base, ast.checks, ast) : base
}
// Walk the encoded side and apply each link's decode to produce the decoded

View file

@ -10,6 +10,34 @@ export const PositiveInt = Schema.Int.check(Schema.isGreaterThan(0))
*/
export const NonNegativeInt = Schema.Int.check(Schema.isGreaterThanOrEqualTo(0))
/**
* Strip `readonly` from a nested type. Stand-in for `effect`'s `Types.DeepMutable`
* until `effect:core/x228my` ("Types.DeepMutable widens unknown to `{}`") lands.
*
* The upstream version falls through `unknown` into `{ -readonly [K in keyof T]: ... }`
* where `keyof unknown = never`, so `unknown` collapses to `{}`. This local
* version gates the object branch on `extends object` (which `unknown` does
* not) so `unknown` passes through untouched.
*
* Primitive bailout matches upstream without it, branded strings like
* `string & Brand<"SessionID">` fall into the object branch and get their
* prototype methods walked.
*
* Tuple branch preserves readonly tuples (e.g. `ConfigPlugin.Spec`'s
* `readonly [string, Options]`); the general array branch would otherwise
* widen them to unbounded arrays.
*/
// eslint-disable-next-line @typescript-eslint/ban-types
export type DeepMutable<T> = T extends string | number | boolean | bigint | symbol | Function
? T
: T extends readonly [unknown, ...unknown[]]
? { -readonly [K in keyof T]: DeepMutable<T[K]> }
: T extends readonly (infer U)[]
? DeepMutable<U>[]
: T extends object
? { -readonly [K in keyof T]: DeepMutable<T[K]> }
: T
/**
* Attach static methods to a schema object. Designed to be used with `.pipe()`:
*
@ -26,13 +54,16 @@ export const withStatics =
(schema: S): S & M =>
Object.assign(schema, methods(schema))
declare const NewtypeBrand: unique symbol
type NewtypeBrand<Tag extends string> = { readonly [NewtypeBrand]: Tag }
/**
* Nominal wrapper for scalar types. The class itself is a valid schema
* pass it directly to `Schema.decode`, `Schema.decodeEffect`, etc.
*
* Overrides `~type.make` on the derived `Schema.Opaque` so `Schema.Schema.Type`
* of a field using this newtype resolves to `Self` rather than the underlying
* branded phantom. Without that override, passing a class instance to code
* typed against `Schema.Schema.Type<FieldSchema>` would require a cast even
* though the values are structurally equivalent at runtime.
*
* @example
* class QuestionID extends Newtype<QuestionID>()("QuestionID", Schema.String) {
* static make(id: string): QuestionID {
@ -44,10 +75,8 @@ type NewtypeBrand<Tag extends string> = { readonly [NewtypeBrand]: Tag }
*/
export function Newtype<Self>() {
return <const Tag extends string, S extends Schema.Top>(tag: Tag, schema: S) => {
type Branded = NewtypeBrand<Tag>
abstract class Base {
declare readonly [NewtypeBrand]: Tag
declare readonly _newtype: Tag
static make(value: Schema.Schema.Type<S>): Self {
return value as unknown as Self
@ -56,8 +85,10 @@ export function Newtype<Self>() {
Object.setPrototypeOf(Base, schema)
return Base as unknown as (abstract new (_: never) => Branded) & {
return Base as unknown as (abstract new (_: never) => { readonly _newtype: Tag }) & {
readonly make: (value: Schema.Schema.Type<S>) => Self
} & Omit<Schema.Opaque<Self, S, {}>, "make">
} & Omit<Schema.Opaque<Self, S, {}>, "make" | "~type.make"> & {
readonly "~type.make": Self
}
}
}

View file

@ -13,7 +13,7 @@ import { errorMessage } from "../util/error"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
import { Git } from "@/git"
import { Effect, Layer, Path, Scope, Context, Stream } from "effect"
import { Effect, Layer, Path, Schema, Scope, Context, Stream } from "effect"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import { NodePath } from "@effect/platform-node"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
@ -26,15 +26,15 @@ const log = Log.create({ service: "worktree" })
export const Event = {
Ready: BusEvent.define(
"worktree.ready",
z.object({
name: z.string(),
branch: z.string(),
Schema.Struct({
name: Schema.String,
branch: Schema.String,
}),
),
Failed: BusEvent.define(
"worktree.failed",
z.object({
message: z.string(),
Schema.Struct({
message: Schema.String,
}),
),
}

View file

@ -1,6 +1,5 @@
import { describe, expect } from "bun:test"
import { Deferred, Effect, Layer, Stream } from "effect"
import z from "zod"
import { Deferred, Effect, Layer, Schema, Stream } from "effect"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { Instance } from "../../src/project/instance"
@ -9,8 +8,8 @@ import { provideInstance, provideTmpdirInstance, tmpdirScoped } from "../fixture
import { testEffect } from "../lib/effect"
const TestEvent = {
Ping: BusEvent.define("test.effect.ping", z.object({ value: z.number() })),
Pong: BusEvent.define("test.effect.pong", z.object({ message: z.string() })),
Ping: BusEvent.define("test.effect.ping", Schema.Struct({ value: Schema.Number })),
Pong: BusEvent.define("test.effect.pong", Schema.Struct({ message: Schema.String })),
}
const node = CrossSpawnSpawner.defaultLayer

View file

@ -1,11 +1,11 @@
import { afterEach, describe, expect, test } from "bun:test"
import z from "zod"
import { Schema } from "effect"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { Instance } from "../../src/project/instance"
import { tmpdir } from "../fixture/fixture"
const TestEvent = BusEvent.define("test.integration", z.object({ value: z.number() }))
const TestEvent = BusEvent.define("test.integration", Schema.Struct({ value: Schema.Number }))
function withInstance(directory: string, fn: () => Promise<void>) {
return Instance.provide({ directory, fn })
@ -42,7 +42,7 @@ describe("Bus integration: acquireRelease subscriber pattern", () => {
await using tmp = await tmpdir()
const received: Array<{ type: string; value?: number }> = []
const OtherEvent = BusEvent.define("test.other", z.object({ value: z.number() }))
const OtherEvent = BusEvent.define("test.other", Schema.Struct({ value: Schema.Number }))
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {

View file

@ -1,13 +1,13 @@
import { afterEach, describe, expect, test } from "bun:test"
import z from "zod"
import { Schema } from "effect"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { Instance } from "../../src/project/instance"
import { tmpdir } from "../fixture/fixture"
const TestEvent = {
Ping: BusEvent.define("test.ping", z.object({ value: z.number() })),
Pong: BusEvent.define("test.pong", z.object({ message: z.string() })),
Ping: BusEvent.define("test.ping", Schema.Struct({ value: Schema.Number })),
Pong: BusEvent.define("test.pong", Schema.Struct({ message: Schema.String })),
}
function withInstance(directory: string, fn: () => Promise<void>) {

View file

@ -111,9 +111,12 @@ describe("step-finish token propagation via Bus event", () => {
mode: "",
} as unknown as MessageV2.Info)
// Bus subscribers receive readonly Schema.Type payloads; `MessageV2.Part`
// is the mutable domain type. Cast bridges the two — safe because the
// test only reads the value afterwards.
let received: MessageV2.Part | undefined
const unsub = Bus.subscribe(MessageV2.Event.PartUpdated, (event) => {
received = event.properties.part
received = event.properties.part as MessageV2.Part
})
const tokens = {