queue cleanup

This commit is contained in:
Simon Klee 2026-04-18 22:35:24 +02:00
parent 29260bd106
commit 8c623f3b23
No known key found for this signature in database
GPG key ID: B91696044D47BEA3
6 changed files with 240 additions and 131 deletions

View file

@ -105,7 +105,7 @@ export function printableBinding(binding: string, leader: string): string {
export function isExitCommand(input: string): boolean {
const text = input.trim().toLowerCase()
return text === "/exit" || text === "/quit"
return text === "/exit" || text === "/quit" || text === ":q"
}
export function promptInfo(event: {

View file

@ -17,6 +17,12 @@ type Trace = {
write(type: string, data?: unknown): void
}
type Deferred<T = void> = {
promise: Promise<T>
resolve: (value: T | PromiseLike<T>) => void
reject: (error?: unknown) => void
}
export type QueueInput = {
footer: FooterApi
initialInput?: string
@ -25,6 +31,23 @@ export type QueueInput = {
run: (prompt: RunPrompt, signal: AbortSignal) => Promise<void>
}
type State = {
queue: RunPrompt[]
ctrl?: AbortController
closed: boolean
}
function defer<T = void>(): Deferred<T> {
let resolve!: (value: T | PromiseLike<T>) => void
let reject!: (error?: unknown) => void
const promise = new Promise<T>((next, fail) => {
resolve = next
reject = fail
})
return { promise, resolve, reject }
}
// Runs the prompt queue until the footer closes.
//
// Subscribes to footer prompt events, queues them, and drains one at a
@ -32,123 +55,128 @@ export type QueueInput = {
// a turn is running, they queue up and execute in order. The footer shows
// the queue depth so the user knows how many are pending.
export async function runPromptQueue(input: QueueInput): Promise<void> {
const q: RunPrompt[] = []
let busy = false
let closed = input.footer.isClosed
let ctrl: AbortController | undefined
let stop: (() => void) | undefined
let err: unknown
let hasErr = false
let done: (() => void) | undefined
const wait = new Promise<void>((resolve) => {
done = resolve
})
const until = new Promise<void>((resolve) => {
stop = resolve
})
const fail = (error: unknown) => {
err = error
hasErr = true
done?.()
done = undefined
}
const finish = () => {
if (!closed || busy) {
return
}
done?.()
done = undefined
const stop = defer<{ type: "closed" }>()
const done = defer<void>()
const state: State = {
queue: [],
closed: input.footer.isClosed,
}
let draining: Promise<void> | undefined
const emit = (next: FooterEvent, row: Record<string, unknown>) => {
input.trace?.write("ui.patch", row)
input.footer.event(next)
}
const pump = async () => {
if (busy || closed) {
const finish = () => {
if (!state.closed || draining) {
return
}
busy = true
try {
while (!closed && q.length > 0) {
const prompt = q.shift()
if (!prompt) {
continue
}
emit(
{
type: "turn.send",
queue: q.length,
},
{
phase: "running",
status: "sending prompt",
queue: q.length,
},
)
const start = Date.now()
const next = new AbortController()
ctrl = next
try {
const task = input.run(prompt, next.signal).then(
() => ({ type: "done" as const }),
(error) => ({ type: "error" as const, error }),
)
await input.footer.idle()
const commit = { kind: "user", text: prompt.text, phase: "start", source: "system" } as const
input.trace?.write("ui.commit", commit)
input.footer.append(commit)
const out = await Promise.race([task, until.then(() => ({ type: "closed" as const }))])
if (out.type === "closed") {
next.abort()
break
}
if (out.type === "error") {
throw out.error
}
} finally {
if (ctrl === next) {
ctrl = undefined
}
const duration = Locale.duration(Math.max(0, Date.now() - start))
emit(
{
type: "turn.duration",
duration,
},
{
duration,
},
)
}
}
} finally {
busy = false
emit(
{
type: "turn.idle",
queue: q.length,
},
{
phase: "idle",
status: "",
queue: q.length,
},
)
finish()
}
done.resolve()
}
const push = (prompt: RunPrompt) => {
if (!prompt.text.trim() || closed) {
const close = () => {
if (state.closed) {
return
}
state.closed = true
state.queue.length = 0
state.ctrl?.abort()
stop.resolve({ type: "closed" })
finish()
}
const drain = () => {
if (draining || state.closed || state.queue.length === 0) {
return
}
draining = (async () => {
try {
while (!state.closed && state.queue.length > 0) {
const prompt = state.queue.shift()
if (!prompt) {
continue
}
emit(
{
type: "turn.send",
queue: state.queue.length,
},
{
phase: "running",
status: "sending prompt",
queue: state.queue.length,
},
)
const start = Date.now()
const ctrl = new AbortController()
state.ctrl = ctrl
try {
const task = input.run(prompt, ctrl.signal).then(
() => ({ type: "done" as const }),
(error) => ({ type: "error" as const, error }),
)
await input.footer.idle()
const commit = { kind: "user", text: prompt.text, phase: "start", source: "system" } as const
input.trace?.write("ui.commit", commit)
input.footer.append(commit)
const next = await Promise.race([task, stop.promise])
if (next.type === "closed") {
ctrl.abort()
break
}
if (next.type === "error") {
throw next.error
}
} finally {
if (state.ctrl === ctrl) {
state.ctrl = undefined
}
const duration = Locale.duration(Math.max(0, Date.now() - start))
emit(
{
type: "turn.duration",
duration,
},
{
duration,
},
)
}
}
} catch (error) {
done.reject(error)
return
} finally {
draining = undefined
emit(
{
type: "turn.idle",
queue: state.queue.length,
},
{
phase: "idle",
status: "",
queue: state.queue.length,
},
)
}
finish()
})()
}
const submit = (prompt: RunPrompt) => {
if (!prompt.text.trim() || state.closed) {
return
}
@ -158,14 +186,14 @@ export async function runPromptQueue(input: QueueInput): Promise<void> {
}
input.onPrompt?.()
q.push(prompt)
state.queue.push(prompt)
emit(
{
type: "queue",
queue: q.length,
queue: state.queue.length,
},
{
queue: q.length,
queue: state.queue.length,
},
)
emit(
@ -177,37 +205,31 @@ export async function runPromptQueue(input: QueueInput): Promise<void> {
first: false,
},
)
void pump().catch(fail)
drain()
}
const offPrompt = input.footer.onPrompt((prompt) => {
push(prompt)
submit(prompt)
})
const offClose = input.footer.onClose(() => {
closed = true
q.length = 0
ctrl?.abort()
stop?.()
finish()
close()
})
try {
if (closed) {
if (state.closed) {
return
}
push({ text: input.initialInput ?? "", parts: [] })
await pump()
if (!closed) {
await wait
}
if (hasErr) {
throw err
}
submit({
text: input.initialInput ?? "",
parts: [],
})
finish()
await done.promise
} finally {
offPrompt()
offClose()
close()
await draining?.catch(() => {})
}
}

View file

@ -232,7 +232,7 @@ function build(input: SplashWriterInput, kind: "entry" | "exit", ctx: Scrollback
}
if (kind === "entry") {
push(lines, 0, y, "Type /exit or /quit to finish.", input.theme.system.body, undefined, undefined)
push(lines, 0, y, "Type /exit to finish.", input.theme.system.body, undefined, undefined)
y += 1
}

View file

@ -152,7 +152,7 @@ function active(event: Event, sessionID: string): boolean {
// Races the turn's deferred completion against an abort signal.
function waitTurn(done: Wait["done"], signal: AbortSignal) {
return Effect.raceAll([
Deferred.await(done).pipe(Effect.as("idle" as const)),
Deferred.await(done).pipe(Effect.as("idle" as const), Effect.exit),
Effect.callback<"abort">((resume) => {
if (signal.aborted) {
resume(Effect.succeed("abort"))
@ -166,8 +166,10 @@ function waitTurn(done: Wait["done"], signal: AbortSignal) {
signal.addEventListener("abort", onAbort, { once: true })
return Effect.sync(() => signal.removeEventListener("abort", onAbort))
}),
])
}).pipe(Effect.exit),
]).pipe(
Effect.flatMap((exit) => (Exit.isFailure(exit) ? Effect.failCause(exit.cause) : Effect.succeed(exit.value))),
)
}
export function formatUnknownError(error: unknown): string {

View file

@ -161,6 +161,39 @@ describe("run runtime queue", () => {
expect(seen).toEqual(["one", "two"])
})
test("drains a prompt queued during an in-flight turn", async () => {
const ui = footer()
const seen: string[] = []
let wake: (() => void) | undefined
const gate = new Promise<void>((resolve) => {
wake = resolve
})
const task = runPromptQueue({
footer: ui.api,
run: async (input) => {
seen.push(input.text)
if (seen.length === 1) {
await gate
return
}
ui.api.close()
},
})
ui.submit("one")
await Promise.resolve()
expect(seen).toEqual(["one"])
wake?.()
await Promise.resolve()
ui.submit("two")
await task
expect(seen).toEqual(["one", "two"])
})
test("close aborts the active run and drops pending queued work", async () => {
const ui = footer()
const seen: string[] = []

View file

@ -668,6 +668,58 @@ describe("run stream transport", () => {
}
})
test("rejects the active turn when the event stream faults", async () => {
const ui = footer()
const ready = defer()
const transport = await createSessionTransport({
sdk: {
event: {
subscribe: async () => ({
stream: (async function* () {
await ready.promise
yield busy()
throw new Error("boom")
})(),
}),
},
session: {
promptAsync: async () => {
ready.resolve()
},
status: async () => ({ data: { "session-1": { type: "busy" } } }),
messages: async () => ({ data: [] }),
children: async () => ({ data: [] }),
},
permission: {
list: async () => ({ data: [] }),
},
question: {
list: async () => ({ data: [] }),
},
} as unknown as OpencodeClient,
sessionID: "session-1",
thinking: true,
limits: () => ({}),
footer: ui.api,
})
try {
await expect(
transport.runPromptTurn({
agent: undefined,
model: undefined,
variant: undefined,
prompt: { text: "hello", parts: [] },
files: [],
includeFiles: false,
}),
).rejects.toThrow("boom")
} finally {
await transport.close()
}
})
test("closes while the event stream is waiting for the next item", async () => {
const src = blockingFeed()
const ui = footer()