core: restructure v2 event system with session.next namespace and data encapsulation

- Rename all event types from session.* to session.next.* for clearer namespacing

- Wrap event payload in data field for better schema organization

- Add timestamp to all event schemas for consistent event tracking

- Fix effect-zod handling for Declaration ASTs with type parameters

- Remove obsolete session-entry-stepper tests

This provides a cleaner event structure that separates metadata from payload data, making the event system more maintainable and easier to extend.
This commit is contained in:
Dax Raad 2026-04-26 15:03:04 -04:00
parent b80b1f4e2f
commit 51b0b6fda9
7 changed files with 135 additions and 748 deletions

View file

@ -90,7 +90,7 @@ function bodyWithChecks(ast: SchemaAST.AST): z.ZodTypeAny {
// Schema.withDecodingDefault also attaches encoding, but we want `.default(v)`
// on the inner Zod rather than a transform wrapper — so optional ASTs whose
// encoding resolves a default from Option.none() route through body()/opt().
const hasEncoding = ast.encoding?.length && ast._tag !== "Declaration"
const hasEncoding = ast.encoding?.length && (ast._tag !== "Declaration" || ast.typeParameters.length === 0)
const hasTransform = hasEncoding && !(SchemaAST.isOptional(ast) && extractDefault(ast) !== undefined)
const base = hasTransform ? encoded(ast) : body(ast)
return ast.checks?.length ? applyChecks(base, ast.checks, ast) : base

View file

@ -17,13 +17,10 @@ export function define<const Type extends string, Fields extends Schema.Struct.F
aggregate: string
version?: number
}) {
const Event = Schema.Struct({
id: ID,
const Payload = Schema.Struct({
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
timestamp: Schema.DateTimeUtc,
type: Schema.Literal(input.type),
version: Schema.Number.pipe(Schema.optional),
...input.schema,
data: Schema.Struct(input.schema),
}).annotate({
identifier: input.type,
})
@ -32,10 +29,14 @@ export function define<const Type extends string, Fields extends Schema.Struct.F
type: input.type,
version: input.version ?? 1,
aggregate: input.aggregate,
schema: Event,
schema: Payload.fields.data,
})
return Object.assign(Event, { Sync })
return Object.assign(Payload, {
Sync,
version: input.version,
aggregate: input.aggregate,
})
}
export * as Event from "./event"

View file

