Revamp sync safety: implement claiming

This commit is contained in:
James Long 2026-05-04 13:16:12 -04:00
parent e1f49c2cbe
commit c86fc1a756
10 changed files with 1539 additions and 155 deletions

View file

@ -0,0 +1 @@
ALTER TABLE `event_sequence` ADD `owner_id` text;

File diff suppressed because it is too large Load diff

View file

@ -518,42 +518,26 @@ export const layer = Layer.effect(
if (current?.workspaceID) {
const previous = yield* get(current.workspaceID)
if (previous) {
yield* Effect.gen(function* () {
const adaptor = getAdaptor(previous.projectID, previous.type)
const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(previous)))
if (target.type === "local") return
const adaptor = getAdaptor(previous.projectID, previous.type)
const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(previous)))
const response = yield* http.execute(
HttpClientRequest.post(route(target.url, "/sync/erase"), {
headers: new Headers(target.headers),
body: HttpBody.jsonUnsafe({ sessionID: input.sessionID }),
}),
if (target.type === "remote") {
yield* syncHistory(previous, target.url, target.headers).pipe(
Effect.catch((error) =>
Effect.sync(() => {
log.warn("session warp final source sync failed", {
workspaceID: previous.id,
sessionID: input.sessionID,
error: errorData(error),
})
}),
),
)
}
// TODO: if this fails, we need to mark this workspace
// as "orphaned" meaning we abandoned it and never want
// to talk to it again
if (response.status < 200 || response.status >= 300) {
const body = yield* response.text
log.warn("session warp erase failed", {
workspaceID: previous.id,
sessionID: input.sessionID,
status: response.status,
body,
})
}
}).pipe(
Effect.catch((error) =>
Effect.sync(() => {
log.warn("session warp erase unavailable", {
workspaceID: previous.id,
sessionID: input.sessionID,
error,
})
}),
),
)
// "claim" this session so any future events coming from
// the old workspace are ignored
SyncEvent.claim(input.sessionID, input.workspaceID)
}
}

View file

@ -37,7 +37,6 @@ export const HistoryEvent = Schema.Struct({
export const SyncPaths = {
start: `${root}/start`,
replay: `${root}/replay`,
erase: `${root}/erase`,
steal: `${root}/steal`,
history: `${root}/history`,
} as const
@ -66,17 +65,6 @@ export const SyncApi = HttpApi.make("sync")
description: "Validate and replay a complete sync event history.",
}),
),
HttpApiEndpoint.post("erase", SyncPaths.erase, {
payload: SessionPayload,
success: described(SessionPayload, "Erased session sync events"),
error: HttpApiError.BadRequest,
}).annotateMerge(
OpenApi.annotations({
identifier: "sync.erase",
summary: "Erase session sync events",
description: "Erase all locally stored sync events for a session aggregate.",
}),
),
HttpApiEndpoint.post("steal", SyncPaths.steal, {
payload: SessionPayload,
success: described(SessionPayload, "Session stolen into workspace"),

View file

@ -36,11 +36,6 @@ export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handl
return { sessionID: events[0].aggregateID }
})
const erase = Effect.fn("SyncHttpApi.erase")(function* (ctx: { payload: typeof SessionPayload.Type }) {
SyncEvent.remove(ctx.payload.sessionID)
return { sessionID: ctx.payload.sessionID }
})
const steal = Effect.fn("SyncHttpApi.steal")(function* (ctx: { payload: typeof SessionPayload.Type }) {
const instance = yield* InstanceState.context
const workspaceID = yield* InstanceState.workspaceID
@ -83,7 +78,6 @@ export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handl
return handlers
.handle("start", start)
.handle("replay", replay)
.handle("erase", erase)
.handle("steal", steal)
.handle("history", history)
}),

View file

