diff --git a/packages/opencode/specs/effect/http-api.md b/packages/opencode/specs/effect/http-api.md index 5f16ef197e..6536ac947c 100644 --- a/packages/opencode/specs/effect/http-api.md +++ b/packages/opencode/specs/effect/http-api.md @@ -184,7 +184,7 @@ Use raw Effect HTTP routes where `HttpApi` does not fit. The goal is deleting Ho | experimental JSON routes | `bridged` | console, tool, worktree list/mutations, global session list, resource list | | `session` | `bridged` | read, lifecycle, prompt, message/part mutations, revert, permission reply | | `sync` | `bridged` | start/replay/history | -| `event` | `special` | SSE | +| `event` | `bridged` | SSE via raw Effect HTTP | | `pty` | `special` | websocket | | `tui` | `special` | UI bridge | @@ -316,7 +316,7 @@ This checklist tracks bridge parity only. Checked routes are available through t ### Event Routes -- [ ] `GET /event` - SSE event stream; replace with raw Effect HTTP, not `HttpApi`. +- [x] `GET /event` - SSE event stream via raw Effect HTTP. ### PTY Routes diff --git a/packages/opencode/src/server/routes/instance/httpapi/event.ts b/packages/opencode/src/server/routes/instance/httpapi/event.ts new file mode 100644 index 0000000000..dc5fa9c530 --- /dev/null +++ b/packages/opencode/src/server/routes/instance/httpapi/event.ts @@ -0,0 +1,46 @@ +import { Bus } from "@/bus" +import { Log } from "@/util" +import { Effect } from "effect" +import * as Stream from "effect/Stream" +import { HttpRouter, HttpServerResponse } from "effect/unstable/http" + +const log = Log.create({ service: "server" }) + +export const EventPaths = { + event: "/event", +} as const + +function eventData(data: unknown) { + return `data: ${JSON.stringify(data)}\n\n` +} + +export const eventRoute = HttpRouter.add( + "GET", + EventPaths.event, + Effect.gen(function* () { + const bus = yield* Bus.Service + const events = bus.subscribeAll().pipe(Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type)) + const heartbeat = Stream.tick("10 seconds").pipe( + Stream.drop(1), + Stream.map(() => ({ type: "server.heartbeat", properties: {} })), + ) + + log.info("event connected") + return HttpServerResponse.stream( + Stream.make({ type: "server.connected", properties: {} }).pipe( + Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))), + Stream.map(eventData), + Stream.encodeText, + Stream.ensuring(Effect.sync(() => log.info("event disconnected"))), + ), + { + contentType: "text/event-stream", + headers: { + "Cache-Control": "no-cache, no-transform", + "X-Accel-Buffering": "no", + "X-Content-Type-Options": "nosniff", + }, + }, + ) + }).pipe(Effect.provide(Bus.layer)), +) diff --git a/packages/opencode/src/server/routes/instance/httpapi/server.ts b/packages/opencode/src/server/routes/instance/httpapi/server.ts index adc70a43b3..1e8b23f55e 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/server.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/server.ts @@ -10,6 +10,7 @@ import { lazy } from "@/util/lazy" import { Filesystem } from "@/util" import { authorizationLayer } from "./auth" import { ConfigApi, configHandlers } from "./config" +import { eventRoute } from "./event" import { FileApi, fileHandlers } from "./file" import { ExperimentalApi, experimentalHandlers } from "./experimental" import { InstanceApi, instanceHandlers } from "./instance" @@ -66,6 +67,7 @@ const instance = HttpRouter.middleware()( ).layer export const routes = Layer.mergeAll( + eventRoute, HttpApiBuilder.layer(ConfigApi).pipe(Layer.provide(configHandlers)), HttpApiBuilder.layer(ExperimentalApi).pipe(Layer.provide(experimentalHandlers)), HttpApiBuilder.layer(FileApi).pipe(Layer.provide(fileHandlers)), diff --git a/packages/opencode/src/server/routes/instance/index.ts b/packages/opencode/src/server/routes/instance/index.ts index 4c0503af5a..c2e89c14e9 100644 --- a/packages/opencode/src/server/routes/instance/index.ts +++ b/packages/opencode/src/server/routes/instance/index.ts @@ -16,6 +16,7 @@ import { QuestionRoutes } from "./question" import { PermissionRoutes } from "./permission" import { Flag } from "@opencode-ai/core/flag/flag" import { ExperimentalHttpApiServer } from "./httpapi/server" +import { EventPaths } from "./httpapi/event" import { ExperimentalPaths } from "./httpapi/experimental" import { FilePaths } from "./httpapi/file" import { InstancePaths } from "./httpapi/instance" @@ -41,6 +42,7 @@ export const InstanceRoutes = (upgrade: UpgradeWebSocket): Hono => { if (Flag.OPENCODE_EXPERIMENTAL_HTTPAPI) { const handler = ExperimentalHttpApiServer.webHandler().handler const context = Context.empty() as Context.Context + app.get(EventPaths.event, (c) => handler(c.req.raw, context)) app.get("/question", (c) => handler(c.req.raw, context)) app.post("/question/:requestID/reply", (c) => handler(c.req.raw, context)) app.post("/question/:requestID/reject", (c) => handler(c.req.raw, context)) diff --git a/packages/opencode/test/server/httpapi-event.test.ts b/packages/opencode/test/server/httpapi-event.test.ts new file mode 100644 index 0000000000..42d2f80364 --- /dev/null +++ b/packages/opencode/test/server/httpapi-event.test.ts @@ -0,0 +1,52 @@ +import { afterEach, describe, expect, test } from "bun:test" +import type { UpgradeWebSocket } from "hono/ws" +import { Flag } from "@opencode-ai/core/flag/flag" +import { Instance } from "../../src/project/instance" +import { InstanceRoutes } from "../../src/server/routes/instance" +import { EventPaths } from "../../src/server/routes/instance/httpapi/event" +import { Log } from "../../src/util" +import { resetDatabase } from "../fixture/db" +import { tmpdir } from "../fixture/fixture" + +void Log.init({ print: false }) + +const original = Flag.OPENCODE_EXPERIMENTAL_HTTPAPI +const websocket = (() => () => new Response(null, { status: 501 })) as unknown as UpgradeWebSocket + +function app() { + Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = true + return InstanceRoutes(websocket) +} + +async function readFirstChunk(response: Response) { + if (!response.body) throw new Error("missing response body") + const reader = response.body.getReader() + const result = await Promise.race([ + reader.read(), + new Promise((_, reject) => setTimeout(() => reject(new Error("timed out waiting for event")), 5_000)), + ]) + await reader.cancel() + return new TextDecoder().decode(result.value) +} + +afterEach(async () => { + Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = original + await Instance.disposeAll() + await resetDatabase() +}) + +describe("event HttpApi bridge", () => { + test("serves event stream through experimental Effect route", async () => { + await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) + const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) + + expect(response.status).toBe(200) + expect(response.headers.get("content-type")).toContain("text/event-stream") + expect(response.headers.get("cache-control")).toBe("no-cache, no-transform") + expect(response.headers.get("x-accel-buffering")).toBe("no") + expect(response.headers.get("x-content-type-options")).toBe("nosniff") + expect(await readFirstChunk(response)).toContain( + 'data: {"type":"server.connected","properties":{}}\n\n', + ) + }) +})