mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-06 08:21:50 +00:00
Avoid request-time HttpApi layer provisioning (#25179)
This commit is contained in:
parent
510f01674a
commit
2dd1f2d453
7 changed files with 58 additions and 48 deletions
|
|
@ -37,34 +37,38 @@ function eventData(data: unknown): Sse.Event {
|
|||
}
|
||||
}
|
||||
|
||||
export const eventRoute = HttpRouter.add(
|
||||
"GET",
|
||||
EventPaths.event,
|
||||
export const eventRoute = HttpRouter.use((router) =>
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
const events = bus.subscribeAll().pipe(Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type))
|
||||
const heartbeat = Stream.tick("10 seconds").pipe(
|
||||
Stream.drop(1),
|
||||
Stream.map(() => ({ type: "server.heartbeat", properties: {} })),
|
||||
)
|
||||
yield* router.add(
|
||||
"GET",
|
||||
EventPaths.event,
|
||||
Effect.gen(function* () {
|
||||
const events = bus.subscribeAll().pipe(Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type))
|
||||
const heartbeat = Stream.tick("10 seconds").pipe(
|
||||
Stream.drop(1),
|
||||
Stream.map(() => ({ type: "server.heartbeat", properties: {} })),
|
||||
)
|
||||
|
||||
log.info("event connected")
|
||||
return HttpServerResponse.stream(
|
||||
Stream.make({ type: "server.connected", properties: {} }).pipe(
|
||||
Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))),
|
||||
Stream.map(eventData),
|
||||
Stream.pipeThroughChannel(Sse.encode()),
|
||||
Stream.encodeText,
|
||||
Stream.ensuring(Effect.sync(() => log.info("event disconnected"))),
|
||||
),
|
||||
{
|
||||
contentType: "text/event-stream",
|
||||
headers: {
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
"X-Accel-Buffering": "no",
|
||||
"X-Content-Type-Options": "nosniff",
|
||||
},
|
||||
},
|
||||
log.info("event connected")
|
||||
return HttpServerResponse.stream(
|
||||
Stream.make({ type: "server.connected", properties: {} }).pipe(
|
||||
Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))),
|
||||
Stream.map(eventData),
|
||||
Stream.pipeThroughChannel(Sse.encode()),
|
||||
Stream.encodeText,
|
||||
Stream.ensuring(Effect.sync(() => log.info("event disconnected"))),
|
||||
),
|
||||
{
|
||||
contentType: "text/event-stream",
|
||||
headers: {
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
"X-Accel-Buffering": "no",
|
||||
"X-Content-Type-Options": "nosniff",
|
||||
},
|
||||
},
|
||||
)
|
||||
}),
|
||||
)
|
||||
}).pipe(Effect.provide(Bus.layer)),
|
||||
}),
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
import { ProxyUtil } from "@/server/proxy-util"
|
||||
import { Effect, Stream } from "effect"
|
||||
import {
|
||||
FetchHttpClient,
|
||||
HttpBody,
|
||||
HttpClient,
|
||||
HttpClientRequest,
|
||||
|
|
@ -66,12 +65,13 @@ function statusText(response: unknown) {
|
|||
}
|
||||
|
||||
export function http(
|
||||
client: HttpClient.HttpClient,
|
||||
url: string | URL,
|
||||
extra: HeadersInit | undefined,
|
||||
request: HttpServerRequest.HttpServerRequest,
|
||||
): Effect.Effect<HttpServerResponse.HttpServerResponse> {
|
||||
return Effect.gen(function* () {
|
||||
const response = yield* HttpClient.execute(
|
||||
const response = yield* client.execute(
|
||||
HttpClientRequest.make(request.method as never)(url, {
|
||||
headers: ProxyUtil.headers(request.headers as HeadersInit, extra),
|
||||
body: requestBody(request),
|
||||
|
|
@ -86,10 +86,7 @@ export function http(
|
|||
statusText: statusText(response),
|
||||
headers,
|
||||
})
|
||||
}).pipe(
|
||||
Effect.provide(FetchHttpClient.layer),
|
||||
Effect.catch(() => Effect.succeed(HttpServerResponse.empty({ status: 500 }))),
|
||||
)
|
||||
}).pipe(Effect.catch(() => Effect.succeed(HttpServerResponse.empty({ status: 500 }))))
|
||||
}
|
||||
|
||||
export * as HttpApiProxy from "./proxy"
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import * as Fence from "@/server/fence"
|
|||
import { getWorkspaceRouteSessionID, isLocalWorkspaceRoute, workspaceProxyURL } from "@/server/workspace"
|
||||
import { Flag } from "@opencode-ai/core/flag/flag"
|
||||
import { Context, Data, Effect, Layer } from "effect"
|
||||
import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
|
||||
import { HttpClient, HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
|
||||
import { HttpApiMiddleware } from "effect/unstable/httpapi"
|
||||
import * as Socket from "effect/unstable/socket/Socket"
|
||||
|
||||
|
|
@ -95,6 +95,7 @@ function resolveTarget(workspace: Workspace.Info): Effect.Effect<Target> {
|
|||
}
|
||||
|
||||
function proxyRemote(
|
||||
client: HttpClient.HttpClient,
|
||||
request: HttpServerRequest.HttpServerRequest,
|
||||
workspace: Workspace.Info,
|
||||
target: RemoteTarget,
|
||||
|
|
@ -111,7 +112,7 @@ function proxyRemote(
|
|||
const proxyURL = workspaceProxyURL(target.url, url)
|
||||
const headers = request.headers as Record<string, string>
|
||||
if (headers["upgrade"]?.toLowerCase() === "websocket") return yield* HttpApiProxy.websocket(request, proxyURL)
|
||||
const response = yield* HttpApiProxy.http(proxyURL, target.headers, request)
|
||||
const response = yield* HttpApiProxy.http(client, proxyURL, target.headers, request)
|
||||
const sync = Fence.parse(new Headers(response.headers))
|
||||
if (sync) {
|
||||
const syncFailure = yield* Fence.waitEffect(
|
||||
|
|
@ -163,18 +164,20 @@ function planRequest(
|
|||
}
|
||||
|
||||
function routeWorkspace<E>(
|
||||
client: HttpClient.HttpClient,
|
||||
effect: Effect.Effect<HttpServerResponse.HttpServerResponse, E, WorkspaceRouteContext>,
|
||||
plan: RequestPlan,
|
||||
): Effect.Effect<HttpServerResponse.HttpServerResponse, E, Socket.WebSocketConstructor | Workspace.Service> {
|
||||
return RequestPlan.$match(plan, {
|
||||
MissingWorkspace: ({ workspaceID }) => Effect.succeed(missingWorkspaceResponse(workspaceID)),
|
||||
Remote: ({ request, workspace, target, url }) => proxyRemote(request, workspace, target, url),
|
||||
Remote: ({ request, workspace, target, url }) => proxyRemote(client, request, workspace, target, url),
|
||||
Local: ({ directory, workspaceID }) =>
|
||||
effect.pipe(Effect.provideService(WorkspaceRouteContext, WorkspaceRouteContext.of({ directory, workspaceID }))),
|
||||
})
|
||||
}
|
||||
|
||||
function routeHttpApiWorkspace<E>(
|
||||
client: HttpClient.HttpClient,
|
||||
effect: Effect.Effect<HttpServerResponse.HttpServerResponse, E, WorkspaceRouteContext>,
|
||||
): Effect.Effect<
|
||||
HttpServerResponse.HttpServerResponse,
|
||||
|
|
@ -188,7 +191,7 @@ function routeHttpApiWorkspace<E>(
|
|||
? yield* Session.Service.use((svc) => svc.get(sessionID)).pipe(Effect.catchDefect(() => Effect.void))
|
||||
: undefined
|
||||
const plan = yield* planRequest(request, session?.workspaceID)
|
||||
return yield* routeWorkspace(effect, plan)
|
||||
return yield* routeWorkspace(client, effect, plan)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -197,8 +200,9 @@ export const workspaceRoutingLayer = Layer.effect(
|
|||
Effect.gen(function* () {
|
||||
const makeWebSocket = yield* Socket.WebSocketConstructor
|
||||
const workspace = yield* Workspace.Service
|
||||
const client = yield* HttpClient.HttpClient
|
||||
return WorkspaceRoutingMiddleware.of((effect) =>
|
||||
routeHttpApiWorkspace(effect).pipe(
|
||||
routeHttpApiWorkspace(client, effect).pipe(
|
||||
Effect.provideService(Socket.WebSocketConstructor, makeWebSocket),
|
||||
Effect.provideService(Workspace.Service, workspace),
|
||||
),
|
||||
|
|
@ -210,11 +214,12 @@ export const workspaceRouterMiddleware = HttpRouter.middleware<{ provides: Works
|
|||
Effect.gen(function* () {
|
||||
const makeWebSocket = yield* Socket.WebSocketConstructor
|
||||
const workspace = yield* Workspace.Service
|
||||
const client = yield* HttpClient.HttpClient
|
||||
return (effect) =>
|
||||
Effect.gen(function* () {
|
||||
const request = yield* HttpServerRequest.HttpServerRequest
|
||||
const plan = yield* planRequest(request)
|
||||
return yield* routeWorkspace(effect, plan)
|
||||
return yield* routeWorkspace(client, effect, plan)
|
||||
}).pipe(
|
||||
Effect.provideService(Socket.WebSocketConstructor, makeWebSocket),
|
||||
Effect.provideService(Workspace.Service, workspace),
|
||||
|
|
|
|||
|
|
@ -121,11 +121,10 @@ const instanceRoutes = Layer.mergeAll(rawInstanceRoutes, instanceApiRoutes).pipe
|
|||
]),
|
||||
)
|
||||
|
||||
const uiRoute = Layer.effectDiscard(
|
||||
const uiRoute = HttpRouter.use((router) =>
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const client = yield* HttpClient.HttpClient
|
||||
const router = yield* HttpRouter.HttpRouter
|
||||
yield* router.add("*", "/*", (request) => serveUIEffect(request, { fs, client }))
|
||||
}),
|
||||
).pipe(Layer.provide(authorizationRouterMiddleware.layer.pipe(Layer.provide(ServerAuthConfig.defaultLayer))))
|
||||
|
|
|
|||
|
|
@ -74,11 +74,10 @@ function app(input?: { password?: string; username?: string }) {
|
|||
|
||||
function uiApp(input?: { password?: string; username?: string; client?: Layer.Layer<HttpClient.HttpClient> }) {
|
||||
const handler = HttpRouter.toWebHandler(
|
||||
Layer.effectDiscard(
|
||||
HttpRouter.use((router) =>
|
||||
Effect.gen(function* () {
|
||||
const fs = yield* AppFileSystem.Service
|
||||
const client = yield* HttpClient.HttpClient
|
||||
const router = yield* HttpRouter.HttpRouter
|
||||
yield* router.add("*", "/*", (request) => serveUIEffect(request, { fs, client }))
|
||||
}),
|
||||
).pipe(
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import { Flag } from "@opencode-ai/core/flag/flag"
|
|||
import { describe, expect } from "bun:test"
|
||||
import { Context, Effect, Layer, Queue } from "effect"
|
||||
import {
|
||||
FetchHttpClient,
|
||||
HttpClient,
|
||||
HttpClientRequest,
|
||||
HttpRouter,
|
||||
|
|
@ -66,7 +67,7 @@ type TestHandler<E, R> = (
|
|||
) => Effect.Effect<HttpServerResponse.HttpServerResponse, E, R>
|
||||
|
||||
const workspaceRoutingTestLayer = workspaceRouterMiddleware.layer.pipe(
|
||||
Layer.provide(Socket.layerWebSocketConstructorGlobal),
|
||||
Layer.provide([Socket.layerWebSocketConstructorGlobal, FetchHttpClient.layer]),
|
||||
)
|
||||
|
||||
const serverUrl = HttpServer.HttpServer.use((server) => Effect.succeed(HttpServer.formatAddress(server.address)))
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
import { NodeHttpServer } from "@effect/platform-node"
|
||||
import { NodeHttpServer, NodeServices } from "@effect/platform-node"
|
||||
import Http from "node:http"
|
||||
import { describe, expect } from "bun:test"
|
||||
import { Context, Effect, Layer, Queue } from "effect"
|
||||
import { HttpServer, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
|
||||
import { FetchHttpClient, HttpClient, HttpServer, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
|
||||
import * as Socket from "effect/unstable/socket/Socket"
|
||||
import { HttpApiProxy } from "../../src/server/routes/instance/httpapi/middleware/proxy"
|
||||
import { testEffect } from "../lib/effect"
|
||||
|
|
@ -13,6 +13,8 @@ function serverUrl() {
|
|||
|
||||
const testServerLayer = Layer.mergeAll(
|
||||
NodeHttpServer.layer(Http.createServer, { host: "127.0.0.1", port: 0 }),
|
||||
NodeServices.layer,
|
||||
FetchHttpClient.layer,
|
||||
Socket.layerWebSocketConstructorGlobal,
|
||||
)
|
||||
const it = testEffect(testServerLayer)
|
||||
|
|
@ -79,7 +81,8 @@ describe("HttpApi workspace proxy", () => {
|
|||
const request = HttpServerRequest.fromWeb(
|
||||
new Request("http://localhost/session/abc", { method: "POST", body: "request-body" }),
|
||||
)
|
||||
const response = yield* HttpApiProxy.http(`${url}/session/abc?keep=yes`, { "x-extra": "injected" }, request)
|
||||
const httpClient = yield* HttpClient.HttpClient
|
||||
const response = yield* HttpApiProxy.http(httpClient, `${url}/session/abc?keep=yes`, { "x-extra": "injected" }, request)
|
||||
|
||||
expect(response.status).toBe(201)
|
||||
const client = HttpServerResponse.toClientResponse(response)
|
||||
|
|
@ -97,7 +100,8 @@ describe("HttpApi workspace proxy", () => {
|
|||
it.live("returns 500 when remote is unreachable", () =>
|
||||
Effect.gen(function* () {
|
||||
const request = HttpServerRequest.fromWeb(new Request("http://localhost/anything"))
|
||||
const response = yield* HttpApiProxy.http("http://127.0.0.1:1/unreachable", undefined, request)
|
||||
const httpClient = yield* HttpClient.HttpClient
|
||||
const response = yield* HttpApiProxy.http(httpClient, "http://127.0.0.1:1/unreachable", undefined, request)
|
||||
|
||||
expect(response.status).toBe(500)
|
||||
}),
|
||||
|
|
@ -122,7 +126,8 @@ describe("HttpApi workspace proxy", () => {
|
|||
},
|
||||
}),
|
||||
)
|
||||
yield* HttpApiProxy.http(`${url}/test`, { "x-injected": "extra" }, request)
|
||||
const httpClient = yield* HttpClient.HttpClient
|
||||
yield* HttpApiProxy.http(httpClient, `${url}/test`, { "x-injected": "extra" }, request)
|
||||
|
||||
expect(forwarded["x-opencode-directory"]).toBeUndefined()
|
||||
expect(forwarded["x-opencode-workspace"]).toBeUndefined()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue