Use PTY service directly in HTTP routes (#25138)

This commit is contained in:
Kit Langton 2026-04-30 14:24:43 -04:00 committed by GitHub
parent 320527a3e4
commit f4ce240a2e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 174 additions and 90 deletions

View file

@ -5,7 +5,6 @@ import { InstanceState } from "@/effect/instance-state"
import { EffectBridge } from "@/effect/bridge"
import { lazy } from "@opencode-ai/core/util/lazy"
import { Plugin } from "@/plugin"
import { Instance } from "@/project/instance"
import { Shell } from "@/shell/shell"
import type { Proc } from "#pty"
import * as Log from "@opencode-ai/core/util/log"
@ -229,42 +228,38 @@ export const layer = Layer.effect(
subscribers: new Map(),
}
s.sessions.set(id, session)
proc.onData(
Instance.bind((chunk) => {
session.cursor += chunk.length
proc.onData((chunk) => {
session.cursor += chunk.length
for (const [key, ws] of session.subscribers.entries()) {
if (ws.readyState !== 1) {
session.subscribers.delete(key)
continue
}
if (sock(ws) !== key) {
session.subscribers.delete(key)
continue
}
try {
ws.send(chunk)
} catch {
session.subscribers.delete(key)
}
for (const [key, ws] of session.subscribers.entries()) {
if (ws.readyState !== 1) {
session.subscribers.delete(key)
continue
}
if (sock(ws) !== key) {
session.subscribers.delete(key)
continue
}
try {
ws.send(chunk)
} catch {
session.subscribers.delete(key)
}
}
session.buffer += chunk
if (session.buffer.length <= BUFFER_LIMIT) return
const excess = session.buffer.length - BUFFER_LIMIT
session.buffer = session.buffer.slice(excess)
session.bufferCursor += excess
}),
)
proc.onExit(
Instance.bind(({ exitCode }) => {
if (session.info.status === "exited") return
log.info("session exited", { id, exitCode })
session.info.status = "exited"
bridge.fork(bus.publish(Event.Exited, { id, exitCode }))
bridge.fork(remove(id))
}),
)
session.buffer += chunk
if (session.buffer.length <= BUFFER_LIMIT) return
const excess = session.buffer.length - BUFFER_LIMIT
session.buffer = session.buffer.slice(excess)
session.bufferCursor += excess
})
proc.onExit(({ exitCode }) => {
if (session.info.status === "exited") return
log.info("session exited", { id, exitCode })
session.info.status = "exited"
bridge.fork(bus.publish(Event.Exited, { id, exitCode }))
bridge.fork(remove(id))
})
yield* bus.publish(Event.Created, { info })
return info
})

View file

