diff --git a/packages/core/src/util/log.ts b/packages/core/src/util/log.ts index a61c15f7a7..e1962aed4c 100644 --- a/packages/core/src/util/log.ts +++ b/packages/core/src/util/log.ts @@ -1,3 +1,5 @@ +export * as Log from "./log" + import path from "path" import fs from "fs/promises" import { createWriteStream } from "fs" diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index b475ec1c59..245f9063db 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -20,6 +20,9 @@ import { Question } from "@/question" import { errorMessage } from "@/util/error" import * as Log from "@opencode-ai/core/util/log" import { isRecord } from "@/util/record" +import { SyncEvent } from "@/sync" +import { SessionEvent } from "@/v2/session-event" +import * as DateTime from "effect/DateTime" const DOOM_LOOP_THRESHOLD = 3 const log = Log.create({ service: "session.processor" }) @@ -221,6 +224,10 @@ export const layer: Layer.Layer< case "reasoning-start": if (value.id in ctx.reasoningMap) return + SyncEvent.run(SessionEvent.Reasoning.Started.Sync, { + sessionID: ctx.sessionID, + timestamp: DateTime.makeUnsafe(Date.now()), + }) ctx.reasoningMap[value.id] = { id: PartID.ascending(), messageID: ctx.assistantMessage.id, @@ -248,6 +255,11 @@ export const layer: Layer.Layer< case "reasoning-end": if (!(value.id in ctx.reasoningMap)) return + SyncEvent.run(SessionEvent.Reasoning.Ended.Sync, { + sessionID: ctx.sessionID, + text: ctx.reasoningMap[value.id].text, + timestamp: DateTime.makeUnsafe(Date.now()), + }) // oxlint-disable-next-line no-self-assign -- reactivity trigger ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() } @@ -260,6 +272,12 @@ export const layer: Layer.Layer< if (ctx.assistantMessage.summary) { throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`) } + SyncEvent.run(SessionEvent.Tool.Input.Started.Sync, { + sessionID: ctx.sessionID, + callID: value.id, + name: value.toolName, + timestamp: DateTime.makeUnsafe(Date.now()), + }) const part = yield* session.updatePart({ id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(), messageID: ctx.assistantMessage.id, @@ -281,13 +299,32 @@ export const layer: Layer.Layer< case "tool-input-delta": return - case "tool-input-end": + case "tool-input-end": { + SyncEvent.run(SessionEvent.Tool.Input.Ended.Sync, { + sessionID: ctx.sessionID, + callID: value.id, + text: "", + timestamp: DateTime.makeUnsafe(Date.now()), + }) return + } case "tool-call": { if (ctx.assistantMessage.summary) { throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`) } + const toolCall = yield* readToolCall(value.toolCallId) + SyncEvent.run(SessionEvent.Tool.Called.Sync, { + sessionID: ctx.sessionID, + callID: value.toolCallId, + tool: value.toolName, + input: value.input, + provider: { + executed: toolCall?.part.metadata?.providerExecuted === true, + ...(value.providerMetadata ? { metadata: value.providerMetadata } : {}), + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) yield* updateToolCall(value.toolCallId, (match) => ({ ...match, tool: value.toolName, @@ -331,11 +368,47 @@ export const layer: Layer.Layer< } case "tool-result": { + const toolCall = yield* readToolCall(value.toolCallId) + SyncEvent.run(SessionEvent.Tool.Success.Sync, { + sessionID: ctx.sessionID, + callID: value.toolCallId, + title: value.output.title, + output: value.output.output, + attachments: value.output.attachments?.map((item: MessageV2.FilePart) => ({ + uri: item.url, + mime: item.mime, + ...(item.filename ? { name: item.filename } : {}), + ...(item.source + ? { + source: { + start: item.source.text.start, + end: item.source.text.end, + text: item.source.text.value, + }, + } + : {}), + })), + provider: { + executed: toolCall?.part.metadata?.providerExecuted === true, + metadata: value.output.metadata, + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) yield* completeToolCall(value.toolCallId, value.output) return } case "tool-error": { + const toolCall = yield* readToolCall(value.toolCallId) + SyncEvent.run(SessionEvent.Tool.Error.Sync, { + sessionID: ctx.sessionID, + callID: value.toolCallId, + error: errorMessage(value.error), + provider: { + executed: toolCall?.part.metadata?.providerExecuted === true, + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) yield* failToolCall(value.toolCallId, value.error) return } @@ -345,6 +418,15 @@ export const layer: Layer.Layer< case "start-step": if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track() + SyncEvent.run(SessionEvent.Step.Started.Sync, { + sessionID: ctx.sessionID, + model: { + id: ctx.model.id, + providerID: ctx.model.providerID, + variant: input.assistantMessage.variant, + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) yield* session.updatePart({ id: PartID.ascending(), messageID: ctx.assistantMessage.id, @@ -360,6 +442,13 @@ export const layer: Layer.Layer< usage: value.usage, metadata: value.providerMetadata, }) + SyncEvent.run(SessionEvent.Step.Ended.Sync, { + sessionID: ctx.sessionID, + reason: value.finishReason, + cost: usage.cost, + tokens: usage.tokens, + timestamp: DateTime.makeUnsafe(Date.now()), + }) ctx.assistantMessage.finish = value.finishReason ctx.assistantMessage.cost += usage.cost ctx.assistantMessage.tokens = usage.tokens @@ -404,6 +493,10 @@ export const layer: Layer.Layer< } case "text-start": + SyncEvent.run(SessionEvent.Text.Started.Sync, { + sessionID: ctx.sessionID, + timestamp: DateTime.makeUnsafe(Date.now()), + }) ctx.currentText = { id: PartID.ascending(), messageID: ctx.assistantMessage.id, @@ -442,6 +535,11 @@ export const layer: Layer.Layer< }, { text: ctx.currentText.text }, )).text + SyncEvent.run(SessionEvent.Text.Ended.Sync, { + sessionID: ctx.sessionID, + text: ctx.currentText.text, + timestamp: DateTime.makeUnsafe(Date.now()), + }) { const end = Date.now() ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end } @@ -568,13 +666,23 @@ export const layer: Layer.Layer< Effect.retry( SessionRetry.policy({ parse, - set: (info) => - status.set(ctx.sessionID, { + set: (info) => { + SyncEvent.run(SessionEvent.Retried.Sync, { + sessionID: ctx.sessionID, + attempt: info.attempt, + error: { + message: info.message, + isRetryable: true, + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + return status.set(ctx.sessionID, { type: "retry", attempt: info.attempt, message: info.message, next: info.next, - }), + }) + }, }), ), Effect.catch(halt), diff --git a/packages/opencode/src/session/projectors-next.ts b/packages/opencode/src/session/projectors-next.ts new file mode 100644 index 0000000000..3298edfd6d --- /dev/null +++ b/packages/opencode/src/session/projectors-next.ts @@ -0,0 +1,100 @@ +import { and, desc, eq } from "@/storage" +import type { Database } from "@/storage" +import { SessionEntry } from "@/v2/session-entry" +import { SessionEntryStepper } from "@/v2/session-entry-stepper" +import { SessionEvent } from "@/v2/session-event" +import * as DateTime from "effect/DateTime" +import { SyncEvent } from "@/sync" +import { SessionEntryTable } from "./session.sql" +import type { SessionID } from "./schema" + +function sqlite(db: Database.TxOrDb, sessionID: SessionID): SessionEntryStepper.Adapter { + return { + getCurrentAssistant() { + return db + .select() + .from(SessionEntryTable) + .where(and(eq(SessionEntryTable.session_id, sessionID), eq(SessionEntryTable.type, "assistant"))) + .orderBy(desc(SessionEntryTable.id)) + .all() + .map((row) => ({ id: row.id, type: row.type, ...row.data }) as SessionEntry.Entry) + .find((entry): entry is SessionEntry.Assistant => entry.type === "assistant" && !entry.time.completed) + }, + updateAssistant(assistant) { + const { id, type, ...data } = assistant + db.update(SessionEntryTable) + .set({ data }) + .where(and(eq(SessionEntryTable.id, id), eq(SessionEntryTable.session_id, sessionID), eq(SessionEntryTable.type, type))) + .run() + }, + appendEntry(entry) { + const { id, type, ...data } = entry + db.insert(SessionEntryTable) + .values({ + id, + session_id: sessionID, + type, + time_created: DateTime.toEpochMillis(entry.time.created), + data, + }) + .run() + }, + appendPending() {}, + finish() {}, + } +} + +function step(db: Database.TxOrDb, event: SessionEvent.Event) { + SessionEntryStepper.stepWith(sqlite(db, event.data.sessionID), event) +} + +export default [ + SyncEvent.project(SessionEvent.Prompted.Sync, (db, data) => { + step(db, { type: "session.next.prompted", data }) + }), + SyncEvent.project(SessionEvent.Synthetic.Sync, (db, data) => { + step(db, { type: "session.next.synthetic", data }) + }), + SyncEvent.project(SessionEvent.Step.Started.Sync, (db, data) => { + step(db, { type: "session.next.step.started", data }) + }), + SyncEvent.project(SessionEvent.Step.Ended.Sync, (db, data) => { + step(db, { type: "session.next.step.ended", data }) + }), + SyncEvent.project(SessionEvent.Text.Started.Sync, (db, data) => { + step(db, { type: "session.next.text.started", data }) + }), + SyncEvent.project(SessionEvent.Text.Delta.Sync, () => {}), + SyncEvent.project(SessionEvent.Text.Ended.Sync, (db, data) => { + step(db, { type: "session.next.text.ended", data }) + }), + SyncEvent.project(SessionEvent.Tool.Input.Started.Sync, (db, data) => { + step(db, { type: "session.next.tool.input.started", data }) + }), + SyncEvent.project(SessionEvent.Tool.Input.Delta.Sync, () => {}), + SyncEvent.project(SessionEvent.Tool.Input.Ended.Sync, (db, data) => { + step(db, { type: "session.next.tool.input.ended", data }) + }), + SyncEvent.project(SessionEvent.Tool.Called.Sync, (db, data) => { + step(db, { type: "session.next.tool.called", data }) + }), + SyncEvent.project(SessionEvent.Tool.Success.Sync, (db, data) => { + step(db, { type: "session.next.tool.success", data }) + }), + SyncEvent.project(SessionEvent.Tool.Error.Sync, (db, data) => { + step(db, { type: "session.next.tool.error", data }) + }), + SyncEvent.project(SessionEvent.Reasoning.Started.Sync, (db, data) => { + step(db, { type: "session.next.reasoning.started", data }) + }), + SyncEvent.project(SessionEvent.Reasoning.Delta.Sync, () => {}), + SyncEvent.project(SessionEvent.Reasoning.Ended.Sync, (db, data) => { + step(db, { type: "session.next.reasoning.ended", data }) + }), + SyncEvent.project(SessionEvent.Retried.Sync, (db, data) => { + step(db, { type: "session.next.retried", data }) + }), + SyncEvent.project(SessionEvent.Compacted.Sync, (db, data) => { + step(db, { type: "session.next.compacted", data }) + }), +] diff --git a/packages/opencode/src/session/projectors.ts b/packages/opencode/src/session/projectors.ts index 35c8473809..d5448d91d2 100644 --- a/packages/opencode/src/session/projectors.ts +++ b/packages/opencode/src/session/projectors.ts @@ -5,7 +5,8 @@ import { SyncEvent } from "@/sync" import * as Session from "./session" import { MessageV2 } from "./message-v2" import { SessionTable, MessageTable, PartTable } from "./session.sql" -import * as Log from "@opencode-ai/core/util/log" +import { Log } from "@opencode-ai/core/util/log" +import nextProjectors from "./projectors-next" const log = Log.create({ service: "session.projector" }) @@ -135,4 +136,6 @@ export default [ log.warn("ignored late part update", { partID: id, messageID, sessionID }) } }), + + ...nextProjectors, ]