fix(server): support desktop PTY websockets with HttpApi (#25598)

This commit is contained in:
Kit Langton 2026-05-03 14:23:29 -04:00 committed by GitHub
parent adb7cb1037
commit 387220f368
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 564 additions and 436 deletions

View file

@ -15,6 +15,7 @@ import { terminalFontFamily, useSettings } from "@/context/settings"
import type { LocalPTY } from "@/context/terminal"
import { disposeIfDisposable, getHoveredLinkText, setOptionIfSupported } from "@/utils/runtime-adapters"
import { terminalWriter } from "@/utils/terminal-writer"
import { terminalWebSocketURL } from "@/utils/terminal-websocket-url"
const TOGGLE_TERMINAL_ID = "terminal.toggle"
const DEFAULT_TOGGLE_TERMINAL_KEYBIND = "ctrl+`"
@ -67,13 +68,6 @@ const debugTerminal = (...values: unknown[]) => {
console.debug("[terminal]", ...values)
}
const errorName = (err: unknown) => {
if (!err || typeof err !== "object") return
if (!("name" in err)) return
const errorName = err.name
return typeof errorName === "string" ? errorName : undefined
}
const useTerminalUiBindings = (input: {
container: HTMLDivElement
term: Term
@ -478,10 +472,9 @@ export const Terminal = (props: TerminalProps) => {
const gone = () =>
client.pty
.get({ ptyID: id })
.then(() => false)
.get({ ptyID: id }, { throwOnError: false })
.then((result) => result.response.status === 404)
.catch((err) => {
if (errorName(err) === "NotFoundError") return true
debugTerminal("failed to inspect terminal session", err)
return false
})
@ -509,18 +502,9 @@ export const Terminal = (props: TerminalProps) => {
if (disposed) return
drop?.()
const next = new URL(url + `/pty/${id}/connect`)
next.searchParams.set("directory", directory)
next.searchParams.set("cursor", String(seek))
next.protocol = next.protocol === "https:" ? "wss:" : "ws:"
if (!sameOrigin && password) {
next.searchParams.set("auth_token", btoa(`${username}:${password}`))
// For same-origin requests, let the browser reuse the page's existing auth.
next.username = username
next.password = password
}
const socket = new WebSocket(next)
const socket = new WebSocket(
terminalWebSocketURL({ url, id, directory, cursor: seek, sameOrigin, username, password }),
)
socket.binaryType = "arraybuffer"
ws = socket

View file

@ -0,0 +1,36 @@
import { describe, expect, test } from "bun:test"
import { terminalWebSocketURL } from "./terminal-websocket-url"
describe("terminalWebSocketURL", () => {
test("uses query auth without embedding credentials in websocket URL", () => {
const url = terminalWebSocketURL({
url: "http://127.0.0.1:49365",
id: "pty_test",
directory: "/tmp/project",
cursor: 0,
sameOrigin: false,
username: "opencode",
password: "secret",
})
expect(url.protocol).toBe("ws:")
expect(url.username).toBe("")
expect(url.password).toBe("")
expect(url.searchParams.get("auth_token")).toBe(btoa("opencode:secret"))
})
test("omits query auth for same-origin websocket URL", () => {
const url = terminalWebSocketURL({
url: "https://app.example.test",
id: "pty_test",
directory: "/tmp/project",
cursor: 10,
sameOrigin: true,
username: "opencode",
password: "secret",
})
expect(url.protocol).toBe("wss:")
expect(url.searchParams.has("auth_token")).toBe(false)
})
})

View file

@ -0,0 +1,16 @@
export function terminalWebSocketURL(input: {
url: string
id: string
directory: string
cursor: number
sameOrigin: boolean
username: string
password?: string
}) {
const next = new URL(`${input.url}/pty/${input.id}/connect`)
next.searchParams.set("directory", input.directory)
next.searchParams.set("cursor", String(input.cursor))
next.protocol = next.protocol === "https:" ? "wss:" : "ws:"
if (!input.sameOrigin && input.password) next.searchParams.set("auth_token", btoa(`${input.username}:${input.password}`))
return next
}

View file

@ -37,6 +37,11 @@
"bun": "./src/server/adapter.bun.ts",
"node": "./src/server/adapter.node.ts",
"default": "./src/server/adapter.bun.ts"
},
"#httpapi-server": {
"bun": "./src/server/httpapi-server.node.ts",
"node": "./src/server/httpapi-server.node.ts",
"default": "./src/server/httpapi-server.node.ts"
}
},
"devDependencies": {

View file

@ -1,244 +0,0 @@
// TODO: Node adapter forthcoming — same pattern but using `node:http` + `ws` library,
// and `node:http`'s `upgrade` event.
//
// This module is a Bun-only proof-of-concept for a native `Bun.serve` listener that
// drives the experimental HttpApi handler directly (no Hono in the middle) and handles
// WebSocket upgrades inline based on path-matching. It exists to validate the pattern
// before deleting the Hono backend; `Server.listen()` is intentionally NOT wired to it.
import type { ServerWebSocket } from "bun"
import { Effect, Schema } from "effect"
import { AppRuntime } from "@/effect/app-runtime"
import { WithInstance } from "@/project/with-instance"
import { Pty } from "@/pty"
import { handlePtyInput } from "@/pty/input"
import { PtyID } from "@/pty/schema"
import { PtyPaths } from "@/server/routes/instance/httpapi/groups/pty"
import { ExperimentalHttpApiServer } from "@/server/routes/instance/httpapi/server"
import * as Log from "@opencode-ai/core/util/log"
import type { CorsOptions } from "./cors"
const log = Log.create({ service: "httpapi-listener" })
const decodePtyID = Schema.decodeUnknownSync(PtyID)
export type Listener = {
hostname: string
port: number
url: URL
stop: (close?: boolean) => Promise<void>
}
export type ListenOptions = CorsOptions & {
port: number
hostname: string
}
type WsKind = { kind: "pty"; ptyID: string; cursor: number | undefined; directory: string }
type PtyHandler = {
onMessage: (message: string | ArrayBuffer) => void
onClose: () => void
}
type WsState = WsKind & {
handler?: PtyHandler
pending: Array<string | Uint8Array>
ready: boolean
closed: boolean
}
// Derive from the OpenAPI path so this stays in sync if the route literal moves.
const ptyConnectPattern = new RegExp(`^${PtyPaths.connect.replace(/:[^/]+/g, "([^/]+)")}$`)
function parseCursor(value: string | null): number | undefined {
if (!value) return undefined
const parsed = Number(value)
if (!Number.isSafeInteger(parsed) || parsed < -1) return undefined
return parsed
}
function asAdapter(ws: ServerWebSocket<WsState>) {
return {
get readyState() {
return ws.readyState
},
send: (data: string | Uint8Array | ArrayBuffer) => {
try {
if (data instanceof ArrayBuffer) ws.send(new Uint8Array(data))
else ws.send(data)
} catch {
// socket likely already closed; ignore
}
},
close: (code?: number, reason?: string) => {
try {
ws.close(code, reason)
} catch {
// ignore
}
},
}
}
/**
* Spin up a native Bun.serve that:
* 1. Routes all HTTP traffic through the HttpApi web handler.
* 2. Intercepts known WebSocket upgrade paths and handles them inline.
*
* This bypasses Hono entirely. The Hono code path remains untouched.
*/
export async function listen(opts: ListenOptions): Promise<Listener> {
const built = ExperimentalHttpApiServer.webHandler(opts)
const handler = built.handler
const context = ExperimentalHttpApiServer.context
const start = (port: number) => {
try {
return Bun.serve<WsState>({
hostname: opts.hostname,
port,
idleTimeout: 0,
fetch(request, server) {
const url = new URL(request.url)
const ptyMatch = url.pathname.match(ptyConnectPattern)
if (ptyMatch && request.headers.get("upgrade")?.toLowerCase() === "websocket") {
const ptyID = ptyMatch[1]!
const cursor = parseCursor(url.searchParams.get("cursor"))
// Resolve the instance directory the same way the HttpApi
// `instance-context` middleware does (search params, then header,
// then process.cwd()).
const directory =
url.searchParams.get("directory") ?? request.headers.get("x-opencode-directory") ?? process.cwd()
const upgraded = server.upgrade(request, {
data: {
kind: "pty",
ptyID,
cursor,
directory,
pending: [],
ready: false,
closed: false,
} satisfies WsState,
})
if (upgraded) return undefined
return new Response("upgrade failed", { status: 400 })
}
// TODO: workspace-proxy WS upgrade detection. The Hono path forwards via a
// remote `new WebSocket(url, ...)` (see ServerProxy.websocket). To support
// that here we'd need to (a) resolve the workspace target the same way
// `WorkspaceRouterMiddleware` does today, then (b) `server.upgrade(request,
// { data: { kind: "proxy", target, headers, protocols } })` and bridge the
// ServerWebSocket to a remote WebSocket inside the `websocket` handlers.
// Deferred to a follow-up — the proxy story needs more design (auth header
// forwarding, fence sync, reconnection semantics) than fits this PR.
return handler(request as Request, context as never)
},
websocket: {
open(ws) {
const data = ws.data
if (data.kind !== "pty") {
ws.close(1011, "unknown ws kind")
return
}
const id = (() => {
try {
return decodePtyID(data.ptyID)
} catch {
ws.close(1008, "invalid pty id")
return undefined
}
})()
if (!id) return
;(async () => {
const result = await WithInstance.provide({
directory: data.directory,
fn: () =>
AppRuntime.runPromise(
Effect.gen(function* () {
const pty = yield* Pty.Service
return yield* pty.connect(id, asAdapter(ws), data.cursor)
}).pipe(Effect.withSpan("HttpApiListener.pty.connect.open")),
),
})
return await result
})()
.then((handler) => {
if (data.closed) {
handler?.onClose()
return
}
if (!handler) {
ws.close(4404, "session not found")
return
}
data.handler = handler
data.ready = true
for (const msg of data.pending) {
AppRuntime.runPromise(handlePtyInput(handler, msg)).catch(() => undefined)
}
data.pending.length = 0
})
.catch((err) => {
log.error("pty connect failed", { error: err })
ws.close(1011, "pty connect failed")
})
},
message(ws, message) {
const data = ws.data
if (data.kind !== "pty") return
const payload =
typeof message === "string"
? message
: message instanceof Buffer
? new Uint8Array(message.buffer, message.byteOffset, message.byteLength)
: (message as Uint8Array)
if (!data.ready || !data.handler) {
data.pending.push(payload)
return
}
AppRuntime.runPromise(handlePtyInput(data.handler, payload)).catch(() => undefined)
},
close(ws) {
const data = ws.data
data.closed = true
data.handler?.onClose()
},
},
})
} catch (err) {
log.error("Bun.serve failed", { error: err })
return undefined
}
}
const server = opts.port === 0 ? (start(4096) ?? start(0)) : start(opts.port)
if (!server) throw new Error(`Failed to start server on port ${opts.port}`)
const port = server.port
if (port === undefined) throw new Error("Bun.serve started without a numeric port")
const url = new URL("http://localhost")
url.hostname = opts.hostname
url.port = String(port)
let closing: Promise<void> | undefined
return {
hostname: opts.hostname,
port,
url,
stop(close?: boolean) {
closing ??= (async () => {
await server.stop(close)
// NOTE: we deliberately do NOT call `built.dispose()` here. The
// underlying `webHandler` is memoized at module level (same as the
// Hono path), so disposing it would tear down shared services for
// every other consumer in the process. Lifecycle teardown is owned
// by the AppRuntime itself.
})()
return closing
},
}
}
export * as HttpApiListener from "./httpapi-listener"

View file

@ -0,0 +1,34 @@
import { NodeHttpServer } from "@effect/platform-node"
import { Effect, Layer } from "effect"
import { createServer } from "node:http"
import type { Opts } from "./adapter"
import { Service } from "./httpapi-server"
export { Service }
export const name = "node-http-server"
export const layer = (opts: Opts) => {
const server = createServer()
const serverRef = { closeStarted: false, forceStop: false }
const close = server.close.bind(server)
// Keep shutdown owned by NodeHttpServer, but honor listener.stop(true) by
// force-closing active HTTP sockets when its finalizer calls server.close().
server.close = ((callback?: Parameters<typeof server.close>[0]) => {
serverRef.closeStarted = true
const result = close(callback)
if (serverRef.forceStop) server.closeAllConnections()
return result
}) as typeof server.close
return Layer.mergeAll(
NodeHttpServer.layer(() => server, { port: opts.port, host: opts.hostname, gracefulShutdownTimeout: "1 second" }),
Layer.succeed(Service)(
Service.of({
closeAll: Effect.sync(() => {
serverRef.forceStop = true
if (serverRef.closeStarted) server.closeAllConnections()
}),
}),
),
)
}

View file

@ -0,0 +1,9 @@
import { Context, Effect } from "effect"
export interface Interface {
readonly closeAll: Effect.Effect<void>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/HttpApiServer") {}
export * as HttpApiServer from "./httpapi-server"

View file

@ -2,12 +2,14 @@ import { Pty } from "@/pty"
import { PtyID } from "@/pty/schema"
import { handlePtyInput } from "@/pty/input"
import { Shell } from "@/shell/shell"
import { EffectBridge } from "@/effect/bridge"
import { Effect } from "effect"
import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
import { HttpApiBuilder, HttpApiError } from "effect/unstable/httpapi"
import * as Socket from "effect/unstable/socket/Socket"
import { InstanceHttpApi } from "../api"
import { CursorQuery, Params, PtyPaths } from "../groups/pty"
import { WebSocketTracker } from "../websocket-tracker"
export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handlers) =>
Effect.gen(function* () {
@ -80,9 +82,22 @@ export const ptyConnectRoute = HttpRouter.use((router) =>
: undefined
const socket = yield* Effect.orDie((yield* HttpServerRequest.HttpServerRequest).upgrade)
const write = yield* socket.writer
const services = yield* Effect.context()
const closeAccepted = (event: Socket.CloseEvent) =>
socket
.runRaw(() => Effect.void, { onOpen: write(event).pipe(Effect.catch(() => Effect.void)) })
.pipe(
Effect.timeout("1 second"),
Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void),
Effect.catch(() => Effect.void),
)
const registered = yield* WebSocketTracker.register(write(WebSocketTracker.SERVER_CLOSING_EVENT()))
if (!registered) {
yield* closeAccepted(WebSocketTracker.SERVER_CLOSING_EVENT())
return HttpServerResponse.empty()
}
const bridge = yield* EffectBridge.make()
const writeScoped = (effect: Effect.Effect<void, unknown>) => {
Effect.runForkWith(services)(effect.pipe(Effect.catch(() => Effect.void)))
bridge.fork(effect.pipe(Effect.catch(() => Effect.void)))
}
let closed = false
const adapter = {
@ -100,7 +115,10 @@ export const ptyConnectRoute = HttpRouter.use((router) =>
},
}
const handler = yield* pty.connect(params.ptyID, adapter, cursor)
if (!handler) return HttpServerResponse.empty()
if (!handler) {
yield* closeAccepted(new Socket.CloseEvent(4404, "session not found"))
return HttpServerResponse.empty()
}
yield* socket
.runRaw((message) => handlePtyInput(handler, message))

View file

@ -1,23 +1,29 @@
import { ServerAuth } from "@/server/auth"
import { Effect, Encoding, Layer, Redacted } from "effect"
import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
import { HttpApiError, HttpApiMiddleware, HttpApiSecurity } from "effect/unstable/httpapi"
import { HttpApiError, HttpApiMiddleware } from "effect/unstable/httpapi"
const AUTH_TOKEN_QUERY = "auth_token"
const UNAUTHORIZED = 401
const WWW_AUTHENTICATE = 'Basic realm="Secure Area"'
// Avoid HttpApiSecurity alternatives here: Effect security middleware wraps the
// full handler, so a downstream failure can make the next auth alternative run
// and remap an authorized NotFound into Unauthorized.
export class Authorization extends HttpApiMiddleware.Service<Authorization>()(
"@opencode/ExperimentalHttpApiAuthorization",
{
error: HttpApiError.UnauthorizedNoContent,
security: {
basic: HttpApiSecurity.basic,
authToken: HttpApiSecurity.apiKey({ in: "query", key: AUTH_TOKEN_QUERY }),
},
},
) {}
function emptyCredential() {
return {
username: "",
password: Redacted.make(""),
}
}
function validateCredential<A, E, R>(
effect: Effect.Effect<A, E, R>,
credential: ServerAuth.DecodedCredentials,
@ -31,19 +37,14 @@ function validateCredential<A, E, R>(
}
function decodeCredential(input: string) {
const emptyCredential = {
username: "",
password: Redacted.make(""),
}
return Encoding.decodeBase64String(input)
.asEffect()
.pipe(
Effect.match({
onFailure: () => emptyCredential,
onFailure: emptyCredential,
onSuccess: (header) => {
const parts = header.split(":")
if (parts.length !== 2) return emptyCredential
if (parts.length !== 2) return emptyCredential()
return {
username: parts[0],
password: Redacted.make(parts[1]),
@ -53,6 +54,14 @@ function decodeCredential(input: string) {
)
}
function credentialFromRequest(request: HttpServerRequest.HttpServerRequest) {
const token = new URL(request.url, "http://localhost").searchParams.get(AUTH_TOKEN_QUERY)
if (token) return decodeCredential(token)
const match = /^Basic\s+(.+)$/i.exec(request.headers.authorization ?? "")
if (match) return decodeCredential(match[1])
return Effect.succeed(emptyCredential())
}
function validateRawCredential<A, E, R>(
effect: Effect.Effect<A, E, R>,
credential: ServerAuth.DecodedCredentials,
@ -77,21 +86,9 @@ export const authorizationRouterMiddleware = HttpRouter.middleware()(
return (effect) =>
Effect.gen(function* () {
const request = yield* HttpServerRequest.HttpServerRequest
const match = /^Basic\s+(.+)$/i.exec(request.headers.authorization ?? "")
if (match) {
return yield* decodeCredential(match[1]).pipe(
Effect.flatMap((credential) => validateRawCredential(effect, credential, config)),
)
}
const token = new URL(request.url, "http://localhost").searchParams.get(AUTH_TOKEN_QUERY)
if (token) {
return yield* decodeCredential(token).pipe(
Effect.flatMap((credential) => validateRawCredential(effect, credential, config)),
)
}
return yield* validateRawCredential(effect, { username: "", password: Redacted.make("") }, config)
return yield* credentialFromRequest(request).pipe(
Effect.flatMap((credential) => validateRawCredential(effect, credential, config)),
)
})
}),
)
@ -100,12 +97,14 @@ export const authorizationLayer = Layer.effect(
Authorization,
Effect.gen(function* () {
const config = yield* ServerAuth.Config
return Authorization.of({
basic: (effect, { credential }) => validateCredential(effect, credential, config),
authToken: (effect, { credential }) =>
decodeCredential(Redacted.value(credential)).pipe(
Effect.flatMap((decoded) => validateCredential(effect, decoded, config)),
),
})
if (!ServerAuth.required(config)) return Authorization.of((effect) => effect)
return Authorization.of((effect) =>
Effect.gen(function* () {
const request = yield* HttpServerRequest.HttpServerRequest
return yield* credentialFromRequest(request).pipe(
Effect.flatMap((credential) => validateCredential(effect, credential, config)),
)
}),
)
}),
)

View file

@ -2,6 +2,7 @@ import { ProxyUtil } from "@/server/proxy-util"
import { Effect, Stream } from "effect"
import { HttpBody, HttpClient, HttpClientRequest, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
import * as Socket from "effect/unstable/socket/Socket"
import { WebSocketTracker } from "../websocket-tracker"
function webSource(request: HttpServerRequest.HttpServerRequest): Request | undefined {
return request.source instanceof Request ? request.source : undefined
@ -28,6 +29,30 @@ export function websocket(
})
const writeInbound = yield* inbound.writer
const writeOutbound = yield* outbound.writer
const closeSocket = (socket: Socket.Socket, write: (event: Socket.CloseEvent) => Effect.Effect<void, unknown>) =>
socket
.runRaw(() => Effect.void, {
onOpen: write(WebSocketTracker.SERVER_CLOSING_EVENT()).pipe(Effect.catch(() => Effect.void)),
})
.pipe(
Effect.timeout("1 second"),
Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void),
Effect.catch(() => Effect.void),
)
const closeAccepted = Effect.all(
[closeSocket(inbound, writeInbound), closeSocket(outbound, writeOutbound)],
{ concurrency: "unbounded", discard: true },
)
const registered = yield* WebSocketTracker.register(
Effect.all(
[writeInbound(WebSocketTracker.SERVER_CLOSING_EVENT()), writeOutbound(WebSocketTracker.SERVER_CLOSING_EVENT())],
{ concurrency: "unbounded", discard: true },
),
)
if (!registered) {
yield* closeAccepted
return HttpServerResponse.empty()
}
yield* outbound
.runRaw((message) => writeInbound(message))

View file

@ -0,0 +1,52 @@
import { Context, Effect, Layer, Option } from "effect"
import * as Socket from "effect/unstable/socket/Socket"
export const SERVER_CLOSING_EVENT = () => new Socket.CloseEvent(1001, "server closing")
type Close = Effect.Effect<void, unknown>
export interface Interface {
readonly add: (close: Close) => Effect.Effect<boolean>
readonly remove: (close: Close) => Effect.Effect<void>
readonly closeAll: Effect.Effect<void>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/HttpApiWebSocketTracker") {}
export const layer = Layer.sync(Service)(() => {
const sockets = new Set<Close>()
let closing = false
return Service.of({
add: (close) =>
Effect.gen(function* () {
if (closing) return false
sockets.add(close)
return true
}),
remove: (close) =>
Effect.sync(() => {
sockets.delete(close)
}),
closeAll: Effect.gen(function* () {
closing = true
const active = Array.from(sockets)
sockets.clear()
yield* Effect.all(
active.map((close) => close.pipe(Effect.timeout("1 second"), Effect.catch(() => Effect.void))),
{ concurrency: "unbounded", discard: true },
)
}),
})
})
export const register = (close: Close) =>
Effect.gen(function* () {
const tracker = yield* Effect.serviceOption(Service)
if (Option.isNone(tracker)) return true
const registered = yield* tracker.value.add(close)
if (!registered) return false
yield* Effect.addFinalizer(() => tracker.value.remove(close))
return true
})
export * as WebSocketTracker from "./websocket-tracker"

View file

@ -5,7 +5,10 @@ import { lazy } from "@/util/lazy"
import * as Log from "@opencode-ai/core/util/log"
import { Flag } from "@opencode-ai/core/flag/flag"
import { WorkspaceID } from "@/control-plane/schema"
import { Context, Effect, Exit, Layer, Scope } from "effect"
import { HttpRouter, HttpServer } from "effect/unstable/http"
import { OpenApi } from "effect/unstable/httpapi"
import * as HttpApiServer from "#httpapi-server"
import { MDNS } from "./mdns"
import { AuthMiddleware, CompressionMiddleware, CorsMiddleware, ErrorMiddleware, LoggerMiddleware } from "./middleware"
import { FenceMiddleware } from "./fence"
@ -18,6 +21,8 @@ import { WorkspaceRouterMiddleware } from "./workspace"
import { InstanceMiddleware } from "./routes/instance/middleware"
import { WorkspaceRoutes } from "./routes/control/workspace"
import { ExperimentalHttpApiServer } from "./routes/instance/httpapi/server"
import { disposeMiddleware } from "./routes/instance/httpapi/lifecycle"
import { WebSocketTracker } from "./routes/instance/httpapi/websocket-tracker"
import { PublicApi } from "./routes/instance/httpapi/public"
import * as ServerBackend from "./backend"
import type { CorsOptions } from "./cors"
@ -182,37 +187,147 @@ export async function openapiHono() {
export let url: URL
export async function listen(opts: ListenOptions): Promise<Listener> {
const built = create(opts)
const server = await built.runtime.listen(opts)
const selected = select()
const inner: Listener =
selected.backend === "effect-httpapi" ? await listenHttpApi(opts, selected) : await listenLegacy(opts)
const next = new URL("http://localhost")
next.hostname = opts.hostname
next.port = String(server.port)
const next = new URL(inner.url)
url = next
const mdns =
opts.mdns &&
server.port &&
inner.port &&
opts.hostname !== "127.0.0.1" &&
opts.hostname !== "localhost" &&
opts.hostname !== "::1"
if (mdns) {
MDNS.publish(server.port, opts.mdnsDomain)
MDNS.publish(inner.port, opts.mdnsDomain)
} else if (opts.mdns) {
log.warn("mDNS enabled but hostname is loopback; skipping mDNS publish")
}
let closing: Promise<void> | undefined
let mdnsUnpublished = false
const unpublish = () => {
if (!mdns || mdnsUnpublished) return
mdnsUnpublished = true
MDNS.unpublish()
}
return {
hostname: inner.hostname,
port: inner.port,
url: next,
stop(close?: boolean) {
unpublish()
// Always forward stop(true), even if a graceful stop was requested
// first, so native listeners can escalate shutdown in-place.
const next = inner.stop(close)
closing ??= next
return close ? next.then(() => closing!) : closing
},
}
}
async function listenLegacy(opts: ListenOptions): Promise<Listener> {
const built = create(opts)
const server = await built.runtime.listen(opts)
const innerUrl = new URL("http://localhost")
innerUrl.hostname = opts.hostname
innerUrl.port = String(server.port)
return {
hostname: opts.hostname,
port: server.port,
url: next,
stop(close?: boolean) {
closing ??= (async () => {
if (mdns) MDNS.unpublish()
await server.stop(close)
})()
return closing
url: innerUrl,
stop: (close?: boolean) => server.stop(close),
}
}
/**
* Run the effect-httpapi backend on a native Effect HTTP server. This
* lets HttpApi routes that call `request.upgrade` (PTY connect, the
* workspace-routing proxy WS bridge) work end-to-end; the legacy Hono
* adapter path can't surface `request.upgrade` because its fetch handler has
* no reference to the platform server instance for websocket upgrades.
*/
async function listenHttpApi(opts: ListenOptions, selection: ServerBackend.Selection): Promise<Listener> {
log.info("server backend selected", {
...ServerBackend.attributes(selection),
"opencode.server.runtime": HttpApiServer.name,
})
const buildLayer = (port: number) =>
HttpRouter.serve(ExperimentalHttpApiServer.createRoutes(opts), {
middleware: disposeMiddleware,
disableLogger: true,
disableListenLog: true,
}).pipe(
Layer.provideMerge(WebSocketTracker.layer),
Layer.provideMerge(HttpApiServer.layer({ port, hostname: opts.hostname })),
)
const start = async (port: number) => {
const scope = Scope.makeUnsafe()
try {
// Effect's `HttpMiddleware` interface returns `Effect<…, any, any>` by
// design, which leaks `R = any` through `HttpRouter.serve`. The actual
// requirements at this point are fully satisfied by `createRoutes` and the
// platform HTTP server layer; cast away the `any` to satisfy `runPromise`.
const layer = buildLayer(port) as Layer.Layer<
HttpServer.HttpServer | WebSocketTracker.Service | HttpApiServer.Service,
unknown,
never
>
const ctx = await Effect.runPromise(Layer.buildWithMemoMap(layer, Layer.makeMemoMapUnsafe(), scope))
return { scope, ctx }
} catch (err) {
await Effect.runPromise(Scope.close(scope, Exit.void)).catch(() => undefined)
throw err
}
}
// Match the legacy adapter port-resolution behavior: explicit `0` prefers
// 4096 first, then any free port.
let resolved: Awaited<ReturnType<typeof start>> | undefined
if (opts.port === 0) {
resolved = await start(4096).catch(() => undefined)
if (!resolved) resolved = await start(0)
} else {
resolved = await start(opts.port)
}
if (!resolved) throw new Error(`Failed to start server on port ${opts.port}`)
const server = Context.get(resolved.ctx, HttpServer.HttpServer)
if (server.address._tag !== "TcpAddress") {
await Effect.runPromise(Scope.close(resolved.scope, Exit.void))
throw new Error(`Unexpected HttpServer address tag: ${server.address._tag}`)
}
const port = server.address.port
const innerUrl = new URL("http://localhost")
innerUrl.hostname = opts.hostname
innerUrl.port = String(port)
let forceStopPromise: Promise<void> | undefined
let stopPromise: Promise<void> | undefined
const forceStop = () => {
forceStopPromise ??= Effect.runPromiseExit(
Effect.gen(function* () {
yield* Context.get(resolved!.ctx, HttpApiServer.Service).closeAll
yield* Context.get(resolved!.ctx, WebSocketTracker.Service).closeAll
}),
).then(() => undefined)
return forceStopPromise
}
return {
hostname: opts.hostname,
port,
url: innerUrl,
stop: (close?: boolean) => {
const requested = close ? forceStop() : Promise.resolve()
// The first call starts scope shutdown. A later stop(true) cannot undo
// that, but it still runs forceStop() before awaiting the original close.
stopPromise ??= requested.then(() => Effect.runPromiseExit(Scope.close(resolved!.scope, Exit.void))).then(() => undefined)
return requested.then(() => stopPromise!)
},
}
}

View file

@ -1,4 +1,4 @@
export function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
export function withTimeout<T>(promise: Promise<T>, ms: number, label?: string): Promise<T> {
let timeout: NodeJS.Timeout
return Promise.race([
promise.finally(() => {
@ -6,7 +6,7 @@ export function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
}),
new Promise<never>((_, reject) => {
timeout = setTimeout(() => {
reject(new Error(`Operation timed out after ${ms}ms`))
reject(new Error(label ?? `Operation timed out after ${ms}ms`))
}, ms)
}),
])

View file

@ -2,7 +2,7 @@ import { NodeHttpServer } from "@effect/platform-node"
import { describe, expect } from "bun:test"
import { Effect, Layer, Option, Schema } from "effect"
import { HttpClient, HttpClientRequest, HttpRouter } from "effect/unstable/http"
import { HttpApi, HttpApiBuilder, HttpApiEndpoint, HttpApiGroup } from "effect/unstable/httpapi"
import { HttpApi, HttpApiBuilder, HttpApiEndpoint, HttpApiError, HttpApiGroup } from "effect/unstable/httpapi"
import { ServerAuth } from "../../src/server/auth"
import { Authorization, authorizationLayer } from "../../src/server/routes/instance/httpapi/middleware/authorization"
import { testEffect } from "../lib/effect"
@ -13,11 +13,19 @@ const Api = HttpApi.make("test-authorization").add(
HttpApiEndpoint.get("probe", "/probe", {
success: Schema.String,
}),
HttpApiEndpoint.get("missing", "/missing", {
success: Schema.String,
error: HttpApiError.NotFound,
}),
)
.middleware(Authorization),
)
const handlers = HttpApiBuilder.group(Api, "test", (handlers) => handlers.handle("probe", () => Effect.succeed("ok")))
const handlers = HttpApiBuilder.group(Api, "test", (handlers) =>
handlers
.handle("probe", () => Effect.succeed("ok"))
.handle("missing", () => Effect.fail(new HttpApiError.NotFound({}))),
)
const apiLayer = HttpRouter.serve(
HttpApiBuilder.layer(Api).pipe(Layer.provide(handlers), Layer.provide(authorizationLayer)),
@ -32,8 +40,7 @@ const it = testEffect(apiLayer.pipe(Layer.provide(noAuthLayer)))
const itSecret = testEffect(apiLayer.pipe(Layer.provide(secretLayer)))
const itKitSecret = testEffect(apiLayer.pipe(Layer.provide(kitSecretLayer)))
const basic = (username: string, password: string) =>
`Basic ${Buffer.from(`${username}:${password}`).toString("base64")}`
const basic = (username: string, password: string) => ServerAuth.header({ username, password }) ?? ""
const token = (username: string, password: string) => Buffer.from(`${username}:${password}`).toString("base64")
@ -90,6 +97,35 @@ describe("HttpApi authorization middleware", () => {
}),
)
itSecret.live("prefers auth token query credentials over basic auth", () =>
Effect.gen(function* () {
const response = yield* HttpClientRequest.get(
`/probe?auth_token=${encodeURIComponent(token("opencode", "secret"))}`,
).pipe(HttpClientRequest.setHeader("authorization", basic("opencode", "wrong")), HttpClient.execute)
expect(response.status).toBe(200)
}),
)
itSecret.live("preserves handler errors when basic auth succeeds", () =>
Effect.gen(function* () {
const response = yield* HttpClientRequest.get("/missing").pipe(
HttpClientRequest.setHeader("authorization", basic("opencode", "secret")),
HttpClient.execute,
)
expect(response.status).toBe(404)
}),
)
itSecret.live("preserves handler errors when auth token query succeeds", () =>
Effect.gen(function* () {
const response = yield* HttpClient.get(`/missing?auth_token=${encodeURIComponent(token("opencode", "secret"))}`)
expect(response.status).toBe(404)
}),
)
itSecret.live("rejects malformed auth token query credentials", () =>
Effect.gen(function* () {
const response = yield* HttpClient.get("/probe?auth_token=not-base64")

View file

@ -0,0 +1,155 @@
import { afterEach, describe, expect, test } from "bun:test"
import { Flag } from "@opencode-ai/core/flag/flag"
import * as Log from "@opencode-ai/core/util/log"
import { Server } from "../../src/server/server"
import { PtyPaths } from "../../src/server/routes/instance/httpapi/groups/pty"
import { withTimeout } from "../../src/util/timeout"
import { resetDatabase } from "../fixture/db"
import { disposeAllInstances, tmpdir } from "../fixture/fixture"
void Log.init({ print: false })
const original = {
OPENCODE_EXPERIMENTAL_HTTPAPI: Flag.OPENCODE_EXPERIMENTAL_HTTPAPI,
OPENCODE_SERVER_PASSWORD: Flag.OPENCODE_SERVER_PASSWORD,
OPENCODE_SERVER_USERNAME: Flag.OPENCODE_SERVER_USERNAME,
envPassword: process.env.OPENCODE_SERVER_PASSWORD,
envUsername: process.env.OPENCODE_SERVER_USERNAME,
}
const auth = { username: "opencode", password: "listen-secret" }
const testPty = process.platform === "win32" ? test.skip : test
afterEach(async () => {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = original.OPENCODE_EXPERIMENTAL_HTTPAPI
Flag.OPENCODE_SERVER_PASSWORD = original.OPENCODE_SERVER_PASSWORD
Flag.OPENCODE_SERVER_USERNAME = original.OPENCODE_SERVER_USERNAME
if (original.envPassword === undefined) delete process.env.OPENCODE_SERVER_PASSWORD
else process.env.OPENCODE_SERVER_PASSWORD = original.envPassword
if (original.envUsername === undefined) delete process.env.OPENCODE_SERVER_USERNAME
else process.env.OPENCODE_SERVER_USERNAME = original.envUsername
await disposeAllInstances()
await resetDatabase()
})
async function startListener() {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = true
Flag.OPENCODE_SERVER_PASSWORD = auth.password
Flag.OPENCODE_SERVER_USERNAME = auth.username
process.env.OPENCODE_SERVER_PASSWORD = auth.password
process.env.OPENCODE_SERVER_USERNAME = auth.username
return Server.listen({ hostname: "127.0.0.1", port: 0 })
}
function authorization() {
return `Basic ${btoa(`${auth.username}:${auth.password}`)}`
}
function socketURL(listener: Awaited<ReturnType<typeof startListener>>, id: string, dir: string) {
const url = new URL(PtyPaths.connect.replace(":ptyID", id), listener.url)
url.protocol = "ws:"
url.searchParams.set("directory", dir)
url.searchParams.set("cursor", "-1")
url.searchParams.set("auth_token", btoa(`${auth.username}:${auth.password}`))
return url
}
async function createCat(listener: Awaited<ReturnType<typeof startListener>>, dir: string) {
const response = await fetch(new URL(PtyPaths.create, listener.url), {
method: "POST",
headers: {
authorization: authorization(),
"x-opencode-directory": dir,
"content-type": "application/json",
},
body: JSON.stringify({ command: "/bin/cat", title: "listen-smoke" }),
})
expect(response.status).toBe(200)
return (await response.json()) as { id: string }
}
async function openSocket(url: URL) {
const ws = new WebSocket(url)
ws.binaryType = "arraybuffer"
await withTimeout(
new Promise<void>((resolve, reject) => {
ws.addEventListener("open", () => resolve(), { once: true })
ws.addEventListener("error", () => reject(new Error("websocket failed before open")), { once: true })
}),
5_000,
"timed out waiting for websocket open",
)
return ws
}
function stop(listener: Awaited<ReturnType<typeof startListener>>, label: string) {
return withTimeout(listener.stop(true), 10_000, label)
}
function waitForMessage(ws: WebSocket, predicate: (message: string) => boolean) {
const decoder = new TextDecoder()
let onMessage: ((event: MessageEvent) => void) | undefined
return withTimeout(
new Promise<string>((resolve) => {
onMessage = (event: MessageEvent) => {
const message = typeof event.data === "string" ? event.data : decoder.decode(event.data as ArrayBuffer)
if (!predicate(message)) return
resolve(message)
}
ws.addEventListener("message", onMessage)
}),
5_000,
"timed out waiting for websocket message",
).finally(() => {
if (onMessage) ws.removeEventListener("message", onMessage)
})
}
describe("HttpApi Server.listen", () => {
testPty("serves HTTP routes and upgrades PTY websocket through Server.listen", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const listener = await startListener()
let stopped = false
try {
const response = await fetch(new URL(PtyPaths.shells, listener.url), {
headers: { authorization: authorization(), "x-opencode-directory": tmp.path },
})
expect(response.status).toBe(200)
expect(await response.json()).toEqual(
expect.arrayContaining([
expect.objectContaining({
path: expect.any(String),
name: expect.any(String),
acceptable: expect.any(Boolean),
}),
]),
)
const info = await createCat(listener, tmp.path)
const ws = await openSocket(socketURL(listener, info.id, tmp.path))
const closed = new Promise<void>((resolve) => ws.addEventListener("close", () => resolve(), { once: true }))
const message = waitForMessage(ws, (message) => message.includes("ping-listen"))
ws.send("ping-listen\n")
expect(await message).toContain("ping-listen")
await stop(listener, "timed out waiting for listener.stop(true)")
stopped = true
await withTimeout(closed, 5_000, "timed out waiting for websocket close")
expect(ws.readyState).toBe(WebSocket.CLOSED)
const restarted = await startListener()
try {
const nextInfo = await createCat(restarted, tmp.path)
const nextWs = await openSocket(socketURL(restarted, nextInfo.id, tmp.path))
const nextMessage = waitForMessage(nextWs, (message) => message.includes("ping-restarted"))
nextWs.send("ping-restarted\n")
expect(await nextMessage).toContain("ping-restarted")
nextWs.close(1000)
} finally {
await stop(restarted, "timed out waiting for restarted listener.stop(true)")
}
} finally {
if (!stopped) await stop(listener, "timed out cleaning up listener").catch(() => undefined)
}
})
})

View file

@ -1,109 +0,0 @@
import { afterEach, describe, expect, test } from "bun:test"
import { Flag } from "@opencode-ai/core/flag/flag"
import * as Log from "@opencode-ai/core/util/log"
import { resetDatabase } from "../fixture/db"
import { disposeAllInstances, tmpdir } from "../fixture/fixture"
import { HttpApiListener } from "../../src/server/httpapi-listener"
import { PtyPaths } from "../../src/server/routes/instance/httpapi/groups/pty"
void Log.init({ print: false })
const original = Flag.OPENCODE_EXPERIMENTAL_HTTPAPI
const testPty = process.platform === "win32" ? test.skip : test
afterEach(async () => {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = original
await disposeAllInstances()
await resetDatabase()
})
async function startListener() {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = true
return HttpApiListener.listen({ hostname: "127.0.0.1", port: 0 })
}
describe("native HttpApi listener", () => {
test("serves HTTP routes via the HttpApi web handler", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const listener = await startListener()
try {
const response = await fetch(`${listener.url.origin}${PtyPaths.shells}`, {
headers: { "x-opencode-directory": tmp.path },
})
expect(response.status).toBe(200)
const body = await response.json()
expect(Array.isArray(body)).toBe(true)
expect(body[0]).toMatchObject({
path: expect.any(String),
name: expect.any(String),
acceptable: expect.any(Boolean),
})
} finally {
await listener.stop(true)
}
})
testPty("PTY websocket connect echoes input back to the client", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const listener = await startListener()
try {
const created = await fetch(`${listener.url.origin}${PtyPaths.create}`, {
method: "POST",
headers: {
"x-opencode-directory": tmp.path,
"content-type": "application/json",
},
body: JSON.stringify({ command: "/bin/cat", title: "listener-smoke" }),
})
expect(created.status).toBe(200)
const info = (await created.json()) as { id: string }
try {
const wsURL = new URL(PtyPaths.connect.replace(":ptyID", info.id), listener.url)
wsURL.protocol = "ws:"
wsURL.searchParams.set("directory", tmp.path)
wsURL.searchParams.set("cursor", "-1")
const messages: string[] = []
const ws = new WebSocket(wsURL)
ws.binaryType = "arraybuffer"
const opened = new Promise<void>((resolve, reject) => {
ws.addEventListener("open", () => resolve(), { once: true })
ws.addEventListener("error", () => reject(new Error("ws error before open")), { once: true })
})
const closed = new Promise<void>((resolve) => {
ws.addEventListener("close", () => resolve(), { once: true })
})
ws.addEventListener("message", (event) => {
const data = event.data
messages.push(typeof data === "string" ? data : new TextDecoder().decode(data as ArrayBuffer))
})
await opened
ws.send("ping-listener\n")
const start = Date.now()
while (!messages.some((m) => m.includes("ping-listener")) && Date.now() - start < 5_000) {
await new Promise((r) => setTimeout(r, 50))
}
ws.close(1000, "done")
expect(messages.some((m) => m.includes("ping-listener"))).toBe(true)
// Verify close event fires (handler.onClose path runs and the
// Bun.serve websocket lifecycle reaches close).
await closed
expect(ws.readyState).toBe(WebSocket.CLOSED)
} finally {
await fetch(`${listener.url.origin}${PtyPaths.remove.replace(":ptyID", info.id)}`, {
method: "DELETE",
headers: { "x-opencode-directory": tmp.path },
}).catch(() => undefined)
}
} finally {
await listener.stop(true)
}
})
})

View file

@ -33,10 +33,7 @@ const testMcpHandlers = HttpApiBuilder.group(TestHttpApi, "mcp", (handlers) =>
const passthroughAuthorization = Layer.succeed(
Authorization,
Authorization.of({
basic: (effect) => effect,
authToken: (effect) => effect,
}),
Authorization.of((effect) => effect),
)
const passthroughInstanceContext = Layer.succeed(