@ -1,4 +1,3 @@
import { EffectBridge } from "@/effect/bridge"
import { Pty } from "@/pty"
import { PtyID } from "@/pty/schema"
import { handlePtyInput } from "@/pty/input"
@ -23,16 +22,11 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler
})
const create = Effect.fn("PtyHttpApi.create")(function* (ctx: { payload: typeof Pty.CreateInput.Type }) {
const bridge = yield* EffectBridge.make()
return yield* Effect.promise(() =>
bridge.promise(
pty.create({
...ctx.payload,
args: ctx.payload.args ? [...ctx.payload.args] : undefined,
env: ctx.payload.env ? { ...ctx.payload.env } : undefined,
}),
),
)
return yield* pty.create({
...ctx.payload,
args: ctx.payload.args ? [...ctx.payload.args] : undefined,
env: ctx.payload.env ? { ...ctx.payload.env } : undefined,
})
})
const get = Effect.fn("PtyHttpApi.get")(function* (ctx: { params: { ptyID: PtyID } }) {
@ -68,52 +62,60 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler
}),
)
export const ptyConnectRoute = HttpRouter.add(
"GET",
PtyPaths.connect,
export const ptyConnectRoute = HttpRouter.use((router) =>
Effect.gen(function* () {
const pty = yield* Pty.Service
const params = yield* HttpRouter.schemaPathParams(Params)
if (!(yield* pty.get(params.ptyID))) return HttpServerResponse.empty({ status: 404 })
yield* router.add(
"GET",
PtyPaths.connect,
Effect.gen(function* () {
const params = yield* HttpRouter.schemaPathParams(Params)
if (!(yield* pty.get(params.ptyID))) return HttpServerResponse.empty({ status: 404 })
const query = yield* HttpServerRequest.schemaSearchParams(CursorQuery)
const parsedCursor = query.cursor === undefined ? undefined : Number(query.cursor)
const cursor =
parsedCursor !== undefined && Number.isSafeInteger(parsedCursor) && parsedCursor >= -1 ? parsedCursor : undefined
const socket = yield* Effect.orDie((yield* HttpServerRequest.HttpServerRequest).upgrade)
const write = yield* socket.writer
let closed = false
const adapter = {
get readyState() {
return closed ? 3 : 1
},
send: (data: string | Uint8Array | ArrayBuffer) => {
if (closed) return
Effect.runFork(
write(data instanceof ArrayBuffer ? new Uint8Array(data) : data).pipe(Effect.catch(() => Effect.void)),
)
},
close: (code?: number, reason?: string) => {
if (closed) return
closed = true
Effect.runFork(write(new Socket.CloseEvent(code, reason)).pipe(Effect.catch(() => Effect.void)))
},
}
const handler = yield* pty.connect(params.ptyID, adapter, cursor)
if (!handler) return HttpServerResponse.empty()
yield* socket
.runRaw((message) => handlePtyInput(handler, message))
.pipe(
Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void),
Effect.ensuring(
Effect.sync(() => {
const query = yield* HttpServerRequest.schemaSearchParams(CursorQuery)
const parsedCursor = query.cursor === undefined ? undefined : Number(query.cursor)
const cursor =
parsedCursor !== undefined && Number.isSafeInteger(parsedCursor) && parsedCursor >= -1
? parsedCursor
: undefined
const socket = yield* Effect.orDie((yield* HttpServerRequest.HttpServerRequest).upgrade)
const write = yield* socket.writer
const services = yield* Effect.context()
const writeScoped = (effect: Effect.Effect<void, unknown>) => {
Effect.runForkWith(services)(effect.pipe(Effect.catch(() => Effect.void)))
}
let closed = false
const adapter = {
get readyState() {
return closed ? 3 : 1
},
send: (data: string | Uint8Array | ArrayBuffer) => {
if (closed) return
writeScoped(write(data instanceof ArrayBuffer ? new Uint8Array(data) : data))
},
close: (code?: number, reason?: string) => {
if (closed) return
closed = true
handler.onClose()
}),
),
Effect.orDie,
)
return HttpServerResponse.empty()
}).pipe(Effect.provide(Pty.defaultLayer)),
writeScoped(write(new Socket.CloseEvent(code, reason)))
},
}
const handler = yield* pty.connect(params.ptyID, adapter, cursor)
if (!handler) return HttpServerResponse.empty()
yield* socket
.runRaw((message) => handlePtyInput(handler, message))
.pipe(
Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void),
Effect.ensuring(
Effect.sync(() => {
closed = true
handler.onClose()
}),
),
Effect.orDie,
)
return HttpServerResponse.empty()
}),
)
}),
)

View file