@ -111,33 +111,6 @@ export const SyncRoutes = lazy(() =>
})
},
)
.post(
"/erase",
describeRoute({
summary: "Erase session sync events",
description: "Erase all locally stored sync events for a session aggregate.",
operationId: "sync.erase",
responses: {
200: {
description: "Erased session sync events",
content: {
"application/json": {
schema: resolver(SessionPayload),
},
},
},
...errors(400),
},
}),
validator("json", SessionPayload),
async (c) => {
const body = c.req.valid("json")
SyncEvent.remove(body.sessionID)
return c.json({
sessionID: body.sessionID,
})
},
)
.post(
"/steal",
describeRoute({

View file

@ -3,6 +3,7 @@ import { sqliteTable, text, integer } from "drizzle-orm/sqlite-core"
export const EventSequenceTable = sqliteTable("event_sequence", {
aggregate_id: text().notNull().primaryKey(),
seq: integer().notNull(),
owner_id: text(),
})
export const EventTable = sqliteTable("event", {

View file

@ -121,7 +121,7 @@ export function project<Def extends Definition>(
return [def, func as ProjectorFunc]
}
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean; ownerID?: string }) {
if (projectors == null) {
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
}
@ -131,8 +131,6 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
throw new Error(`Projector not found for event: ${def.type}`)
}
// idempotent: need to ignore any events already logged
Database.transaction((tx) => {
projector(tx, event.data)
@ -141,6 +139,7 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
.values({
aggregate_id: event.aggregateID,
seq: event.seq,
owner_id: options?.ownerID,
})
.onConflictDoUpdate({
target: EventSequenceTable.aggregate_id,
@ -185,7 +184,7 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
})
}
export function replay(event: SerializedEvent, options?: { publish: boolean }) {
export function replay(event: SerializedEvent, options?: { publish: boolean; ownerID?: string }) {
const def = registry.get(event.type)
if (!def) {
throw new Error(`Unknown event type: ${event.type}`)
@ -193,7 +192,7 @@ export function replay(event: SerializedEvent, options?: { publish: boolean }) {
const row = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq })
.select({ seq: EventSequenceTable.seq, ownerID: EventSequenceTable.owner_id })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
.get(),
@ -204,12 +203,16 @@ export function replay(event: SerializedEvent, options?: { publish: boolean }) {
return
}
if (row?.ownerID && row.ownerID !== options?.ownerID) {
return
}
const expected = latest + 1
if (event.seq !== expected) {
throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
}
process(def, event, { publish: !!options?.publish })
process(def, event, { publish: !!options?.publish, ownerID: options?.ownerID ?? row?.ownerID ?? undefined })
}
export function replayAll(events: SerializedEvent[], options?: { publish: boolean }) {
@ -274,6 +277,16 @@ export function remove(aggregateID: string) {
})
}
export function claim(aggregateID: string, ownerID: string) {
Database.use((db) =>
db
.update(EventSequenceTable)
.set({ owner_id: ownerID })
.where(eq(EventSequenceTable.aggregate_id, aggregateID))
.run(),
)
}
export function payloads() {
return registry
.entries()

View file

@ -160,8 +160,6 @@ import type {
SessionUpdateErrors,
SessionUpdateResponses,
SubtaskPartInput,
SyncEraseErrors,
SyncEraseResponses,
SyncHistoryListErrors,
SyncHistoryListResponses,
SyncReplayErrors,
@ -3177,43 +3175,6 @@ export class Sync extends HeyApiClient {
})
}
/**
* Erase session sync events
*
* Erase all locally stored sync events for a session aggregate.
*/
public erase<ThrowOnError extends boolean = false>(
parameters?: {
directory?: string
workspace?: string
sessionID?: string
},
options?: Options<never, ThrowOnError>,
) {
const params = buildClientParams(
[parameters],
[
{
args: [
{ in: "query", key: "directory" },
{ in: "query", key: "workspace" },
{ in: "body", key: "sessionID" },
],
},
],
)
return (options?.client ?? this.client).post<SyncEraseResponses, SyncEraseErrors, ThrowOnError>({
url: "/sync/erase",
...options,
...params,
headers: {
"Content-Type": "application/json",
...options?.headers,
...params.headers,
},
})
}
/**
* Steal session into workspace
*

View file

@ -4611,38 +4611,6 @@ export type SyncReplayResponses = {
export type SyncReplayResponse = SyncReplayResponses[keyof SyncReplayResponses]
export type SyncEraseData = {
body?: {
sessionID: string
}
path?: never
query?: {
directory?: string
workspace?: string
}
url: "/sync/erase"
}
export type SyncEraseErrors = {
/**
* Bad request
*/
400: BadRequestError
}
export type SyncEraseError = SyncEraseErrors[keyof SyncEraseErrors]
export type SyncEraseResponses = {
/**
* Erased session sync events
*/
200: {
sessionID: string
}
}
export type SyncEraseResponse = SyncEraseResponses[keyof SyncEraseResponses]
export type SyncStealData = {
body?: {
sessionID: string