fix(session): cancel subtask child sessions (#25798)

This commit is contained in:
Kit Langton 2026-05-04 22:36:06 -04:00 committed by GitHub
parent 39c88f9afb
commit 75d141b574
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 339 additions and 247 deletions

View file

@ -1,6 +1,5 @@
import path from "path"
import os from "os"
import z from "zod"
import * as EffectZod from "@/util/effect-zod"
import { SessionID, MessageID, PartID } from "./schema"
import { MessageV2 } from "./message-v2"
@ -121,9 +120,8 @@ export const layer = Layer.effect(
return yield* EffectBridge.make()
})
const ops = Effect.fn("SessionPrompt.ops")(function* () {
const run = yield* runner()
return {
cancel: (sessionID: SessionID) => run.fork(cancel(sessionID)),
cancel: (sessionID: SessionID) => cancel(sessionID),
resolvePromptParts: (template: string) => resolvePromptParts(template),
prompt: (input: PromptInput) => prompt(input),
} satisfies TaskPromptOps

View file

@ -6,10 +6,11 @@ import { MessageV2 } from "../session/message-v2"
import { Agent } from "../agent/agent"
import type { SessionPrompt } from "../session/prompt"
import { Config } from "@/config/config"
import { Effect, Schema } from "effect"
import { Effect, Exit, Schema } from "effect"
import { EffectBridge } from "@/effect/bridge"
export interface TaskPromptOps {
cancel(sessionID: SessionID): void
cancel(sessionID: SessionID): Effect.Effect<void>
resolvePromptParts(template: string): Effect.Effect<SessionPrompt.PromptInput["parts"]>
prompt(input: SessionPrompt.PromptInput): Effect.Effect<MessageV2.WithParts>
}
@ -118,16 +119,18 @@ export const TaskTool = Tool.define(
const ops = ctx.extra?.promptOps as TaskPromptOps
if (!ops) return yield* Effect.fail(new Error("TaskTool requires promptOps in ctx.extra"))
const runCancel = yield* EffectBridge.make()
const messageID = MessageID.ascending()
const cancel = ops.cancel(nextSession.id)
function cancel() {
ops.cancel(nextSession.id)
function onAbort() {
runCancel.fork(cancel)
}
return yield* Effect.acquireUseRelease(
Effect.sync(() => {
ctx.abort.addEventListener("abort", cancel)
ctx.abort.addEventListener("abort", onAbort)
}),
() =>
Effect.gen(function* () {
@ -163,10 +166,16 @@ export const TaskTool = Tool.define(
].join("\n"),
}
}),
() =>
Effect.sync(() => {
ctx.abort.removeEventListener("abort", cancel)
}),
(_, exit) =>
Effect.gen(function* () {
if (Exit.hasInterrupts(exit)) yield* cancel
}).pipe(
Effect.ensuring(
Effect.sync(() => {
ctx.abort.removeEventListener("abort", onAbort)
}),
),
),
)
})

View file

@ -858,6 +858,43 @@ it.live(
30_000,
)
it.live(
"cancel propagates from slash command subtask to child session",
() =>
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const status = yield* SessionStatus.Service
const chat = yield* sessions.create({ title: "Pinned" })
yield* llm.hang
const msg = yield* user(chat.id, "hello")
yield* addSubtask(chat.id, msg.id)
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* llm.wait(1)
const msgs = yield* MessageV2.filterCompactedEffect(chat.id)
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
const tool = taskMsg ? toolPart(taskMsg.parts) : undefined
const sessionID = tool?.state.status === "running" ? tool.state.metadata?.sessionId : undefined
expect(typeof sessionID).toBe("string")
if (typeof sessionID !== "string") throw new Error("missing child session id")
const childID = SessionID.make(sessionID)
expect((yield* status.get(childID)).type).toBe("busy")
yield* prompt.cancel(chat.id)
const exit = yield* Fiber.await(fiber)
expect(Exit.isSuccess(exit)).toBe(true)
expect((yield* status.get(chat.id)).type).toBe("idle")
expect((yield* status.get(childID)).type).toBe("idle")
}),
{ git: true, config: providerCfg },
),
10_000,
)
it.live(
"cancel with queued callers resolves all cleanly",
() =>

View file

@ -1,18 +1,17 @@
import { afterEach, describe, expect } from "bun:test"
import { Effect, Layer } from "effect"
import { Effect, Exit, Fiber, Layer } from "effect"
import { Agent } from "../../src/agent/agent"
import { Config } from "@/config/config"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { Instance } from "../../src/project/instance"
import { Session } from "@/session/session"
import { MessageV2 } from "../../src/session/message-v2"
import type { SessionPrompt } from "../../src/session/prompt"
import { MessageID, PartID } from "../../src/session/schema"
import { MessageID, PartID, SessionID } from "../../src/session/schema"
import { ModelID, ProviderID } from "../../src/provider/schema"
import { TaskTool, type TaskPromptOps } from "../../src/tool/task"
import { Truncate } from "@/tool/truncate"
import { ToolRegistry } from "@/tool/registry"
import { disposeAllInstances, provideTmpdirInstance } from "../fixture/fixture"
import { disposeAllInstances } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
afterEach(async () => {
@ -35,6 +34,14 @@ const it = testEffect(
),
)
function defer<T>() {
let resolve!: (value: T | PromiseLike<T>) => void
const promise = new Promise<T>((done) => {
resolve = done
})
return { promise, resolve }
}
const seed = Effect.fn("TaskToolTest.seed")(function* (title = "Pinned") {
const session = yield* Session.Service
const chat = yield* session.create({ title })
@ -66,7 +73,7 @@ const seed = Effect.fn("TaskToolTest.seed")(function* (title = "Pinned") {
function stubOps(opts?: { onPrompt?: (input: SessionPrompt.PromptInput) => void; text?: string }): TaskPromptOps {
return {
cancel() {},
cancel: () => Effect.void,
resolvePromptParts: (template) => Effect.succeed([{ type: "text" as const, text: template }]),
prompt: (input) =>
Effect.sync(() => {
@ -107,102 +114,270 @@ function reply(input: SessionPrompt.PromptInput, text: string): MessageV2.WithPa
}
describe("tool.task", () => {
it.live("description sorts subagents by name and is stable across calls", () =>
provideTmpdirInstance(
() =>
Effect.gen(function* () {
const agent = yield* Agent.Service
const build = yield* agent.get("build")
const registry = yield* ToolRegistry.Service
const get = Effect.fnUntraced(function* () {
const tools = yield* registry.tools({ ...ref, agent: build })
return tools.find((tool) => tool.id === TaskTool.id)?.description ?? ""
})
const first = yield* get()
const second = yield* get()
it.instance(
"description sorts subagents by name and is stable across calls",
() =>
Effect.gen(function* () {
const agent = yield* Agent.Service
const build = yield* agent.get("build")
const registry = yield* ToolRegistry.Service
const get = Effect.fnUntraced(function* () {
const tools = yield* registry.tools({ ...ref, agent: build })
return tools.find((tool) => tool.id === TaskTool.id)?.description ?? ""
})
const first = yield* get()
const second = yield* get()
expect(first).toBe(second)
expect(first).toBe(second)
const alpha = first.indexOf("- alpha: Alpha agent")
const explore = first.indexOf("- explore:")
const general = first.indexOf("- general:")
const zebra = first.indexOf("- zebra: Zebra agent")
const alpha = first.indexOf("- alpha: Alpha agent")
const explore = first.indexOf("- explore:")
const general = first.indexOf("- general:")
const zebra = first.indexOf("- zebra: Zebra agent")
expect(alpha).toBeGreaterThan(-1)
expect(explore).toBeGreaterThan(alpha)
expect(general).toBeGreaterThan(explore)
expect(zebra).toBeGreaterThan(general)
}),
{
config: {
agent: {
zebra: {
description: "Zebra agent",
mode: "subagent",
},
alpha: {
description: "Alpha agent",
mode: "subagent",
},
expect(alpha).toBeGreaterThan(-1)
expect(explore).toBeGreaterThan(alpha)
expect(general).toBeGreaterThan(explore)
expect(zebra).toBeGreaterThan(general)
}),
{
config: {
agent: {
zebra: {
description: "Zebra agent",
mode: "subagent",
},
alpha: {
description: "Alpha agent",
mode: "subagent",
},
},
},
),
},
)
it.live("description hides denied subagents for the caller", () =>
provideTmpdirInstance(
() =>
Effect.gen(function* () {
const agent = yield* Agent.Service
const build = yield* agent.get("build")
const registry = yield* ToolRegistry.Service
const description =
(yield* registry.tools({ ...ref, agent: build })).find((tool) => tool.id === TaskTool.id)?.description ?? ""
it.instance(
"description hides denied subagents for the caller",
() =>
Effect.gen(function* () {
const agent = yield* Agent.Service
const build = yield* agent.get("build")
const registry = yield* ToolRegistry.Service
const description =
(yield* registry.tools({ ...ref, agent: build })).find((tool) => tool.id === TaskTool.id)?.description ?? ""
expect(description).toContain("- alpha: Alpha agent")
expect(description).not.toContain("- zebra: Zebra agent")
}),
{
config: {
permission: {
task: {
"*": "allow",
zebra: "deny",
},
expect(description).toContain("- alpha: Alpha agent")
expect(description).not.toContain("- zebra: Zebra agent")
}),
{
config: {
permission: {
task: {
"*": "allow",
zebra: "deny",
},
agent: {
zebra: {
description: "Zebra agent",
mode: "subagent",
},
alpha: {
description: "Alpha agent",
mode: "subagent",
},
},
agent: {
zebra: {
description: "Zebra agent",
mode: "subagent",
},
alpha: {
description: "Alpha agent",
mode: "subagent",
},
},
},
),
},
)
it.live("execute resumes an existing task session from task_id", () =>
provideTmpdirInstance(() =>
it.instance("execute resumes an existing task session from task_id", () =>
Effect.gen(function* () {
const sessions = yield* Session.Service
const { chat, assistant } = yield* seed()
const child = yield* sessions.create({ parentID: chat.id, title: "Existing child" })
const tool = yield* TaskTool
const def = yield* tool.init()
let seen: SessionPrompt.PromptInput | undefined
const promptOps = stubOps({ text: "resumed", onPrompt: (input) => (seen = input) })
const result = yield* def.execute(
{
description: "inspect bug",
prompt: "look into the cache key path",
subagent_type: "general",
task_id: child.id,
},
{
sessionID: chat.id,
messageID: assistant.id,
agent: "build",
abort: new AbortController().signal,
extra: { promptOps },
messages: [],
metadata: () => Effect.void,
ask: () => Effect.void,
},
)
const kids = yield* sessions.children(chat.id)
expect(kids).toHaveLength(1)
expect(kids[0]?.id).toBe(child.id)
expect(result.metadata.sessionId).toBe(child.id)
expect(result.output).toContain(`task_id: ${child.id}`)
expect(seen?.sessionID).toBe(child.id)
}),
)
it.instance("execute asks by default and skips checks when bypassed", () =>
Effect.gen(function* () {
const { chat, assistant } = yield* seed()
const tool = yield* TaskTool
const def = yield* tool.init()
const calls: unknown[] = []
const promptOps = stubOps()
const exec = (extra?: Record<string, any>) =>
def.execute(
{
description: "inspect bug",
prompt: "look into the cache key path",
subagent_type: "general",
},
{
sessionID: chat.id,
messageID: assistant.id,
agent: "build",
abort: new AbortController().signal,
extra: { promptOps, ...extra },
messages: [],
metadata: () => Effect.void,
ask: (input) =>
Effect.sync(() => {
calls.push(input)
}),
},
)
yield* exec()
yield* exec({ bypassAgentCheck: true })
expect(calls).toHaveLength(1)
expect(calls[0]).toEqual({
permission: "task",
patterns: ["general"],
always: ["*"],
metadata: {
description: "inspect bug",
subagent_type: "general",
},
})
}),
)
it.instance("execute cancels child session when abort signal fires", () =>
Effect.gen(function* () {
const { chat, assistant } = yield* seed()
const tool = yield* TaskTool
const def = yield* tool.init()
const ready = defer<SessionPrompt.PromptInput>()
const cancelled = defer<SessionID>()
const abort = new AbortController()
const promptOps: TaskPromptOps = {
cancel: (sessionID) =>
Effect.sync(() => {
cancelled.resolve(sessionID)
}),
resolvePromptParts: (template) => Effect.succeed([{ type: "text" as const, text: template }]),
prompt: (input) =>
Effect.promise(() => {
ready.resolve(input)
return cancelled.promise
}).pipe(Effect.as(reply(input, "cancelled"))),
}
const fiber = yield* def
.execute(
{
description: "inspect bug",
prompt: "look into the cache key path",
subagent_type: "general",
},
{
sessionID: chat.id,
messageID: assistant.id,
agent: "build",
abort: abort.signal,
extra: { promptOps },
messages: [],
metadata: () => Effect.void,
ask: () => Effect.void,
},
)
.pipe(Effect.forkChild)
const input = yield* Effect.promise(() => ready.promise)
abort.abort()
expect(yield* Effect.promise(() => cancelled.promise)).toBe(input.sessionID)
const exit = yield* Fiber.await(fiber)
expect(Exit.isSuccess(exit)).toBe(true)
}),
)
it.instance("execute creates a child when task_id does not exist", () =>
Effect.gen(function* () {
const sessions = yield* Session.Service
const { chat, assistant } = yield* seed()
const tool = yield* TaskTool
const def = yield* tool.init()
let seen: SessionPrompt.PromptInput | undefined
const promptOps = stubOps({ text: "created", onPrompt: (input) => (seen = input) })
const result = yield* def.execute(
{
description: "inspect bug",
prompt: "look into the cache key path",
subagent_type: "general",
task_id: "ses_missing",
},
{
sessionID: chat.id,
messageID: assistant.id,
agent: "build",
abort: new AbortController().signal,
extra: { promptOps },
messages: [],
metadata: () => Effect.void,
ask: () => Effect.void,
},
)
const kids = yield* sessions.children(chat.id)
expect(kids).toHaveLength(1)
expect(kids[0]?.id).toBe(result.metadata.sessionId)
expect(result.metadata.sessionId).not.toBe("ses_missing")
expect(result.output).toContain(`task_id: ${result.metadata.sessionId}`)
expect(seen?.sessionID).toBe(result.metadata.sessionId)
}),
)
it.instance(
"execute shapes child permissions for task, todowrite, and primary tools",
() =>
Effect.gen(function* () {
const sessions = yield* Session.Service
const { chat, assistant } = yield* seed()
const child = yield* sessions.create({ parentID: chat.id, title: "Existing child" })
const tool = yield* TaskTool
const def = yield* tool.init()
let seen: SessionPrompt.PromptInput | undefined
const promptOps = stubOps({ text: "resumed", onPrompt: (input) => (seen = input) })
const promptOps = stubOps({ onPrompt: (input) => (seen = input) })
const result = yield* def.execute(
{
description: "inspect bug",
prompt: "look into the cache key path",
subagent_type: "general",
task_id: child.id,
subagent_type: "reviewer",
},
{
sessionID: chat.id,
@ -216,172 +391,45 @@ describe("tool.task", () => {
},
)
const kids = yield* sessions.children(chat.id)
expect(kids).toHaveLength(1)
expect(kids[0]?.id).toBe(child.id)
expect(result.metadata.sessionId).toBe(child.id)
expect(result.output).toContain(`task_id: ${child.id}`)
expect(seen?.sessionID).toBe(child.id)
}),
),
)
it.live("execute asks by default and skips checks when bypassed", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const { chat, assistant } = yield* seed()
const tool = yield* TaskTool
const def = yield* tool.init()
const calls: unknown[] = []
const promptOps = stubOps()
const exec = (extra?: Record<string, any>) =>
def.execute(
{
description: "inspect bug",
prompt: "look into the cache key path",
subagent_type: "general",
},
{
sessionID: chat.id,
messageID: assistant.id,
agent: "build",
abort: new AbortController().signal,
extra: { promptOps, ...extra },
messages: [],
metadata: () => Effect.void,
ask: (input) =>
Effect.sync(() => {
calls.push(input)
}),
},
)
yield* exec()
yield* exec({ bypassAgentCheck: true })
expect(calls).toHaveLength(1)
expect(calls[0]).toEqual({
permission: "task",
patterns: ["general"],
always: ["*"],
metadata: {
description: "inspect bug",
subagent_type: "general",
const child = yield* sessions.get(result.metadata.sessionId)
expect(child.parentID).toBe(chat.id)
expect(child.permission).toEqual([
{
permission: "todowrite",
pattern: "*",
action: "deny",
},
{
permission: "bash",
pattern: "*",
action: "allow",
},
{
permission: "read",
pattern: "*",
action: "allow",
},
])
expect(seen?.tools).toEqual({
todowrite: false,
bash: false,
read: false,
})
}),
),
)
it.live("execute creates a child when task_id does not exist", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const sessions = yield* Session.Service
const { chat, assistant } = yield* seed()
const tool = yield* TaskTool
const def = yield* tool.init()
let seen: SessionPrompt.PromptInput | undefined
const promptOps = stubOps({ text: "created", onPrompt: (input) => (seen = input) })
const result = yield* def.execute(
{
description: "inspect bug",
prompt: "look into the cache key path",
subagent_type: "general",
task_id: "ses_missing",
},
{
sessionID: chat.id,
messageID: assistant.id,
agent: "build",
abort: new AbortController().signal,
extra: { promptOps },
messages: [],
metadata: () => Effect.void,
ask: () => Effect.void,
},
)
const kids = yield* sessions.children(chat.id)
expect(kids).toHaveLength(1)
expect(kids[0]?.id).toBe(result.metadata.sessionId)
expect(result.metadata.sessionId).not.toBe("ses_missing")
expect(result.output).toContain(`task_id: ${result.metadata.sessionId}`)
expect(seen?.sessionID).toBe(result.metadata.sessionId)
}),
),
)
it.live("execute shapes child permissions for task, todowrite, and primary tools", () =>
provideTmpdirInstance(
() =>
Effect.gen(function* () {
const sessions = yield* Session.Service
const { chat, assistant } = yield* seed()
const tool = yield* TaskTool
const def = yield* tool.init()
let seen: SessionPrompt.PromptInput | undefined
const promptOps = stubOps({ onPrompt: (input) => (seen = input) })
const result = yield* def.execute(
{
description: "inspect bug",
prompt: "look into the cache key path",
subagent_type: "reviewer",
{
config: {
agent: {
reviewer: {
mode: "subagent",
permission: {
task: "allow",
},
{
sessionID: chat.id,
messageID: assistant.id,
agent: "build",
abort: new AbortController().signal,
extra: { promptOps },
messages: [],
metadata: () => Effect.void,
ask: () => Effect.void,
},
)
const child = yield* sessions.get(result.metadata.sessionId)
expect(child.parentID).toBe(chat.id)
expect(child.permission).toEqual([
{
permission: "todowrite",
pattern: "*",
action: "deny",
},
{
permission: "bash",
pattern: "*",
action: "allow",
},
{
permission: "read",
pattern: "*",
action: "allow",
},
])
expect(seen?.tools).toEqual({
todowrite: false,
bash: false,
read: false,
})
}),
{
config: {
agent: {
reviewer: {
mode: "subagent",
permission: {
task: "allow",
},
},
},
experimental: {
primary_tools: ["bash", "read"],
},
},
experimental: {
primary_tools: ["bash", "read"],
},
},
),
},
)
})