From f4ce240a2ed419bba49dd1ff1326c036ba53b2ca Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 30 Apr 2026 14:24:43 -0400 Subject: [PATCH] Use PTY service directly in HTTP routes (#25138) --- packages/opencode/src/pty/index.ts | 63 +++++----- .../routes/instance/httpapi/handlers/pty.ts | 112 +++++++++--------- .../opencode/test/server/httpapi-pty.test.ts | 89 +++++++++++++- 3 files changed, 174 insertions(+), 90 deletions(-) diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts index 2518800ce8..ade4b5d02e 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/opencode/src/pty/index.ts @@ -5,7 +5,6 @@ import { InstanceState } from "@/effect/instance-state" import { EffectBridge } from "@/effect/bridge" import { lazy } from "@opencode-ai/core/util/lazy" import { Plugin } from "@/plugin" -import { Instance } from "@/project/instance" import { Shell } from "@/shell/shell" import type { Proc } from "#pty" import * as Log from "@opencode-ai/core/util/log" @@ -229,42 +228,38 @@ export const layer = Layer.effect( subscribers: new Map(), } s.sessions.set(id, session) - proc.onData( - Instance.bind((chunk) => { - session.cursor += chunk.length + proc.onData((chunk) => { + session.cursor += chunk.length - for (const [key, ws] of session.subscribers.entries()) { - if (ws.readyState !== 1) { - session.subscribers.delete(key) - continue - } - if (sock(ws) !== key) { - session.subscribers.delete(key) - continue - } - try { - ws.send(chunk) - } catch { - session.subscribers.delete(key) - } + for (const [key, ws] of session.subscribers.entries()) { + if (ws.readyState !== 1) { + session.subscribers.delete(key) + continue } + if (sock(ws) !== key) { + session.subscribers.delete(key) + continue + } + try { + ws.send(chunk) + } catch { + session.subscribers.delete(key) + } + } - session.buffer += chunk - if (session.buffer.length <= BUFFER_LIMIT) return - const excess = session.buffer.length - BUFFER_LIMIT - session.buffer = session.buffer.slice(excess) - session.bufferCursor += excess - }), - ) - proc.onExit( - Instance.bind(({ exitCode }) => { - if (session.info.status === "exited") return - log.info("session exited", { id, exitCode }) - session.info.status = "exited" - bridge.fork(bus.publish(Event.Exited, { id, exitCode })) - bridge.fork(remove(id)) - }), - ) + session.buffer += chunk + if (session.buffer.length <= BUFFER_LIMIT) return + const excess = session.buffer.length - BUFFER_LIMIT + session.buffer = session.buffer.slice(excess) + session.bufferCursor += excess + }) + proc.onExit(({ exitCode }) => { + if (session.info.status === "exited") return + log.info("session exited", { id, exitCode }) + session.info.status = "exited" + bridge.fork(bus.publish(Event.Exited, { id, exitCode })) + bridge.fork(remove(id)) + }) yield* bus.publish(Event.Created, { info }) return info }) diff --git a/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts b/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts index aa151cecec..cc7c385b3e 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts @@ -1,4 +1,3 @@ -import { EffectBridge } from "@/effect/bridge" import { Pty } from "@/pty" import { PtyID } from "@/pty/schema" import { handlePtyInput } from "@/pty/input" @@ -23,16 +22,11 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler }) const create = Effect.fn("PtyHttpApi.create")(function* (ctx: { payload: typeof Pty.CreateInput.Type }) { - const bridge = yield* EffectBridge.make() - return yield* Effect.promise(() => - bridge.promise( - pty.create({ - ...ctx.payload, - args: ctx.payload.args ? [...ctx.payload.args] : undefined, - env: ctx.payload.env ? { ...ctx.payload.env } : undefined, - }), - ), - ) + return yield* pty.create({ + ...ctx.payload, + args: ctx.payload.args ? [...ctx.payload.args] : undefined, + env: ctx.payload.env ? { ...ctx.payload.env } : undefined, + }) }) const get = Effect.fn("PtyHttpApi.get")(function* (ctx: { params: { ptyID: PtyID } }) { @@ -68,52 +62,60 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler }), ) -export const ptyConnectRoute = HttpRouter.add( - "GET", - PtyPaths.connect, +export const ptyConnectRoute = HttpRouter.use((router) => Effect.gen(function* () { const pty = yield* Pty.Service - const params = yield* HttpRouter.schemaPathParams(Params) - if (!(yield* pty.get(params.ptyID))) return HttpServerResponse.empty({ status: 404 }) + yield* router.add( + "GET", + PtyPaths.connect, + Effect.gen(function* () { + const params = yield* HttpRouter.schemaPathParams(Params) + if (!(yield* pty.get(params.ptyID))) return HttpServerResponse.empty({ status: 404 }) - const query = yield* HttpServerRequest.schemaSearchParams(CursorQuery) - const parsedCursor = query.cursor === undefined ? undefined : Number(query.cursor) - const cursor = - parsedCursor !== undefined && Number.isSafeInteger(parsedCursor) && parsedCursor >= -1 ? parsedCursor : undefined - const socket = yield* Effect.orDie((yield* HttpServerRequest.HttpServerRequest).upgrade) - const write = yield* socket.writer - let closed = false - const adapter = { - get readyState() { - return closed ? 3 : 1 - }, - send: (data: string | Uint8Array | ArrayBuffer) => { - if (closed) return - Effect.runFork( - write(data instanceof ArrayBuffer ? new Uint8Array(data) : data).pipe(Effect.catch(() => Effect.void)), - ) - }, - close: (code?: number, reason?: string) => { - if (closed) return - closed = true - Effect.runFork(write(new Socket.CloseEvent(code, reason)).pipe(Effect.catch(() => Effect.void))) - }, - } - const handler = yield* pty.connect(params.ptyID, adapter, cursor) - if (!handler) return HttpServerResponse.empty() - - yield* socket - .runRaw((message) => handlePtyInput(handler, message)) - .pipe( - Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void), - Effect.ensuring( - Effect.sync(() => { + const query = yield* HttpServerRequest.schemaSearchParams(CursorQuery) + const parsedCursor = query.cursor === undefined ? undefined : Number(query.cursor) + const cursor = + parsedCursor !== undefined && Number.isSafeInteger(parsedCursor) && parsedCursor >= -1 + ? parsedCursor + : undefined + const socket = yield* Effect.orDie((yield* HttpServerRequest.HttpServerRequest).upgrade) + const write = yield* socket.writer + const services = yield* Effect.context() + const writeScoped = (effect: Effect.Effect) => { + Effect.runForkWith(services)(effect.pipe(Effect.catch(() => Effect.void))) + } + let closed = false + const adapter = { + get readyState() { + return closed ? 3 : 1 + }, + send: (data: string | Uint8Array | ArrayBuffer) => { + if (closed) return + writeScoped(write(data instanceof ArrayBuffer ? new Uint8Array(data) : data)) + }, + close: (code?: number, reason?: string) => { + if (closed) return closed = true - handler.onClose() - }), - ), - Effect.orDie, - ) - return HttpServerResponse.empty() - }).pipe(Effect.provide(Pty.defaultLayer)), + writeScoped(write(new Socket.CloseEvent(code, reason))) + }, + } + const handler = yield* pty.connect(params.ptyID, adapter, cursor) + if (!handler) return HttpServerResponse.empty() + + yield* socket + .runRaw((message) => handlePtyInput(handler, message)) + .pipe( + Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void), + Effect.ensuring( + Effect.sync(() => { + closed = true + handler.onClose() + }), + ), + Effect.orDie, + ) + return HttpServerResponse.empty() + }), + ) + }), ) diff --git a/packages/opencode/test/server/httpapi-pty.test.ts b/packages/opencode/test/server/httpapi-pty.test.ts index 37d2a4f64d..e4d22427cb 100644 --- a/packages/opencode/test/server/httpapi-pty.test.ts +++ b/packages/opencode/test/server/httpapi-pty.test.ts @@ -1,4 +1,5 @@ import { afterEach, describe, expect, test } from "bun:test" +import { NodeHttpServer, NodeServices } from "@effect/platform-node" import { Flag } from "@opencode-ai/core/flag/flag" import { PtyID } from "../../src/pty/schema" import { Instance } from "../../src/project/instance" @@ -6,18 +7,60 @@ import { Server } from "../../src/server/server" import { PtyPaths } from "../../src/server/routes/instance/httpapi/groups/pty" import * as Log from "@opencode-ai/core/util/log" import { resetDatabase } from "../fixture/db" -import { tmpdir } from "../fixture/fixture" +import { tmpdir, tmpdirScoped } from "../fixture/fixture" +import { Config, Effect, Layer, Queue, Schema } from "effect" +import { HttpClient, HttpClientRequest, HttpRouter, HttpServer } from "effect/unstable/http" +import * as Socket from "effect/unstable/socket/Socket" +import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server" +import { Pty } from "../../src/pty" +import { testEffect } from "../lib/effect" void Log.init({ print: false }) const original = Flag.OPENCODE_EXPERIMENTAL_HTTPAPI const testPty = process.platform === "win32" ? test.skip : test +const testStateLayer = Layer.effectDiscard( + Effect.gen(function* () { + Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = true + yield* Effect.promise(() => resetDatabase()) + yield* Effect.addFinalizer(() => + Effect.promise(async () => { + Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = original + await resetDatabase() + }), + ) + }), +) + +const servedRoutes: Layer.Layer = HttpRouter.serve( + ExperimentalHttpApiServer.routes, + { disableListenLog: true, disableLogger: true }, +) + +const effectIt = testEffect( + Layer.mergeAll( + testStateLayer, + Socket.layerWebSocketConstructorGlobal, + servedRoutes.pipe( + Layer.provide(Socket.layerWebSocketConstructorGlobal), + Layer.provideMerge(NodeHttpServer.layerTest), + Layer.provideMerge(NodeServices.layer), + ), + ), +) + function app() { Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = true return Server.Default().app } +function serverUrl() { + return HttpServer.HttpServer.use((server) => Effect.succeed(HttpServer.formatAddress(server.address))) +} + +const directoryHeader = (dir: string) => HttpClientRequest.setHeader("x-opencode-directory", dir) + afterEach(async () => { Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = original await Instance.disposeAll() @@ -85,4 +128,48 @@ describe("pty HttpApi bridge", () => { }) expect(response.status).toBe(404) }) + ;(process.platform === "win32" ? effectIt.live.skip : effectIt.live)( + "serves PTY websocket output and input through Effect routes", + () => + Effect.gen(function* () { + const dir = yield* tmpdirScoped({ git: true, config: { formatter: false, lsp: false } }) + const created = yield* HttpClientRequest.post(PtyPaths.create).pipe( + directoryHeader(dir), + HttpClientRequest.bodyJson({ command: "/bin/cat", title: "websocket" }), + Effect.flatMap(HttpClient.execute), + ) + expect(created.status).toBe(200) + const info = yield* Schema.decodeUnknownEffect(Pty.Info)(yield* created.json) + + const socket = yield* Socket.makeWebSocket( + `${(yield* serverUrl()).replace(/^http/, "ws")}${PtyPaths.connect.replace(":ptyID", info.id)}?cursor=-1&directory=${encodeURIComponent(dir)}`, + { closeCodeIsError: () => false }, + ) + const messages = yield* Queue.unbounded() + yield* socket + .runRaw((message) => + Queue.offer(messages, typeof message === "string" ? message : new TextDecoder().decode(message)), + ) + .pipe(Effect.catch(() => Effect.void)) + .pipe(Effect.forkScoped) + const write = yield* socket.writer + + const takeUntil = (expected: string, seen = ""): Effect.Effect => + Effect.gen(function* () { + const next = seen + (yield* Queue.take(messages).pipe(Effect.timeout("5 seconds"))) + if (next.includes(expected)) return next + return yield* takeUntil(expected, next) + }) + + yield* write("ping-route\n") + expect(yield* takeUntil("ping-route")).toContain("ping-route") + yield* write(new Socket.CloseEvent(1000, "done")).pipe(Effect.catch(() => Effect.void)) + + const removed = yield* HttpClientRequest.delete(PtyPaths.remove.replace(":ptyID", info.id)).pipe( + directoryHeader(dir), + HttpClient.execute, + ) + expect(removed.status).toBe(200) + }), + ) })