Serve instance events through HttpApiBuilder (#25182)

This commit is contained in:
Kit Langton 2026-04-30 20:02:46 -04:00 committed by GitHub
parent 96a0dd6b04
commit 96f4da1e1d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 46 additions and 43 deletions

View file

@ -2,8 +2,8 @@ import { Bus } from "@/bus"
import * as Log from "@opencode-ai/core/util/log"
import { Effect, Schema } from "effect"
import * as Stream from "effect/Stream"
import { HttpRouter, HttpServerResponse } from "effect/unstable/http"
import { HttpApi, HttpApiEndpoint, HttpApiGroup, OpenApi } from "effect/unstable/httpapi"
import { HttpServerResponse } from "effect/unstable/http"
import { HttpApi, HttpApiBuilder, HttpApiEndpoint, HttpApiGroup, HttpApiSchema, OpenApi } from "effect/unstable/httpapi"
import * as Sse from "effect/unstable/encoding/Sse"
const log = Log.create({ service: "server" })
@ -16,7 +16,7 @@ export const EventApi = HttpApi.make("event").add(
HttpApiGroup.make("event")
.add(
HttpApiEndpoint.get("subscribe", EventPaths.event, {
success: Schema.Unknown,
success: Schema.String.pipe(HttpApiSchema.asText({ contentType: "text/event-stream" })),
}).annotateMerge(
OpenApi.annotations({
identifier: "event.subscribe",
@ -37,38 +37,38 @@ function eventData(data: unknown): Sse.Event {
}
}
export const eventRoute = HttpRouter.use((router) =>
function eventResponse(bus: Bus.Interface) {
const events = bus.subscribeAll().pipe(Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type))
const heartbeat = Stream.tick("10 seconds").pipe(
Stream.drop(1),
Stream.map(() => ({ type: "server.heartbeat", properties: {} })),
)
log.info("event connected")
return HttpServerResponse.stream(
Stream.make({ type: "server.connected", properties: {} }).pipe(
Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))),
Stream.map(eventData),
Stream.pipeThroughChannel(Sse.encode()),
Stream.encodeText,
Stream.ensuring(Effect.sync(() => log.info("event disconnected"))),
),
{
contentType: "text/event-stream",
headers: {
"Cache-Control": "no-cache, no-transform",
"X-Accel-Buffering": "no",
"X-Content-Type-Options": "nosniff",
},
},
)
}
export const eventHandlers = HttpApiBuilder.group(EventApi, "event", (handlers) =>
Effect.gen(function* () {
const bus = yield* Bus.Service
yield* router.add(
"GET",
EventPaths.event,
Effect.gen(function* () {
const events = bus.subscribeAll().pipe(Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type))
const heartbeat = Stream.tick("10 seconds").pipe(
Stream.drop(1),
Stream.map(() => ({ type: "server.heartbeat", properties: {} })),
)
log.info("event connected")
return HttpServerResponse.stream(
Stream.make({ type: "server.connected", properties: {} }).pipe(
Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))),
Stream.map(eventData),
Stream.pipeThroughChannel(Sse.encode()),
Stream.encodeText,
Stream.ensuring(Effect.sync(() => log.info("event disconnected"))),
),
{
contentType: "text/event-stream",
headers: {
"Cache-Control": "no-cache, no-transform",
"X-Accel-Buffering": "no",
"X-Content-Type-Options": "nosniff",
},
},
)
}),
)
return handlers.handleRaw("subscribe", Effect.fn("EventHttpApi.subscribe")(function* () {
return eventResponse(bus)
}))
}),
)

View file

@ -42,7 +42,7 @@ import { isAllowedCorsOrigin } from "@/server/cors"
import { serveUIEffect } from "@/server/routes/ui"
import { InstanceHttpApi, RootHttpApi } from "./api"
import { ServerAuthConfig, authorizationLayer, authorizationRouterMiddleware } from "./middleware/authorization"
import { eventRoute } from "./event"
import { EventApi, eventHandlers } from "./event"
import { configHandlers } from "./handlers/config"
import { controlHandlers } from "./handlers/control"
import { experimentalHandlers } from "./handlers/experimental"
@ -86,6 +86,14 @@ const cors = HttpRouter.middleware(
)
const rootApiRoutes = HttpApiBuilder.layer(RootHttpApi).pipe(Layer.provide([controlHandlers, globalHandlers]))
const instanceRouterLayer = authorizationRouterMiddleware
.combine(instanceRouterMiddleware)
.combine(workspaceRouterMiddleware)
.layer.pipe(Layer.provide(Socket.layerWebSocketConstructorGlobal), Layer.provide(ServerAuthConfig.defaultLayer))
const eventApiRoutes = HttpApiBuilder.layer(EventApi).pipe(
Layer.provide(eventHandlers),
Layer.provide(instanceRouterLayer),
)
const instanceApiRoutes = HttpApiBuilder.layer(InstanceHttpApi).pipe(
Layer.provide([
configHandlers,
@ -105,13 +113,8 @@ const instanceApiRoutes = HttpApiBuilder.layer(InstanceHttpApi).pipe(
]),
)
const rawInstanceRoutes = Layer.mergeAll(eventRoute, ptyConnectRoute).pipe(
Layer.provide(
authorizationRouterMiddleware
.combine(instanceRouterMiddleware)
.combine(workspaceRouterMiddleware)
.layer.pipe(Layer.provide(Socket.layerWebSocketConstructorGlobal), Layer.provide(ServerAuthConfig.defaultLayer)),
),
const rawInstanceRoutes = Layer.mergeAll(ptyConnectRoute).pipe(
Layer.provide(instanceRouterLayer),
)
const instanceRoutes = Layer.mergeAll(rawInstanceRoutes, instanceApiRoutes).pipe(
Layer.provide([
@ -129,7 +132,7 @@ const uiRoute = HttpRouter.use((router) =>
}),
).pipe(Layer.provide(authorizationRouterMiddleware.layer.pipe(Layer.provide(ServerAuthConfig.defaultLayer))))
export const routes = Layer.mergeAll(rootApiRoutes, instanceRoutes, uiRoute).pipe(
export const routes = Layer.mergeAll(rootApiRoutes, eventApiRoutes, instanceRoutes, uiRoute).pipe(
Layer.provide([
cors,
runtime,