Apply PR #24512: Refactor v2 session events as schemas

This commit is contained in:
opencode-agent[bot] 2026-04-28 03:45:25 +00:00
commit 4e0a2adc6f
24 changed files with 3018 additions and 1484 deletions

View file

@ -462,7 +462,6 @@
},
"devDependencies": {
"@babel/core": "7.28.4",
"@effect/language-service": "0.84.2",
"@octokit/webhooks-types": "7.6.1",
"@opencode-ai/core": "workspace:*",
"@opencode-ai/script": "workspace:*",
@ -1077,8 +1076,6 @@
"@drizzle-team/brocli": ["@drizzle-team/brocli@0.11.0", "", {}, "sha512-hD3pekGiPg0WPCCGAZmusBBJsDqGUR66Y452YgQsZOnkdQ7ViEPKuyP4huUGEZQefp8g34RRodXYmJ2TbCH+tg=="],
"@effect/language-service": ["@effect/language-service@0.84.2", "", { "bin": { "effect-language-service": "cli.js" } }, "sha512-l04qNxpiA8rY5yXWckRPJ7Mk5MNerXuNymSFf+IdflfI5i8jgL1bpBNLuP6ijg7wgjdHc/KmTnCj2kT0SCntuA=="],
"@effect/opentelemetry": ["@effect/opentelemetry@4.0.0-beta.57", "", { "peerDependencies": { "@opentelemetry/api": "^1.9", "@opentelemetry/resources": "^2.0.0", "@opentelemetry/sdk-logs": ">=0.203.0 <0.300.0", "@opentelemetry/sdk-metrics": "^2.0.0", "@opentelemetry/sdk-trace-base": "^2.0.0", "@opentelemetry/sdk-trace-node": "^2.0.0", "@opentelemetry/sdk-trace-web": "^2.0.0", "@opentelemetry/semantic-conventions": "^1.33.0", "effect": "^4.0.0-beta.57" }, "optionalPeers": ["@opentelemetry/api", "@opentelemetry/resources", "@opentelemetry/sdk-logs", "@opentelemetry/sdk-metrics", "@opentelemetry/sdk-trace-base", "@opentelemetry/sdk-trace-node", "@opentelemetry/sdk-trace-web"] }, "sha512-gdjZPEP0QQg4qmI1vd+443kheeQZKytrjJIzCJncy6ZEpyk/SfrqeStLqLXdTRcms3IB0ls0vOV7KNq7YmBRVA=="],
"@effect/platform-node": ["@effect/platform-node@4.0.0-beta.57", "", { "dependencies": { "@effect/platform-node-shared": "^4.0.0-beta.57", "mime": "^4.1.0", "undici": "^8.0.2" }, "peerDependencies": { "effect": "^4.0.0-beta.57", "ioredis": "^5.7.0" } }, "sha512-la0xxPSAYOsY0d+uVxEBxok3jYB31iPQmIaZZRUj2SNWqcGGHJc6KorKtI8guqSLuv9FGZ255kBWXRbG6hMeeg=="],

View file

@ -1,3 +1,5 @@
export * as Log from "./log"
import path from "path"
import fs from "fs/promises"
import { createWriteStream } from "fs"

View file

@ -2,13 +2,6 @@
"$schema": "https://json.schemastore.org/tsconfig",
"extends": "@tsconfig/bun/tsconfig.json",
"compilerOptions": {
"noUncheckedIndexedAccess": false,
"plugins": [
{
"name": "@effect/language-service",
"transform": "@effect/language-service/transform",
"namespaceImportPackages": ["effect", "@effect/*"]
}
]
"noUncheckedIndexedAccess": false
}
}

View file

@ -0,0 +1,17 @@
CREATE TABLE `session_message` (
`id` text PRIMARY KEY,
`session_id` text NOT NULL,
`type` text NOT NULL,
`time_created` integer NOT NULL,
`time_updated` integer NOT NULL,
`data` text NOT NULL,
CONSTRAINT `fk_session_message_session_id_session_id_fk` FOREIGN KEY (`session_id`) REFERENCES `session`(`id`) ON DELETE CASCADE
);
--> statement-breakpoint
DROP INDEX IF EXISTS `session_entry_session_idx`;--> statement-breakpoint
DROP INDEX IF EXISTS `session_entry_session_type_idx`;--> statement-breakpoint
DROP INDEX IF EXISTS `session_entry_time_created_idx`;--> statement-breakpoint
CREATE INDEX `session_message_session_idx` ON `session_message` (`session_id`);--> statement-breakpoint
CREATE INDEX `session_message_session_type_idx` ON `session_message` (`session_id`,`type`);--> statement-breakpoint
CREATE INDEX `session_message_time_created_idx` ON `session_message` (`time_created`);--> statement-breakpoint
DROP TABLE `session_entry`;

File diff suppressed because it is too large Load diff

View file

