fix(openai): keep websocket response timeouts active (#29699)

This commit is contained in:
Luke Parker 2026-05-28 18:28:01 +10:00 committed by GitHub
parent 14e0b9b17f
commit 913659890d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 29 additions and 19 deletions

View file

@ -159,9 +159,6 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
if (!options.idleTimeout) return
if (idleTimer) clearTimeout(idleTimer)
idleTimer = setTimeout(() => invalidate(new ProviderError.ResponseStreamError(message)), options.idleTimeout)
if (typeof idleTimer === "object" && "unref" in idleTimer && typeof idleTimer.unref === "function") {
idleTimer.unref()
}
}
async function onMessage(data: WebSocket.RawData, isBinary: boolean) {

View file

@ -51,7 +51,7 @@ describe("plugin.openai.ws", () => {
onConnectionInvalid: (error) => invalid.push(error.message),
})
await expect(response.text()).rejects.toThrow("idle timeout sending websocket request")
expect((await readTextError(response.text())).message).toContain("idle timeout sending websocket request")
expect(invalid).toEqual(["idle timeout sending websocket request"])
})
@ -100,7 +100,7 @@ describe("plugin.openai.ws", () => {
onConnectionInvalid: (error) => invalid.push(error),
})
await expect(response.text()).rejects.toThrow(
expect((await readTextError(response.text())).message).toContain(
"WebSocket closed before response.completed (code 1009: message too big: payload too large)",
)
expect(invalid[0]).toBeInstanceOf(ProviderError.ResponseStreamError)
@ -124,7 +124,7 @@ describe("plugin.openai.ws", () => {
onConnectionInvalid: (error) => invalid.push(error.message),
})
await expect(response.text()).rejects.toThrow("Unexpected binary WebSocket frame")
expect((await readTextError(response.text())).message).toContain("Unexpected binary WebSocket frame")
expect(invalid).toEqual(["Unexpected binary WebSocket frame"])
})
})
@ -186,7 +186,7 @@ describe("plugin.openai.ws-pool", () => {
})
const first = await fetch(server.url, streamRequest({ [TITLE_HEADER]: "false" }))
await expect(first.text()).rejects.toBeInstanceOf(ProviderError.ResponseStreamError)
expect(await readTextError(first.text())).toBeInstanceOf(ProviderError.ResponseStreamError)
const second = await fetch(server.url, streamRequest({ [TITLE_HEADER]: "false" }))
const third = await fetch(server.url, streamRequest({ [TITLE_HEADER]: "false" }))
@ -271,7 +271,7 @@ describe("plugin.openai.ws-pool", () => {
})
const first = await fetch(server.url, streamRequest())
await expect(first.text()).rejects.toThrow("Responses websocket connection limit reached")
expect((await readTextError(first.text())).message).toContain("Responses websocket connection limit reached")
const second = await fetch(server.url, streamRequest())
const text = await second.text()
@ -308,9 +308,9 @@ describe("plugin.openai.ws-pool", () => {
})
const first = await fetch(server.url, streamRequest())
await expect(first.text()).rejects.toThrow("Responses websocket connection limit reached")
expect((await readTextError(first.text())).message).toContain("Responses websocket connection limit reached")
const second = await fetch(server.url, streamRequest())
await expect(second.text()).rejects.toThrow("Responses websocket connection limit reached")
expect((await readTextError(second.text())).message).toContain("Responses websocket connection limit reached")
const third = await fetch(server.url, streamRequest())
const fourth = await fetch(server.url, streamRequest())
@ -348,7 +348,7 @@ describe("plugin.openai.ws-pool", () => {
})
const first = await fetch(server.url, streamRequest())
await expect(first.text()).rejects.toThrow("WebSocket closed before response.completed")
expect((await readTextError(first.text())).message).toContain("WebSocket closed before response.completed")
const second = await fetch(server.url, streamRequest())
expect(await second.text()).toBe("http")
@ -370,7 +370,7 @@ describe("plugin.openai.ws-pool", () => {
})
const first = await fetch(server.url, streamRequest())
await expect(first.text()).rejects.toThrow("idle timeout waiting for websocket")
expect((await readTextError(first.text())).message).toContain("idle timeout waiting for websocket")
const second = await fetch(server.url, streamRequest())
const third = await fetch(server.url, streamRequest())
@ -394,9 +394,9 @@ describe("plugin.openai.ws-pool", () => {
})
const first = await fetch(server.url, streamRequest())
await expect(first.text()).rejects.toThrow("idle timeout waiting for websocket")
expect((await readTextError(first.text())).message).toContain("idle timeout waiting for websocket")
const second = await fetch(server.url, streamRequest())
await expect(second.text()).rejects.toThrow("idle timeout waiting for websocket")
expect((await readTextError(second.text())).message).toContain("idle timeout waiting for websocket")
const third = await fetch(server.url, streamRequest())
expect(await third.text()).toBe("http")
@ -425,11 +425,11 @@ describe("plugin.openai.ws-pool", () => {
})
const first = await fetch(server.url, streamRequest())
await expect(first.text()).rejects.toThrow("WebSocket closed before response.completed")
expect((await readTextError(first.text())).message).toContain("WebSocket closed before response.completed")
const second = await fetch(server.url, streamRequest())
expect(await second.text()).toContain("data: [DONE]")
const third = await fetch(server.url, streamRequest())
await expect(third.text()).rejects.toThrow("WebSocket closed before response.completed")
expect((await readTextError(third.text())).message).toContain("WebSocket closed before response.completed")
const fourth = await fetch(server.url, streamRequest())
expect(await fourth.text()).toContain("data: [DONE]")
@ -480,7 +480,7 @@ describe("plugin.openai.ws-pool", () => {
expect(server.httpRequests).toHaveLength(1)
expect(connections).toBe(1)
abort.abort(new Error("stop"))
await expect(firstText).rejects.toThrow("stop")
expect((await readTextError(firstText)).message).toContain("stop")
fetch.close()
})
@ -518,7 +518,7 @@ describe("plugin.openai.ws-pool", () => {
})
const first = await fetch(server.url, streamRequest())
await expect(first.text()).rejects.toThrow("WebSocket closed before response.completed")
expect((await readTextError(first.text())).message).toContain("WebSocket closed before response.completed")
const second = await fetch(server.url, streamRequest())
const third = await fetch(server.url, streamRequest())
@ -550,7 +550,7 @@ describe("plugin.openai.ws-pool", () => {
const firstText = first.text()
await waitFor(() => connections === 1, "first websocket did not connect")
abort.abort(new Error("stop"))
await expect(firstText).rejects.toThrow("stop")
expect((await readTextError(firstText)).message).toContain("stop")
const second = await fetch(server.url, streamRequest())
@ -602,6 +602,19 @@ function streamRequest(headers?: Record<string, string>, signal?: AbortSignal):
}
}
async function readTextError(promise: Promise<string>) {
// Bun 1.3.14 hangs on expect(response.text()).rejects for streams errored from ws callbacks.
return promise.then(
() => {
throw new Error("Expected response text to reject")
},
(error) => {
expect(error).toBeInstanceOf(Error)
return error as Error
},
)
}
async function createWebSocketServer(onConnection: (socket: WebSocket, request: IncomingMessage) => void) {
const http = await createHttpServer()
const server = new WebSocketServer({ server: http.server })