feat(httpapi): bridge event stream (#24518)

This commit is contained in:
Kit Langton 2026-04-26 19:55:13 -04:00 committed by GitHub
parent e9071b0a80
commit 58244eb687
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 104 additions and 2 deletions

View file

@ -184,7 +184,7 @@ Use raw Effect HTTP routes where `HttpApi` does not fit. The goal is deleting Ho
| experimental JSON routes | `bridged` | console, tool, worktree list/mutations, global session list, resource list |
| `session` | `bridged` | read, lifecycle, prompt, message/part mutations, revert, permission reply |
| `sync` | `bridged` | start/replay/history |
| `event` | `special` | SSE |
| `event` | `bridged` | SSE via raw Effect HTTP |
| `pty` | `special` | websocket |
| `tui` | `special` | UI bridge |
@ -316,7 +316,7 @@ This checklist tracks bridge parity only. Checked routes are available through t
### Event Routes
- [ ] `GET /event` - SSE event stream; replace with raw Effect HTTP, not `HttpApi`.
- [x] `GET /event` - SSE event stream via raw Effect HTTP.
### PTY Routes

View file

@ -0,0 +1,46 @@
import { Bus } from "@/bus"
import { Log } from "@/util"
import { Effect } from "effect"
import * as Stream from "effect/Stream"
import { HttpRouter, HttpServerResponse } from "effect/unstable/http"
const log = Log.create({ service: "server" })
export const EventPaths = {
event: "/event",
} as const
function eventData(data: unknown) {
return `data: ${JSON.stringify(data)}\n\n`
}
export const eventRoute = HttpRouter.add(
"GET",
EventPaths.event,
Effect.gen(function* () {
const bus = yield* Bus.Service
const events = bus.subscribeAll().pipe(Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type))
const heartbeat = Stream.tick("10 seconds").pipe(
Stream.drop(1),
Stream.map(() => ({ type: "server.heartbeat", properties: {} })),
)
log.info("event connected")
return HttpServerResponse.stream(
Stream.make({ type: "server.connected", properties: {} }).pipe(
Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))),
Stream.map(eventData),
Stream.encodeText,
Stream.ensuring(Effect.sync(() => log.info("event disconnected"))),
),
{
contentType: "text/event-stream",
headers: {
"Cache-Control": "no-cache, no-transform",
"X-Accel-Buffering": "no",
"X-Content-Type-Options": "nosniff",
},
},
)
}).pipe(Effect.provide(Bus.layer)),
)

View file

@ -10,6 +10,7 @@ import { lazy } from "@/util/lazy"
import { Filesystem } from "@/util"
import { authorizationLayer } from "./auth"
import { ConfigApi, configHandlers } from "./config"
import { eventRoute } from "./event"
import { FileApi, fileHandlers } from "./file"
import { ExperimentalApi, experimentalHandlers } from "./experimental"
import { InstanceApi, instanceHandlers } from "./instance"
@ -66,6 +67,7 @@ const instance = HttpRouter.middleware()(
).layer
export const routes = Layer.mergeAll(
eventRoute,
HttpApiBuilder.layer(ConfigApi).pipe(Layer.provide(configHandlers)),
HttpApiBuilder.layer(ExperimentalApi).pipe(Layer.provide(experimentalHandlers)),
HttpApiBuilder.layer(FileApi).pipe(Layer.provide(fileHandlers)),

View file

@ -16,6 +16,7 @@ import { QuestionRoutes } from "./question"
import { PermissionRoutes } from "./permission"
import { Flag } from "@opencode-ai/core/flag/flag"
import { ExperimentalHttpApiServer } from "./httpapi/server"
import { EventPaths } from "./httpapi/event"
import { ExperimentalPaths } from "./httpapi/experimental"
import { FilePaths } from "./httpapi/file"
import { InstancePaths } from "./httpapi/instance"
@ -41,6 +42,7 @@ export const InstanceRoutes = (upgrade: UpgradeWebSocket): Hono => {
if (Flag.OPENCODE_EXPERIMENTAL_HTTPAPI) {
const handler = ExperimentalHttpApiServer.webHandler().handler
const context = Context.empty() as Context.Context<unknown>
app.get(EventPaths.event, (c) => handler(c.req.raw, context))
app.get("/question", (c) => handler(c.req.raw, context))
app.post("/question/:requestID/reply", (c) => handler(c.req.raw, context))
app.post("/question/:requestID/reject", (c) => handler(c.req.raw, context))

View file

@ -0,0 +1,52 @@
import { afterEach, describe, expect, test } from "bun:test"
import type { UpgradeWebSocket } from "hono/ws"
import { Flag } from "@opencode-ai/core/flag/flag"
import { Instance } from "../../src/project/instance"
import { InstanceRoutes } from "../../src/server/routes/instance"
import { EventPaths } from "../../src/server/routes/instance/httpapi/event"
import { Log } from "../../src/util"
import { resetDatabase } from "../fixture/db"
import { tmpdir } from "../fixture/fixture"
void Log.init({ print: false })
const original = Flag.OPENCODE_EXPERIMENTAL_HTTPAPI
const websocket = (() => () => new Response(null, { status: 501 })) as unknown as UpgradeWebSocket
function app() {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = true
return InstanceRoutes(websocket)
}
async function readFirstChunk(response: Response) {
if (!response.body) throw new Error("missing response body")
const reader = response.body.getReader()
const result = await Promise.race([
reader.read(),
new Promise<never>((_, reject) => setTimeout(() => reject(new Error("timed out waiting for event")), 5_000)),
])
await reader.cancel()
return new TextDecoder().decode(result.value)
}
afterEach(async () => {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = original
await Instance.disposeAll()
await resetDatabase()
})
describe("event HttpApi bridge", () => {
test("serves event stream through experimental Effect route", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
expect(response.status).toBe(200)
expect(response.headers.get("content-type")).toContain("text/event-stream")
expect(response.headers.get("cache-control")).toBe("no-cache, no-transform")
expect(response.headers.get("x-accel-buffering")).toBe("no")
expect(response.headers.get("x-content-type-options")).toBe("nosniff")
expect(await readFirstChunk(response)).toContain(
'data: {"type":"server.connected","properties":{}}\n\n',
)
})
})