diff --git a/packages/opencode/src/cli/cmd/run/prompt.shared.ts b/packages/opencode/src/cli/cmd/run/prompt.shared.ts index 024fc61e8c..6a09f6133b 100644 --- a/packages/opencode/src/cli/cmd/run/prompt.shared.ts +++ b/packages/opencode/src/cli/cmd/run/prompt.shared.ts @@ -105,7 +105,7 @@ export function printableBinding(binding: string, leader: string): string { export function isExitCommand(input: string): boolean { const text = input.trim().toLowerCase() - return text === "/exit" || text === "/quit" + return text === "/exit" || text === "/quit" || text === ":q" } export function promptInfo(event: { diff --git a/packages/opencode/src/cli/cmd/run/runtime.queue.ts b/packages/opencode/src/cli/cmd/run/runtime.queue.ts index f0a3e7e637..22e7158f54 100644 --- a/packages/opencode/src/cli/cmd/run/runtime.queue.ts +++ b/packages/opencode/src/cli/cmd/run/runtime.queue.ts @@ -17,6 +17,12 @@ type Trace = { write(type: string, data?: unknown): void } +type Deferred = { + promise: Promise + resolve: (value: T | PromiseLike) => void + reject: (error?: unknown) => void +} + export type QueueInput = { footer: FooterApi initialInput?: string @@ -25,6 +31,23 @@ export type QueueInput = { run: (prompt: RunPrompt, signal: AbortSignal) => Promise } +type State = { + queue: RunPrompt[] + ctrl?: AbortController + closed: boolean +} + +function defer(): Deferred { + let resolve!: (value: T | PromiseLike) => void + let reject!: (error?: unknown) => void + const promise = new Promise((next, fail) => { + resolve = next + reject = fail + }) + + return { promise, resolve, reject } +} + // Runs the prompt queue until the footer closes. // // Subscribes to footer prompt events, queues them, and drains one at a @@ -32,123 +55,128 @@ export type QueueInput = { // a turn is running, they queue up and execute in order. The footer shows // the queue depth so the user knows how many are pending. export async function runPromptQueue(input: QueueInput): Promise { - const q: RunPrompt[] = [] - let busy = false - let closed = input.footer.isClosed - let ctrl: AbortController | undefined - let stop: (() => void) | undefined - let err: unknown - let hasErr = false - let done: (() => void) | undefined - const wait = new Promise((resolve) => { - done = resolve - }) - const until = new Promise((resolve) => { - stop = resolve - }) - - const fail = (error: unknown) => { - err = error - hasErr = true - done?.() - done = undefined - } - - const finish = () => { - if (!closed || busy) { - return - } - - done?.() - done = undefined + const stop = defer<{ type: "closed" }>() + const done = defer() + const state: State = { + queue: [], + closed: input.footer.isClosed, } + let draining: Promise | undefined const emit = (next: FooterEvent, row: Record) => { input.trace?.write("ui.patch", row) input.footer.event(next) } - const pump = async () => { - if (busy || closed) { + const finish = () => { + if (!state.closed || draining) { return } - busy = true - - try { - while (!closed && q.length > 0) { - const prompt = q.shift() - if (!prompt) { - continue - } - - emit( - { - type: "turn.send", - queue: q.length, - }, - { - phase: "running", - status: "sending prompt", - queue: q.length, - }, - ) - const start = Date.now() - const next = new AbortController() - ctrl = next - try { - const task = input.run(prompt, next.signal).then( - () => ({ type: "done" as const }), - (error) => ({ type: "error" as const, error }), - ) - await input.footer.idle() - const commit = { kind: "user", text: prompt.text, phase: "start", source: "system" } as const - input.trace?.write("ui.commit", commit) - input.footer.append(commit) - const out = await Promise.race([task, until.then(() => ({ type: "closed" as const }))]) - if (out.type === "closed") { - next.abort() - break - } - - if (out.type === "error") { - throw out.error - } - } finally { - if (ctrl === next) { - ctrl = undefined - } - const duration = Locale.duration(Math.max(0, Date.now() - start)) - emit( - { - type: "turn.duration", - duration, - }, - { - duration, - }, - ) - } - } - } finally { - busy = false - emit( - { - type: "turn.idle", - queue: q.length, - }, - { - phase: "idle", - status: "", - queue: q.length, - }, - ) - finish() - } + done.resolve() } - const push = (prompt: RunPrompt) => { - if (!prompt.text.trim() || closed) { + const close = () => { + if (state.closed) { + return + } + + state.closed = true + state.queue.length = 0 + state.ctrl?.abort() + stop.resolve({ type: "closed" }) + finish() + } + + const drain = () => { + if (draining || state.closed || state.queue.length === 0) { + return + } + + draining = (async () => { + try { + while (!state.closed && state.queue.length > 0) { + const prompt = state.queue.shift() + if (!prompt) { + continue + } + + emit( + { + type: "turn.send", + queue: state.queue.length, + }, + { + phase: "running", + status: "sending prompt", + queue: state.queue.length, + }, + ) + const start = Date.now() + const ctrl = new AbortController() + state.ctrl = ctrl + + try { + const task = input.run(prompt, ctrl.signal).then( + () => ({ type: "done" as const }), + (error) => ({ type: "error" as const, error }), + ) + + await input.footer.idle() + const commit = { kind: "user", text: prompt.text, phase: "start", source: "system" } as const + input.trace?.write("ui.commit", commit) + input.footer.append(commit) + + const next = await Promise.race([task, stop.promise]) + if (next.type === "closed") { + ctrl.abort() + break + } + + if (next.type === "error") { + throw next.error + } + } finally { + if (state.ctrl === ctrl) { + state.ctrl = undefined + } + + const duration = Locale.duration(Math.max(0, Date.now() - start)) + emit( + { + type: "turn.duration", + duration, + }, + { + duration, + }, + ) + } + } + } catch (error) { + done.reject(error) + return + } finally { + draining = undefined + emit( + { + type: "turn.idle", + queue: state.queue.length, + }, + { + phase: "idle", + status: "", + queue: state.queue.length, + }, + ) + } + + finish() + })() + } + + const submit = (prompt: RunPrompt) => { + if (!prompt.text.trim() || state.closed) { return } @@ -158,14 +186,14 @@ export async function runPromptQueue(input: QueueInput): Promise { } input.onPrompt?.() - q.push(prompt) + state.queue.push(prompt) emit( { type: "queue", - queue: q.length, + queue: state.queue.length, }, { - queue: q.length, + queue: state.queue.length, }, ) emit( @@ -177,37 +205,31 @@ export async function runPromptQueue(input: QueueInput): Promise { first: false, }, ) - void pump().catch(fail) + drain() } const offPrompt = input.footer.onPrompt((prompt) => { - push(prompt) + submit(prompt) }) const offClose = input.footer.onClose(() => { - closed = true - q.length = 0 - ctrl?.abort() - stop?.() - finish() + close() }) try { - if (closed) { + if (state.closed) { return } - push({ text: input.initialInput ?? "", parts: [] }) - await pump() - - if (!closed) { - await wait - } - - if (hasErr) { - throw err - } + submit({ + text: input.initialInput ?? "", + parts: [], + }) + finish() + await done.promise } finally { offPrompt() offClose() + close() + await draining?.catch(() => {}) } } diff --git a/packages/opencode/src/cli/cmd/run/splash.ts b/packages/opencode/src/cli/cmd/run/splash.ts index cc6f86c1e8..257ce75fcf 100644 --- a/packages/opencode/src/cli/cmd/run/splash.ts +++ b/packages/opencode/src/cli/cmd/run/splash.ts @@ -232,7 +232,7 @@ function build(input: SplashWriterInput, kind: "entry" | "exit", ctx: Scrollback } if (kind === "entry") { - push(lines, 0, y, "Type /exit or /quit to finish.", input.theme.system.body, undefined, undefined) + push(lines, 0, y, "Type /exit to finish.", input.theme.system.body, undefined, undefined) y += 1 } diff --git a/packages/opencode/src/cli/cmd/run/stream.transport.ts b/packages/opencode/src/cli/cmd/run/stream.transport.ts index 5f4fb54500..86f388a63a 100644 --- a/packages/opencode/src/cli/cmd/run/stream.transport.ts +++ b/packages/opencode/src/cli/cmd/run/stream.transport.ts @@ -152,7 +152,7 @@ function active(event: Event, sessionID: string): boolean { // Races the turn's deferred completion against an abort signal. function waitTurn(done: Wait["done"], signal: AbortSignal) { return Effect.raceAll([ - Deferred.await(done).pipe(Effect.as("idle" as const)), + Deferred.await(done).pipe(Effect.as("idle" as const), Effect.exit), Effect.callback<"abort">((resume) => { if (signal.aborted) { resume(Effect.succeed("abort")) @@ -166,8 +166,10 @@ function waitTurn(done: Wait["done"], signal: AbortSignal) { signal.addEventListener("abort", onAbort, { once: true }) return Effect.sync(() => signal.removeEventListener("abort", onAbort)) - }), - ]) + }).pipe(Effect.exit), + ]).pipe( + Effect.flatMap((exit) => (Exit.isFailure(exit) ? Effect.failCause(exit.cause) : Effect.succeed(exit.value))), + ) } export function formatUnknownError(error: unknown): string { diff --git a/packages/opencode/test/cli/run/runtime.queue.test.ts b/packages/opencode/test/cli/run/runtime.queue.test.ts index 48cb9630ae..746f7f6a33 100644 --- a/packages/opencode/test/cli/run/runtime.queue.test.ts +++ b/packages/opencode/test/cli/run/runtime.queue.test.ts @@ -161,6 +161,39 @@ describe("run runtime queue", () => { expect(seen).toEqual(["one", "two"]) }) + test("drains a prompt queued during an in-flight turn", async () => { + const ui = footer() + const seen: string[] = [] + let wake: (() => void) | undefined + const gate = new Promise((resolve) => { + wake = resolve + }) + + const task = runPromptQueue({ + footer: ui.api, + run: async (input) => { + seen.push(input.text) + if (seen.length === 1) { + await gate + return + } + + ui.api.close() + }, + }) + + ui.submit("one") + await Promise.resolve() + expect(seen).toEqual(["one"]) + + wake?.() + await Promise.resolve() + ui.submit("two") + await task + + expect(seen).toEqual(["one", "two"]) + }) + test("close aborts the active run and drops pending queued work", async () => { const ui = footer() const seen: string[] = [] diff --git a/packages/opencode/test/cli/run/stream.transport.test.ts b/packages/opencode/test/cli/run/stream.transport.test.ts index 43a72a4a09..3121d448e4 100644 --- a/packages/opencode/test/cli/run/stream.transport.test.ts +++ b/packages/opencode/test/cli/run/stream.transport.test.ts @@ -668,6 +668,58 @@ describe("run stream transport", () => { } }) + test("rejects the active turn when the event stream faults", async () => { + const ui = footer() + const ready = defer() + + const transport = await createSessionTransport({ + sdk: { + event: { + subscribe: async () => ({ + stream: (async function* () { + await ready.promise + yield busy() + throw new Error("boom") + })(), + }), + }, + session: { + promptAsync: async () => { + ready.resolve() + }, + status: async () => ({ data: { "session-1": { type: "busy" } } }), + messages: async () => ({ data: [] }), + children: async () => ({ data: [] }), + }, + permission: { + list: async () => ({ data: [] }), + }, + question: { + list: async () => ({ data: [] }), + }, + } as unknown as OpencodeClient, + sessionID: "session-1", + thinking: true, + limits: () => ({}), + footer: ui.api, + }) + + try { + await expect( + transport.runPromptTurn({ + agent: undefined, + model: undefined, + variant: undefined, + prompt: { text: "hello", parts: [] }, + files: [], + includeFiles: false, + }), + ).rejects.toThrow("boom") + } finally { + await transport.close() + } + }) + test("closes while the event stream is waiting for the next item", async () => { const src = blockingFeed() const ui = footer()