diff --git a/packages/opencode/src/id/id.ts b/packages/opencode/src/id/id.ts index d86b99250d..91783ceae2 100644 --- a/packages/opencode/src/id/id.ts +++ b/packages/opencode/src/id/id.ts @@ -3,6 +3,7 @@ import { randomBytes } from "crypto" export namespace Identifier { const prefixes = { + job: "job", event: "evt", session: "ses", message: "msg", diff --git a/packages/opencode/src/shell-job/index.ts b/packages/opencode/src/shell-job/index.ts new file mode 100644 index 0000000000..c722367915 --- /dev/null +++ b/packages/opencode/src/shell-job/index.ts @@ -0,0 +1,301 @@ +import path from "path" +import * as NodeFS from "fs/promises" +import { InstanceState } from "@/effect/instance-state" +import { AppFileSystem } from "@/filesystem" +import { Shell } from "@/shell/shell" +import { Effect, Layer, Scope, Deferred, Stream, Context, Exit, Schema, Struct } from "effect" +import { ChildProcess } from "effect/unstable/process" +import { ChildProcessSpawner } from "effect/unstable/process/ChildProcessSpawner" +import type { ChildProcessHandle } from "effect/unstable/process/ChildProcessSpawner" + +import { JobID } from "./schema" + +const PS = new Set(["powershell", "pwsh"]) + +export namespace ShellJob { + export const Status = Schema.Literals(["running", "completed", "failed", "killed", "timed_out"]) + export type Status = Schema.Schema.Type + + export class Info extends Schema.Class("ShellJob.Info")({ + id: JobID, + command: Schema.String, + cwd: Schema.String, + shell: Schema.String, + title: Schema.optional(Schema.String), + status: Status, + pid: Schema.optional(Schema.Number), + started_at: Schema.Number, + ended_at: Schema.optional(Schema.Number), + exit_code: Schema.optional(Schema.NullOr(Schema.Number)), + output_path: Schema.String, + meta_path: Schema.String, + cursor: Schema.Number, + }) {} + + export class Output extends Schema.Class("ShellJob.Output")({ + text: Schema.String, + cursor: Schema.Number, + done: Schema.Boolean, + }) {} + + export class StartInput extends Schema.Class("ShellJob.StartInput")({ + command: Schema.String, + cwd: Schema.optional(Schema.String), + shell: Schema.optional(Schema.String), + title: Schema.optional(Schema.String), + timeout: Schema.optional(Schema.Number), + env: Schema.optional(Schema.Record(Schema.String, Schema.String)), + }) {} + + export class WaitInput extends Schema.Class("ShellJob.WaitInput")({ + id: JobID, + timeout: Schema.optional(Schema.Number), + }) {} + + export class OutputInput extends Schema.Class("ShellJob.OutputInput")({ + id: JobID, + cursor: Schema.optional(Schema.Number), + }) {} + + type Active = { + info: Struct.Mutable + next: Status | undefined + done: Deferred.Deferred + handle: ChildProcessHandle | undefined + } + + type State = { + dir: string + root: string + jobs: Map + scope: Scope.Scope + } + + export interface Interface { + readonly list: () => Effect.Effect + readonly get: (id: JobID) => Effect.Effect + readonly start: (input: StartInput) => Effect.Effect + readonly output: (input: OutputInput) => Effect.Effect + readonly wait: (input: WaitInput) => Effect.Effect + readonly kill: (id: JobID) => Effect.Effect + } + + export class Service extends Context.Service()("@opencode/ShellJob") {} + + function spawn(shell: string, name: string, command: string, cwd: string, env: NodeJS.ProcessEnv) { + if (process.platform === "win32" && PS.has(name)) { + return ChildProcess.make(shell, ["-NoLogo", "-NoProfile", "-NonInteractive", "-Command", command], { + cwd, + env, + stdin: "ignore", + detached: false, + }) + } + + return ChildProcess.make(command, [], { + shell, + cwd, + env, + stdin: "ignore", + detached: process.platform !== "win32", + }) + } + + const snap = (job: Active) => + new Info({ + ...job.info, + id: String(job.info.id), + }) + + export const layer: Layer.Layer = Layer.effect( + Service, + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const spawner = yield* ChildProcessSpawner + + const append = Effect.fn("ShellJob.append")(function* (job: Active, chunk: string) { + yield* Effect.tryPromise({ + try: () => NodeFS.appendFile(job.info.output_path, chunk, "utf8"), + catch: () => new Error("Failed to append shell job output"), + }).pipe(Effect.orDie) + }) + + const write = Effect.fn("ShellJob.write")(function* (job: Active) { + yield* fs.writeJson(job.info.meta_path, job.info).pipe(Effect.orDie) + }) + + const end = Effect.fn("ShellJob.end")(function* (job: Active, status: Status, code?: number | null) { + if (job.info.status !== "running") return snap(job) + job.info.status = status + job.info.ended_at = Date.now() + job.info.exit_code = code + job.handle = undefined + job.next = undefined + yield* write(job) + const info = snap(job) + yield* Deferred.succeed(job.done, info).pipe(Effect.ignore) + return info + }) + + const watch = Effect.fn("ShellJob.watch")(function* (job: Active, timeout?: number) { + const handle = job.handle + if (!handle) return snap(job) + + if (timeout) { + yield* Effect.sleep(`${timeout} millis`).pipe( + Effect.andThen( + Effect.gen(function* () { + if (job.info.status !== "running") return + job.next = "timed_out" + yield* handle.kill({ forceKillAfter: "3 seconds" }).pipe(Effect.ignore) + }), + ), + Effect.forkScoped, + ) + } + + yield* Effect.forkScoped( + Stream.runForEach(Stream.decodeText(handle.all), (chunk) => + Effect.gen(function* () { + job.info.cursor += chunk.length + yield* append(job, chunk) + }), + ), + ) + + const exit = yield* Effect.exit(handle.exitCode) + if (Exit.isSuccess(exit)) { + const code = Number(exit.value) + return yield* end(job, code === 0 ? "completed" : "failed", code) + } + + return yield* end(job, job.next ?? "killed", null) + }) + + const state = yield* InstanceState.make( + Effect.fn("ShellJob.state")(function* (ctx) { + const dir = ctx.project.vcs ? ctx.worktree : ctx.directory + const root = path.join(dir, ".opencode", "jobs") + const state: State = { + dir: ctx.directory, + root, + jobs: new Map(), + scope: yield* Scope.Scope, + } + + yield* fs.ensureDir(root).pipe(Effect.orDie) + yield* Effect.addFinalizer(() => + Effect.gen(function* () { + state.jobs.clear() + }), + ) + + return state + }), + ) + + const list: Interface["list"] = Effect.fn("ShellJob.list")(function* () { + const s = yield* InstanceState.get(state) + return Array.from(s.jobs.values()) + .map(snap) + .toSorted((a, b) => a.started_at - b.started_at) + }) + + const get: Interface["get"] = Effect.fn("ShellJob.get")(function* (id: JobID) { + const s = yield* InstanceState.get(state) + const job = s.jobs.get(id) + if (!job) return + return snap(job) + }) + + const start: Interface["start"] = Effect.fn("ShellJob.start")(function* (input: StartInput) { + const s = yield* InstanceState.get(state) + const id = JobID.ascending() + const dir = path.join(s.root, String(id)) + const cwd = input.cwd ?? s.dir + const shell = input.shell ?? Shell.acceptable() + const name = Shell.name(shell) + const handle = yield* Scope.provide(s.scope)( + spawner.spawn( + spawn(shell, name, input.command, cwd, { + ...process.env, + ...input.env, + }), + ), + ).pipe(Effect.orDie) + + const job: Active = { + info: { + id, + command: input.command, + cwd, + shell, + title: input.title, + status: "running", + pid: Number(handle.pid), + started_at: Date.now(), + output_path: path.join(dir, "output.log"), + meta_path: path.join(dir, "meta.json"), + cursor: 0, + } satisfies Struct.Mutable, + next: undefined, + done: yield* Deferred.make(), + handle, + } + + s.jobs.set(id, job) + yield* fs.writeWithDirs(job.info.output_path, "").pipe(Effect.orDie) + yield* write(job) + yield* Effect.sync(() => { + Effect.runFork(Scope.provide(s.scope)(watch(job, input.timeout))) + }) + return snap(job) + }) + + const output: Interface["output"] = Effect.fn("ShellJob.output")(function* (input: OutputInput) { + const s = yield* InstanceState.get(state) + const job = s.jobs.get(input.id) + if (!job) return + const cursor = input.cursor ?? 0 + const text = yield* fs.readFileString(job.info.output_path).pipe(Effect.catch(() => Effect.succeed(""))) + return new Output({ + text: cursor >= text.length ? "" : text.slice(cursor), + cursor: text.length, + done: job.info.status !== "running", + }) + }) + + const wait: Interface["wait"] = Effect.fn("ShellJob.wait")(function* (input: WaitInput) { + const s = yield* InstanceState.get(state) + const job = s.jobs.get(input.id) + if (!job) return + if (job.info.status !== "running") return snap(job) + if (!input.timeout) return yield* Deferred.await(job.done) + return yield* Effect.raceAll([ + Deferred.await(job.done), + Effect.sleep(`${input.timeout} millis`).pipe(Effect.as(snap(job))), + ]) + }) + + const kill: Interface["kill"] = Effect.fn("ShellJob.kill")(function* (id: JobID) { + const s = yield* InstanceState.get(state) + const job = s.jobs.get(id) + if (!job) return + if (job.info.status !== "running") return snap(job) + if (!job.handle) return snap(job) + if (!job.next) job.next = "killed" + yield* job.handle.kill({ forceKillAfter: "3 seconds" }).pipe(Effect.ignore) + return yield* Deferred.await(job.done) + }) + + return Service.of({ + list, + get, + start, + output, + wait, + kill, + }) + }), + ) +} diff --git a/packages/opencode/src/shell-job/schema.ts b/packages/opencode/src/shell-job/schema.ts new file mode 100644 index 0000000000..b359547cb9 --- /dev/null +++ b/packages/opencode/src/shell-job/schema.ts @@ -0,0 +1,10 @@ +import { Schema } from "effect" + +import { Identifier } from "@/id/id" +import { Newtype } from "@/util/schema" + +export class JobID extends Newtype()("JobID", Schema.String) { + static ascending(id?: string): JobID { + return this.make(Identifier.ascending("job", id)) + } +} diff --git a/packages/opencode/test/shell-job/shell-job.test.ts b/packages/opencode/test/shell-job/shell-job.test.ts new file mode 100644 index 0000000000..002eed9080 --- /dev/null +++ b/packages/opencode/test/shell-job/shell-job.test.ts @@ -0,0 +1,143 @@ +import { describe, expect } from "bun:test" +import { Effect, Layer } from "effect" + +import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner" +import { AppFileSystem } from "../../src/filesystem" +import { Instance } from "../../src/project/instance" +import { ShellJob } from "../../src/shell-job" +import { provideTmpdirInstance } from "../fixture/fixture" +import { testEffect } from "../lib/effect" + +const it = testEffect( + Layer.mergeAll( + CrossSpawnSpawner.defaultLayer, + ShellJob.layer.pipe(Layer.provide(CrossSpawnSpawner.defaultLayer), Layer.provide(AppFileSystem.defaultLayer)), + ), +) + +const node = (script: string) => `${JSON.stringify(process.execPath)} -e ${JSON.stringify(script)}` + +const alive = (pid: number) => { + try { + process.kill(pid, 0) + return true + } catch { + return false + } +} + +describe("shell-job", () => { + it.live("captures output and persists spool files", () => + provideTmpdirInstance((dir) => + Effect.gen(function* () { + const jobs = yield* ShellJob.Service + const job = yield* jobs.start({ + command: node('process.stdout.write("ok")'), + cwd: dir, + title: "ok", + }) + const done = yield* jobs.wait({ id: job.id }) + const out = yield* jobs.output({ id: job.id }) + + expect(done).toBeDefined() + expect(done?.status).toBe("completed") + expect(done?.pid).toBeGreaterThan(0) + expect(out).toEqual({ text: "ok", cursor: 2, done: true }) + + const log = yield* Effect.promise(() => Bun.file(done!.output_path).text()) + const meta = yield* Effect.promise(() => Bun.file(done!.meta_path).json()) + expect(log).toBe("ok") + expect(meta).toMatchObject({ + id: done!.id, + status: "completed", + title: "ok", + cursor: 2, + }) + }), + ), + ) + + it.live("reads output incrementally with a cursor", () => + provideTmpdirInstance((dir) => + Effect.gen(function* () { + const jobs = yield* ShellJob.Service + const job = yield* jobs.start({ + command: node( + 'process.stdout.write("a"); setTimeout(() => process.stdout.write("b"), 200); setTimeout(() => process.exit(0), 350)', + ), + cwd: dir, + }) + + yield* Effect.sleep("100 millis") + const a = yield* jobs.output({ id: job.id }) + const done = yield* jobs.wait({ id: job.id }) + const b = yield* jobs.output({ id: job.id, cursor: a?.cursor ?? 0 }) + + expect(a).toEqual({ text: "a", cursor: 1, done: false }) + expect(done?.status).toBe("completed") + expect(b).toEqual({ text: "b", cursor: 2, done: true }) + }), + ), + ) + + it.live("marks non-zero exits as failed", () => + provideTmpdirInstance((dir) => + Effect.gen(function* () { + const jobs = yield* ShellJob.Service + const job = yield* jobs.start({ + command: node('process.stderr.write("bad"); process.exit(7)'), + cwd: dir, + }) + const done = yield* jobs.wait({ id: job.id }) + const out = yield* jobs.output({ id: job.id }) + + expect(done).toBeDefined() + expect(done?.status).toBe("failed") + expect(done?.exit_code).toBe(7) + expect(out?.text).toBe("bad") + expect(out?.done).toBe(true) + }), + ), + ) + + it.live("kills a running job and returns final state", () => + provideTmpdirInstance((dir) => + Effect.gen(function* () { + const jobs = yield* ShellJob.Service + const job = yield* jobs.start({ + command: node("setInterval(() => {}, 1000)"), + cwd: dir, + }) + + yield* Effect.sleep("50 millis") + const done = yield* jobs.kill(job.id) + + expect(done).toBeDefined() + expect(done?.status).toBe("killed") + expect(done?.exit_code).toBeNull() + }), + ), + ) + + it.live("kills running jobs when the instance is disposed", () => { + if (process.platform === "win32") return Effect.void + + return provideTmpdirInstance((dir) => + Effect.gen(function* () { + const jobs = yield* ShellJob.Service + const job = yield* jobs.start({ + command: node("setInterval(() => {}, 1000)"), + cwd: dir, + }) + + expect(job.pid).toBeGreaterThan(0) + yield* Effect.sleep("50 millis") + expect(alive(job.pid!)).toBe(true) + + yield* Effect.promise(() => Instance.dispose()) + yield* Effect.sleep("100 millis") + expect(alive(job.pid!)).toBe(false) + }), + ) + }) +})