mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-20 01:12:15 +00:00
refactor: unwrap Workspace namespace + self-reexport (#22934)
This commit is contained in:
parent
f9aa3d77cd
commit
bae80af1b4
1 changed files with 443 additions and 443 deletions
|
|
@ -26,173 +26,239 @@ import { AppRuntime } from "@/effect/app-runtime"
|
|||
import { EventSequenceTable } from "@/sync/event.sql"
|
||||
import { waitEvent } from "./util"
|
||||
|
||||
export namespace Workspace {
|
||||
export const Info = WorkspaceInfo.meta({
|
||||
ref: "Workspace",
|
||||
})
|
||||
export type Info = z.infer<typeof Info>
|
||||
export const Info = WorkspaceInfo.meta({
|
||||
ref: "Workspace",
|
||||
})
|
||||
export type Info = z.infer<typeof Info>
|
||||
|
||||
export const ConnectionStatus = z.object({
|
||||
workspaceID: WorkspaceID.zod,
|
||||
status: z.enum(["connected", "connecting", "disconnected", "error"]),
|
||||
error: z.string().optional(),
|
||||
})
|
||||
export type ConnectionStatus = z.infer<typeof ConnectionStatus>
|
||||
export const ConnectionStatus = z.object({
|
||||
workspaceID: WorkspaceID.zod,
|
||||
status: z.enum(["connected", "connecting", "disconnected", "error"]),
|
||||
error: z.string().optional(),
|
||||
})
|
||||
export type ConnectionStatus = z.infer<typeof ConnectionStatus>
|
||||
|
||||
const Restore = z.object({
|
||||
workspaceID: WorkspaceID.zod,
|
||||
sessionID: SessionID.zod,
|
||||
total: z.number().int().min(0),
|
||||
step: z.number().int().min(0),
|
||||
})
|
||||
const Restore = z.object({
|
||||
workspaceID: WorkspaceID.zod,
|
||||
sessionID: SessionID.zod,
|
||||
total: z.number().int().min(0),
|
||||
step: z.number().int().min(0),
|
||||
})
|
||||
|
||||
export const Event = {
|
||||
Ready: BusEvent.define(
|
||||
"workspace.ready",
|
||||
z.object({
|
||||
name: z.string(),
|
||||
}),
|
||||
),
|
||||
Failed: BusEvent.define(
|
||||
"workspace.failed",
|
||||
z.object({
|
||||
message: z.string(),
|
||||
}),
|
||||
),
|
||||
Restore: BusEvent.define("workspace.restore", Restore),
|
||||
Status: BusEvent.define("workspace.status", ConnectionStatus),
|
||||
export const Event = {
|
||||
Ready: BusEvent.define(
|
||||
"workspace.ready",
|
||||
z.object({
|
||||
name: z.string(),
|
||||
}),
|
||||
),
|
||||
Failed: BusEvent.define(
|
||||
"workspace.failed",
|
||||
z.object({
|
||||
message: z.string(),
|
||||
}),
|
||||
),
|
||||
Restore: BusEvent.define("workspace.restore", Restore),
|
||||
Status: BusEvent.define("workspace.status", ConnectionStatus),
|
||||
}
|
||||
|
||||
function fromRow(row: typeof WorkspaceTable.$inferSelect): Info {
|
||||
return {
|
||||
id: row.id,
|
||||
type: row.type,
|
||||
branch: row.branch,
|
||||
name: row.name,
|
||||
directory: row.directory,
|
||||
extra: row.extra,
|
||||
projectID: row.project_id,
|
||||
}
|
||||
}
|
||||
|
||||
const CreateInput = z.object({
|
||||
id: WorkspaceID.zod.optional(),
|
||||
type: Info.shape.type,
|
||||
branch: Info.shape.branch,
|
||||
projectID: ProjectID.zod,
|
||||
extra: Info.shape.extra,
|
||||
})
|
||||
|
||||
export const create = fn(CreateInput, async (input) => {
|
||||
const id = WorkspaceID.ascending(input.id)
|
||||
const adaptor = await getAdaptor(input.projectID, input.type)
|
||||
|
||||
const config = await adaptor.configure({ ...input, id, name: Slug.create(), directory: null })
|
||||
|
||||
const info: Info = {
|
||||
id,
|
||||
type: config.type,
|
||||
branch: config.branch ?? null,
|
||||
name: config.name ?? null,
|
||||
directory: config.directory ?? null,
|
||||
extra: config.extra ?? null,
|
||||
projectID: input.projectID,
|
||||
}
|
||||
|
||||
function fromRow(row: typeof WorkspaceTable.$inferSelect): Info {
|
||||
return {
|
||||
id: row.id,
|
||||
type: row.type,
|
||||
branch: row.branch,
|
||||
name: row.name,
|
||||
directory: row.directory,
|
||||
extra: row.extra,
|
||||
projectID: row.project_id,
|
||||
}
|
||||
}
|
||||
|
||||
const CreateInput = z.object({
|
||||
id: WorkspaceID.zod.optional(),
|
||||
type: Info.shape.type,
|
||||
branch: Info.shape.branch,
|
||||
projectID: ProjectID.zod,
|
||||
extra: Info.shape.extra,
|
||||
Database.use((db) => {
|
||||
db.insert(WorkspaceTable)
|
||||
.values({
|
||||
id: info.id,
|
||||
type: info.type,
|
||||
branch: info.branch,
|
||||
name: info.name,
|
||||
directory: info.directory,
|
||||
extra: info.extra,
|
||||
project_id: info.projectID,
|
||||
})
|
||||
.run()
|
||||
})
|
||||
|
||||
export const create = fn(CreateInput, async (input) => {
|
||||
const id = WorkspaceID.ascending(input.id)
|
||||
const adaptor = await getAdaptor(input.projectID, input.type)
|
||||
const env = {
|
||||
OPENCODE_AUTH_CONTENT: JSON.stringify(await AppRuntime.runPromise(Auth.Service.use((auth) => auth.all()))),
|
||||
OPENCODE_WORKSPACE_ID: config.id,
|
||||
OPENCODE_EXPERIMENTAL_WORKSPACES: "true",
|
||||
}
|
||||
await adaptor.create(config, env)
|
||||
|
||||
const config = await adaptor.configure({ ...input, id, name: Slug.create(), directory: null })
|
||||
startSync(info)
|
||||
|
||||
const info: Info = {
|
||||
id,
|
||||
type: config.type,
|
||||
branch: config.branch ?? null,
|
||||
name: config.name ?? null,
|
||||
directory: config.directory ?? null,
|
||||
extra: config.extra ?? null,
|
||||
projectID: input.projectID,
|
||||
}
|
||||
await waitEvent({
|
||||
timeout: TIMEOUT,
|
||||
fn(event) {
|
||||
if (event.workspace === info.id && event.payload.type === Event.Status.type) {
|
||||
const { status } = event.payload.properties
|
||||
return status === "error" || status === "connected"
|
||||
}
|
||||
return false
|
||||
},
|
||||
})
|
||||
|
||||
Database.use((db) => {
|
||||
db.insert(WorkspaceTable)
|
||||
.values({
|
||||
id: info.id,
|
||||
type: info.type,
|
||||
branch: info.branch,
|
||||
name: info.name,
|
||||
directory: info.directory,
|
||||
extra: info.extra,
|
||||
project_id: info.projectID,
|
||||
})
|
||||
.run()
|
||||
})
|
||||
return info
|
||||
})
|
||||
|
||||
const env = {
|
||||
OPENCODE_AUTH_CONTENT: JSON.stringify(await AppRuntime.runPromise(Auth.Service.use((auth) => auth.all()))),
|
||||
OPENCODE_WORKSPACE_ID: config.id,
|
||||
OPENCODE_EXPERIMENTAL_WORKSPACES: "true",
|
||||
}
|
||||
await adaptor.create(config, env)
|
||||
const SessionRestoreInput = z.object({
|
||||
workspaceID: WorkspaceID.zod,
|
||||
sessionID: SessionID.zod,
|
||||
})
|
||||
|
||||
startSync(info)
|
||||
export const sessionRestore = fn(SessionRestoreInput, async (input) => {
|
||||
log.info("session restore requested", {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
})
|
||||
try {
|
||||
const space = await get(input.workspaceID)
|
||||
if (!space) throw new Error(`Workspace not found: ${input.workspaceID}`)
|
||||
|
||||
await waitEvent({
|
||||
timeout: TIMEOUT,
|
||||
fn(event) {
|
||||
if (event.workspace === info.id && event.payload.type === Event.Status.type) {
|
||||
const { status } = event.payload.properties
|
||||
return status === "error" || status === "connected"
|
||||
}
|
||||
return false
|
||||
const adaptor = await getAdaptor(space.projectID, space.type)
|
||||
const target = await adaptor.target(space)
|
||||
|
||||
// Need to switch the workspace of the session
|
||||
SyncEvent.run(Session.Event.Updated, {
|
||||
sessionID: input.sessionID,
|
||||
info: {
|
||||
workspaceID: input.workspaceID,
|
||||
},
|
||||
})
|
||||
|
||||
return info
|
||||
})
|
||||
const rows = Database.use((db) =>
|
||||
db
|
||||
.select({
|
||||
id: EventTable.id,
|
||||
aggregateID: EventTable.aggregate_id,
|
||||
seq: EventTable.seq,
|
||||
type: EventTable.type,
|
||||
data: EventTable.data,
|
||||
})
|
||||
.from(EventTable)
|
||||
.where(eq(EventTable.aggregate_id, input.sessionID))
|
||||
.orderBy(asc(EventTable.seq))
|
||||
.all(),
|
||||
)
|
||||
if (rows.length === 0) throw new Error(`No events found for session: ${input.sessionID}`)
|
||||
|
||||
const SessionRestoreInput = z.object({
|
||||
workspaceID: WorkspaceID.zod,
|
||||
sessionID: SessionID.zod,
|
||||
})
|
||||
const all = rows
|
||||
|
||||
export const sessionRestore = fn(SessionRestoreInput, async (input) => {
|
||||
log.info("session restore requested", {
|
||||
const size = 10
|
||||
const sets = Array.from({ length: Math.ceil(all.length / size) }, (_, i) => all.slice(i * size, (i + 1) * size))
|
||||
const total = sets.length
|
||||
log.info("session restore prepared", {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
workspaceType: space.type,
|
||||
directory: space.directory,
|
||||
target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
|
||||
events: all.length,
|
||||
batches: total,
|
||||
first: all[0]?.seq,
|
||||
last: all.at(-1)?.seq,
|
||||
})
|
||||
try {
|
||||
const space = await get(input.workspaceID)
|
||||
if (!space) throw new Error(`Workspace not found: ${input.workspaceID}`)
|
||||
|
||||
const adaptor = await getAdaptor(space.projectID, space.type)
|
||||
const target = await adaptor.target(space)
|
||||
|
||||
// Need to switch the workspace of the session
|
||||
SyncEvent.run(Session.Event.Updated, {
|
||||
sessionID: input.sessionID,
|
||||
info: {
|
||||
GlobalBus.emit("event", {
|
||||
directory: "global",
|
||||
workspace: input.workspaceID,
|
||||
payload: {
|
||||
type: Event.Restore.type,
|
||||
properties: {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
total,
|
||||
step: 0,
|
||||
},
|
||||
})
|
||||
|
||||
const rows = Database.use((db) =>
|
||||
db
|
||||
.select({
|
||||
id: EventTable.id,
|
||||
aggregateID: EventTable.aggregate_id,
|
||||
seq: EventTable.seq,
|
||||
type: EventTable.type,
|
||||
data: EventTable.data,
|
||||
})
|
||||
.from(EventTable)
|
||||
.where(eq(EventTable.aggregate_id, input.sessionID))
|
||||
.orderBy(asc(EventTable.seq))
|
||||
.all(),
|
||||
)
|
||||
if (rows.length === 0) throw new Error(`No events found for session: ${input.sessionID}`)
|
||||
|
||||
const all = rows
|
||||
|
||||
const size = 10
|
||||
const sets = Array.from({ length: Math.ceil(all.length / size) }, (_, i) => all.slice(i * size, (i + 1) * size))
|
||||
const total = sets.length
|
||||
log.info("session restore prepared", {
|
||||
},
|
||||
})
|
||||
for (const [i, events] of sets.entries()) {
|
||||
log.info("session restore batch starting", {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
workspaceType: space.type,
|
||||
directory: space.directory,
|
||||
step: i + 1,
|
||||
total,
|
||||
events: events.length,
|
||||
first: events[0]?.seq,
|
||||
last: events.at(-1)?.seq,
|
||||
target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
|
||||
events: all.length,
|
||||
batches: total,
|
||||
first: all[0]?.seq,
|
||||
last: all.at(-1)?.seq,
|
||||
})
|
||||
if (target.type === "local") {
|
||||
SyncEvent.replayAll(events)
|
||||
log.info("session restore batch replayed locally", {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
step: i + 1,
|
||||
total,
|
||||
events: events.length,
|
||||
})
|
||||
} else {
|
||||
const url = route(target.url, "/sync/replay")
|
||||
const headers = new Headers(target.headers)
|
||||
headers.set("content-type", "application/json")
|
||||
const res = await fetch(url, {
|
||||
method: "POST",
|
||||
headers,
|
||||
body: JSON.stringify({
|
||||
directory: space.directory ?? "",
|
||||
events,
|
||||
}),
|
||||
})
|
||||
if (!res.ok) {
|
||||
const body = await res.text()
|
||||
log.error("session restore batch failed", {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
step: i + 1,
|
||||
total,
|
||||
status: res.status,
|
||||
body,
|
||||
})
|
||||
throw new Error(
|
||||
`Failed to replay session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${res.status} ${body}`,
|
||||
)
|
||||
}
|
||||
log.info("session restore batch posted", {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
step: i + 1,
|
||||
total,
|
||||
status: res.status,
|
||||
})
|
||||
}
|
||||
GlobalBus.emit("event", {
|
||||
directory: "global",
|
||||
workspace: input.workspaceID,
|
||||
|
|
@ -202,329 +268,263 @@ export namespace Workspace {
|
|||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
total,
|
||||
step: 0,
|
||||
step: i + 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
for (const [i, events] of sets.entries()) {
|
||||
log.info("session restore batch starting", {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
step: i + 1,
|
||||
total,
|
||||
events: events.length,
|
||||
first: events[0]?.seq,
|
||||
last: events.at(-1)?.seq,
|
||||
target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
|
||||
}
|
||||
|
||||
log.info("session restore complete", {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
batches: total,
|
||||
})
|
||||
|
||||
return {
|
||||
total,
|
||||
}
|
||||
} catch (err) {
|
||||
log.error("session restore failed", {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
error: errorData(err),
|
||||
})
|
||||
throw err
|
||||
}
|
||||
})
|
||||
|
||||
export function list(project: Project.Info) {
|
||||
const rows = Database.use((db) =>
|
||||
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
|
||||
)
|
||||
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
|
||||
|
||||
for (const space of spaces) startSync(space)
|
||||
return spaces
|
||||
}
|
||||
|
||||
function lookup(id: WorkspaceID) {
|
||||
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
||||
if (!row) return
|
||||
return fromRow(row)
|
||||
}
|
||||
|
||||
export const get = fn(WorkspaceID.zod, async (id) => {
|
||||
const space = lookup(id)
|
||||
if (!space) return
|
||||
startSync(space)
|
||||
return space
|
||||
})
|
||||
|
||||
export const remove = fn(WorkspaceID.zod, async (id) => {
|
||||
const sessions = Database.use((db) =>
|
||||
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, id)).all(),
|
||||
)
|
||||
for (const session of sessions) {
|
||||
await AppRuntime.runPromise(Session.Service.use((svc) => svc.remove(session.id)))
|
||||
}
|
||||
|
||||
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
||||
|
||||
if (row) {
|
||||
stopSync(id)
|
||||
|
||||
const info = fromRow(row)
|
||||
try {
|
||||
const adaptor = await getAdaptor(info.projectID, row.type)
|
||||
await adaptor.remove(info)
|
||||
} catch {
|
||||
log.error("adaptor not available when removing workspace", { type: row.type })
|
||||
}
|
||||
Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
|
||||
return info
|
||||
}
|
||||
})
|
||||
|
||||
const connections = new Map<WorkspaceID, ConnectionStatus>()
|
||||
const aborts = new Map<WorkspaceID, AbortController>()
|
||||
const TIMEOUT = 5000
|
||||
|
||||
function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
|
||||
const prev = connections.get(id)
|
||||
if (prev?.status === status && prev?.error === error) return
|
||||
const next = { workspaceID: id, status, error }
|
||||
connections.set(id, next)
|
||||
|
||||
if (status === "error") {
|
||||
aborts.delete(id)
|
||||
}
|
||||
|
||||
GlobalBus.emit("event", {
|
||||
directory: "global",
|
||||
workspace: id,
|
||||
payload: {
|
||||
type: Event.Status.type,
|
||||
properties: next,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
export function status(): ConnectionStatus[] {
|
||||
return [...connections.values()]
|
||||
}
|
||||
|
||||
function synced(state: Record<string, number>) {
|
||||
const ids = Object.keys(state)
|
||||
if (ids.length === 0) return true
|
||||
|
||||
const done = Object.fromEntries(
|
||||
Database.use((db) =>
|
||||
db
|
||||
.select({
|
||||
id: EventSequenceTable.aggregate_id,
|
||||
seq: EventSequenceTable.seq,
|
||||
})
|
||||
if (target.type === "local") {
|
||||
SyncEvent.replayAll(events)
|
||||
log.info("session restore batch replayed locally", {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
step: i + 1,
|
||||
total,
|
||||
events: events.length,
|
||||
})
|
||||
} else {
|
||||
const url = route(target.url, "/sync/replay")
|
||||
const headers = new Headers(target.headers)
|
||||
headers.set("content-type", "application/json")
|
||||
const res = await fetch(url, {
|
||||
method: "POST",
|
||||
headers,
|
||||
body: JSON.stringify({
|
||||
directory: space.directory ?? "",
|
||||
events,
|
||||
}),
|
||||
})
|
||||
if (!res.ok) {
|
||||
const body = await res.text()
|
||||
log.error("session restore batch failed", {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
step: i + 1,
|
||||
total,
|
||||
status: res.status,
|
||||
body,
|
||||
})
|
||||
throw new Error(
|
||||
`Failed to replay session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${res.status} ${body}`,
|
||||
)
|
||||
}
|
||||
log.info("session restore batch posted", {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
step: i + 1,
|
||||
total,
|
||||
status: res.status,
|
||||
})
|
||||
.from(EventSequenceTable)
|
||||
.where(inArray(EventSequenceTable.aggregate_id, ids))
|
||||
.all(),
|
||||
).map((row) => [row.id, row.seq]),
|
||||
) as Record<string, number>
|
||||
|
||||
return ids.every((id) => {
|
||||
return (done[id] ?? -1) >= state[id]
|
||||
})
|
||||
}
|
||||
|
||||
export async function isSyncing(workspaceID: WorkspaceID) {
|
||||
return aborts.has(workspaceID)
|
||||
}
|
||||
|
||||
export async function waitForSync(workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) {
|
||||
if (synced(state)) return
|
||||
|
||||
try {
|
||||
await waitEvent({
|
||||
timeout: TIMEOUT,
|
||||
signal,
|
||||
fn(event) {
|
||||
if (event.workspace !== workspaceID && event.payload.type !== "sync") {
|
||||
return false
|
||||
}
|
||||
GlobalBus.emit("event", {
|
||||
directory: "global",
|
||||
workspace: input.workspaceID,
|
||||
payload: {
|
||||
type: Event.Restore.type,
|
||||
properties: {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
total,
|
||||
step: i + 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
log.info("session restore complete", {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
batches: total,
|
||||
})
|
||||
|
||||
return {
|
||||
total,
|
||||
}
|
||||
} catch (err) {
|
||||
log.error("session restore failed", {
|
||||
workspaceID: input.workspaceID,
|
||||
sessionID: input.sessionID,
|
||||
error: errorData(err),
|
||||
})
|
||||
throw err
|
||||
}
|
||||
})
|
||||
|
||||
export function list(project: Project.Info) {
|
||||
const rows = Database.use((db) =>
|
||||
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
|
||||
)
|
||||
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
|
||||
|
||||
for (const space of spaces) startSync(space)
|
||||
return spaces
|
||||
}
|
||||
|
||||
function lookup(id: WorkspaceID) {
|
||||
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
||||
if (!row) return
|
||||
return fromRow(row)
|
||||
}
|
||||
|
||||
export const get = fn(WorkspaceID.zod, async (id) => {
|
||||
const space = lookup(id)
|
||||
if (!space) return
|
||||
startSync(space)
|
||||
return space
|
||||
})
|
||||
|
||||
export const remove = fn(WorkspaceID.zod, async (id) => {
|
||||
const sessions = Database.use((db) =>
|
||||
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, id)).all(),
|
||||
)
|
||||
for (const session of sessions) {
|
||||
await AppRuntime.runPromise(Session.Service.use((svc) => svc.remove(session.id)))
|
||||
}
|
||||
|
||||
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
|
||||
|
||||
if (row) {
|
||||
stopSync(id)
|
||||
|
||||
const info = fromRow(row)
|
||||
try {
|
||||
const adaptor = await getAdaptor(info.projectID, row.type)
|
||||
await adaptor.remove(info)
|
||||
} catch {
|
||||
log.error("adaptor not available when removing workspace", { type: row.type })
|
||||
}
|
||||
Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
|
||||
return info
|
||||
}
|
||||
})
|
||||
|
||||
const connections = new Map<WorkspaceID, ConnectionStatus>()
|
||||
const aborts = new Map<WorkspaceID, AbortController>()
|
||||
const TIMEOUT = 5000
|
||||
|
||||
function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
|
||||
const prev = connections.get(id)
|
||||
if (prev?.status === status && prev?.error === error) return
|
||||
const next = { workspaceID: id, status, error }
|
||||
connections.set(id, next)
|
||||
|
||||
if (status === "error") {
|
||||
aborts.delete(id)
|
||||
}
|
||||
|
||||
GlobalBus.emit("event", {
|
||||
directory: "global",
|
||||
workspace: id,
|
||||
payload: {
|
||||
type: Event.Status.type,
|
||||
properties: next,
|
||||
return synced(state)
|
||||
},
|
||||
})
|
||||
} catch {
|
||||
if (signal?.aborted) throw signal.reason ?? new Error("Request aborted")
|
||||
throw new Error(`Timed out waiting for sync fence: ${JSON.stringify(state)}`)
|
||||
}
|
||||
}
|
||||
|
||||
export function status(): ConnectionStatus[] {
|
||||
return [...connections.values()]
|
||||
}
|
||||
const log = Log.create({ service: "workspace-sync" })
|
||||
|
||||
function synced(state: Record<string, number>) {
|
||||
const ids = Object.keys(state)
|
||||
if (ids.length === 0) return true
|
||||
function route(url: string | URL, path: string) {
|
||||
const next = new URL(url)
|
||||
next.pathname = `${next.pathname.replace(/\/$/, "")}${path}`
|
||||
next.search = ""
|
||||
next.hash = ""
|
||||
return next
|
||||
}
|
||||
|
||||
const done = Object.fromEntries(
|
||||
Database.use((db) =>
|
||||
db
|
||||
.select({
|
||||
id: EventSequenceTable.aggregate_id,
|
||||
seq: EventSequenceTable.seq,
|
||||
})
|
||||
.from(EventSequenceTable)
|
||||
.where(inArray(EventSequenceTable.aggregate_id, ids))
|
||||
.all(),
|
||||
).map((row) => [row.id, row.seq]),
|
||||
) as Record<string, number>
|
||||
|
||||
return ids.every((id) => {
|
||||
return (done[id] ?? -1) >= state[id]
|
||||
})
|
||||
}
|
||||
|
||||
export async function isSyncing(workspaceID: WorkspaceID) {
|
||||
return aborts.has(workspaceID)
|
||||
}
|
||||
|
||||
export async function waitForSync(workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) {
|
||||
if (synced(state)) return
|
||||
|
||||
try {
|
||||
await waitEvent({
|
||||
timeout: TIMEOUT,
|
||||
signal,
|
||||
fn(event) {
|
||||
if (event.workspace !== workspaceID && event.payload.type !== "sync") {
|
||||
return false
|
||||
}
|
||||
return synced(state)
|
||||
},
|
||||
})
|
||||
} catch {
|
||||
if (signal?.aborted) throw signal.reason ?? new Error("Request aborted")
|
||||
throw new Error(`Timed out waiting for sync fence: ${JSON.stringify(state)}`)
|
||||
}
|
||||
}
|
||||
|
||||
const log = Log.create({ service: "workspace-sync" })
|
||||
|
||||
function route(url: string | URL, path: string) {
|
||||
const next = new URL(url)
|
||||
next.pathname = `${next.pathname.replace(/\/$/, "")}${path}`
|
||||
next.search = ""
|
||||
next.hash = ""
|
||||
return next
|
||||
}
|
||||
|
||||
async function syncWorkspace(space: Info, signal: AbortSignal) {
|
||||
while (!signal.aborted) {
|
||||
log.info("connecting to global sync", { workspace: space.name })
|
||||
setStatus(space.id, "connecting")
|
||||
|
||||
const adaptor = await getAdaptor(space.projectID, space.type)
|
||||
const target = await adaptor.target(space)
|
||||
|
||||
if (target.type === "local") return
|
||||
|
||||
const res = await fetch(route(target.url, "/global/event"), {
|
||||
method: "GET",
|
||||
headers: target.headers,
|
||||
signal,
|
||||
}).catch((err: unknown) => {
|
||||
setStatus(space.id, "error", err instanceof Error ? err.message : String(err))
|
||||
|
||||
log.info("failed to connect to global sync", {
|
||||
workspace: space.name,
|
||||
error: err,
|
||||
})
|
||||
return undefined
|
||||
})
|
||||
|
||||
if (!res || !res.ok || !res.body) {
|
||||
const error = !res ? "No response from global sync" : `Global sync HTTP ${res.status}`
|
||||
log.info("failed to connect to global sync", { workspace: space.name, error })
|
||||
setStatus(space.id, "error", error)
|
||||
await sleep(1000)
|
||||
continue
|
||||
}
|
||||
|
||||
log.info("global sync connected", { workspace: space.name })
|
||||
setStatus(space.id, "connected")
|
||||
|
||||
await parseSSE(res.body, signal, (evt: any) => {
|
||||
try {
|
||||
if (!("payload" in evt)) return
|
||||
|
||||
if (evt.payload.type === "sync") {
|
||||
SyncEvent.replay(evt.payload.syncEvent as SyncEvent.SerializedEvent)
|
||||
}
|
||||
|
||||
GlobalBus.emit("event", {
|
||||
directory: evt.directory,
|
||||
project: evt.project,
|
||||
workspace: space.id,
|
||||
payload: evt.payload,
|
||||
})
|
||||
} catch (err) {
|
||||
log.info("failed to replay global event", {
|
||||
workspaceID: space.id,
|
||||
error: err,
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
log.info("disconnected from global sync: " + space.id)
|
||||
setStatus(space.id, "disconnected")
|
||||
|
||||
// TODO: Implement exponential backoff
|
||||
await sleep(1000)
|
||||
}
|
||||
}
|
||||
|
||||
async function startSync(space: Info) {
|
||||
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return
|
||||
async function syncWorkspace(space: Info, signal: AbortSignal) {
|
||||
while (!signal.aborted) {
|
||||
log.info("connecting to global sync", { workspace: space.name })
|
||||
setStatus(space.id, "connecting")
|
||||
|
||||
const adaptor = await getAdaptor(space.projectID, space.type)
|
||||
const target = await adaptor.target(space)
|
||||
|
||||
if (target.type === "local") {
|
||||
void Filesystem.exists(target.directory).then((exists) => {
|
||||
setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
|
||||
if (target.type === "local") return
|
||||
|
||||
const res = await fetch(route(target.url, "/global/event"), {
|
||||
method: "GET",
|
||||
headers: target.headers,
|
||||
signal,
|
||||
}).catch((err: unknown) => {
|
||||
setStatus(space.id, "error", err instanceof Error ? err.message : String(err))
|
||||
|
||||
log.info("failed to connect to global sync", {
|
||||
workspace: space.name,
|
||||
error: err,
|
||||
})
|
||||
return
|
||||
return undefined
|
||||
})
|
||||
|
||||
if (!res || !res.ok || !res.body) {
|
||||
const error = !res ? "No response from global sync" : `Global sync HTTP ${res.status}`
|
||||
log.info("failed to connect to global sync", { workspace: space.name, error })
|
||||
setStatus(space.id, "error", error)
|
||||
await sleep(1000)
|
||||
continue
|
||||
}
|
||||
|
||||
if (aborts.has(space.id)) return true
|
||||
log.info("global sync connected", { workspace: space.name })
|
||||
setStatus(space.id, "connected")
|
||||
|
||||
await parseSSE(res.body, signal, (evt: any) => {
|
||||
try {
|
||||
if (!("payload" in evt)) return
|
||||
|
||||
if (evt.payload.type === "sync") {
|
||||
SyncEvent.replay(evt.payload.syncEvent as SyncEvent.SerializedEvent)
|
||||
}
|
||||
|
||||
GlobalBus.emit("event", {
|
||||
directory: evt.directory,
|
||||
project: evt.project,
|
||||
workspace: space.id,
|
||||
payload: evt.payload,
|
||||
})
|
||||
} catch (err) {
|
||||
log.info("failed to replay global event", {
|
||||
workspaceID: space.id,
|
||||
error: err,
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
log.info("disconnected from global sync: " + space.id)
|
||||
setStatus(space.id, "disconnected")
|
||||
|
||||
const abort = new AbortController()
|
||||
aborts.set(space.id, abort)
|
||||
|
||||
void syncWorkspace(space, abort.signal).catch((error) => {
|
||||
aborts.delete(space.id)
|
||||
|
||||
setStatus(space.id, "error", String(error))
|
||||
log.warn("workspace listener failed", {
|
||||
workspaceID: space.id,
|
||||
error,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
function stopSync(id: WorkspaceID) {
|
||||
aborts.get(id)?.abort()
|
||||
aborts.delete(id)
|
||||
connections.delete(id)
|
||||
// TODO: Implement exponential backoff
|
||||
await sleep(1000)
|
||||
}
|
||||
}
|
||||
|
||||
async function startSync(space: Info) {
|
||||
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return
|
||||
|
||||
const adaptor = await getAdaptor(space.projectID, space.type)
|
||||
const target = await adaptor.target(space)
|
||||
|
||||
if (target.type === "local") {
|
||||
void Filesystem.exists(target.directory).then((exists) => {
|
||||
setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if (aborts.has(space.id)) return true
|
||||
|
||||
setStatus(space.id, "disconnected")
|
||||
|
||||
const abort = new AbortController()
|
||||
aborts.set(space.id, abort)
|
||||
|
||||
void syncWorkspace(space, abort.signal).catch((error) => {
|
||||
aborts.delete(space.id)
|
||||
|
||||
setStatus(space.id, "error", String(error))
|
||||
log.warn("workspace listener failed", {
|
||||
workspaceID: space.id,
|
||||
error,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
function stopSync(id: WorkspaceID) {
|
||||
aborts.get(id)?.abort()
|
||||
aborts.delete(id)
|
||||
connections.delete(id)
|
||||
}
|
||||
|
||||
export * as Workspace from "./workspace"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue