refactor(cli): convert import command to effectCmd (#25467)

This commit is contained in:
Kit Langton 2026-05-02 17:56:32 -04:00 committed by GitHub
parent 0c816eb4b1
commit 79b6ce5db4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,17 +1,15 @@
import type { Argv } from "yargs"
import type { Session as SDKSession, Message, Part } from "@opencode-ai/sdk/v2"
import { Session } from "@/session/session"
import { MessageV2 } from "../../session/message-v2"
import { cmd } from "./cmd"
import { bootstrap } from "../bootstrap"
import { CliError, effectCmd } from "../effect-cmd"
import { Database } from "@/storage/db"
import { SessionTable, MessageTable, PartTable } from "../../session/session.sql"
import { Instance } from "../../project/instance"
import { InstanceRef } from "@/effect/instance-ref"
import { InstanceStore } from "@/project/instance-store"
import { ShareNext } from "@/share/share-next"
import { EOL } from "os"
import { Filesystem } from "@/util/filesystem"
import { AppRuntime } from "@/effect/app-runtime"
import { Schema } from "effect"
import { Effect, Schema } from "effect"
const decodeMessageInfo = Schema.decodeUnknownSync(MessageV2.Info)
const decodePart = Schema.decodeUnknownSync(MessageV2.Part)
@ -78,135 +76,147 @@ export function transformShareData(shareData: ShareData[]): {
}
}
export const ImportCommand = cmd({
type ExportData = { info: SDKSession; messages: Array<{ info: Message; parts: Part[] }> }
export const ImportCommand = effectCmd({
command: "import <file>",
describe: "import session data from JSON file or URL",
builder: (yargs: Argv) => {
return yargs.positional("file", {
builder: (yargs) =>
yargs.positional("file", {
describe: "path to JSON file or share URL",
type: "string",
demandOption: true,
}),
handler: Effect.fn("Cli.import")(function* (args) {
// effectCmd always provides InstanceRef via InstanceStore.Service.provide; this is an invariant.
const ctx = yield* InstanceRef
if (!ctx) return yield* Effect.die("InstanceRef not provided")
const store = yield* InstanceStore.Service
// Ensure store.dispose runs disposers and emits server.instance.disposed
// on every exit path: success, early return, typed failure, defect, interrupt.
return yield* runImport(args.file, ctx.project.id).pipe(Effect.ensuring(store.dispose(ctx)))
}),
})
const runImport = Effect.fn("Cli.import.body")(function* (file: string, projectID: string) {
const share = yield* ShareNext.Service
let exportData: ExportData | undefined
const isUrl = file.startsWith("http://") || file.startsWith("https://")
if (isUrl) {
const slug = parseShareUrl(file)
if (!slug) {
const baseUrl = yield* Effect.orDie(share.url())
process.stdout.write(`Invalid URL format. Expected: ${baseUrl}/share/<slug>`)
process.stdout.write(EOL)
return
}
const baseUrl = new URL(file).origin
const req = yield* Effect.orDie(share.request())
const headers = shouldAttachShareAuthHeaders(file, req.baseUrl) ? req.headers : {}
const tryFetch = (url: string) =>
Effect.tryPromise({
try: () => fetch(url, { headers }),
catch: (e) =>
new CliError({
message: `Failed to fetch share data: ${e instanceof Error ? e.message : String(e)}`,
}),
})
const dataPath = req.api.data(slug)
let response = yield* tryFetch(`${baseUrl}${dataPath}`)
if (!response.ok && dataPath !== `/api/share/${slug}/data`) {
response = yield* tryFetch(`${baseUrl}/api/share/${slug}/data`)
}
if (!response.ok) {
process.stdout.write(`Failed to fetch share data: ${response.statusText}`)
process.stdout.write(EOL)
return
}
const shareData = yield* Effect.tryPromise({
try: () => response.json() as Promise<ShareData[]>,
catch: () => new CliError({ message: "Share data was not valid JSON" }),
})
},
handler: async (args) => {
await bootstrap(process.cwd(), async () => {
let exportData:
| {
info: SDKSession
messages: Array<{
info: Message
parts: Part[]
}>
}
| undefined
const transformed = transformShareData(shareData)
const isUrl = args.file.startsWith("http://") || args.file.startsWith("https://")
if (!transformed) {
process.stdout.write(`Share not found or empty: ${slug}`)
process.stdout.write(EOL)
return
}
if (isUrl) {
const slug = parseShareUrl(args.file)
if (!slug) {
const baseUrl = await AppRuntime.runPromise(ShareNext.Service.use((svc) => svc.url()))
process.stdout.write(`Invalid URL format. Expected: ${baseUrl}/share/<slug>`)
process.stdout.write(EOL)
return
}
exportData = transformed
} else {
exportData = yield* Effect.promise(() =>
Filesystem.readJson<NonNullable<typeof exportData>>(file).catch(() => undefined),
)
if (!exportData) {
process.stdout.write(`File not found: ${file}`)
process.stdout.write(EOL)
return
}
}
const parsed = new URL(args.file)
const baseUrl = parsed.origin
const req = await AppRuntime.runPromise(ShareNext.Service.use((svc) => svc.request()))
const headers = shouldAttachShareAuthHeaders(args.file, req.baseUrl) ? req.headers : {}
if (!exportData) {
process.stdout.write(`Failed to read session data`)
process.stdout.write(EOL)
return
}
const dataPath = req.api.data(slug)
let response = await fetch(`${baseUrl}${dataPath}`, {
headers,
const info = Schema.decodeUnknownSync(Session.Info)({
...exportData.info,
projectID,
}) as Session.Info
const row = Session.toRow(info)
Database.use((db) =>
db
.insert(SessionTable)
.values(row)
.onConflictDoUpdate({ target: SessionTable.id, set: { project_id: row.project_id } })
.run(),
)
for (const msg of exportData.messages) {
const msgInfo = decodeMessageInfo(msg.info) as MessageV2.Info
const { id, sessionID: _, ...msgData } = msgInfo
Database.use((db) =>
db
.insert(MessageTable)
.values({
id,
session_id: row.id,
time_created: msgInfo.time?.created ?? Date.now(),
data: msgData,
})
.onConflictDoNothing()
.run(),
)
if (!response.ok && dataPath !== `/api/share/${slug}/data`) {
response = await fetch(`${baseUrl}/api/share/${slug}/data`, {
headers,
})
}
if (!response.ok) {
process.stdout.write(`Failed to fetch share data: ${response.statusText}`)
process.stdout.write(EOL)
return
}
const shareData: ShareData[] = await response.json()
const transformed = transformShareData(shareData)
if (!transformed) {
process.stdout.write(`Share not found or empty: ${slug}`)
process.stdout.write(EOL)
return
}
exportData = transformed
} else {
exportData = await Filesystem.readJson<NonNullable<typeof exportData>>(args.file).catch(() => undefined)
if (!exportData) {
process.stdout.write(`File not found: ${args.file}`)
process.stdout.write(EOL)
return
}
}
if (!exportData) {
process.stdout.write(`Failed to read session data`)
process.stdout.write(EOL)
return
}
const info = Schema.decodeUnknownSync(Session.Info)({
...exportData.info,
projectID: Instance.project.id,
}) as Session.Info
const row = Session.toRow(info)
for (const part of msg.parts) {
const partInfo = decodePart(part) as MessageV2.Part
const { id: partId, sessionID: _s, messageID, ...partData } = partInfo
Database.use((db) =>
db
.insert(SessionTable)
.values(row)
.onConflictDoUpdate({ target: SessionTable.id, set: { project_id: row.project_id } })
.insert(PartTable)
.values({
id: partId,
message_id: messageID,
session_id: row.id,
data: partData,
})
.onConflictDoNothing()
.run(),
)
}
}
for (const msg of exportData.messages) {
const msgInfo = decodeMessageInfo(msg.info) as MessageV2.Info
const { id, sessionID: _, ...msgData } = msgInfo
Database.use((db) =>
db
.insert(MessageTable)
.values({
id,
session_id: row.id,
time_created: msgInfo.time?.created ?? Date.now(),
data: msgData,
})
.onConflictDoNothing()
.run(),
)
for (const part of msg.parts) {
const partInfo = decodePart(part) as MessageV2.Part
const { id: partId, sessionID: _s, messageID, ...partData } = partInfo
Database.use((db) =>
db
.insert(PartTable)
.values({
id: partId,
message_id: messageID,
session_id: row.id,
data: partData,
})
.onConflictDoNothing()
.run(),
)
}
}
process.stdout.write(`Imported session: ${exportData.info.id}`)
process.stdout.write(EOL)
})
},
process.stdout.write(`Imported session: ${exportData.info.id}`)
process.stdout.write(EOL)
})