diff --git a/packages/opencode/src/cli/cmd/run/footer.subagent.tsx b/packages/opencode/src/cli/cmd/run/footer.subagent.tsx
index 93344b66ee..1c11908678 100644
--- a/packages/opencode/src/cli/cmd/run/footer.subagent.tsx
+++ b/packages/opencode/src/cli/cmd/run/footer.subagent.tsx
@@ -2,7 +2,7 @@
import type { ScrollBoxRenderable } from "@opentui/core"
import { useKeyboard } from "@opentui/solid"
import "opentui-spinner/solid"
-import { For, createMemo } from "solid-js"
+import { createMemo, mapArray } from "solid-js"
import { SPINNER_FRAMES } from "../tui/component/spinner"
import { RunEntryContent, sameEntryGroup } from "./scrollback.writer"
import type { FooterSubagentDetail, FooterSubagentTab, RunDiffStyle } from "./types"
@@ -35,21 +35,21 @@ function statusIcon(status: FooterSubagentTab["status"]) {
return "◔"
}
-function tabText(input: { tab: FooterSubagentTab; slot: string; count: number; width: number }) {
+function tabText(tab: FooterSubagentTab, slot: string, count: number, width: number) {
const perTab = Math.max(
1,
- Math.floor((input.width - 4 - Math.max(0, input.count - 1) * 3) / Math.max(1, input.count)),
+ Math.floor((width - 4 - Math.max(0, count - 1) * 3) / Math.max(1, count)),
)
- if (input.count >= 8 || perTab < 12) {
- return `[${input.slot}]`
+ if (count >= 8 || perTab < 12) {
+ return `[${slot}]`
}
- const label = `[${input.slot}] ${input.tab.label}`
- if (input.count >= 5 || perTab < 24) {
+ const label = `[${slot}] ${tab.label}`
+ if (count >= 5 || perTab < 24) {
return label
}
- const detail = input.tab.description || input.tab.title
+ const detail = tab.description || tab.title
if (!detail) {
return label
}
@@ -63,6 +63,32 @@ export function RunFooterSubagentTabs(props: {
theme: RunFooterTheme
width: number
}) {
+ const items = mapArray(
+ () => props.tabs,
+ (tab, index) => {
+ const active = () => props.selected === tab.sessionID
+ const slot = () => String(index() + 1)
+ return (
+
+
+ {tab.status === "running" ? (
+
+
+
+ ) : (
+
+ {statusIcon(tab.status)}
+
+ )}
+
+ {tabText(tab, slot(), props.tabs.length, props.width)}
+
+
+
+ )
+ },
+ )
+
return (
)
}
@@ -116,13 +114,22 @@ export function RunFooterSubagentBody(props: {
onCycle: (dir: -1 | 1) => void
onClose: () => void
}) {
+ const theme = createMemo(() => props.theme())
+ const footer = createMemo(() => theme().footer)
const commits = createMemo(() => props.detail()?.commits ?? [])
- const entries = createMemo(() => {
- return commits().map((commit, index, list) => ({
- commit,
- gap: index > 0 && !sameEntryGroup(list[index - 1], commit),
- }))
- })
+ const opts = createMemo(() => ({ diffStyle: props.diffStyle }))
+ const scrollbar = createMemo(() => ({
+ trackOptions: {
+ backgroundColor: footer().surface,
+ foregroundColor: footer().line,
+ },
+ }))
+ const rows = mapArray(commits, (commit, index) => (
+
+ {index() > 0 && !sameEntryGroup(commits()[index() - 1], commit) ? : null}
+
+
+ ))
let scroll: ScrollBoxRenderable | undefined
useKeyboard((event) => {
@@ -160,7 +167,7 @@ export function RunFooterSubagentBody(props: {
width="100%"
height="100%"
flexDirection="column"
- backgroundColor={props.theme().footer.surface}
+ backgroundColor={footer().surface}
>
{
scroll = item as ScrollBoxRenderable
}}
>
-
- {(item) => (
-
- {item.gap ? : null}
-
-
- )}
-
- {entries().length === 0 ? (
-
+ {commits().length > 0 ? (
+ rows()
+ ) : (
+
No subagent activity yet
- ) : null}
+ )}
diff --git a/packages/opencode/src/cli/cmd/run/footer.ts b/packages/opencode/src/cli/cmd/run/footer.ts
index c686aaa1c5..3bb3b92cf2 100644
--- a/packages/opencode/src/cli/cmd/run/footer.ts
+++ b/packages/opencode/src/cli/cmd/run/footer.ts
@@ -26,6 +26,7 @@
import { CliRenderEvents, type CliRenderer, type TreeSitterClient } from "@opentui/core"
import { render } from "@opentui/solid"
import { createComponent, createSignal, type Accessor, type Setter } from "solid-js"
+import { createStore, reconcile } from "solid-js/store"
import { SUBAGENT_INSPECTOR_ROWS, SUBAGENT_TAB_ROWS } from "./footer.subagent"
import { PROMPT_MAX_ROWS, TEXTAREA_MIN_ROWS } from "./footer.prompt"
import { printableBinding } from "./prompt.shared"
@@ -156,7 +157,7 @@ export class RunFooter implements FooterApi {
private view: Accessor
private setView: Setter
private subagent: Accessor
- private setSubagent: Setter
+ private setSubagent: (next: FooterSubagentState) => void
private promptRoute: FooterPromptRoute = { type: "composer" }
private tabsVisible = false
private interruptTimeout: NodeJS.Timeout | undefined
@@ -190,9 +191,14 @@ export class RunFooter implements FooterApi {
const [resources, setResources] = createSignal(options.resources)
this.resources = resources
this.setResources = setResources
- const [subagent, setSubagent] = createSignal(createEmptySubagentState())
- this.subagent = subagent
- this.setSubagent = setSubagent
+ const [subagent, setSubagent] = createStore(createEmptySubagentState())
+ this.subagent = () => subagent as FooterSubagentState
+ this.setSubagent = (next) => {
+ setSubagent("tabs", reconcile(next.tabs, { key: "sessionID" }))
+ setSubagent("details", reconcile(next.details))
+ setSubagent("permissions", reconcile(next.permissions, { key: "id" }))
+ setSubagent("questions", reconcile(next.questions, { key: "id" }))
+ }
this.base = Math.max(1, renderer.footerHeight - TEXTAREA_MIN_ROWS)
this.interruptHint = printableBinding(options.keybinds.interrupt, options.keybinds.leader) || "esc"
this.scrollback = new RunScrollbackStream(renderer, options.theme, {
diff --git a/packages/opencode/src/cli/cmd/run/stream.transport.ts b/packages/opencode/src/cli/cmd/run/stream.transport.ts
index 99bfa4cb2f..5f4fb54500 100644
--- a/packages/opencode/src/cli/cmd/run/stream.transport.ts
+++ b/packages/opencode/src/cli/cmd/run/stream.transport.ts
@@ -13,6 +13,8 @@
// We also re-check live session status before resolving an idle event so a
// delayed idle from an older turn cannot complete a newer busy turn.
import type { Event, OpencodeClient } from "@opencode-ai/sdk/v2"
+import { Context, Deferred, Effect, Exit, Layer, Scope, Stream } from "effect"
+import { makeRuntime } from "@/effect/run-service"
import {
blockerStatus,
bootstrapSessionData,
@@ -70,9 +72,7 @@ type Wait = {
tick: number
armed: boolean
live: boolean
- done: Promise
- resolve: () => void
- reject: (error: unknown) => void
+ done: Deferred.Deferred
}
export type SessionTurnInput = {
@@ -91,25 +91,26 @@ export type SessionTransport = {
close(): Promise
}
-// Creates a deferred promise tied to a specific turn tick.
-function defer(tick: number): Wait {
- let resolve: () => void = () => {}
- let reject: (error: unknown) => void = () => {}
- const done = new Promise((next, fail) => {
- resolve = next
- reject = fail
- })
-
- return {
- tick,
- armed: false,
- live: false,
- done,
- resolve,
- reject,
- }
+type State = {
+ data: SessionData
+ subagent: SubagentData
+ wait?: Wait
+ tick: number
+ fault?: unknown
+ footerView: FooterView
+ blockerTick: number
+ selectedSubagent?: string
+ blockers: Map
}
+type TransportService = {
+ readonly runPromptTurn: (input: SessionTurnInput) => Effect.Effect
+ readonly selectSubagent: (sessionID: string | undefined) => Effect.Effect
+ readonly close: () => Effect.Effect
+}
+
+class Service extends Context.Service()("@opencode/RunStreamTransport") {}
+
function sid(event: Event): string | undefined {
if (event.type === "message.updated") {
return event.properties.sessionID
@@ -148,31 +149,25 @@ function active(event: Event, sessionID: string): boolean {
return event.properties.status.type !== "idle"
}
-// Races the turn's deferred promise against an abort signal.
-function waitTurn(done: Promise, signal: AbortSignal): Promise<"idle" | "abort"> {
- return new Promise((resolve, reject) => {
- if (signal.aborted) {
- resolve("abort")
- return
- }
+// Races the turn's deferred completion against an abort signal.
+function waitTurn(done: Wait["done"], signal: AbortSignal) {
+ return Effect.raceAll([
+ Deferred.await(done).pipe(Effect.as("idle" as const)),
+ Effect.callback<"abort">((resume) => {
+ if (signal.aborted) {
+ resume(Effect.succeed("abort"))
+ return
+ }
- const onAbort = () => {
- signal.removeEventListener("abort", onAbort)
- resolve("abort")
- }
+ const onAbort = () => {
+ signal.removeEventListener("abort", onAbort)
+ resume(Effect.succeed("abort"))
+ }
- signal.addEventListener("abort", onAbort, { once: true })
- done.then(
- () => {
- signal.removeEventListener("abort", onAbort)
- resolve("idle")
- },
- (error) => {
- signal.removeEventListener("abort", onAbort)
- reject(error)
- },
- )
- })
+ signal.addEventListener("abort", onAbort, { once: true })
+ return Effect.sync(() => signal.removeEventListener("abort", onAbort))
+ }),
+ ])
}
export function formatUnknownError(error: unknown): string {
@@ -316,6 +311,530 @@ function traceTabs(trace: Trace | undefined, prev: FooterSubagentTab[], next: Fo
}
}
+function createLayer(input: StreamInput) {
+ return Layer.fresh(
+ Layer.effect(
+ Service,
+ Effect.gen(function* () {
+ const scope = yield* Scope.make()
+ const abort = yield* Scope.provide(scope)(
+ Effect.acquireRelease(
+ Effect.sync(() => new AbortController()),
+ (abort) => Effect.sync(() => abort.abort()),
+ ),
+ )
+ let closed = false
+ let closeStream = () => {}
+ const halt = () => {
+ abort.abort()
+ }
+ const stop = () => {
+ input.signal?.removeEventListener("abort", halt)
+ abort.abort()
+ closeStream()
+ }
+ const closeScope = () => {
+ if (closed) {
+ return Effect.void
+ }
+
+ closed = true
+ stop()
+ return Scope.close(scope, Exit.void)
+ }
+
+ input.signal?.addEventListener("abort", halt, { once: true })
+ yield* Effect.addFinalizer(() => closeScope())
+
+ const events = yield* Scope.provide(scope)(
+ Effect.acquireRelease(
+ Effect.promise(() =>
+ input.sdk.event.subscribe(undefined, {
+ signal: abort.signal,
+ }),
+ ),
+ (events) =>
+ Effect.sync(() => {
+ void events.stream.return(undefined).catch(() => {})
+ }),
+ ),
+ )
+ closeStream = () => {
+ void events.stream.return(undefined).catch(() => {})
+ }
+ input.trace?.write("recv.subscribe", {
+ sessionID: input.sessionID,
+ })
+
+ const state: State = {
+ data: createSessionData(),
+ subagent: createSubagentData(),
+ tick: 0,
+ footerView: { type: "prompt" },
+ blockerTick: 0,
+ blockers: new Map(),
+ }
+
+ const currentSubagentState = () => {
+ if (state.selectedSubagent && !state.subagent.tabs.has(state.selectedSubagent)) {
+ state.selectedSubagent = undefined
+ }
+
+ return snapshotSelectedSubagentData(state.subagent, state.selectedSubagent)
+ }
+
+ const seedBlocker = (id: string) => {
+ if (state.blockers.has(id)) {
+ return
+ }
+
+ state.blockerTick += 1
+ state.blockers.set(id, state.blockerTick)
+ }
+
+ const trackBlocker = (event: Event) => {
+ if (event.type !== "permission.asked" && event.type !== "question.asked") {
+ return
+ }
+
+ if (
+ event.properties.sessionID !== input.sessionID &&
+ !state.subagent.tabs.has(event.properties.sessionID)
+ ) {
+ return
+ }
+
+ seedBlocker(event.properties.id)
+ }
+
+ const releaseBlocker = (event: Event) => {
+ if (
+ event.type !== "permission.replied" &&
+ event.type !== "question.replied" &&
+ event.type !== "question.rejected"
+ ) {
+ return
+ }
+
+ state.blockers.delete(event.properties.requestID)
+ }
+
+ const syncFooter = (commits: StreamCommit[], patch?: FooterPatch, nextSubagent?: FooterSubagentState) => {
+ const current = pickView(state.data, state.subagent, state.blockers)
+ const footer = composeFooter({
+ patch,
+ subagent: nextSubagent,
+ current,
+ previous: state.footerView,
+ })
+
+ if (commits.length === 0 && !footer) {
+ state.footerView = current
+ return
+ }
+
+ input.trace?.write("reduce.output", {
+ commits,
+ footer: traceFooterOutput(footer),
+ })
+ writeSessionOutput(
+ {
+ footer: input.footer,
+ trace: input.trace,
+ },
+ {
+ commits,
+ footer,
+ },
+ )
+ state.footerView = current
+ }
+
+ const messages = (sessionID: string, limit: number) =>
+ Effect.promise(() =>
+ input.sdk.session.messages({
+ sessionID,
+ limit,
+ }),
+ ).pipe(
+ Effect.map((item) => item.data ?? []),
+ Effect.orElseSucceed(() => []),
+ )
+
+ const bootstrap = Effect.fn("RunStreamTransport.bootstrap")(function* () {
+ const [messagesList, children, permissions, questions] = yield* Effect.all(
+ [
+ messages(input.sessionID, SUBAGENT_BOOTSTRAP_LIMIT),
+ Effect.promise(() =>
+ input.sdk.session.children({
+ sessionID: input.sessionID,
+ }),
+ ).pipe(
+ Effect.map((item) => item.data ?? []),
+ Effect.orElseSucceed(() => []),
+ ),
+ Effect.promise(() => input.sdk.permission.list()).pipe(
+ Effect.map((item) => item.data ?? []),
+ Effect.orElseSucceed(() => []),
+ ),
+ Effect.promise(() => input.sdk.question.list()).pipe(
+ Effect.map((item) => item.data ?? []),
+ Effect.orElseSucceed(() => []),
+ ),
+ ],
+ {
+ concurrency: "unbounded",
+ },
+ )
+
+ bootstrapSessionData({
+ data: state.data,
+ messages: messagesList,
+ permissions: permissions.filter((item) => item.sessionID === input.sessionID),
+ questions: questions.filter((item) => item.sessionID === input.sessionID),
+ })
+ bootstrapSubagentData({
+ data: state.subagent,
+ messages: messagesList,
+ children,
+ permissions,
+ questions,
+ })
+
+ const sessions = [
+ ...new Set(
+ listSubagentPermissions(state.subagent)
+ .filter((item) => item.tool && item.metadata?.input === undefined)
+ .map((item) => item.sessionID),
+ ),
+ ]
+ yield* Effect.forEach(
+ sessions,
+ (sessionID) =>
+ messages(sessionID, SUBAGENT_CALL_BOOTSTRAP_LIMIT).pipe(
+ Effect.tap((messagesList) =>
+ Effect.sync(() => {
+ bootstrapSubagentCalls({
+ data: state.subagent,
+ sessionID,
+ messages: messagesList,
+ })
+ }),
+ ),
+ ),
+ {
+ concurrency: "unbounded",
+ discard: true,
+ },
+ )
+
+ for (const request of [
+ ...state.data.permissions,
+ ...listSubagentPermissions(state.subagent),
+ ...state.data.questions,
+ ...listSubagentQuestions(state.subagent),
+ ].sort((a, b) => a.id.localeCompare(b.id))) {
+ seedBlocker(request.id)
+ }
+
+ const snapshot = currentSubagentState()
+ traceTabs(input.trace, [], snapshot.tabs)
+ syncFooter([], undefined, snapshot)
+ })
+
+ const idle = Effect.fn("RunStreamTransport.idle")(() =>
+ Effect.promise(() => input.sdk.session.status()).pipe(
+ Effect.map((out) => {
+ const item = out.data?.[input.sessionID]
+ return !item || item.type === "idle"
+ }),
+ Effect.orElseSucceed(() => true),
+ ),
+ )
+
+ const fail = Effect.fn("RunStreamTransport.fail")(function* (error: unknown) {
+ if (state.fault) {
+ return
+ }
+
+ state.fault = error
+ const next = state.wait
+ state.wait = undefined
+ if (!next) {
+ return
+ }
+
+ yield* Deferred.fail(next.done, error).pipe(Effect.ignore)
+ })
+
+ const touch = (event: Event) => {
+ const next = state.wait
+ if (!next || !active(event, input.sessionID)) {
+ return
+ }
+
+ next.live = true
+ }
+
+ const mark = Effect.fn("RunStreamTransport.mark")(function* (event: Event) {
+ if (
+ event.type !== "session.status" ||
+ event.properties.sessionID !== input.sessionID ||
+ event.properties.status.type !== "idle"
+ ) {
+ return
+ }
+
+ const next = state.wait
+ if (!next || !next.armed || !next.live) {
+ return
+ }
+
+ if (!(yield* idle()) || state.wait !== next) {
+ return
+ }
+
+ state.tick = next.tick + 1
+ state.wait = undefined
+ yield* Deferred.succeed(next.done, undefined).pipe(Effect.ignore)
+ })
+
+ const flush = (type: "turn.abort" | "turn.cancel") => {
+ const commits: StreamCommit[] = []
+ flushInterrupted(state.data, commits)
+ syncFooter(commits)
+ input.trace?.write(type, {
+ sessionID: input.sessionID,
+ })
+ }
+
+ const watch = Effect.fn("RunStreamTransport.watch")(() =>
+ Stream.fromAsyncIterable(events.stream as AsyncIterable, (error) =>
+ error instanceof Error ? error : new Error(String(error)),
+ ).pipe(
+ Stream.takeUntil(() => input.footer.isClosed || abort.signal.aborted),
+ Stream.runForEach(
+ Effect.fn("RunStreamTransport.event")(function* (item: unknown) {
+ if (input.footer.isClosed) {
+ abort.abort()
+ return
+ }
+
+ const event = item as Event
+ input.trace?.write("recv.event", event)
+ trackBlocker(event)
+
+ const prev = event.type === "message.part.updated" ? listSubagentTabs(state.subagent) : undefined
+ const next = reduceSessionData({
+ data: state.data,
+ event,
+ sessionID: input.sessionID,
+ thinking: input.thinking,
+ limits: input.limits(),
+ })
+ state.data = next.data
+
+ const changed = reduceSubagentData({
+ data: state.subagent,
+ event,
+ sessionID: input.sessionID,
+ thinking: input.thinking,
+ limits: input.limits(),
+ })
+ if (changed && prev) {
+ traceTabs(input.trace, prev, listSubagentTabs(state.subagent))
+ }
+ releaseBlocker(event)
+
+ syncFooter(next.commits, next.footer?.patch, changed ? currentSubagentState() : undefined)
+
+ touch(event)
+ yield* mark(event)
+ }),
+ ),
+ Effect.catch((error) => (abort.signal.aborted ? Effect.void : fail(error))),
+ Effect.ensuring(
+ Effect.gen(function* () {
+ if (!abort.signal.aborted && !state.fault) {
+ yield* fail(new Error("session event stream closed"))
+ }
+ closeStream()
+ }),
+ ),
+ ),
+ )
+
+ yield* bootstrap()
+ yield* Scope.provide(scope)(watch().pipe(Effect.forkScoped))
+
+ const runPromptTurn = Effect.fn("RunStreamTransport.runPromptTurn")(function* (next: SessionTurnInput) {
+ if (closed || next.signal?.aborted || input.footer.isClosed) {
+ return
+ }
+
+ if (state.fault) {
+ return yield* Effect.fail(state.fault)
+ }
+
+ if (state.wait) {
+ return yield* Effect.fail(new Error("prompt already running"))
+ }
+
+ const prev = listSubagentTabs(state.subagent)
+ if (clearFinishedSubagents(state.subagent)) {
+ const snapshot = currentSubagentState()
+ traceTabs(input.trace, prev, snapshot.tabs)
+ syncFooter([], undefined, snapshot)
+ }
+
+ const item: Wait = {
+ tick: state.tick,
+ armed: false,
+ live: false,
+ done: yield* Deferred.make(),
+ }
+ state.wait = item
+ state.data.announced = false
+
+ const turn = new AbortController()
+ const stop = () => {
+ turn.abort()
+ }
+ next.signal?.addEventListener("abort", stop, { once: true })
+ abort.signal.addEventListener("abort", stop, { once: true })
+
+ const req = {
+ sessionID: input.sessionID,
+ agent: next.agent,
+ model: next.model,
+ variant: next.variant,
+ parts: [
+ ...(next.includeFiles ? next.files : []),
+ { type: "text" as const, text: next.prompt.text },
+ ...next.prompt.parts,
+ ],
+ }
+ input.trace?.write("send.prompt", req)
+
+ const send = Effect.promise(() =>
+ input.sdk.session.promptAsync(req, {
+ signal: turn.signal,
+ }),
+ ).pipe(
+ Effect.tap(() =>
+ Effect.sync(() => {
+ input.trace?.write("send.prompt.ok", {
+ sessionID: input.sessionID,
+ })
+ item.armed = true
+ }),
+ ),
+ )
+
+ return yield* send.pipe(
+ Effect.flatMap(() => {
+ if (turn.signal.aborted || next.signal?.aborted || input.footer.isClosed || closed) {
+ if (state.wait === item) {
+ state.wait = undefined
+ }
+ flush("turn.abort")
+ return Effect.void
+ }
+
+ if (!input.footer.isClosed && !state.data.announced) {
+ input.trace?.write("ui.patch", {
+ phase: "running",
+ status: "waiting for assistant",
+ })
+ input.footer.event({
+ type: "turn.wait",
+ })
+ }
+
+ if (state.tick > item.tick) {
+ if (state.wait === item) {
+ state.wait = undefined
+ }
+ return Effect.void
+ }
+
+ return waitTurn(item.done, turn.signal).pipe(
+ Effect.flatMap((status) =>
+ Effect.sync(() => {
+ if (state.wait === item) {
+ state.wait = undefined
+ }
+
+ if (status === "abort") {
+ flush("turn.abort")
+ }
+ }),
+ ),
+ )
+ }),
+ Effect.catch((error) => {
+ if (state.wait === item) {
+ state.wait = undefined
+ }
+
+ const canceled = turn.signal.aborted || next.signal?.aborted === true || input.footer.isClosed || closed
+ if (canceled) {
+ flush("turn.cancel")
+ return Effect.void
+ }
+
+ if (error === state.fault) {
+ return Effect.fail(error)
+ }
+
+ input.trace?.write("send.prompt.error", {
+ sessionID: input.sessionID,
+ error: formatUnknownError(error),
+ })
+ return Effect.fail(error)
+ }),
+ Effect.ensuring(
+ Effect.sync(() => {
+ input.trace?.write("turn.end", {
+ sessionID: input.sessionID,
+ })
+ next.signal?.removeEventListener("abort", stop)
+ abort.signal.removeEventListener("abort", stop)
+ }),
+ ),
+ )
+ })
+
+ const selectSubagent = Effect.fn("RunStreamTransport.selectSubagent")((sessionID: string | undefined) =>
+ Effect.sync(() => {
+ if (closed) {
+ return
+ }
+
+ const next = sessionID && state.subagent.tabs.has(sessionID) ? sessionID : undefined
+ if (state.selectedSubagent === next) {
+ return
+ }
+
+ state.selectedSubagent = next
+ syncFooter([], undefined, currentSubagentState())
+ }),
+ )
+
+ const close = Effect.fn("RunStreamTransport.close")(function* () {
+ yield* closeScope()
+ })
+
+ return Service.of({
+ runPromptTurn,
+ selectSubagent,
+ close,
+ })
+ }),
+ ),
+ )
+}
+
// Opens an SDK event subscription and returns a SessionTransport.
//
// The background `watch` loop consumes every SDK event, runs it through the
@@ -326,442 +845,12 @@ function traceTabs(trace: Trace | undefined, prev: FooterSubagentTab[], next: Fo
// The transport is single-turn: only one runPromptTurn() call can be active
// at a time. The prompt queue enforces this from above.
export async function createSessionTransport(input: StreamInput): Promise {
- const abort = new AbortController()
- const halt = () => {
- abort.abort()
- }
- input.signal?.addEventListener("abort", halt, { once: true })
-
- const events = await input.sdk.event.subscribe(undefined, {
- signal: abort.signal,
- })
- input.trace?.write("recv.subscribe", {
- sessionID: input.sessionID,
- })
-
- const closeStream = () => {
- // Pass undefined explicitly so TS accepts AsyncGenerator.return().
- void events.stream.return(undefined).catch(() => {})
- }
-
- let data = createSessionData()
- let subagent = createSubagentData()
- let wait: Wait | undefined
- let tick = 0
- let fault: unknown
- let closed = false
- let footerView: FooterView = { type: "prompt" }
- let blockerTick = 0
- let selectedSubagent: string | undefined
- const blockers = new Map()
-
- const currentSubagentState = () => {
- if (selectedSubagent && !subagent.tabs.has(selectedSubagent)) {
- selectedSubagent = undefined
- }
-
- return snapshotSelectedSubagentData(subagent, selectedSubagent)
- }
-
- const seedBlocker = (id: string) => {
- if (blockers.has(id)) {
- return
- }
-
- blockerTick += 1
- blockers.set(id, blockerTick)
- }
-
- const trackBlocker = (event: Event) => {
- if (event.type !== "permission.asked" && event.type !== "question.asked") {
- return
- }
-
- if (event.properties.sessionID !== input.sessionID && !subagent.tabs.has(event.properties.sessionID)) {
- return
- }
-
- seedBlocker(event.properties.id)
- }
-
- const releaseBlocker = (event: Event) => {
- if (
- event.type !== "permission.replied" &&
- event.type !== "question.replied" &&
- event.type !== "question.rejected"
- ) {
- return
- }
-
- blockers.delete(event.properties.requestID)
- }
-
- const syncFooter = (commits: StreamCommit[], patch?: FooterPatch, nextSubagent?: FooterSubagentState) => {
- const current = pickView(data, subagent, blockers)
- const footer = composeFooter({
- patch,
- subagent: nextSubagent,
- current,
- previous: footerView,
- })
-
- if (commits.length === 0 && !footer) {
- footerView = current
- return
- }
-
- input.trace?.write("reduce.output", {
- commits,
- footer: traceFooterOutput(footer),
- })
- writeSessionOutput(
- {
- footer: input.footer,
- trace: input.trace,
- },
- {
- commits,
- footer,
- },
- )
- footerView = current
- }
-
- const bootstrap = async () => {
- const [messages, children, permissions, questions] = await Promise.all([
- input.sdk.session
- .messages({
- sessionID: input.sessionID,
- limit: SUBAGENT_BOOTSTRAP_LIMIT,
- })
- .then((x) => x.data ?? [])
- .catch(() => []),
- input.sdk.session
- .children({
- sessionID: input.sessionID,
- })
- .then((x) => x.data ?? [])
- .catch(() => []),
- input.sdk.permission
- .list()
- .then((x) => x.data ?? [])
- .catch(() => []),
- input.sdk.question
- .list()
- .then((x) => x.data ?? [])
- .catch(() => []),
- ])
-
- bootstrapSessionData({
- data,
- messages,
- permissions: permissions.filter((item) => item.sessionID === input.sessionID),
- questions: questions.filter((item) => item.sessionID === input.sessionID),
- })
- bootstrapSubagentData({
- data: subagent,
- messages,
- children,
- permissions,
- questions,
- })
-
- const callSessions = [
- ...new Set(
- listSubagentPermissions(subagent)
- .filter((item) => item.tool && item.metadata?.input === undefined)
- .map((item) => item.sessionID),
- ),
- ]
- if (callSessions.length > 0) {
- await Promise.all(
- callSessions.map(async (sessionID) => {
- const messages = await input.sdk.session
- .messages({
- sessionID,
- limit: SUBAGENT_CALL_BOOTSTRAP_LIMIT,
- })
- .then((x) => x.data ?? [])
- .catch(() => [])
-
- bootstrapSubagentCalls({
- data: subagent,
- sessionID,
- messages,
- })
- }),
- )
- }
-
- for (const request of [
- ...data.permissions,
- ...listSubagentPermissions(subagent),
- ...data.questions,
- ...listSubagentQuestions(subagent),
- ].sort((a, b) => a.id.localeCompare(b.id))) {
- seedBlocker(request.id)
- }
-
- const snapshot = currentSubagentState()
- traceTabs(input.trace, [], snapshot.tabs)
- syncFooter([], undefined, snapshot)
- }
-
- await bootstrap()
-
- const idle = async () => {
- try {
- const out = await input.sdk.session.status()
- const state = out.data?.[input.sessionID]
- return !state || state.type === "idle"
- } catch {
- return true
- }
- }
-
- const fail = (error: unknown) => {
- if (fault) {
- return
- }
-
- fault = error
- const next = wait
- wait = undefined
- next?.reject(error)
- }
-
- const touch = (event: Event) => {
- const next = wait
- if (!next || !active(event, input.sessionID)) {
- return
- }
-
- next.live = true
- }
-
- const mark = async (event: Event) => {
- if (
- event.type !== "session.status" ||
- event.properties.sessionID !== input.sessionID ||
- event.properties.status.type !== "idle"
- ) {
- return
- }
-
- const next = wait
- if (!next || !next.armed || !next.live) {
- return
- }
-
- if (!(await idle()) || wait !== next) {
- return
- }
-
- tick = next.tick + 1
- wait = undefined
- next.resolve()
- }
-
- const flush = (type: "turn.abort" | "turn.cancel") => {
- const commits: StreamCommit[] = []
- flushInterrupted(data, commits)
- syncFooter(commits)
- input.trace?.write(type, {
- sessionID: input.sessionID,
- })
- }
-
- const watch = (async () => {
- try {
- for await (const item of events.stream) {
- if (input.footer.isClosed) {
- break
- }
-
- const event = item as Event
- input.trace?.write("recv.event", event)
- trackBlocker(event)
- const prevTabs = event.type === "message.part.updated" ? listSubagentTabs(subagent) : undefined
- const next = reduceSessionData({
- data,
- event,
- sessionID: input.sessionID,
- thinking: input.thinking,
- limits: input.limits(),
- })
- data = next.data
-
- const subagentChanged = reduceSubagentData({
- data: subagent,
- event,
- sessionID: input.sessionID,
- thinking: input.thinking,
- limits: input.limits(),
- })
- if (subagentChanged && prevTabs) {
- traceTabs(input.trace, prevTabs, listSubagentTabs(subagent))
- }
- releaseBlocker(event)
-
- syncFooter(next.commits, next.footer?.patch, subagentChanged ? currentSubagentState() : undefined)
-
- touch(event)
- await mark(event)
- }
- } catch (error) {
- if (!abort.signal.aborted) {
- fail(error)
- }
- } finally {
- if (!abort.signal.aborted && !fault) {
- fail(new Error("session event stream closed"))
- }
- closeStream()
- }
- })()
-
- const runPromptTurn = async (next: SessionTurnInput): Promise => {
- if (next.signal?.aborted || input.footer.isClosed) {
- return
- }
-
- if (fault) {
- throw fault
- }
-
- if (wait) {
- throw new Error("prompt already running")
- }
-
- const prevTabs = listSubagentTabs(subagent)
- if (clearFinishedSubagents(subagent)) {
- const snapshot = currentSubagentState()
- traceTabs(input.trace, prevTabs, snapshot.tabs)
- syncFooter([], undefined, snapshot)
- }
-
- const item = defer(tick)
- wait = item
- data.announced = false
-
- const turn = new AbortController()
- const stop = () => {
- turn.abort()
- }
- next.signal?.addEventListener("abort", stop, { once: true })
- abort.signal.addEventListener("abort", stop, { once: true })
-
- try {
- const req = {
- sessionID: input.sessionID,
- agent: next.agent,
- model: next.model,
- variant: next.variant,
- parts: [
- ...(next.includeFiles ? next.files : []),
- { type: "text" as const, text: next.prompt.text },
- ...next.prompt.parts,
- ],
- }
- input.trace?.write("send.prompt", req)
- await input.sdk.session.promptAsync(req, {
- signal: turn.signal,
- })
- input.trace?.write("send.prompt.ok", {
- sessionID: input.sessionID,
- })
-
- item.armed = true
-
- if (turn.signal.aborted || next.signal?.aborted || input.footer.isClosed) {
- if (wait === item) {
- wait = undefined
- }
- flush("turn.abort")
- return
- }
-
- if (!input.footer.isClosed && !data.announced) {
- input.trace?.write("ui.patch", {
- phase: "running",
- status: "waiting for assistant",
- })
- input.footer.event({
- type: "turn.wait",
- })
- }
-
- if (tick > item.tick) {
- if (wait === item) {
- wait = undefined
- }
- return
- }
-
- const state = await waitTurn(item.done, turn.signal)
- if (wait === item) {
- wait = undefined
- }
-
- if (state === "abort") {
- flush("turn.abort")
- }
-
- return
- } catch (error) {
- if (wait === item) {
- wait = undefined
- }
-
- const canceled = turn.signal.aborted || next.signal?.aborted === true || input.footer.isClosed
- if (canceled) {
- flush("turn.cancel")
- return
- }
-
- if (error === fault) {
- throw error
- }
-
- input.trace?.write("send.prompt.error", {
- sessionID: input.sessionID,
- error: formatUnknownError(error),
- })
- throw error
- } finally {
- input.trace?.write("turn.end", {
- sessionID: input.sessionID,
- })
- next.signal?.removeEventListener("abort", stop)
- abort.signal.removeEventListener("abort", stop)
- }
- }
-
- const selectSubagent = (sessionID: string | undefined): void => {
- const next = sessionID && subagent.tabs.has(sessionID) ? sessionID : undefined
- if (selectedSubagent === next) {
- return
- }
-
- selectedSubagent = next
- syncFooter([], undefined, currentSubagentState())
- }
-
- const close = async () => {
- if (closed) {
- return
- }
-
- closed = true
- input.signal?.removeEventListener("abort", halt)
- abort.abort()
- closeStream()
- await watch.catch(() => {})
- }
+ const runtime = makeRuntime(Service, createLayer(input))
+ await runtime.runPromise(() => Effect.void)
return {
- runPromptTurn,
- selectSubagent,
- close,
+ runPromptTurn: (next) => runtime.runPromise((svc) => svc.runPromptTurn(next)),
+ selectSubagent: (sessionID) => runtime.runSync((svc) => svc.selectSubagent(sessionID)),
+ close: () => runtime.runPromise((svc) => svc.close()),
}
}
diff --git a/packages/opencode/test/cli/run/stream.transport.test.ts b/packages/opencode/test/cli/run/stream.transport.test.ts
index 3ea3cff7ce..43a72a4a09 100644
--- a/packages/opencode/test/cli/run/stream.transport.test.ts
+++ b/packages/opencode/test/cli/run/stream.transport.test.ts
@@ -102,6 +102,46 @@ function feed() {
}
}
+function blockingFeed() {
+ let done = false
+ let wake: (() => void) | undefined
+ const started = defer()
+
+ const stream: AsyncIterableIterator = {
+ [Symbol.asyncIterator]() {
+ return this
+ },
+ next() {
+ started.resolve()
+ if (done) {
+ return Promise.resolve({ done: true, value: undefined })
+ }
+
+ return new Promise((resolve) => {
+ wake = () => {
+ done = true
+ wake = undefined
+ resolve({ done: true, value: undefined })
+ }
+ })
+ },
+ return() {
+ done = true
+ wake?.()
+ wake = undefined
+ return Promise.resolve({ done: true, value: undefined })
+ },
+ throw(error) {
+ done = true
+ wake?.()
+ wake = undefined
+ return Promise.reject(error)
+ },
+ }
+
+ return { stream, started }
+}
+
function footer(fn?: (commit: StreamCommit) => void) {
const commits: StreamCommit[] = []
const events: FooterEvent[] = []
@@ -580,6 +620,96 @@ describe("run stream transport", () => {
}
})
+ test("closes an active turn without rejecting it", async () => {
+ const src = feed()
+ const ui = footer()
+ const ready = defer()
+ let aborted = false
+
+ const transport = await createSessionTransport({
+ sdk: sdk(src, {
+ promptAsync: async (_input, opt) => {
+ ready.resolve()
+ await new Promise((resolve) => {
+ const onAbort = () => {
+ aborted = true
+ opt?.signal?.removeEventListener("abort", onAbort)
+ resolve()
+ }
+
+ opt?.signal?.addEventListener("abort", onAbort, { once: true })
+ })
+ },
+ }),
+ sessionID: "session-1",
+ thinking: true,
+ limits: () => ({}),
+ footer: ui.api,
+ })
+
+ try {
+ const task = transport.runPromptTurn({
+ agent: undefined,
+ model: undefined,
+ variant: undefined,
+ prompt: { text: "hello", parts: [] },
+ files: [],
+ includeFiles: false,
+ })
+
+ await ready.promise
+ await transport.close()
+ await task
+
+ expect(aborted).toBe(true)
+ } finally {
+ src.close()
+ await transport.close()
+ }
+ })
+
+ test("closes while the event stream is waiting for the next item", async () => {
+ const src = blockingFeed()
+ const ui = footer()
+ const transport = await createSessionTransport({
+ sdk: {
+ event: {
+ subscribe: async () => ({
+ stream: src.stream,
+ }),
+ },
+ session: {
+ promptAsync: async () => {},
+ status: async () => ({ data: {} }),
+ messages: async () => ({ data: [] }),
+ children: async () => ({ data: [] }),
+ },
+ permission: {
+ list: async () => ({ data: [] }),
+ },
+ question: {
+ list: async () => ({ data: [] }),
+ },
+ } as unknown as OpencodeClient,
+ sessionID: "session-1",
+ thinking: true,
+ limits: () => ({}),
+ footer: ui.api,
+ })
+
+ try {
+ await src.started.promise
+ await Promise.race([
+ transport.close(),
+ new Promise((_, reject) => {
+ setTimeout(() => reject(new Error("close timed out")), 100)
+ }),
+ ])
+ } finally {
+ await transport.close()
+ }
+ })
+
test("ignores stale idle events from an earlier turn", async () => {
const src = feed()
const ui = footer()