@ -1,4 +1,5 @@
import { afterEach, describe, expect, test } from "bun:test"
import { NodeHttpServer, NodeServices } from "@effect/platform-node"
import { Flag } from "@opencode-ai/core/flag/flag"
import { PtyID } from "../../src/pty/schema"
import { Instance } from "../../src/project/instance"
@ -6,18 +7,60 @@ import { Server } from "../../src/server/server"
import { PtyPaths } from "../../src/server/routes/instance/httpapi/groups/pty"
import * as Log from "@opencode-ai/core/util/log"
import { resetDatabase } from "../fixture/db"
import { tmpdir } from "../fixture/fixture"
import { tmpdir, tmpdirScoped } from "../fixture/fixture"
import { Config, Effect, Layer, Queue, Schema } from "effect"
import { HttpClient, HttpClientRequest, HttpRouter, HttpServer } from "effect/unstable/http"
import * as Socket from "effect/unstable/socket/Socket"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { Pty } from "../../src/pty"
import { testEffect } from "../lib/effect"
void Log.init({ print: false })
const original = Flag.OPENCODE_EXPERIMENTAL_HTTPAPI
const testPty = process.platform === "win32" ? test.skip : test
const testStateLayer = Layer.effectDiscard(
Effect.gen(function* () {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = true
yield* Effect.promise(() => resetDatabase())
yield* Effect.addFinalizer(() =>
Effect.promise(async () => {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = original
await resetDatabase()
}),
)
}),
)
const servedRoutes: Layer.Layer<never, Config.ConfigError, HttpServer.HttpServer> = HttpRouter.serve(
ExperimentalHttpApiServer.routes,
{ disableListenLog: true, disableLogger: true },
)
const effectIt = testEffect(
Layer.mergeAll(
testStateLayer,
Socket.layerWebSocketConstructorGlobal,
servedRoutes.pipe(
Layer.provide(Socket.layerWebSocketConstructorGlobal),
Layer.provideMerge(NodeHttpServer.layerTest),
Layer.provideMerge(NodeServices.layer),
),
),
)
function app() {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = true
return Server.Default().app
}
function serverUrl() {
return HttpServer.HttpServer.use((server) => Effect.succeed(HttpServer.formatAddress(server.address)))
}
const directoryHeader = (dir: string) => HttpClientRequest.setHeader("x-opencode-directory", dir)
afterEach(async () => {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = original
await Instance.disposeAll()
@ -85,4 +128,48 @@ describe("pty HttpApi bridge", () => {
})
expect(response.status).toBe(404)
})
;(process.platform === "win32" ? effectIt.live.skip : effectIt.live)(
"serves PTY websocket output and input through Effect routes",
() =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped({ git: true, config: { formatter: false, lsp: false } })
const created = yield* HttpClientRequest.post(PtyPaths.create).pipe(
directoryHeader(dir),
HttpClientRequest.bodyJson({ command: "/bin/cat", title: "websocket" }),
Effect.flatMap(HttpClient.execute),
)
expect(created.status).toBe(200)
const info = yield* Schema.decodeUnknownEffect(Pty.Info)(yield* created.json)
const socket = yield* Socket.makeWebSocket(
`${(yield* serverUrl()).replace(/^http/, "ws")}${PtyPaths.connect.replace(":ptyID", info.id)}?cursor=-1&directory=${encodeURIComponent(dir)}`,
{ closeCodeIsError: () => false },
)
const messages = yield* Queue.unbounded<string>()
yield* socket
.runRaw((message) =>
Queue.offer(messages, typeof message === "string" ? message : new TextDecoder().decode(message)),
)
.pipe(Effect.catch(() => Effect.void))
.pipe(Effect.forkScoped)
const write = yield* socket.writer
const takeUntil = (expected: string, seen = ""): Effect.Effect<string, unknown> =>
Effect.gen(function* () {
const next = seen + (yield* Queue.take(messages).pipe(Effect.timeout("5 seconds")))
if (next.includes(expected)) return next
return yield* takeUntil(expected, next)
})
yield* write("ping-route\n")
expect(yield* takeUntil("ping-route")).toContain("ping-route")
yield* write(new Socket.CloseEvent(1000, "done")).pipe(Effect.catch(() => Effect.void))
const removed = yield* HttpClientRequest.delete(PtyPaths.remove.replace(":ptyID", info.id)).pipe(
directoryHeader(dir),
HttpClient.execute,
)
expect(removed.status).toBe(200)
}),
)
})