@ -6,7 +6,6 @@
"license": "MIT",
"private": true,
"scripts": {
"prepare": "effect-language-service patch || true",
"typecheck": "tsgo --noEmit",
"test": "bun test --timeout 30000",
"test:ci": "mkdir -p .artifacts/unit && bun test --timeout 30000 --reporter=junit --reporter-outfile=.artifacts/unit/junit.xml",
@ -42,7 +41,6 @@
},
"devDependencies": {
"@babel/core": "7.28.4",
"@effect/language-service": "0.84.2",
"@octokit/webhooks-types": "7.6.1",
"@opencode-ai/script": "workspace:*",
"@opencode-ai/core": "workspace:*",

View file

@ -20,6 +20,9 @@ import { Question } from "@/question"
import { errorMessage } from "@/util/error"
import * as Log from "@opencode-ai/core/util/log"
import { isRecord } from "@/util/record"
import { SyncEvent } from "@/sync"
import { SessionEvent } from "@/v2/session-event"
import * as DateTime from "effect/DateTime"
const DOOM_LOOP_THRESHOLD = 3
const log = Log.create({ service: "session.processor" })
@ -221,6 +224,11 @@ export const layer: Layer.Layer<
case "reasoning-start":
if (value.id in ctx.reasoningMap) return
SyncEvent.run(SessionEvent.Reasoning.Started.Sync, {
sessionID: ctx.sessionID,
reasoningID: value.id,
timestamp: DateTime.makeUnsafe(Date.now()),
})
ctx.reasoningMap[value.id] = {
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
@ -235,6 +243,12 @@ export const layer: Layer.Layer<
case "reasoning-delta":
if (!(value.id in ctx.reasoningMap)) return
SyncEvent.run(SessionEvent.Reasoning.Delta.Sync, {
sessionID: ctx.sessionID,
reasoningID: value.id,
delta: value.text,
timestamp: DateTime.makeUnsafe(Date.now()),
})
ctx.reasoningMap[value.id].text += value.text
if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata
yield* session.updatePartDelta({
@ -248,6 +262,12 @@ export const layer: Layer.Layer<
case "reasoning-end":
if (!(value.id in ctx.reasoningMap)) return
SyncEvent.run(SessionEvent.Reasoning.Ended.Sync, {
sessionID: ctx.sessionID,
reasoningID: value.id,
text: ctx.reasoningMap[value.id].text,
timestamp: DateTime.makeUnsafe(Date.now()),
})
// oxlint-disable-next-line no-self-assign -- reactivity trigger
ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text
ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() }
@ -260,6 +280,12 @@ export const layer: Layer.Layer<
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
SyncEvent.run(SessionEvent.Tool.Input.Started.Sync, {
sessionID: ctx.sessionID,
callID: value.id,
name: value.toolName,
timestamp: DateTime.makeUnsafe(Date.now()),
})
const part = yield* session.updatePart({
id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(),
messageID: ctx.assistantMessage.id,
@ -281,13 +307,32 @@ export const layer: Layer.Layer<
case "tool-input-delta":
return
case "tool-input-end":
case "tool-input-end": {
SyncEvent.run(SessionEvent.Tool.Input.Ended.Sync, {
sessionID: ctx.sessionID,
callID: value.id,
text: "",
timestamp: DateTime.makeUnsafe(Date.now()),
})
return
}
case "tool-call": {
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
const toolCall = yield* readToolCall(value.toolCallId)
SyncEvent.run(SessionEvent.Tool.Called.Sync, {
sessionID: ctx.sessionID,
callID: value.toolCallId,
tool: value.toolName,
input: value.input,
provider: {
executed: toolCall?.part.metadata?.providerExecuted === true,
...(value.providerMetadata ? { metadata: value.providerMetadata } : {}),
},
timestamp: DateTime.makeUnsafe(Date.now()),
})
yield* updateToolCall(value.toolCallId, (match) => ({
...match,
tool: value.toolName,
@ -331,11 +376,46 @@ export const layer: Layer.Layer<
}
case "tool-result": {
const toolCall = yield* readToolCall(value.toolCallId)
SyncEvent.run(SessionEvent.Tool.Success.Sync, {
sessionID: ctx.sessionID,
callID: value.toolCallId,
structured: value.output.metadata,
content: [
{
type: "text",
text: value.output.output,
},
...(value.output.attachments?.map((item: MessageV2.FilePart) => ({
type: "file",
uri: item.url,
mime: item.mime,
name: item.filename,
})) ?? []),
],
provider: {
executed: toolCall?.part.metadata?.providerExecuted === true,
},
timestamp: DateTime.makeUnsafe(Date.now()),
})
yield* completeToolCall(value.toolCallId, value.output)
return
}
case "tool-error": {
const toolCall = yield* readToolCall(value.toolCallId)
SyncEvent.run(SessionEvent.Tool.Error.Sync, {
sessionID: ctx.sessionID,
callID: value.toolCallId,
error: {
type: "unknown",
message: errorMessage(value.error),
},
provider: {
executed: toolCall?.part.metadata?.providerExecuted === true,
},
timestamp: DateTime.makeUnsafe(Date.now()),
})
yield* failToolCall(value.toolCallId, value.error)
return
}
@ -345,6 +425,16 @@ export const layer: Layer.Layer<
case "start-step":
if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track()
SyncEvent.run(SessionEvent.Step.Started.Sync, {
sessionID: ctx.sessionID,
model: {
id: ctx.model.id,
providerID: ctx.model.providerID,
variant: input.assistantMessage.variant,
},
snapshot: ctx.snapshot,
timestamp: DateTime.makeUnsafe(Date.now()),
})
yield* session.updatePart({
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
@ -355,18 +445,27 @@ export const layer: Layer.Layer<
return
case "finish-step": {
const completedSnapshot = yield* snapshot.track()
const usage = Session.getUsage({
model: ctx.model,
usage: value.usage,
metadata: value.providerMetadata,
})
SyncEvent.run(SessionEvent.Step.Ended.Sync, {
sessionID: ctx.sessionID,
reason: value.finishReason,
cost: usage.cost,
tokens: usage.tokens,
snapshot: completedSnapshot,
timestamp: DateTime.makeUnsafe(Date.now()),
})
ctx.assistantMessage.finish = value.finishReason
ctx.assistantMessage.cost += usage.cost
ctx.assistantMessage.tokens = usage.tokens
yield* session.updatePart({
id: PartID.ascending(),
reason: value.finishReason,
snapshot: yield* snapshot.track(),
snapshot: completedSnapshot,
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "step-finish",
@ -404,6 +503,10 @@ export const layer: Layer.Layer<
}
case "text-start":
SyncEvent.run(SessionEvent.Text.Started.Sync, {
sessionID: ctx.sessionID,
timestamp: DateTime.makeUnsafe(Date.now()),
})
ctx.currentText = {
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
@ -442,6 +545,11 @@ export const layer: Layer.Layer<
},
{ text: ctx.currentText.text },
)).text
SyncEvent.run(SessionEvent.Text.Ended.Sync, {
sessionID: ctx.sessionID,
text: ctx.currentText.text,
timestamp: DateTime.makeUnsafe(Date.now()),
})
{
const end = Date.now()
ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
@ -568,13 +676,23 @@ export const layer: Layer.Layer<
Effect.retry(
SessionRetry.policy({
parse,
set: (info) =>
status.set(ctx.sessionID, {
set: (info) => {
SyncEvent.run(SessionEvent.Retried.Sync, {
sessionID: ctx.sessionID,
attempt: info.attempt,
error: {
message: info.message,
isRetryable: true,
},
timestamp: DateTime.makeUnsafe(Date.now()),
})
return status.set(ctx.sessionID, {
type: "retry",
attempt: info.attempt,
message: info.message,
next: info.next,
}),
})
},
}),
),
Effect.catch(halt),

View file

@ -0,0 +1,106 @@
import { and, desc, eq } from "@/storage/db"
import type { Database } from "@/storage/db"
import { SessionMessage } from "@/v2/session-message"
import { SessionMessageUpdater } from "@/v2/session-message-updater"
import { SessionEvent } from "@/v2/session-event"
import * as DateTime from "effect/DateTime"
import { SyncEvent } from "@/sync"
import { SessionMessageTable } from "./session.sql"
import type { SessionID } from "./schema"
function sqlite(db: Database.TxOrDb, sessionID: SessionID): SessionMessageUpdater.Adapter<void> {
return {
getCurrentAssistant() {
return db
.select()
.from(SessionMessageTable)
.where(and(eq(SessionMessageTable.session_id, sessionID), eq(SessionMessageTable.type, "assistant")))
.orderBy(desc(SessionMessageTable.id))
.all()
.map((row) => ({ id: row.id, type: row.type, ...row.data }) as SessionMessage.Message)
.find((message): message is SessionMessage.Assistant => message.type === "assistant" && !message.time.completed)
},
updateAssistant(assistant) {
const { id, type, ...data } = assistant
db.update(SessionMessageTable)
.set({ data })
.where(
and(
eq(SessionMessageTable.id, id),
eq(SessionMessageTable.session_id, sessionID),
eq(SessionMessageTable.type, type),
),
)
.run()
},
appendMessage(message) {
const { id, type, ...data } = message
db.insert(SessionMessageTable)
.values({
id,
session_id: sessionID,
type,
time_created: DateTime.toEpochMillis(message.time.created),
data,
})
.run()
},
appendPending() {},
finish() {},
}
}
function update(db: Database.TxOrDb, event: SessionEvent.Event) {
SessionMessageUpdater.update(sqlite(db, event.data.sessionID), event)
}
export default [
SyncEvent.project(SessionEvent.Prompted.Sync, (db, data) => {
update(db, { type: "session.next.prompted", data })
}),
SyncEvent.project(SessionEvent.Synthetic.Sync, (db, data) => {
update(db, { type: "session.next.synthetic", data })
}),
SyncEvent.project(SessionEvent.Step.Started.Sync, (db, data) => {
update(db, { type: "session.next.step.started", data })
}),
SyncEvent.project(SessionEvent.Step.Ended.Sync, (db, data) => {
update(db, { type: "session.next.step.ended", data })
}),
SyncEvent.project(SessionEvent.Text.Started.Sync, (db, data) => {
update(db, { type: "session.next.text.started", data })
}),
SyncEvent.project(SessionEvent.Text.Delta.Sync, () => {}),
SyncEvent.project(SessionEvent.Text.Ended.Sync, (db, data) => {
update(db, { type: "session.next.text.ended", data })
}),
SyncEvent.project(SessionEvent.Tool.Input.Started.Sync, (db, data) => {
update(db, { type: "session.next.tool.input.started", data })
}),
SyncEvent.project(SessionEvent.Tool.Input.Delta.Sync, () => {}),
SyncEvent.project(SessionEvent.Tool.Input.Ended.Sync, (db, data) => {
update(db, { type: "session.next.tool.input.ended", data })
}),
SyncEvent.project(SessionEvent.Tool.Called.Sync, (db, data) => {
update(db, { type: "session.next.tool.called", data })
}),
SyncEvent.project(SessionEvent.Tool.Success.Sync, (db, data) => {
update(db, { type: "session.next.tool.success", data })
}),
SyncEvent.project(SessionEvent.Tool.Error.Sync, (db, data) => {
update(db, { type: "session.next.tool.error", data })
}),
SyncEvent.project(SessionEvent.Reasoning.Started.Sync, (db, data) => {
update(db, { type: "session.next.reasoning.started", data })
}),
SyncEvent.project(SessionEvent.Reasoning.Delta.Sync, () => {}),
SyncEvent.project(SessionEvent.Reasoning.Ended.Sync, (db, data) => {
update(db, { type: "session.next.reasoning.ended", data })
}),
SyncEvent.project(SessionEvent.Retried.Sync, (db, data) => {
update(db, { type: "session.next.retried", data })
}),
SyncEvent.project(SessionEvent.Compacted.Sync, (db, data) => {
update(db, { type: "session.next.compacted", data })
}),
]

View file

@ -5,7 +5,8 @@ import { SyncEvent } from "@/sync"
import * as Session from "./session"
import { MessageV2 } from "./message-v2"
import { SessionTable, MessageTable, PartTable } from "./session.sql"
import * as Log from "@opencode-ai/core/util/log"
import { Log } from "@opencode-ai/core/util/log"
import nextProjectors from "./projectors-next"
const log = Log.create({ service: "session.projector" })
@ -135,4 +136,6 @@ export default [
log.warn("ignored late part update", { partID: id, messageID, sessionID })
}
}),
...nextProjectors,
]

View file

@ -1,7 +1,7 @@
import { sqliteTable, text, integer, index, primaryKey } from "drizzle-orm/sqlite-core"
import { ProjectTable } from "../project/project.sql"
import type { MessageV2 } from "./message-v2"
import type { SessionEntry } from "../v2/session-entry"
import type { SessionMessage } from "../v2/session-message"
import type { Snapshot } from "../snapshot"
import type { Permission } from "../permission"
import type { ProjectID } from "../project/schema"
@ -95,22 +95,22 @@ export const TodoTable = sqliteTable(
],
)
export const SessionEntryTable = sqliteTable(
"session_entry",
export const SessionMessageTable = sqliteTable(
"session_message",
{
id: text().$type<SessionEntry.ID>().primaryKey(),
id: text().$type<SessionMessage.ID>().primaryKey(),
session_id: text()
.$type<SessionID>()
.notNull()
.references(() => SessionTable.id, { onDelete: "cascade" }),
type: text().$type<SessionEntry.Type>().notNull(),
type: text().$type<SessionMessage.Type>().notNull(),
...Timestamps,
data: text({ mode: "json" }).notNull().$type<Omit<SessionEntry.Entry, "type" | "id">>(),
data: text({ mode: "json" }).notNull().$type<Omit<SessionMessage.Message, "type" | "id">>(),
},
(table) => [
index("session_entry_session_idx").on(table.session_id),
index("session_entry_session_type_idx").on(table.session_id, table.type),
index("session_entry_time_created_idx").on(table.time_created),
index("session_message_session_idx").on(table.session_id),
index("session_message_session_type_idx").on(table.session_id, table.type),
index("session_message_time_created_idx").on(table.time_created),
],
)

View file

@ -90,7 +90,7 @@ function bodyWithChecks(ast: SchemaAST.AST): z.ZodTypeAny {
// Schema.withDecodingDefault also attaches encoding, but we want `.default(v)`
// on the inner Zod rather than a transform wrapper — so optional ASTs whose
// encoding resolves a default from Option.none() route through body()/opt().
const hasEncoding = ast.encoding?.length && ast._tag !== "Declaration"
const hasEncoding = ast.encoding?.length && (ast._tag !== "Declaration" || ast.typeParameters.length === 0)
const hasTransform = hasEncoding && !(SchemaAST.isOptional(ast) && extractDefault(ast) !== undefined)
const base = hasTransform ? encoded(ast) : body(ast)
return ast.checks?.length ? applyChecks(base, ast.checks, ast) : base

View file

@ -0,0 +1,42 @@
import { Identifier } from "@/id/id"
import { SyncEvent } from "@/sync"
import { withStatics } from "@/util/schema"
import * as Schema from "effect/Schema"
export const ID = Schema.String.pipe(
Schema.brand("Event.ID"),
withStatics((s) => ({
create: () => s.make(Identifier.create("evt", "ascending")),
})),
)
export type ID = Schema.Schema.Type<typeof ID>
export function define<const Type extends string, Fields extends Schema.Struct.Fields>(input: {
type: Type
schema: Fields
aggregate: string
version?: number
}) {
const Payload = Schema.Struct({
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
type: Schema.Literal(input.type),
data: Schema.Struct(input.schema),
}).annotate({
identifier: input.type,
})
const Sync = SyncEvent.define({
type: input.type,
version: input.version ?? 1,
aggregate: input.aggregate,
schema: Payload.fields.data,
})
return Object.assign(Payload, {
Sync,
version: input.version,
aggregate: input.aggregate,
})
}
export * as Event from "./event"

View file

@ -1,127 +1,69 @@
import { Identifier } from "@/id/id"
import { withStatics } from "@/util/schema"
import * as DateTime from "effect/DateTime"
import { SessionID } from "@/session/schema"
import { Event } from "./event"
import { FileAttachment, Prompt } from "./session-prompt"
import { Schema } from "effect"
export { FileAttachment }
import { ToolOutput } from "./tool-output"
export namespace SessionEvent {
export const ID = Schema.String.pipe(
Schema.brand("Session.Event.ID"),
withStatics((s) => ({
create: () => s.make(Identifier.create("evt", "ascending")),
})),
)
export type ID = Schema.Schema.Type<typeof ID>
type Stamp = Schema.Schema.Type<typeof Schema.DateTimeUtc>
type BaseInput = {
id?: ID
metadata?: Record<string, unknown>
timestamp?: Stamp
}
export const ID = Event.ID
export type ID = Schema.Schema.Type<typeof ID>
const Base = {
id: ID,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
timestamp: Schema.DateTimeUtc,
}
export const Source = Schema.Struct({
start: Schema.Number,
end: Schema.Number,
text: Schema.String,
}).annotate({
identifier: "session.next.event.source",
})
export type Source = Schema.Schema.Type<typeof Source>
export class Source extends Schema.Class<Source>("Session.Event.Source")({
start: Schema.Number,
end: Schema.Number,
text: Schema.String,
}) {}
const Base = {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
}
export class FileAttachment extends Schema.Class<FileAttachment>("Session.Event.FileAttachment")({
uri: Schema.String,
mime: Schema.String,
name: Schema.String.pipe(Schema.optional),
description: Schema.String.pipe(Schema.optional),
source: Source.pipe(Schema.optional),
}) {
static create(input: FileAttachment) {
return new FileAttachment({
uri: input.uri,
mime: input.mime,
name: input.name,
description: input.description,
source: input.source,
})
}
}
export class AgentAttachment extends Schema.Class<AgentAttachment>("Session.Event.AgentAttachment")({
name: Schema.String,
source: Source.pipe(Schema.optional),
}) {}
export class RetryError extends Schema.Class<RetryError>("Session.Event.Retry.Error")({
message: Schema.String,
statusCode: Schema.Number.pipe(Schema.optional),
isRetryable: Schema.Boolean,
responseHeaders: Schema.Record(Schema.String, Schema.String).pipe(Schema.optional),
responseBody: Schema.String.pipe(Schema.optional),
metadata: Schema.Record(Schema.String, Schema.String).pipe(Schema.optional),
}) {}
export class Prompt extends Schema.Class<Prompt>("Session.Event.Prompt")({
export const Prompted = Event.define({
type: "session.next.prompted",
aggregate: "sessionID",
version: 1,
schema: {
...Base,
type: Schema.Literal("prompt"),
text: Schema.String,
files: Schema.Array(FileAttachment).pipe(Schema.optional),
agents: Schema.Array(AgentAttachment).pipe(Schema.optional),
}) {
static create(input: BaseInput & { text: string; files?: FileAttachment[]; agents?: AgentAttachment[] }) {
return new Prompt({
id: input.id ?? ID.create(),
type: "prompt",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
text: input.text,
files: input.files,
agents: input.agents,
})
}
}
prompt: Prompt,
},
})
export type Prompted = Schema.Schema.Type<typeof Prompted>
export class Synthetic extends Schema.Class<Synthetic>("Session.Event.Synthetic")({
export const Synthetic = Event.define({
type: "session.next.synthetic",
aggregate: "sessionID",
schema: {
...Base,
type: Schema.Literal("synthetic"),
text: Schema.String,
}) {
static create(input: BaseInput & { text: string }) {
return new Synthetic({
id: input.id ?? ID.create(),
type: "synthetic",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
text: input.text,
})
}
}
},
})
export type Synthetic = Schema.Schema.Type<typeof Synthetic>
export namespace Step {
export class Started extends Schema.Class<Started>("Session.Event.Step.Started")({
export namespace Step {
export const Started = Event.define({
type: "session.next.step.started",
aggregate: "sessionID",
schema: {
...Base,
type: Schema.Literal("step.started"),
model: Schema.Struct({
id: Schema.String,
providerID: Schema.String,
variant: Schema.String.pipe(Schema.optional),
}),
}) {
static create(input: BaseInput & { model: { id: string; providerID: string; variant?: string } }) {
return new Started({
id: input.id ?? ID.create(),
type: "step.started",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
model: input.model,
})
}
}
snapshot: Schema.String.pipe(Schema.optional),
},
})
export type Started = Schema.Schema.Type<typeof Started>
export class Ended extends Schema.Class<Ended>("Session.Event.Step.Ended")({
export const Ended = Event.define({
type: "session.next.step.ended",
aggregate: "sessionID",
schema: {
...Base,
type: Schema.Literal("step.ended"),
reason: Schema.String,
cost: Schema.Number,
tokens: Schema.Struct({
@ -133,177 +75,118 @@ export namespace SessionEvent {
write: Schema.Number,
}),
}),
}) {
static create(input: BaseInput & { reason: string; cost: number; tokens: Ended["tokens"] }) {
return new Ended({
id: input.id ?? ID.create(),
type: "step.ended",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
reason: input.reason,
cost: input.cost,
tokens: input.tokens,
})
}
}
}
snapshot: Schema.String.pipe(Schema.optional),
},
})
export type Ended = Schema.Schema.Type<typeof Ended>
}
export namespace Text {
export class Started extends Schema.Class<Started>("Session.Event.Text.Started")({
export namespace Text {
export const Started = Event.define({
type: "session.next.text.started",
aggregate: "sessionID",
schema: {
...Base,
type: Schema.Literal("text.started"),
}) {
static create(input: BaseInput = {}) {
return new Started({
id: input.id ?? ID.create(),
type: "text.started",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
})
}
}
},
})
export type Started = Schema.Schema.Type<typeof Started>
export class Delta extends Schema.Class<Delta>("Session.Event.Text.Delta")({
export const Delta = Event.define({
type: "session.next.text.delta",
aggregate: "sessionID",
schema: {
...Base,
type: Schema.Literal("text.delta"),
delta: Schema.String,
}) {
static create(input: BaseInput & { delta: string }) {
return new Delta({
id: input.id ?? ID.create(),
type: "text.delta",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
delta: input.delta,
})
}
}
},
})
export type Delta = Schema.Schema.Type<typeof Delta>
export class Ended extends Schema.Class<Ended>("Session.Event.Text.Ended")({
export const Ended = Event.define({
type: "session.next.text.ended",
aggregate: "sessionID",
schema: {
...Base,
type: Schema.Literal("text.ended"),
text: Schema.String,
}) {
static create(input: BaseInput & { text: string }) {
return new Ended({
id: input.id ?? ID.create(),
type: "text.ended",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
text: input.text,
})
}
}
}
},
})
export type Ended = Schema.Schema.Type<typeof Ended>
}
export namespace Reasoning {
export class Started extends Schema.Class<Started>("Session.Event.Reasoning.Started")({
export namespace Reasoning {
export const Started = Event.define({
type: "session.next.reasoning.started",
aggregate: "sessionID",
schema: {
...Base,
type: Schema.Literal("reasoning.started"),
}) {
static create(input: BaseInput = {}) {
return new Started({
id: input.id ?? ID.create(),
type: "reasoning.started",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
})
}
}
reasoningID: Schema.String,
},
})
export type Started = Schema.Schema.Type<typeof Started>
export class Delta extends Schema.Class<Delta>("Session.Event.Reasoning.Delta")({
export const Delta = Event.define({
type: "session.next.reasoning.delta",
aggregate: "sessionID",
schema: {
...Base,
type: Schema.Literal("reasoning.delta"),
reasoningID: Schema.String,
delta: Schema.String,
}) {
static create(input: BaseInput & { delta: string }) {
return new Delta({
id: input.id ?? ID.create(),
type: "reasoning.delta",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
delta: input.delta,
})
}
}
},
})
export type Delta = Schema.Schema.Type<typeof Delta>
export class Ended extends Schema.Class<Ended>("Session.Event.Reasoning.Ended")({
export const Ended = Event.define({
type: "session.next.reasoning.ended",
aggregate: "sessionID",
schema: {
...Base,
type: Schema.Literal("reasoning.ended"),
reasoningID: Schema.String,
text: Schema.String,
}) {
static create(input: BaseInput & { text: string }) {
return new Ended({
id: input.id ?? ID.create(),
type: "reasoning.ended",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
text: input.text,
})
}
}
}
},
})
export type Ended = Schema.Schema.Type<typeof Ended>
}
export namespace Tool {
export namespace Input {
export class Started extends Schema.Class<Started>("Session.Event.Tool.Input.Started")({
export namespace Tool {
export namespace Input {
export const Started = Event.define({
type: "session.next.tool.input.started",
aggregate: "sessionID",
schema: {
...Base,
callID: Schema.String,
name: Schema.String,
type: Schema.Literal("tool.input.started"),
}) {
static create(input: BaseInput & { callID: string; name: string }) {
return new Started({
id: input.id ?? ID.create(),
type: "tool.input.started",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
callID: input.callID,
name: input.name,
})
}
}
},
})
export type Started = Schema.Schema.Type<typeof Started>
export class Delta extends Schema.Class<Delta>("Session.Event.Tool.Input.Delta")({
export const Delta = Event.define({
type: "session.next.tool.input.delta",
aggregate: "sessionID",
schema: {
...Base,
callID: Schema.String,
type: Schema.Literal("tool.input.delta"),
delta: Schema.String,
}) {
static create(input: BaseInput & { callID: string; delta: string }) {
return new Delta({
id: input.id ?? ID.create(),
type: "tool.input.delta",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
callID: input.callID,
delta: input.delta,
})
}
}
},
})
export type Delta = Schema.Schema.Type<typeof Delta>
export class Ended extends Schema.Class<Ended>("Session.Event.Tool.Input.Ended")({
export const Ended = Event.define({
type: "session.next.tool.input.ended",
aggregate: "sessionID",
schema: {
...Base,
callID: Schema.String,
type: Schema.Literal("tool.input.ended"),
text: Schema.String,
}) {
static create(input: BaseInput & { callID: string; text: string }) {
return new Ended({
id: input.id ?? ID.create(),
type: "tool.input.ended",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
callID: input.callID,
text: input.text,
})
}
}
}
},
})
export type Ended = Schema.Schema.Type<typeof Ended>
}
export class Called extends Schema.Class<Called>("Session.Event.Tool.Called")({
export const Called = Event.define({
type: "session.next.tool.called",
aggregate: "sessionID",
schema: {
...Base,
type: Schema.Literal("tool.called"),
callID: Schema.String,
tool: Schema.String,
input: Schema.Record(Schema.String, Schema.Unknown),
@ -311,148 +194,119 @@ export namespace SessionEvent {
executed: Schema.Boolean,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
}),
}) {
static create(
input: BaseInput & {
callID: string
tool: string
input: Record<string, unknown>
provider: Called["provider"]
},
) {
return new Called({
id: input.id ?? ID.create(),
type: "tool.called",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
callID: input.callID,
tool: input.tool,
input: input.input,
provider: input.provider,
})
}
}
},
})
export type Called = Schema.Schema.Type<typeof Called>
export class Success extends Schema.Class<Success>("Session.Event.Tool.Success")({
export const Progress = Event.define({
type: "session.next.tool.progress",
aggregate: "sessionID",
schema: {
...Base,
type: Schema.Literal("tool.success"),
callID: Schema.String,
title: Schema.String,
output: Schema.String.pipe(Schema.optional),
attachments: Schema.Array(FileAttachment).pipe(Schema.optional),
structured: ToolOutput.Structured,
content: Schema.Array(ToolOutput.Content),
},
})
export type Progress = Schema.Schema.Type<typeof Progress>
export const Success = Event.define({
type: "session.next.tool.success",
aggregate: "sessionID",
schema: {
...Base,
callID: Schema.String,
structured: ToolOutput.Structured,
content: Schema.Array(ToolOutput.Content),
provider: Schema.Struct({
executed: Schema.Boolean,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
}),
}) {
static create(
input: BaseInput & {
callID: string
title: string
output?: string
attachments?: FileAttachment[]
provider: Success["provider"]
},
) {
return new Success({
id: input.id ?? ID.create(),
type: "tool.success",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
callID: input.callID,
title: input.title,
output: input.output,
attachments: input.attachments,
provider: input.provider,
})
}
}
},
})
export type Success = Schema.Schema.Type<typeof Success>
export class Error extends Schema.Class<Error>("Session.Event.Tool.Error")({
export const Error = Event.define({
type: "session.next.tool.error",
aggregate: "sessionID",
schema: {
...Base,
type: Schema.Literal("tool.error"),
callID: Schema.String,
error: Schema.String,
error: Schema.Struct({
type: Schema.String,
message: Schema.String,
}),
provider: Schema.Struct({
executed: Schema.Boolean,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
}),
}) {
static create(input: BaseInput & { callID: string; error: string; provider: Error["provider"] }) {
return new Error({
id: input.id ?? ID.create(),
type: "tool.error",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
callID: input.callID,
error: input.error,
provider: input.provider,
})
}
}
}
},
})
export type Error = Schema.Schema.Type<typeof Error>
}
export class Retried extends Schema.Class<Retried>("Session.Event.Retried")({
export const RetryError = Schema.Struct({
message: Schema.String,
statusCode: Schema.Number.pipe(Schema.optional),
isRetryable: Schema.Boolean,
responseHeaders: Schema.Record(Schema.String, Schema.String).pipe(Schema.optional),
responseBody: Schema.String.pipe(Schema.optional),
metadata: Schema.Record(Schema.String, Schema.String).pipe(Schema.optional),
}).annotate({
identifier: "session.next.retry_error",
})
export type RetryError = Schema.Schema.Type<typeof RetryError>
export const Retried = Event.define({
type: "session.next.retried",
aggregate: "sessionID",
schema: {
...Base,
type: Schema.Literal("retried"),
attempt: Schema.Number,
error: RetryError,
}) {
static create(input: BaseInput & { attempt: number; error: RetryError }) {
return new Retried({
id: input.id ?? ID.create(),
type: "retried",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
attempt: input.attempt,
error: input.error,
})
}
}
},
})
export type Retried = Schema.Schema.Type<typeof Retried>
export class Compacted extends Schema.Class<Compacted>("Session.Event.Compated")({
export const Compacted = Event.define({
type: "session.next.compacted",
aggregate: "sessionID",
schema: {
...Base,
type: Schema.Literal("compacted"),
auto: Schema.Boolean,
overflow: Schema.Boolean.pipe(Schema.optional),
}) {
static create(input: BaseInput & { auto: boolean; overflow?: boolean }) {
return new Compacted({
id: input.id ?? ID.create(),
type: "compacted",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
auto: input.auto,
overflow: input.overflow,
})
}
}
},
})
export type Compacted = Schema.Schema.Type<typeof Compacted>
export const Event = Schema.Union(
[
Prompt,
Synthetic,
Step.Started,
Step.Ended,
Text.Started,
Text.Delta,
Text.Ended,
Tool.Input.Started,
Tool.Input.Delta,
Tool.Input.Ended,
Tool.Called,
Tool.Success,
Tool.Error,
Reasoning.Started,
Reasoning.Delta,
Reasoning.Ended,
Retried,
Compacted,
],
{
mode: "oneOf",
},
).pipe(Schema.toTaggedUnion("type"))
export type Event = Schema.Schema.Type<typeof Event>
export type Type = Event["type"]
}
export const All = Schema.Union(
[
Prompted,
Synthetic,
Step.Started,
Step.Ended,
Text.Started,
Text.Delta,
Text.Ended,
Tool.Input.Started,
Tool.Input.Delta,
Tool.Input.Ended,
Tool.Called,
Tool.Progress,
Tool.Success,
Tool.Error,
Reasoning.Started,
Reasoning.Delta,
Reasoning.Ended,
Retried,
Compacted,
],
{
mode: "oneOf",
},
).pipe(Schema.toTaggedUnion("type"))
export type Event = Schema.Schema.Type<typeof All>
export type Type = Event["type"]
export * as SessionEvent from "./session-event"

View file

@ -1,43 +1,43 @@
import { produce, type WritableDraft } from "immer"
import { SessionEvent } from "./session-event"
import { SessionEntry } from "./session-entry"
import { SessionMessage } from "./session-message"
export type MemoryState = {
entries: SessionEntry.Entry[]
pending: SessionEntry.Entry[]
messages: SessionMessage.Message[]
pending: SessionMessage.Message[]
}
export interface Adapter<Result> {
readonly getCurrentAssistant: () => SessionEntry.Assistant | undefined
readonly updateAssistant: (assistant: SessionEntry.Assistant) => void
readonly appendEntry: (entry: SessionEntry.Entry) => void
readonly appendPending: (entry: SessionEntry.Entry) => void
readonly getCurrentAssistant: () => SessionMessage.Assistant | undefined
readonly updateAssistant: (assistant: SessionMessage.Assistant) => void
readonly appendMessage: (message: SessionMessage.Message) => void
readonly appendPending: (message: SessionMessage.Message) => void
readonly finish: () => Result
}
export function memory(state: MemoryState): Adapter<MemoryState> {
const activeAssistantIndex = () =>
state.entries.findLastIndex((entry) => entry.type === "assistant" && !entry.time.completed)
state.messages.findLastIndex((message) => message.type === "assistant" && !message.time.completed)
return {
getCurrentAssistant() {
const index = activeAssistantIndex()
if (index < 0) return
const assistant = state.entries[index]
const assistant = state.messages[index]
return assistant?.type === "assistant" ? assistant : undefined
},
updateAssistant(assistant) {
const index = activeAssistantIndex()
if (index < 0) return
const current = state.entries[index]
const current = state.messages[index]
if (current?.type !== "assistant") return
state.entries[index] = assistant
state.messages[index] = assistant
},
appendEntry(entry) {
state.entries.push(entry)
appendMessage(message) {
state.messages.push(message)
},
appendPending(entry) {
state.pending.push(entry)
appendPending(message) {
state.pending.push(message)
},
finish() {
return state
@ -45,12 +45,12 @@ export function memory(state: MemoryState): Adapter<MemoryState> {
}
}
export function stepWith<Result>(adapter: Adapter<Result>, event: SessionEvent.Event): Result {
export function update<Result>(adapter: Adapter<Result>, event: SessionEvent.Event): Result {
const currentAssistant = adapter.getCurrentAssistant()
type DraftAssistant = WritableDraft<SessionEntry.Assistant>
type DraftTool = WritableDraft<SessionEntry.AssistantTool>
type DraftText = WritableDraft<SessionEntry.AssistantText>
type DraftReasoning = WritableDraft<SessionEntry.AssistantReasoning>
type DraftAssistant = WritableDraft<SessionMessage.Assistant>
type DraftTool = WritableDraft<SessionMessage.AssistantTool>
type DraftText = WritableDraft<SessionMessage.AssistantText>
type DraftReasoning = WritableDraft<SessionMessage.AssistantReasoning>
const latestTool = (assistant: DraftAssistant | undefined, callID?: string) =>
assistant?.content.findLast(
@ -60,43 +60,46 @@ export function stepWith<Result>(adapter: Adapter<Result>, event: SessionEvent.E
const latestText = (assistant: DraftAssistant | undefined) =>
assistant?.content.findLast((item): item is DraftText => item.type === "text")
const latestReasoning = (assistant: DraftAssistant | undefined) =>
assistant?.content.findLast((item): item is DraftReasoning => item.type === "reasoning")
const latestReasoning = (assistant: DraftAssistant | undefined, reasoningID: string) =>
assistant?.content.findLast(
(item): item is DraftReasoning => item.type === "reasoning" && item.reasoningID === reasoningID,
)
SessionEvent.Event.match(event, {
prompt: (event) => {
const entry = SessionEntry.User.fromEvent(event)
SessionEvent.All.match(event, {
"session.next.prompted": (event) => {
const message = SessionMessage.User.fromEvent(event)
if (currentAssistant) {
adapter.appendPending(entry)
adapter.appendPending(message)
return
}
adapter.appendEntry(entry)
adapter.appendMessage(message)
},
synthetic: (event) => {
adapter.appendEntry(SessionEntry.Synthetic.fromEvent(event))
"session.next.synthetic": (event) => {
adapter.appendMessage(SessionMessage.Synthetic.fromEvent(event))
},
"step.started": (event) => {
"session.next.step.started": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.time.completed = event.timestamp
draft.time.completed = event.data.timestamp
}),
)
}
adapter.appendEntry(SessionEntry.Assistant.fromEvent(event))
adapter.appendMessage(SessionMessage.Assistant.fromEvent(event))
},
"step.ended": (event) => {
"session.next.step.ended": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.time.completed = event.timestamp
draft.cost = event.cost
draft.tokens = event.tokens
draft.time.completed = event.data.timestamp
draft.cost = event.data.cost
draft.tokens = event.data.tokens
if (event.data.snapshot) draft.snapshot = { ...draft.snapshot, end: event.data.snapshot }
}),
)
}
},
"text.started": () => {
"session.next.text.started": () => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
@ -108,27 +111,27 @@ export function stepWith<Result>(adapter: Adapter<Result>, event: SessionEvent.E
)
}
},
"text.delta": (event) => {
"session.next.text.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestText(draft)
if (match) match.text += event.delta
if (match) match.text += event.data.delta
}),
)
}
},
"text.ended": () => {},
"tool.input.started": (event) => {
"session.next.text.ended": () => {},
"session.next.tool.input.started": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.content.push({
type: "tool",
callID: event.callID,
name: event.name,
callID: event.data.callID,
name: event.data.name,
time: {
created: event.timestamp,
created: event.data.timestamp,
},
state: {
status: "pending",
@ -139,123 +142,124 @@ export function stepWith<Result>(adapter: Adapter<Result>, event: SessionEvent.E
)
}
},
"tool.input.delta": (event) => {
"session.next.tool.input.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.callID)
const match = latestTool(draft, event.data.callID)
// oxlint-disable-next-line no-base-to-string -- event.delta is a Schema.String (runtime string)
if (match && match.state.status === "pending") match.state.input += event.delta
if (match && match.state.status === "pending") match.state.input += event.data.delta
}),
)
}
},
"tool.input.ended": () => {},
"tool.called": (event) => {
"session.next.tool.input.ended": () => {},
"session.next.tool.called": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.callID)
const match = latestTool(draft, event.data.callID)
if (match) {
match.time.ran = event.timestamp
match.time.ran = event.data.timestamp
match.state = {
status: "running",
input: event.input,
input: event.data.input,
structured: {},
content: [],
}
}
}),
)
}
},
"tool.success": (event) => {
"session.next.tool.progress": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.callID)
const match = latestTool(draft, event.data.callID)
if (match && match.state.status === "running") {
match.state.structured = event.data.structured
match.state.content = [...event.data.content]
}
}),
)
}
},
"session.next.tool.success": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.data.callID)
if (match && match.state.status === "running") {
match.state = {
status: "completed",
input: match.state.input,
output: event.output ?? "",
title: event.title,
metadata: event.metadata ?? {},
attachments: [...(event.attachments ?? [])],
structured: event.data.structured,
content: [...event.data.content],
}
}
}),
)
}
},
"tool.error": (event) => {
"session.next.tool.error": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.callID)
const match = latestTool(draft, event.data.callID)
if (match && match.state.status === "running") {
match.state = {
status: "error",
error: event.error,
error: event.data.error,
input: match.state.input,
metadata: event.metadata ?? {},
structured: match.state.structured,
content: match.state.content,
}
}
}),
)
}
},
"reasoning.started": () => {
"session.next.reasoning.started": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.content.push({
type: "reasoning",
reasoningID: event.data.reasoningID,
text: "",
})
}),
)
}
},
"reasoning.delta": (event) => {
"session.next.reasoning.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestReasoning(draft)
if (match) match.text += event.delta
const match = latestReasoning(draft, event.data.reasoningID)
if (match) match.text += event.data.delta
}),
)
}
},
"reasoning.ended": (event) => {
"session.next.reasoning.ended": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestReasoning(draft)
if (match) match.text = event.text
const match = latestReasoning(draft, event.data.reasoningID)
if (match) match.text = event.data.text
}),
)
}
},
retried: (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.retries = [...(draft.retries ?? []), SessionEntry.AssistantRetry.fromEvent(event)]
}),
)
}
},
compacted: (event) => {
adapter.appendEntry(SessionEntry.Compaction.fromEvent(event))
"session.next.retried": () => {},
"session.next.compacted": (event) => {
adapter.appendMessage(SessionMessage.Compaction.fromEvent(event))
},
})
return adapter.finish()
}
export function step(old: MemoryState, event: SessionEvent.Event): MemoryState {
return produce(old, (draft) => {
stepWith(memory(draft as MemoryState), event)
})
}
export * as SessionEntryStepper from "./session-entry-stepper"
export * as SessionMessageUpdater from "./session-message-updater"

