diff --git a/packages/opencode/src/control-plane/adaptors/index.ts b/packages/opencode/src/control-plane/adaptors/index.ts index 651d09cc21..c91f534b5a 100644 --- a/packages/opencode/src/control-plane/adaptors/index.ts +++ b/packages/opencode/src/control-plane/adaptors/index.ts @@ -1,27 +1,26 @@ -import { lazy } from "@/util/lazy" import type { ProjectID } from "@/project/schema" import type { WorkspaceAdaptor, WorkspaceAdaptorEntry } from "../types" +import { WorktreeAdaptor } from "./worktree" -const BUILTIN: Record Promise> = { - worktree: lazy(async () => (await import("./worktree")).WorktreeAdaptor), +const BUILTIN: Record = { + worktree: WorktreeAdaptor, } const state = new Map>() -export async function getAdaptor(projectID: ProjectID, type: string): Promise { +export function getAdaptor(projectID: ProjectID, type: string): WorkspaceAdaptor { const custom = state.get(projectID)?.get(type) if (custom) return custom const builtin = BUILTIN[type] - if (builtin) return builtin() + if (builtin) return builtin throw new Error(`Unknown workspace adaptor: ${type}`) } export async function listAdaptors(projectID: ProjectID): Promise { const builtin = await Promise.all( - Object.entries(BUILTIN).map(async ([type, init]) => { - const adaptor = await init() + Object.entries(BUILTIN).map(async ([type, adaptor]) => { return { type, name: adaptor.name, diff --git a/packages/opencode/src/control-plane/adaptors/worktree.ts b/packages/opencode/src/control-plane/adaptors/worktree.ts index 9c080daa38..de9618d302 100644 --- a/packages/opencode/src/control-plane/adaptors/worktree.ts +++ b/packages/opencode/src/control-plane/adaptors/worktree.ts @@ -1,6 +1,4 @@ import { Schema } from "effect" -import { AppRuntime } from "@/effect/app-runtime" -import { Worktree } from "@/worktree" import { type WorkspaceAdaptor, WorkspaceInfo } from "../types" const WorktreeConfig = Schema.Struct({ @@ -10,19 +8,26 @@ const WorktreeConfig = Schema.Struct({ }) const decodeWorktreeConfig = 
Schema.decodeUnknownSync(WorktreeConfig) +async function loadWorktree() { + const [{ AppRuntime }, { Worktree }] = await Promise.all([import("@/effect/app-runtime"), import("@/worktree")]) + return { AppRuntime, Worktree } +} + export const WorktreeAdaptor: WorkspaceAdaptor = { name: "Worktree", description: "Create a git worktree", async configure(info) { - const worktree = await AppRuntime.runPromise(Worktree.Service.use((svc) => svc.makeWorktreeInfo())) + const { AppRuntime, Worktree } = await loadWorktree() + const next = await AppRuntime.runPromise(Worktree.Service.use((svc) => svc.makeWorktreeInfo())) return { ...info, - name: worktree.name, - branch: worktree.branch, - directory: worktree.directory, + name: next.name, + branch: next.branch, + directory: next.directory, } }, async create(info) { + const { AppRuntime, Worktree } = await loadWorktree() const config = decodeWorktreeConfig(info) await AppRuntime.runPromise( Worktree.Service.use((svc) => @@ -35,6 +40,7 @@ export const WorktreeAdaptor: WorkspaceAdaptor = { ) }, async remove(info) { + const { AppRuntime, Worktree } = await loadWorktree() const config = decodeWorktreeConfig(info) await AppRuntime.runPromise(Worktree.Service.use((svc) => svc.remove({ directory: config.directory }))) }, diff --git a/packages/opencode/src/control-plane/dev/README.md b/packages/opencode/src/control-plane/dev/README.md new file mode 100644 index 0000000000..dbd62c0b1f --- /dev/null +++ b/packages/opencode/src/control-plane/dev/README.md @@ -0,0 +1,20 @@ + +This is a plugin to simulate a remote environment locally. Add this to `.opencode/opencode.jsonc`: + +```json + "plugin": ["../packages/opencode/src/control-plane/dev/debug-workspace-plugin.ts"], +``` + +In a separate terminal, run a separate OpenCode server. 
This will act like a remote server and the local instance will proxy all requests to it: + +``` +./packages/opencode/script/run-workspace-server +``` + +With the plugin installed, you can now run OpenCode and create a `debug` workspace type. This will create a "remote" workspace which talks to the second workspace server started above. + +How this works: + +* The workspace server needs to know the workspace id and port to run. It waits for this information to be written to a file and starts the server when the data is written. +* The debug plugin writes this information in the `create` call to the workspace. So creating a `debug` workspace will always kick off a new external server. +* The server script watches for file changes, so whenever you create a new `debug` workspace it will restart with the new information. This means that there is only ever one working `debug` workspace at a time; when you create a new one all previous sessions will show that they can't connect because previous debug workspaces do not exist. \ No newline at end of file diff --git a/packages/opencode/src/control-plane/sse.ts b/packages/opencode/src/control-plane/sse.ts deleted file mode 100644 index 003093a003..0000000000 --- a/packages/opencode/src/control-plane/sse.ts +++ /dev/null @@ -1,66 +0,0 @@ -export async function parseSSE( - body: ReadableStream, - signal: AbortSignal, - onEvent: (event: unknown) => void, -) { - const reader = body.getReader() - const decoder = new TextDecoder() - let buf = "" - let last = "" - let retry = 1000 - - const abort = () => { - void reader.cancel().catch(() => undefined) - } - - signal.addEventListener("abort", abort) - - try { - while (!signal.aborted) { - const chunk = await reader.read().catch(() => ({ done: true, value: undefined as Uint8Array | undefined })) - if (chunk.done) break - - buf += decoder.decode(chunk.value, { stream: true }) - buf = buf.replace(/\r\n/g, "\n").replace(/\r/g, "\n") - - const chunks = buf.split("\n\n") - buf = chunks.pop() ?? 
"" - - chunks.forEach((chunk) => { - const data: string[] = [] - chunk.split("\n").forEach((line) => { - if (line.startsWith("data:")) { - data.push(line.replace(/^data:\s*/, "")) - return - } - if (line.startsWith("id:")) { - last = line.replace(/^id:\s*/, "") - return - } - if (line.startsWith("retry:")) { - const parsed = Number.parseInt(line.replace(/^retry:\s*/, ""), 10) - if (!Number.isNaN(parsed)) retry = parsed - } - }) - - if (!data.length) return - const raw = data.join("\n") - try { - onEvent(JSON.parse(raw)) - } catch { - onEvent({ - type: "sse.message", - properties: { - data: raw, - id: last || undefined, - retry, - }, - }) - } - }) - } - } finally { - signal.removeEventListener("abort", abort) - reader.releaseLock() - } -} diff --git a/packages/opencode/src/control-plane/util.ts b/packages/opencode/src/control-plane/util.ts index 023c2ae150..35bc87163b 100644 --- a/packages/opencode/src/control-plane/util.ts +++ b/packages/opencode/src/control-plane/util.ts @@ -1,22 +1,23 @@ import { GlobalBus, type GlobalEvent } from "@/bus/global" +import { Effect } from "effect" export function waitEvent(input: { timeout: number; signal?: AbortSignal; fn: (event: GlobalEvent) => boolean }) { - if (input.signal?.aborted) return Promise.reject(input.signal.reason ?? new Error("Request aborted")) + if (input.signal?.aborted) return Effect.fail(input.signal.reason ?? new Error("Request aborted")) - return new Promise((resolve, reject) => { + return Effect.callback((resume) => { const abort = () => { cleanup() - reject(input.signal?.reason ?? new Error("Request aborted")) + resume(Effect.fail(input.signal?.reason ?? 
new Error("Request aborted"))) } const handler = (event: GlobalEvent) => { try { if (!input.fn(event)) return cleanup() - resolve() + resume(Effect.void) } catch (error) { cleanup() - reject(error) + resume(Effect.fail(error)) } } @@ -28,10 +29,11 @@ export function waitEvent(input: { timeout: number; signal?: AbortSignal; fn: (e const timeout = setTimeout(() => { cleanup() - reject(new Error("Timed out waiting for global event")) + resume(Effect.fail(new Error("Timed out waiting for global event"))) }, input.timeout) GlobalBus.on("event", handler) input.signal?.addEventListener("abort", abort, { once: true }) + return Effect.sync(cleanup) }) } diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts index c56ff26310..2d8c570441 100644 --- a/packages/opencode/src/control-plane/workspace.ts +++ b/packages/opencode/src/control-plane/workspace.ts @@ -1,5 +1,5 @@ -import { Schema } from "effect" -import { setTimeout as sleep } from "node:timers/promises" +import { Context, Effect, FiberMap, Layer, Schema, Stream } from "effect" +import { FetchHttpClient, HttpBody, HttpClient, HttpClientError, HttpClientRequest } from "effect/unstable/http" import { fn } from "@/util/fn" import { Database } from "@/storage/db" import { asc } from "drizzle-orm" @@ -20,12 +20,11 @@ import { WorkspaceTable } from "./workspace.sql" import { getAdaptor } from "./adaptors" import { type WorkspaceInfo, WorkspaceInfo as WorkspaceInfoSchema } from "./types" import { WorkspaceID } from "./schema" -import { parseSSE } from "./sse" import { Session } from "@/session/session" import { SessionTable } from "@/session/session.sql" import { SessionID } from "@/session/schema" import { errorData } from "@/util/error" -import { AppRuntime } from "@/effect/app-runtime" +import { makeRuntime } from "@/effect/run-service" import { waitEvent } from "./util" import { WorkspaceContext } from "./workspace-context" import { NonNegativeInt, withStatics } from 
"@/util/schema" @@ -76,6 +75,11 @@ function fromRow(row: typeof WorkspaceTable.$inferSelect): Info { } } +const db = (fn: (d: Parameters[0] extends (trx: infer D) => any ? D : never) => T) => + Effect.sync(() => Database.use(fn)) + +const log = Log.create({ service: "workspace-sync" }) + export const CreateInput = Schema.Struct({ id: Schema.optional(WorkspaceID), type: Info.fields.type, @@ -85,286 +89,739 @@ export const CreateInput = Schema.Struct({ }).pipe(withStatics((s) => ({ zod: effectZod(s), zodObject: zodObject(s) }))) export type CreateInput = Schema.Schema.Type -export const create = fn(CreateInput.zod, async (input) => { - const id = WorkspaceID.ascending(input.id) - const adaptor = await getAdaptor(input.projectID, input.type) - - const config = await adaptor.configure({ ...input, id, name: Slug.create(), directory: null }) - - const info: Info = { - id, - type: config.type, - branch: config.branch ?? null, - name: config.name ?? null, - directory: config.directory ?? null, - extra: config.extra ?? 
null, - projectID: input.projectID, - } - - Database.use((db) => { - db.insert(WorkspaceTable) - .values({ - id: info.id, - type: info.type, - branch: info.branch, - name: info.name, - directory: info.directory, - extra: info.extra, - project_id: info.projectID, - }) - .run() - }) - - const env = { - OPENCODE_AUTH_CONTENT: JSON.stringify(await AppRuntime.runPromise(Auth.Service.use((auth) => auth.all()))), - OPENCODE_WORKSPACE_ID: config.id, - OPENCODE_EXPERIMENTAL_WORKSPACES: "true", - OTEL_EXPORTER_OTLP_HEADERS: process.env.OTEL_EXPORTER_OTLP_HEADERS, - OTEL_EXPORTER_OTLP_ENDPOINT: process.env.OTEL_EXPORTER_OTLP_ENDPOINT, - OTEL_RESOURCE_ATTRIBUTES: process.env.OTEL_RESOURCE_ATTRIBUTES, - } - await adaptor.create(config, env) - - startSync(info) - - await waitEvent({ - timeout: TIMEOUT, - fn(event) { - if (event.workspace === info.id && event.payload.type === Event.Status.type) { - const { status } = event.payload.properties - return status === "error" || status === "connected" - } - return false - }, - }) - - return info -}) - export const SessionRestoreInput = Schema.Struct({ workspaceID: WorkspaceID, sessionID: SessionID, }).pipe(withStatics((s) => ({ zod: effectZod(s), zodObject: zodObject(s) }))) export type SessionRestoreInput = Schema.Schema.Type -export const sessionRestore = fn(SessionRestoreInput.zod, async (input) => { - log.info("session restore requested", { - workspaceID: input.workspaceID, - sessionID: input.sessionID, - }) - try { - const space = await get(input.workspaceID) - if (!space) throw new Error(`Workspace not found: ${input.workspaceID}`) +export class SyncHttpError extends Schema.TaggedErrorClass()("WorkspaceSyncHttpError", { + message: Schema.String, + status: Schema.Number, + body: Schema.optional(Schema.String), +}) {} - const adaptor = await getAdaptor(space.projectID, space.type) - const target = await adaptor.target(space) +export class WorkspaceNotFoundError extends Schema.TaggedErrorClass()("WorkspaceNotFoundError", { + message: 
Schema.String, + workspaceID: WorkspaceID, +}) {} - // Need to switch the workspace of the session - SyncEvent.run(Session.Event.Updated, { - sessionID: input.sessionID, - info: { - workspaceID: input.workspaceID, - }, - }) +export class SessionEventsNotFoundError extends Schema.TaggedErrorClass()( + "WorkspaceSessionEventsNotFoundError", + { + message: Schema.String, + sessionID: SessionID, + }, +) {} - const rows = Database.use((db) => - db - .select({ - id: EventTable.id, - aggregateID: EventTable.aggregate_id, - seq: EventTable.seq, - type: EventTable.type, - data: EventTable.data, - }) - .from(EventTable) - .where(eq(EventTable.aggregate_id, input.sessionID)) - .orderBy(asc(EventTable.seq)) - .all(), - ) - if (rows.length === 0) throw new Error(`No events found for session: ${input.sessionID}`) +export class SessionRestoreHttpError extends Schema.TaggedErrorClass()( + "WorkspaceSessionRestoreHttpError", + { + message: Schema.String, + workspaceID: WorkspaceID, + sessionID: SessionID, + status: Schema.Number, + body: Schema.String, + }, +) {} - const all = rows +export class SyncTimeoutError extends Schema.TaggedErrorClass()("WorkspaceSyncTimeoutError", { + message: Schema.String, + state: Schema.Record(Schema.String, Schema.Number), +}) {} + +export class SyncAbortedError extends Schema.TaggedErrorClass()("WorkspaceSyncAbortedError", { + message: Schema.String, + cause: Schema.optional(Schema.Defect), +}) {} + +type CreateError = Auth.AuthError +type SessionRestoreError = + | WorkspaceNotFoundError + | SessionEventsNotFoundError + | SessionRestoreHttpError + | HttpClientError.HttpClientError +type WaitForSyncError = SyncTimeoutError | SyncAbortedError +type SyncLoopError = SyncHttpError | HttpClientError.HttpClientError + +export interface Interface { + readonly create: (input: CreateInput) => Effect.Effect + readonly sessionRestore: (input: SessionRestoreInput) => Effect.Effect<{ total: number }, SessionRestoreError> + readonly list: (project: Project.Info) 
=> Effect.Effect + readonly get: (id: WorkspaceID) => Effect.Effect + readonly remove: (id: WorkspaceID) => Effect.Effect + readonly status: () => Effect.Effect + readonly isSyncing: (workspaceID: WorkspaceID) => Effect.Effect + readonly waitForSync: ( + workspaceID: WorkspaceID, + state: Record, + signal?: AbortSignal, + ) => Effect.Effect + readonly startWorkspaceSyncing: (projectID: ProjectID) => Effect.Effect +} + +export class Service extends Context.Service()("@opencode/Workspace") {} + +export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const auth = yield* Auth.Service + const session = yield* Session.Service + const http = yield* HttpClient.HttpClient + const connections = new Map() + const syncFibers = yield* FiberMap.make() + + const setStatus = (id: WorkspaceID, status: ConnectionStatus["status"]) => { + const prev = connections.get(id) + if (prev?.status === status) return + const next = { workspaceID: id, status } + connections.set(id, next) - const size = 10 - const sets = Array.from({ length: Math.ceil(all.length / size) }, (_, i) => all.slice(i * size, (i + 1) * size)) - const total = sets.length - log.info("session restore prepared", { - workspaceID: input.workspaceID, - sessionID: input.sessionID, - workspaceType: space.type, - directory: space.directory, - target: target.type === "remote" ? 
String(route(target.url, "/sync/replay")) : target.directory, - events: all.length, - batches: total, - first: all[0]?.seq, - last: all.at(-1)?.seq, - }) - GlobalBus.emit("event", { - directory: "global", - workspace: input.workspaceID, - payload: { - type: Event.Restore.type, - properties: { - workspaceID: input.workspaceID, - sessionID: input.sessionID, - total, - step: 0, - }, - }, - }) - for (const [i, events] of sets.entries()) { - log.info("session restore batch starting", { - workspaceID: input.workspaceID, - sessionID: input.sessionID, - step: i + 1, - total, - events: events.length, - first: events[0]?.seq, - last: events.at(-1)?.seq, - target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory, - }) - if (target.type === "local") { - SyncEvent.replayAll(events) - log.info("session restore batch replayed locally", { - workspaceID: input.workspaceID, - sessionID: input.sessionID, - step: i + 1, - total, - events: events.length, - }) - } else { - const url = route(target.url, "/sync/replay") - const headers = new Headers(target.headers) - headers.set("content-type", "application/json") - const res = await fetch(url, { - method: "POST", - headers, - body: JSON.stringify({ - directory: space.directory ?? 
"", - events, - }), - }) - if (!res.ok) { - const body = await res.text() - log.error("session restore batch failed", { - workspaceID: input.workspaceID, - sessionID: input.sessionID, - step: i + 1, - total, - status: res.status, - body, - }) - throw new Error( - `Failed to replay session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${res.status} ${body}`, - ) - } - log.info("session restore batch posted", { - workspaceID: input.workspaceID, - sessionID: input.sessionID, - step: i + 1, - total, - status: res.status, - }) - } GlobalBus.emit("event", { directory: "global", - workspace: input.workspaceID, + workspace: id, payload: { - type: Event.Restore.type, - properties: { - workspaceID: input.workspaceID, - sessionID: input.sessionID, - total, - step: i + 1, - }, + type: Event.Status.type, + properties: next, }, }) } - log.info("session restore complete", { - workspaceID: input.workspaceID, - sessionID: input.sessionID, - batches: total, + const connectSSE = Effect.fn("Workspace.connectSSE")(function* ( + url: URL | string, + headers: HeadersInit | undefined, + ) { + const response = yield* http.execute( + HttpClientRequest.get(route(url, "/global/event"), { + headers: new Headers(headers), + accept: "text/event-stream", + }), + ) + if (response.status < 200 || response.status >= 300) { + return yield* new SyncHttpError({ + message: `Workspace sync HTTP failure: ${response.status}`, + status: response.status, + }) + } + return response.stream }) - return { - total, - } - } catch (err) { - log.error("session restore failed", { - workspaceID: input.workspaceID, - sessionID: input.sessionID, - error: errorData(err), + const parseSSE = Effect.fn("Workspace.parseSSE")(function* ( + stream: Stream.Stream, + onEvent: (event: unknown) => Effect.Effect, + ) { + yield* stream.pipe( + Stream.decodeText(), + Stream.splitLines, + Stream.mapAccum( + () => ({ data: [] as string[], id: undefined as string | undefined, retry: 1000 }), + (state, line) => { + if 
(line === "") { + if (!state.data.length) return [state, []] + return [{ ...state, data: [] }, [{ data: state.data.join("\n"), id: state.id, retry: state.retry }]] + } + + const index = line.indexOf(":") + const field = index === -1 ? line : line.slice(0, index) + const value = index === -1 ? "" : line.slice(index + (line[index + 1] === " " ? 2 : 1)) + + if (field === "data") return [{ ...state, data: [...state.data, value] }, []] + if (field === "id") return [{ ...state, id: value }, []] + if (field === "retry") { + const retry = Number.parseInt(value, 10) + return [Number.isNaN(retry) ? state : { ...state, retry }, []] + } + return [state, []] + }, + { + onHalt: (state) => + state.data.length ? [{ data: state.data.join("\n"), id: state.id, retry: state.retry }] : [], + }, + ), + Stream.map((event) => { + try { + return JSON.parse(event.data) as unknown + } catch { + return { + type: "sse.message", + properties: { + data: event.data, + id: event.id || undefined, + retry: event.retry, + }, + } + } + }), + Stream.runForEach(onEvent), + ) }) - throw err - } -}) -export function list(project: Project.Info) { - const rows = Database.use((db) => - db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(), - ) - const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id)) - return spaces -} + const syncHistory = Effect.fn("Workspace.syncHistory")(function* ( + space: Info, + url: URL | string, + headers: HeadersInit | undefined, + ) { + const sessionIDs = yield* db((db) => + db + .select({ id: SessionTable.id }) + .from(SessionTable) + .where(eq(SessionTable.workspace_id, space.id)) + .all() + .map((row) => row.id), + ) + const state = sessionIDs.length + ? 
Object.fromEntries( + (yield* db((db) => + db.select().from(EventSequenceTable).where(inArray(EventSequenceTable.aggregate_id, sessionIDs)).all(), + )).map((row) => [row.aggregate_id, row.seq]), + ) + : {} -export const get = fn(WorkspaceID.zod, async (id) => { - const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get()) - if (!row) return - return fromRow(row) -}) + log.info("syncing workspace history", { + workspaceID: space.id, + sessions: sessionIDs.length, + known: Object.keys(state).length, + }) -export const remove = fn(WorkspaceID.zod, async (id) => { - const sessions = Database.use((db) => - db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, id)).all(), - ) - for (const session of sessions) { - await AppRuntime.runPromise(Session.Service.use((svc) => svc.remove(session.id))) - } + const response = yield* http.execute( + HttpClientRequest.post(route(url, "/sync/history"), { + headers: new Headers(headers), + body: HttpBody.jsonUnsafe(state), + }), + ) - const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get()) + if (response.status < 200 || response.status >= 300) { + const body = yield* response.text + return yield* new SyncHttpError({ + message: `Workspace history HTTP failure: ${response.status} ${body}`, + status: response.status, + body, + }) + } - if (row) { - stopSync(id) + const events = (yield* response.json) as HistoryEvent[] - const info = fromRow(row) - try { - const adaptor = await getAdaptor(info.projectID, row.type) - await adaptor.remove(info) - } catch { - log.error("adaptor not available when removing workspace", { type: row.type }) - } - Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run()) - return info - } -}) + log.info("workspace history synced", { + workspaceID: space.id, + events: events.length, + }) + + yield* Effect.sync(() => + WorkspaceContext.provide({ + 
workspaceID: space.id, + fn: () => { + for (const event of events) { + SyncEvent.replay( + { + id: event.id, + aggregateID: event.aggregate_id, + seq: event.seq, + type: event.type, + data: event.data, + }, + { publish: true }, + ) + } + }, + }), + ) + }) + + const syncWorkspaceLoop = Effect.fn("Workspace.syncWorkspaceLoop")(function* (space: Info) { + const adaptor = getAdaptor(space.projectID, space.type) + const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space))) + + if (target.type === "local") return + + let attempt = 0 + + while (true) { + log.info("connecting to global sync", { workspace: space.name }) + setStatus(space.id, "connecting") + + const stream = yield* connectSSE(target.url, target.headers).pipe( + Effect.tap(() => syncHistory(space, target.url, target.headers)), + Effect.catch((err) => + Effect.sync(() => { + setStatus(space.id, "error") + log.info("failed to connect to global sync", { + workspace: space.name, + err, + }) + return null + }), + ), + ) + + if (stream) { + attempt = 0 + + log.info("global sync connected", { workspace: space.name }) + setStatus(space.id, "connected") + + yield* parseSSE(stream, (evt) => + Effect.sync(() => { + try { + if (!evt || typeof evt !== "object" || !("payload" in evt)) return + const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent } + if (payload.type === "server.heartbeat") return + + if (payload.type === "sync" && payload.syncEvent) { + SyncEvent.replay(payload.syncEvent) + } + + const event = evt as { directory?: string; project?: string; payload: unknown } + GlobalBus.emit("event", { + directory: event.directory, + project: event.project, + workspace: space.id, + payload: event.payload, + }) + } catch (err) { + log.info("failed to replay global event", { + workspaceID: space.id, + error: err, + }) + } + }), + ) + + log.info("disconnected from global sync: " + space.id) + setStatus(space.id, "disconnected") + } + + // Back off reconnect attempts up 
to 2 minutes while the workspace + // stays unavailable. + yield* Effect.sleep(`${Math.min(120_000, 1_000 * 2 ** attempt)} millis`) + attempt += 1 + } + }) + + const startSync = Effect.fn("Workspace.startSync")(function* (space: Info) { + if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return + + const adaptor = getAdaptor(space.projectID, space.type) + const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space))) + + if (target.type === "local") { + setStatus(space.id, (yield* Effect.promise(() => Filesystem.exists(target.directory))) ? "connected" : "error") + return + } + + const exists = yield* FiberMap.has(syncFibers, space.id) + if (exists && connections.get(space.id)?.status !== "error") return + + setStatus(space.id, "disconnected") + + yield* FiberMap.run( + syncFibers, + space.id, + // TODO: look into `tapError` to set the status but still + // allow the fiber to fail and automatically get removed + syncWorkspaceLoop(space).pipe( + Effect.catch((error) => + Effect.sync(() => { + setStatus(space.id, "error") + log.warn("workspace listener failed", { + workspaceID: space.id, + error, + }) + }), + ), + ), + ) + }) + + const stopSync = Effect.fn("Workspace.stopSync")(function* (id: WorkspaceID) { + yield* FiberMap.remove(syncFibers, id) + connections.delete(id) + }) + + const create = Effect.fn("Workspace.create")(function* (input: CreateInput) { + const id = WorkspaceID.ascending(input.id) + const adaptor = getAdaptor(input.projectID, input.type) + const config = yield* Effect.promise(() => + Promise.resolve(adaptor.configure({ ...input, id, name: Slug.create(), directory: null })), + ) + + const info: Info = { + id, + type: config.type, + branch: config.branch ?? null, + name: config.name ?? null, + directory: config.directory ?? null, + extra: config.extra ?? 
null, + projectID: input.projectID, + } + + yield* db((db) => { + db.insert(WorkspaceTable) + .values({ + id: info.id, + type: info.type, + branch: info.branch, + name: info.name, + directory: info.directory, + extra: info.extra, + project_id: info.projectID, + }) + .run() + }) + + const env = { + OPENCODE_AUTH_CONTENT: JSON.stringify(yield* auth.all()), + OPENCODE_WORKSPACE_ID: config.id, + OPENCODE_EXPERIMENTAL_WORKSPACES: "true", + OTEL_EXPORTER_OTLP_HEADERS: process.env.OTEL_EXPORTER_OTLP_HEADERS, + OTEL_EXPORTER_OTLP_ENDPOINT: process.env.OTEL_EXPORTER_OTLP_ENDPOINT, + OTEL_RESOURCE_ATTRIBUTES: process.env.OTEL_RESOURCE_ATTRIBUTES, + } + + yield* Effect.promise(() => adaptor.create(config, env)) + yield* Effect.all( + [ + waitEvent({ + timeout: TIMEOUT, + fn(event) { + if (event.workspace === info.id && event.payload.type === Event.Status.type) { + const { status } = event.payload.properties + return status === "error" || status === "connected" + } + return false + }, + }), + startSync(info), + ], + { concurrency: 2, discard: true }, + ) + + return info + }) + + const sessionRestore = Effect.fn("Workspace.sessionRestore")(function* (input: SessionRestoreInput) { + return yield* Effect.gen(function* () { + log.info("session restore requested", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + }) + + const space = yield* get(input.workspaceID) + if (!space) + return yield* new WorkspaceNotFoundError({ + message: `Workspace not found: ${input.workspaceID}`, + workspaceID: input.workspaceID, + }) + + const adaptor = getAdaptor(space.projectID, space.type) + const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space))) + + yield* Effect.sync(() => + SyncEvent.run(Session.Event.Updated, { + sessionID: input.sessionID, + info: { + workspaceID: input.workspaceID, + }, + }), + ) + + const rows = yield* db((db) => + db + .select({ + id: EventTable.id, + aggregateID: EventTable.aggregate_id, + seq: EventTable.seq, + type: 
EventTable.type, + data: EventTable.data, + }) + .from(EventTable) + .where(eq(EventTable.aggregate_id, input.sessionID)) + .orderBy(asc(EventTable.seq)) + .all(), + ) + if (rows.length === 0) + return yield* new SessionEventsNotFoundError({ + message: `No events found for session: ${input.sessionID}`, + sessionID: input.sessionID, + }) + + const size = 10 + // TODO: look into using effect APIs to process this in chunks + const sets = Array.from({ length: Math.ceil(rows.length / size) }, (_, i) => + rows.slice(i * size, (i + 1) * size), + ) + const total = sets.length + + log.info("session restore prepared", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + workspaceType: space.type, + directory: space.directory, + target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory, + events: rows.length, + batches: total, + first: rows[0]?.seq, + last: rows.at(-1)?.seq, + }) + + yield* Effect.sync(() => + GlobalBus.emit("event", { + directory: "global", + workspace: input.workspaceID, + payload: { + type: Event.Restore.type, + properties: { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + total, + step: 0, + }, + }, + }), + ) + + for (const [i, events] of sets.entries()) { + log.info("session restore batch starting", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + step: i + 1, + total, + events: events.length, + first: events[0]?.seq, + last: events.at(-1)?.seq, + target: target.type === "remote" ? 
String(route(target.url, "/sync/replay")) : target.directory, + }) + + if (target.type === "local") { + SyncEvent.replayAll(events) + log.info("session restore batch replayed locally", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + step: i + 1, + total, + events: events.length, + }) + } else { + const url = route(target.url, "/sync/replay") + const res = yield* http.execute( + HttpClientRequest.post(url, { + headers: new Headers(target.headers), + body: HttpBody.jsonUnsafe({ + directory: space.directory ?? "", + events, + }), + }), + ) + + if (res.status < 200 || res.status >= 300) { + const body = yield* res.text + log.error("session restore batch failed", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + step: i + 1, + total, + status: res.status, + body, + }) + return yield* new SessionRestoreHttpError({ + message: `Failed to replay session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${res.status} ${body}`, + workspaceID: input.workspaceID, + sessionID: input.sessionID, + status: res.status, + body, + }) + } + + log.info("session restore batch posted", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + step: i + 1, + total, + status: res.status, + }) + } + + yield* Effect.sync(() => + GlobalBus.emit("event", { + directory: "global", + workspace: input.workspaceID, + payload: { + type: Event.Restore.type, + properties: { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + total, + step: i + 1, + }, + }, + }), + ) + } + + log.info("session restore complete", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + batches: total, + }) + + return { total } + }).pipe( + Effect.tapError((err) => + Effect.sync(() => + log.error("session restore failed", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + error: errorData(err), + }), + ), + ), + ) + }) + + const list = Effect.fn("Workspace.list")(function* (project: Project.Info) { + return yield* db((db) 
=> + db + .select() + .from(WorkspaceTable) + .where(eq(WorkspaceTable.project_id, project.id)) + .all() + .map(fromRow) + .sort((a, b) => a.id.localeCompare(b.id)), + ) + }) + + const get = Effect.fn("Workspace.get")(function* (id: WorkspaceID) { + const row = yield* db((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get()) + if (!row) return + return fromRow(row) + }) + + const remove = Effect.fn("Workspace.remove")(function* (id: WorkspaceID) { + const sessions = yield* db((db) => + db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, id)).all(), + ) + yield* Effect.forEach(sessions, (sessionInfo) => session.remove(sessionInfo.id), { discard: true }) + + const row = yield* db((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get()) + if (!row) return + + yield* stopSync(id) + + const info = fromRow(row) + yield* Effect.catch( + Effect.gen(function* () { + const adaptor = getAdaptor(info.projectID, row.type) + yield* Effect.tryPromise(() => Promise.resolve(adaptor.remove(info))) + }), + () => + Effect.sync(() => { + log.error("adaptor not available when removing workspace", { type: row.type }) + }), + ) + + yield* db((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run()) + return info + }) + + const status = Effect.fn("Workspace.status")(function* () { + return [...connections.values()] + }) + + const isSyncing = Effect.fn("Workspace.isSyncing")(function* (workspaceID: WorkspaceID) { + const exists = yield* FiberMap.has(syncFibers, workspaceID) + return exists && connections.get(workspaceID)?.status !== "error" + }) + + const waitForSync = Effect.fn("Workspace.waitForSync")(function* ( + workspaceID: WorkspaceID, + state: Record, + signal?: AbortSignal, + ) { + if (synced(state)) return + + yield* Effect.catch( + waitEvent({ + timeout: TIMEOUT, + signal, + fn(event) { + if (event.workspace !== workspaceID && event.payload.type !== "sync") { + return false + } 
+ return synced(state) + }, + }), + (): Effect.Effect => + signal?.aborted + ? Effect.fail( + new SyncAbortedError({ + message: signal.reason instanceof Error ? signal.reason.message : "Request aborted", + cause: signal.reason, + }), + ) + : Effect.fail( + new SyncTimeoutError({ + message: `Timed out waiting for sync fence: ${JSON.stringify(state)}`, + state, + }), + ), + ) + }) + + const startWorkspaceSyncing = Effect.fn("Workspace.startWorkspaceSyncing")(function* (projectID: ProjectID) { + // This session table join makes this query only return + // workspaces that have sessions + const rows = yield* db((db) => + db + .selectDistinct({ workspace: WorkspaceTable }) + .from(WorkspaceTable) + .innerJoin(SessionTable, eq(SessionTable.workspace_id, WorkspaceTable.id)) + .where(eq(WorkspaceTable.project_id, projectID)) + .all(), + ) + + for (const { workspace } of rows) { + yield* startSync(fromRow(workspace)).pipe( + Effect.catch((error) => + Effect.sync(() => { + setStatus(workspace.id, "error") + log.warn("workspace sync failed to start", { + workspaceID: workspace.id, + error, + }) + }), + ), + Effect.forkDetach, + ) + } + }) + + return Service.of({ + create, + sessionRestore, + list, + get, + remove, + status, + isSyncing, + waitForSync, + startWorkspaceSyncing, + }) + }), +) + +export const defaultLayer = layer.pipe( + Layer.provide(Auth.defaultLayer), + Layer.provide(Session.defaultLayer), + Layer.provide(FetchHttpClient.layer), +) -const connections = new Map() -const aborts = new Map() const TIMEOUT = 5000 -function setStatus(id: WorkspaceID, status: ConnectionStatus["status"]) { - const prev = connections.get(id) - if (prev?.status === status) return - const next = { workspaceID: id, status } - connections.set(id, next) - - if (status === "error") { - aborts.delete(id) - } - - GlobalBus.emit("event", { - directory: "global", - workspace: id, - payload: { - type: Event.Status.type, - properties: next, - }, - }) -} - -export function status(): 
ConnectionStatus[] { - return [...connections.values()] +type HistoryEvent = { + id: string + aggregate_id: string + seq: number + type: string + data: Record } function synced(state: Record) { @@ -389,32 +846,6 @@ function synced(state: Record) { }) } -export async function isSyncing(workspaceID: WorkspaceID) { - return aborts.has(workspaceID) -} - -export async function waitForSync(workspaceID: WorkspaceID, state: Record, signal?: AbortSignal) { - if (synced(state)) return - - try { - await waitEvent({ - timeout: TIMEOUT, - signal, - fn(event) { - if (event.workspace !== workspaceID && event.payload.type !== "sync") { - return false - } - return synced(state) - }, - }) - } catch { - if (signal?.aborted) throw signal.reason ?? new Error("Request aborted") - throw new Error(`Timed out waiting for sync fence: ${JSON.stringify(state)}`) - } -} - -const log = Log.create({ service: "workspace-sync" }) - function route(url: string | URL, path: string) { const next = new URL(url) next.pathname = `${next.pathname.replace(/\/$/, "")}${path}` @@ -423,198 +854,42 @@ function route(url: string | URL, path: string) { return next } -async function connectSSE(url: URL | string, headers: HeadersInit | undefined, signal: AbortSignal) { - const res = await fetch(route(url, "/global/event"), { - method: "GET", - headers, - signal, - }) +const { runPromise, runSync } = makeRuntime(Service, defaultLayer) - if (!res.ok) throw new Error(`Workspace sync HTTP failure: ${res.status}`) - if (!res.body) throw new Error("No response body from global sync") +export const create = fn(CreateInput.zod, (input) => runPromise((svc) => svc.create(input))) - return res.body -} +export const sessionRestore = fn(SessionRestoreInput.zod, (input) => runPromise((svc) => svc.sessionRestore(input))) -async function syncHistory(space: Info, url: URL | string, headers: HeadersInit | undefined, signal: AbortSignal) { - const sessionIDs = Database.use((db) => +export function list(project: Project.Info) { + 
return Database.use((db) => db - .select({ id: SessionTable.id }) - .from(SessionTable) - .where(eq(SessionTable.workspace_id, space.id)) + .select() + .from(WorkspaceTable) + .where(eq(WorkspaceTable.project_id, project.id)) .all() - .map((row) => row.id), + .map(fromRow) + .sort((a, b) => a.id.localeCompare(b.id)), ) - const state = sessionIDs.length - ? Object.fromEntries( - Database.use((db) => - db.select().from(EventSequenceTable).where(inArray(EventSequenceTable.aggregate_id, sessionIDs)).all(), - ).map((row) => [row.aggregate_id, row.seq]), - ) - : {} - - log.info("syncing workspace history", { - workspaceID: space.id, - sessions: sessionIDs.length, - known: Object.keys(state).length, - }) - - const requestHeaders = new Headers(headers) - requestHeaders.set("content-type", "application/json") - - const res = await fetch(route(url, "/sync/history"), { - method: "POST", - headers: requestHeaders, - body: JSON.stringify(state), - signal, - }) - - if (!res.ok) { - const body = await res.text() - throw new Error(`Workspace history HTTP failure: ${res.status} ${body}`) - } - - const events = await res.json() - - return WorkspaceContext.provide({ - workspaceID: space.id, - fn: () => { - for (const event of events) { - SyncEvent.replay( - { - id: event.id, - aggregateID: event.aggregate_id, - seq: event.seq, - type: event.type, - data: event.data, - }, - { publish: true }, - ) - } - }, - }) - - log.info("workspace history synced", { - workspaceID: space.id, - events: events.length, - }) } -async function syncWorkspaceLoop(space: Info, signal: AbortSignal) { - const adaptor = await getAdaptor(space.projectID, space.type) - const target = await adaptor.target(space) +export const get = fn(WorkspaceID.zod, (id) => runPromise((svc) => svc.get(id))) - if (target.type === "local") return null +export const remove = fn(WorkspaceID.zod, (id) => runPromise((svc) => svc.remove(id))) - let attempt = 0 - - while (!signal.aborted) { - log.info("connecting to global sync", { 
workspace: space.name }) - setStatus(space.id, "connecting") - - let stream - try { - stream = await connectSSE(target.url, target.headers, signal) - await syncHistory(space, target.url, target.headers, signal) - } catch (err) { - stream = null - setStatus(space.id, "error") - log.info("failed to connect to global sync", { - workspace: space.name, - err, - }) - } - - if (stream) { - attempt = 0 - - log.info("global sync connected", { workspace: space.name }) - setStatus(space.id, "connected") - - await parseSSE(stream, signal, (evt: any) => { - try { - if (!("payload" in evt)) return - if (evt.payload.type === "server.heartbeat") return - - if (evt.payload.type === "sync") { - SyncEvent.replay(evt.payload.syncEvent as SyncEvent.SerializedEvent) - } - - GlobalBus.emit("event", { - directory: evt.directory, - project: evt.project, - workspace: space.id, - payload: evt.payload, - }) - } catch (err) { - log.info("failed to replay global event", { - workspaceID: space.id, - error: err, - }) - } - }) - - log.info("disconnected from global sync: " + space.id) - setStatus(space.id, "disconnected") - } - - // Back off reconnect attempts up to 2 minutes while the workspace - // stays unavailable. - await sleep(Math.min(120_000, 1_000 * 2 ** attempt)) - attempt += 1 - } +export function status() { + return runSync((svc) => svc.status()) } -async function startSync(space: Info) { - if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return - - const adaptor = await getAdaptor(space.projectID, space.type) - const target = await adaptor.target(space) - - if (target.type === "local") { - void Filesystem.exists(target.directory).then((exists) => { - setStatus(space.id, exists ? 
"connected" : "error") - }) - return - } - - if (aborts.has(space.id)) return true - - setStatus(space.id, "disconnected") - - const abort = new AbortController() - aborts.set(space.id, abort) - - void syncWorkspaceLoop(space, abort.signal).catch((error) => { - aborts.delete(space.id) - - setStatus(space.id, "error") - log.warn("workspace listener failed", { - workspaceID: space.id, - error, - }) - }) +export function isSyncing(workspaceID: WorkspaceID) { + return runSync((svc) => svc.isSyncing(workspaceID)) } -function stopSync(id: WorkspaceID) { - aborts.get(id)?.abort() - aborts.delete(id) - connections.delete(id) +export function waitForSync(workspaceID: WorkspaceID, state: Record, signal?: AbortSignal) { + return runPromise((svc) => svc.waitForSync(workspaceID, state, signal)) } export function startWorkspaceSyncing(projectID: ProjectID) { - const spaces = Database.use((db) => - db - .select({ workspace: WorkspaceTable }) - .from(WorkspaceTable) - .innerJoin(SessionTable, eq(SessionTable.workspace_id, WorkspaceTable.id)) - .where(eq(WorkspaceTable.project_id, projectID)) - .all(), - ) - - for (const row of new Map(spaces.map((row) => [row.workspace.id, row.workspace])).values()) { - void startSync(fromRow(row)) - } + void runPromise((svc) => svc.startWorkspaceSyncing(projectID)) } export * as Workspace from "./workspace" diff --git a/packages/opencode/src/effect/app-runtime.ts b/packages/opencode/src/effect/app-runtime.ts index fdd3053622..84be170688 100644 --- a/packages/opencode/src/effect/app-runtime.ts +++ b/packages/opencode/src/effect/app-runtime.ts @@ -41,6 +41,7 @@ import { ToolRegistry } from "@/tool/registry" import { Format } from "@/format" import { Project } from "@/project/project" import { Vcs } from "@/project/vcs" +import { Workspace } from "@/control-plane/workspace" import { Worktree } from "@/worktree" import { Pty } from "@/pty" import { Installation } from "@/installation" @@ -90,6 +91,7 @@ export const AppLayer = Layer.mergeAll( 
Format.defaultLayer, Project.defaultLayer, Vcs.defaultLayer, + Workspace.defaultLayer, Worktree.defaultLayer, Pty.defaultLayer, Installation.defaultLayer, diff --git a/packages/opencode/src/server/routes/instance/httpapi/middleware/workspace-routing.ts b/packages/opencode/src/server/routes/instance/httpapi/middleware/workspace-routing.ts index 9318dbfe5a..68dc0b9d7f 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/middleware/workspace-routing.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/middleware/workspace-routing.ts @@ -89,7 +89,7 @@ function missingWorkspaceResponse(id: WorkspaceID): HttpServerResponse.HttpServe function resolveTarget(workspace: Workspace.Info): Effect.Effect { return Effect.gen(function* () { - const adaptor = yield* Effect.promise(() => getAdaptor(workspace.projectID, workspace.type)) + const adaptor = yield* Effect.sync(() => getAdaptor(workspace.projectID, workspace.type)) return yield* Effect.promise(() => Promise.resolve(adaptor.target(workspace))) }) } @@ -101,7 +101,7 @@ function proxyRemote( url: URL, ): Effect.Effect { return Effect.gen(function* () { - const syncing = yield* Effect.promise(() => Workspace.isSyncing(workspace.id)) + const syncing = yield* Effect.sync(() => Workspace.isSyncing(workspace.id)) if (!syncing) { return HttpServerResponse.text(`broken sync connection for workspace: ${workspace.id}`, { status: 503, diff --git a/packages/opencode/test/control-plane/sse.test.ts b/packages/opencode/test/control-plane/sse.test.ts deleted file mode 100644 index 78a8341c0e..0000000000 --- a/packages/opencode/test/control-plane/sse.test.ts +++ /dev/null @@ -1,56 +0,0 @@ -import { afterEach, describe, expect, test } from "bun:test" -import { parseSSE } from "../../src/control-plane/sse" -import { resetDatabase } from "../fixture/db" - -afterEach(async () => { - await resetDatabase() -}) - -function stream(chunks: string[]) { - return new ReadableStream({ - start(controller) { - const encoder = new 
TextEncoder() - chunks.forEach((chunk) => controller.enqueue(encoder.encode(chunk))) - controller.close() - }, - }) -} - -describe("control-plane/sse", () => { - test("parses JSON events with CRLF and multiline data blocks", async () => { - const events: unknown[] = [] - const stop = new AbortController() - - await parseSSE( - stream([ - 'data: {"type":"one","properties":{"ok":true}}\r\n\r\n', - 'data: {"type":"two",\r\ndata: "properties":{"n":2}}\r\n\r\n', - ]), - stop.signal, - (event) => events.push(event), - ) - - expect(events).toEqual([ - { type: "one", properties: { ok: true } }, - { type: "two", properties: { n: 2 } }, - ]) - }) - - test("falls back to sse.message for non-json payload", async () => { - const events: unknown[] = [] - const stop = new AbortController() - - await parseSSE(stream(["id: abc\nretry: 1500\ndata: hello world\n\n"]), stop.signal, (event) => events.push(event)) - - expect(events).toEqual([ - { - type: "sse.message", - properties: { - data: "hello world", - id: "abc", - retry: 1500, - }, - }, - ]) - }) -}) diff --git a/packages/opencode/test/control-plane/workspace.test.ts b/packages/opencode/test/control-plane/workspace.test.ts new file mode 100644 index 0000000000..c94d3f9a32 --- /dev/null +++ b/packages/opencode/test/control-plane/workspace.test.ts @@ -0,0 +1,1391 @@ +import { afterEach, beforeEach, describe, expect, mock, spyOn, test } from "bun:test" +import fs from "node:fs/promises" +import Http from "node:http" +import path from "node:path" +import { setTimeout as delay } from "node:timers/promises" +import { NodeHttpServer } from "@effect/platform-node" +import { Effect, Layer } from "effect" +import { HttpServer, HttpServerRequest, HttpServerResponse } from "effect/unstable/http" +import { asc, eq } from "drizzle-orm" +import * as Log from "@opencode-ai/core/util/log" +import { Flag } from "@opencode-ai/core/flag/flag" +import { GlobalBus, type GlobalEvent } from "@/bus/global" +import { Database } from "@/storage/db" 
+import { ProjectID } from "@/project/schema" +import { ProjectTable } from "@/project/project.sql" +import { Instance } from "@/project/instance" +import { Session as SessionNs } from "@/session/session" +import { SessionID, MessageID, PartID } from "@/session/schema" +import { SessionTable } from "@/session/session.sql" +import { ModelID, ProviderID } from "@/provider/schema" +import { SyncEvent } from "@/sync" +import { EventSequenceTable, EventTable } from "@/sync/event.sql" +import { resetDatabase } from "../fixture/db" +import { provideTmpdirInstance, tmpdir } from "../fixture/fixture" +import { testEffect } from "../lib/effect" +import { registerAdaptor } from "../../src/control-plane/adaptors" +import { WorkspaceID } from "../../src/control-plane/schema" +import { WorkspaceTable } from "../../src/control-plane/workspace.sql" +import type { Target, WorkspaceAdaptor, WorkspaceInfo } from "../../src/control-plane/types" +import * as WorkspaceOld from "../../src/control-plane/workspace" +import { AppRuntime } from "@/effect/app-runtime" + +void Log.init({ print: false }) + +const testServerLayer = Layer.mergeAll( + NodeHttpServer.layer(Http.createServer, { host: "127.0.0.1", port: 0 }), + WorkspaceOld.defaultLayer, + SessionNs.defaultLayer, +) +const it = testEffect(testServerLayer) + +const originalWorkspacesFlag = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES +const originalEnv = { + OPENCODE_AUTH_CONTENT: process.env.OPENCODE_AUTH_CONTENT, + OTEL_EXPORTER_OTLP_HEADERS: process.env.OTEL_EXPORTER_OTLP_HEADERS, + OTEL_EXPORTER_OTLP_ENDPOINT: process.env.OTEL_EXPORTER_OTLP_ENDPOINT, + OTEL_RESOURCE_ATTRIBUTES: process.env.OTEL_RESOURCE_ATTRIBUTES, +} + +type RecordedCreate = { + info: WorkspaceInfo + env: Record + from?: WorkspaceInfo +} + +type RecordedAdaptor = { + adaptor: WorkspaceAdaptor + calls: { + configure: WorkspaceInfo[] + create: RecordedCreate[] + remove: WorkspaceInfo[] + target: WorkspaceInfo[] + } +} + +type FetchCall = { + url: URL + method: string + 
headers: Headers + bodyText?: string + json?: unknown +} + +function unique(prefix: string) { + return `${prefix}-${Math.random().toString(36).slice(2)}` +} + +function restoreEnv() { + Object.entries(originalEnv).forEach(([key, value]) => { + if (value === undefined) { + delete process.env[key] + return + } + process.env[key] = value + }) +} + +beforeEach(() => { + Database.close() + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true + restoreEnv() +}) + +afterEach(async () => { + mock.restore() + await Instance.disposeAll() + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = originalWorkspacesFlag + restoreEnv() + await resetDatabase() +}) + +async function withInstance(fn: (dir: string) => T | Promise) { + await using tmp = await tmpdir({ git: true }) + return Instance.provide({ + directory: tmp.path, + fn: () => fn(tmp.path), + }) +} + +function captureGlobalEvents() { + const events: GlobalEvent[] = [] + const handler = (event: GlobalEvent) => events.push(event) + GlobalBus.on("event", handler) + return { + events, + dispose() { + GlobalBus.off("event", handler) + }, + } +} + +async function eventually(fn: () => T | Promise, timeout = 1500) { + const started = Date.now() + let last: unknown + while (Date.now() - started < timeout) { + try { + return await fn() + } catch (err) { + last = err + await delay(10) + } + } + throw last ?? new Error("Timed out waiting for condition") +} + +function eventuallyEffect(effect: Effect.Effect, timeout = 1500) { + return Effect.gen(function* () { + const started = Date.now() + let last: unknown + while (Date.now() - started < timeout) { + const exit = yield* Effect.exit(effect) + if (exit._tag === "Success") return + last = exit.cause + yield* Effect.sleep("10 millis") + } + throw last ?? 
new Error("Timed out waiting for condition") + }) +} + +function recordedAdaptor(input: { + target: (info: WorkspaceInfo) => Target | Promise + configure?: (info: WorkspaceInfo) => WorkspaceInfo | Promise + create?: (info: WorkspaceInfo, env: Record, from?: WorkspaceInfo) => Promise + remove?: (info: WorkspaceInfo) => Promise +}): RecordedAdaptor { + const calls: RecordedAdaptor["calls"] = { + configure: [], + create: [], + remove: [], + target: [], + } + + return { + calls, + adaptor: { + name: "recorded", + description: "recorded", + configure(info) { + calls.configure.push(structuredClone(info)) + return input.configure?.(info) ?? info + }, + async create(info, env, from) { + calls.create.push({ info: structuredClone(info), env: { ...env }, from: from ? structuredClone(from) : undefined }) + await input.create?.(info, env, from) + }, + async remove(info) { + calls.remove.push(structuredClone(info)) + await input.remove?.(info) + }, + target(info) { + calls.target.push(structuredClone(info)) + return input.target(info) + }, + }, + } +} + +function localAdaptor(dir: string, input?: { createDir?: boolean; remove?: (info: WorkspaceInfo) => Promise }) { + return recordedAdaptor({ + configure(info) { + return { ...info, directory: dir } + }, + async create() { + if (input?.createDir === false) return + await fs.mkdir(dir, { recursive: true }) + }, + remove: input?.remove, + target() { + return { type: "local", directory: dir } + }, + }) +} + +function remoteAdaptor(url: string, input?: { directory?: string | null; headers?: HeadersInit }) { + return recordedAdaptor({ + configure(info) { + return { ...info, directory: input?.directory ?? 
info.directory } + }, + target() { + return { type: "remote", url, headers: input?.headers } + }, + }) +} + +function eventStreamResponse(events: unknown[] = [], keepOpen = true) { + const encoder = new TextEncoder() + return new Response( + new ReadableStream({ + start(controller) { + if (keepOpen) controller.enqueue(encoder.encode(":\n\n")) + events.forEach((event) => controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`))) + if (!keepOpen) controller.close() + }, + }), + { status: 200, headers: { "content-type": "text/event-stream" } }, + ) +} + +function serverUrl() { + return Effect.gen(function* () { + return HttpServer.formatAddress((yield* HttpServer.HttpServer).address) + }) +} + +function workspaceInfo(projectID: ProjectID, type: string, input?: Partial): WorkspaceInfo { + return { + id: input?.id ?? WorkspaceID.ascending(), + type, + name: input?.name ?? unique("workspace"), + branch: input?.branch ?? null, + directory: input?.directory ?? null, + extra: input?.extra ?? 
null, + projectID, + } +} + +function insertWorkspace(info: WorkspaceInfo) { + Database.use((db) => + db + .insert(WorkspaceTable) + .values({ + id: info.id, + type: info.type, + branch: info.branch, + name: info.name, + directory: info.directory, + extra: info.extra, + project_id: info.projectID, + }) + .run(), + ) +} + +function insertProject(id: ProjectID, worktree: string) { + Database.use((db) => + db + .insert(ProjectTable) + .values({ + id, + worktree, + vcs: null, + name: null, + time_created: Date.now(), + time_updated: Date.now(), + sandboxes: [], + }) + .run(), + ) +} + +function attachSessionToWorkspace(sessionID: SessionID, workspaceID: WorkspaceID) { + Database.use((db) => db.update(SessionTable).set({ workspace_id: workspaceID }).where(eq(SessionTable.id, sessionID)).run()) +} + +function sessionSequence(sessionID: SessionID) { + return Database.use((db) => + db.select({ seq: EventSequenceTable.seq }).from(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, sessionID)).get(), + )?.seq +} + +function eventRows(sessionID: SessionID) { + return Database.use((db) => + db + .select({ seq: EventTable.seq, type: EventTable.type, data: EventTable.data }) + .from(EventTable) + .where(eq(EventTable.aggregate_id, sessionID)) + .orderBy(asc(EventTable.seq)) + .all(), + ) +} + +function sessionUpdatedType() { + return SyncEvent.versionedType(SessionNs.Event.Updated.type, SessionNs.Event.Updated.version) +} + +function replaceSessionEvents(sessionID: SessionID, count: number) { + Database.use((db) => { + db.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, sessionID)).run() + if (count === 0) return + + db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: count - 1 }).run() + db.insert(EventTable) + .values( + Array.from({ length: count }, (_, i) => ({ + id: `evt_${unique(`manual-${i}`)}`, + aggregate_id: sessionID, + seq: i, + type: sessionUpdatedType(), + data: { sessionID, info: { title: `manual ${i}` } }, + })), + ) 
+ .run() + }) +} + +describe("workspace-old schemas and exports", () => { + test("keeps the historical event type names", () => { + expect(WorkspaceOld.Event.Ready.type).toBe("workspace.ready") + expect(WorkspaceOld.Event.Failed.type).toBe("workspace.failed") + expect(WorkspaceOld.Event.Restore.type).toBe("workspace.restore") + expect(WorkspaceOld.Event.Status.type).toBe("workspace.status") + }) + + test("validates create input with workspace id, project id, branch, type, and extra", () => { + const input = { + id: WorkspaceID.ascending("wrk_schema_create"), + type: "worktree", + branch: "feature/schema", + projectID: ProjectID.make("project-schema"), + extra: { nested: true }, + } + + expect(WorkspaceOld.CreateInput.zod.parse(input)).toEqual(input) + expect(() => WorkspaceOld.CreateInput.zod.parse({ ...input, id: "bad" })).toThrow() + expect(() => WorkspaceOld.CreateInput.zod.parse({ ...input, branch: 1 })).toThrow() + }) + + test("validates session restore input", () => { + const input = { + workspaceID: WorkspaceID.ascending("wrk_schema_restore"), + sessionID: SessionID.descending("ses_schema_restore"), + } + + expect(WorkspaceOld.SessionRestoreInput.zod.parse(input)).toEqual(input) + expect(() => WorkspaceOld.SessionRestoreInput.zod.parse({ ...input, workspaceID: "bad" })).toThrow() + expect(() => WorkspaceOld.SessionRestoreInput.zod.parse({ ...input, sessionID: "bad" })).toThrow() + }) +}) + +describe("workspace-old CRUD", () => { + test("get returns undefined for a missing workspace", async () => { + await withInstance(async () => { + expect(await WorkspaceOld.get(WorkspaceID.ascending("wrk_missing_get"))).toBeUndefined() + }) + }) + + test("list maps database rows, filters by project, and sorts by id", async () => { + await withInstance(() => { + const otherProjectID = ProjectID.make("project-other") + insertProject(otherProjectID, "/tmp/other") + const a = workspaceInfo(Instance.project.id, "manual", { + id: WorkspaceID.ascending("wrk_a_list"), + branch: 
"a", + directory: "/a", + extra: { a: true }, + }) + const b = workspaceInfo(Instance.project.id, "manual", { + id: WorkspaceID.ascending("wrk_b_list"), + branch: "b", + directory: "/b", + extra: ["b"], + }) + const other = workspaceInfo(otherProjectID, "manual", { id: WorkspaceID.ascending("wrk_c_list") }) + insertWorkspace(b) + insertWorkspace(other) + insertWorkspace(a) + + expect(WorkspaceOld.list(Instance.project)).toEqual([a, b]) + }) + }) + + test("create configures, persists, creates, starts local sync, and passes environment", async () => { + await withInstance(async (dir) => { + process.env.OPENCODE_AUTH_CONTENT = JSON.stringify({ test: { type: "api", key: "secret" } }) + process.env.OTEL_EXPORTER_OTLP_HEADERS = "authorization=otel" + process.env.OTEL_EXPORTER_OTLP_ENDPOINT = "https://otel.test" + process.env.OTEL_RESOURCE_ATTRIBUTES = "service.name=opencode-test" + + const workspaceID = WorkspaceID.ascending("wrk_create_local") + const type = unique("create-local") + const targetDir = path.join(dir, "created-local") + const recorded = recordedAdaptor({ + configure(info) { + return { + ...info, + branch: "configured-branch", + name: "Configured Name", + directory: targetDir, + extra: { configured: true }, + } + }, + async create() { + await fs.mkdir(targetDir, { recursive: true }) + }, + target() { + return { type: "local", directory: targetDir } + }, + }) + registerAdaptor(Instance.project.id, type, recorded.adaptor) + + const info = await WorkspaceOld.create({ + id: workspaceID, + type, + branch: null, + projectID: Instance.project.id, + extra: null, + }) + + expect(info).toEqual({ + id: workspaceID, + type, + branch: "configured-branch", + name: "Configured Name", + directory: targetDir, + extra: { configured: true }, + projectID: Instance.project.id, + }) + expect(await WorkspaceOld.get(workspaceID)).toEqual(info) + expect(WorkspaceOld.list(Instance.project)).toEqual([info]) + expect(recorded.calls.configure).toHaveLength(1) + 
expect(recorded.calls.configure[0]).toMatchObject({ id: workspaceID, type, directory: null }) + expect(recorded.calls.create).toHaveLength(1) + expect(recorded.calls.create[0].info).toEqual(info) + expect(JSON.parse(recorded.calls.create[0].env.OPENCODE_AUTH_CONTENT ?? "{}")).toEqual({ + test: { type: "api", key: "secret" }, + }) + expect(recorded.calls.create[0].env.OPENCODE_WORKSPACE_ID).toBe(workspaceID) + expect(recorded.calls.create[0].env.OPENCODE_EXPERIMENTAL_WORKSPACES).toBe("true") + expect(recorded.calls.create[0].env.OTEL_EXPORTER_OTLP_HEADERS).toBe("authorization=otel") + expect(recorded.calls.create[0].env.OTEL_EXPORTER_OTLP_ENDPOINT).toBe("https://otel.test") + expect(recorded.calls.create[0].env.OTEL_RESOURCE_ATTRIBUTES).toBe("service.name=opencode-test") + expect(WorkspaceOld.status().find((item) => item.workspaceID === workspaceID)?.status).toBe("connected") + + await WorkspaceOld.remove(workspaceID) + expect(WorkspaceOld.status().find((item) => item.workspaceID === workspaceID)?.status).toBeUndefined() + }) + }) + + test("create propagates configure failures and does not insert a workspace", async () => { + await withInstance(async () => { + const type = unique("configure-failure") + registerAdaptor( + Instance.project.id, + type, + recordedAdaptor({ + configure() { + throw new Error("configure exploded") + }, + target() { + return { type: "local", directory: "/unused" } + }, + }).adaptor, + ) + + await expect( + WorkspaceOld.create({ type, branch: null, projectID: Instance.project.id, extra: null }), + ).rejects.toThrow("configure exploded") + expect(WorkspaceOld.list(Instance.project)).toEqual([]) + }) + }) + + test("create leaves the inserted row when adaptor create fails", async () => { + await withInstance(async () => { + const type = unique("create-failure") + const recorded = recordedAdaptor({ + async create() { + throw new Error("create exploded") + }, + target() { + return { type: "local", directory: "/unused" } + }, + }) + 
registerAdaptor(Instance.project.id, type, recorded.adaptor) + + await expect( + WorkspaceOld.create({ type, branch: "branch", projectID: Instance.project.id, extra: { x: 1 } }), + ).rejects.toThrow("create exploded") + + const rows = WorkspaceOld.list(Instance.project) + expect(rows).toHaveLength(1) + expect(rows[0]).toMatchObject({ type, branch: "branch", extra: { x: 1 } }) + expect(recorded.calls.target).toHaveLength(0) + await WorkspaceOld.remove(rows[0].id) + }) + }) + + test("create returns after a local workspace reports error", async () => { + await withInstance(async (dir) => { + const type = unique("local-error") + const missing = path.join(dir, "missing-local-target") + const recorded = localAdaptor(missing, { createDir: false }) + registerAdaptor(Instance.project.id, type, recorded.adaptor) + + const info = await WorkspaceOld.create({ type, branch: null, projectID: Instance.project.id, extra: null }) + + expect(info.directory).toBe(missing) + expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBe("error") + await WorkspaceOld.remove(info.id) + }) + }) + + it.live("remote create connects to routed event and history endpoints", () => { + const calls: FetchCall[] = [] + return Effect.gen(function* () { + yield* HttpServer.serveEffect()(Effect.gen(function* () { + const req = yield* HttpServerRequest.HttpServerRequest + const bodyText = yield* req.text + const call = { + url: new URL(req.url, "http://localhost"), + method: req.method, + headers: new Headers(req.headers), + bodyText, + json: bodyText ? 
JSON.parse(bodyText) : undefined, + } + calls.push(call) + if (call.url.pathname === "/base/global/event") return HttpServerResponse.fromWeb(eventStreamResponse([], false)) + if (call.url.pathname === "/base/sync/history") return yield* HttpServerResponse.json([]) + return HttpServerResponse.text("unexpected", { status: 500 }) + })) + const url = yield* serverUrl() + yield* provideTmpdirInstance((dir) => + Effect.gen(function* () { + const workspace = yield* WorkspaceOld.Service + const type = unique("remote-create") + const recorded = remoteAdaptor(`${url}/base/?ignored=1#hash`, { directory: dir }) + registerAdaptor(Instance.project.id, type, recorded.adaptor) + + const info = yield* workspace.create({ type, branch: null, projectID: Instance.project.id, extra: null }) + + expect(calls.map((call) => `${call.method} ${call.url.pathname}${call.url.search}${call.url.hash}`)).toEqual([ + "GET /base/global/event", + "POST /base/sync/history", + ]) + expect(calls[1].json).toEqual({}) + expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe("connected") + expect(yield* workspace.isSyncing(info.id)).toBe(true) + + yield* workspace.remove(info.id) + expect(yield* workspace.isSyncing(info.id)).toBe(false) + expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBeUndefined() + }), + { git: true }, + ) + }) + }) + + test("remove returns undefined for a missing workspace", async () => { + await withInstance(async () => { + expect(await WorkspaceOld.remove(WorkspaceID.ascending("wrk_missing_remove"))).toBeUndefined() + }) + }) + + test("remove deletes the workspace, associated sessions, adaptor resources, and status", async () => { + await withInstance(async (dir) => { + const type = unique("remove-local") + const recorded = localAdaptor(path.join(dir, "remove-local")) + registerAdaptor(Instance.project.id, type, recorded.adaptor) + const info = await WorkspaceOld.create({ type, branch: null, projectID: 
Instance.project.id, extra: null }) + const one = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({}))) + const two = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({}))) + attachSessionToWorkspace(one.id, info.id) + attachSessionToWorkspace(two.id, info.id) + + const removed = await WorkspaceOld.remove(info.id) + + expect(removed).toEqual(info) + expect(await WorkspaceOld.get(info.id)).toBeUndefined() + expect(recorded.calls.remove).toEqual([info]) + expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBeUndefined() + expect( + Database.use((db) => + db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, info.id)).all(), + ), + ).toEqual([]) + }) + }) + + test("remove still deletes the row when the adaptor cannot remove resources", async () => { + await withInstance(async () => { + const type = unique("remove-throws") + const info = workspaceInfo(Instance.project.id, type, { id: WorkspaceID.ascending("wrk_remove_throws") }) + registerAdaptor( + Instance.project.id, + type, + recordedAdaptor({ + async remove() { + throw new Error("remove exploded") + }, + target() { + return { type: "local", directory: "/unused" } + }, + }).adaptor, + ) + insertWorkspace(info) + + expect(await WorkspaceOld.remove(info.id)).toEqual(info) + expect(await WorkspaceOld.get(info.id)).toBeUndefined() + }) + }) +}) + +describe("workspace-old sync state", () => { + test("startWorkspaceSyncing is disabled by the experimental workspace flag", async () => { + await withInstance(async (dir) => { + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = false + const type = unique("flag-disabled") + const info = workspaceInfo(Instance.project.id, type) + const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({}))) + attachSessionToWorkspace(session.id, info.id) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, localAdaptor(path.join(dir, 
"flag-disabled")).adaptor) + + WorkspaceOld.startWorkspaceSyncing(Instance.project.id) + await delay(25) + + expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBeUndefined() + }) + }) + + test("startWorkspaceSyncing starts only workspaces with sessions", async () => { + await withInstance(async (dir) => { + const withSessionType = unique("with-session") + const withoutSessionType = unique("without-session") + const withSession = workspaceInfo(Instance.project.id, withSessionType) + const withoutSession = workspaceInfo(Instance.project.id, withoutSessionType) + const withSessionDir = path.join(dir, "with-session") + const withoutSessionDir = path.join(dir, "without-session") + await fs.mkdir(withSessionDir, { recursive: true }) + await fs.mkdir(withoutSessionDir, { recursive: true }) + insertWorkspace(withSession) + insertWorkspace(withoutSession) + registerAdaptor(Instance.project.id, withSessionType, localAdaptor(withSessionDir).adaptor) + registerAdaptor(Instance.project.id, withoutSessionType, localAdaptor(withoutSessionDir).adaptor) + attachSessionToWorkspace((await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))).id, withSession.id) + + WorkspaceOld.startWorkspaceSyncing(Instance.project.id) + + await eventually(() => expect(WorkspaceOld.status().find((item) => item.workspaceID === withSession.id)?.status).toBe("connected")) + expect(WorkspaceOld.status().find((item) => item.workspaceID === withoutSession.id)?.status).toBeUndefined() + await WorkspaceOld.remove(withSession.id) + await WorkspaceOld.remove(withoutSession.id) + }) + }) + + test("local start reports error when the target directory is missing", async () => { + await withInstance(async (dir) => { + const type = unique("missing-local") + const info = workspaceInfo(Instance.project.id, type) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, localAdaptor(path.join(dir, "missing-target"), { createDir: false }).adaptor) + 
attachSessionToWorkspace((await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))).id, info.id) + + WorkspaceOld.startWorkspaceSyncing(Instance.project.id) + + await eventually(() => expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBe("error")) + expect(await WorkspaceOld.isSyncing(info.id)).toBe(false) + await WorkspaceOld.remove(info.id) + }) + }) + + test("duplicate local status updates are suppressed", async () => { + await withInstance(async (dir) => { + const captured = captureGlobalEvents() + try { + const type = unique("dedupe-local") + const info = workspaceInfo(Instance.project.id, type) + const target = path.join(dir, "dedupe-local") + await fs.mkdir(target, { recursive: true }) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, localAdaptor(target).adaptor) + attachSessionToWorkspace((await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))).id, info.id) + + WorkspaceOld.startWorkspaceSyncing(Instance.project.id) + WorkspaceOld.startWorkspaceSyncing(Instance.project.id) + + await eventually(() => expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBe("connected")) + expect( + captured.events.filter( + (event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Status.type, + ), + ).toHaveLength(1) + await WorkspaceOld.remove(info.id) + } finally { + captured.dispose() + } + }) + }) + + it.live("remote start emits disconnected, connecting, and connected then refuses duplicate listeners", () => { + const calls: FetchCall[] = [] + return Effect.gen(function* () { + yield* HttpServer.serveEffect()(Effect.gen(function* () { + const req = yield* HttpServerRequest.HttpServerRequest + const bodyText = yield* req.text + const call = { + url: new URL(req.url, "http://localhost"), + method: req.method, + headers: new Headers(req.headers), + bodyText, + json: bodyText ? 
JSON.parse(bodyText) : undefined, + } + calls.push(call) + if (call.url.pathname === "/sync/global/event") return HttpServerResponse.fromWeb(eventStreamResponse()) + if (call.url.pathname === "/sync/sync/history") return HttpServerResponse.fromWeb(Response.json([])) + return HttpServerResponse.text("unexpected", { status: 500 }) + })) + const url = yield* serverUrl() + yield* provideTmpdirInstance(() => + Effect.gen(function* () { + const workspace = yield* WorkspaceOld.Service + const sessionSvc = yield* SessionNs.Service + const captured = captureGlobalEvents() + try { + const type = unique("remote-start") + const info = workspaceInfo(Instance.project.id, type) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/sync`).adaptor) + attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id) + + yield* workspace.startWorkspaceSyncing(Instance.project.id) + yield* eventuallyEffect(Effect.gen(function* () { + expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe("connected") + })) + yield* workspace.startWorkspaceSyncing(Instance.project.id) + yield* Effect.sleep("25 millis") + + expect( + captured.events + .filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Status.type) + .map((event) => event.payload.properties.status), + ).toEqual(["disconnected", "connecting", "connected"]) + expect(calls.filter((call) => call.url.pathname === "/sync/global/event")).toHaveLength(1) + expect(calls.filter((call) => call.url.pathname === "/sync/sync/history")).toHaveLength(1) + expect(yield* workspace.isSyncing(info.id)).toBe(true) + + yield* workspace.remove(info.id) + expect(yield* workspace.isSyncing(info.id)).toBe(false) + } finally { + captured.dispose() + } + }), + { git: true }, + ) + }) + }) + + it.live("remote connection HTTP failures set error and clear syncing", () => + Effect.gen(function* () { + yield* 
HttpServer.serveEffect()(Effect.gen(function* () { + const req = yield* HttpServerRequest.HttpServerRequest + if (new URL(req.url, "http://localhost").pathname === "/failed/global/event") + return HttpServerResponse.text("nope", { status: 503 }) + return HttpServerResponse.fromWeb(Response.json([])) + })) + const url = yield* serverUrl() + yield* provideTmpdirInstance(() => + Effect.gen(function* () { + const workspace = yield* WorkspaceOld.Service + const sessionSvc = yield* SessionNs.Service + const type = unique("remote-connect-fail") + const info = workspaceInfo(Instance.project.id, type) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/failed`).adaptor) + attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id) + + yield* workspace.startWorkspaceSyncing(Instance.project.id) + + yield* eventuallyEffect(Effect.gen(function* () { + expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe("error") + })) + expect(yield* workspace.isSyncing(info.id)).toBe(false) + yield* workspace.remove(info.id) + }), + { git: true }, + ) + }), + ) + + it.live("remote history HTTP failures set error", () => + Effect.gen(function* () { + yield* HttpServer.serveEffect()(Effect.gen(function* () { + const req = yield* HttpServerRequest.HttpServerRequest + const url = new URL(req.url, "http://localhost") + if (url.pathname === "/history-failed/global/event") return HttpServerResponse.fromWeb(eventStreamResponse([], false)) + if (url.pathname === "/history-failed/sync/history") return HttpServerResponse.text("history failed", { status: 500 }) + return HttpServerResponse.fromWeb(Response.json([])) + })) + const url = yield* serverUrl() + yield* provideTmpdirInstance(() => + Effect.gen(function* () { + const workspace = yield* WorkspaceOld.Service + const sessionSvc = yield* SessionNs.Service + const type = unique("remote-history-fail") + const info = workspaceInfo(Instance.project.id, type) + 
insertWorkspace(info) + registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/history-failed`).adaptor) + attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id) + + yield* workspace.startWorkspaceSyncing(Instance.project.id) + + yield* eventuallyEffect(Effect.gen(function* () { + expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe("error") + })) + expect(yield* workspace.isSyncing(info.id)).toBe(false) + yield* workspace.remove(info.id) + }), + { git: true }, + ) + }), + ) + + it.live("sync history sends the local sequence fence and replays returned events in workspace context", () => { + const historyBodies: unknown[] = [] + let historySessionID: SessionID | undefined + let historyNextSeq = 0 + return Effect.gen(function* () { + yield* HttpServer.serveEffect()(Effect.gen(function* () { + const req = yield* HttpServerRequest.HttpServerRequest + const bodyText = yield* req.text + const url = new URL(req.url, "http://localhost") + if (url.pathname === "/history/global/event") return HttpServerResponse.fromWeb(eventStreamResponse()) + if (url.pathname === "/history/sync/history") { + historyBodies.push(bodyText ? 
JSON.parse(bodyText) : undefined) + return HttpServerResponse.fromWeb( + Response.json([ + { + id: `evt_${unique("history")}`, + aggregate_id: historySessionID!, + seq: historyNextSeq, + type: sessionUpdatedType(), + data: { sessionID: historySessionID!, info: { title: "from history" } }, + }, + ]), + ) + } + return HttpServerResponse.text("unexpected", { status: 500 }) + })) + const url = yield* serverUrl() + yield* provideTmpdirInstance(() => + Effect.gen(function* () { + const workspace = yield* WorkspaceOld.Service + const sessionSvc = yield* SessionNs.Service + const captured = captureGlobalEvents() + try { + const type = unique("history-replay") + const info = workspaceInfo(Instance.project.id, type) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/history`).adaptor) + const session = yield* sessionSvc.create({ title: "before history" }) + attachSessionToWorkspace(session.id, info.id) + historySessionID = session.id + historyNextSeq = (sessionSequence(session.id) ?? 
-1) + 1 + + yield* workspace.startWorkspaceSyncing(Instance.project.id) + + yield* eventuallyEffect(Effect.gen(function* () { + expect((yield* sessionSvc.get(session.id)).title).toBe("from history") + })) + expect(historyBodies).toEqual([{ [session.id]: historyNextSeq - 1 }]) + expect( + captured.events.some( + (event) => + event.workspace === info.id && event.payload.type === "sync" && event.payload.syncEvent.seq === historyNextSeq, + ), + ).toBe(true) + yield* workspace.remove(info.id) + } finally { + captured.dispose() + } + }), + { git: true }, + ) + }) + }) + + it.live("SSE forwards non-heartbeat events and ignores heartbeats", () => + Effect.gen(function* () { + yield* HttpServer.serveEffect()(Effect.gen(function* () { + const req = yield* HttpServerRequest.HttpServerRequest + const url = new URL(req.url, "http://localhost") + if (url.pathname === "/sse-forward/global/event") + return HttpServerResponse.fromWeb( + eventStreamResponse( + [ + { directory: "remote-dir", project: "remote-project", payload: { type: "server.heartbeat" } }, + { + directory: "remote-dir", + project: "remote-project", + payload: { type: "custom.remote", properties: { ok: true } }, + }, + ], + false, + ), + ) + if (url.pathname === "/sse-forward/sync/history") return HttpServerResponse.fromWeb(Response.json([])) + return HttpServerResponse.text("unexpected", { status: 500 }) + })) + const url = yield* serverUrl() + yield* provideTmpdirInstance(() => + Effect.gen(function* () { + const workspace = yield* WorkspaceOld.Service + const sessionSvc = yield* SessionNs.Service + const captured = captureGlobalEvents() + try { + const type = unique("sse-forward") + const info = workspaceInfo(Instance.project.id, type) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/sse-forward`).adaptor) + attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id) + + yield* workspace.startWorkspaceSyncing(Instance.project.id) + + yield* 
eventuallyEffect(Effect.sync(() => + expect(captured.events.some((event) => event.workspace === info.id && event.payload.type === "custom.remote")) + .toBe(true), + )) + expect(captured.events.some((event) => event.workspace === info.id && event.payload.type === "server.heartbeat")).toBe( + false, + ) + expect( + captured.events.find((event) => event.workspace === info.id && event.payload.type === "custom.remote"), + ).toMatchObject({ directory: "remote-dir", project: "remote-project", payload: { properties: { ok: true } } }) + yield* workspace.remove(info.id) + } finally { + captured.dispose() + } + }), + { git: true }, + ) + }), + ) + + it.live("SSE sync events are replayed and forwarded", () => { + let sseSessionID: SessionID | undefined + let sseNextSeq = 0 + return Effect.gen(function* () { + yield* HttpServer.serveEffect()(Effect.gen(function* () { + const req = yield* HttpServerRequest.HttpServerRequest + const url = new URL(req.url, "http://localhost") + if (url.pathname === "/sse-sync/global/event") + return HttpServerResponse.fromWeb( + eventStreamResponse( + [ + { + directory: "remote-dir", + project: "remote-project", + payload: { + type: "sync", + syncEvent: { + id: `evt_${unique("sse")}`, + aggregateID: sseSessionID!, + seq: sseNextSeq, + type: sessionUpdatedType(), + data: { sessionID: sseSessionID!, info: { title: "from sse" } }, + }, + }, + }, + ], + false, + ), + ) + if (url.pathname === "/sse-sync/sync/history") return HttpServerResponse.fromWeb(Response.json([])) + return HttpServerResponse.text("unexpected", { status: 500 }) + })) + const url = yield* serverUrl() + yield* provideTmpdirInstance(() => + Effect.gen(function* () { + const workspace = yield* WorkspaceOld.Service + const sessionSvc = yield* SessionNs.Service + const captured = captureGlobalEvents() + try { + const type = unique("sse-sync") + const info = workspaceInfo(Instance.project.id, type) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, 
remoteAdaptor(`${url}/sse-sync`).adaptor) + const session = yield* sessionSvc.create({ title: "before sse" }) + attachSessionToWorkspace(session.id, info.id) + sseSessionID = session.id + sseNextSeq = (sessionSequence(session.id) ?? -1) + 1 + + yield* workspace.startWorkspaceSyncing(Instance.project.id) + + yield* eventuallyEffect(Effect.gen(function* () { + expect((yield* sessionSvc.get(session.id)).title).toBe("from sse") + })) + expect( + captured.events.some( + (event) => event.workspace === info.id && event.payload.type === "sync" && event.payload.syncEvent.seq === sseNextSeq, + ), + ).toBe(true) + yield* workspace.remove(info.id) + } finally { + captured.dispose() + } + }), + { git: true }, + ) + }) + }) +}) + +describe("workspace-old waitForSync", () => { + test("returns immediately for an empty fence", async () => { + await withInstance(async () => { + await expect(WorkspaceOld.waitForSync(WorkspaceID.ascending("wrk_wait_empty"), {})).resolves.toBeUndefined() + }) + }) + + test("returns immediately when the stored sequence already satisfies the fence", async () => { + await withInstance(async () => { + const sessionID = SessionID.descending("ses_wait_done") + Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 4 }).run()) + + await expect(WorkspaceOld.waitForSync(WorkspaceID.ascending("wrk_wait_done"), { [sessionID]: 4 })).resolves.toBeUndefined() + await expect(WorkspaceOld.waitForSync(WorkspaceID.ascending("wrk_wait_done_2"), { [sessionID]: 3 })).resolves.toBeUndefined() + }) + }) + + test("waits until the database reaches the requested sequence and a workspace event arrives", async () => { + await withInstance(async () => { + const workspaceID = WorkspaceID.ascending("wrk_wait_event") + const sessionID = SessionID.descending("ses_wait_event") + Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 1 }).run()) + + const waited = WorkspaceOld.waitForSync(workspaceID, { [sessionID]: 2 
}) + await delay(10) + Database.use((db) => + db.update(EventSequenceTable).set({ seq: 2 }).where(eq(EventSequenceTable.aggregate_id, sessionID)).run(), + ) + GlobalBus.emit("event", { workspace: workspaceID, payload: { type: "anything" } }) + + await expect(waited).resolves.toBeUndefined() + }) + }) + + test("a sync event for a different workspace can also release the fence", async () => { + await withInstance(async () => { + const workspaceID = WorkspaceID.ascending("wrk_wait_sync_any") + const sessionID = SessionID.descending("ses_wait_sync_any") + Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 0 }).run()) + + const waited = WorkspaceOld.waitForSync(workspaceID, { [sessionID]: 1 }) + await delay(10) + Database.use((db) => + db.update(EventSequenceTable).set({ seq: 1 }).where(eq(EventSequenceTable.aggregate_id, sessionID)).run(), + ) + GlobalBus.emit("event", { + workspace: WorkspaceID.ascending("wrk_other_workspace"), + payload: { type: "sync" }, + }) + + await expect(waited).resolves.toBeUndefined() + }) + }) + + test("rejects with the abort reason when aborted", async () => { + await withInstance(async () => { + const abort = new AbortController() + const reason = new Error("caller aborted") + const waited = WorkspaceOld.waitForSync( + WorkspaceID.ascending("wrk_wait_abort"), + { [SessionID.descending("ses_wait_abort")]: 1 }, + abort.signal, + ) + abort.abort(reason) + + await expect(waited).rejects.toMatchObject({ _tag: "WorkspaceSyncAbortedError", message: reason.message, cause: reason }) + }) + }) + + test( + "times out with the requested fence in the error message", + async () => { + await withInstance(async () => { + const sessionID = SessionID.descending("ses_wait_timeout") + + await expect( + WorkspaceOld.waitForSync(WorkspaceID.ascending("wrk_wait_timeout"), { [sessionID]: 1 }), + ).rejects.toThrow(`Timed out waiting for sync fence: {"${sessionID}":1}`) + }) + }, + 7000, + ) +}) + +describe("workspace-old 
sessionRestore", () => { + test("throws when the workspace is missing", async () => { + await withInstance(async () => { + await expect( + WorkspaceOld.sessionRestore({ + workspaceID: WorkspaceID.ascending("wrk_restore_missing"), + sessionID: SessionID.descending("ses_restore_missing_workspace"), + }), + ).rejects.toThrow("Workspace not found: wrk_restore_missing") + }) + }) + + test("throws when switching a missing session fails", async () => { + await withInstance(async (dir) => { + const type = unique("restore-missing-session") + const info = workspaceInfo(Instance.project.id, type, { directory: dir }) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor) + + await expect( + WorkspaceOld.sessionRestore({ workspaceID: info.id, sessionID: SessionID.descending("ses_missing_restore") }), + ).rejects.toThrow("NotFoundError") + await WorkspaceOld.remove(info.id) + }) + }) + + it.live("posts remote replay batches of 10, emits progress, and includes the workspace update event", () => { + const replay: FetchCall[] = [] + return Effect.gen(function* () { + yield* HttpServer.serveEffect()(Effect.gen(function* () { + const req = yield* HttpServerRequest.HttpServerRequest + const bodyText = yield* req.text + const call = { + url: new URL(req.url, "http://localhost"), + method: req.method, + headers: new Headers(req.headers), + bodyText, + json: bodyText ? 
JSON.parse(bodyText) : undefined, + } + if (call.url.pathname === "/restore/sync/replay") { + replay.push(call) + return HttpServerResponse.fromWeb(Response.json({ ok: true })) + } + return HttpServerResponse.text("unexpected", { status: 500 }) + })) + const url = yield* serverUrl() + yield* provideTmpdirInstance((dir) => + Effect.gen(function* () { + const workspace = yield* WorkspaceOld.Service + const sessionSvc = yield* SessionNs.Service + const captured = captureGlobalEvents() + try { + const type = unique("restore-remote") + const info = workspaceInfo(Instance.project.id, type, { directory: dir }) + insertWorkspace(info) + registerAdaptor( + Instance.project.id, + type, + remoteAdaptor(`${url}/restore/?ignored=1#hash`, { + directory: dir, + headers: { authorization: "Bearer restore" }, + }).adaptor, + ) + const session = yield* sessionSvc.create({ title: "restore remote" }) + replaceSessionEvents(session.id, 24) + + const result = yield* workspace.sessionRestore({ workspaceID: info.id, sessionID: session.id }) + + expect(result).toEqual({ total: 3 }) + expect(replay).toHaveLength(3) + expect(replay.map((call) => call.url.pathname + call.url.search + call.url.hash)).toEqual([ + "/restore/sync/replay", + "/restore/sync/replay", + "/restore/sync/replay", + ]) + expect(replay.every((call) => call.headers.get("authorization") === "Bearer restore")).toBe(true) + expect(replay.every((call) => call.headers.get("content-type") === "application/json")).toBe(true) + expect(replay.map((call) => (call.json as { events: unknown[] }).events.length)).toEqual([10, 10, 5]) + expect(replay.map((call) => (call.json as { directory: string }).directory)).toEqual([dir, dir, dir]) + expect( + replay.flatMap((call) => (call.json as { events: Array<{ seq: number }> }).events.map((event) => event.seq)), + ).toEqual(Array.from({ length: 25 }, (_, i) => i)) + expect((replay[2].json as { events: Array<{ seq: number; type: string; data: unknown }> }).events.at(-1)).toMatchObject({ + seq: 
24, + type: sessionUpdatedType(), + data: { sessionID: session.id, info: { workspaceID: info.id } }, + }) + expect((yield* sessionSvc.get(session.id)).workspaceID).toBe(info.id) + expect( + captured.events + .filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type) + .map((event) => event.payload.properties.step), + ).toEqual([0, 1, 2, 3]) + yield* workspace.remove(info.id) + } finally { + captured.dispose() + } + }), + { git: true }, + ) + }) + }) + + it.live("remote restore sends an empty directory string when the workspace directory is null", () => { + const replay: FetchCall[] = [] + return Effect.gen(function* () { + yield* HttpServer.serveEffect()(Effect.gen(function* () { + const req = yield* HttpServerRequest.HttpServerRequest + const bodyText = yield* req.text + replay.push({ + url: new URL(req.url, "http://localhost"), + method: req.method, + headers: new Headers(req.headers), + bodyText, + json: bodyText ? JSON.parse(bodyText) : undefined, + }) + return HttpServerResponse.fromWeb(Response.json({ ok: true })) + })) + const url = yield* serverUrl() + yield* provideTmpdirInstance(() => + Effect.gen(function* () { + const workspace = yield* WorkspaceOld.Service + const sessionSvc = yield* SessionNs.Service + const type = unique("restore-null-dir") + const info = workspaceInfo(Instance.project.id, type, { directory: null }) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/null-dir`, { directory: null }).adaptor) + const session = yield* sessionSvc.create({ title: "null dir" }) + replaceSessionEvents(session.id, 0) + + expect(yield* workspace.sessionRestore({ workspaceID: info.id, sessionID: session.id })).toEqual({ total: 1 }) + expect((replay[0].json as { directory: string }).directory).toBe("") + expect((replay[0].json as { events: unknown[] }).events).toHaveLength(1) + yield* workspace.remove(info.id) + }), + { git: true }, + ) + }) + }) + + it.live("remote restore 
failures include status and body and do not emit completed batch progress", () => { + const replay: FetchCall[] = [] + return Effect.gen(function* () { + yield* HttpServer.serveEffect()(Effect.gen(function* () { + const req = yield* HttpServerRequest.HttpServerRequest + const bodyText = yield* req.text + replay.push({ + url: new URL(req.url, "http://localhost"), + method: req.method, + headers: new Headers(req.headers), + bodyText, + json: bodyText ? JSON.parse(bodyText) : undefined, + }) + return HttpServerResponse.text("replay failed", { status: 503 }) + })) + const url = yield* serverUrl() + yield* provideTmpdirInstance((dir) => + Effect.gen(function* () { + const workspace = yield* WorkspaceOld.Service + const sessionSvc = yield* SessionNs.Service + const captured = captureGlobalEvents() + try { + const type = unique("restore-remote-fail") + const info = workspaceInfo(Instance.project.id, type, { directory: dir }) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/fail`, { directory: dir }).adaptor) + const session = yield* sessionSvc.create({ title: "restore fail" }) + replaceSessionEvents(session.id, 11) + + const error = yield* Effect.flip(workspace.sessionRestore({ workspaceID: info.id, sessionID: session.id })) + expect((error as Error).message).toContain( + `Failed to replay session ${session.id} into workspace ${info.id}: HTTP 503 replay failed`, + ) + + expect(replay).toHaveLength(1) + expect( + captured.events + .filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type) + .map((event) => event.payload.properties.step), + ).toEqual([0]) + yield* workspace.remove(info.id) + } finally { + captured.dispose() + } + }), + { git: true }, + ) + }) + }) + + test("local restore replays batches without fetch and emits progress", async () => { + await withInstance(async (dir) => { + const captured = captureGlobalEvents() + let fetchCallCount = 0 + const replayAll = 
spyOn(SyncEvent, "replayAll") + try { + using server = Bun.serve({ + port: 0, + fetch() { + fetchCallCount++ + return Response.json({ ok: true }) + }, + }) + const type = unique("restore-local") + const info = workspaceInfo(Instance.project.id, type, { directory: dir }) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor) + const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({ title: "restore local" }))) + replaceSessionEvents(session.id, 20) + + expect(await WorkspaceOld.sessionRestore({ workspaceID: info.id, sessionID: session.id })).toEqual({ total: 3 }) + + expect(fetchCallCount).toBe(0) + expect(replayAll).toHaveBeenCalledTimes(3) + expect(replayAll.mock.calls.map((call) => call[0].length)).toEqual([10, 10, 1]) + expect((await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(session.id)))).workspaceID).toBe(info.id) + expect(eventRows(session.id).map((row) => row.seq)).toEqual(Array.from({ length: 21 }, (_, i) => i)) + expect( + captured.events + .filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type) + .map((event) => event.payload.properties.step), + ).toEqual([0, 1, 2, 3]) + await WorkspaceOld.remove(info.id) + } finally { + captured.dispose() + } + }) + }) + + it.live("session restore includes real message and part events in sequence order", () => { + const replay: FetchCall[] = [] + return Effect.gen(function* () { + yield* HttpServer.serveEffect()(Effect.gen(function* () { + const req = yield* HttpServerRequest.HttpServerRequest + const bodyText = yield* req.text + replay.push({ + url: new URL(req.url, "http://localhost"), + method: req.method, + headers: new Headers(req.headers), + bodyText, + json: bodyText ? 
JSON.parse(bodyText) : undefined, + }) + return HttpServerResponse.fromWeb(Response.json({ ok: true })) + })) + const url = yield* serverUrl() + yield* provideTmpdirInstance((dir) => + Effect.gen(function* () { + const workspace = yield* WorkspaceOld.Service + const sessionSvc = yield* SessionNs.Service + const type = unique("restore-real-events") + const info = workspaceInfo(Instance.project.id, type, { directory: dir }) + insertWorkspace(info) + registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/real`, { directory: dir }).adaptor) + const session = yield* sessionSvc.create({ title: "real events" }) + for (let i = 0; i < 3; i++) { + const msg = yield* sessionSvc.updateMessage({ + id: MessageID.ascending(), + role: "user", + sessionID: session.id, + agent: "build", + model: { providerID: ProviderID.make("test"), modelID: ModelID.make("test") }, + time: { created: Date.now() }, + }) + yield* sessionSvc.updatePart({ + id: PartID.ascending(), + sessionID: session.id, + messageID: msg.id, + type: "text", + text: `message ${i}`, + }) + } + const before = eventRows(session.id) + + expect(yield* workspace.sessionRestore({ workspaceID: info.id, sessionID: session.id })).toEqual({ total: 1 }) + + const posted = (replay[0].json as { events: Array<{ seq: number; type: string }> }).events + expect(posted.map((event) => event.seq)).toEqual([...before.map((row) => row.seq), before.at(-1)!.seq + 1]) + expect(posted.map((event) => event.type).slice(0, -1)).toEqual(before.map((row) => row.type)) + expect(posted.at(-1)?.type).toBe(sessionUpdatedType()) + yield* workspace.remove(info.id) + }), + { git: true }, + ) + }) + }) +}) diff --git a/packages/opencode/test/workspace/workspace-restore.test.ts b/packages/opencode/test/workspace/workspace-restore.test.ts deleted file mode 100644 index 7f802150ea..0000000000 --- a/packages/opencode/test/workspace/workspace-restore.test.ts +++ /dev/null @@ -1,283 +0,0 @@ -import { afterEach, beforeEach, describe, expect, mock, spyOn, 
test } from "bun:test" -import fs from "node:fs/promises" -import path from "node:path" -import { GlobalBus } from "../../src/bus/global" -import { registerAdaptor } from "../../src/control-plane/adaptors" -import type { WorkspaceAdaptor } from "../../src/control-plane/types" -import { Workspace } from "../../src/control-plane/workspace" -import { AppRuntime } from "../../src/effect/app-runtime" -import { Flag } from "@opencode-ai/core/flag/flag" -import { ModelID, ProviderID } from "../../src/provider/schema" -import { Instance } from "../../src/project/instance" -import { Session as SessionNs } from "@/session/session" -import { MessageV2 } from "../../src/session/message-v2" -import { MessageID, PartID, type SessionID } from "../../src/session/schema" -import { Database } from "@/storage/db" -import { asc } from "drizzle-orm" -import { eq } from "drizzle-orm" -import { SyncEvent } from "../../src/sync" -import { EventTable } from "../../src/sync/event.sql" -import * as Log from "@opencode-ai/core/util/log" -import { resetDatabase } from "../fixture/db" -import { tmpdir } from "../fixture/fixture" - -void Log.init({ print: false }) - -const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES - -beforeEach(() => { - Database.close() - Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true -}) - -afterEach(async () => { - mock.restore() - await Instance.disposeAll() - Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = original - await resetDatabase() -}) - -function create(input?: SessionNs.CreateInput) { - return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create(input))) -} - -function get(id: SessionID) { - return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(id))) -} - -function updateMessage(msg: T) { - return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.updateMessage(msg))) -} - -function updatePart(part: T) { - return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.updatePart(part))) -} - -async function user(sessionID: SessionID, 
text: string) { - const msg = await updateMessage({ - id: MessageID.ascending(), - role: "user", - sessionID, - agent: "build", - model: { providerID: ProviderID.make("test"), modelID: ModelID.make("test") }, - time: { created: Date.now() }, - }) - await updatePart({ - id: PartID.ascending(), - sessionID, - messageID: msg.id, - type: "text", - text, - }) -} - -function remote(dir: string, url: string): WorkspaceAdaptor { - return { - name: "remote", - description: "remote", - configure(info) { - return { - ...info, - directory: dir, - } - }, - async create() { - await fs.mkdir(dir, { recursive: true }) - }, - async remove() {}, - target() { - return { - type: "remote" as const, - url, - } - }, - } -} - -function local(dir: string): WorkspaceAdaptor { - return { - name: "local", - description: "local", - configure(info) { - return { - ...info, - directory: dir, - } - }, - async create() { - await fs.mkdir(dir, { recursive: true }) - }, - async remove() {}, - target() { - return { - type: "local" as const, - directory: dir, - } - }, - } -} - -function eventStreamResponse() { - return new Response(new ReadableStream({ start() {} }), { - status: 200, - headers: { - "content-type": "text/event-stream", - }, - }) -} - -describe("Workspace.sessionRestore", () => { - test("replays session events in batches of 10 and emits progress", async () => { - await using tmp = await tmpdir({ git: true }) - const dir = path.join(tmp.path, ".restore") - const seen: any[] = [] - const posts: Array<{ - path: string - body: { directory: string; events: Array<{ seq: number; aggregateID: string }> } - }> = [] - const on = (evt: any) => seen.push(evt) - GlobalBus.on("event", on) - - const raw = globalThis.fetch - spyOn(globalThis, "fetch").mockImplementation( - Object.assign( - async (input: URL | RequestInfo, init?: BunFetchRequestInit | RequestInit) => { - const url = new URL(typeof input === "string" || input instanceof URL ? 
input : input.url) - if (url.pathname === "/base/global/event") { - return eventStreamResponse() - } - if (url.pathname === "/base/sync/history") { - return Response.json([]) - } - const body = JSON.parse(String(init?.body)) - posts.push({ - path: url.pathname, - body, - }) - return Response.json({ sessionID: body.events[0].aggregateID }) - }, - { - preconnect: raw.preconnect?.bind(raw), - }, - ) as typeof globalThis.fetch, - ) - - try { - const setup = await Instance.provide({ - directory: tmp.path, - fn: async () => { - registerAdaptor(Instance.project.id, "worktree", remote(dir, "https://workspace.test/base")) - const space = await Workspace.create({ - type: "worktree", - branch: null, - extra: null, - projectID: Instance.project.id, - }) - const session = await create({}) - for (let i = 0; i < 6; i++) { - await user(session.id, `msg ${i}`) - } - const rows = Database.use((db) => - db - .select({ seq: EventTable.seq }) - .from(EventTable) - .where(eq(EventTable.aggregate_id, session.id)) - .orderBy(asc(EventTable.seq)) - .all(), - ) - const result = await Workspace.sessionRestore({ - workspaceID: space.id, - sessionID: session.id, - }) - return { space, session, rows, result } - }, - }) - - expect(setup.rows).toHaveLength(13) - expect(setup.result).toEqual({ total: 2 }) - expect(posts).toHaveLength(2) - expect(posts[0]?.path).toBe("/base/sync/replay") - expect(posts[1]?.path).toBe("/base/sync/replay") - expect(posts[0]?.body.directory).toBe(dir) - expect(posts[1]?.body.directory).toBe(dir) - expect(posts[0]?.body.events).toHaveLength(10) - expect(posts[1]?.body.events).toHaveLength(4) - expect(posts.flatMap((item) => item.body.events.map((event) => event.seq))).toEqual([ - ...setup.rows.map((row) => row.seq), - setup.rows.at(-1)!.seq + 1, - ]) - expect(posts[1]?.body.events.at(-1)).toMatchObject({ - aggregateID: setup.session.id, - seq: setup.rows.at(-1)!.seq + 1, - type: SyncEvent.versionedType(SessionNs.Event.Updated.type, SessionNs.Event.Updated.version), - 
data: { - sessionID: setup.session.id, - info: { - workspaceID: setup.space.id, - }, - }, - }) - - const restore = seen.filter( - (evt) => evt.workspace === setup.space.id && evt.payload.type === Workspace.Event.Restore.type, - ) - expect(restore.map((evt) => evt.payload.properties.step)).toEqual([0, 1, 2]) - expect(restore.map((evt) => evt.payload.properties.total)).toEqual([2, 2, 2]) - expect(restore.map((evt) => evt.payload.properties.sessionID)).toEqual([ - setup.session.id, - setup.session.id, - setup.session.id, - ]) - } finally { - GlobalBus.off("event", on) - } - }) - - test("replays locally without posting to a server", async () => { - await using tmp = await tmpdir({ git: true }) - const dir = path.join(tmp.path, ".restore-local") - const seen: any[] = [] - const on = (evt: any) => seen.push(evt) - GlobalBus.on("event", on) - - const fetch = spyOn(globalThis, "fetch") - const replayAll = spyOn(SyncEvent, "replayAll") - - try { - const setup = await Instance.provide({ - directory: tmp.path, - fn: async () => { - registerAdaptor(Instance.project.id, "local-restore", local(dir)) - const space = await Workspace.create({ - type: "local-restore", - branch: null, - extra: null, - projectID: Instance.project.id, - }) - const session = await create({}) - for (let i = 0; i < 6; i++) { - await user(session.id, `msg ${i}`) - } - const result = await Workspace.sessionRestore({ - workspaceID: space.id, - sessionID: session.id, - }) - const updated = await get(session.id) - return { space, session, result, updated } - }, - }) - - expect(setup.result).toEqual({ total: 2 }) - expect(fetch).not.toHaveBeenCalled() - expect(replayAll).toHaveBeenCalledTimes(2) - expect(setup.updated.workspaceID).toBe(setup.space.id) - - const restore = seen.filter( - (evt) => evt.workspace === setup.space.id && evt.payload.type === Workspace.Event.Restore.type, - ) - expect(restore.map((evt) => evt.payload.properties.step)).toEqual([0, 1, 2]) - } finally { - GlobalBus.off("event", on) - } - 
}) -})