@ -63,8 +63,8 @@ export function stepWith<Result>(adapter: Adapter<Result>, event: SessionEvent.E
const latestReasoning = (assistant: DraftAssistant | undefined) =>
assistant?.content.findLast((item): item is DraftReasoning => item.type === "reasoning")
SessionEvent.Event.match(event, {
"session.prompted": (event) => {
SessionEvent.All.match(event, {
"session.next.prompted": (event) => {
const entry = SessionEntry.User.fromEvent(event)
if (currentAssistant) {
adapter.appendPending(entry)
@ -72,31 +72,31 @@ export function stepWith<Result>(adapter: Adapter<Result>, event: SessionEvent.E
}
adapter.appendEntry(entry)
},
"session.synthetic": (event) => {
"session.next.synthetic": (event) => {
adapter.appendEntry(SessionEntry.Synthetic.fromEvent(event))
},
"session.step.started": (event) => {
"session.next.step.started": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.time.completed = event.timestamp
draft.time.completed = event.data.timestamp
}),
)
}
adapter.appendEntry(SessionEntry.Assistant.fromEvent(event))
},
"session.step.ended": (event) => {
"session.next.step.ended": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.time.completed = event.timestamp
draft.cost = event.cost
draft.tokens = event.tokens
draft.time.completed = event.data.timestamp
draft.cost = event.data.cost
draft.tokens = event.data.tokens
}),
)
}
},
"session.text.started": () => {
"session.next.text.started": () => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
@ -108,27 +108,27 @@ export function stepWith<Result>(adapter: Adapter<Result>, event: SessionEvent.E
)
}
},
"session.text.delta": (event) => {
"session.next.text.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestText(draft)
if (match) match.text += event.delta
if (match) match.text += event.data.delta
}),
)
}
},
"session.text.ended": () => {},
"session.tool.input.started": (event) => {
"session.next.text.ended": () => {},
"session.next.tool.input.started": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.content.push({
type: "tool",
callID: event.callID,
name: event.name,
callID: event.data.callID,
name: event.data.name,
time: {
created: event.timestamp,
created: event.data.timestamp,
},
state: {
status: "pending",
@ -139,62 +139,62 @@ export function stepWith<Result>(adapter: Adapter<Result>, event: SessionEvent.E
)
}
},
"session.tool.input.delta": (event) => {
"session.next.tool.input.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.callID)
const match = latestTool(draft, event.data.callID)
// oxlint-disable-next-line no-base-to-string -- event.delta is a Schema.String (runtime string)
if (match && match.state.status === "pending") match.state.input += event.delta
if (match && match.state.status === "pending") match.state.input += event.data.delta
}),
)
}
},
"session.tool.input.ended": () => {},
"session.tool.called": (event) => {
"session.next.tool.input.ended": () => {},
"session.next.tool.called": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.callID)
const match = latestTool(draft, event.data.callID)
if (match) {
match.time.ran = event.timestamp
match.time.ran = event.data.timestamp
match.state = {
status: "running",
input: event.input,
input: event.data.input,
}
}
}),
)
}
},
"session.tool.success": (event) => {
"session.next.tool.success": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.callID)
const match = latestTool(draft, event.data.callID)
if (match && match.state.status === "running") {
match.state = {
status: "completed",
input: match.state.input,
output: event.output ?? "",
title: event.title,
output: event.data.output ?? "",
title: event.data.title,
metadata: event.metadata ?? {},
attachments: [...(event.attachments ?? [])],
attachments: [...(event.data.attachments ?? [])],
}
}
}),
)
}
},
"session.tool.error": (event) => {
"session.next.tool.error": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.callID)
const match = latestTool(draft, event.data.callID)
if (match && match.state.status === "running") {
match.state = {
status: "error",
error: event.error,
error: event.data.error,
input: match.state.input,
metadata: event.metadata ?? {},
}
@ -203,7 +203,7 @@ export function stepWith<Result>(adapter: Adapter<Result>, event: SessionEvent.E
)
}
},
"session.reasoning.started": () => {
"session.next.reasoning.started": () => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
@ -215,27 +215,27 @@ export function stepWith<Result>(adapter: Adapter<Result>, event: SessionEvent.E
)
}
},
"session.reasoning.delta": (event) => {
"session.next.reasoning.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestReasoning(draft)
if (match) match.text += event.delta
if (match) match.text += event.data.delta
}),
)
}
},
"session.reasoning.ended": (event) => {
"session.next.reasoning.ended": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestReasoning(draft)
if (match) match.text = event.text
if (match) match.text = event.data.text
}),
)
}
},
"session.retried": (event) => {
"session.next.retried": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
@ -244,7 +244,7 @@ export function stepWith<Result>(adapter: Adapter<Result>, event: SessionEvent.E
)
}
},
"session.compacted": (event) => {
"session.next.compacted": (event) => {
adapter.appendEntry(SessionEntry.Compaction.fromEvent(event))
},
})

View file