View file

@ -1,79 +1,89 @@
import { Schema } from "effect"
import { Prompt } from "./session-prompt"
import { SessionEvent } from "./session-event"
import { Event } from "./event"
import { ToolOutput } from "./tool-output"
export const ID = SessionEvent.ID
export const ID = Event.ID
export type ID = Schema.Schema.Type<typeof ID>
const Base = {
id: SessionEvent.ID,
id: ID,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
time: Schema.Struct({
created: Schema.DateTimeUtc,
}),
}
export class User extends Schema.Class<User>("Session.Entry.User")({
export class User extends Schema.Class<User>("Session.Message.User")({
...Base,
text: SessionEvent.Prompt.fields.text,
files: SessionEvent.Prompt.fields.files,
agents: SessionEvent.Prompt.fields.agents,
text: Prompt.fields.text,
files: Prompt.fields.files,
agents: Prompt.fields.agents,
type: Schema.Literal("user"),
time: Schema.Struct({
created: Schema.DateTimeUtc,
}),
}) {
static fromEvent(event: SessionEvent.Prompt) {
static fromEvent(event: SessionEvent.Prompted) {
return new User({
id: event.id,
id: ID.create(),
type: "user",
metadata: event.metadata,
text: event.text,
files: event.files,
agents: event.agents,
time: { created: event.timestamp },
text: event.data.prompt.text,
files: event.data.prompt.files,
agents: event.data.prompt.agents,
time: { created: event.data.timestamp },
})
}
}
export class Synthetic extends Schema.Class<Synthetic>("Session.Entry.Synthetic")({
...SessionEvent.Synthetic.fields,
export class Synthetic extends Schema.Class<Synthetic>("Session.Message.Synthetic")({
...Base,
sessionID: SessionEvent.Synthetic.fields.data.fields.sessionID,
text: SessionEvent.Synthetic.fields.data.fields.text,
type: Schema.Literal("synthetic"),
}) {
static fromEvent(event: SessionEvent.Synthetic) {
return new Synthetic({
...event,
time: { created: event.timestamp },
sessionID: event.data.sessionID,
text: event.data.text,
id: ID.create(),
type: "synthetic",
time: { created: event.data.timestamp },
})
}
}
export class ToolStatePending extends Schema.Class<ToolStatePending>("Session.Entry.ToolState.Pending")({
export class ToolStatePending extends Schema.Class<ToolStatePending>("Session.Message.ToolState.Pending")({
status: Schema.Literal("pending"),
input: Schema.String,
}) {}
export class ToolStateRunning extends Schema.Class<ToolStateRunning>("Session.Entry.ToolState.Running")({
export class ToolStateRunning extends Schema.Class<ToolStateRunning>("Session.Message.ToolState.Running")({
status: Schema.Literal("running"),
input: Schema.Record(Schema.String, Schema.Unknown),
title: Schema.String.pipe(Schema.optional),
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
structured: ToolOutput.Structured,
content: ToolOutput.Content.pipe(Schema.Array),
}) {}
export class ToolStateCompleted extends Schema.Class<ToolStateCompleted>("Session.Entry.ToolState.Completed")({
export class ToolStateCompleted extends Schema.Class<ToolStateCompleted>("Session.Message.ToolState.Completed")({
status: Schema.Literal("completed"),
input: Schema.Record(Schema.String, Schema.Unknown),
output: Schema.String,
title: Schema.String,
metadata: Schema.Record(Schema.String, Schema.Unknown),
attachments: SessionEvent.FileAttachment.pipe(Schema.Array, Schema.optional),
content: ToolOutput.Content.pipe(Schema.Array),
structured: ToolOutput.Structured,
}) {}
export class ToolStateError extends Schema.Class<ToolStateError>("Session.Entry.ToolState.Error")({
export class ToolStateError extends Schema.Class<ToolStateError>("Session.Message.ToolState.Error")({
status: Schema.Literal("error"),
input: Schema.Record(Schema.String, Schema.Unknown),
error: Schema.String,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
content: ToolOutput.Content.pipe(Schema.Array),
structured: ToolOutput.Structured,
error: Schema.Struct({
type: Schema.String,
message: Schema.String,
}),
}) {}
export const ToolState = Schema.Union([ToolStatePending, ToolStateRunning, ToolStateCompleted, ToolStateError]).pipe(
@ -81,7 +91,7 @@ export const ToolState = Schema.Union([ToolStatePending, ToolStateRunning, ToolS
)
export type ToolState = Schema.Schema.Type<typeof ToolState>
export class AssistantTool extends Schema.Class<AssistantTool>("Session.Entry.Assistant.Tool")({
export class AssistantTool extends Schema.Class<AssistantTool>("Session.Message.Assistant.Tool")({
type: Schema.Literal("tool"),
callID: Schema.String,
name: Schema.String,
@ -94,44 +104,30 @@ export class AssistantTool extends Schema.Class<AssistantTool>("Session.Entry.As
}),
}) {}
export class AssistantText extends Schema.Class<AssistantText>("Session.Entry.Assistant.Text")({
export class AssistantText extends Schema.Class<AssistantText>("Session.Message.Assistant.Text")({
type: Schema.Literal("text"),
text: Schema.String,
}) {}
export class AssistantReasoning extends Schema.Class<AssistantReasoning>("Session.Entry.Assistant.Reasoning")({
export class AssistantReasoning extends Schema.Class<AssistantReasoning>("Session.Message.Assistant.Reasoning")({
type: Schema.Literal("reasoning"),
reasoningID: Schema.String,
text: Schema.String,
}) {}
export class AssistantRetry extends Schema.Class<AssistantRetry>("Session.Entry.Assistant.Retry")({
attempt: Schema.Number,
error: SessionEvent.RetryError,
time: Schema.Struct({
created: Schema.DateTimeUtc,
}),
}) {
static fromEvent(event: SessionEvent.Retried) {
return new AssistantRetry({
attempt: event.attempt,
error: event.error,
time: {
created: event.timestamp,
},
})
}
}
export const AssistantContent = Schema.Union([AssistantText, AssistantReasoning, AssistantTool]).pipe(
Schema.toTaggedUnion("type"),
)
export type AssistantContent = Schema.Schema.Type<typeof AssistantContent>
export class Assistant extends Schema.Class<Assistant>("Session.Entry.Assistant")({
export class Assistant extends Schema.Class<Assistant>("Session.Message.Assistant")({
...Base,
type: Schema.Literal("assistant"),
content: AssistantContent.pipe(Schema.Array),
retries: AssistantRetry.pipe(Schema.Array, Schema.optional),
snapshot: Schema.Struct({
start: Schema.String.pipe(Schema.optional),
end: Schema.String.pipe(Schema.optional),
}).pipe(Schema.optional),
cost: Schema.Number.pipe(Schema.optional),
tokens: Schema.Struct({
input: Schema.Number,
@ -150,59 +146,64 @@ export class Assistant extends Schema.Class<Assistant>("Session.Entry.Assistant"
}) {
static fromEvent(event: SessionEvent.Step.Started) {
return new Assistant({
id: event.id,
id: ID.create(),
type: "assistant",
time: {
created: event.timestamp,
created: event.data.timestamp,
},
content: [],
retries: [],
snapshot: event.data.snapshot ? { start: event.data.snapshot } : undefined,
})
}
}
export class Compaction extends Schema.Class<Compaction>("Session.Entry.Compaction")({
...SessionEvent.Compacted.fields,
export class Compaction extends Schema.Class<Compaction>("Session.Message.Compaction")({
type: Schema.Literal("compaction"),
sessionID: SessionEvent.Compacted.fields.data.fields.sessionID,
auto: SessionEvent.Compacted.fields.data.fields.auto,
overflow: SessionEvent.Compacted.fields.data.fields.overflow,
...Base,
}) {
static fromEvent(event: SessionEvent.Compacted) {
return new Compaction({
...event,
sessionID: event.data.sessionID,
auto: event.data.auto,
overflow: event.data.overflow,
id: ID.create(),
type: "compaction",
time: { created: event.timestamp },
time: { created: event.data.timestamp },
})
}
}
export const Entry = Schema.Union([User, Synthetic, Assistant, Compaction]).pipe(Schema.toTaggedUnion("type"))
export const Message = Schema.Union([User, Synthetic, Assistant, Compaction]).pipe(Schema.toTaggedUnion("type"))
export type Entry = Schema.Schema.Type<typeof Entry>
export type Message = Schema.Schema.Type<typeof Message>
export type Type = Entry["type"]
export type Type = Message["type"]
/*
export interface Interface {
readonly decode: (row: typeof SessionEntryTable.$inferSelect) => Entry
readonly fromSession: (sessionID: SessionID) => Effect.Effect<Entry[], never>
readonly decode: (row: typeof SessionMessageTable.$inferSelect) => Message
readonly fromSession: (sessionID: SessionID) => Effect.Effect<Message[], never>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/SessionEntry") {}
export class Service extends Context.Service<Service, Interface>()("@opencode/SessionMessage") {}
export const layer: Layer.Layer<Service, never, never> = Layer.effect(
Service,
Effect.gen(function* () {
const decodeEntry = Schema.decodeUnknownSync(Entry)
const decodeMessage = Schema.decodeUnknownSync(Message)
const decode: (typeof Service.Service)["decode"] = (row) => decodeEntry({ ...row, id: row.id, type: row.type })
const decode: (typeof Service.Service)["decode"] = (row) => decodeMessage({ ...row, id: row.id, type: row.type })
const fromSession = Effect.fn("SessionEntry.fromSession")(function* (sessionID: SessionID) {
const fromSession = Effect.fn("SessionMessage.fromSession")(function* (sessionID: SessionID) {
return Database.use((db) =>
db
.select()
.from(SessionEntryTable)
.where(eq(SessionEntryTable.session_id, sessionID))
.orderBy(SessionEntryTable.id)
.from(SessionMessageTable)
.where(eq(SessionMessageTable.session_id, sessionID))
.orderBy(SessionMessageTable.id)
.all()
.map((row) => decode(row)),
)
@ -216,4 +217,4 @@ export const layer: Layer.Layer<Service, never, never> = Layer.effect(
)
*/
export * as SessionEntry from "./session-entry"
export * as SessionMessage from "./session-message"

View file

@ -0,0 +1,36 @@
import * as Schema from "effect/Schema"
export class Source extends Schema.Class<Source>("Prompt.Source")({
start: Schema.Number,
end: Schema.Number,
text: Schema.String,
}) {}
export class FileAttachment extends Schema.Class<FileAttachment>("Prompt.FileAttachment")({
uri: Schema.String,
mime: Schema.String,
name: Schema.String.pipe(Schema.optional),
description: Schema.String.pipe(Schema.optional),
source: Source.pipe(Schema.optional),
}) {
static create(input: FileAttachment) {
return new FileAttachment({
uri: input.uri,
mime: input.mime,
name: input.name,
description: input.description,
source: input.source,
})
}
}
export class AgentAttachment extends Schema.Class<AgentAttachment>("Prompt.AgentAttachment")({
name: Schema.String,
source: Source.pipe(Schema.optional),
}) {}
export class Prompt extends Schema.Class<Prompt>("Prompt")({
text: Schema.String,
files: Schema.Array(FileAttachment).pipe(Schema.optional),
agents: Schema.Array(AgentAttachment).pipe(Schema.optional),
}) {}

View file

@ -1,5 +1,5 @@
import { Context, Layer, Schema, Effect } from "effect"
import { SessionEntry } from "./session-entry"
import { SessionMessage } from "./session-message"
import { Struct } from "effect"
import { Session } from "@/session/session"
import { SessionID } from "@/session/schema"
@ -9,8 +9,8 @@ export const ID = SessionID
export type ID = Schema.Schema.Type<typeof ID>
export class PromptInput extends Schema.Class<PromptInput>("Session.PromptInput")({
...Struct.omit(SessionEntry.User.fields, ["time", "type"]),
id: Schema.optionalKey(SessionEntry.ID),
...Struct.omit(SessionMessage.User.fields, ["time", "type"]),
id: Schema.optionalKey(SessionMessage.ID),
sessionID: ID,
}) {}
@ -30,7 +30,7 @@ export class Info extends Schema.Class<Info>("Session.Info")({
export interface Interface {
fromID: (id: ID) => Effect.Effect<Info>
create: (input: CreateInput) => Effect.Effect<Info>
prompt: (input: PromptInput) => Effect.Effect<SessionEntry.User>
prompt: (input: PromptInput) => Effect.Effect<SessionMessage.User>
}
export class Service extends Context.Service<Service, Interface>()("Session.Service") {}

View file

@ -0,0 +1,18 @@
export * as ToolOutput from "./tool-output"
import { Schema } from "effect"
export class TextContent extends Schema.Class<TextContent>("Tool.TextContent")({
type: Schema.Literal("text"),
text: Schema.String,
}) {}
export class FileContent extends Schema.Class<FileContent>("Tool.FileContent")({
type: Schema.Literal("file"),
uri: Schema.String,
mime: Schema.String,
name: Schema.String.pipe(Schema.optional),
}) {}
export const Content = Schema.Union([TextContent, FileContent]).pipe(Schema.toTaggedUnion("type"))
export const Structured = Schema.Record(Schema.String, Schema.Any)

View file

@ -79,7 +79,7 @@ delete process.env["OPENCODE_SERVER_USERNAME"]
process.env["OPENCODE_DB"] = ":memory:"
// Now safe to import from src/
const Log = await import("@opencode-ai/core/util/log")
const { Log } = await import("@opencode-ai/core/util/log")
const { initProjectors } = await import("../src/server/projectors")
void Log.init({

View file

@ -1,916 +0,0 @@
import { describe, expect, test } from "bun:test"
import * as DateTime from "effect/DateTime"
import * as FastCheck from "effect/testing/FastCheck"
import { SessionEntry } from "../../src/v2/session-entry"
import { SessionEntryStepper } from "../../src/v2/session-entry-stepper"
import { SessionEvent } from "../../src/v2/session-event"
const time = (n: number) => DateTime.makeUnsafe(n)
const word = FastCheck.string({ minLength: 1, maxLength: 8 })
const text = FastCheck.string({ maxLength: 16 })
const texts = FastCheck.array(text, { maxLength: 8 })
const val = FastCheck.oneof(FastCheck.boolean(), FastCheck.integer(), FastCheck.string({ maxLength: 12 }))
const dict = FastCheck.dictionary(word, val, { maxKeys: 4 })
const files = FastCheck.array(
word.map((x) => SessionEvent.FileAttachment.create({ uri: `file://${encodeURIComponent(x)}`, mime: "text/plain" })),
{ maxLength: 2 },
)
function maybe<A>(arb: FastCheck.Arbitrary<A>) {
return FastCheck.oneof(FastCheck.constant(undefined), arb)
}
function assistant() {
return new SessionEntry.Assistant({
id: SessionEvent.ID.create(),
type: "assistant",
time: { created: time(0) },
content: [],
retries: [],
})
}
function retryError(message: string) {
return new SessionEvent.RetryError({
message,
isRetryable: true,
})
}
function retry(attempt: number, message: string, created: number) {
return new SessionEntry.AssistantRetry({
attempt,
error: retryError(message),
time: {
created: time(created),
},
})
}
function memoryState() {
const state: SessionEntryStepper.MemoryState = {
entries: [],
pending: [],
}
return state
}
function active() {
const state: SessionEntryStepper.MemoryState = {
entries: [assistant()],
pending: [],
}
return state
}
function run(events: SessionEvent.Event[], state = memoryState()) {
return events.reduce<SessionEntryStepper.MemoryState>((state, event) => SessionEntryStepper.step(state, event), state)
}
function last(state: SessionEntryStepper.MemoryState) {
const entry = [...state.pending, ...state.entries].reverse().find((x) => x.type === "assistant")
expect(entry?.type).toBe("assistant")
return entry?.type === "assistant" ? entry : undefined
}
function texts_of(state: SessionEntryStepper.MemoryState) {
const entry = last(state)
if (!entry) return []
return entry.content.filter((x): x is SessionEntry.AssistantText => x.type === "text")
}
function reasons(state: SessionEntryStepper.MemoryState) {
const entry = last(state)
if (!entry) return []
return entry.content.filter((x): x is SessionEntry.AssistantReasoning => x.type === "reasoning")
}
function tools(state: SessionEntryStepper.MemoryState) {
const entry = last(state)
if (!entry) return []
return entry.content.filter((x): x is SessionEntry.AssistantTool => x.type === "tool")
}
function tool(state: SessionEntryStepper.MemoryState, callID: string) {
return tools(state).find((x) => x.callID === callID)
}
function retriesOf(state: SessionEntryStepper.MemoryState) {
const entry = last(state)
if (!entry) return []
return entry.retries ?? []
}
function adapterStore() {
return {
committed: [] as SessionEntry.Entry[],
deferred: [] as SessionEntry.Entry[],
}
}
function adapterFor(store: ReturnType<typeof adapterStore>): SessionEntryStepper.Adapter<typeof store> {
const activeAssistantIndex = () =>
store.committed.findLastIndex((entry) => entry.type === "assistant" && !entry.time.completed)
const getCurrentAssistant = () => {
const index = activeAssistantIndex()
if (index < 0) return
const assistant = store.committed[index]
return assistant?.type === "assistant" ? assistant : undefined
}
return {
getCurrentAssistant,
updateAssistant(assistant) {
const index = activeAssistantIndex()
if (index < 0) return
const current = store.committed[index]
if (current?.type !== "assistant") return
store.committed[index] = assistant
},
appendEntry(entry) {
store.committed.push(entry)
},
appendPending(entry) {
store.deferred.push(entry)
},
finish() {
return store
},
}
}
describe("session-entry-stepper", () => {
describe("stepWith", () => {
test("reduces through a custom adapter", () => {
const store = adapterStore()
store.committed.push(assistant())
SessionEntryStepper.stepWith(adapterFor(store), SessionEvent.Prompt.create({ text: "hello", timestamp: time(1) }))
SessionEntryStepper.stepWith(adapterFor(store), SessionEvent.Reasoning.Started.create({ timestamp: time(2) }))
SessionEntryStepper.stepWith(
adapterFor(store),
SessionEvent.Reasoning.Delta.create({ delta: "thinking", timestamp: time(3) }),
)
SessionEntryStepper.stepWith(
adapterFor(store),
SessionEvent.Reasoning.Ended.create({ text: "thought", timestamp: time(4) }),
)
SessionEntryStepper.stepWith(adapterFor(store), SessionEvent.Text.Started.create({ timestamp: time(5) }))
SessionEntryStepper.stepWith(
adapterFor(store),
SessionEvent.Text.Delta.create({ delta: "world", timestamp: time(6) }),
)
SessionEntryStepper.stepWith(
adapterFor(store),
SessionEvent.Step.Ended.create({
reason: "stop",
cost: 1,
tokens: {
input: 1,
output: 2,
reasoning: 3,
cache: {
read: 4,
write: 5,
},
},
timestamp: time(7),
}),
)
expect(store.deferred).toHaveLength(1)
expect(store.deferred[0]?.type).toBe("user")
expect(store.committed).toHaveLength(1)
expect(store.committed[0]?.type).toBe("assistant")
if (store.committed[0]?.type !== "assistant") return
expect(store.committed[0].content).toEqual([
{ type: "reasoning", text: "thought" },
{ type: "text", text: "world" },
])
expect(store.committed[0].time.completed).toEqual(time(7))
})
test("aggregates retry events onto the current assistant", () => {
const store = adapterStore()
store.committed.push(assistant())
SessionEntryStepper.stepWith(
adapterFor(store),
SessionEvent.Retried.create({
attempt: 1,
error: retryError("rate limited"),
timestamp: time(1),
}),
)
SessionEntryStepper.stepWith(
adapterFor(store),
SessionEvent.Retried.create({
attempt: 2,
error: retryError("provider overloaded"),
timestamp: time(2),
}),
)
expect(store.committed[0]?.type).toBe("assistant")
if (store.committed[0]?.type !== "assistant") return
expect(store.committed[0].retries).toEqual([retry(1, "rate limited", 1), retry(2, "provider overloaded", 2)])
})
})
describe("memory", () => {
test("tracks and replaces the current assistant", () => {
const state = active()
const adapter = SessionEntryStepper.memory(state)
const current = adapter.getCurrentAssistant()
expect(current?.type).toBe("assistant")
if (!current) return
adapter.updateAssistant(
new SessionEntry.Assistant({
...current,
content: [new SessionEntry.AssistantText({ type: "text", text: "done" })],
time: {
...current.time,
completed: time(1),
},
}),
)
expect(adapter.getCurrentAssistant()).toBeUndefined()
expect(state.entries[0]?.type).toBe("assistant")
if (state.entries[0]?.type !== "assistant") return
expect(state.entries[0].content).toEqual([{ type: "text", text: "done" }])
expect(state.entries[0].time.completed).toEqual(time(1))
})
test("appends committed and pending entries", () => {
const state = memoryState()
const adapter = SessionEntryStepper.memory(state)
const committed = SessionEntry.User.fromEvent(
SessionEvent.Prompt.create({ text: "committed", timestamp: time(1) }),
)
const pending = SessionEntry.User.fromEvent(SessionEvent.Prompt.create({ text: "pending", timestamp: time(2) }))
adapter.appendEntry(committed)
adapter.appendPending(pending)
expect(state.entries).toEqual([committed])
expect(state.pending).toEqual([pending])
})
test("stepWith through memory records reasoning", () => {
const state = active()
SessionEntryStepper.stepWith(
SessionEntryStepper.memory(state),
SessionEvent.Reasoning.Started.create({ timestamp: time(1) }),
)
SessionEntryStepper.stepWith(
SessionEntryStepper.memory(state),
SessionEvent.Reasoning.Delta.create({ delta: "draft", timestamp: time(2) }),
)
SessionEntryStepper.stepWith(
SessionEntryStepper.memory(state),
SessionEvent.Reasoning.Ended.create({ text: "final", timestamp: time(3) }),
)
expect(reasons(state)).toEqual([{ type: "reasoning", text: "final" }])
})
test("stepWith through memory records retries", () => {
const state = active()
SessionEntryStepper.stepWith(
SessionEntryStepper.memory(state),
SessionEvent.Retried.create({
attempt: 1,
error: retryError("rate limited"),
timestamp: time(1),
}),
)
expect(retriesOf(state)).toEqual([retry(1, "rate limited", 1)])
})
})
describe("step", () => {
describe("seeded pending assistant", () => {
test("stores prompts in entries when no assistant is pending", () => {
FastCheck.assert(
FastCheck.property(word, (body) => {
const next = SessionEntryStepper.step(
memoryState(),
SessionEvent.Prompt.create({ text: body, timestamp: time(1) }),
)
expect(next.entries).toHaveLength(1)
expect(next.entries[0]?.type).toBe("user")
if (next.entries[0]?.type !== "user") return
expect(next.entries[0].text).toBe(body)
}),
{ numRuns: 50 },
)
})
test("stores prompts in pending when an assistant is pending", () => {
FastCheck.assert(
FastCheck.property(word, (body) => {
const next = SessionEntryStepper.step(
active(),
SessionEvent.Prompt.create({ text: body, timestamp: time(1) }),
)
expect(next.pending).toHaveLength(1)
expect(next.pending[0]?.type).toBe("user")
if (next.pending[0]?.type !== "user") return
expect(next.pending[0].text).toBe(body)
}),
{ numRuns: 50 },
)
})
test("accumulates text deltas on the latest text part", () => {
FastCheck.assert(
FastCheck.property(texts, (parts) => {
const next = parts.reduce(
(state, part, i) =>
SessionEntryStepper.step(
state,
SessionEvent.Text.Delta.create({ delta: part, timestamp: time(i + 2) }),
),
SessionEntryStepper.step(active(), SessionEvent.Text.Started.create({ timestamp: time(1) })),
)
expect(texts_of(next)).toEqual([
{
type: "text",
text: parts.join(""),
},
])
}),
{ numRuns: 100 },
)
})
test("routes later text deltas to the latest text segment", () => {
FastCheck.assert(
FastCheck.property(texts, texts, (a, b) => {
const next = run(
[
SessionEvent.Text.Started.create({ timestamp: time(1) }),
...a.map((x, i) => SessionEvent.Text.Delta.create({ delta: x, timestamp: time(i + 2) })),
SessionEvent.Text.Started.create({ timestamp: time(a.length + 2) }),
...b.map((x, i) => SessionEvent.Text.Delta.create({ delta: x, timestamp: time(i + a.length + 3) })),
],
active(),
)
expect(texts_of(next)).toEqual([
{ type: "text", text: a.join("") },
{ type: "text", text: b.join("") },
])
}),
{ numRuns: 50 },
)
})
test("reasoning.ended replaces buffered reasoning text", () => {
FastCheck.assert(
FastCheck.property(texts, text, (parts, end) => {
const next = run(
[
SessionEvent.Reasoning.Started.create({ timestamp: time(1) }),
...parts.map((x, i) => SessionEvent.Reasoning.Delta.create({ delta: x, timestamp: time(i + 2) })),
SessionEvent.Reasoning.Ended.create({ text: end, timestamp: time(parts.length + 2) }),
],
active(),
)
expect(reasons(next)).toEqual([
{
type: "reasoning",
text: end,
},
])
}),
{ numRuns: 100 },
)
})
test("tool.success completes the latest running tool", () => {
FastCheck.assert(
FastCheck.property(
word,
word,
dict,
maybe(text),
maybe(dict),
maybe(files),
texts,
(callID, title, input, output, metadata, attachments, parts) => {
const next = run(
[
SessionEvent.Tool.Input.Started.create({ callID, name: "shell", timestamp: time(1) }),
...parts.map((x, i) =>
SessionEvent.Tool.Input.Delta.create({ callID, delta: x, timestamp: time(i + 2) }),
),
SessionEvent.Tool.Called.create({
callID,
tool: "shell",
input,
provider: { executed: true },
timestamp: time(parts.length + 2),
}),
SessionEvent.Tool.Success.create({
callID,
title,
output,
metadata,
attachments,
provider: { executed: true },
timestamp: time(parts.length + 3),
}),
],
active(),
)
const match = tool(next, callID)
expect(match?.state.status).toBe("completed")
if (match?.state.status !== "completed") return
expect(match.time.ran).toEqual(time(parts.length + 2))
expect(match.state.input).toEqual(input)
expect(match.state.output).toBe(output ?? "")
expect(match.state.title).toBe(title)
expect(match.state.metadata).toEqual(metadata ?? {})
expect(match.state.attachments).toEqual(attachments ?? [])
},
),
{ numRuns: 50 },
)
})
test("tool.error completes the latest running tool with an error", () => {
FastCheck.assert(
FastCheck.property(word, dict, word, maybe(dict), (callID, input, error, metadata) => {
const next = run(
[
SessionEvent.Tool.Input.Started.create({ callID, name: "shell", timestamp: time(1) }),
SessionEvent.Tool.Called.create({
callID,
tool: "shell",
input,
provider: { executed: true },
timestamp: time(2),
}),
SessionEvent.Tool.Error.create({
callID,
error,
metadata,
provider: { executed: true },
timestamp: time(3),
}),
],
active(),
)
const match = tool(next, callID)
expect(match?.state.status).toBe("error")
if (match?.state.status !== "error") return
expect(match.time.ran).toEqual(time(2))
expect(match.state.input).toEqual(input)
expect(match.state.error).toBe(error)
expect(match.state.metadata).toEqual(metadata ?? {})
}),
{ numRuns: 50 },
)
})
test("tool.success is ignored before tool.called promotes the tool to running", () => {
FastCheck.assert(
FastCheck.property(word, word, (callID, title) => {
const next = run(
[
SessionEvent.Tool.Input.Started.create({ callID, name: "shell", timestamp: time(1) }),
SessionEvent.Tool.Success.create({
callID,
title,
provider: { executed: true },
timestamp: time(2),
}),
],
active(),
)
const match = tool(next, callID)
expect(match?.state).toEqual({
status: "pending",
input: "",
})
}),
{ numRuns: 50 },
)
})
test("step.ended copies completion fields onto the pending assistant", () => {
FastCheck.assert(
FastCheck.property(FastCheck.integer({ min: 1, max: 1000 }), (n) => {
const event = SessionEvent.Step.Ended.create({
reason: "stop",
cost: 1,
tokens: {
input: 1,
output: 2,
reasoning: 3,
cache: {
read: 4,
write: 5,
},
},
timestamp: time(n),
})
const next = SessionEntryStepper.step(active(), event)
const entry = last(next)
expect(entry).toBeDefined()
if (!entry) return
expect(entry.time.completed).toEqual(event.timestamp)
expect(entry.cost).toBe(event.cost)
expect(entry.tokens).toEqual(event.tokens)
}),
{ numRuns: 50 },
)
})
})
describe("known reducer gaps", () => {
test("prompt appends immutably when no assistant is pending", () => {
FastCheck.assert(
FastCheck.property(word, (body) => {
const old = memoryState()
const next = SessionEntryStepper.step(old, SessionEvent.Prompt.create({ text: body, timestamp: time(1) }))
expect(old).not.toBe(next)
expect(old.entries).toHaveLength(0)
expect(next.entries).toHaveLength(1)
}),
{ numRuns: 50 },
)
})
test("prompt appends immutably when an assistant is pending", () => {
FastCheck.assert(
FastCheck.property(word, (body) => {
const old = active()
const next = SessionEntryStepper.step(old, SessionEvent.Prompt.create({ text: body, timestamp: time(1) }))
expect(old).not.toBe(next)
expect(old.pending).toHaveLength(0)
expect(next.pending).toHaveLength(1)
}),
{ numRuns: 50 },
)
})
test("step.started creates an assistant consumed by follow-up events", () => {
FastCheck.assert(
FastCheck.property(texts, (parts) => {
const next = run([
SessionEvent.Step.Started.create({
model: {
id: "model",
providerID: "provider",
},
timestamp: time(1),
}),
SessionEvent.Text.Started.create({ timestamp: time(2) }),
...parts.map((x, i) => SessionEvent.Text.Delta.create({ delta: x, timestamp: time(i + 3) })),
SessionEvent.Step.Ended.create({
reason: "stop",
cost: 1,
tokens: {
input: 1,
output: 2,
reasoning: 3,
cache: {
read: 4,
write: 5,
},
},
timestamp: time(parts.length + 3),
}),
])
const entry = last(next)
expect(entry).toBeDefined()
if (!entry) return
expect(entry.content).toEqual([
{
type: "text",
text: parts.join(""),
},
])
expect(entry.time.completed).toEqual(time(parts.length + 3))
}),
{ numRuns: 100 },
)
})
test("replays prompt -> step -> text -> step.ended", () => {
FastCheck.assert(
FastCheck.property(word, texts, (body, parts) => {
const next = run([
SessionEvent.Prompt.create({ text: body, timestamp: time(0) }),
SessionEvent.Step.Started.create({
model: {
id: "model",
providerID: "provider",
},
timestamp: time(1),
}),
SessionEvent.Text.Started.create({ timestamp: time(2) }),
...parts.map((x, i) => SessionEvent.Text.Delta.create({ delta: x, timestamp: time(i + 3) })),
SessionEvent.Step.Ended.create({
reason: "stop",
cost: 1,
tokens: {
input: 1,
output: 2,
reasoning: 3,
cache: {
read: 4,
write: 5,
},
},
timestamp: time(parts.length + 3),
}),
])
expect(next.entries).toHaveLength(2)
expect(next.entries[0]?.type).toBe("user")
expect(next.entries[1]?.type).toBe("assistant")
if (next.entries[1]?.type !== "assistant") return
expect(next.entries[1].content).toEqual([
{
type: "text",
text: parts.join(""),
},
])
expect(next.entries[1].time.completed).toEqual(time(parts.length + 3))
}),
{ numRuns: 50 },
)
})
test("replays prompt -> step -> reasoning -> tool -> success -> step.ended", () => {
FastCheck.assert(
FastCheck.property(
word,
texts,
text,
dict,
word,
maybe(text),
maybe(dict),
maybe(files),
(body, reason, end, input, title, output, metadata, attachments) => {
const callID = "call"
const next = run([
SessionEvent.Prompt.create({ text: body, timestamp: time(0) }),
SessionEvent.Step.Started.create({
model: {
id: "model",
providerID: "provider",
},
timestamp: time(1),
}),
SessionEvent.Reasoning.Started.create({ timestamp: time(2) }),
...reason.map((x, i) => SessionEvent.Reasoning.Delta.create({ delta: x, timestamp: time(i + 3) })),
SessionEvent.Reasoning.Ended.create({ text: end, timestamp: time(reason.length + 3) }),
SessionEvent.Tool.Input.Started.create({ callID, name: "shell", timestamp: time(reason.length + 4) }),
SessionEvent.Tool.Called.create({
callID,
tool: "shell",
input,
provider: { executed: true },
timestamp: time(reason.length + 5),
}),
SessionEvent.Tool.Success.create({
callID,
title,
output,
metadata,
attachments,
provider: { executed: true },
timestamp: time(reason.length + 6),
}),
SessionEvent.Step.Ended.create({
reason: "stop",
cost: 1,
tokens: {
input: 1,
output: 2,
reasoning: 3,
cache: {
read: 4,
write: 5,
},
},
timestamp: time(reason.length + 7),
}),
])
expect(next.entries.at(-1)?.type).toBe("assistant")
const entry = next.entries.at(-1)
if (entry?.type !== "assistant") return
expect(entry.content).toHaveLength(2)
expect(entry.content[0]).toEqual({
type: "reasoning",
text: end,
})
expect(entry.content[1]?.type).toBe("tool")
if (entry.content[1]?.type !== "tool") return
expect(entry.content[1].state.status).toBe("completed")
expect(entry.time.completed).toEqual(time(reason.length + 7))
},
),
{ numRuns: 50 },
)
})
test("starting a new step completes the old assistant and appends a new active assistant", () => {
const next = run(
[
SessionEvent.Step.Started.create({
model: {
id: "model",
providerID: "provider",
},
timestamp: time(1),
}),
],
active(),
)
expect(next.entries).toHaveLength(2)
expect(next.entries[0]?.type).toBe("assistant")
expect(next.entries[1]?.type).toBe("assistant")
if (next.entries[0]?.type !== "assistant" || next.entries[1]?.type !== "assistant") return
expect(next.entries[0].time.completed).toEqual(time(1))
expect(next.entries[1].time.created).toEqual(time(1))
expect(next.entries[1].time.completed).toBeUndefined()
})
test("handles sequential tools independently", () => {
FastCheck.assert(
FastCheck.property(dict, dict, word, word, (a, b, title, error) => {
const next = run(
[
SessionEvent.Tool.Input.Started.create({ callID: "a", name: "shell", timestamp: time(1) }),
SessionEvent.Tool.Called.create({
callID: "a",
tool: "shell",
input: a,
provider: { executed: true },
timestamp: time(2),
}),
SessionEvent.Tool.Success.create({
callID: "a",
title,
output: "done",
provider: { executed: true },
timestamp: time(3),
}),
SessionEvent.Tool.Input.Started.create({ callID: "b", name: "grep", timestamp: time(4) }),
SessionEvent.Tool.Called.create({
callID: "b",
tool: "shell",
input: b,
provider: { executed: true },
timestamp: time(5),
}),
SessionEvent.Tool.Error.create({
callID: "b",
error,
provider: { executed: true },
timestamp: time(6),
}),
],
active(),
)
const first = tool(next, "a")
const second = tool(next, "b")
expect(first?.state.status).toBe("completed")
if (first?.state.status !== "completed") return
expect(first.state.input).toEqual(a)
expect(first.state.output).toBe("done")
expect(first.state.title).toBe(title)
expect(second?.state.status).toBe("error")
if (second?.state.status !== "error") return
expect(second.state.input).toEqual(b)
expect(second.state.error).toBe(error)
}),
{ numRuns: 50 },
)
})
test("routes tool events by callID when tool streams interleave", () => {
FastCheck.assert(
FastCheck.property(dict, dict, word, word, text, text, (a, b, titleA, titleB, deltaA, deltaB) => {
const next = run(
[
SessionEvent.Tool.Input.Started.create({ callID: "a", name: "shell", timestamp: time(1) }),
SessionEvent.Tool.Input.Started.create({ callID: "b", name: "grep", timestamp: time(2) }),
SessionEvent.Tool.Input.Delta.create({ callID: "a", delta: deltaA, timestamp: time(3) }),
SessionEvent.Tool.Input.Delta.create({ callID: "b", delta: deltaB, timestamp: time(4) }),
SessionEvent.Tool.Called.create({
callID: "a",
tool: "shell",
input: a,
provider: { executed: true },
timestamp: time(5),
}),
SessionEvent.Tool.Called.create({
callID: "b",
tool: "grep",
input: b,
provider: { executed: true },
timestamp: time(6),
}),
SessionEvent.Tool.Success.create({
callID: "a",
title: titleA,
output: "done-a",
provider: { executed: true },
timestamp: time(7),
}),
SessionEvent.Tool.Success.create({
callID: "b",
title: titleB,
output: "done-b",
provider: { executed: true },
timestamp: time(8),
}),
],
active(),
)
const first = tool(next, "a")
const second = tool(next, "b")
expect(first?.state.status).toBe("completed")
expect(second?.state.status).toBe("completed")
if (first?.state.status !== "completed" || second?.state.status !== "completed") return
expect(first.state.input).toEqual(a)
expect(second.state.input).toEqual(b)
expect(first.state.title).toBe(titleA)
expect(second.state.title).toBe(titleB)
}),
{ numRuns: 50 },
)
})
test("records synthetic events", () => {
FastCheck.assert(
FastCheck.property(word, (body) => {
const next = SessionEntryStepper.step(
memoryState(),
SessionEvent.Synthetic.create({ text: body, timestamp: time(1) }),
)
expect(next.entries).toHaveLength(1)
expect(next.entries[0]?.type).toBe("synthetic")
if (next.entries[0]?.type !== "synthetic") return
expect(next.entries[0].text).toBe(body)
}),
{ numRuns: 50 },
)
})
test("records compaction events", () => {
FastCheck.assert(
FastCheck.property(FastCheck.boolean(), maybe(FastCheck.boolean()), (auto, overflow) => {
const next = SessionEntryStepper.step(
memoryState(),
SessionEvent.Compacted.create({ auto, overflow, timestamp: time(1) }),
)
expect(next.entries).toHaveLength(1)
expect(next.entries[0]?.type).toBe("compaction")
if (next.entries[0]?.type !== "compaction") return
expect(next.entries[0].auto).toBe(auto)
expect(next.entries[0].overflow).toBe(overflow)
}),
{ numRuns: 50 },
)
})
})
})
})

View file

@ -0,0 +1,41 @@
import { expect, test } from "bun:test"
import * as DateTime from "effect/DateTime"
import { SessionID } from "../../src/session/schema"
import { SessionEvent } from "../../src/v2/session-event"
import { SessionMessageUpdater } from "../../src/v2/session-message-updater"
test("step snapshots carry over to assistant messages", () => {
const state: SessionMessageUpdater.MemoryState = { messages: [], pending: [] }
const sessionID = SessionID.make("session")
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
type: "session.next.step.started",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(1),
model: { id: "model", providerID: "provider" },
snapshot: "before",
},
} satisfies SessionEvent.Event)
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
type: "session.next.step.ended",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(2),
reason: "stop",
cost: 0,
tokens: {
input: 1,
output: 2,
reasoning: 0,
cache: { read: 0, write: 0 },
},
snapshot: "after",
},
} satisfies SessionEvent.Event)
expect(state.messages[0]?.type).toBe("assistant")
if (state.messages[0]?.type !== "assistant") return
expect(state.messages[0].snapshot).toEqual({ start: "before", end: "after" })
})

View file

@ -12,13 +12,6 @@
"@/*": ["./src/*"],
"@tui/*": ["./src/cli/cmd/tui/*"],
"@test/*": ["./test/*"]
},
"plugins": [
{
"name": "@effect/language-service",
"transform": "@effect/language-service/transform",
"namespaceImportPackages": ["effect", "@effect/*"]
}
]
}
}
}

View file

@ -987,6 +987,266 @@ export type EventSessionDeleted = {
}
}
export type PromptSource = {
start: number
end: number
text: string
}
export type PromptFileAttachment = {
uri: string
mime: string
name?: string
description?: string
source?: PromptSource
}
export type PromptAgentAttachment = {
name: string
source?: PromptSource
}
export type Prompt = {
text: string
files?: Array<PromptFileAttachment>
agents?: Array<PromptAgentAttachment>
}
export type EventSessionNextPrompted = {
type: "session.next.prompted"
properties: {
timestamp: number
sessionID: string
prompt: Prompt
}
}
export type EventSessionNextSynthetic = {
type: "session.next.synthetic"
properties: {
timestamp: number
sessionID: string
text: string
}
}
export type EventSessionNextStepStarted = {
type: "session.next.step.started"
properties: {
timestamp: number
sessionID: string
model: {
id: string
providerID: string
variant?: string
}
}
}
export type EventSessionNextStepEnded = {
type: "session.next.step.ended"
properties: {
timestamp: number
sessionID: string
reason: string
cost: number
tokens: {
input: number
output: number
reasoning: number
cache: {
read: number
write: number
}
}
}
}
export type EventSessionNextTextStarted = {
type: "session.next.text.started"
properties: {
timestamp: number
sessionID: string
}
}
export type EventSessionNextTextDelta = {
type: "session.next.text.delta"
properties: {
timestamp: number
sessionID: string
delta: string
}
}
export type EventSessionNextTextEnded = {
type: "session.next.text.ended"
properties: {
timestamp: number
sessionID: string
text: string
}
}
export type EventSessionNextReasoningStarted = {
type: "session.next.reasoning.started"
properties: {
timestamp: number
sessionID: string
reasoningID: string
}
}
export type EventSessionNextReasoningDelta = {
type: "session.next.reasoning.delta"
properties: {
timestamp: number
sessionID: string
reasoningID: string
delta: string
}
}
export type EventSessionNextReasoningEnded = {
type: "session.next.reasoning.ended"
properties: {
timestamp: number
sessionID: string
reasoningID: string
text: string
}
}
export type EventSessionNextToolInputStarted = {
type: "session.next.tool.input.started"
properties: {
timestamp: number
sessionID: string
callID: string
name: string
}
}
export type EventSessionNextToolInputDelta = {
type: "session.next.tool.input.delta"
properties: {
timestamp: number
sessionID: string
callID: string
delta: string
}
}
export type EventSessionNextToolInputEnded = {
type: "session.next.tool.input.ended"
properties: {
timestamp: number
sessionID: string
callID: string
text: string
}
}
export type EventSessionNextToolCalled = {
type: "session.next.tool.called"
properties: {
timestamp: number
sessionID: string
callID: string
tool: string
input: {
[key: string]: unknown
}
provider: {
executed: boolean
metadata?: {
[key: string]: unknown
}
}
}
}
export type EventSessionNextToolProgress = {
type: "session.next.tool.progress"
properties: {
timestamp: number
sessionID: string
callID: string
details: {
[key: string]: unknown
}
}
}
export type EventSessionNextToolSuccess = {
type: "session.next.tool.success"
properties: {
timestamp: number
sessionID: string
callID: string
output?: string
attachments?: Array<PromptFileAttachment>
details?: {
[key: string]: unknown
}
provider: {
executed: boolean
metadata?: {
[key: string]: unknown
}
}
}
}
export type EventSessionNextToolError = {
type: "session.next.tool.error"
properties: {
timestamp: number
sessionID: string
callID: string
error: string
provider: {
executed: boolean
metadata?: {
[key: string]: unknown
}
}
}
}
export type SessionNextRetryError = {
message: string
statusCode?: number
isRetryable: boolean
responseHeaders?: {
[key: string]: string
}
responseBody?: string
metadata?: {
[key: string]: string
}
}
export type EventSessionNextRetried = {
type: "session.next.retried"
properties: {
timestamp: number
sessionID: string
attempt: number
error: SessionNextRetryError
}
}
export type EventSessionNextCompacted = {
type: "session.next.compacted"
properties: {
timestamp: number
sessionID: string
auto: boolean
overflow?: boolean
}
}
export type SyncEventMessageUpdated = {
type: "sync"
name: "message.updated.1"
@ -1104,6 +1364,304 @@ export type SyncEventSessionDeleted = {
}
}
export type SyncEventSessionNextPrompted = {
type: "sync"
name: "session.next.prompted.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
prompt: Prompt
}
}
export type SyncEventSessionNextSynthetic = {
type: "sync"
name: "session.next.synthetic.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
text: string
}
}
export type SyncEventSessionNextStepStarted = {
type: "sync"
name: "session.next.step.started.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
model: {
id: string
providerID: string
variant?: string
}
}
}
export type SyncEventSessionNextStepEnded = {
type: "sync"
name: "session.next.step.ended.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
reason: string
cost: number
tokens: {
input: number
output: number
reasoning: number
cache: {
read: number
write: number
}
}
}
}
export type SyncEventSessionNextTextStarted = {
type: "sync"
name: "session.next.text.started.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
}
}
export type SyncEventSessionNextTextDelta = {
type: "sync"
name: "session.next.text.delta.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
delta: string
}
}
export type SyncEventSessionNextTextEnded = {
type: "sync"
name: "session.next.text.ended.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
text: string
}
}
export type SyncEventSessionNextReasoningStarted = {
type: "sync"
name: "session.next.reasoning.started.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
reasoningID: string
}
}
export type SyncEventSessionNextReasoningDelta = {
type: "sync"
name: "session.next.reasoning.delta.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
reasoningID: string
delta: string
}
}
export type SyncEventSessionNextReasoningEnded = {
type: "sync"
name: "session.next.reasoning.ended.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
reasoningID: string
text: string
}
}
export type SyncEventSessionNextToolInputStarted = {
type: "sync"
name: "session.next.tool.input.started.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
callID: string
name: string
}
}
export type SyncEventSessionNextToolInputDelta = {
type: "sync"
name: "session.next.tool.input.delta.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
callID: string
delta: string
}
}
export type SyncEventSessionNextToolInputEnded = {
type: "sync"
name: "session.next.tool.input.ended.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
callID: string
text: string
}
}
export type SyncEventSessionNextToolCalled = {
type: "sync"
name: "session.next.tool.called.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
callID: string
tool: string
input: {
[key: string]: unknown
}
provider: {
executed: boolean
metadata?: {
[key: string]: unknown
}
}
}
}
export type SyncEventSessionNextToolProgress = {
type: "sync"
name: "session.next.tool.progress.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
callID: string
details: {
[key: string]: unknown
}
}
}
export type SyncEventSessionNextToolSuccess = {
type: "sync"
name: "session.next.tool.success.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
callID: string
output?: string
attachments?: Array<PromptFileAttachment>
details?: {
[key: string]: unknown
}
provider: {
executed: boolean
metadata?: {
[key: string]: unknown
}
}
}
}
export type SyncEventSessionNextToolError = {
type: "sync"
name: "session.next.tool.error.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
callID: string
error: string
provider: {
executed: boolean
metadata?: {
[key: string]: unknown
}
}
}
}
export type SyncEventSessionNextRetried = {
type: "sync"
name: "session.next.retried.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
attempt: number
error: SessionNextRetryError
}
}
export type SyncEventSessionNextCompacted = {
type: "sync"
name: "session.next.compacted.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
auto: boolean
overflow?: boolean
}
}
export type GlobalEvent = {
directory: string
project?: string
@ -1156,6 +1714,25 @@ export type GlobalEvent = {
| EventSessionCreated
| EventSessionUpdated
| EventSessionDeleted
| EventSessionNextPrompted
| EventSessionNextSynthetic
| EventSessionNextStepStarted
| EventSessionNextStepEnded
| EventSessionNextTextStarted
| EventSessionNextTextDelta
| EventSessionNextTextEnded
| EventSessionNextReasoningStarted
| EventSessionNextReasoningDelta
| EventSessionNextReasoningEnded
| EventSessionNextToolInputStarted
| EventSessionNextToolInputDelta
| EventSessionNextToolInputEnded
| EventSessionNextToolCalled
| EventSessionNextToolProgress
| EventSessionNextToolSuccess
| EventSessionNextToolError
| EventSessionNextRetried
| EventSessionNextCompacted
| SyncEventMessageUpdated
| SyncEventMessageRemoved
| SyncEventMessagePartUpdated
@ -1163,6 +1740,25 @@ export type GlobalEvent = {
| SyncEventSessionCreated
| SyncEventSessionUpdated
| SyncEventSessionDeleted
| SyncEventSessionNextPrompted
| SyncEventSessionNextSynthetic
| SyncEventSessionNextStepStarted
| SyncEventSessionNextStepEnded
| SyncEventSessionNextTextStarted
| SyncEventSessionNextTextDelta
| SyncEventSessionNextTextEnded
| SyncEventSessionNextReasoningStarted
| SyncEventSessionNextReasoningDelta
| SyncEventSessionNextReasoningEnded
| SyncEventSessionNextToolInputStarted
| SyncEventSessionNextToolInputDelta
| SyncEventSessionNextToolInputEnded
| SyncEventSessionNextToolCalled
| SyncEventSessionNextToolProgress
| SyncEventSessionNextToolSuccess
| SyncEventSessionNextToolError
| SyncEventSessionNextRetried
| SyncEventSessionNextCompacted
}
/**
@ -2099,6 +2695,25 @@ export type Event =
| EventSessionCreated
| EventSessionUpdated
| EventSessionDeleted
| EventSessionNextPrompted
| EventSessionNextSynthetic
| EventSessionNextStepStarted
| EventSessionNextStepEnded
| EventSessionNextTextStarted
| EventSessionNextTextDelta
| EventSessionNextTextEnded
| EventSessionNextReasoningStarted
| EventSessionNextReasoningDelta
| EventSessionNextReasoningEnded
| EventSessionNextToolInputStarted
| EventSessionNextToolInputDelta
| EventSessionNextToolInputEnded
| EventSessionNextToolCalled
| EventSessionNextToolProgress
| EventSessionNextToolSuccess
| EventSessionNextToolError
| EventSessionNextRetried
| EventSessionNextCompacted
export type McpStatusConnected = {
status: "connected"

View file

@ -0,0 +1,131 @@
# Session V2 Concept Gaps
Compared with `packages/opencode/src/session/message-v2.ts` and `packages/opencode/src/session/processor.ts`, `packages/opencode/src/v2` currently captures the rough event stream for prompts, assistant steps, text, reasoning, tools, retries, and compaction, but it does not yet capture several persisted-message and processor concepts.
## Message Metadata
- User messages are missing selected `agent`, `model`, `system`, enabled `tools`, output `format`, and summary metadata.
- Assistant messages are missing `parentID`, `agent`, `providerID`, `modelID`, `variant`, `path.cwd`, `path.root`, deprecated `mode`, `summary`, `structured`, `finish`, and typed `error`.
## Output Format
- Text output format.
- JSON-schema output format.
- Structured-output retry count.
- Structured assistant result payload.
- Structured-output error classification.
## Errors
- Aborted error.
- Provider auth error.
- API error with status, retryability, headers, body, and metadata.
- Context-overflow error.
- Output-length error.
- Unknown error.
- V2 mostly reduces assistant errors to strings, except retry errors.
## Part Identity
- V1 has stable `MessageID`, `PartID`, `sessionID`, and `messageID` on every part.
- V2 assistant content does not preserve stable per-content IDs.
- Stable content IDs matter for deltas, updates, removals, sync events, and UI reconciliation.
## Part Timing And Metadata
- V1 text, reasoning, and tool states carry timing and provider metadata.
- V2 assistant text and reasoning content only store text.
- V2 events include metadata, but `SessionEntry` currently drops most provider metadata.
## Snapshots And Patches
- Snapshot parts.
- Patch parts.
- Step-start snapshot references.
- Step-finish snapshot references.
- Processor behavior that tracks a snapshot before the stream and emits patches after step finish or cleanup.
## Step Boundaries
- V1 stores `step-start` and `step-finish` as first-class parts.
- V2 has `step.started` and `step.ended` events, but the assistant entry only stores aggregate cost and tokens.
- V2 does not preserve step boundary parts, finish reason, or snapshot details in the entry model.
## Compaction
- V1 compaction parts have `auto`, `overflow`, and `tail_start_id`.
- V2 compacted events have `auto` and optional `overflow`, but no retained-tail marker.
- V1 also has history filtering semantics around completed summary messages and retained tails.
## Files And Sources
- V1 file parts have `mime`, `filename`, `url`, and typed source information.
- V1 source variants include file, symbol, and resource sources.
- Symbol sources include LSP range, name, and kind.
- Resource sources include client name and URI.
- V2 file attachments have `uri`, `mime`, `name`, `description`, and a generic text source, but lose source type, LSP metadata, and resource metadata.
## Agents And Subtasks
- Agent parts.
- Subtask parts.
- Subtask prompt, description, agent, model, and command.
- V2 has agent attachments on prompts, but no assistant/session content equivalent for subtask execution.
## Text Flags
- Synthetic text flag.
- Ignored text flag.
- V2 has a separate synthetic entry, but no ignored text concept.
## Tool Calls
- V1 pending tool state stores parsed input and raw input text separately.
- V2 pending tool state stores a string input but does not preserve a separate raw field.
- V1 completed tool state has `time.start`, `time.end`, and optional `time.compacted`.
- V2 tool time has `created`, `ran`, `completed`, and `pruned`, but the stepper currently does not set `completed` or `pruned`.
- V1 error tool state has `time.start` and `time.end`.
- V1 supports interrupted tool errors with `metadata.interrupted` and preserved partial output.
- V1 tracks provider execution and provider call metadata.
- V2 events include provider info, but `SessionEntryStepper` drops it from entries.
- V1 has tool-output compaction and truncation behavior via `time.compacted`.
## Media Handling
- V1 models tool attachments as file parts and has provider-specific handling for media in tool results.
- V1 can strip media, inject synthetic user messages for unsupported providers, and uses a synthetic attachment prompt.
- V2 has attachments but not these model-message conversion semantics.
## Retries
- V1 stores retries as independently addressable retry parts.
- V2 stores retries as an assistant aggregate.
- V2 captures some retry information, but not the independent part identity/update model.
## Processor Control Flow
- Session status transitions: busy, retry, and idle.
- Retry policy integration.
- Context-overflow-driven compaction.
- Abort and interrupt handling.
- Permission-denied blocking.
- Doom-loop detection.
- Plugin hook for `experimental.text.complete`.
- Background summary generation after steps.
- Cleanup semantics for open text, reasoning, and tool calls.
## Sync And Bus Events
- Message updated.
- Message removed.
- Message part updated.
- Message part delta.
- Message part removed.
- V2 has domain events, but not the sync/bus event model for persisted message and part updates/removals.
## History Retrieval
- Cursor encoding and decoding.
- Paged message retrieval.
- Reverse streaming through history.
- Compaction-aware history filtering.