fix(httpapi): handle corrupt v2 session messages (#28633)

This commit is contained in:
Shoubhit Dash 2026-05-21 16:30:04 +05:30 committed by GitHub
parent 4a976482b1
commit 9739d75892
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 120 additions and 13 deletions

View file

@ -2,7 +2,7 @@ import { SessionID } from "@/session/schema"
import { SessionMessage } from "@opencode-ai/core/session-message"
import { Schema } from "effect"
import { HttpApiEndpoint, HttpApiGroup, OpenApi } from "effect/unstable/httpapi"
import { InvalidCursorError, SessionNotFoundError } from "../../errors"
import { InvalidCursorError, SessionNotFoundError, UnknownError } from "../../errors"
import { V2Authorization } from "../../middleware/authorization"
import { WorkspaceRoutingQueryFields } from "../../middleware/workspace-routing"
@ -36,7 +36,7 @@ export const MessageGroup = HttpApiGroup.make("v2.message")
next: Schema.String.pipe(Schema.optional),
}),
}).annotate({ identifier: "V2SessionMessagesResponse" }),
error: [InvalidCursorError, SessionNotFoundError],
error: [InvalidCursorError, SessionNotFoundError, UnknownError],
}).annotateMerge(
OpenApi.annotations({
identifier: "v2.session.messages",

View file

@ -4,7 +4,7 @@ import { Prompt } from "@opencode-ai/core/session-prompt"
import { SessionV2 } from "@/v2/session"
import { Schema } from "effect"
import { HttpApiEndpoint, HttpApiGroup, HttpApiSchema, OpenApi } from "effect/unstable/httpapi"
import { InvalidCursorError, InvalidRequestError, ServiceUnavailableError, SessionNotFoundError } from "../../errors"
import { InvalidCursorError, InvalidRequestError, ServiceUnavailableError, SessionNotFoundError, UnknownError } from "../../errors"
import { V2Authorization } from "../../middleware/authorization"
import { WorkspaceRoutingQuery, WorkspaceRoutingQueryFields } from "../../middleware/workspace-routing"
import { QueryBoolean } from "../query"
@ -103,7 +103,7 @@ export const SessionGroup = HttpApiGroup.make("v2.session")
params: { sessionID: SessionID },
query: WorkspaceRoutingQuery,
success: Schema.Array(SessionMessage.Message),
error: SessionNotFoundError,
error: [SessionNotFoundError, UnknownError],
}).annotateMerge(
OpenApi.annotations({
identifier: "v2.session.context",

View file

@ -4,7 +4,7 @@ import { Effect, Schema } from "effect"
import * as DateTime from "effect/DateTime"
import { HttpApiBuilder } from "effect/unstable/httpapi"
import { InstanceHttpApi } from "../../api"
import { InvalidCursorError, SessionNotFoundError } from "../../errors"
import { InvalidCursorError, SessionNotFoundError, UnknownError } from "../../errors"
const DefaultMessagesLimit = 50
@ -58,6 +58,20 @@ export const messageHandlers = HttpApiBuilder.group(InstanceHttpApi, "v2.message
}),
),
),
Effect.catchTag("Session.MessageDecodeError", (error) => {
const ref = `err_${crypto.randomUUID().slice(0, 8)}`
return Effect.logError("failed to decode v2 session message").pipe(
Effect.annotateLogs({ ref, sessionID: error.sessionID, messageID: error.messageID }),
Effect.andThen(
Effect.fail(
new UnknownError({
message: "Unexpected server error. Check server logs for details.",
ref,
}),
),
),
)
}),
)
const first = messages[0]
const last = messages.at(-1)

View file

@ -3,7 +3,7 @@ import { SessionV2 } from "@/v2/session"
import { DateTime, Effect, Option, Schema } from "effect"
import { HttpApiBuilder, HttpApiSchema } from "effect/unstable/httpapi"
import { InstanceHttpApi } from "../../api"
import { InvalidCursorError, InvalidRequestError, ServiceUnavailableError, SessionNotFoundError } from "../../errors"
import { InvalidCursorError, InvalidRequestError, ServiceUnavailableError, SessionNotFoundError, UnknownError } from "../../errors"
const DefaultSessionsLimit = 50
@ -219,6 +219,20 @@ export const sessionHandlers = HttpApiBuilder.group(InstanceHttpApi, "v2.session
}),
),
),
Effect.catchTag("Session.MessageDecodeError", (error) => {
const ref = `err_${crypto.randomUUID().slice(0, 8)}`
return Effect.logError("failed to decode v2 session message").pipe(
Effect.annotateLogs({ ref, sessionID: error.sessionID, messageID: error.messageID }),
Effect.andThen(
Effect.fail(
new UnknownError({
message: "Unexpected server error. Check server logs for details.",
ref,
}),
),
),
)
}),
)
}),
)

View file

