From 3253da034fc1b19d86769d095daecae1c36e701a Mon Sep 17 00:00:00 2001 From: Dax Raad Date: Sun, 26 Apr 2026 13:04:50 -0400 Subject: [PATCH] fix(session): include session id on v2 events --- packages/opencode/src/v2/event.ts | 41 +++++++ packages/opencode/src/v2/session-entry.ts | 5 +- packages/opencode/src/v2/session-event.ts | 124 ++++++++++++---------- 3 files changed, 109 insertions(+), 61 deletions(-) create mode 100644 packages/opencode/src/v2/event.ts diff --git a/packages/opencode/src/v2/event.ts b/packages/opencode/src/v2/event.ts new file mode 100644 index 0000000000..f7e9860c5e --- /dev/null +++ b/packages/opencode/src/v2/event.ts @@ -0,0 +1,41 @@ +import { Identifier } from "@/id/id" +import { SyncEvent } from "@/sync" +import { withStatics } from "@/util/schema" +import * as Schema from "effect/Schema" + +export const ID = Schema.String.pipe( + Schema.brand("Event.ID"), + withStatics((s) => ({ + create: () => s.make(Identifier.create("evt", "ascending")), + })), +) +export type ID = Schema.Schema.Type + +export function define(input: { + type: Type + schema: Fields + aggregate: string + version?: number +}) { + const Event = Schema.Struct({ + id: ID, + metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional), + timestamp: Schema.DateTimeUtc, + type: Schema.Literal(input.type), + version: Schema.Number.pipe(Schema.optional), + ...input.schema, + }).annotate({ + identifier: input.type, + }) + + const Sync = SyncEvent.define({ + type: input.type, + version: input.version ?? 1, + aggregate: input.aggregate, + schema: Event, + }) + + return Object.assign(Event, { Sync }) +} + +export * as Event from "./event" diff --git a/packages/opencode/src/v2/session-entry.ts b/packages/opencode/src/v2/session-entry.ts index 398ec14cc7..f36a0815ff 100644 --- a/packages/opencode/src/v2/session-entry.ts +++ b/packages/opencode/src/v2/session-entry.ts @@ -1,12 +1,13 @@ import { Schema } from "effect" import { Prompt } from "./session-prompt" import { SessionEvent } from "./session-event" +import { Event } from "./event" -export const ID = SessionEvent.ID +export const ID = Event.ID export type ID = Schema.Schema.Type const Base = { - id: SessionEvent.ID, + id: ID, metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional), time: Schema.Struct({ created: Schema.DateTimeUtc, diff --git a/packages/opencode/src/v2/session-event.ts b/packages/opencode/src/v2/session-event.ts index 5623f1c485..c573a36400 100644 --- a/packages/opencode/src/v2/session-event.ts +++ b/packages/opencode/src/v2/session-event.ts @@ -1,48 +1,12 @@ -import { Identifier } from "@/id/id" -import { FileAttachment, Prompt } from "./session-prompt" import { SessionID } from "@/session/schema" -import { SyncEvent } from "@/sync" -import { withStatics } from "@/util/schema" +import { Event as BaseEvent } from "./event" +import { FileAttachment, Prompt } from "./session-prompt" import { Schema } from "effect" export { FileAttachment } -export const ID = Schema.String.pipe( - Schema.brand("Session.Event.ID"), - withStatics((s) => ({ - create: () => s.make(Identifier.create("evt", "ascending")), - })), -) +export const ID = BaseEvent.ID export type ID = Schema.Schema.Type -function defineEvent(input: { - type: Type - schema: Fields - version?: number -}) { - const Event = Schema.Struct({ - id: ID, - sessionID: SessionID, - metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional), - timestamp: Schema.DateTimeUtc, - type: Schema.Literal(input.type), - version: Schema.Number.pipe(Schema.optional), - ...input.schema, - }).annotate({ - identifier: input.type, - }) - - const Sync = SyncEvent.define({ - type: input.type, - version: input.version ?? 1, - aggregate: "sessionID", - schema: Event, - }) - - return Object.assign(Event, { - Sync, - }) -} - export const Source = Schema.Struct({ start: Schema.Number, end: Schema.Number, @@ -52,26 +16,36 @@ export const Source = Schema.Struct({ }) export type Source = Schema.Schema.Type -export const Prompted = defineEvent({ +const Base = { + sessionID: SessionID, +} + +export const Prompted = BaseEvent.define({ type: "session.prompted", + aggregate: "sessionID", schema: { + ...Base, prompt: Prompt, }, }) export type Prompted = Schema.Schema.Type -export const Synthetic = defineEvent({ +export const Synthetic = BaseEvent.define({ type: "session.synthetic", + aggregate: "sessionID", schema: { + ...Base, text: Schema.String, }, }) export type Synthetic = Schema.Schema.Type export namespace Step { - export const Started = defineEvent({ + export const Started = BaseEvent.define({ type: "session.step.started", + aggregate: "sessionID", schema: { + ...Base, model: Schema.Struct({ id: Schema.String, providerID: Schema.String, @@ -81,9 +55,11 @@ export namespace Step { }) export type Started = Schema.Schema.Type - export const Ended = defineEvent({ + export const Ended = BaseEvent.define({ type: "session.step.ended", + aggregate: "sessionID", schema: { + ...Base, reason: Schema.String, cost: Schema.Number, tokens: Schema.Struct({ @@ -101,23 +77,30 @@ export namespace Step { } export namespace Text { - export const Started = defineEvent({ + export const Started = BaseEvent.define({ type: "session.text.started", - schema: {}, + aggregate: "sessionID", + schema: { + ...Base, + }, }) export type Started = Schema.Schema.Type - export const Delta = defineEvent({ + export const Delta = BaseEvent.define({ type: "session.text.delta", + aggregate: "sessionID", schema: { + ...Base, delta: Schema.String, }, }) export type Delta = Schema.Schema.Type - export const Ended = defineEvent({ + export const Ended = BaseEvent.define({ type: "session.text.ended", + aggregate: "sessionID", schema: { + ...Base, text: Schema.String, }, }) @@ -125,23 +108,30 @@ export namespace Text { } export namespace Reasoning { - export const Started = defineEvent({ + export const Started = BaseEvent.define({ type: "session.reasoning.started", - schema: {}, + aggregate: "sessionID", + schema: { + ...Base, + }, }) export type Started = Schema.Schema.Type - export const Delta = defineEvent({ + export const Delta = BaseEvent.define({ type: "session.reasoning.delta", + aggregate: "sessionID", schema: { + ...Base, delta: Schema.String, }, }) export type Delta = Schema.Schema.Type - export const Ended = defineEvent({ + export const Ended = BaseEvent.define({ type: "session.reasoning.ended", + aggregate: "sessionID", schema: { + ...Base, text: Schema.String, }, }) @@ -150,27 +140,33 @@ export namespace Reasoning { export namespace Tool { export namespace Input { - export const Started = defineEvent({ + export const Started = BaseEvent.define({ type: "session.tool.input.started", + aggregate: "sessionID", schema: { + ...Base, callID: Schema.String, name: Schema.String, }, }) export type Started = Schema.Schema.Type - export const Delta = defineEvent({ + export const Delta = BaseEvent.define({ type: "session.tool.input.delta", + aggregate: "sessionID", schema: { + ...Base, callID: Schema.String, delta: Schema.String, }, }) export type Delta = Schema.Schema.Type - export const Ended = defineEvent({ + export const Ended = BaseEvent.define({ type: "session.tool.input.ended", + aggregate: "sessionID", schema: { + ...Base, callID: Schema.String, text: Schema.String, }, @@ -178,9 +174,11 @@ export namespace Tool { export type Ended = Schema.Schema.Type } - export const Called = defineEvent({ + export const Called = BaseEvent.define({ type: "session.tool.called", + aggregate: "sessionID", schema: { + ...Base, callID: Schema.String, tool: Schema.String, input: Schema.Record(Schema.String, Schema.Unknown), @@ -192,9 +190,11 @@ export namespace Tool { }) export type Called = Schema.Schema.Type - export const Success = defineEvent({ + export const Success = BaseEvent.define({ type: "session.tool.success", + aggregate: "sessionID", schema: { + ...Base, callID: Schema.String, title: Schema.String, output: Schema.String.pipe(Schema.optional), @@ -207,9 +207,11 @@ export namespace Tool { }) export type Success = Schema.Schema.Type - export const Error = defineEvent({ + export const Error = BaseEvent.define({ type: "session.tool.error", + aggregate: "sessionID", schema: { + ...Base, callID: Schema.String, error: Schema.String, provider: Schema.Struct({ @@ -233,18 +235,22 @@ export const RetryError = Schema.Struct({ }) export type RetryError = Schema.Schema.Type -export const Retried = defineEvent({ +export const Retried = BaseEvent.define({ type: "session.retried", + aggregate: "sessionID", schema: { + ...Base, attempt: Schema.Number, error: RetryError, }, }) export type Retried = Schema.Schema.Type -export const Compacted = defineEvent({ +export const Compacted = BaseEvent.define({ type: "session.compacted", + aggregate: "sessionID", schema: { + ...Base, auto: Schema.Boolean, overflow: Schema.Boolean.pipe(Schema.optional), },