From e8471256f23c022348ff109f95fb84aa599fc230 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 13 Apr 2026 19:53:30 -0400 Subject: [PATCH] refactor(session): move llm stream into layer (#22358) --- packages/opencode/src/session/llm.ts | 693 +++++++++++---------- packages/opencode/test/session/llm.test.ts | 119 +--- 2 files changed, 364 insertions(+), 448 deletions(-) diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts index 8df70b6739..3ab35958a4 100644 --- a/packages/opencode/src/session/llm.ts +++ b/packages/opencode/src/session/llm.ts @@ -1,7 +1,6 @@ import { Provider } from "@/provider/provider" import { Log } from "@/util/log" -import { Cause, Effect, Layer, Record, Context } from "effect" -import * as Queue from "effect/Queue" +import { Context, Effect, Layer, Record } from "effect" import * as Stream from "effect/Stream" import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai" import { mergeDeep, pipe } from "remeda" @@ -21,11 +20,13 @@ import { Wildcard } from "@/util/wildcard" import { SessionID } from "@/session/schema" import { Auth } from "@/auth" import { Installation } from "@/installation" -import { AppRuntime } from "@/effect/app-runtime" +import { makeRuntime } from "@/effect/run-service" export namespace LLM { const log = Log.create({ service: "llm" }) + const perms = makeRuntime(Permission.Service, Permission.defaultLayer) export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX + type Result = Awaited> export type StreamInput = { user: MessageV2.User @@ -46,7 +47,7 @@ export namespace LLM { abort: AbortSignal } - export type Event = Awaited>["fullStream"] extends AsyncIterable ? T : never + export type Event = Result["fullStream"] extends AsyncIterable ? T : never export interface Interface { readonly stream: (input: StreamInput) => Stream.Stream @@ -54,12 +55,340 @@ export namespace LLM { export class Service extends Context.Service()("@opencode/LLM") {} - export const layer = Layer.effect( - Service, - Effect.gen(function* () { - return Service.of({ - stream(input) { - return Stream.scoped( + export const layer: Layer.Layer = + Layer.effect( + Service, + Effect.gen(function* () { + const auth = yield* Auth.Service + const config = yield* Config.Service + const provider = yield* Provider.Service + const plugin = yield* Plugin.Service + + const run = Effect.fn("LLM.run")(function* (input: StreamRequest) { + const l = log + .clone() + .tag("providerID", input.model.providerID) + .tag("modelID", input.model.id) + .tag("sessionID", input.sessionID) + .tag("small", (input.small ?? false).toString()) + .tag("agent", input.agent.name) + .tag("mode", input.agent.mode) + l.info("stream", { + modelID: input.model.id, + providerID: input.model.providerID, + }) + + const [language, cfg, item, info] = yield* Effect.all( + [ + provider.getLanguage(input.model), + config.get(), + provider.getProvider(input.model.providerID), + auth.get(input.model.providerID), + ], + { concurrency: "unbounded" }, + ) + + // TODO: move this to a proper hook + const isOpenaiOauth = item.id === "openai" && info?.type === "oauth" + + const system: string[] = [] + system.push( + [ + // use agent prompt otherwise provider prompt + ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)), + // any custom prompt passed into this call + ...input.system, + // any custom prompt from last user message + ...(input.user.system ? [input.user.system] : []), + ] + .filter((x) => x) + .join("\n"), + ) + + const header = system[0] + yield* plugin.trigger( + "experimental.chat.system.transform", + { sessionID: input.sessionID, model: input.model }, + { system }, + ) + // rejoin to maintain 2-part structure for caching if header unchanged + if (system.length > 2 && system[0] === header) { + const rest = system.slice(1) + system.length = 0 + system.push(header, rest.join("\n")) + } + + const variant = + !input.small && input.model.variants && input.user.model.variant + ? input.model.variants[input.user.model.variant] + : {} + const base = input.small + ? ProviderTransform.smallOptions(input.model) + : ProviderTransform.options({ + model: input.model, + sessionID: input.sessionID, + providerOptions: item.options, + }) + const options: Record = pipe( + base, + mergeDeep(input.model.options), + mergeDeep(input.agent.options), + mergeDeep(variant), + ) + if (isOpenaiOauth) { + options.instructions = system.join("\n") + } + + const isWorkflow = language instanceof GitLabWorkflowLanguageModel + const messages = isOpenaiOauth + ? input.messages + : isWorkflow + ? input.messages + : [ + ...system.map( + (x): ModelMessage => ({ + role: "system", + content: x, + }), + ), + ...input.messages, + ] + + const params = yield* plugin.trigger( + "chat.params", + { + sessionID: input.sessionID, + agent: input.agent.name, + model: input.model, + provider: item, + message: input.user, + }, + { + temperature: input.model.capabilities.temperature + ? (input.agent.temperature ?? ProviderTransform.temperature(input.model)) + : undefined, + topP: input.agent.topP ?? ProviderTransform.topP(input.model), + topK: ProviderTransform.topK(input.model), + maxOutputTokens: ProviderTransform.maxOutputTokens(input.model), + options, + }, + ) + + const { headers } = yield* plugin.trigger( + "chat.headers", + { + sessionID: input.sessionID, + agent: input.agent.name, + model: input.model, + provider: item, + message: input.user, + }, + { + headers: {}, + }, + ) + + const tools = resolveTools(input) + + // LiteLLM and some Anthropic proxies require the tools parameter to be present + // when message history contains tool calls, even if no tools are being used. + // Add a dummy tool that is never called to satisfy this validation. + // This is enabled for: + // 1. Providers with "litellm" in their ID or API ID (auto-detected) + // 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways) + const isLiteLLMProxy = + item.options?.["litellmProxy"] === true || + input.model.providerID.toLowerCase().includes("litellm") || + input.model.api.id.toLowerCase().includes("litellm") + + // LiteLLM/Bedrock rejects requests where the message history contains tool + // calls but no tools param is present. When there are no active tools (e.g. + // during compaction), inject a stub tool to satisfy the validation requirement. + // The stub description explicitly tells the model not to call it. + if (isLiteLLMProxy && Object.keys(tools).length === 0 && hasToolCalls(input.messages)) { + tools["_noop"] = tool({ + description: "Do not call this tool. It exists only for API compatibility and must never be invoked.", + inputSchema: jsonSchema({ + type: "object", + properties: { + reason: { type: "string", description: "Unused" }, + }, + }), + execute: async () => ({ output: "", title: "", metadata: {} }), + }) + } + + // Wire up toolExecutor for DWS workflow models so that tool calls + // from the workflow service are executed via opencode's tool system + // and results sent back over the WebSocket. + if (language instanceof GitLabWorkflowLanguageModel) { + const workflowModel = language as GitLabWorkflowLanguageModel & { + sessionID?: string + sessionPreapprovedTools?: string[] + approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }> + } + workflowModel.sessionID = input.sessionID + workflowModel.systemPrompt = system.join("\n") + workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => { + const t = tools[toolName] + if (!t || !t.execute) { + return { result: "", error: `Unknown tool: ${toolName}` } + } + try { + const result = await t.execute!(JSON.parse(argsJson), { + toolCallId: _requestID, + messages: input.messages, + abortSignal: input.abort, + }) + const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result)) + return { + result: output, + metadata: typeof result === "object" ? result?.metadata : undefined, + title: typeof result === "object" ? result?.title : undefined, + } + } catch (e: any) { + return { result: "", error: e.message ?? String(e) } + } + } + + const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? []) + workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => { + const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission)) + return !match || match.action !== "ask" + }) + + const approvedToolsForSession = new Set() + workflowModel.approvalHandler = Instance.bind(async (approvalTools) => { + const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[] + // Auto-approve tools that were already approved in this session + // (prevents infinite approval loops for server-side MCP tools) + if (uniqueNames.every((name) => approvedToolsForSession.has(name))) { + return { approved: true } + } + + const id = PermissionID.ascending() + let reply: Permission.Reply | undefined + let unsub: (() => void) | undefined + try { + unsub = Bus.subscribe(Permission.Event.Replied, (evt) => { + if (evt.properties.requestID === id) reply = evt.properties.reply + }) + const toolPatterns = approvalTools.map((t: { name: string; args: string }) => { + try { + const parsed = JSON.parse(t.args) as Record + const title = (parsed?.title ?? parsed?.name ?? "") as string + return title ? `${t.name}: ${title}` : t.name + } catch { + return t.name + } + }) + const uniquePatterns = [...new Set(toolPatterns)] as string[] + await perms.runPromise((svc) => + svc.ask({ + id, + sessionID: SessionID.make(input.sessionID), + permission: "workflow_tool_approval", + patterns: uniquePatterns, + metadata: { tools: approvalTools }, + always: uniquePatterns, + ruleset: [], + }), + ) + for (const name of uniqueNames) approvedToolsForSession.add(name) + workflowModel.sessionPreapprovedTools = [ + ...(workflowModel.sessionPreapprovedTools ?? []), + ...uniqueNames, + ] + return { approved: true } + } catch { + return { approved: false } + } finally { + unsub?.() + } + }) + } + + return streamText({ + onError(error) { + l.error("stream error", { + error, + }) + }, + async experimental_repairToolCall(failed) { + const lower = failed.toolCall.toolName.toLowerCase() + if (lower !== failed.toolCall.toolName && tools[lower]) { + l.info("repairing tool call", { + tool: failed.toolCall.toolName, + repaired: lower, + }) + return { + ...failed.toolCall, + toolName: lower, + } + } + return { + ...failed.toolCall, + input: JSON.stringify({ + tool: failed.toolCall.toolName, + error: failed.error.message, + }), + toolName: "invalid", + } + }, + temperature: params.temperature, + topP: params.topP, + topK: params.topK, + providerOptions: ProviderTransform.providerOptions(input.model, params.options), + activeTools: Object.keys(tools).filter((x) => x !== "invalid"), + tools, + toolChoice: input.toolChoice, + maxOutputTokens: params.maxOutputTokens, + abortSignal: input.abort, + headers: { + ...(input.model.providerID.startsWith("opencode") + ? { + "x-opencode-project": Instance.project.id, + "x-opencode-session": input.sessionID, + "x-opencode-request": input.user.id, + "x-opencode-client": Flag.OPENCODE_CLIENT, + } + : { + "x-session-affinity": input.sessionID, + ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}), + "User-Agent": `opencode/${Installation.VERSION}`, + }), + ...input.model.headers, + ...headers, + }, + maxRetries: input.retries ?? 0, + messages, + model: wrapLanguageModel({ + model: language, + middleware: [ + { + specificationVersion: "v3" as const, + async transformParams(args) { + if (args.type === "stream") { + // @ts-expect-error + args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options) + } + return args.params + }, + }, + ], + }), + experimental_telemetry: { + isEnabled: cfg.experimental?.openTelemetry, + metadata: { + userId: cfg.username ?? "unknown", + sessionId: input.sessionID, + }, + }, + }) + }) + + const stream: Interface["stream"] = (input) => + Stream.scoped( Stream.unwrap( Effect.gen(function* () { const ctrl = yield* Effect.acquireRelease( @@ -67,7 +396,7 @@ export namespace LLM { (ctrl) => Effect.sync(() => ctrl.abort()), ) - const result = yield* Effect.promise(() => LLM.stream({ ...input, abort: ctrl.signal })) + const result = yield* run({ ...input, abort: ctrl.signal }) return Stream.fromAsyncIterable(result.fullStream, (e) => e instanceof Error ? e : new Error(String(e)), @@ -75,339 +404,19 @@ export namespace LLM { }), ), ) - }, - }) - }), - ) - export const defaultLayer = layer - - export async function stream(input: StreamRequest) { - const l = log - .clone() - .tag("providerID", input.model.providerID) - .tag("modelID", input.model.id) - .tag("sessionID", input.sessionID) - .tag("small", (input.small ?? false).toString()) - .tag("agent", input.agent.name) - .tag("mode", input.agent.mode) - l.info("stream", { - modelID: input.model.id, - providerID: input.model.providerID, - }) - const [language, cfg, provider, info] = await Effect.runPromise( - Effect.gen(function* () { - const auth = yield* Auth.Service - const cfg = yield* Config.Service - const provider = yield* Provider.Service - return yield* Effect.all( - [ - provider.getLanguage(input.model), - cfg.get(), - provider.getProvider(input.model.providerID), - auth.get(input.model.providerID), - ], - { concurrency: "unbounded" }, - ) - }).pipe(Effect.provide(Layer.mergeAll(Auth.defaultLayer, Config.defaultLayer, Provider.defaultLayer))), - ) - // TODO: move this to a proper hook - const isOpenaiOauth = provider.id === "openai" && info?.type === "oauth" - - const system: string[] = [] - system.push( - [ - // use agent prompt otherwise provider prompt - ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)), - // any custom prompt passed into this call - ...input.system, - // any custom prompt from last user message - ...(input.user.system ? [input.user.system] : []), - ] - .filter((x) => x) - .join("\n"), - ) - - const header = system[0] - await Plugin.trigger( - "experimental.chat.system.transform", - { sessionID: input.sessionID, model: input.model }, - { system }, - ) - // rejoin to maintain 2-part structure for caching if header unchanged - if (system.length > 2 && system[0] === header) { - const rest = system.slice(1) - system.length = 0 - system.push(header, rest.join("\n")) - } - - const variant = - !input.small && input.model.variants && input.user.model.variant - ? input.model.variants[input.user.model.variant] - : {} - const base = input.small - ? ProviderTransform.smallOptions(input.model) - : ProviderTransform.options({ - model: input.model, - sessionID: input.sessionID, - providerOptions: provider.options, - }) - const options: Record = pipe( - base, - mergeDeep(input.model.options), - mergeDeep(input.agent.options), - mergeDeep(variant), - ) - if (isOpenaiOauth) { - options.instructions = system.join("\n") - } - - const isWorkflow = language instanceof GitLabWorkflowLanguageModel - const messages = isOpenaiOauth - ? input.messages - : isWorkflow - ? input.messages - : [ - ...system.map( - (x): ModelMessage => ({ - role: "system", - content: x, - }), - ), - ...input.messages, - ] - - const params = await Plugin.trigger( - "chat.params", - { - sessionID: input.sessionID, - agent: input.agent.name, - model: input.model, - provider, - message: input.user, - }, - { - temperature: input.model.capabilities.temperature - ? (input.agent.temperature ?? ProviderTransform.temperature(input.model)) - : undefined, - topP: input.agent.topP ?? ProviderTransform.topP(input.model), - topK: ProviderTransform.topK(input.model), - maxOutputTokens: ProviderTransform.maxOutputTokens(input.model), - options, - }, - ) - - const { headers } = await Plugin.trigger( - "chat.headers", - { - sessionID: input.sessionID, - agent: input.agent.name, - model: input.model, - provider, - message: input.user, - }, - { - headers: {}, - }, - ) - - const tools = resolveTools(input) - - // LiteLLM and some Anthropic proxies require the tools parameter to be present - // when message history contains tool calls, even if no tools are being used. - // Add a dummy tool that is never called to satisfy this validation. - // This is enabled for: - // 1. Providers with "litellm" in their ID or API ID (auto-detected) - // 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways) - const isLiteLLMProxy = - provider.options?.["litellmProxy"] === true || - input.model.providerID.toLowerCase().includes("litellm") || - input.model.api.id.toLowerCase().includes("litellm") - - // LiteLLM/Bedrock rejects requests where the message history contains tool - // calls but no tools param is present. When there are no active tools (e.g. - // during compaction), inject a stub tool to satisfy the validation requirement. - // The stub description explicitly tells the model not to call it. - if (isLiteLLMProxy && Object.keys(tools).length === 0 && hasToolCalls(input.messages)) { - tools["_noop"] = tool({ - description: "Do not call this tool. It exists only for API compatibility and must never be invoked.", - inputSchema: jsonSchema({ - type: "object", - properties: { - reason: { type: "string", description: "Unused" }, - }, - }), - execute: async () => ({ output: "", title: "", metadata: {} }), - }) - } - - // Wire up toolExecutor for DWS workflow models so that tool calls - // from the workflow service are executed via opencode's tool system - // and results sent back over the WebSocket. - if (language instanceof GitLabWorkflowLanguageModel) { - const workflowModel = language as GitLabWorkflowLanguageModel & { - sessionID?: string - sessionPreapprovedTools?: string[] - approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }> - } - workflowModel.sessionID = input.sessionID - workflowModel.systemPrompt = system.join("\n") - workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => { - const t = tools[toolName] - if (!t || !t.execute) { - return { result: "", error: `Unknown tool: ${toolName}` } - } - try { - const result = await t.execute!(JSON.parse(argsJson), { - toolCallId: _requestID, - messages: input.messages, - abortSignal: input.abort, - }) - const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result)) - return { - result: output, - metadata: typeof result === "object" ? result?.metadata : undefined, - title: typeof result === "object" ? result?.title : undefined, - } - } catch (e: any) { - return { result: "", error: e.message ?? String(e) } - } - } - - const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? []) - workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => { - const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission)) - return !match || match.action !== "ask" - }) - - const approvedToolsForSession = new Set() - workflowModel.approvalHandler = Instance.bind(async (approvalTools) => { - const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[] - // Auto-approve tools that were already approved in this session - // (prevents infinite approval loops for server-side MCP tools) - if (uniqueNames.every((name) => approvedToolsForSession.has(name))) { - return { approved: true } - } - - const id = PermissionID.ascending() - let reply: Permission.Reply | undefined - let unsub: (() => void) | undefined - try { - unsub = Bus.subscribe(Permission.Event.Replied, (evt) => { - if (evt.properties.requestID === id) reply = evt.properties.reply - }) - const toolPatterns = approvalTools.map((t: { name: string; args: string }) => { - try { - const parsed = JSON.parse(t.args) as Record - const title = (parsed?.title ?? parsed?.name ?? "") as string - return title ? `${t.name}: ${title}` : t.name - } catch { - return t.name - } - }) - const uniquePatterns = [...new Set(toolPatterns)] as string[] - await AppRuntime.runPromise( - Permission.Service.use((svc) => - svc.ask({ - id, - sessionID: SessionID.make(input.sessionID), - permission: "workflow_tool_approval", - patterns: uniquePatterns, - metadata: { tools: approvalTools }, - always: uniquePatterns, - ruleset: [], - }), - ), - ) - for (const name of uniqueNames) approvedToolsForSession.add(name) - workflowModel.sessionPreapprovedTools = [...(workflowModel.sessionPreapprovedTools ?? []), ...uniqueNames] - return { approved: true } - } catch { - return { approved: false } - } finally { - unsub?.() - } - }) - } - - return streamText({ - onError(error) { - l.error("stream error", { - error, - }) - }, - async experimental_repairToolCall(failed) { - const lower = failed.toolCall.toolName.toLowerCase() - if (lower !== failed.toolCall.toolName && tools[lower]) { - l.info("repairing tool call", { - tool: failed.toolCall.toolName, - repaired: lower, - }) - return { - ...failed.toolCall, - toolName: lower, - } - } - return { - ...failed.toolCall, - input: JSON.stringify({ - tool: failed.toolCall.toolName, - error: failed.error.message, - }), - toolName: "invalid", - } - }, - temperature: params.temperature, - topP: params.topP, - topK: params.topK, - providerOptions: ProviderTransform.providerOptions(input.model, params.options), - activeTools: Object.keys(tools).filter((x) => x !== "invalid"), - tools, - toolChoice: input.toolChoice, - maxOutputTokens: params.maxOutputTokens, - abortSignal: input.abort, - headers: { - ...(input.model.providerID.startsWith("opencode") - ? { - "x-opencode-project": Instance.project.id, - "x-opencode-session": input.sessionID, - "x-opencode-request": input.user.id, - "x-opencode-client": Flag.OPENCODE_CLIENT, - } - : { - "x-session-affinity": input.sessionID, - ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}), - "User-Agent": `opencode/${Installation.VERSION}`, - }), - ...input.model.headers, - ...headers, - }, - maxRetries: input.retries ?? 0, - messages, - model: wrapLanguageModel({ - model: language, - middleware: [ - { - specificationVersion: "v3" as const, - async transformParams(args) { - if (args.type === "stream") { - // @ts-expect-error - args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options) - } - return args.params - }, - }, - ], + return Service.of({ stream }) }), - experimental_telemetry: { - isEnabled: cfg.experimental?.openTelemetry, - metadata: { - userId: cfg.username ?? "unknown", - sessionId: input.sessionID, - }, - }, - }) - } + ) + + export const defaultLayer = Layer.suspend(() => + layer.pipe( + Layer.provide(Auth.defaultLayer), + Layer.provide(Config.defaultLayer), + Layer.provide(Provider.defaultLayer), + Layer.provide(Plugin.defaultLayer), + ), + ) function resolveTools(input: Pick) { const disabled = Permission.disabled( diff --git a/packages/opencode/test/session/llm.test.ts b/packages/opencode/test/session/llm.test.ts index 3974ca9810..cbf767b4bd 100644 --- a/packages/opencode/test/session/llm.test.ts +++ b/packages/opencode/test/session/llm.test.ts @@ -26,6 +26,12 @@ async function getModel(providerID: ProviderID, modelID: ModelID) { ) } +const llm = makeRuntime(LLM.Service, LLM.defaultLayer) + +async function drain(input: LLM.StreamInput) { + return llm.runPromise((svc) => svc.stream(input).pipe(Stream.runDrain)) +} + describe("session.llm.hasToolCalls", () => { test("returns false for empty messages array", () => { expect(LLM.hasToolCalls([])).toBe(false) @@ -355,20 +361,16 @@ describe("session.llm.stream", () => { model: { providerID: ProviderID.make(providerID), modelID: resolved.id, variant: "high" }, } satisfies MessageV2.User - const stream = await LLM.stream({ + await drain({ user, sessionID, model: resolved, agent, system: ["You are a helpful assistant."], - abort: new AbortController().signal, messages: [{ role: "user", content: "Hello" }], tools: {}, }) - for await (const _ of stream.fullStream) { - } - const capture = await request const body = capture.body const headers = capture.headers @@ -393,80 +395,6 @@ describe("session.llm.stream", () => { }) }) - test("raw stream abort signal cancels provider response body promptly", async () => { - const server = state.server - if (!server) throw new Error("Server not initialized") - - const providerID = "alibaba" - const modelID = "qwen-plus" - const fixture = await loadFixture(providerID, modelID) - const model = fixture.model - const pending = waitStreamingRequest("/chat/completions") - - await using tmp = await tmpdir({ - init: async (dir) => { - await Bun.write( - path.join(dir, "opencode.json"), - JSON.stringify({ - $schema: "https://opencode.ai/config.json", - enabled_providers: [providerID], - provider: { - [providerID]: { - options: { - apiKey: "test-key", - baseURL: `${server.url.origin}/v1`, - }, - }, - }, - }), - ) - }, - }) - - await Instance.provide({ - directory: tmp.path, - fn: async () => { - const resolved = await getModel(ProviderID.make(providerID), ModelID.make(model.id)) - const sessionID = SessionID.make("session-test-raw-abort") - const agent = { - name: "test", - mode: "primary", - options: {}, - permission: [{ permission: "*", pattern: "*", action: "allow" }], - } satisfies Agent.Info - const user = { - id: MessageID.make("user-raw-abort"), - sessionID, - role: "user", - time: { created: Date.now() }, - agent: agent.name, - model: { providerID: ProviderID.make(providerID), modelID: resolved.id }, - } satisfies MessageV2.User - - const ctrl = new AbortController() - const result = await LLM.stream({ - user, - sessionID, - model: resolved, - agent, - system: ["You are a helpful assistant."], - abort: ctrl.signal, - messages: [{ role: "user", content: "Hello" }], - tools: {}, - }) - - const iter = result.fullStream[Symbol.asyncIterator]() - await pending.request - await iter.next() - ctrl.abort() - - await Promise.race([pending.responseCanceled, timeout(500)]) - await Promise.race([pending.requestAborted, timeout(500)]).catch(() => undefined) - await iter.return?.() - }, - }) - }) - test("service stream cancellation cancels provider response body promptly", async () => { const server = state.server if (!server) throw new Error("Server not initialized") @@ -518,8 +446,7 @@ describe("session.llm.stream", () => { } satisfies MessageV2.User const ctrl = new AbortController() - const { runPromiseExit } = makeRuntime(LLM.Service, LLM.defaultLayer) - const run = runPromiseExit( + const run = llm.runPromiseExit( (svc) => svc .stream({ @@ -610,14 +537,13 @@ describe("session.llm.stream", () => { tools: { question: true }, } satisfies MessageV2.User - const stream = await LLM.stream({ + await drain({ user, sessionID, model: resolved, agent, permission: [{ permission: "question", pattern: "*", action: "allow" }], system: ["You are a helpful assistant."], - abort: new AbortController().signal, messages: [{ role: "user", content: "Hello" }], tools: { question: tool({ @@ -628,9 +554,6 @@ describe("session.llm.stream", () => { }, }) - for await (const _ of stream.fullStream) { - } - const capture = await request const tools = capture.body.tools as Array<{ function?: { name?: string } }> | undefined expect(tools?.some((item) => item.function?.name === "question")).toBe(true) @@ -728,20 +651,16 @@ describe("session.llm.stream", () => { model: { providerID: ProviderID.make("openai"), modelID: resolved.id, variant: "high" }, } satisfies MessageV2.User - const stream = await LLM.stream({ + await drain({ user, sessionID, model: resolved, agent, system: ["You are a helpful assistant."], - abort: new AbortController().signal, messages: [{ role: "user", content: "Hello" }], tools: {}, }) - for await (const _ of stream.fullStream) { - } - const capture = await request const body = capture.body @@ -847,13 +766,12 @@ describe("session.llm.stream", () => { model: { providerID: ProviderID.make("openai"), modelID: resolved.id }, } satisfies MessageV2.User - const stream = await LLM.stream({ + await drain({ user, sessionID, model: resolved, agent, system: ["You are a helpful assistant."], - abort: new AbortController().signal, messages: [ { role: "user", @@ -871,9 +789,6 @@ describe("session.llm.stream", () => { tools: {}, }) - for await (const _ of stream.fullStream) { - } - const capture = await request expect(capture.url.pathname.endsWith("/responses")).toBe(true) }, @@ -972,20 +887,16 @@ describe("session.llm.stream", () => { model: { providerID: ProviderID.make("minimax"), modelID: ModelID.make("MiniMax-M2.5") }, } satisfies MessageV2.User - const stream = await LLM.stream({ + await drain({ user, sessionID, model: resolved, agent, system: ["You are a helpful assistant."], - abort: new AbortController().signal, messages: [{ role: "user", content: "Hello" }], tools: {}, }) - for await (const _ of stream.fullStream) { - } - const capture = await request const body = capture.body @@ -1073,20 +984,16 @@ describe("session.llm.stream", () => { model: { providerID: ProviderID.make(providerID), modelID: resolved.id }, } satisfies MessageV2.User - const stream = await LLM.stream({ + await drain({ user, sessionID, model: resolved, agent, system: ["You are a helpful assistant."], - abort: new AbortController().signal, messages: [{ role: "user", content: "Hello" }], tools: {}, }) - for await (const _ of stream.fullStream) { - } - const capture = await request const body = capture.body const config = body.generationConfig as