@ -26,27 +26,30 @@ export class User extends Schema.Class<User>("Session.Entry.User")({
}) {
static fromEvent(event: SessionEvent.Prompted) {
return new User({
id: event.id,
id: ID.create(),
type: "user",
metadata: event.metadata,
text: event.prompt.text,
files: event.prompt.files,
agents: event.prompt.agents,
time: { created: event.timestamp },
text: event.data.prompt.text,
files: event.data.prompt.files,
agents: event.data.prompt.agents,
time: { created: event.data.timestamp },
})
}
}
export class Synthetic extends Schema.Class<Synthetic>("Session.Entry.Synthetic")({
...SessionEvent.Synthetic.fields,
...Base,
sessionID: SessionEvent.Synthetic.fields.data.fields.sessionID,
text: SessionEvent.Synthetic.fields.data.fields.text,
type: Schema.Literal("synthetic"),
}) {
static fromEvent(event: SessionEvent.Synthetic) {
return new Synthetic({
...event,
sessionID: event.data.sessionID,
text: event.data.text,
id: ID.create(),
type: "synthetic",
time: { created: event.timestamp },
time: { created: event.data.timestamp },
})
}
}
@ -116,10 +119,10 @@ export class AssistantRetry extends Schema.Class<AssistantRetry>("Session.Entry.
}) {
static fromEvent(event: SessionEvent.Retried) {
return new AssistantRetry({
attempt: event.attempt,
error: event.error,
attempt: event.data.attempt,
error: event.data.error,
time: {
created: event.timestamp,
created: event.data.timestamp,
},
})
}
@ -153,10 +156,10 @@ export class Assistant extends Schema.Class<Assistant>("Session.Entry.Assistant"
}) {
static fromEvent(event: SessionEvent.Step.Started) {
return new Assistant({
id: event.id,
id: ID.create(),
type: "assistant",
time: {
created: event.timestamp,
created: event.data.timestamp,
},
content: [],
retries: [],
@ -165,15 +168,20 @@ export class Assistant extends Schema.Class<Assistant>("Session.Entry.Assistant"
}
export class Compaction extends Schema.Class<Compaction>("Session.Entry.Compaction")({
...SessionEvent.Compacted.fields,
type: Schema.Literal("compaction"),
sessionID: SessionEvent.Compacted.fields.data.fields.sessionID,
auto: SessionEvent.Compacted.fields.data.fields.auto,
overflow: SessionEvent.Compacted.fields.data.fields.overflow,
...Base,
}) {
static fromEvent(event: SessionEvent.Compacted) {
return new Compaction({
...event,
sessionID: event.data.sessionID,
auto: event.data.auto,
overflow: event.data.overflow,
id: ID.create(),
type: "compaction",
time: { created: event.timestamp },
time: { created: event.data.timestamp },
})
}
}

View file

@ -1,10 +1,10 @@
import { SessionID } from "@/session/schema"
import { Event as BaseEvent } from "./event"
import { Event } from "./event"
import { FileAttachment, Prompt } from "./session-prompt"
import { Schema } from "effect"
export { FileAttachment }
export const ID = BaseEvent.ID
export const ID = Event.ID
export type ID = Schema.Schema.Type<typeof ID>
export const Source = Schema.Struct({
@ -12,24 +12,26 @@ export const Source = Schema.Struct({
end: Schema.Number,
text: Schema.String,
}).annotate({
identifier: "session.event.source",
identifier: "session.next.event.source",
})
export type Source = Schema.Schema.Type<typeof Source>
export const Prompted = BaseEvent.define({
type: "session.prompted",
export const Prompted = Event.define({
type: "session.next.prompted",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
prompt: Prompt,
},
})
export type Prompted = Schema.Schema.Type<typeof Prompted>
export const Synthetic = BaseEvent.define({
type: "session.synthetic",
export const Synthetic = Event.define({
type: "session.next.synthetic",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
text: Schema.String,
},
@ -37,10 +39,11 @@ export const Synthetic = BaseEvent.define({
export type Synthetic = Schema.Schema.Type<typeof Synthetic>
export namespace Step {
export const Started = BaseEvent.define({
type: "session.step.started",
export const Started = Event.define({
type: "session.next.step.started",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
model: Schema.Struct({
id: Schema.String,
@ -51,10 +54,11 @@ export namespace Step {
})
export type Started = Schema.Schema.Type<typeof Started>
export const Ended = BaseEvent.define({
type: "session.step.ended",
export const Ended = Event.define({
type: "session.next.step.ended",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
reason: Schema.String,
cost: Schema.Number,
@ -73,29 +77,32 @@ export namespace Step {
}
export namespace Text {
export const Started = BaseEvent.define({
type: "session.text.started",
export const Started = Event.define({
type: "session.next.text.started",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
},
})
export type Started = Schema.Schema.Type<typeof Started>
export const Delta = BaseEvent.define({
type: "session.text.delta",
export const Delta = Event.define({
type: "session.next.text.delta",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
delta: Schema.String,
},
})
export type Delta = Schema.Schema.Type<typeof Delta>
export const Ended = BaseEvent.define({
type: "session.text.ended",
export const Ended = Event.define({
type: "session.next.text.ended",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
text: Schema.String,
},
@ -104,29 +111,32 @@ export namespace Text {
}
export namespace Reasoning {
export const Started = BaseEvent.define({
type: "session.reasoning.started",
export const Started = Event.define({
type: "session.next.reasoning.started",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
},
})
export type Started = Schema.Schema.Type<typeof Started>
export const Delta = BaseEvent.define({
type: "session.reasoning.delta",
export const Delta = Event.define({
type: "session.next.reasoning.delta",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
delta: Schema.String,
},
})
export type Delta = Schema.Schema.Type<typeof Delta>
export const Ended = BaseEvent.define({
type: "session.reasoning.ended",
export const Ended = Event.define({
type: "session.next.reasoning.ended",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
text: Schema.String,
},
@ -136,10 +146,11 @@ export namespace Reasoning {
export namespace Tool {
export namespace Input {
export const Started = BaseEvent.define({
type: "session.tool.input.started",
export const Started = Event.define({
type: "session.next.tool.input.started",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
callID: Schema.String,
name: Schema.String,
@ -147,10 +158,11 @@ export namespace Tool {
})
export type Started = Schema.Schema.Type<typeof Started>
export const Delta = BaseEvent.define({
type: "session.tool.input.delta",
export const Delta = Event.define({
type: "session.next.tool.input.delta",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
callID: Schema.String,
delta: Schema.String,
@ -158,10 +170,11 @@ export namespace Tool {
})
export type Delta = Schema.Schema.Type<typeof Delta>
export const Ended = BaseEvent.define({
type: "session.tool.input.ended",
export const Ended = Event.define({
type: "session.next.tool.input.ended",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
callID: Schema.String,
text: Schema.String,
@ -170,10 +183,11 @@ export namespace Tool {
export type Ended = Schema.Schema.Type<typeof Ended>
}
export const Called = BaseEvent.define({
type: "session.tool.called",
export const Called = Event.define({
type: "session.next.tool.called",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
callID: Schema.String,
tool: Schema.String,
@ -186,10 +200,11 @@ export namespace Tool {
})
export type Called = Schema.Schema.Type<typeof Called>
export const Success = BaseEvent.define({
type: "session.tool.success",
export const Success = Event.define({
type: "session.next.tool.success",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
callID: Schema.String,
title: Schema.String,
@ -203,10 +218,11 @@ export namespace Tool {
})
export type Success = Schema.Schema.Type<typeof Success>
export const Error = BaseEvent.define({
type: "session.tool.error",
export const Error = Event.define({
type: "session.next.tool.error",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
callID: Schema.String,
error: Schema.String,
@ -227,14 +243,15 @@ export const RetryError = Schema.Struct({
responseBody: Schema.String.pipe(Schema.optional),
metadata: Schema.Record(Schema.String, Schema.String).pipe(Schema.optional),
}).annotate({
identifier: "session.retry_error",
identifier: "session.next.retry_error",
})
export type RetryError = Schema.Schema.Type<typeof RetryError>
export const Retried = BaseEvent.define({
type: "session.retried",
export const Retried = Event.define({
type: "session.next.retried",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
attempt: Schema.Number,
error: RetryError,
@ -242,10 +259,11 @@ export const Retried = BaseEvent.define({
})
export type Retried = Schema.Schema.Type<typeof Retried>
export const Compacted = BaseEvent.define({
type: "session.compacted",
export const Compacted = Event.define({
type: "session.next.compacted",
aggregate: "sessionID",
schema: {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
auto: Schema.Boolean,
overflow: Schema.Boolean.pipe(Schema.optional),
@ -253,7 +271,7 @@ export const Compacted = BaseEvent.define({
})
export type Compacted = Schema.Schema.Type<typeof Compacted>
export const Event = Schema.Union(
export const All = Schema.Union(
[
Prompted,
Synthetic,
@ -278,7 +296,8 @@ export const Event = Schema.Union(
mode: "oneOf",
},
).pipe(Schema.toTaggedUnion("type"))
export type Event = Schema.Schema.Type<typeof Event>
export type Event = Schema.Schema.Type<typeof All>
export type Type = Event["type"]
export * as SessionEvent from "./session-event"

View file

@ -79,7 +79,7 @@ delete process.env["OPENCODE_SERVER_USERNAME"]
process.env["OPENCODE_DB"] = ":memory:"
// Now safe to import from src/
const Log = await import("@opencode-ai/core/util/log")
const { Log } = await import("@opencode-ai/core/util/log")
const { initProjectors } = await import("../src/server/projectors")
void Log.init({

View file

@ -1,641 +0,0 @@
import { describe, expect, test } from "bun:test"
import * as DateTime from "effect/DateTime"
import { SessionID } from "../../src/session/schema"
import { SessionEntry } from "../../src/v2/session-entry"
import { SessionEntryStepper } from "../../src/v2/session-entry-stepper"
import { SessionEvent } from "../../src/v2/session-event"
const sessionID = SessionID.descending()
const time = (n: number) => DateTime.makeUnsafe(n)
const tokens = {
input: 1,
output: 2,
reasoning: 3,
cache: {
read: 4,
write: 5,
},
}
function base<const Type extends SessionEvent.Type>(type: Type, timestamp: number) {
return {
id: SessionEvent.ID.create(),
type,
sessionID,
timestamp: time(timestamp),
}
}
function stepStarted(timestamp = 1) {
return ({
...base("session.step.started", timestamp),
model: {
id: "model",
providerID: "provider",
},
})
}
function stepEnded(timestamp = 1) {
return ({
...base("session.step.ended", timestamp),
reason: "stop",
cost: 1,
tokens,
})
}
function assistant() {
return new SessionEntry.Assistant({
id: SessionEvent.ID.create(),
type: "assistant",
time: { created: time(0) },
content: [],
retries: [],
})
}
function retryError(message: string) {
return ({
message,
isRetryable: true,
})
}
function retried(attempt: number, message: string, timestamp = 1) {
return ({
...base("session.retried", timestamp),
attempt,
error: retryError(message),
})
}
function retry(attempt: number, message: string, created: number) {
return new SessionEntry.AssistantRetry({
attempt,
error: retryError(message),
time: {
created: time(created),
},
})
}
function memoryState() {
const state: SessionEntryStepper.MemoryState = {
entries: [],
pending: [],
}
return state
}
function active() {
const state: SessionEntryStepper.MemoryState = {
entries: [assistant()],
pending: [],
}
return state
}
function run(events: SessionEvent.Event[], state = memoryState()) {
return events.reduce<SessionEntryStepper.MemoryState>((state, event) => SessionEntryStepper.step(state, event), state)
}
function last(state: SessionEntryStepper.MemoryState) {
const entry = [...state.pending, ...state.entries].reverse().find((x) => x.type === "assistant")
expect(entry?.type).toBe("assistant")
return entry?.type === "assistant" ? entry : undefined
}
function textsOf(state: SessionEntryStepper.MemoryState) {
const entry = last(state)
if (!entry) return []
return entry.content.filter((x): x is SessionEntry.AssistantText => x.type === "text")
}
function reasons(state: SessionEntryStepper.MemoryState) {
const entry = last(state)
if (!entry) return []
return entry.content.filter((x): x is SessionEntry.AssistantReasoning => x.type === "reasoning")
}
function tools(state: SessionEntryStepper.MemoryState) {
const entry = last(state)
if (!entry) return []
return entry.content.filter((x): x is SessionEntry.AssistantTool => x.type === "tool")
}
function tool(state: SessionEntryStepper.MemoryState, callID: string) {
return tools(state).find((x) => x.callID === callID)
}
function retriesOf(state: SessionEntryStepper.MemoryState) {
const entry = last(state)
if (!entry) return []
return entry.retries ?? []
}
describe("session-entry-stepper", () => {
describe("stepWith", () => {
test("aggregates retry events onto the current assistant", () => {
const state = active()
SessionEntryStepper.stepWith(SessionEntryStepper.memory(state), retried(1, "rate limited", 1))
SessionEntryStepper.stepWith(SessionEntryStepper.memory(state), retried(2, "provider overloaded", 2))
expect(retriesOf(state)).toEqual([retry(1, "rate limited", 1), retry(2, "provider overloaded", 2)])
})
})
describe("memory", () => {
test("tracks and replaces the current assistant", () => {
const state = active()
const adapter = SessionEntryStepper.memory(state)
const current = adapter.getCurrentAssistant()
expect(current?.type).toBe("assistant")
if (!current) return
adapter.updateAssistant(
new SessionEntry.Assistant({
...current,
content: [new SessionEntry.AssistantText({ type: "text", text: "done" })],
time: {
...current.time,
completed: time(1),
},
}),
)
expect(adapter.getCurrentAssistant()).toBeUndefined()
expect(state.entries[0]?.type).toBe("assistant")
if (state.entries[0]?.type !== "assistant") return
expect(state.entries[0].content).toEqual([{ type: "text", text: "done" }])
expect(state.entries[0].time.completed).toEqual(time(1))
})
test("appends committed and pending entries", () => {
const state = memoryState()
const adapter = SessionEntryStepper.memory(state)
const committed = SessionEntry.User.fromEvent(
({ ...base("session.prompted", 1), prompt: { text: "committed" } }),
)
const pending = SessionEntry.User.fromEvent(({ ...base("session.prompted", 2), prompt: { text: "pending" } }))
adapter.appendEntry(committed)
adapter.appendPending(pending)
expect(state.entries).toEqual([committed])
expect(state.pending).toEqual([pending])
})
test("stepWith through memory records reasoning", () => {
const state = active()
SessionEntryStepper.stepWith(SessionEntryStepper.memory(state), (base("session.reasoning.started", 1)))
SessionEntryStepper.stepWith(
SessionEntryStepper.memory(state),
({ ...base("session.reasoning.delta", 2), delta: "draft" }),
)
SessionEntryStepper.stepWith(
SessionEntryStepper.memory(state),
({ ...base("session.reasoning.ended", 3), text: "final" }),
)
expect(reasons(state)).toEqual([{ type: "reasoning", text: "final" }])
})
test("stepWith through memory records retries", () => {
const state = active()
SessionEntryStepper.stepWith(SessionEntryStepper.memory(state), retried(1, "rate limited", 1))
expect(retriesOf(state)).toEqual([retry(1, "rate limited", 1)])
})
})
describe("step", () => {
describe("seeded pending assistant", () => {
test("stores prompts in entries when no assistant is pending", () => {
const next = SessionEntryStepper.step(memoryState(), ({ ...base("session.prompted", 1), prompt: { text: "hello" } }))
expect(next.entries).toHaveLength(1)
expect(next.entries[0]?.type).toBe("user")
if (next.entries[0]?.type !== "user") return
expect(next.entries[0].text).toBe("hello")
})
test("stores prompts in pending when an assistant is pending", () => {
const next = SessionEntryStepper.step(active(), ({ ...base("session.prompted", 1), prompt: { text: "hello" } }))
expect(next.pending).toHaveLength(1)
expect(next.pending[0]?.type).toBe("user")
if (next.pending[0]?.type !== "user") return
expect(next.pending[0].text).toBe("hello")
})
test("accumulates text deltas on the latest text part", () => {
const next = run(
[
(base("session.text.started", 1)),
({ ...base("session.text.delta", 2), delta: "hel" }),
({ ...base("session.text.delta", 3), delta: "lo" }),
],
active(),
)
expect(textsOf(next)).toEqual([
{
type: "text",
text: "hello",
},
])
})
test("routes later text deltas to the latest text segment", () => {
const next = run(
[
(base("session.text.started", 1)),
({ ...base("session.text.delta", 2), delta: "first" }),
(base("session.text.started", 3)),
({ ...base("session.text.delta", 4), delta: "second" }),
],
active(),
)
expect(textsOf(next)).toEqual([
{ type: "text", text: "first" },
{ type: "text", text: "second" },
])
})
test("reasoning.ended replaces buffered reasoning text", () => {
const next = run(
[
(base("session.reasoning.started", 1)),
({ ...base("session.reasoning.delta", 2), delta: "draft" }),
({ ...base("session.reasoning.ended", 3), text: "final" }),
],
active(),
)
expect(reasons(next)).toEqual([
{
type: "reasoning",
text: "final",
},
])
})
test("tool.success completes the latest running tool", () => {
const input = { command: "ls", limit: 2 }
const metadata = { cwd: "/tmp" }
const attachments = [SessionEvent.FileAttachment.create({ uri: "file:///tmp/out.txt", mime: "text/plain" })]
const next = run(
[
({ ...base("session.tool.input.started", 1), callID: "call", name: "bash" }),
({ ...base("session.tool.input.delta", 2), callID: "call", delta: "{\"command\":" }),
({ ...base("session.tool.input.delta", 3), callID: "call", delta: "\"ls\"}" }),
({
...base("session.tool.called", 4),
callID: "call",
tool: "bash",
input,
provider: { executed: true },
}),
({
...base("session.tool.success", 5),
callID: "call",
title: "Listed files",
output: "ok",
metadata,
attachments,
provider: { executed: true },
}),
],
active(),
)
const match = tool(next, "call")
expect(match?.state.status).toBe("completed")
if (match?.state.status !== "completed") return
expect(match.time.ran).toEqual(time(4))
expect(match.state.input).toEqual(input)
expect(match.state.output).toBe("ok")
expect(match.state.title).toBe("Listed files")
expect(match.state.metadata).toEqual(metadata)
expect(match.state.attachments).toEqual(attachments)
})
test("tool.error completes the latest running tool with an error", () => {
const input = { command: "ls" }
const metadata = { cwd: "/tmp" }
const next = run(
[
({ ...base("session.tool.input.started", 1), callID: "call", name: "bash" }),
({
...base("session.tool.called", 2),
callID: "call",
tool: "bash",
input,
provider: { executed: true },
}),
({
...base("session.tool.error", 3),
callID: "call",
error: "permission denied",
metadata,
provider: { executed: true },
}),
],
active(),
)
const match = tool(next, "call")
expect(match?.state.status).toBe("error")
if (match?.state.status !== "error") return
expect(match.time.ran).toEqual(time(2))
expect(match.state.input).toEqual(input)
expect(match.state.error).toBe("permission denied")
expect(match.state.metadata).toEqual(metadata)
})
test("tool.success is ignored before tool.called promotes the tool to running", () => {
const next = run(
[
({ ...base("session.tool.input.started", 1), callID: "call", name: "bash" }),
({
...base("session.tool.success", 2),
callID: "call",
title: "Done",
provider: { executed: true },
}),
],
active(),
)
const match = tool(next, "call")
expect(match?.state).toEqual({
status: "pending",
input: "",
})
})
test("step.ended copies completion fields onto the pending assistant", () => {
const event = stepEnded(9)
const next = SessionEntryStepper.step(active(), event)
const entry = last(next)
expect(entry).toBeDefined()
if (!entry) return
expect(entry.time.completed).toEqual(event.timestamp)
expect(entry.cost).toBe(event.cost)
expect(entry.tokens).toEqual(event.tokens)
})
})
describe("known reducer gaps", () => {
test("prompt appends immutably when no assistant is pending", () => {
const old = memoryState()
const next = SessionEntryStepper.step(old, ({ ...base("session.prompted", 1), prompt: { text: "hello" } }))
expect(old).not.toBe(next)
expect(old.entries).toHaveLength(0)
expect(next.entries).toHaveLength(1)
})
test("prompt appends immutably when an assistant is pending", () => {
const old = active()
const next = SessionEntryStepper.step(old, ({ ...base("session.prompted", 1), prompt: { text: "hello" } }))
expect(old).not.toBe(next)
expect(old.pending).toHaveLength(0)
expect(next.pending).toHaveLength(1)
})
test("step.started creates an assistant consumed by follow-up events", () => {
const next = run([
stepStarted(1),
(base("session.text.started", 2)),
({ ...base("session.text.delta", 3), delta: "hello" }),
stepEnded(4),
])
const entry = last(next)
expect(entry).toBeDefined()
if (!entry) return
expect(entry.content).toEqual([
{
type: "text",
text: "hello",
},
])
expect(entry.time.completed).toEqual(time(4))
})
test("replays prompt -> step -> text -> step.ended", () => {
const next = run([
({ ...base("session.prompted", 0), prompt: { text: "hello" } }),
stepStarted(1),
(base("session.text.started", 2)),
({ ...base("session.text.delta", 3), delta: "world" }),
stepEnded(4),
])
expect(next.entries).toHaveLength(2)
expect(next.entries[0]?.type).toBe("user")
expect(next.entries[1]?.type).toBe("assistant")
if (next.entries[1]?.type !== "assistant") return
expect(next.entries[1].content).toEqual([
{
type: "text",
text: "world",
},
])
expect(next.entries[1].time.completed).toEqual(time(4))
})
test("replays prompt -> step -> reasoning -> tool -> success -> step.ended", () => {
const input = { command: "ls" }
const next = run([
({ ...base("session.prompted", 0), prompt: { text: "hello" } }),
stepStarted(1),
(base("session.reasoning.started", 2)),
({ ...base("session.reasoning.delta", 3), delta: "draft" }),
({ ...base("session.reasoning.ended", 4), text: "final" }),
({ ...base("session.tool.input.started", 5), callID: "call", name: "bash" }),
({
...base("session.tool.called", 6),
callID: "call",
tool: "bash",
input,
provider: { executed: true },
}),
({
...base("session.tool.success", 7),
callID: "call",
title: "Listed files",
output: "ok",
provider: { executed: true },
}),
stepEnded(8),
])
expect(next.entries.at(-1)?.type).toBe("assistant")
const entry = next.entries.at(-1)
if (entry?.type !== "assistant") return
expect(entry.content).toHaveLength(2)
expect(entry.content[0]).toEqual({
type: "reasoning",
text: "final",
})
expect(entry.content[1]?.type).toBe("tool")
if (entry.content[1]?.type !== "tool") return
expect(entry.content[1].state.status).toBe("completed")
expect(entry.time.completed).toEqual(time(8))
})
test("starting a new step completes the old assistant and appends a new active assistant", () => {
const next = run([stepStarted(1)], active())
expect(next.entries).toHaveLength(2)
expect(next.entries[0]?.type).toBe("assistant")
expect(next.entries[1]?.type).toBe("assistant")
if (next.entries[0]?.type !== "assistant" || next.entries[1]?.type !== "assistant") return
expect(next.entries[0].time.completed).toEqual(time(1))
expect(next.entries[1].time.created).toEqual(time(1))
expect(next.entries[1].time.completed).toBeUndefined()
})
test("handles sequential tools independently", () => {
const firstInput = { command: "ls" }
const secondInput = { pattern: "TODO" }
const next = run(
[
({ ...base("session.tool.input.started", 1), callID: "a", name: "bash" }),
({
...base("session.tool.called", 2),
callID: "a",
tool: "bash",
input: firstInput,
provider: { executed: true },
}),
({
...base("session.tool.success", 3),
callID: "a",
title: "Listed",
output: "done",
provider: { executed: true },
}),
({ ...base("session.tool.input.started", 4), callID: "b", name: "bash" }),
({
...base("session.tool.called", 5),
callID: "b",
tool: "bash",
input: secondInput,
provider: { executed: true },
}),
({
...base("session.tool.error", 6),
callID: "b",
error: "not found",
provider: { executed: true },
}),
],
active(),
)
const first = tool(next, "a")
const second = tool(next, "b")
expect(first?.state.status).toBe("completed")
if (first?.state.status !== "completed") return
expect(first.state.input).toEqual(firstInput)
expect(first.state.output).toBe("done")
expect(first.state.title).toBe("Listed")
expect(second?.state.status).toBe("error")
if (second?.state.status !== "error") return
expect(second.state.input).toEqual(secondInput)
expect(second.state.error).toBe("not found")
})
test("routes tool events by callID when tool streams interleave", () => {
const firstInput = { command: "ls" }
const secondInput = { pattern: "TODO" }
const next = run(
[
({ ...base("session.tool.input.started", 1), callID: "a", name: "bash" }),
({ ...base("session.tool.input.started", 2), callID: "b", name: "grep" }),
({ ...base("session.tool.input.delta", 3), callID: "a", delta: "first" }),
({ ...base("session.tool.input.delta", 4), callID: "b", delta: "second" }),
({
...base("session.tool.called", 5),
callID: "a",
tool: "bash",
input: firstInput,
provider: { executed: true },
}),
({
...base("session.tool.called", 6),
callID: "b",
tool: "grep",
input: secondInput,
provider: { executed: true },
}),
({
...base("session.tool.success", 7),
callID: "a",
title: "Listed",
output: "done-a",
provider: { executed: true },
}),
({
...base("session.tool.success", 8),
callID: "b",
title: "Grep",
output: "done-b",
provider: { executed: true },
}),
],
active(),
)
const first = tool(next, "a")
const second = tool(next, "b")
expect(first?.state.status).toBe("completed")
expect(second?.state.status).toBe("completed")
if (first?.state.status !== "completed" || second?.state.status !== "completed") return
expect(first.state.input).toEqual(firstInput)
expect(second.state.input).toEqual(secondInput)
expect(first.state.title).toBe("Listed")
expect(second.state.title).toBe("Grep")
})
test("records synthetic events", () => {
const next = SessionEntryStepper.step(
memoryState(),
({ ...base("session.synthetic", 1), text: "generated" }),
)
expect(next.entries).toHaveLength(1)
expect(next.entries[0]?.type).toBe("synthetic")
if (next.entries[0]?.type !== "synthetic") return
expect(next.entries[0].text).toBe("generated")
})
test("records compaction events", () => {
const next = SessionEntryStepper.step(
memoryState(),
({ ...base("session.compacted", 1), auto: true, overflow: false }),
)
expect(next.entries).toHaveLength(1)
expect(next.entries[0]?.type).toBe("compaction")
if (next.entries[0]?.type !== "compaction") return
expect(next.entries[0].auto).toBe(true)
expect(next.entries[0].overflow).toBe(false)
})
})
})
})