diff --git a/packages/opencode/src/cli/cmd/tui/context/sdk.tsx b/packages/opencode/src/cli/cmd/tui/context/sdk.tsx index 14d3062886..6a240ceef8 100644 --- a/packages/opencode/src/cli/cmd/tui/context/sdk.tsx +++ b/packages/opencode/src/cli/cmd/tui/context/sdk.tsx @@ -2,6 +2,7 @@ import { createOpencodeClient } from "@opencode-ai/sdk/v2" import type { GlobalEvent } from "@opencode-ai/sdk/v2" import { createSimpleContext } from "./helper" import { createGlobalEmitter } from "@solid-primitives/event-bus" +import { Flag } from "@/flag/flag" import { batch, onCleanup, onMount } from "solid-js" export type EventSource = { @@ -39,6 +40,8 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({ let queue: GlobalEvent[] = [] let timer: Timer | undefined let last = 0 + const retryDelay = 1000 + const maxRetryDelay = 30000 const flush = () => { if (queue.length === 0) return @@ -73,9 +76,20 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({ const ctrl = new AbortController() sse = ctrl ;(async () => { + let attempt = 0 while (true) { if (abort.signal.aborted || ctrl.signal.aborted) break - const events = await sdk.global.event({ signal: ctrl.signal }) + + const events = await sdk.global.event({ + signal: ctrl.signal, + sseMaxRetryAttempts: 0, + }) + + if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) { + // Start syncing workspaces, it's important to do this after + // we've started listening to events + await sdk.sync.start().catch(() => {}) + } for await (const event of events.stream) { if (ctrl.signal.aborted) break @@ -84,6 +98,12 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({ if (timer) clearTimeout(timer) if (queue.length > 0) flush() + attempt += 1 + if (abort.signal.aborted || ctrl.signal.aborted) break + + // Exponential backoff + const backoff = Math.min(retryDelay * 2 ** (attempt - 1), maxRetryDelay) + await new Promise((resolve) => setTimeout(resolve, backoff)) } })().catch(() => {}) } @@ -92,6 +112,12 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({ if (props.events) { const unsub = await props.events.subscribe(handleEvent) onCleanup(unsub) + + if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) { + // Start syncing workspaces, it's important to do this after + // we've started listening to events + await sdk.sync.start().catch(() => {}) + } } else { startSSE() } diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts index fd22d3af04..d678ad7526 100644 --- a/packages/opencode/src/control-plane/workspace.ts +++ b/packages/opencode/src/control-plane/workspace.ts @@ -7,7 +7,7 @@ import { BusEvent } from "@/bus/bus-event" import { GlobalBus } from "@/bus/global" import { Auth } from "@/auth" import { SyncEvent } from "@/sync" -import { EventTable } from "@/sync/event.sql" +import { EventSequenceTable, EventTable } from "@/sync/event.sql" import { Flag } from "@/flag/flag" import { Log } from "@/util" import { Filesystem } from "@/util" @@ -23,8 +23,8 @@ import { SessionTable } from "@/session/session.sql" import { SessionID } from "@/session/schema" import { errorData } from "@/util/error" import { AppRuntime } from "@/effect/app-runtime" -import { EventSequenceTable } from "@/sync/event.sql" import { waitEvent } from "./util" +import { WorkspaceContext } from "./workspace-context" export const Info = WorkspaceInfo.meta({ ref: "Workspace", @@ -297,22 +297,13 @@ export function list(project: Project.Info) { db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(), ) const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id)) - - for (const space of spaces) startSync(space) return spaces } -function lookup(id: WorkspaceID) { +export const get = fn(WorkspaceID.zod, async (id) => { const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get()) if (!row) return return fromRow(row) -} - -export const get = fn(WorkspaceID.zod, async (id) => { - const space = lookup(id) - if (!space) return - startSync(space) - return space }) export const remove = fn(WorkspaceID.zod, async (id) => { @@ -437,6 +428,70 @@ async function connectSSE(url: URL | string, headers: HeadersInit | undefined, s return res.body } +async function syncHistory(space: Info, url: URL | string, headers: HeadersInit | undefined, signal: AbortSignal) { + const sessionIDs = Database.use((db) => + db + .select({ id: SessionTable.id }) + .from(SessionTable) + .where(eq(SessionTable.workspace_id, space.id)) + .all() + .map((row) => row.id), + ) + const state = sessionIDs.length + ? Object.fromEntries( + Database.use((db) => + db.select().from(EventSequenceTable).where(inArray(EventSequenceTable.aggregate_id, sessionIDs)).all(), + ).map((row) => [row.aggregate_id, row.seq]), + ) + : {} + + log.info("syncing workspace history", { + workspaceID: space.id, + sessions: sessionIDs.length, + known: Object.keys(state).length, + }) + + const requestHeaders = new Headers(headers) + requestHeaders.set("content-type", "application/json") + + const res = await fetch(route(url, "/sync/history"), { + method: "POST", + headers: requestHeaders, + body: JSON.stringify(state), + signal, + }) + + if (!res.ok) { + const body = await res.text() + throw new Error(`Workspace history HTTP failure: ${res.status} ${body}`) + } + + const events = await res.json() + + return WorkspaceContext.provide({ + workspaceID: space.id, + fn: () => { + for (const event of events) { + SyncEvent.replay( + { + id: event.id, + aggregateID: event.aggregate_id, + seq: event.seq, + type: event.type, + data: event.data, + }, + { publish: true }, + ) + } + }, + }) + + log.info("workspace history synced", { + workspaceID: space.id, + events: events.length, + }) +} + async function syncWorkspaceLoop(space: Info, signal: AbortSignal) { const adaptor = await getAdaptor(space.projectID, space.type) const target = await adaptor.target(space) @@ -452,7 +507,9 @@ async function syncWorkspaceLoop(space: Info, signal: AbortSignal) { let stream try { stream = await connectSSE(target.url, target.headers, signal) + await syncHistory(space, target.url, target.headers, signal) } catch (err) { + stream = null setStatus(space.id, "error") log.info("failed to connect to global sync", { workspace: space.name, @@ -469,6 +526,7 @@ async function syncWorkspaceLoop(space: Info, signal: AbortSignal) { await parseSSE(stream, signal, (evt: any) => { try { if (!("payload" in evt)) return + if (evt.payload.type === "server.heartbeat") return if (evt.payload.type === "sync") { SyncEvent.replay(evt.payload.syncEvent as SyncEvent.SerializedEvent) @@ -536,4 +594,19 @@ function stopSync(id: WorkspaceID) { connections.delete(id) } +export function startWorkspaceSyncing(projectID: ProjectID) { + const spaces = Database.use((db) => + db + .select({ workspace: WorkspaceTable }) + .from(WorkspaceTable) + .innerJoin(SessionTable, eq(SessionTable.workspace_id, WorkspaceTable.id)) + .where(eq(WorkspaceTable.project_id, projectID)) + .all(), + ) + + for (const row of new Map(spaces.map((row) => [row.workspace.id, row.workspace])).values()) { + void startSync(fromRow(row)) + } +} + export * as Workspace from "./workspace" diff --git a/packages/opencode/src/server/proxy.ts b/packages/opencode/src/server/proxy.ts index 9c1fd1f288..19a623cb0c 100644 --- a/packages/opencode/src/server/proxy.ts +++ b/packages/opencode/src/server/proxy.ts @@ -130,13 +130,6 @@ export async function http(url: string | URL, extra: HeadersInit | undefined, re const done = sync ? Fence.wait(workspaceID, sync, req.signal) : Promise.resolve() return done.then(async () => { - console.log("proxy http response", { - method: req.method, - request: req.url, - url: String(url), - status: res.status, - statusText: res.statusText, - }) return new Response(res.body, { status: res.status, statusText: res.statusText, diff --git a/packages/opencode/src/server/routes/instance/sync.ts b/packages/opencode/src/server/routes/instance/sync.ts index c6a067997b..b124cd875d 100644 --- a/packages/opencode/src/server/routes/instance/sync.ts +++ b/packages/opencode/src/server/routes/instance/sync.ts @@ -6,6 +6,8 @@ import { Database, asc, and, not, or, lte, eq } from "@/storage" import { EventTable } from "@/sync/event.sql" import { lazy } from "@/util/lazy" import { Log } from "@/util" +import { startWorkspaceSyncing } from "@/control-plane/workspace" +import { Instance } from "@/project/instance" import { errors } from "../../error" const ReplayEvent = z.object({ @@ -20,6 +22,28 @@ const log = Log.create({ service: "server.sync" }) export const SyncRoutes = lazy(() => new Hono() + .post( + "/start", + describeRoute({ + summary: "Start workspace sync", + description: "Start sync loops for workspaces in the current project that have active sessions.", + operationId: "sync.start", + responses: { + 200: { + description: "Workspace sync started", + content: { + "application/json": { + schema: resolver(z.boolean()), + }, + }, + }, + }, + }), + async (c) => { + startWorkspaceSyncing(Instance.project.id) + return c.json(true) + }, + ) .post( "/replay", describeRoute({ @@ -75,7 +99,7 @@ export const SyncRoutes = lazy(() => }) }, ) - .get( + .post( "/history", describeRoute({ summary: "List sync events", diff --git a/packages/opencode/test/workspace/workspace-restore.test.ts b/packages/opencode/test/workspace/workspace-restore.test.ts index 429eeaf9dd..ad6ac2c5fd 100644 --- a/packages/opencode/test/workspace/workspace-restore.test.ts +++ b/packages/opencode/test/workspace/workspace-restore.test.ts @@ -141,9 +141,12 @@ describe("Workspace.sessionRestore", () => { Object.assign( async (input: URL | RequestInfo, init?: BunFetchRequestInit | RequestInit) => { const url = new URL(typeof input === "string" || input instanceof URL ? input : input.url) - if (url.pathname !== "/base/sync/replay") { + if (url.pathname === "/base/global/event") { return eventStreamResponse() } + if (url.pathname === "/base/sync/history") { + return Response.json([]) + } const body = JSON.parse(String(init?.body)) posts.push({ path: url.pathname, diff --git a/packages/sdk/js/src/v2/gen/sdk.gen.ts b/packages/sdk/js/src/v2/gen/sdk.gen.ts index f484147a40..6248eb8e4d 100644 --- a/packages/sdk/js/src/v2/gen/sdk.gen.ts +++ b/packages/sdk/js/src/v2/gen/sdk.gen.ts @@ -163,6 +163,7 @@ import type { SyncHistoryListResponses, SyncReplayErrors, SyncReplayResponses, + SyncStartResponses, TextPartInput, ToolIdsErrors, ToolIdsResponses, @@ -3038,7 +3039,7 @@ export class History extends HeyApiClient { }, ], ) - return (options?.client ?? this.client).get({ + return (options?.client ?? this.client).post({ url: "/sync/history", ...options, ...params, @@ -3052,6 +3053,36 @@ export class History extends HeyApiClient { } export class Sync extends HeyApiClient { + /** + * Start workspace sync + * + * Start sync loops for workspaces in the current project that have active sessions. + */ + public start( + parameters?: { + directory?: string + workspace?: string + }, + options?: Options, + ) { + const params = buildClientParams( + [parameters], + [ + { + args: [ + { in: "query", key: "directory" }, + { in: "query", key: "workspace" }, + ], + }, + ], + ) + return (options?.client ?? this.client).post({ + url: "/sync/start", + ...options, + ...params, + }) + } + /** * Replay sync events * diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index 460c2bcdfa..5698cba54f 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -4502,6 +4502,25 @@ export type ProviderOauthCallbackResponses = { export type ProviderOauthCallbackResponse = ProviderOauthCallbackResponses[keyof ProviderOauthCallbackResponses] +export type SyncStartData = { + body?: never + path?: never + query?: { + directory?: string + workspace?: string + } + url: "/sync/start" +} + +export type SyncStartResponses = { + /** + * Workspace sync started + */ + 200: boolean +} + +export type SyncStartResponse = SyncStartResponses[keyof SyncStartResponses] + export type SyncReplayData = { body?: { directory: string diff --git a/packages/sdk/openapi.json b/packages/sdk/openapi.json index 7bdf025bbe..3b811f2fa9 100644 --- a/packages/sdk/openapi.json +++ b/packages/sdk/openapi.json @@ -5224,6 +5224,47 @@ ] } }, + "/sync/start": { + "post": { + "operationId": "sync.start", + "parameters": [ + { + "in": "query", + "name": "directory", + "schema": { + "type": "string" + } + }, + { + "in": "query", + "name": "workspace", + "schema": { + "type": "string" + } + } + ], + "summary": "Start workspace sync", + "description": "Start sync loops for workspaces in the current project that have active sessions.", + "responses": { + "200": { + "description": "Workspace sync started", + "content": { + "application/json": { + "schema": { + "type": "boolean" + } + } + } + } + }, + "x-codeSamples": [ + { + "lang": "js", + "source": "import { createOpencodeClient } from \"@opencode-ai/sdk\n\nconst client = createOpencodeClient()\nawait client.sync.start({\n ...\n})" + } + ] + } + }, "/sync/replay": { "post": { "operationId": "sync.replay", @@ -5328,7 +5369,7 @@ } }, "/sync/history": { - "get": { + "post": { "operationId": "sync.history.list", "parameters": [ {