Refactor workspace service boundaries (#25152)

This commit is contained in:
Kit Langton 2026-04-30 15:34:37 -04:00 committed by GitHub
parent b315a70773
commit 0e9d9282c6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 433 additions and 417 deletions

View file

@ -1,6 +1,5 @@
import { Context, Effect, FiberMap, Layer, Schema, Stream } from "effect"
import { FetchHttpClient, HttpBody, HttpClient, HttpClientError, HttpClientRequest } from "effect/unstable/http"
import { fn } from "@/util/fn"
import { Database } from "@/storage/db"
import { asc } from "drizzle-orm"
import { eq } from "drizzle-orm"
@ -24,7 +23,6 @@ import { Session } from "@/session/session"
import { SessionTable } from "@/session/session.sql"
import { SessionID } from "@/session/schema"
import { errorData } from "@/util/error"
import { makeRuntime } from "@/effect/run-service"
import { waitEvent } from "./util"
import { WorkspaceContext } from "./workspace-context"
import { NonNegativeInt, withStatics } from "@/util/schema"
@ -857,42 +855,4 @@ function route(url: string | URL, path: string) {
return next
}
const { runPromise, runSync } = makeRuntime(Service, defaultLayer)
export const create = fn(CreateInput.zod, (input) => runPromise((svc) => svc.create(input)))
export const sessionRestore = fn(SessionRestoreInput.zod, (input) => runPromise((svc) => svc.sessionRestore(input)))
export function list(project: Project.Info) {
return Database.use((db) =>
db
.select()
.from(WorkspaceTable)
.where(eq(WorkspaceTable.project_id, project.id))
.all()
.map(fromRow)
.sort((a, b) => a.id.localeCompare(b.id)),
)
}
export const get = fn(WorkspaceID.zod, (id) => runPromise((svc) => svc.get(id)))
export const remove = fn(WorkspaceID.zod, (id) => runPromise((svc) => svc.remove(id)))
export function status() {
return runSync((svc) => svc.status())
}
export function isSyncing(workspaceID: WorkspaceID) {
return runSync((svc) => svc.isSyncing(workspaceID))
}
export function waitForSync(workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) {
return runPromise((svc) => svc.waitForSync(workspaceID, state, signal))
}
export function startWorkspaceSyncing(projectID: ProjectID) {
void runPromise((svc) => svc.startWorkspaceSyncing(projectID))
}
export * as Workspace from "./workspace"

View file

@ -5,6 +5,8 @@ import { EventSequenceTable } from "@/sync/event.sql"
import { Workspace } from "@/control-plane/workspace"
import type { WorkspaceID } from "@/control-plane/schema"
import * as Log from "@opencode-ai/core/util/log"
import { AppRuntime } from "@/effect/app-runtime"
import { Effect } from "effect"
const HEADER = "x-opencode-sync"
type State = Record<string, number>
@ -54,16 +56,22 @@ export function parse(headers: Headers) {
) as State
}
export function waitEffect(workspaceID: WorkspaceID, state: State, signal?: AbortSignal) {
return Effect.gen(function* () {
log.info("waiting for state", {
workspaceID,
state,
})
yield* Workspace.Service.use((workspace) => workspace.waitForSync(workspaceID, state, signal))
log.info("state fully synced", {
workspaceID,
state,
})
})
}
export async function wait(workspaceID: WorkspaceID, state: State, signal?: AbortSignal) {
log.info("waiting for state", {
workspaceID,
state,
})
await Workspace.waitForSync(workspaceID, state, signal)
log.info("state fully synced", {
workspaceID,
state,
})
await AppRuntime.runPromise(waitEffect(workspaceID, state, signal))
}
export const FenceMiddleware: MiddlewareHandler = async (c, next) => {

View file

@ -4,6 +4,7 @@ import * as Log from "@opencode-ai/core/util/log"
import * as Fence from "./fence"
import type { WorkspaceID } from "@/control-plane/schema"
import { Workspace } from "@/control-plane/workspace"
import { AppRuntime } from "@/effect/app-runtime"
import { ProxyUtil } from "./proxy-util"
import { Effect, Stream } from "effect"
import { FetchHttpClient, HttpBody, HttpClient, HttpClientRequest } from "effect/unstable/http"
@ -69,18 +70,17 @@ function statusText(response: unknown) {
}
export function httpEffect(url: string | URL, extra: HeadersInit | undefined, req: Request, workspaceID: WorkspaceID) {
if (!Workspace.isSyncing(workspaceID)) {
return Effect.succeed(
new Response(`broken sync connection for workspace: ${workspaceID}`, {
return Effect.gen(function* () {
const syncing = yield* Workspace.Service.use((workspace) => workspace.isSyncing(workspaceID))
if (!syncing) {
return new Response(`broken sync connection for workspace: ${workspaceID}`, {
status: 503,
headers: {
"content-type": "text/plain; charset=utf-8",
},
}),
)
}
})
}
return Effect.gen(function* () {
const response = yield* HttpClient.execute(
HttpClientRequest.make(req.method as never)(url, {
headers: ProxyUtil.headers(req, extra),
@ -100,7 +100,7 @@ export function httpEffect(url: string | URL, extra: HeadersInit | undefined, re
next.delete("content-encoding")
next.delete("content-length")
if (sync) yield* Effect.promise(() => Fence.wait(workspaceID, sync, req.signal))
if (sync) yield* Fence.waitEffect(workspaceID, sync, req.signal)
const body = yield* Stream.toReadableStreamEffect(response.stream.pipe(Stream.catchCause(() => Stream.empty)))
return new Response(body, {
status: response.status,
@ -114,7 +114,7 @@ export function httpEffect(url: string | URL, extra: HeadersInit | undefined, re
}
export function http(url: string | URL, extra: HeadersInit | undefined, req: Request, workspaceID: WorkspaceID) {
return Effect.runPromise(httpEffect(url, extra, req, workspaceID))
return AppRuntime.runPromise(httpEffect(url, extra, req, workspaceID))
}
export function websocket(

View file

@ -1,8 +1,10 @@
import { Hono } from "hono"
import { describeRoute, resolver, validator } from "hono-openapi"
import z from "zod"
import { Effect } from "effect"
import { listAdaptors } from "@/control-plane/adaptors"
import { Workspace } from "@/control-plane/workspace"
import { AppRuntime } from "@/effect/app-runtime"
import { WorkspaceAdaptorEntry } from "@/control-plane/types"
import { zodObject } from "@/util/effect-zod"
import { Instance } from "@/project/instance"
@ -62,10 +64,14 @@ export const WorkspaceRoutes = lazy(() =>
),
async (c) => {
const body = c.req.valid("json") as Omit<Workspace.CreateInput, "projectID">
const workspace = await Workspace.create({
projectID: Instance.project.id,
...body,
})
const workspace = await AppRuntime.runPromise(
Workspace.Service.use((svc) =>
svc.create({
projectID: Instance.project.id,
...body,
}),
),
)
return c.json(workspace)
},
)
@ -87,7 +93,7 @@ export const WorkspaceRoutes = lazy(() =>
},
}),
async (c) => {
return c.json(Workspace.list(Instance.project))
return c.json(await AppRuntime.runPromise(Workspace.Service.use((svc) => svc.list(Instance.project))))
},
)
.get(
@ -108,8 +114,11 @@ export const WorkspaceRoutes = lazy(() =>
},
}),
async (c) => {
const ids = new Set(Workspace.list(Instance.project).map((item) => item.id))
return c.json(Workspace.status().filter((item) => ids.has(item.workspaceID)))
const result = await AppRuntime.runPromise(
Workspace.Service.use((svc) => Effect.all([svc.list(Instance.project), svc.status()])),
)
const ids = new Set(result[0].map((item) => item.id))
return c.json(result[1].filter((item) => ids.has(item.workspaceID)))
},
)
.delete(
@ -138,7 +147,7 @@ export const WorkspaceRoutes = lazy(() =>
),
async (c) => {
const { id } = c.req.valid("param")
return c.json(await Workspace.remove(id))
return c.json(await AppRuntime.runPromise(Workspace.Service.use((svc) => svc.remove(id))))
},
)
.post(
@ -174,10 +183,14 @@ export const WorkspaceRoutes = lazy(() =>
directory: Instance.directory,
})
try {
const result = await Workspace.sessionRestore({
workspaceID: id,
...body,
})
const result = await AppRuntime.runPromise(
Workspace.Service.use((svc) =>
svc.sessionRestore({
workspaceID: id,
...body,
}),
),
)
log.info("session restore route complete", {
workspaceID: id,
sessionID: body.sessionID,

View file

@ -1,4 +1,4 @@
import { startWorkspaceSyncing } from "@/control-plane/workspace"
import { Workspace } from "@/control-plane/workspace"
import * as InstanceState from "@/effect/instance-state"
import { Database } from "@/storage/db"
import { SyncEvent } from "@/sync"
@ -9,15 +9,20 @@ import { eq } from "drizzle-orm"
import { lte } from "drizzle-orm"
import { not } from "drizzle-orm"
import { or } from "drizzle-orm"
import { Effect } from "effect"
import { Effect, Scope } from "effect"
import { HttpApiBuilder } from "effect/unstable/httpapi"
import { InstanceHttpApi } from "../api"
import { HistoryPayload, ReplayPayload } from "../groups/sync"
export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handlers) =>
Effect.gen(function* () {
const workspace = yield* Workspace.Service
const scope = yield* Scope.Scope
const start = Effect.fn("SyncHttpApi.start")(function* () {
startWorkspaceSyncing((yield* InstanceState.context).project.id)
yield* workspace
.startWorkspaceSyncing((yield* InstanceState.context).project.id)
.pipe(Effect.ignore, Effect.forkIn(scope))
return true
})

View file

@ -75,9 +75,9 @@ function shouldStayOnControlPlane(request: HttpServerRequest.HttpServerRequest,
function resolveWorkspace(
id: WorkspaceID | undefined,
envWorkspaceID: WorkspaceID | undefined,
): Effect.Effect<Workspace.Info | void> {
): Effect.Effect<Workspace.Info | void, never, Workspace.Service> {
if (!id || envWorkspaceID) return Effect.void
return Effect.promise(() => Workspace.get(id))
return Workspace.Service.use((workspace) => workspace.get(id))
}
function missingWorkspaceResponse(id: WorkspaceID): HttpServerResponse.HttpServerResponse {
@ -99,9 +99,9 @@ function proxyRemote(
workspace: Workspace.Info,
target: RemoteTarget,
url: URL,
): Effect.Effect<HttpServerResponse.HttpServerResponse, never, Socket.WebSocketConstructor> {
): Effect.Effect<HttpServerResponse.HttpServerResponse, never, Socket.WebSocketConstructor | Workspace.Service> {
return Effect.gen(function* () {
const syncing = yield* Effect.sync(() => Workspace.isSyncing(workspace.id))
const syncing = yield* Workspace.Service.use((svc) => svc.isSyncing(workspace.id))
if (!syncing) {
return HttpServerResponse.text(`broken sync connection for workspace: ${workspace.id}`, {
status: 503,
@ -113,10 +113,17 @@ function proxyRemote(
if (headers["upgrade"]?.toLowerCase() === "websocket") return yield* HttpApiProxy.websocket(request, proxyURL)
const response = yield* HttpApiProxy.http(proxyURL, target.headers, request)
const sync = Fence.parse(new Headers(response.headers))
if (sync)
yield* Effect.promise(() =>
Fence.wait(workspace.id, sync, request.source instanceof Request ? request.source.signal : undefined),
if (sync) {
const syncFailure = yield* Fence.waitEffect(
workspace.id,
sync,
request.source instanceof Request ? request.source.signal : undefined,
).pipe(
Effect.as(undefined),
Effect.catch((error) => Effect.succeed(HttpServerResponse.text(error.message, { status: 503 }))),
)
if (syncFailure) return syncFailure
}
return response
})
}
@ -125,7 +132,7 @@ function planWorkspaceRequest(
request: HttpServerRequest.HttpServerRequest,
url: URL,
workspace: Workspace.Info,
): Effect.Effect<RequestPlan> {
): Effect.Effect<RequestPlan, never, Workspace.Service> {
return Effect.gen(function* () {
const target = yield* resolveTarget(workspace)
if (target.type === "remote") return RequestPlan.Remote({ request, workspace, target, url })
@ -136,7 +143,7 @@ function planWorkspaceRequest(
function planRequest(
request: HttpServerRequest.HttpServerRequest,
sessionWorkspaceID?: WorkspaceID,
): Effect.Effect<RequestPlan> {
): Effect.Effect<RequestPlan, never, Workspace.Service> {
return Effect.gen(function* () {
const url = requestURL(request)
const envWorkspaceID = configuredWorkspaceID()
@ -158,7 +165,7 @@ function planRequest(
function routeWorkspace<E>(
effect: Effect.Effect<HttpServerResponse.HttpServerResponse, E, WorkspaceRouteContext>,
plan: RequestPlan,
): Effect.Effect<HttpServerResponse.HttpServerResponse, E, Socket.WebSocketConstructor> {
): Effect.Effect<HttpServerResponse.HttpServerResponse, E, Socket.WebSocketConstructor | Workspace.Service> {
return RequestPlan.$match(plan, {
MissingWorkspace: ({ workspaceID }) => Effect.succeed(missingWorkspaceResponse(workspaceID)),
Remote: ({ request, workspace, target, url }) => proxyRemote(request, workspace, target, url),
@ -167,20 +174,12 @@ function routeWorkspace<E>(
})
}
function routeWorkspaceRequest<E>(
effect: Effect.Effect<HttpServerResponse.HttpServerResponse, E, WorkspaceRouteContext>,
request: HttpServerRequest.HttpServerRequest,
sessionWorkspaceID?: WorkspaceID,
): Effect.Effect<HttpServerResponse.HttpServerResponse, E, Socket.WebSocketConstructor> {
return Effect.flatMap(planRequest(request, sessionWorkspaceID), (plan) => routeWorkspace(effect, plan))
}
function routeHttpApiWorkspace<E>(
effect: Effect.Effect<HttpServerResponse.HttpServerResponse, E, WorkspaceRouteContext>,
): Effect.Effect<
HttpServerResponse.HttpServerResponse,
E,
Session.Service | HttpServerRequest.HttpServerRequest | Socket.WebSocketConstructor
Session.Service | Workspace.Service | HttpServerRequest.HttpServerRequest | Socket.WebSocketConstructor
> {
return Effect.gen(function* () {
const request = yield* HttpServerRequest.HttpServerRequest
@ -188,7 +187,8 @@ function routeHttpApiWorkspace<E>(
const session = sessionID
? yield* Session.Service.use((svc) => svc.get(sessionID)).pipe(Effect.catchDefect(() => Effect.void))
: undefined
return yield* routeWorkspaceRequest(effect, request, session?.workspaceID)
const plan = yield* planRequest(request, session?.workspaceID)
return yield* routeWorkspace(effect, plan)
})
}
@ -196,8 +196,12 @@ export const workspaceRoutingLayer = Layer.effect(
WorkspaceRoutingMiddleware,
Effect.gen(function* () {
const makeWebSocket = yield* Socket.WebSocketConstructor
const workspace = yield* Workspace.Service
return WorkspaceRoutingMiddleware.of((effect) =>
routeHttpApiWorkspace(effect).pipe(Effect.provideService(Socket.WebSocketConstructor, makeWebSocket)),
routeHttpApiWorkspace(effect).pipe(
Effect.provideService(Socket.WebSocketConstructor, makeWebSocket),
Effect.provideService(Workspace.Service, workspace),
),
)
}),
)
@ -205,12 +209,15 @@ export const workspaceRoutingLayer = Layer.effect(
export const workspaceRouterMiddleware = HttpRouter.middleware<{ provides: WorkspaceRouteContext }>()(
Effect.gen(function* () {
const makeWebSocket = yield* Socket.WebSocketConstructor
const workspace = yield* Workspace.Service
return (effect) =>
Effect.gen(function* () {
const request = yield* HttpServerRequest.HttpServerRequest
return yield* routeWorkspaceRequest(effect, request).pipe(
Effect.provideService(Socket.WebSocketConstructor, makeWebSocket),
)
})
const plan = yield* planRequest(request)
return yield* routeWorkspace(effect, plan)
}).pipe(
Effect.provideService(Socket.WebSocketConstructor, makeWebSocket),
Effect.provideService(Workspace.Service, workspace),
)
}),
)

View file

@ -12,7 +12,8 @@ import { eq } from "drizzle-orm"
import { EventTable } from "@/sync/event.sql"
import { lazy } from "@/util/lazy"
import * as Log from "@opencode-ai/core/util/log"
import { startWorkspaceSyncing } from "@/control-plane/workspace"
import { Workspace } from "@/control-plane/workspace"
import { AppRuntime } from "@/effect/app-runtime"
import { Instance } from "@/project/instance"
import { errors } from "../../error"
@ -46,7 +47,9 @@ export const SyncRoutes = lazy(() =>
},
}),
async (c) => {
startWorkspaceSyncing(Instance.project.id)
void AppRuntime.runPromise(
Workspace.Service.use((workspace) => workspace.startWorkspaceSyncing(Instance.project.id)),
)
return c.json(true)
},
)

View file

@ -72,7 +72,9 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
return next()
}
const workspace = await Workspace.get(WorkspaceID.make(workspaceID))
const workspace = await AppRuntime.runPromise(
Workspace.Service.use((svc) => svc.get(WorkspaceID.make(workspaceID))),
)
if (!workspace) {
return new Response(`Workspace not found: ${workspaceID}`, {
@ -89,7 +91,7 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
return next()
}
const adaptor = await getAdaptor(workspace.projectID, workspace.type)
const adaptor = getAdaptor(workspace.projectID, workspace.type)
const target = await adaptor.target(workspace)
if (target.type === "local") {

View file

@ -107,6 +107,24 @@ async function withInstance<T>(fn: (dir: string) => T | Promise<T>) {
})
}
const runWorkspace = <A, E>(effect: Effect.Effect<A, E, WorkspaceOld.Service>) => AppRuntime.runPromise(effect)
const createWorkspace = (input: WorkspaceOld.CreateInput) =>
runWorkspace(WorkspaceOld.Service.use((workspace) => workspace.create(input)))
const restoreWorkspaceSession = (input: WorkspaceOld.SessionRestoreInput) =>
runWorkspace(WorkspaceOld.Service.use((workspace) => workspace.sessionRestore(input)))
const listWorkspaces = (project: Parameters<WorkspaceOld.Interface["list"]>[0]) =>
runWorkspace(WorkspaceOld.Service.use((workspace) => workspace.list(project)))
const getWorkspace = (id: WorkspaceID) => runWorkspace(WorkspaceOld.Service.use((workspace) => workspace.get(id)))
const removeWorkspace = (id: WorkspaceID) => runWorkspace(WorkspaceOld.Service.use((workspace) => workspace.remove(id)))
const workspaceStatus = () => runWorkspace(WorkspaceOld.Service.use((workspace) => workspace.status()))
const isWorkspaceSyncing = (id: WorkspaceID) =>
runWorkspace(WorkspaceOld.Service.use((workspace) => workspace.isSyncing(id)))
const startWorkspaceSyncing = (projectID: ProjectID) => {
void runWorkspace(WorkspaceOld.Service.use((workspace) => workspace.startWorkspaceSyncing(projectID)))
}
const waitForWorkspaceSync = (workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) =>
runWorkspace(WorkspaceOld.Service.use((workspace) => workspace.waitForSync(workspaceID, state, signal)))
function captureGlobalEvents() {
const events: GlobalEvent[] = []
const handler = (event: GlobalEvent) => events.push(event)
@ -372,12 +390,12 @@ describe("workspace-old schemas and exports", () => {
describe("workspace-old CRUD", () => {
test("get returns undefined for a missing workspace", async () => {
await withInstance(async () => {
expect(await WorkspaceOld.get(WorkspaceID.ascending("wrk_missing_get"))).toBeUndefined()
expect(await getWorkspace(WorkspaceID.ascending("wrk_missing_get"))).toBeUndefined()
})
})
test("list maps database rows, filters by project, and sorts by id", async () => {
await withInstance(() => {
await withInstance(async () => {
const otherProjectID = ProjectID.make("project-other")
insertProject(otherProjectID, "/tmp/other")
const a = workspaceInfo(Instance.project.id, "manual", {
@ -397,7 +415,7 @@ describe("workspace-old CRUD", () => {
insertWorkspace(other)
insertWorkspace(a)
expect(WorkspaceOld.list(Instance.project)).toEqual([a, b])
expect(await listWorkspaces(Instance.project)).toEqual([a, b])
})
})
@ -430,7 +448,7 @@ describe("workspace-old CRUD", () => {
})
registerAdaptor(Instance.project.id, type, recorded.adaptor)
const info = await WorkspaceOld.create({
const info = await createWorkspace({
id: workspaceID,
type,
branch: null,
@ -447,8 +465,8 @@ describe("workspace-old CRUD", () => {
extra: { configured: true },
projectID: Instance.project.id,
})
expect(await WorkspaceOld.get(workspaceID)).toEqual(info)
expect(WorkspaceOld.list(Instance.project)).toEqual([info])
expect(await getWorkspace(workspaceID)).toEqual(info)
expect(await listWorkspaces(Instance.project)).toEqual([info])
expect(recorded.calls.configure).toHaveLength(1)
expect(recorded.calls.configure[0]).toMatchObject({ id: workspaceID, type, directory: null })
expect(recorded.calls.create).toHaveLength(1)
@ -461,10 +479,10 @@ describe("workspace-old CRUD", () => {
expect(recorded.calls.create[0].env.OTEL_EXPORTER_OTLP_HEADERS).toBe("authorization=otel")
expect(recorded.calls.create[0].env.OTEL_EXPORTER_OTLP_ENDPOINT).toBe("https://otel.test")
expect(recorded.calls.create[0].env.OTEL_RESOURCE_ATTRIBUTES).toBe("service.name=opencode-test")
expect(WorkspaceOld.status().find((item) => item.workspaceID === workspaceID)?.status).toBe("connected")
expect((await workspaceStatus()).find((item) => item.workspaceID === workspaceID)?.status).toBe("connected")
await WorkspaceOld.remove(workspaceID)
expect(WorkspaceOld.status().find((item) => item.workspaceID === workspaceID)?.status).toBeUndefined()
await removeWorkspace(workspaceID)
expect((await workspaceStatus()).find((item) => item.workspaceID === workspaceID)?.status).toBeUndefined()
})
})
@ -485,9 +503,9 @@ describe("workspace-old CRUD", () => {
)
await expect(
WorkspaceOld.create({ type, branch: null, projectID: Instance.project.id, extra: null }),
createWorkspace({ type, branch: null, projectID: Instance.project.id, extra: null }),
).rejects.toThrow("configure exploded")
expect(WorkspaceOld.list(Instance.project)).toEqual([])
expect(await listWorkspaces(Instance.project)).toEqual([])
})
})
@ -505,14 +523,14 @@ describe("workspace-old CRUD", () => {
registerAdaptor(Instance.project.id, type, recorded.adaptor)
await expect(
WorkspaceOld.create({ type, branch: "branch", projectID: Instance.project.id, extra: { x: 1 } }),
createWorkspace({ type, branch: "branch", projectID: Instance.project.id, extra: { x: 1 } }),
).rejects.toThrow("create exploded")
const rows = WorkspaceOld.list(Instance.project)
const rows = await listWorkspaces(Instance.project)
expect(rows).toHaveLength(1)
expect(rows[0]).toMatchObject({ type, branch: "branch", extra: { x: 1 } })
expect(recorded.calls.target).toHaveLength(0)
await WorkspaceOld.remove(rows[0].id)
await removeWorkspace(rows[0].id)
})
})
@ -523,11 +541,11 @@ describe("workspace-old CRUD", () => {
const recorded = localAdaptor(missing, { createDir: false })
registerAdaptor(Instance.project.id, type, recorded.adaptor)
const info = await WorkspaceOld.create({ type, branch: null, projectID: Instance.project.id, extra: null })
const info = await createWorkspace({ type, branch: null, projectID: Instance.project.id, extra: null })
expect(info.directory).toBe(missing)
expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBe("error")
await WorkspaceOld.remove(info.id)
expect((await workspaceStatus()).find((item) => item.workspaceID === info.id)?.status).toBe("error")
await removeWorkspace(info.id)
})
})
@ -581,7 +599,7 @@ describe("workspace-old CRUD", () => {
test("remove returns undefined for a missing workspace", async () => {
await withInstance(async () => {
expect(await WorkspaceOld.remove(WorkspaceID.ascending("wrk_missing_remove"))).toBeUndefined()
expect(await removeWorkspace(WorkspaceID.ascending("wrk_missing_remove"))).toBeUndefined()
})
})
@ -590,18 +608,18 @@ describe("workspace-old CRUD", () => {
const type = unique("remove-local")
const recorded = localAdaptor(path.join(dir, "remove-local"))
registerAdaptor(Instance.project.id, type, recorded.adaptor)
const info = await WorkspaceOld.create({ type, branch: null, projectID: Instance.project.id, extra: null })
const info = await createWorkspace({ type, branch: null, projectID: Instance.project.id, extra: null })
const one = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))
const two = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))
attachSessionToWorkspace(one.id, info.id)
attachSessionToWorkspace(two.id, info.id)
const removed = await WorkspaceOld.remove(info.id)
const removed = await removeWorkspace(info.id)
expect(removed).toEqual(info)
expect(await WorkspaceOld.get(info.id)).toBeUndefined()
expect(await getWorkspace(info.id)).toBeUndefined()
expect(recorded.calls.remove).toEqual([info])
expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBeUndefined()
expect((await workspaceStatus()).find((item) => item.workspaceID === info.id)?.status).toBeUndefined()
expect(
Database.use((db) =>
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, info.id)).all(),
@ -628,8 +646,8 @@ describe("workspace-old CRUD", () => {
)
insertWorkspace(info)
expect(await WorkspaceOld.remove(info.id)).toEqual(info)
expect(await WorkspaceOld.get(info.id)).toBeUndefined()
expect(await removeWorkspace(info.id)).toEqual(info)
expect(await getWorkspace(info.id)).toBeUndefined()
})
})
})
@ -645,10 +663,10 @@ describe("workspace-old sync state", () => {
insertWorkspace(info)
registerAdaptor(Instance.project.id, type, localAdaptor(path.join(dir, "flag-disabled")).adaptor)
WorkspaceOld.startWorkspaceSyncing(Instance.project.id)
startWorkspaceSyncing(Instance.project.id)
await delay(25)
expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBeUndefined()
expect((await workspaceStatus()).find((item) => item.workspaceID === info.id)?.status).toBeUndefined()
})
})
@ -671,14 +689,16 @@ describe("workspace-old sync state", () => {
withSession.id,
)
WorkspaceOld.startWorkspaceSyncing(Instance.project.id)
startWorkspaceSyncing(Instance.project.id)
await eventually(() =>
expect(WorkspaceOld.status().find((item) => item.workspaceID === withSession.id)?.status).toBe("connected"),
workspaceStatus().then((status) =>
expect(status.find((item) => item.workspaceID === withSession.id)?.status).toBe("connected"),
),
)
expect(WorkspaceOld.status().find((item) => item.workspaceID === withoutSession.id)?.status).toBeUndefined()
await WorkspaceOld.remove(withSession.id)
await WorkspaceOld.remove(withoutSession.id)
expect((await workspaceStatus()).find((item) => item.workspaceID === withoutSession.id)?.status).toBeUndefined()
await removeWorkspace(withSession.id)
await removeWorkspace(withoutSession.id)
})
})
@ -697,13 +717,15 @@ describe("workspace-old sync state", () => {
info.id,
)
WorkspaceOld.startWorkspaceSyncing(Instance.project.id)
startWorkspaceSyncing(Instance.project.id)
await eventually(() =>
expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBe("error"),
workspaceStatus().then((status) =>
expect(status.find((item) => item.workspaceID === info.id)?.status).toBe("error"),
),
)
expect(await WorkspaceOld.isSyncing(info.id)).toBe(false)
await WorkspaceOld.remove(info.id)
expect(await isWorkspaceSyncing(info.id)).toBe(false)
await removeWorkspace(info.id)
})
})
@ -722,18 +744,20 @@ describe("workspace-old sync state", () => {
info.id,
)
WorkspaceOld.startWorkspaceSyncing(Instance.project.id)
WorkspaceOld.startWorkspaceSyncing(Instance.project.id)
startWorkspaceSyncing(Instance.project.id)
startWorkspaceSyncing(Instance.project.id)
await eventually(() =>
expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBe("connected"),
workspaceStatus().then((status) =>
expect(status.find((item) => item.workspaceID === info.id)?.status).toBe("connected"),
),
)
expect(
captured.events.filter(
(event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Status.type,
),
).toHaveLength(1)
await WorkspaceOld.remove(info.id)
await removeWorkspace(info.id)
} finally {
captured.dispose()
}
@ -1106,7 +1130,7 @@ describe("workspace-old sync state", () => {
describe("workspace-old waitForSync", () => {
test("returns immediately for an empty fence", async () => {
await withInstance(async () => {
await expect(WorkspaceOld.waitForSync(WorkspaceID.ascending("wrk_wait_empty"), {})).resolves.toBeUndefined()
await expect(waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_empty"), {})).resolves.toBeUndefined()
})
})
@ -1116,10 +1140,10 @@ describe("workspace-old waitForSync", () => {
Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 4 }).run())
await expect(
WorkspaceOld.waitForSync(WorkspaceID.ascending("wrk_wait_done"), { [sessionID]: 4 }),
waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_done"), { [sessionID]: 4 }),
).resolves.toBeUndefined()
await expect(
WorkspaceOld.waitForSync(WorkspaceID.ascending("wrk_wait_done_2"), { [sessionID]: 3 }),
waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_done_2"), { [sessionID]: 3 }),
).resolves.toBeUndefined()
})
})
@ -1130,7 +1154,7 @@ describe("workspace-old waitForSync", () => {
const sessionID = SessionID.descending("ses_wait_event")
Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 1 }).run())
const waited = WorkspaceOld.waitForSync(workspaceID, { [sessionID]: 2 })
const waited = waitForWorkspaceSync(workspaceID, { [sessionID]: 2 })
await delay(10)
Database.use((db) =>
db.update(EventSequenceTable).set({ seq: 2 }).where(eq(EventSequenceTable.aggregate_id, sessionID)).run(),
@ -1147,7 +1171,7 @@ describe("workspace-old waitForSync", () => {
const sessionID = SessionID.descending("ses_wait_sync_any")
Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 0 }).run())
const waited = WorkspaceOld.waitForSync(workspaceID, { [sessionID]: 1 })
const waited = waitForWorkspaceSync(workspaceID, { [sessionID]: 1 })
await delay(10)
Database.use((db) =>
db.update(EventSequenceTable).set({ seq: 1 }).where(eq(EventSequenceTable.aggregate_id, sessionID)).run(),
@ -1165,7 +1189,7 @@ describe("workspace-old waitForSync", () => {
await withInstance(async () => {
const abort = new AbortController()
const reason = new Error("caller aborted")
const waited = WorkspaceOld.waitForSync(
const waited = waitForWorkspaceSync(
WorkspaceID.ascending("wrk_wait_abort"),
{ [SessionID.descending("ses_wait_abort")]: 1 },
abort.signal,
@ -1184,9 +1208,9 @@ describe("workspace-old waitForSync", () => {
await withInstance(async () => {
const sessionID = SessionID.descending("ses_wait_timeout")
await expect(
WorkspaceOld.waitForSync(WorkspaceID.ascending("wrk_wait_timeout"), { [sessionID]: 1 }),
).rejects.toThrow(`Timed out waiting for sync fence: {"${sessionID}":1}`)
await expect(waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_timeout"), { [sessionID]: 1 })).rejects.toThrow(
`Timed out waiting for sync fence: {"${sessionID}":1}`,
)
})
}, 7000)
})
@ -1195,7 +1219,7 @@ describe("workspace-old sessionRestore", () => {
test("throws when the workspace is missing", async () => {
await withInstance(async () => {
await expect(
WorkspaceOld.sessionRestore({
restoreWorkspaceSession({
workspaceID: WorkspaceID.ascending("wrk_restore_missing"),
sessionID: SessionID.descending("ses_restore_missing_workspace"),
}),
@ -1211,9 +1235,9 @@ describe("workspace-old sessionRestore", () => {
registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor)
await expect(
WorkspaceOld.sessionRestore({ workspaceID: info.id, sessionID: SessionID.descending("ses_missing_restore") }),
restoreWorkspaceSession({ workspaceID: info.id, sessionID: SessionID.descending("ses_missing_restore") }),
).rejects.toThrow("NotFoundError")
await WorkspaceOld.remove(info.id)
await removeWorkspace(info.id)
})
})
@ -1424,7 +1448,7 @@ describe("workspace-old sessionRestore", () => {
)
replaceSessionEvents(session.id, 20)
expect(await WorkspaceOld.sessionRestore({ workspaceID: info.id, sessionID: session.id })).toEqual({ total: 3 })
expect(await restoreWorkspaceSession({ workspaceID: info.id, sessionID: session.id })).toEqual({ total: 3 })
expect(fetchCallCount).toBe(0)
expect(replayAll).toHaveBeenCalledTimes(3)
@ -1438,7 +1462,7 @@ describe("workspace-old sessionRestore", () => {
.filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type)
.map((event) => event.payload.properties.step),
).toEqual([0, 1, 2, 3])
await WorkspaceOld.remove(info.id)
await removeWorkspace(info.id)
} finally {
captured.dispose()
}

View file

@ -13,7 +13,7 @@ const { Flag } = await import("@opencode-ai/core/flag/flag")
const { Plugin } = await import("../../src/plugin/index")
const { Workspace } = await import("../../src/control-plane/workspace")
const { Instance } = await import("../../src/project/instance")
const it = testEffect(Layer.mergeAll(Plugin.defaultLayer, CrossSpawnSpawner.defaultLayer))
const it = testEffect(Layer.mergeAll(Plugin.defaultLayer, Workspace.defaultLayer, CrossSpawnSpawner.defaultLayer))
const experimental = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
@ -83,14 +83,13 @@ describe("plugin.workspace", () => {
const plugin = yield* Plugin.Service
yield* plugin.init()
const info = yield* Effect.promise(() =>
Workspace.create({
type,
branch: null,
extra: { key: "value" },
projectID: Instance.project.id,
}),
)
const workspace = yield* Workspace.Service
const info = yield* workspace.create({
type,
branch: null,
extra: { key: "value" },
projectID: Instance.project.id,
})
expect(info.type).toBe(type)
expect(info.name).toBe("plug")

View file

@ -36,7 +36,13 @@ const testStateLayer = Layer.effectDiscard(
)
const it = testEffect(
Layer.mergeAll(testStateLayer, NodeHttpServer.layerTest, NodeServices.layer, Project.defaultLayer),
Layer.mergeAll(
testStateLayer,
NodeHttpServer.layerTest,
NodeServices.layer,
Project.defaultLayer,
Workspace.defaultLayer,
),
)
const instanceContextTestLayer = instanceRouterMiddleware
@ -56,16 +62,17 @@ const localAdaptor = (directory: string): WorkspaceAdaptor => ({
const createLocalWorkspace = (input: { projectID: Project.Info["id"]; type: string; directory: string }) =>
Effect.acquireRelease(
Effect.promise(async () => {
Effect.gen(function* () {
registerAdaptor(input.projectID, input.type, localAdaptor(input.directory))
return Workspace.create({
const workspace = yield* Workspace.Service
return yield* workspace.create({
type: input.type,
branch: null,
extra: null,
projectID: input.projectID,
})
}),
(workspace) => Effect.promise(() => Workspace.remove(workspace.id)).pipe(Effect.ignore),
(info) => Workspace.Service.use((workspace) => workspace.remove(info.id)).pipe(Effect.ignore),
)
const probeInstanceContext = Effect.gen(function* () {

View file

@ -94,14 +94,16 @@ const localAdaptor = (directory: string): WorkspaceAdaptor => ({
})
const createLocalWorkspace = (input: { projectID: Project.Info["id"]; type: string; directory: string }) =>
Effect.promise(async () => {
Effect.gen(function* () {
registerAdaptor(input.projectID, input.type, localAdaptor(input.directory))
return Workspace.create({
type: input.type,
branch: null,
extra: null,
projectID: input.projectID,
})
return yield* Workspace.Service.use((svc) =>
svc.create({
type: input.type,
branch: null,
extra: null,
projectID: input.projectID,
}),
).pipe(Effect.provide(Workspace.defaultLayer))
})
function request(path: string, init?: RequestInit) {

View file

@ -50,6 +50,7 @@ const it = testEffect(
NodeHttpServer.layerTest,
NodeServices.layer,
Project.defaultLayer,
Workspace.defaultLayer,
Socket.layerWebSocketConstructorGlobal,
),
)
@ -116,16 +117,17 @@ const syncResponse = (request: HttpServerRequest.HttpServerRequest) => {
const createWorkspace = (input: { projectID: Project.Info["id"]; type: string; adaptor: WorkspaceAdaptor }) =>
Effect.acquireRelease(
Effect.promise(async () => {
Effect.gen(function* () {
registerAdaptor(input.projectID, input.type, input.adaptor)
return Workspace.create({
const workspace = yield* Workspace.Service
return yield* workspace.create({
type: input.type,
branch: null,
extra: null,
projectID: input.projectID,
})
}),
(workspace) => Effect.promise(() => Workspace.remove(workspace.id)).pipe(Effect.ignore),
(info) => Workspace.Service.use((workspace) => workspace.remove(info.id)).pipe(Effect.ignore),
)
const createRemoteWorkspace = (input: {

View file

@ -1,7 +1,8 @@
import { afterEach, describe, expect, mock, test } from "bun:test"
import { NodeServices } from "@effect/platform-node"
import { mkdir } from "node:fs/promises"
import path from "node:path"
import { Effect } from "effect"
import { Effect, Layer } from "effect"
import { Flag } from "@opencode-ai/core/flag/flag"
import { registerAdaptor } from "../../src/control-plane/adaptors"
import type { WorkspaceAdaptor } from "../../src/control-plane/types"
@ -11,30 +12,28 @@ import { Session } from "@/session/session"
import * as Log from "@opencode-ai/core/util/log"
import { Server } from "../../src/server/server"
import { resetDatabase } from "../fixture/db"
import { tmpdir } from "../fixture/fixture"
import { provideInstance, tmpdirScoped } from "../fixture/fixture"
import { Instance } from "../../src/project/instance"
import { Project } from "../../src/project/project"
import { InstancePaths } from "../../src/server/routes/instance/httpapi/groups/instance"
import { WorkspaceRef } from "../../src/effect/instance-ref"
import { testEffect } from "../lib/effect"
void Log.init({ print: false })
const originalWorkspaces = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
const originalHttpApi = Flag.OPENCODE_EXPERIMENTAL_HTTPAPI
const it = testEffect(
Layer.mergeAll(NodeServices.layer, Project.defaultLayer, Session.defaultLayer, Workspace.defaultLayer),
)
function request(path: string, directory: string, init: RequestInit = {}) {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = true
const headers = new Headers(init.headers)
headers.set("x-opencode-directory", directory)
return Server.Default().app.request(path, { ...init, headers })
}
function runSession<A, E>(fx: Effect.Effect<A, E, Session.Service>, workspaceID?: Workspace.Info["id"]) {
return Effect.runPromise(
fx.pipe(
workspaceID ? Effect.provideService(WorkspaceRef, workspaceID) : (effect) => effect,
Effect.provide(Session.defaultLayer),
),
)
return Effect.promise(() => {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = true
const headers = new Headers(init.headers)
headers.set("x-opencode-directory", directory)
return Promise.resolve(Server.Default().app.request(path, { ...init, headers }))
})
}
function localAdaptor(directory: string): WorkspaceAdaptor {
@ -136,243 +135,228 @@ afterEach(async () => {
describe("workspace HttpApi", () => {
test.todo("proxies remote workspace websocket through real Effect listener", () => {})
test("serves read endpoints", async () => {
await using tmp = await tmpdir({ git: true })
it.live("serves read endpoints", () =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped({ git: true })
const [adaptors, workspaces, status] = await Promise.all([
request(WorkspacePaths.adaptors, tmp.path),
request(WorkspacePaths.list, tmp.path),
request(WorkspacePaths.status, tmp.path),
])
const [adaptors, workspaces, status] = yield* Effect.all([
request(WorkspacePaths.adaptors, dir),
request(WorkspacePaths.list, dir),
request(WorkspacePaths.status, dir),
])
expect(adaptors.status).toBe(200)
expect(await adaptors.json()).toEqual([
{
expect(adaptors.status).toBe(200)
expect(yield* Effect.promise(() => adaptors.json())).toContainEqual({
type: "worktree",
name: "Worktree",
description: "Create a git worktree",
},
])
expect(workspaces.status).toBe(200)
expect(await workspaces.json()).toEqual([])
expect(status.status).toBe(200)
expect(await status.json()).toEqual([])
})
test("serves mutation endpoints", async () => {
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
await using tmp = await tmpdir({ git: true })
await Instance.provide({
directory: tmp.path,
fn: async () =>
registerAdaptor(Instance.project.id, "local-test", localAdaptor(path.join(tmp.path, ".workspace"))),
})
const created = await request(WorkspacePaths.list, tmp.path, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ type: "local-test", branch: null, extra: null }),
})
expect(created.status).toBe(200)
const workspace = (await created.json()) as Workspace.Info
expect(workspace).toMatchObject({ type: "local-test", name: "local-test" })
const session = await Instance.provide({
directory: tmp.path,
fn: async () => runSession(Session.Service.use((svc) => svc.create({}))),
})
const restored = await request(WorkspacePaths.sessionRestore.replace(":id", workspace.id), tmp.path, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ sessionID: session.id }),
})
expect(restored.status).toBe(200)
expect((await restored.json()) as { total: number }).toMatchObject({ total: expect.any(Number) })
const removed = await request(WorkspacePaths.remove.replace(":id", workspace.id), tmp.path, { method: "DELETE" })
expect(removed.status).toBe(200)
expect(await removed.json()).toMatchObject({ id: workspace.id })
const listed = await request(WorkspacePaths.list, tmp.path)
expect(listed.status).toBe(200)
expect(await listed.json()).toEqual([])
})
test("routes local workspace requests through the workspace target directory", async () => {
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
await using tmp = await tmpdir({ git: true })
const workspaceDir = path.join(tmp.path, ".workspace-local")
const workspace = await Instance.provide({
directory: tmp.path,
fn: async () => {
registerAdaptor(Instance.project.id, "local-target", localAdaptor(workspaceDir))
return Workspace.create({
type: "local-target",
branch: null,
extra: null,
projectID: Instance.project.id,
})
},
})
const url = new URL(`http://localhost${InstancePaths.path}`)
url.searchParams.set("workspace", workspace.id)
try {
const response = await request(url.toString(), tmp.path)
expect(response.status).toBe(200)
expect(await response.json()).toMatchObject({ directory: workspaceDir })
} finally {
await Workspace.remove(workspace.id)
}
})
test("proxies remote workspace HTTP requests with sanitized forwarding", async () => {
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
await using tmp = await tmpdir({ git: true })
const proxied: ProxiedRequest[] = []
const remote = listenRemoteHttp((request) => {
proxied.push(request)
const url = new URL(request.url)
if (url.pathname === "/base/global/event") return eventStreamResponse()
if (url.pathname === "/base/sync/history") return Response.json([])
return new Response(
JSON.stringify({
proxied: true,
path: url.pathname,
keep: url.searchParams.get("keep"),
workspace: url.searchParams.get("workspace"),
}),
{
status: 201,
statusText: "Created",
headers: {
"content-length": "999",
"content-type": "application/json",
"x-remote": "yes",
},
},
)
})
const workspace = await Instance.provide({
directory: tmp.path,
fn: async () => {
registerAdaptor(
Instance.project.id,
"remote-target",
remoteAdaptor(path.join(tmp.path, ".remote"), `http://127.0.0.1:${remote.port}/base`, {
"x-target-auth": "secret",
}),
)
return Workspace.create({
type: "remote-target",
branch: null,
extra: null,
projectID: Instance.project.id,
})
},
})
const url = new URL("http://localhost/config")
url.searchParams.set("workspace", workspace.id)
url.searchParams.set("keep", "yes")
try {
const response = await request(url.toString(), tmp.path, {
method: "PATCH",
headers: {
"accept-encoding": "br",
"content-type": "application/json",
"x-opencode-workspace": "internal",
},
body: JSON.stringify({ $schema: "https://opencode.ai/config.json" }),
})
const responseBody = await response.text()
expect({ status: response.status, body: responseBody }).toMatchObject({ status: 201 })
expect(response.headers.get("content-length")).toBeNull()
expect(response.headers.get("x-remote")).toBe("yes")
expect(JSON.parse(responseBody)).toEqual({ proxied: true, path: "/base/config", keep: "yes", workspace: null })
const forwarded = proxied.filter((item) => new URL(item.url).pathname === "/base/config")
expect(forwarded).toEqual([
{
url: `http://127.0.0.1:${remote.port}/base/config?keep=yes`,
method: "PATCH",
headers: expect.objectContaining({
"content-type": "application/json",
"x-target-auth": "secret",
}),
body: JSON.stringify({ $schema: "https://opencode.ai/config.json" }),
},
])
expect(forwarded[0]?.headers).not.toHaveProperty("x-opencode-directory")
expect(forwarded[0]?.headers).not.toHaveProperty("x-opencode-workspace")
} finally {
remote.stop(true)
await Workspace.remove(workspace.id)
}
})
expect(workspaces.status).toBe(200)
expect(yield* Effect.promise(() => workspaces.json())).toEqual([])
test("proxies remote workspace requests selected from session ownership", async () => {
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
await using tmp = await tmpdir({ git: true })
const proxied: ProxiedRequest[] = []
const remote = listenRemoteHttp((request) => {
proxied.push(request)
const url = new URL(request.url)
if (url.pathname === "/base/global/event") return eventStreamResponse()
if (url.pathname === "/base/sync/history") return Response.json([])
return Response.json({ proxied: true, path: new URL(request.url).pathname })
})
expect(status.status).toBe(200)
expect(yield* Effect.promise(() => status.json())).toEqual([])
}),
)
const workspace = await Instance.provide({
directory: tmp.path,
fn: async () => {
registerAdaptor(
Instance.project.id,
"remote-session-target",
remoteAdaptor(path.join(tmp.path, ".remote-session"), `http://127.0.0.1:${remote.port}/base`),
)
return Workspace.create({
type: "remote-session-target",
branch: null,
extra: null,
projectID: Instance.project.id,
})
},
})
const session = await Instance.provide({
directory: tmp.path,
fn: async () =>
runSession(
Session.Service.use((svc) => svc.create()),
workspace.id,
),
})
it.live("serves mutation endpoints", () =>
Effect.gen(function* () {
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
const dir = yield* tmpdirScoped({ git: true })
const project = yield* Project.use.fromDirectory(dir)
registerAdaptor(project.project.id, "local-test", localAdaptor(path.join(dir, ".workspace")))
try {
const response = await request(`http://localhost/session/${session.id}/message`, tmp.path, {
const created = yield* request(WorkspacePaths.list, dir, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ parts: [{ type: "text", text: "hello" }] }),
body: JSON.stringify({ type: "local-test", branch: null, extra: null }),
})
expect(created.status).toBe(200)
const workspace = (yield* Effect.promise(() => created.json())) as Workspace.Info
expect(workspace).toMatchObject({ type: "local-test", name: "local-test" })
const session = yield* Session.Service.use((svc) => svc.create({})).pipe(provideInstance(dir))
const restored = yield* request(WorkspacePaths.sessionRestore.replace(":id", workspace.id), dir, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ sessionID: session.id }),
})
expect(restored.status).toBe(200)
expect((yield* Effect.promise(() => restored.json())) as { total: number }).toMatchObject({
total: expect.any(Number),
})
const responseBody = await response.text()
expect({ status: response.status, body: responseBody }).toMatchObject({ status: 200 })
expect(JSON.parse(responseBody)).toEqual({ proxied: true, path: `/base/session/${session.id}/message` })
expect(proxied.filter((item) => new URL(item.url).pathname === `/base/session/${session.id}/message`)).toEqual([
expect.objectContaining({
url: `http://127.0.0.1:${remote.port}/base/session/${session.id}/message`,
method: "POST",
const removed = yield* request(WorkspacePaths.remove.replace(":id", workspace.id), dir, { method: "DELETE" })
expect(removed.status).toBe(200)
expect(yield* Effect.promise(() => removed.json())).toMatchObject({ id: workspace.id })
const listed = yield* request(WorkspacePaths.list, dir)
expect(listed.status).toBe(200)
expect(yield* Effect.promise(() => listed.json())).toEqual([])
}),
)
it.live("routes local workspace requests through the workspace target directory", () =>
Effect.gen(function* () {
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
const dir = yield* tmpdirScoped({ git: true })
const workspaceDir = path.join(dir, ".workspace-local")
const project = yield* Project.use.fromDirectory(dir)
registerAdaptor(project.project.id, "local-target", localAdaptor(workspaceDir))
const created = yield* request(WorkspacePaths.list, dir, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ type: "local-target", branch: null, extra: null }),
})
const workspace = (yield* Effect.promise(() => created.json())) as Workspace.Info
const url = new URL(`http://localhost${InstancePaths.path}`)
url.searchParams.set("workspace", workspace.id)
const response = yield* request(url.toString(), dir)
expect(response.status).toBe(200)
expect(yield* Effect.promise(() => response.json())).toMatchObject({ directory: workspaceDir })
yield* request(WorkspacePaths.remove.replace(":id", workspace.id), dir, { method: "DELETE" })
}),
)
it.live("proxies remote workspace HTTP requests with sanitized forwarding", () =>
Effect.gen(function* () {
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
const dir = yield* tmpdirScoped({ git: true })
const proxied: ProxiedRequest[] = []
const remote = listenRemoteHttp((request) => {
proxied.push(request)
const url = new URL(request.url)
if (url.pathname === "/base/global/event") return eventStreamResponse()
if (url.pathname === "/base/sync/history") return Response.json([])
return new Response(
JSON.stringify({
proxied: true,
path: url.pathname,
keep: url.searchParams.get("keep"),
workspace: url.searchParams.get("workspace"),
}),
{
status: 201,
statusText: "Created",
headers: {
"content-length": "999",
"content-type": "application/json",
"x-remote": "yes",
},
},
)
})
const project = yield* Project.use.fromDirectory(dir)
registerAdaptor(
project.project.id,
"remote-target",
remoteAdaptor(path.join(dir, ".remote"), `http://127.0.0.1:${remote.port}/base`, {
"x-target-auth": "secret",
}),
])
} finally {
remote.stop(true)
await Workspace.remove(workspace.id)
}
})
)
const created = yield* request(WorkspacePaths.list, dir, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ type: "remote-target", branch: null, extra: null }),
})
const workspace = (yield* Effect.promise(() => created.json())) as Workspace.Info
const url = new URL("http://localhost/config")
url.searchParams.set("workspace", workspace.id)
url.searchParams.set("keep", "yes")
try {
const response = yield* request(url.toString(), dir, {
method: "PATCH",
headers: {
"accept-encoding": "br",
"content-type": "application/json",
"x-opencode-workspace": "internal",
},
body: JSON.stringify({ $schema: "https://opencode.ai/config.json" }),
})
const responseBody = yield* Effect.promise(() => response.text())
expect({ status: response.status, body: responseBody }).toMatchObject({ status: 201 })
expect(response.headers.get("content-length")).toBeNull()
expect(response.headers.get("x-remote")).toBe("yes")
expect(JSON.parse(responseBody)).toEqual({ proxied: true, path: "/base/config", keep: "yes", workspace: null })
const forwarded = proxied.filter((item) => new URL(item.url).pathname === "/base/config")
expect(forwarded).toEqual([
{
url: `http://127.0.0.1:${remote.port}/base/config?keep=yes`,
method: "PATCH",
headers: expect.objectContaining({
"content-type": "application/json",
"x-target-auth": "secret",
}),
body: JSON.stringify({ $schema: "https://opencode.ai/config.json" }),
},
])
expect(forwarded[0]?.headers).not.toHaveProperty("x-opencode-directory")
expect(forwarded[0]?.headers).not.toHaveProperty("x-opencode-workspace")
} finally {
void remote.stop(true)
yield* request(WorkspacePaths.remove.replace(":id", workspace.id), dir, { method: "DELETE" })
}
}),
)
it.live("proxies remote workspace requests selected from session ownership", () =>
Effect.gen(function* () {
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
const dir = yield* tmpdirScoped({ git: true })
const proxied: ProxiedRequest[] = []
const remote = listenRemoteHttp((request) => {
proxied.push(request)
const url = new URL(request.url)
if (url.pathname === "/base/global/event") return eventStreamResponse()
if (url.pathname === "/base/sync/history") return Response.json([])
return Response.json({ proxied: true, path: new URL(request.url).pathname })
})
const project = yield* Project.use.fromDirectory(dir)
registerAdaptor(
project.project.id,
"remote-session-target",
remoteAdaptor(path.join(dir, ".remote-session"), `http://127.0.0.1:${remote.port}/base`),
)
const created = yield* request(WorkspacePaths.list, dir, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ type: "remote-session-target", branch: null, extra: null }),
})
const workspace = (yield* Effect.promise(() => created.json())) as Workspace.Info
const session = yield* Session.Service.use((svc) => svc.create()).pipe(
Effect.provideService(WorkspaceRef, workspace.id),
provideInstance(dir),
)
try {
const response = yield* request(`http://localhost/session/${session.id}/message`, dir, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ parts: [{ type: "text", text: "hello" }] }),
})
const responseBody = yield* Effect.promise(() => response.text())
expect({ status: response.status, body: responseBody }).toMatchObject({ status: 200 })
expect(JSON.parse(responseBody)).toEqual({ proxied: true, path: `/base/session/${session.id}/message` })
expect(proxied.filter((item) => new URL(item.url).pathname === `/base/session/${session.id}/message`)).toEqual([
expect.objectContaining({
url: `http://127.0.0.1:${remote.port}/base/session/${session.id}/message`,
method: "POST",
}),
])
} finally {
void remote.stop(true)
yield* request(WorkspacePaths.remove.replace(":id", workspace.id), dir, { method: "DELETE" })
}
}),
)
})