mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-06 08:21:50 +00:00
refactor(core): convert control-plane workspace to Effect (#25018)
This commit is contained in:
parent
fe0c182747
commit
53e9cac383
11 changed files with 2171 additions and 881 deletions
|
|
@ -1,27 +1,26 @@
|
|||
import { lazy } from "@/util/lazy"
|
||||
import type { ProjectID } from "@/project/schema"
|
||||
import type { WorkspaceAdaptor, WorkspaceAdaptorEntry } from "../types"
|
||||
import { WorktreeAdaptor } from "./worktree"
|
||||
|
||||
const BUILTIN: Record<string, () => Promise<WorkspaceAdaptor>> = {
|
||||
worktree: lazy(async () => (await import("./worktree")).WorktreeAdaptor),
|
||||
const BUILTIN: Record<string, WorkspaceAdaptor> = {
|
||||
worktree: WorktreeAdaptor,
|
||||
}
|
||||
|
||||
const state = new Map<ProjectID, Map<string, WorkspaceAdaptor>>()
|
||||
|
||||
export async function getAdaptor(projectID: ProjectID, type: string): Promise<WorkspaceAdaptor> {
|
||||
export function getAdaptor(projectID: ProjectID, type: string): WorkspaceAdaptor {
|
||||
const custom = state.get(projectID)?.get(type)
|
||||
if (custom) return custom
|
||||
|
||||
const builtin = BUILTIN[type]
|
||||
if (builtin) return builtin()
|
||||
if (builtin) return builtin
|
||||
|
||||
throw new Error(`Unknown workspace adaptor: ${type}`)
|
||||
}
|
||||
|
||||
export async function listAdaptors(projectID: ProjectID): Promise<WorkspaceAdaptorEntry[]> {
|
||||
const builtin = await Promise.all(
|
||||
Object.entries(BUILTIN).map(async ([type, init]) => {
|
||||
const adaptor = await init()
|
||||
Object.entries(BUILTIN).map(async ([type, adaptor]) => {
|
||||
return {
|
||||
type,
|
||||
name: adaptor.name,
|
||||
|
|
|
|||
|
|
@ -1,6 +1,4 @@
|
|||
import { Schema } from "effect"
|
||||
import { AppRuntime } from "@/effect/app-runtime"
|
||||
import { Worktree } from "@/worktree"
|
||||
import { type WorkspaceAdaptor, WorkspaceInfo } from "../types"
|
||||
|
||||
const WorktreeConfig = Schema.Struct({
|
||||
|
|
@ -10,19 +8,26 @@ const WorktreeConfig = Schema.Struct({
|
|||
})
|
||||
const decodeWorktreeConfig = Schema.decodeUnknownSync(WorktreeConfig)
|
||||
|
||||
async function loadWorktree() {
|
||||
const [{ AppRuntime }, { Worktree }] = await Promise.all([import("@/effect/app-runtime"), import("@/worktree")])
|
||||
return { AppRuntime, Worktree }
|
||||
}
|
||||
|
||||
export const WorktreeAdaptor: WorkspaceAdaptor = {
|
||||
name: "Worktree",
|
||||
description: "Create a git worktree",
|
||||
async configure(info) {
|
||||
const worktree = await AppRuntime.runPromise(Worktree.Service.use((svc) => svc.makeWorktreeInfo()))
|
||||
const { AppRuntime, Worktree } = await loadWorktree()
|
||||
const next = await AppRuntime.runPromise(Worktree.Service.use((svc) => svc.makeWorktreeInfo()))
|
||||
return {
|
||||
...info,
|
||||
name: worktree.name,
|
||||
branch: worktree.branch,
|
||||
directory: worktree.directory,
|
||||
name: next.name,
|
||||
branch: next.branch,
|
||||
directory: next.directory,
|
||||
}
|
||||
},
|
||||
async create(info) {
|
||||
const { AppRuntime, Worktree } = await loadWorktree()
|
||||
const config = decodeWorktreeConfig(info)
|
||||
await AppRuntime.runPromise(
|
||||
Worktree.Service.use((svc) =>
|
||||
|
|
@ -35,6 +40,7 @@ export const WorktreeAdaptor: WorkspaceAdaptor = {
|
|||
)
|
||||
},
|
||||
async remove(info) {
|
||||
const { AppRuntime, Worktree } = await loadWorktree()
|
||||
const config = decodeWorktreeConfig(info)
|
||||
await AppRuntime.runPromise(Worktree.Service.use((svc) => svc.remove({ directory: config.directory })))
|
||||
},
|
||||
|
|
|
|||
20
packages/opencode/src/control-plane/dev/README.md
Normal file
20
packages/opencode/src/control-plane/dev/README.md
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
|
||||
This is a plugin to simulate a remote environment locally. Add this to `.opencode/opencode.jsonc`:
|
||||
|
||||
```json
|
||||
"plugin": ["../packages/opencode/src/control-plane/dev/debug-workspace-plugin.ts"],
|
||||
```
|
||||
|
||||
In a separate terminal, run a separate OpenCode server. This will act like a remote server and the local instance will proxy all requests to it:
|
||||
|
||||
```
|
||||
./packages/opencode/script/run-workspace-server
|
||||
```
|
||||
|
||||
With the plugin install, you can now run OpenCode and create a `debug` workspace type. This will create a "remote" workspace which talks to the second workspace server started above.
|
||||
|
||||
How this works:
|
||||
|
||||
* The workspace server needs to know the workspace id and port to run. It waits for this information to be written to a file and starts the server when the data is written.
|
||||
* The debug plugin writes this information in the `create` call to the workspace. So create a `debug` workspace will always kick off a new external server.
|
||||
* The server script watches for file changes, so whenver you create a new `debug` workspace it will restart with the new information. This means that there is only ever one working `debug` workspace at a time; when you create a new one all previous sessions will show that it can't connect because previous debug workspaces do not exist.
|
||||
|
|
@ -1,66 +0,0 @@
|
|||
export async function parseSSE(
|
||||
body: ReadableStream<Uint8Array>,
|
||||
signal: AbortSignal,
|
||||
onEvent: (event: unknown) => void,
|
||||
) {
|
||||
const reader = body.getReader()
|
||||
const decoder = new TextDecoder()
|
||||
let buf = ""
|
||||
let last = ""
|
||||
let retry = 1000
|
||||
|
||||
const abort = () => {
|
||||
void reader.cancel().catch(() => undefined)
|
||||
}
|
||||
|
||||
signal.addEventListener("abort", abort)
|
||||
|
||||
try {
|
||||
while (!signal.aborted) {
|
||||
const chunk = await reader.read().catch(() => ({ done: true, value: undefined as Uint8Array | undefined }))
|
||||
if (chunk.done) break
|
||||
|
||||
buf += decoder.decode(chunk.value, { stream: true })
|
||||
buf = buf.replace(/\r\n/g, "\n").replace(/\r/g, "\n")
|
||||
|
||||
const chunks = buf.split("\n\n")
|
||||
buf = chunks.pop() ?? ""
|
||||
|
||||
chunks.forEach((chunk) => {
|
||||
const data: string[] = []
|
||||
chunk.split("\n").forEach((line) => {
|
||||
if (line.startsWith("data:")) {
|
||||
data.push(line.replace(/^data:\s*/, ""))
|
||||
return
|
||||
}
|
||||
if (line.startsWith("id:")) {
|
||||
last = line.replace(/^id:\s*/, "")
|
||||
return
|
||||
}
|
||||
if (line.startsWith("retry:")) {
|
||||
const parsed = Number.parseInt(line.replace(/^retry:\s*/, ""), 10)
|
||||
if (!Number.isNaN(parsed)) retry = parsed
|
||||
}
|
||||
})
|
||||
|
||||
if (!data.length) return
|
||||
const raw = data.join("\n")
|
||||
try {
|
||||
onEvent(JSON.parse(raw))
|
||||
} catch {
|
||||
onEvent({
|
||||
type: "sse.message",
|
||||
properties: {
|
||||
data: raw,
|
||||
id: last || undefined,
|
||||
retry,
|
||||
},
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
} finally {
|
||||
signal.removeEventListener("abort", abort)
|
||||
reader.releaseLock()
|
||||
}
|
||||
}
|
||||
|
|
@ -1,22 +1,23 @@
|
|||
import { GlobalBus, type GlobalEvent } from "@/bus/global"
|
||||
import { Effect } from "effect"
|
||||
|
||||
export function waitEvent(input: { timeout: number; signal?: AbortSignal; fn: (event: GlobalEvent) => boolean }) {
|
||||
if (input.signal?.aborted) return Promise.reject(input.signal.reason ?? new Error("Request aborted"))
|
||||
if (input.signal?.aborted) return Effect.fail(input.signal.reason ?? new Error("Request aborted"))
|
||||
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
return Effect.callback<void, unknown>((resume) => {
|
||||
const abort = () => {
|
||||
cleanup()
|
||||
reject(input.signal?.reason ?? new Error("Request aborted"))
|
||||
resume(Effect.fail(input.signal?.reason ?? new Error("Request aborted")))
|
||||
}
|
||||
|
||||
const handler = (event: GlobalEvent) => {
|
||||
try {
|
||||
if (!input.fn(event)) return
|
||||
cleanup()
|
||||
resolve()
|
||||
resume(Effect.void)
|
||||
} catch (error) {
|
||||
cleanup()
|
||||
reject(error)
|
||||
resume(Effect.fail(error))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -28,10 +29,11 @@ export function waitEvent(input: { timeout: number; signal?: AbortSignal; fn: (e
|
|||
|
||||
const timeout = setTimeout(() => {
|
||||
cleanup()
|
||||
reject(new Error("Timed out waiting for global event"))
|
||||
resume(Effect.fail(new Error("Timed out waiting for global event")))
|
||||
}, input.timeout)
|
||||
|
||||
GlobalBus.on("event", handler)
|
||||
input.signal?.addEventListener("abort", abort, { once: true })
|
||||
return Effect.sync(cleanup)
|
||||
})
|
||||
}
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -41,6 +41,7 @@ import { ToolRegistry } from "@/tool/registry"
|
|||
import { Format } from "@/format"
|
||||
import { Project } from "@/project/project"
|
||||
import { Vcs } from "@/project/vcs"
|
||||
import { Workspace } from "@/control-plane/workspace"
|
||||
import { Worktree } from "@/worktree"
|
||||
import { Pty } from "@/pty"
|
||||
import { Installation } from "@/installation"
|
||||
|
|
@ -90,6 +91,7 @@ export const AppLayer = Layer.mergeAll(
|
|||
Format.defaultLayer,
|
||||
Project.defaultLayer,
|
||||
Vcs.defaultLayer,
|
||||
Workspace.defaultLayer,
|
||||
Worktree.defaultLayer,
|
||||
Pty.defaultLayer,
|
||||
Installation.defaultLayer,
|
||||
|
|
|
|||
|
|
@ -89,7 +89,7 @@ function missingWorkspaceResponse(id: WorkspaceID): HttpServerResponse.HttpServe
|
|||
|
||||
function resolveTarget(workspace: Workspace.Info): Effect.Effect<Target> {
|
||||
return Effect.gen(function* () {
|
||||
const adaptor = yield* Effect.promise(() => getAdaptor(workspace.projectID, workspace.type))
|
||||
const adaptor = yield* Effect.sync(() => getAdaptor(workspace.projectID, workspace.type))
|
||||
return yield* Effect.promise(() => Promise.resolve(adaptor.target(workspace)))
|
||||
})
|
||||
}
|
||||
|
|
@ -101,7 +101,7 @@ function proxyRemote(
|
|||
url: URL,
|
||||
): Effect.Effect<HttpServerResponse.HttpServerResponse, never, Socket.WebSocketConstructor> {
|
||||
return Effect.gen(function* () {
|
||||
const syncing = yield* Effect.promise(() => Workspace.isSyncing(workspace.id))
|
||||
const syncing = yield* Effect.sync(() => Workspace.isSyncing(workspace.id))
|
||||
if (!syncing) {
|
||||
return HttpServerResponse.text(`broken sync connection for workspace: ${workspace.id}`, {
|
||||
status: 503,
|
||||
|
|
|
|||
|
|
@ -1,56 +0,0 @@
|
|||
import { afterEach, describe, expect, test } from "bun:test"
|
||||
import { parseSSE } from "../../src/control-plane/sse"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
|
||||
afterEach(async () => {
|
||||
await resetDatabase()
|
||||
})
|
||||
|
||||
function stream(chunks: string[]) {
|
||||
return new ReadableStream<Uint8Array>({
|
||||
start(controller) {
|
||||
const encoder = new TextEncoder()
|
||||
chunks.forEach((chunk) => controller.enqueue(encoder.encode(chunk)))
|
||||
controller.close()
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
describe("control-plane/sse", () => {
|
||||
test("parses JSON events with CRLF and multiline data blocks", async () => {
|
||||
const events: unknown[] = []
|
||||
const stop = new AbortController()
|
||||
|
||||
await parseSSE(
|
||||
stream([
|
||||
'data: {"type":"one","properties":{"ok":true}}\r\n\r\n',
|
||||
'data: {"type":"two",\r\ndata: "properties":{"n":2}}\r\n\r\n',
|
||||
]),
|
||||
stop.signal,
|
||||
(event) => events.push(event),
|
||||
)
|
||||
|
||||
expect(events).toEqual([
|
||||
{ type: "one", properties: { ok: true } },
|
||||
{ type: "two", properties: { n: 2 } },
|
||||
])
|
||||
})
|
||||
|
||||
test("falls back to sse.message for non-json payload", async () => {
|
||||
const events: unknown[] = []
|
||||
const stop = new AbortController()
|
||||
|
||||
await parseSSE(stream(["id: abc\nretry: 1500\ndata: hello world\n\n"]), stop.signal, (event) => events.push(event))
|
||||
|
||||
expect(events).toEqual([
|
||||
{
|
||||
type: "sse.message",
|
||||
properties: {
|
||||
data: "hello world",
|
||||
id: "abc",
|
||||
retry: 1500,
|
||||
},
|
||||
},
|
||||
])
|
||||
})
|
||||
})
|
||||
1391
packages/opencode/test/control-plane/workspace.test.ts
Normal file
1391
packages/opencode/test/control-plane/workspace.test.ts
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -1,283 +0,0 @@
|
|||
import { afterEach, beforeEach, describe, expect, mock, spyOn, test } from "bun:test"
|
||||
import fs from "node:fs/promises"
|
||||
import path from "node:path"
|
||||
import { GlobalBus } from "../../src/bus/global"
|
||||
import { registerAdaptor } from "../../src/control-plane/adaptors"
|
||||
import type { WorkspaceAdaptor } from "../../src/control-plane/types"
|
||||
import { Workspace } from "../../src/control-plane/workspace"
|
||||
import { AppRuntime } from "../../src/effect/app-runtime"
|
||||
import { Flag } from "@opencode-ai/core/flag/flag"
|
||||
import { ModelID, ProviderID } from "../../src/provider/schema"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { Session as SessionNs } from "@/session/session"
|
||||
import { MessageV2 } from "../../src/session/message-v2"
|
||||
import { MessageID, PartID, type SessionID } from "../../src/session/schema"
|
||||
import { Database } from "@/storage/db"
|
||||
import { asc } from "drizzle-orm"
|
||||
import { eq } from "drizzle-orm"
|
||||
import { SyncEvent } from "../../src/sync"
|
||||
import { EventTable } from "../../src/sync/event.sql"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
|
||||
void Log.init({ print: false })
|
||||
|
||||
const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
|
||||
|
||||
beforeEach(() => {
|
||||
Database.close()
|
||||
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
mock.restore()
|
||||
await Instance.disposeAll()
|
||||
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = original
|
||||
await resetDatabase()
|
||||
})
|
||||
|
||||
function create(input?: SessionNs.CreateInput) {
|
||||
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create(input)))
|
||||
}
|
||||
|
||||
function get(id: SessionID) {
|
||||
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(id)))
|
||||
}
|
||||
|
||||
function updateMessage<T extends MessageV2.Info>(msg: T) {
|
||||
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.updateMessage(msg)))
|
||||
}
|
||||
|
||||
function updatePart<T extends MessageV2.Part>(part: T) {
|
||||
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.updatePart(part)))
|
||||
}
|
||||
|
||||
async function user(sessionID: SessionID, text: string) {
|
||||
const msg = await updateMessage({
|
||||
id: MessageID.ascending(),
|
||||
role: "user",
|
||||
sessionID,
|
||||
agent: "build",
|
||||
model: { providerID: ProviderID.make("test"), modelID: ModelID.make("test") },
|
||||
time: { created: Date.now() },
|
||||
})
|
||||
await updatePart({
|
||||
id: PartID.ascending(),
|
||||
sessionID,
|
||||
messageID: msg.id,
|
||||
type: "text",
|
||||
text,
|
||||
})
|
||||
}
|
||||
|
||||
function remote(dir: string, url: string): WorkspaceAdaptor {
|
||||
return {
|
||||
name: "remote",
|
||||
description: "remote",
|
||||
configure(info) {
|
||||
return {
|
||||
...info,
|
||||
directory: dir,
|
||||
}
|
||||
},
|
||||
async create() {
|
||||
await fs.mkdir(dir, { recursive: true })
|
||||
},
|
||||
async remove() {},
|
||||
target() {
|
||||
return {
|
||||
type: "remote" as const,
|
||||
url,
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
function local(dir: string): WorkspaceAdaptor {
|
||||
return {
|
||||
name: "local",
|
||||
description: "local",
|
||||
configure(info) {
|
||||
return {
|
||||
...info,
|
||||
directory: dir,
|
||||
}
|
||||
},
|
||||
async create() {
|
||||
await fs.mkdir(dir, { recursive: true })
|
||||
},
|
||||
async remove() {},
|
||||
target() {
|
||||
return {
|
||||
type: "local" as const,
|
||||
directory: dir,
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
function eventStreamResponse() {
|
||||
return new Response(new ReadableStream({ start() {} }), {
|
||||
status: 200,
|
||||
headers: {
|
||||
"content-type": "text/event-stream",
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
describe("Workspace.sessionRestore", () => {
|
||||
test("replays session events in batches of 10 and emits progress", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
const dir = path.join(tmp.path, ".restore")
|
||||
const seen: any[] = []
|
||||
const posts: Array<{
|
||||
path: string
|
||||
body: { directory: string; events: Array<{ seq: number; aggregateID: string }> }
|
||||
}> = []
|
||||
const on = (evt: any) => seen.push(evt)
|
||||
GlobalBus.on("event", on)
|
||||
|
||||
const raw = globalThis.fetch
|
||||
spyOn(globalThis, "fetch").mockImplementation(
|
||||
Object.assign(
|
||||
async (input: URL | RequestInfo, init?: BunFetchRequestInit | RequestInit) => {
|
||||
const url = new URL(typeof input === "string" || input instanceof URL ? input : input.url)
|
||||
if (url.pathname === "/base/global/event") {
|
||||
return eventStreamResponse()
|
||||
}
|
||||
if (url.pathname === "/base/sync/history") {
|
||||
return Response.json([])
|
||||
}
|
||||
const body = JSON.parse(String(init?.body))
|
||||
posts.push({
|
||||
path: url.pathname,
|
||||
body,
|
||||
})
|
||||
return Response.json({ sessionID: body.events[0].aggregateID })
|
||||
},
|
||||
{
|
||||
preconnect: raw.preconnect?.bind(raw),
|
||||
},
|
||||
) as typeof globalThis.fetch,
|
||||
)
|
||||
|
||||
try {
|
||||
const setup = await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
registerAdaptor(Instance.project.id, "worktree", remote(dir, "https://workspace.test/base"))
|
||||
const space = await Workspace.create({
|
||||
type: "worktree",
|
||||
branch: null,
|
||||
extra: null,
|
||||
projectID: Instance.project.id,
|
||||
})
|
||||
const session = await create({})
|
||||
for (let i = 0; i < 6; i++) {
|
||||
await user(session.id, `msg ${i}`)
|
||||
}
|
||||
const rows = Database.use((db) =>
|
||||
db
|
||||
.select({ seq: EventTable.seq })
|
||||
.from(EventTable)
|
||||
.where(eq(EventTable.aggregate_id, session.id))
|
||||
.orderBy(asc(EventTable.seq))
|
||||
.all(),
|
||||
)
|
||||
const result = await Workspace.sessionRestore({
|
||||
workspaceID: space.id,
|
||||
sessionID: session.id,
|
||||
})
|
||||
return { space, session, rows, result }
|
||||
},
|
||||
})
|
||||
|
||||
expect(setup.rows).toHaveLength(13)
|
||||
expect(setup.result).toEqual({ total: 2 })
|
||||
expect(posts).toHaveLength(2)
|
||||
expect(posts[0]?.path).toBe("/base/sync/replay")
|
||||
expect(posts[1]?.path).toBe("/base/sync/replay")
|
||||
expect(posts[0]?.body.directory).toBe(dir)
|
||||
expect(posts[1]?.body.directory).toBe(dir)
|
||||
expect(posts[0]?.body.events).toHaveLength(10)
|
||||
expect(posts[1]?.body.events).toHaveLength(4)
|
||||
expect(posts.flatMap((item) => item.body.events.map((event) => event.seq))).toEqual([
|
||||
...setup.rows.map((row) => row.seq),
|
||||
setup.rows.at(-1)!.seq + 1,
|
||||
])
|
||||
expect(posts[1]?.body.events.at(-1)).toMatchObject({
|
||||
aggregateID: setup.session.id,
|
||||
seq: setup.rows.at(-1)!.seq + 1,
|
||||
type: SyncEvent.versionedType(SessionNs.Event.Updated.type, SessionNs.Event.Updated.version),
|
||||
data: {
|
||||
sessionID: setup.session.id,
|
||||
info: {
|
||||
workspaceID: setup.space.id,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
const restore = seen.filter(
|
||||
(evt) => evt.workspace === setup.space.id && evt.payload.type === Workspace.Event.Restore.type,
|
||||
)
|
||||
expect(restore.map((evt) => evt.payload.properties.step)).toEqual([0, 1, 2])
|
||||
expect(restore.map((evt) => evt.payload.properties.total)).toEqual([2, 2, 2])
|
||||
expect(restore.map((evt) => evt.payload.properties.sessionID)).toEqual([
|
||||
setup.session.id,
|
||||
setup.session.id,
|
||||
setup.session.id,
|
||||
])
|
||||
} finally {
|
||||
GlobalBus.off("event", on)
|
||||
}
|
||||
})
|
||||
|
||||
test("replays locally without posting to a server", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
const dir = path.join(tmp.path, ".restore-local")
|
||||
const seen: any[] = []
|
||||
const on = (evt: any) => seen.push(evt)
|
||||
GlobalBus.on("event", on)
|
||||
|
||||
const fetch = spyOn(globalThis, "fetch")
|
||||
const replayAll = spyOn(SyncEvent, "replayAll")
|
||||
|
||||
try {
|
||||
const setup = await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
registerAdaptor(Instance.project.id, "local-restore", local(dir))
|
||||
const space = await Workspace.create({
|
||||
type: "local-restore",
|
||||
branch: null,
|
||||
extra: null,
|
||||
projectID: Instance.project.id,
|
||||
})
|
||||
const session = await create({})
|
||||
for (let i = 0; i < 6; i++) {
|
||||
await user(session.id, `msg ${i}`)
|
||||
}
|
||||
const result = await Workspace.sessionRestore({
|
||||
workspaceID: space.id,
|
||||
sessionID: session.id,
|
||||
})
|
||||
const updated = await get(session.id)
|
||||
return { space, session, result, updated }
|
||||
},
|
||||
})
|
||||
|
||||
expect(setup.result).toEqual({ total: 2 })
|
||||
expect(fetch).not.toHaveBeenCalled()
|
||||
expect(replayAll).toHaveBeenCalledTimes(2)
|
||||
expect(setup.updated.workspaceID).toBe(setup.space.id)
|
||||
|
||||
const restore = seen.filter(
|
||||
(evt) => evt.workspace === setup.space.id && evt.payload.type === Workspace.Event.Restore.type,
|
||||
)
|
||||
expect(restore.map((evt) => evt.payload.properties.step)).toEqual([0, 1, 2])
|
||||
} finally {
|
||||
GlobalBus.off("event", on)
|
||||
}
|
||||
})
|
||||
})
|
||||
Loading…
Add table
Add a link
Reference in a new issue