@ -72,6 +72,11 @@ export class OperationUnavailableError extends Schema.TaggedErrorClass<Operation
},
) {}
export class MessageDecodeError extends Schema.TaggedErrorClass<MessageDecodeError>()("Session.MessageDecodeError", {
sessionID: SessionID,
messageID: SessionMessage.ID,
}) {}
export interface Interface {
readonly create: (input?: {
agent?: string
@ -104,8 +109,8 @@ export interface Interface {
time: number
direction: "previous" | "next"
}
}) => Effect.Effect<SessionMessage.Message[], NotFoundError>
readonly context: (sessionID: SessionID) => Effect.Effect<SessionMessage.Message[], NotFoundError>
}) => Effect.Effect<SessionMessage.Message[], NotFoundError | MessageDecodeError>
readonly context: (sessionID: SessionID) => Effect.Effect<SessionMessage.Message[], NotFoundError | MessageDecodeError>
readonly prompt: (input: {
id?: EventV2.ID
sessionID: SessionID
@ -120,7 +125,7 @@ export interface Interface {
prompt: Prompt
agent: string
model?: ModelV2.Ref
}) => Effect.Effect<void, NotFoundError | OperationUnavailableError>
}) => Effect.Effect<void, NotFoundError | OperationUnavailableError | MessageDecodeError>
readonly switchAgent: (input: { sessionID: SessionID; agent: string }) => Effect.Effect<void, never>
readonly switchModel: (input: { sessionID: SessionID; model: ModelV2.Ref }) => Effect.Effect<void, never>
readonly compact: (sessionID: SessionID) => Effect.Effect<void, NotFoundError | OperationUnavailableError>
@ -133,10 +138,18 @@ export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const events = yield* EventV2Bridge.Service
const decodeMessage = Schema.decodeUnknownSync(SessionMessage.Message)
const decodeMessage = Schema.decodeUnknownEffect(SessionMessage.Message)
const decode = (row: typeof SessionMessageTable.$inferSelect) =>
decodeMessage({ ...row.data, id: row.id, type: row.type })
decodeMessage({ ...row.data, id: row.id, type: row.type }).pipe(
Effect.mapError(
() =>
new MessageDecodeError({
sessionID: SessionID.make(row.session_id),
messageID: SessionMessage.ID.make(row.id),
}),
),
)
function fromRow(row: typeof SessionTable.$inferSelect): Info {
return new Info({
@ -262,7 +275,7 @@ export const layer = Layer.effect(
const rows = input.limit === undefined ? query.all() : query.limit(input.limit).all()
return direction === "previous" ? rows.toReversed() : rows
})
return rows.map((row) => decode(row))
return yield* Effect.forEach(rows, (row) => decode(row))
}),
context: Effect.fn("V2Session.context")(function* (sessionID) {
yield* result.get(sessionID)
@ -295,7 +308,7 @@ export const layer = Layer.effect(
.orderBy(asc(SessionMessageTable.time_created), asc(SessionMessageTable.id))
.all()
})
return rows.map((row) => decode(row))
return yield* Effect.forEach(rows, (row) => decode(row))
}),
prompt: Effect.fn("V2Session.prompt")(function* (input) {
yield* result.get(input.sessionID)

View file

@ -129,4 +129,17 @@ describe("PublicApi OpenAPI v2 errors", () => {
)
}
})
test("documents v2 session read data errors", () => {
const spec = OpenApi.fromApi(PublicApi) as OpenApiSpec
for (const route of [
["get", "/api/session/{sessionID}/context"],
["get", "/api/session/{sessionID}/message"],
] as const) {
expect(componentName(responseRef(spec.paths[route[1]]?.[route[0]]?.responses?.["500"]) ?? "")).toMatch(
/^UnknownError\d*$/,
)
}
})
})

View file

@ -139,6 +139,24 @@ const insertLegacyAssistantMessage = (sessionID: SessionIDType, time = 1) =>
)
})
const insertCorruptV2Message = (sessionID: SessionIDType, time = 1) =>
Effect.sync(() =>
Database.use((db) =>
db
.insert(SessionMessageTable)
.values([
{
id: SessionMessage.ID.create(),
session_id: sessionID,
type: "assistant",
time_created: time,
data: {} as NonNullable<(typeof SessionMessageTable.$inferInsert)["data"]>,
},
])
.run(),
),
)
const setLegacySummaryDiff = (sessionID: SessionIDType) =>
Effect.sync(() =>
Database.use((db) =>
@ -481,6 +499,41 @@ describe("session HttpApi", () => {
{ git: true, config: { formatter: false, lsp: false } },
)
it.instance(
"returns safe v2 unknown errors for corrupt projected messages",
() =>
Effect.gen(function* () {
const test = yield* TestInstance
const session = yield* createSession({ title: "v2 corrupt message" })
yield* insertCorruptV2Message(session.id)
const messages = yield* request(`/api/session/${session.id}/message`, {
headers: { "x-opencode-directory": test.directory },
})
const messagesBody = yield* responseJson(messages)
expect(messages.status).toBe(500)
expect(messagesBody).toMatchObject({
_tag: "UnknownError",
message: "Unexpected server error. Check server logs for details.",
})
expect((messagesBody as { ref?: unknown }).ref).toMatch(/^err_[0-9a-f-]{8}$/)
expect(JSON.stringify(messagesBody)).not.toContain("assistant")
const context = yield* request(`/api/session/${session.id}/context`, {
headers: { "x-opencode-directory": test.directory },
})
const contextBody = yield* responseJson(context)
expect(context.status).toBe(500)
expect(contextBody).toMatchObject({
_tag: "UnknownError",
message: "Unexpected server error. Check server logs for details.",
})
expect((contextBody as { ref?: unknown }).ref).toMatch(/^err_[0-9a-f-]{8}$/)
expect(JSON.stringify(contextBody)).not.toContain("assistant")
}),
{ git: true, config: { formatter: false, lsp: false } },
)
it.instance(
"serves sessions with migrated summary diffs missing file details",
() =>