From 96f4da1e1d060df9a07ad9ec89ab7d5e27905e14 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 30 Apr 2026 20:02:46 -0400 Subject: [PATCH] Serve instance events through HttpApiBuilder (#25182) --- .../server/routes/instance/httpapi/event.ts | 68 +++++++++---------- .../server/routes/instance/httpapi/server.ts | 21 +++--- 2 files changed, 46 insertions(+), 43 deletions(-) diff --git a/packages/opencode/src/server/routes/instance/httpapi/event.ts b/packages/opencode/src/server/routes/instance/httpapi/event.ts index 7d14480c32..f13e3251bf 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/event.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/event.ts @@ -2,8 +2,8 @@ import { Bus } from "@/bus" import * as Log from "@opencode-ai/core/util/log" import { Effect, Schema } from "effect" import * as Stream from "effect/Stream" -import { HttpRouter, HttpServerResponse } from "effect/unstable/http" -import { HttpApi, HttpApiEndpoint, HttpApiGroup, OpenApi } from "effect/unstable/httpapi" +import { HttpServerResponse } from "effect/unstable/http" +import { HttpApi, HttpApiBuilder, HttpApiEndpoint, HttpApiGroup, HttpApiSchema, OpenApi } from "effect/unstable/httpapi" import * as Sse from "effect/unstable/encoding/Sse" const log = Log.create({ service: "server" }) @@ -16,7 +16,7 @@ export const EventApi = HttpApi.make("event").add( HttpApiGroup.make("event") .add( HttpApiEndpoint.get("subscribe", EventPaths.event, { - success: Schema.Unknown, + success: Schema.String.pipe(HttpApiSchema.asText({ contentType: "text/event-stream" })), }).annotateMerge( OpenApi.annotations({ identifier: "event.subscribe", @@ -37,38 +37,38 @@ function eventData(data: unknown): Sse.Event { } } -export const eventRoute = HttpRouter.use((router) => +function eventResponse(bus: Bus.Interface) { + const events = bus.subscribeAll().pipe(Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type)) + const heartbeat = Stream.tick("10 seconds").pipe( + Stream.drop(1), + Stream.map(() => ({ type: "server.heartbeat", properties: {} })), + ) + + log.info("event connected") + return HttpServerResponse.stream( + Stream.make({ type: "server.connected", properties: {} }).pipe( + Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))), + Stream.map(eventData), + Stream.pipeThroughChannel(Sse.encode()), + Stream.encodeText, + Stream.ensuring(Effect.sync(() => log.info("event disconnected"))), + ), + { + contentType: "text/event-stream", + headers: { + "Cache-Control": "no-cache, no-transform", + "X-Accel-Buffering": "no", + "X-Content-Type-Options": "nosniff", + }, + }, + ) +} + +export const eventHandlers = HttpApiBuilder.group(EventApi, "event", (handlers) => Effect.gen(function* () { const bus = yield* Bus.Service - yield* router.add( - "GET", - EventPaths.event, - Effect.gen(function* () { - const events = bus.subscribeAll().pipe(Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type)) - const heartbeat = Stream.tick("10 seconds").pipe( - Stream.drop(1), - Stream.map(() => ({ type: "server.heartbeat", properties: {} })), - ) - - log.info("event connected") - return HttpServerResponse.stream( - Stream.make({ type: "server.connected", properties: {} }).pipe( - Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))), - Stream.map(eventData), - Stream.pipeThroughChannel(Sse.encode()), - Stream.encodeText, - Stream.ensuring(Effect.sync(() => log.info("event disconnected"))), - ), - { - contentType: "text/event-stream", - headers: { - "Cache-Control": "no-cache, no-transform", - "X-Accel-Buffering": "no", - "X-Content-Type-Options": "nosniff", - }, - }, - ) - }), - ) + return handlers.handleRaw("subscribe", Effect.fn("EventHttpApi.subscribe")(function* () { + return eventResponse(bus) + })) }), ) diff --git a/packages/opencode/src/server/routes/instance/httpapi/server.ts b/packages/opencode/src/server/routes/instance/httpapi/server.ts index 43671ff74f..dabda5aaaf 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/server.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/server.ts @@ -42,7 +42,7 @@ import { isAllowedCorsOrigin } from "@/server/cors" import { serveUIEffect } from "@/server/routes/ui" import { InstanceHttpApi, RootHttpApi } from "./api" import { ServerAuthConfig, authorizationLayer, authorizationRouterMiddleware } from "./middleware/authorization" -import { eventRoute } from "./event" +import { EventApi, eventHandlers } from "./event" import { configHandlers } from "./handlers/config" import { controlHandlers } from "./handlers/control" import { experimentalHandlers } from "./handlers/experimental" @@ -86,6 +86,14 @@ const cors = HttpRouter.middleware( ) const rootApiRoutes = HttpApiBuilder.layer(RootHttpApi).pipe(Layer.provide([controlHandlers, globalHandlers])) +const instanceRouterLayer = authorizationRouterMiddleware + .combine(instanceRouterMiddleware) + .combine(workspaceRouterMiddleware) + .layer.pipe(Layer.provide(Socket.layerWebSocketConstructorGlobal), Layer.provide(ServerAuthConfig.defaultLayer)) +const eventApiRoutes = HttpApiBuilder.layer(EventApi).pipe( + Layer.provide(eventHandlers), + Layer.provide(instanceRouterLayer), +) const instanceApiRoutes = HttpApiBuilder.layer(InstanceHttpApi).pipe( Layer.provide([ configHandlers, @@ -105,13 +113,8 @@ const instanceApiRoutes = HttpApiBuilder.layer(InstanceHttpApi).pipe( ]), ) -const rawInstanceRoutes = Layer.mergeAll(eventRoute, ptyConnectRoute).pipe( - Layer.provide( - authorizationRouterMiddleware - .combine(instanceRouterMiddleware) - .combine(workspaceRouterMiddleware) - .layer.pipe(Layer.provide(Socket.layerWebSocketConstructorGlobal), Layer.provide(ServerAuthConfig.defaultLayer)), - ), +const rawInstanceRoutes = Layer.mergeAll(ptyConnectRoute).pipe( + Layer.provide(instanceRouterLayer), ) const instanceRoutes = Layer.mergeAll(rawInstanceRoutes, instanceApiRoutes).pipe( Layer.provide([ @@ -129,7 +132,7 @@ const uiRoute = HttpRouter.use((router) => }), ).pipe(Layer.provide(authorizationRouterMiddleware.layer.pipe(Layer.provide(ServerAuthConfig.defaultLayer)))) -export const routes = Layer.mergeAll(rootApiRoutes, instanceRoutes, uiRoute).pipe( +export const routes = Layer.mergeAll(rootApiRoutes, eventApiRoutes, instanceRoutes, uiRoute).pipe( Layer.provide([ cors, runtime,