feat(session): project next session events

This commit is contained in:
Dax Raad 2026-04-26 14:59:23 -04:00
parent ccfe2ac4da
commit b80b1f4e2f
4 changed files with 218 additions and 5 deletions

View file

@ -1,3 +1,5 @@
export * as Log from "./log"
import path from "path"
import fs from "fs/promises"
import { createWriteStream } from "fs"

View file

@ -20,6 +20,9 @@ import { Question } from "@/question"
import { errorMessage } from "@/util/error"
import * as Log from "@opencode-ai/core/util/log"
import { isRecord } from "@/util/record"
import { SyncEvent } from "@/sync"
import { SessionEvent } from "@/v2/session-event"
import * as DateTime from "effect/DateTime"
const DOOM_LOOP_THRESHOLD = 3
const log = Log.create({ service: "session.processor" })
@ -221,6 +224,10 @@ export const layer: Layer.Layer<
case "reasoning-start":
if (value.id in ctx.reasoningMap) return
SyncEvent.run(SessionEvent.Reasoning.Started.Sync, {
sessionID: ctx.sessionID,
timestamp: DateTime.makeUnsafe(Date.now()),
})
ctx.reasoningMap[value.id] = {
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
@ -248,6 +255,11 @@ export const layer: Layer.Layer<
case "reasoning-end":
if (!(value.id in ctx.reasoningMap)) return
SyncEvent.run(SessionEvent.Reasoning.Ended.Sync, {
sessionID: ctx.sessionID,
text: ctx.reasoningMap[value.id].text,
timestamp: DateTime.makeUnsafe(Date.now()),
})
// oxlint-disable-next-line no-self-assign -- reactivity trigger
ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text
ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() }
@ -260,6 +272,12 @@ export const layer: Layer.Layer<
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
SyncEvent.run(SessionEvent.Tool.Input.Started.Sync, {
sessionID: ctx.sessionID,
callID: value.id,
name: value.toolName,
timestamp: DateTime.makeUnsafe(Date.now()),
})
const part = yield* session.updatePart({
id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(),
messageID: ctx.assistantMessage.id,
@ -281,13 +299,32 @@ export const layer: Layer.Layer<
case "tool-input-delta":
return
case "tool-input-end":
case "tool-input-end": {
SyncEvent.run(SessionEvent.Tool.Input.Ended.Sync, {
sessionID: ctx.sessionID,
callID: value.id,
text: "",
timestamp: DateTime.makeUnsafe(Date.now()),
})
return
}
case "tool-call": {
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
const toolCall = yield* readToolCall(value.toolCallId)
SyncEvent.run(SessionEvent.Tool.Called.Sync, {
sessionID: ctx.sessionID,
callID: value.toolCallId,
tool: value.toolName,
input: value.input,
provider: {
executed: toolCall?.part.metadata?.providerExecuted === true,
...(value.providerMetadata ? { metadata: value.providerMetadata } : {}),
},
timestamp: DateTime.makeUnsafe(Date.now()),
})
yield* updateToolCall(value.toolCallId, (match) => ({
...match,
tool: value.toolName,
@ -331,11 +368,47 @@ export const layer: Layer.Layer<
}
case "tool-result": {
const toolCall = yield* readToolCall(value.toolCallId)
SyncEvent.run(SessionEvent.Tool.Success.Sync, {
sessionID: ctx.sessionID,
callID: value.toolCallId,
title: value.output.title,
output: value.output.output,
attachments: value.output.attachments?.map((item: MessageV2.FilePart) => ({
uri: item.url,
mime: item.mime,
...(item.filename ? { name: item.filename } : {}),
...(item.source
? {
source: {
start: item.source.text.start,
end: item.source.text.end,
text: item.source.text.value,
},
}
: {}),
})),
provider: {
executed: toolCall?.part.metadata?.providerExecuted === true,
metadata: value.output.metadata,
},
timestamp: DateTime.makeUnsafe(Date.now()),
})
yield* completeToolCall(value.toolCallId, value.output)
return
}
case "tool-error": {
const toolCall = yield* readToolCall(value.toolCallId)
SyncEvent.run(SessionEvent.Tool.Error.Sync, {
sessionID: ctx.sessionID,
callID: value.toolCallId,
error: errorMessage(value.error),
provider: {
executed: toolCall?.part.metadata?.providerExecuted === true,
},
timestamp: DateTime.makeUnsafe(Date.now()),
})
yield* failToolCall(value.toolCallId, value.error)
return
}
@ -345,6 +418,15 @@ export const layer: Layer.Layer<
case "start-step":
if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track()
SyncEvent.run(SessionEvent.Step.Started.Sync, {
sessionID: ctx.sessionID,
model: {
id: ctx.model.id,
providerID: ctx.model.providerID,
variant: input.assistantMessage.variant,
},
timestamp: DateTime.makeUnsafe(Date.now()),
})
yield* session.updatePart({
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
@ -360,6 +442,13 @@ export const layer: Layer.Layer<
usage: value.usage,
metadata: value.providerMetadata,
})
SyncEvent.run(SessionEvent.Step.Ended.Sync, {
sessionID: ctx.sessionID,
reason: value.finishReason,
cost: usage.cost,
tokens: usage.tokens,
timestamp: DateTime.makeUnsafe(Date.now()),
})
ctx.assistantMessage.finish = value.finishReason
ctx.assistantMessage.cost += usage.cost
ctx.assistantMessage.tokens = usage.tokens
@ -404,6 +493,10 @@ export const layer: Layer.Layer<
}
case "text-start":
SyncEvent.run(SessionEvent.Text.Started.Sync, {
sessionID: ctx.sessionID,
timestamp: DateTime.makeUnsafe(Date.now()),
})
ctx.currentText = {
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
@ -442,6 +535,11 @@ export const layer: Layer.Layer<
},
{ text: ctx.currentText.text },
)).text
SyncEvent.run(SessionEvent.Text.Ended.Sync, {
sessionID: ctx.sessionID,
text: ctx.currentText.text,
timestamp: DateTime.makeUnsafe(Date.now()),
})
{
const end = Date.now()
ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
@ -568,13 +666,23 @@ export const layer: Layer.Layer<
Effect.retry(
SessionRetry.policy({
parse,
set: (info) =>
status.set(ctx.sessionID, {
set: (info) => {
SyncEvent.run(SessionEvent.Retried.Sync, {
sessionID: ctx.sessionID,
attempt: info.attempt,
error: {
message: info.message,
isRetryable: true,
},
timestamp: DateTime.makeUnsafe(Date.now()),
})
return status.set(ctx.sessionID, {
type: "retry",
attempt: info.attempt,
message: info.message,
next: info.next,
}),
})
},
}),
),
Effect.catch(halt),

View file

@ -0,0 +1,100 @@
import { and, desc, eq } from "@/storage"
import type { Database } from "@/storage"
import { SessionEntry } from "@/v2/session-entry"
import { SessionEntryStepper } from "@/v2/session-entry-stepper"
import { SessionEvent } from "@/v2/session-event"
import * as DateTime from "effect/DateTime"
import { SyncEvent } from "@/sync"
import { SessionEntryTable } from "./session.sql"
import type { SessionID } from "./schema"
function sqlite(db: Database.TxOrDb, sessionID: SessionID): SessionEntryStepper.Adapter<void> {
return {
getCurrentAssistant() {
return db
.select()
.from(SessionEntryTable)
.where(and(eq(SessionEntryTable.session_id, sessionID), eq(SessionEntryTable.type, "assistant")))
.orderBy(desc(SessionEntryTable.id))
.all()
.map((row) => ({ id: row.id, type: row.type, ...row.data }) as SessionEntry.Entry)
.find((entry): entry is SessionEntry.Assistant => entry.type === "assistant" && !entry.time.completed)
},
updateAssistant(assistant) {
const { id, type, ...data } = assistant
db.update(SessionEntryTable)
.set({ data })
.where(and(eq(SessionEntryTable.id, id), eq(SessionEntryTable.session_id, sessionID), eq(SessionEntryTable.type, type)))
.run()
},
appendEntry(entry) {
const { id, type, ...data } = entry
db.insert(SessionEntryTable)
.values({
id,
session_id: sessionID,
type,
time_created: DateTime.toEpochMillis(entry.time.created),
data,
})
.run()
},
appendPending() {},
finish() {},
}
}
function step(db: Database.TxOrDb, event: SessionEvent.Event) {
SessionEntryStepper.stepWith(sqlite(db, event.data.sessionID), event)
}
export default [
SyncEvent.project(SessionEvent.Prompted.Sync, (db, data) => {
step(db, { type: "session.next.prompted", data })
}),
SyncEvent.project(SessionEvent.Synthetic.Sync, (db, data) => {
step(db, { type: "session.next.synthetic", data })
}),
SyncEvent.project(SessionEvent.Step.Started.Sync, (db, data) => {
step(db, { type: "session.next.step.started", data })
}),
SyncEvent.project(SessionEvent.Step.Ended.Sync, (db, data) => {
step(db, { type: "session.next.step.ended", data })
}),
SyncEvent.project(SessionEvent.Text.Started.Sync, (db, data) => {
step(db, { type: "session.next.text.started", data })
}),
SyncEvent.project(SessionEvent.Text.Delta.Sync, () => {}),
SyncEvent.project(SessionEvent.Text.Ended.Sync, (db, data) => {
step(db, { type: "session.next.text.ended", data })
}),
SyncEvent.project(SessionEvent.Tool.Input.Started.Sync, (db, data) => {
step(db, { type: "session.next.tool.input.started", data })
}),
SyncEvent.project(SessionEvent.Tool.Input.Delta.Sync, () => {}),
SyncEvent.project(SessionEvent.Tool.Input.Ended.Sync, (db, data) => {
step(db, { type: "session.next.tool.input.ended", data })
}),
SyncEvent.project(SessionEvent.Tool.Called.Sync, (db, data) => {
step(db, { type: "session.next.tool.called", data })
}),
SyncEvent.project(SessionEvent.Tool.Success.Sync, (db, data) => {
step(db, { type: "session.next.tool.success", data })
}),
SyncEvent.project(SessionEvent.Tool.Error.Sync, (db, data) => {
step(db, { type: "session.next.tool.error", data })
}),
SyncEvent.project(SessionEvent.Reasoning.Started.Sync, (db, data) => {
step(db, { type: "session.next.reasoning.started", data })
}),
SyncEvent.project(SessionEvent.Reasoning.Delta.Sync, () => {}),
SyncEvent.project(SessionEvent.Reasoning.Ended.Sync, (db, data) => {
step(db, { type: "session.next.reasoning.ended", data })
}),
SyncEvent.project(SessionEvent.Retried.Sync, (db, data) => {
step(db, { type: "session.next.retried", data })
}),
SyncEvent.project(SessionEvent.Compacted.Sync, (db, data) => {
step(db, { type: "session.next.compacted", data })
}),
]

View file

@ -5,7 +5,8 @@ import { SyncEvent } from "@/sync"
import * as Session from "./session"
import { MessageV2 } from "./message-v2"
import { SessionTable, MessageTable, PartTable } from "./session.sql"
import * as Log from "@opencode-ai/core/util/log"
import { Log } from "@opencode-ai/core/util/log"
import nextProjectors from "./projectors-next"
const log = Log.create({ service: "session.projector" })
@ -135,4 +136,6 @@ export default [
log.warn("ignored late part update", { partID: id, messageID, sessionID })
}
}),
...nextProjectors,
]