feat(httpapi): bridge sync routes (#24484)

This commit is contained in:
Kit Langton 2026-04-26 11:31:46 -04:00 committed by GitHub
parent d03e6cedde
commit da61b0290a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 246 additions and 25 deletions

View file

@ -170,23 +170,23 @@ Use raw Effect HTTP routes where `HttpApi` does not fit. The goal is deleting Ho
## Current Route Status
| Area | Status | Notes |
| ------------------------- | ----------------- | -------------------------------------------------------------------------- |
| `question` | `bridged` | `GET /question`, reply, reject |
| `permission` | `bridged` | list and reply |
| `provider` | `bridged` | list, auth, OAuth authorize/callback |
| `config` | `bridged` | read, providers, update |
| `project` | `bridged` | list, current, git init, update |
| `file` | `bridged` partial | find text/file/symbol, list/content/status |
| `mcp` | `bridged` | status, add, OAuth, connect/disconnect |
| `workspace` | `bridged` | adaptor/list/status/create/remove/session-restore |
| top-level instance routes | `bridged` | path, vcs, command, agent, skill, lsp, formatter, dispose |
| experimental JSON routes | `bridged` | console, tool, worktree list/mutations, global session list, resource list |
| `session` | `later/special` | large stateful surface plus streaming |
| `sync` | `later` | process/control side effects |
| `event` | `special` | SSE |
| `pty` | `special` | websocket |
| `tui` | `special` | UI bridge |
| Area | Status | Notes |
| ------------------------- | ----------------- | ---------------------------------------------------------------------------------------- |
| `question` | `bridged` | `GET /question`, reply, reject |
| `permission` | `bridged` | list and reply |
| `provider` | `bridged` | list, auth, OAuth authorize/callback |
| `config` | `bridged` | read, providers, update |
| `project` | `bridged` | list, current, git init, update |
| `file` | `bridged` partial | find text/file/symbol, list/content/status |
| `mcp` | `bridged` | status, add, OAuth, connect/disconnect |
| `workspace` | `bridged` | adaptor/list/status/create/remove/session-restore |
| top-level instance routes | `bridged` | path, vcs, command, agent, skill, lsp, formatter, dispose |
| experimental JSON routes | `bridged` | console, tool, worktree list/mutations, global session list, resource list |
| `session` | `later/special` | large stateful surface plus streaming |
| `sync` | `bridged` | start/replay/history |
| `event` | `special` | SSE |
| `pty` | `special` | websocket |
| `tui` | `special` | UI bridge |
## Full Route Checklist
@ -280,9 +280,9 @@ This checklist tracks bridge parity only. Checked routes are available through t
### Sync Routes
- [ ] `POST /sync/start` - start workspace sync.
- [ ] `POST /sync/replay` - replay sync events.
- [ ] `POST /sync/history` - list sync event history.
- [x] `POST /sync/start` - start workspace sync.
- [x] `POST /sync/replay` - replay sync events.
- [x] `POST /sync/history` - list sync event history.
### Session Routes
@ -353,7 +353,7 @@ Prefer smaller PRs from here so route behavior and SDK/OpenAPI fallout stays rev
4. [x] Bridge experimental console switch and tool list routes.
5. [x] Bridge experimental global session list.
6. [x] Bridge workspace create/remove/session-restore routes.
7. [ ] Bridge sync start/replay/history routes.
7. [x] Bridge sync start/replay/history routes.
8. [ ] Bridge session read routes: list, status, get, children, todo, diff, messages.
9. [ ] Bridge session lifecycle mutation routes: create, delete, update, fork, abort.
10. [ ] Bridge session share/summary/message/part mutation routes.

View file

@ -139,9 +139,10 @@ export const mcpHandlers = Layer.unwrap(
})
const add = Effect.fn("McpHttpApi.add")(function* (ctx: { payload: typeof AddPayload.Type }) {
const payload = Schema.decodeUnknownSync(AddPayload)(ctx.payload)
const result = (yield* mcp.add(payload.name, payload.config)).status
return Schema.decodeUnknownSync(StatusMap)("status" in result ? { [payload.name]: result } : result)
const result = (yield* mcp.add(ctx.payload.name, ctx.payload.config)).status
return yield* Schema.decodeUnknownEffect(StatusMap)("status" in result ? { [ctx.payload.name]: result } : result).pipe(
Effect.mapError(() => new HttpApiError.BadRequest({})),
)
})
const authStart = Effect.fn("McpHttpApi.authStart")(function* (ctx: { params: { name: string } }) {

View file

@ -18,6 +18,7 @@ import { PermissionApi, permissionHandlers } from "./permission"
import { ProjectApi, projectHandlers } from "./project"
import { ProviderApi, providerHandlers } from "./provider"
import { QuestionApi, questionHandlers } from "./question"
import { SyncApi, syncHandlers } from "./sync"
import { WorkspaceApi, workspaceHandlers } from "./workspace"
import { disposeMiddleware } from "./lifecycle"
import { memoMap } from "@opencode-ai/core/effect/memo-map"
@ -73,6 +74,7 @@ export const routes = Layer.mergeAll(
HttpApiBuilder.layer(QuestionApi).pipe(Layer.provide(questionHandlers)),
HttpApiBuilder.layer(PermissionApi).pipe(Layer.provide(permissionHandlers)),
HttpApiBuilder.layer(ProviderApi).pipe(Layer.provide(providerHandlers)),
HttpApiBuilder.layer(SyncApi).pipe(Layer.provide(syncHandlers)),
HttpApiBuilder.layer(WorkspaceApi).pipe(Layer.provide(workspaceHandlers)),
).pipe(
Layer.provide(authorizationLayer),

View file

@ -0,0 +1,130 @@
import { startWorkspaceSyncing } from "@/control-plane/workspace"
import * as InstanceState from "@/effect/instance-state"
import { Database, asc, and, eq, lte, not, or } from "@/storage"
import { SyncEvent } from "@/sync"
import { EventTable } from "@/sync/event.sql"
import { Effect, Layer, Schema } from "effect"
import { HttpApi, HttpApiBuilder, HttpApiEndpoint, HttpApiGroup, OpenApi } from "effect/unstable/httpapi"
import { Authorization } from "./auth"
const root = "/sync"
const ReplayEvent = Schema.Struct({
id: Schema.String,
aggregateID: Schema.String,
seq: Schema.Number,
type: Schema.String,
data: Schema.Record(Schema.String, Schema.Unknown),
}).annotate({ identifier: "SyncReplayEvent" })
const ReplayPayload = Schema.Struct({
directory: Schema.String,
events: Schema.NonEmptyArray(ReplayEvent),
}).annotate({ identifier: "SyncReplayInput" })
const ReplayResponse = Schema.Struct({
sessionID: Schema.String,
}).annotate({ identifier: "SyncReplayResponse" })
const HistoryPayload = Schema.Record(Schema.String, Schema.Number)
const HistoryEvent = Schema.Struct({
id: Schema.String,
aggregate_id: Schema.String,
seq: Schema.Number,
type: Schema.String,
data: Schema.Record(Schema.String, Schema.Unknown),
}).annotate({ identifier: "SyncHistoryEvent" })
export const SyncPaths = {
start: `${root}/start`,
replay: `${root}/replay`,
history: `${root}/history`,
} as const
export const SyncApi = HttpApi.make("sync")
.add(
HttpApiGroup.make("sync")
.add(
HttpApiEndpoint.post("start", SyncPaths.start, {
success: Schema.Boolean,
}).annotateMerge(
OpenApi.annotations({
identifier: "sync.start",
summary: "Start workspace sync",
description: "Start sync loops for workspaces in the current project that have active sessions.",
}),
),
HttpApiEndpoint.post("replay", SyncPaths.replay, {
payload: ReplayPayload,
success: ReplayResponse,
}).annotateMerge(
OpenApi.annotations({
identifier: "sync.replay",
summary: "Replay sync events",
description: "Validate and replay a complete sync event history.",
}),
),
HttpApiEndpoint.post("history", SyncPaths.history, {
payload: HistoryPayload,
success: Schema.Array(HistoryEvent),
}).annotateMerge(
OpenApi.annotations({
identifier: "sync.history.list",
summary: "List sync events",
description:
"List sync events for all aggregates. Keys are aggregate IDs the client already knows about, values are the last known sequence ID. Events with seq > value are returned for those aggregates. Aggregates not listed in the input get their full history.",
}),
),
)
.annotateMerge(
OpenApi.annotations({
title: "sync",
description: "Experimental HttpApi sync routes.",
}),
)
.middleware(Authorization),
)
.annotateMerge(
OpenApi.annotations({
title: "opencode experimental HttpApi",
version: "0.0.1",
description: "Experimental HttpApi surface for selected instance routes.",
}),
)
export const syncHandlers = Layer.unwrap(
Effect.gen(function* () {
const start = Effect.fn("SyncHttpApi.start")(function* () {
startWorkspaceSyncing((yield* InstanceState.context).project.id)
return true
})
const replay = Effect.fn("SyncHttpApi.replay")(function* (ctx: { payload: typeof ReplayPayload.Type }) {
const events: SyncEvent.SerializedEvent[] = ctx.payload.events.map((event) => ({
id: event.id,
aggregateID: event.aggregateID,
seq: event.seq,
type: event.type,
data: { ...event.data },
}))
SyncEvent.replayAll(events)
return { sessionID: events[0].aggregateID }
})
const history = Effect.fn("SyncHttpApi.history")(function* (ctx: { payload: typeof HistoryPayload.Type }) {
const exclude = Object.entries(ctx.payload)
return Database.use((db) =>
db
.select()
.from(EventTable)
.where(
exclude.length > 0
? not(or(...exclude.map(([id, seq]) => and(eq(EventTable.aggregate_id, id), lte(EventTable.seq, seq))))!)
: undefined,
)
.orderBy(asc(EventTable.seq))
.all(),
)
})
return HttpApiBuilder.group(SyncApi, "sync", (handlers) =>
handlers.handle("start", start).handle("replay", replay).handle("history", history),
)
}),
)

View file

@ -123,7 +123,7 @@ export const workspaceHandlers = Layer.unwrap(
return yield* Effect.promise(() =>
Instance.restore(instance, () =>
Workspace.create({
...Schema.decodeUnknownSync(CreatePayload)(ctx.payload),
...ctx.payload,
projectID: instance.project.id,
}),
),

View file

@ -20,6 +20,7 @@ import { ExperimentalPaths } from "./httpapi/experimental"
import { FilePaths } from "./httpapi/file"
import { InstancePaths } from "./httpapi/instance"
import { McpPaths } from "./httpapi/mcp"
import { SyncPaths } from "./httpapi/sync"
import { ProjectRoutes } from "./project"
import { SessionRoutes } from "./session"
import { PtyRoutes } from "./pty"
@ -89,6 +90,9 @@ export const InstanceRoutes = (upgrade: UpgradeWebSocket): Hono => {
app.delete(McpPaths.auth, (c) => handler(c.req.raw, context))
app.post(McpPaths.connect, (c) => handler(c.req.raw, context))
app.post(McpPaths.disconnect, (c) => handler(c.req.raw, context))
app.post(SyncPaths.start, (c) => handler(c.req.raw, context))
app.post(SyncPaths.replay, (c) => handler(c.req.raw, context))
app.post(SyncPaths.history, (c) => handler(c.req.raw, context))
}
return app

View file

@ -0,0 +1,84 @@
import { afterEach, describe, expect, test } from "bun:test"
import type { UpgradeWebSocket } from "hono/ws"
import { Effect } from "effect"
import { Flag } from "@opencode-ai/core/flag/flag"
import { Instance } from "../../src/project/instance"
import { InstanceRoutes } from "../../src/server/routes/instance"
import { SyncPaths } from "../../src/server/routes/instance/httpapi/sync"
import { Session } from "../../src/session"
import { Log } from "../../src/util"
import { resetDatabase } from "../fixture/db"
import { tmpdir } from "../fixture/fixture"
void Log.init({ print: false })
const originalHttpApi = Flag.OPENCODE_EXPERIMENTAL_HTTPAPI
const originalWorkspaces = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
const websocket = (() => () => new Response(null, { status: 501 })) as unknown as UpgradeWebSocket
function app() {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = true
return InstanceRoutes(websocket)
}
function runSession<A, E>(fx: Effect.Effect<A, E, Session.Service>) {
return Effect.runPromise(fx.pipe(Effect.provide(Session.defaultLayer)))
}
afterEach(async () => {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = originalHttpApi
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = originalWorkspaces
await Instance.disposeAll()
await resetDatabase()
})
describe("sync HttpApi", () => {
test("serves sync routes through Hono bridge", async () => {
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const headers = { "x-opencode-directory": tmp.path, "content-type": "application/json" }
const session = await Instance.provide({
directory: tmp.path,
fn: async () => runSession(Session.Service.use((svc) => svc.create({ title: "sync" }))),
})
const started = await app().request(SyncPaths.start, { method: "POST", headers })
expect(started.status).toBe(200)
expect(await started.json()).toBe(true)
const history = await app().request(SyncPaths.history, {
method: "POST",
headers,
body: JSON.stringify({}),
})
expect(history.status).toBe(200)
const rows = (await history.json()) as Array<{
id: string
aggregate_id: string
seq: number
type: string
data: Record<string, unknown>
}>
expect(rows.map((row) => row.aggregate_id)).toContain(session.id)
const replayed = await app().request(SyncPaths.replay, {
method: "POST",
headers,
body: JSON.stringify({
directory: tmp.path,
events: rows
.filter((row) => row.aggregate_id === session.id)
.map((row) => ({
id: row.id,
aggregateID: row.aggregate_id,
seq: row.seq,
type: row.type,
data: row.data,
})),
}),
})
expect(replayed.status).toBe(200)
expect(await replayed.json()).toEqual({ sessionID: session.id })
})
})