mirror of
https://github.com/anomalyco/opencode.git
synced 2026-06-01 06:11:30 +00:00
feat: add headerTimeout cfg option, default it on only for openai w/ default of 10s (#29484)
This commit is contained in:
parent
519d344470
commit
f965db9e13
8 changed files with 257 additions and 8 deletions
|
|
@ -92,11 +92,19 @@ export const Info = Schema.Struct({
|
|||
timeout: Schema.optional(
|
||||
Schema.Union([PositiveInt, Schema.Literal(false)]).annotate({
|
||||
description:
|
||||
"Timeout in milliseconds for requests to this provider. Default is 300000 (5 minutes). Set to false to disable timeout.",
|
||||
"Timeout in milliseconds for full requests to this provider. Set to false to disable timeout.",
|
||||
}),
|
||||
).annotate({
|
||||
description: "Timeout in milliseconds for full requests to this provider. Set to false to disable timeout.",
|
||||
}),
|
||||
headerTimeout: Schema.optional(
|
||||
Schema.Union([PositiveInt, Schema.Literal(false)]).annotate({
|
||||
description:
|
||||
"Timeout in milliseconds to wait for response headers. Provider integrations may set defaults. Set to false to disable timeout.",
|
||||
}),
|
||||
).annotate({
|
||||
description:
|
||||
"Timeout in milliseconds for requests to this provider. Default is 300000 (5 minutes). Set to false to disable timeout.",
|
||||
"Timeout in milliseconds to wait for response headers. Provider integrations may set defaults. Set to false to disable timeout.",
|
||||
}),
|
||||
chunkTimeout: Schema.optional(PositiveInt).annotate({
|
||||
description:
|
||||
|
|
|
|||
|
|
@ -3,6 +3,14 @@ import { STATUS_CODES } from "http"
|
|||
import { iife } from "@/util/iife"
|
||||
import type { ProviderID } from "./schema"
|
||||
|
||||
/**
 * Error describing a provider request whose response headers did not arrive
 * within the allotted number of milliseconds.
 */
export class HeaderTimeoutError extends Error {
  /** The timeout, in milliseconds, that is reported in the message. */
  public readonly ms: number
  /** Fixed identifier for this error kind. */
  public override readonly name: "ProviderHeaderTimeoutError"

  constructor(ms: number) {
    super(`Provider response headers timed out after ${ms}ms`)
    this.ms = ms
    this.name = "ProviderHeaderTimeoutError"
  }
}
|
||||
|
||||
// Adapted from overflow detection patterns in:
|
||||
// https://github.com/badlogic/pi-mono/blob/main/packages/ai/src/utils/overflow.ts
|
||||
const OVERFLOW_PATTERNS = [
|
||||
|
|
|
|||
|
|
@ -28,9 +28,10 @@ import * as ProviderTransform from "./transform"
|
|||
import { ModelID, ProviderID } from "./schema"
|
||||
import { ModelStatus } from "./model-status"
|
||||
import { RuntimeFlags } from "@/effect/runtime-flags"
|
||||
import { ProviderError } from "./error"
|
||||
|
||||
const log = Log.create({ service: "provider" })
|
||||
|
||||
// Default header timeout used for OpenAI, in milliseconds (10 seconds).
const OPENAI_HEADER_TIMEOUT_DEFAULT = 10_000
|
||||
function shouldUseCopilotResponsesApi(modelID: string): boolean {
|
||||
const match = /^gpt-(\d+)/.exec(modelID)
|
||||
if (!match) return false
|
||||
|
|
@ -85,6 +86,15 @@ function wrapSSE(res: Response, ms: number, ctl: AbortController) {
|
|||
})
|
||||
}
|
||||
|
||||
/**
 * Creates an abort signal that fires with a `ProviderError.HeaderTimeoutError`
 * once `ms` milliseconds have passed.
 *
 * @param ms Time to wait before aborting, in milliseconds.
 * @returns `signal`, which aborts when the timer fires, and `clear`, which
 *   cancels the pending timer so the signal never aborts.
 */
function timeoutController(ms: number) {
  // Timer delays are limited to a signed 32-bit integer; anything larger is
  // coerced to 1ms and would abort the request almost immediately. Clamp so a
  // very large timeout behaves as a very long wait instead.
  const delay = Math.min(ms, 2_147_483_647)
  const ctl = new AbortController()
  // The error keeps the caller-supplied `ms` so the message reflects the configured value.
  const id = setTimeout(() => ctl.abort(new ProviderError.HeaderTimeoutError(ms)), delay)
  return {
    signal: ctl.signal,
    clear: () => clearTimeout(id),
  }
}
|
||||
|
||||
function googleVertexAnthropicBaseURL(project: string | undefined, location: string | undefined) {
|
||||
if (!project) return
|
||||
if (location !== "eu" && location !== "us") return
|
||||
|
|
@ -194,7 +204,7 @@ function custom(dep: CustomDep): Record<string, CustomLoader> {
|
|||
async getModel(sdk: any, modelID: string, _options?: Record<string, any>) {
|
||||
return sdk.responses(modelID)
|
||||
},
|
||||
options: {},
|
||||
options: { headerTimeout: OPENAI_HEADER_TIMEOUT_DEFAULT },
|
||||
}),
|
||||
xai: () =>
|
||||
Effect.succeed({
|
||||
|
|
@ -1601,16 +1611,21 @@ export const layer = Layer.effect(
|
|||
|
||||
const customFetch = options["fetch"]
|
||||
const chunkTimeout = options["chunkTimeout"]
|
||||
const headerTimeout = options["headerTimeout"]
|
||||
delete options["chunkTimeout"]
|
||||
delete options["headerTimeout"]
|
||||
|
||||
options["fetch"] = async (input: any, init?: BunFetchRequestInit) => {
|
||||
const fetchFn = customFetch ?? fetch
|
||||
const opts = init ?? {}
|
||||
const chunkAbortCtl = typeof chunkTimeout === "number" && chunkTimeout > 0 ? new AbortController() : undefined
|
||||
const headerTimeoutMs = headerTimeout === false ? undefined : headerTimeout
|
||||
const headerTimeoutCtl = typeof headerTimeoutMs === "number" ? timeoutController(headerTimeoutMs) : undefined
|
||||
const signals: AbortSignal[] = []
|
||||
|
||||
if (opts.signal) signals.push(opts.signal)
|
||||
if (chunkAbortCtl) signals.push(chunkAbortCtl.signal)
|
||||
if (headerTimeoutCtl) signals.push(headerTimeoutCtl.signal)
|
||||
if (options["timeout"] !== undefined && options["timeout"] !== null && options["timeout"] !== false)
|
||||
signals.push(AbortSignal.timeout(options["timeout"]))
|
||||
|
||||
|
|
@ -1639,7 +1654,7 @@ export const layer = Layer.effect(
|
|||
...opts,
|
||||
// @ts-ignore see here: https://github.com/oven-sh/bun/issues/16682
|
||||
timeout: false,
|
||||
})
|
||||
}).finally(() => headerTimeoutCtl?.clear())
|
||||
|
||||
if (!chunkAbortCtl) return res
|
||||
return wrapSSE(res, chunkTimeout, chunkAbortCtl)
|
||||
|
|
|
|||
|
|
@ -1143,6 +1143,18 @@ export function fromError(
|
|||
},
|
||||
{ cause: e },
|
||||
).toObject()
|
||||
case e instanceof ProviderError.HeaderTimeoutError:
|
||||
return new APIError(
|
||||
{
|
||||
message: e.message,
|
||||
isRetryable: true,
|
||||
metadata: {
|
||||
code: e.name,
|
||||
timeoutMs: String(e.ms),
|
||||
},
|
||||
},
|
||||
{ cause: e },
|
||||
).toObject()
|
||||
case APICallError.isInstance(e):
|
||||
const parsed = ProviderError.parseAPICallError({
|
||||
providerID: ctx.providerID,
|
||||
|
|
|
|||
192
packages/opencode/test/provider/header-timeout.test.ts
Normal file
192
packages/opencode/test/provider/header-timeout.test.ts
Normal file
|
|
@ -0,0 +1,192 @@
|
|||
import { afterEach, expect } from "bun:test"
|
||||
import { createServer, type Server } from "node:http"
|
||||
import { streamText } from "ai"
|
||||
import { Effect, Layer } from "effect"
|
||||
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
|
||||
import { disposeAllInstances, provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture"
|
||||
import { testEffect } from "../lib/effect"
|
||||
import { reply, TestLLMServer } from "../lib/llm-server"
|
||||
import { testProviderConfig } from "../lib/test-provider"
|
||||
import { Env } from "@/env"
|
||||
import { Plugin } from "@/plugin"
|
||||
import { Provider } from "@/provider/provider"
|
||||
import { ModelID, ProviderID } from "@/provider/schema"
|
||||
|
||||
// Dispose every instance after each test case.
afterEach(async () => {
  await disposeAllInstances()
})

// Layers made available to every `it.live` case in this file.
const testLayer = Layer.mergeAll(
  Provider.defaultLayer,
  Env.defaultLayer,
  Plugin.defaultLayer,
  TestLLMServer.layer,
  CrossSpawnSpawner.defaultLayer,
)

const it = testEffect(testLayer)
|
||||
|
||||
// With a 50ms headerTimeout configured, a reply that waits 250ms before
// emitting its text is still expected to be received in full.
it.live("headerTimeout does not abort delayed SSE body after headers arrive", () =>
  provideTmpdirServer(
    ({ llm }) =>
      Effect.gen(function* () {
        yield* llm.push(reply().wait(Bun.sleep(250)).text("late").stop())

        const providers = yield* Provider.Service
        const testModel = yield* providers.getModel(ProviderID.make("test"), ModelID.make("test-model"))
        const language = yield* providers.getLanguage(testModel)
        const stream = streamText({
          model: language,
          messages: [{ role: "user", content: "hello" }],
        })

        const text = yield* Effect.promise(() => stream.text)
        expect(text).toBe("late")
      }),
    {
      // Same config as `testProviderConfig(url)`, with headerTimeout: 50 added
      // to the `test` provider's options.
      config: (url) => {
        const base = testProviderConfig(url)
        const test = {
          ...base.provider.test,
          options: { ...base.provider.test.options, headerTimeout: 50 },
        }
        return { ...base, provider: { test } }
      },
    },
  ),
)
|
||||
|
||||
// The server sends its headers only after 250ms, while headerTimeout is 50ms:
// the stream must surface a "response headers timed out" error.
it.live("headerTimeout aborts when response headers do not arrive", () =>
  Effect.gen(function* () {
    // The server is closed again when the surrounding scope is released.
    const slow = yield* Effect.acquireRelease(
      Effect.promise(() => delayedHeaderServer(250)),
      (handle) => Effect.sync(() => handle.server.close()),
    )

    yield* provideTmpdirInstance(
      () =>
        Effect.gen(function* () {
          const providers = yield* Provider.Service
          const testModel = yield* providers.getModel(ProviderID.make("test"), ModelID.make("test-model"))
          const language = yield* providers.getLanguage(testModel)
          const stream = streamText({
            model: language,
            // No-op error callback; failures are read from the stream parts below.
            onError() {},
            messages: [{ role: "user", content: "hello" }],
          })

          // Drain the whole stream, keeping the stringified payload of each error part.
          const collected = yield* Effect.promise(async () => {
            const messages: string[] = []
            for await (const part of stream.fullStream) {
              if (part.type !== "error") continue
              messages.push(String(part.error))
            }
            return messages
          })
          expect(collected.join("\n")).toContain("response headers timed out")
        }),
      { config: providerConfig(slow.url, { headerTimeout: 50 }) },
    )
  }),
)
|
||||
|
||||
// No headerTimeout is configured for the `test` provider, so a server that
// delays its headers by 100ms must still be read successfully.
it.live("headerTimeout is opt-in for non-OpenAI providers", () =>
  Effect.gen(function* () {
    // The server is closed again when the surrounding scope is released.
    const slow = yield* Effect.acquireRelease(
      Effect.promise(() => delayedHeaderServer(100)),
      (handle) => Effect.sync(() => handle.server.close()),
    )

    yield* provideTmpdirInstance(
      () =>
        Effect.gen(function* () {
          const providers = yield* Provider.Service
          const testModel = yield* providers.getModel(ProviderID.make("test"), ModelID.make("test-model"))
          const language = yield* providers.getLanguage(testModel)
          const stream = streamText({
            model: language,
            messages: [{ role: "user", content: "hello" }],
          })

          const text = yield* Effect.promise(() => stream.text)
          expect(text).toBe("ok")
        }),
      { config: providerConfig(slow.url) },
    )
  }),
)
|
||||
|
||||
// Runs with the default auth content and a config that sets headerTimeout to
// false for openai; the resolved provider options must keep that `false`.
it.live("OpenAI Codex headerTimeout default can be disabled by config", () =>
  Effect.gen(function* () {
    yield* withAuthContent(
      Effect.gen(function* () {
        yield* provideTmpdirInstance(
          () =>
            Effect.gen(function* () {
              const providers = yield* Provider.Service
              const { options } = yield* providers.getProvider(ProviderID.openai)
              expect(options.headerTimeout).toBe(false)
            }),
          { config: { provider: { openai: { options: { headerTimeout: false } } } } },
        )
      }),
    )
  }),
)
|
||||
|
||||
// Runs with API-key auth content for openai and no provider config; the
// resolved provider options must carry a headerTimeout of 10_000.
it.live("OpenAI API auth gets default headerTimeout", () =>
  Effect.gen(function* () {
    yield* withAuthContent(
      Effect.gen(function* () {
        yield* provideTmpdirInstance(() =>
          Effect.gen(function* () {
            const providers = yield* Provider.Service
            const { options } = yield* providers.getProvider(ProviderID.openai)
            expect(options.headerTimeout).toBe(10_000)
          }),
        )
      }),
      { openai: { type: "api", key: "sk-test" } },
    )
  }),
)
|
||||
|
||||
/**
 * Takes the config returned by `testProviderConfig(url)` and overlays
 * `options` onto the `test` provider's options.
 *
 * @param url Passed through to `testProviderConfig`.
 * @param options Extra provider options; keys here win over the base options.
 * @returns The base config with its `provider` map replaced by one holding only `test`.
 */
function providerConfig(url: string, options: Record<string, unknown> = {}) {
  const base = testProviderConfig(url)
  const mergedOptions = { ...base.provider.test.options, ...options }
  const test = { ...base.provider.test, options: mergedOptions }
  return { ...base, provider: { test } }
}
|
||||
|
||||
/**
 * Starts an HTTP server on an ephemeral 127.0.0.1 port that answers every
 * request with a small event-stream body, but only after `delay` milliseconds,
 * so that the response headers arrive late.
 *
 * @param delay Milliseconds to wait before writing the status line and headers.
 * @returns The listening server (the caller must close it) and its base URL.
 * @throws If the server fails to listen or does not report a TCP address.
 */
async function delayedHeaderServer(delay: number): Promise<{ server: Server; url: string }> {
  const server = createServer((_, res) => {
    const timer = setTimeout(() => {
      res.writeHead(200, { "content-type": "text/event-stream" })
      res.end('data: {"choices":[{"delta":{"content":"ok"}}]}\n\ndata: [DONE]\n\n')
    }, delay)
    // If the client disconnects first (e.g. it gave up waiting for headers),
    // drop the pending write instead of writing to a closed response.
    res.on("close", () => clearTimeout(timer))
  })

  // Reject when binding fails; without an error handler the promise would never settle.
  await new Promise<void>((resolve, reject) => {
    server.once("error", reject)
    server.listen(0, "127.0.0.1", () => {
      server.off("error", reject)
      resolve()
    })
  })

  const address = server.address()
  if (!address || typeof address === "string") throw new Error("server did not bind to a TCP port")
  return { server, url: `http://127.0.0.1:${address.port}` }
}
|
||||
|
||||
/**
 * Runs `self` while `process.env.OPENCODE_AUTH_CONTENT` holds the JSON
 * serialization of `value`, then puts the variable back the way it was
 * (deleting it if it was previously unset).
 *
 * @param self Effect to run while the variable is overridden.
 * @param value Auth content to serialize; defaults to `defaultAuthContent()`.
 */
function withAuthContent<A, E, R>(self: Effect.Effect<A, E, R>, value: Record<string, unknown> = defaultAuthContent()) {
  // Acquire: remember the current value, then install the override.
  const install = Effect.sync(() => {
    const saved = process.env.OPENCODE_AUTH_CONTENT
    process.env.OPENCODE_AUTH_CONTENT = JSON.stringify(value)
    return saved
  })

  // Release: restore the remembered value, or remove the variable if there was none.
  const restore = (saved: string | undefined) =>
    Effect.sync(() => {
      if (saved !== undefined) {
        process.env.OPENCODE_AUTH_CONTENT = saved
        return
      }
      delete process.env.OPENCODE_AUTH_CONTENT
    })

  return Effect.acquireUseRelease(install, () => self, restore)
}
|
||||
|
||||
/**
 * Builds auth content holding a single `openai` entry of type "oauth" with
 * placeholder tokens and an expiry 60 seconds after the time of the call.
 */
function defaultAuthContent() {
  const expires = Date.now() + 60_000
  return {
    openai: { type: "oauth", refresh: "refresh", access: "access", expires },
  }
}
|
||||
|
|
@ -282,9 +282,10 @@ it.instance(
|
|||
expect(providers[ProviderID.anthropic]).toBeDefined()
|
||||
// Config options should be merged
|
||||
expect(providers[ProviderID.anthropic].options.timeout).toBe(60000)
|
||||
expect(providers[ProviderID.anthropic].options.headerTimeout).toBe(10000)
|
||||
expect(providers[ProviderID.anthropic].options.chunkTimeout).toBe(15000)
|
||||
}),
|
||||
{ config: { provider: { anthropic: { options: { timeout: 60000, chunkTimeout: 15000 } } } } },
|
||||
{ config: { provider: { anthropic: { options: { timeout: 60000, headerTimeout: 10000, chunkTimeout: 15000 } } } } },
|
||||
)
|
||||
|
||||
it.instance("getModel returns model for valid provider/model", () =>
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
|
|||
import { SessionRetry } from "../../src/session/retry"
|
||||
import { MessageV2 } from "../../src/session/message-v2"
|
||||
import { ProviderID } from "../../src/provider/schema"
|
||||
import { ProviderError } from "../../src/provider/error"
|
||||
import { SessionID } from "../../src/session/schema"
|
||||
import { SessionStatus } from "../../src/session/status"
|
||||
import { provideTmpdirInstance } from "../fixture/fixture"
|
||||
|
|
@ -163,6 +164,14 @@ describe("session.retry.retryable", () => {
|
|||
expect(SessionRetry.retryable(error, retryProvider)).toEqual({ message: msg })
|
||||
})
|
||||
|
||||
test("retries transport timeout errors", () => {
|
||||
const request = MessageV2.fromError(new ProviderError.HeaderTimeoutError(10000), { providerID })
|
||||
expect(MessageV2.APIError.isInstance(request)).toBe(true)
|
||||
expect(SessionRetry.retryable(request, retryProvider)).toEqual({
|
||||
message: "Provider response headers timed out after 10000ms",
|
||||
})
|
||||
})
|
||||
|
||||
test("does not retry context overflow errors", () => {
|
||||
const error = new MessageV2.ContextOverflowError({
|
||||
message: "Input exceeds context window of this model",
|
||||
|
|
|
|||
|
|
@ -1049,11 +1049,15 @@ export type ProviderConfig = {
|
|||
enterpriseUrl?: string
|
||||
setCacheKey?: boolean
|
||||
/**
|
||||
* Timeout in milliseconds for requests to this provider. Default is 300000 (5 minutes). Set to false to disable timeout.
|
||||
* Timeout in milliseconds for full requests to this provider. Set to false to disable timeout.
|
||||
*/
|
||||
timeout?: number | false
|
||||
/**
|
||||
* Timeout in milliseconds to wait for response headers. Provider integrations may set defaults. Set to false to disable timeout.
|
||||
*/
|
||||
headerTimeout?: number | false
|
||||
chunkTimeout?: number
|
||||
[key: string]: unknown | string | boolean | number | false | number | undefined
|
||||
[key: string]: unknown | string | boolean | number | false | number | false | number | undefined
|
||||
}
|
||||
models?: {
|
||||
[key: string]: {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue