refactor(app): extract refcount utility and clean up server sdk context (#29155)

This commit is contained in:
Brendan Allan 2026-05-25 10:00:00 +08:00 committed by GitHub
parent 03bb53c389
commit 9495ecd536
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 264 additions and 278 deletions

View file

@ -8,10 +8,11 @@ import {
getSessionPrefetchPromise,
setSessionPrefetch,
} from "./global-sync/session-prefetch"
import { useServerSync } from "./server-sync"
import { createServerSyncContext, useServerSync } from "./server-sync"
import type { Message, OpencodeClient, Part } from "@opencode-ai/sdk/v2/client"
import { SESSION_CACHE_LIMIT, dropSessionCaches, pickSessionCacheEvictions } from "./global-sync/session-cache"
import { diffs as list, message as clean } from "@/utils/diffs"
import { useServerSDK } from "./server-sdk"
const SKIP_PARTS = new Set(["patch", "step-start", "step-finish"])
@ -164,8 +165,9 @@ function setOptimisticRemove(setStore: (...args: unknown[]) => void, input: Opti
})
}
export const createDirSyncContext = (client: OpencodeClient, directory: string) => {
const serverSync = useServerSync()
export const createDirSyncContext = (directory: string, serverSync: ReturnType<typeof createServerSyncContext>) => {
const serverSDK = useServerSDK()
const client = serverSDK.createClient({ directory, throwOnError: true })
type Child = ReturnType<(typeof serverSync)["child"]>
type Setter = Child[1]

View file

@ -6,6 +6,6 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
init: (props: { directory: string }) => {
const serverSDK = useServerSDK()
return serverSDK.createDirSyncContext(props.directory)
return serverSDK.createDirSdkContext(props.directory)
},
})

View file

@ -6,272 +6,255 @@ import { batch, onCleanup, onMount } from "solid-js"
import { createSdkForServer } from "@/utils/server"
import { useLanguage } from "./language"
import { usePlatform } from "./platform"
import { useServer } from "./server"
import { ServerConnection, useServer } from "./server"
import { createRefCountMap } from "@/utils/refcount"
const isAbortError = (error: unknown) =>
error !== null && typeof error === "object" && "name" in error && error.name === "AbortError"
export const { use: useServerSDK, provider: ServerSDKProvider } = createSimpleContext({
name: "GlobalSDK",
init: () => {
const language = useLanguage()
const server = useServer()
const platform = usePlatform()
const abort = new AbortController()
function createServerSdkContext(server: ServerConnection.Any) {
const platform = usePlatform()
const abort = new AbortController()
const eventFetch = (() => {
if (!platform.fetch || !server.current) return
try {
const url = new URL(server.current.http.url)
const loopback = url.hostname === "localhost" || url.hostname === "127.0.0.1" || url.hostname === "::1"
if (url.protocol === "http:" && !loopback) return platform.fetch
} catch {
return
}
})()
const currentServer = server.current
if (!currentServer) throw new Error(language.t("error.serverSDK.noServerAvailable"))
const eventSdk = createSdkForServer({
signal: abort.signal,
fetch: eventFetch,
server: currentServer.http,
})
const emitter = createGlobalEmitter<{
[key: string]: Event
}>()
type Queued = { directory: string; payload: Event }
const FLUSH_FRAME_MS = 16
const STREAM_YIELD_MS = 8
const RECONNECT_DELAY_MS = 250
let queue: Queued[] = []
let buffer: Queued[] = []
const coalesced = new Map<string, number>()
const staleDeltas = new Set<string>()
let timer: ReturnType<typeof setTimeout> | undefined
let last = 0
const deltaKey = (directory: string, messageID: string, partID: string) => `${directory}:${messageID}:${partID}`
const key = (directory: string, payload: Event) => {
if (payload.type === "session.status") return `session.status:${directory}:${payload.properties.sessionID}`
if (payload.type === "lsp.updated") return `lsp.updated:${directory}`
if (payload.type === "message.part.updated") {
const part = payload.properties.part
return `message.part.updated:${directory}:${part.messageID}:${part.id}`
}
const eventFetch = (() => {
if (!platform.fetch || !server) return
try {
const url = new URL(server.http.url)
const loopback = url.hostname === "localhost" || url.hostname === "127.0.0.1" || url.hostname === "::1"
if (url.protocol === "http:" && !loopback) return platform.fetch
} catch {
return
}
})()
const flush = () => {
if (timer) clearTimeout(timer)
timer = undefined
const eventSdk = createSdkForServer({
signal: abort.signal,
fetch: eventFetch,
server: server.http,
})
const emitter = createGlobalEmitter<{
[key: string]: Event
}>()
if (queue.length === 0) return
type Queued = { directory: string; payload: Event }
const FLUSH_FRAME_MS = 16
const STREAM_YIELD_MS = 8
const RECONNECT_DELAY_MS = 250
const events = queue
const skip = staleDeltas.size > 0 ? new Set(staleDeltas) : undefined
queue = buffer
buffer = events
queue.length = 0
coalesced.clear()
staleDeltas.clear()
let queue: Queued[] = []
let buffer: Queued[] = []
const coalesced = new Map<string, number>()
const staleDeltas = new Set<string>()
let timer: ReturnType<typeof setTimeout> | undefined
let last = 0
last = Date.now()
batch(() => {
for (const event of events) {
if (skip && event.payload.type === "message.part.delta") {
const props = event.payload.properties
if (skip.has(deltaKey(event.directory, props.messageID, props.partID))) continue
}
emitter.emit(event.directory, event.payload)
const deltaKey = (directory: string, messageID: string, partID: string) => `${directory}:${messageID}:${partID}`
const key = (directory: string, payload: Event) => {
if (payload.type === "session.status") return `session.status:${directory}:${payload.properties.sessionID}`
if (payload.type === "lsp.updated") return `lsp.updated:${directory}`
if (payload.type === "message.part.updated") {
const part = payload.properties.part
return `message.part.updated:${directory}:${part.messageID}:${part.id}`
}
}
const flush = () => {
if (timer) clearTimeout(timer)
timer = undefined
if (queue.length === 0) return
const events = queue
const skip = staleDeltas.size > 0 ? new Set(staleDeltas) : undefined
queue = buffer
buffer = events
queue.length = 0
coalesced.clear()
staleDeltas.clear()
last = Date.now()
batch(() => {
for (const event of events) {
if (skip && event.payload.type === "message.part.delta") {
const props = event.payload.properties
if (skip.has(deltaKey(event.directory, props.messageID, props.partID))) continue
}
})
emitter.emit(event.directory, event.payload)
}
})
buffer.length = 0
}
buffer.length = 0
}
const schedule = () => {
if (timer) return
const elapsed = Date.now() - last
timer = setTimeout(flush, Math.max(0, FLUSH_FRAME_MS - elapsed))
}
const schedule = () => {
if (timer) return
const elapsed = Date.now() - last
timer = setTimeout(flush, Math.max(0, FLUSH_FRAME_MS - elapsed))
}
let streamErrorLogged = false
const wait = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))
const aborted = isAbortError
let streamErrorLogged = false
const wait = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))
const aborted = isAbortError
let attempt: AbortController | undefined
let run: Promise<void> | undefined
let started = false
const HEARTBEAT_TIMEOUT_MS = 15_000
let lastEventAt = Date.now()
let heartbeat: ReturnType<typeof setTimeout> | undefined
const resetHeartbeat = () => {
lastEventAt = Date.now()
if (heartbeat) clearTimeout(heartbeat)
heartbeat = setTimeout(() => {
attempt?.abort()
}, HEARTBEAT_TIMEOUT_MS)
}
const clearHeartbeat = () => {
if (!heartbeat) return
clearTimeout(heartbeat)
heartbeat = undefined
}
let attempt: AbortController | undefined
let run: Promise<void> | undefined
let started = false
const HEARTBEAT_TIMEOUT_MS = 15_000
let lastEventAt = Date.now()
let heartbeat: ReturnType<typeof setTimeout> | undefined
const resetHeartbeat = () => {
lastEventAt = Date.now()
if (heartbeat) clearTimeout(heartbeat)
heartbeat = setTimeout(() => {
attempt?.abort()
}, HEARTBEAT_TIMEOUT_MS)
}
const clearHeartbeat = () => {
if (!heartbeat) return
clearTimeout(heartbeat)
heartbeat = undefined
}
const start = () => {
if (started) return run
started = true
run = (async () => {
// oxlint-disable-next-line no-unmodified-loop-condition -- `started` is set to false by stop() which also aborts; both flags are checked to allow graceful exit
while (!abort.signal.aborted && started) {
attempt = new AbortController()
lastEventAt = Date.now()
const onAbort = () => {
attempt?.abort()
}
abort.signal.addEventListener("abort", onAbort)
try {
const events = await eventSdk.global.event({
signal: attempt.signal,
onSseError: (error) => {
if (aborted(error)) return
if (streamErrorLogged) return
streamErrorLogged = true
console.error("[global-sdk] event stream error", {
url: currentServer.http.url,
fetch: eventFetch ? "platform" : "webview",
error,
})
},
})
let yielded = Date.now()
resetHeartbeat()
for await (const event of events.stream) {
resetHeartbeat()
streamErrorLogged = false
const directory = event.directory ?? "global"
if (event.payload.type === "sync") {
continue
}
const payload = event.payload as Event
const k = key(directory, payload)
if (k) {
const i = coalesced.get(k)
if (i !== undefined) {
queue[i] = { directory, payload }
if (payload.type === "message.part.updated") {
const part = payload.properties.part
staleDeltas.add(deltaKey(directory, part.messageID, part.id))
}
continue
}
coalesced.set(k, queue.length)
}
queue.push({ directory, payload })
schedule()
if (Date.now() - yielded < STREAM_YIELD_MS) continue
yielded = Date.now()
await wait(0)
}
} catch (error) {
if (!aborted(error) && !streamErrorLogged) {
const start = () => {
if (started) return run
started = true
run = (async () => {
// oxlint-disable-next-line no-unmodified-loop-condition -- `started` is set to false by stop() which also aborts; both flags are checked to allow graceful exit
while (!abort.signal.aborted && started) {
attempt = new AbortController()
lastEventAt = Date.now()
const onAbort = () => {
attempt?.abort()
}
abort.signal.addEventListener("abort", onAbort)
try {
const events = await eventSdk.global.event({
signal: attempt.signal,
onSseError: (error) => {
if (aborted(error)) return
if (streamErrorLogged) return
streamErrorLogged = true
console.error("[global-sdk] event stream failed", {
url: currentServer.http.url,
console.error("[global-sdk] event stream error", {
url: server.http.url,
fetch: eventFetch ? "platform" : "webview",
error,
})
},
})
let yielded = Date.now()
resetHeartbeat()
for await (const event of events.stream) {
resetHeartbeat()
streamErrorLogged = false
const directory = event.directory ?? "global"
if (event.payload.type === "sync") {
continue
}
} finally {
abort.signal.removeEventListener("abort", onAbort)
attempt = undefined
clearHeartbeat()
const payload = event.payload as Event
const k = key(directory, payload)
if (k) {
const i = coalesced.get(k)
if (i !== undefined) {
queue[i] = { directory, payload }
if (payload.type === "message.part.updated") {
const part = payload.properties.part
staleDeltas.add(deltaKey(directory, part.messageID, part.id))
}
continue
}
coalesced.set(k, queue.length)
}
queue.push({ directory, payload })
schedule()
if (Date.now() - yielded < STREAM_YIELD_MS) continue
yielded = Date.now()
await wait(0)
}
if (abort.signal.aborted || !started) return
await wait(RECONNECT_DELAY_MS)
} catch (error) {
if (!aborted(error) && !streamErrorLogged) {
streamErrorLogged = true
console.error("[global-sdk] event stream failed", {
url: server.http.url,
fetch: eventFetch ? "platform" : "webview",
error,
})
}
} finally {
abort.signal.removeEventListener("abort", onAbort)
attempt = undefined
clearHeartbeat()
}
})().finally(() => {
run = undefined
flush()
})
return run
}
const stop = () => {
started = false
attempt?.abort()
clearHeartbeat()
}
onMount(() => {
makeEventListener(document, "visibilitychange", () => {
if (document.visibilityState !== "visible") return
if (!started) return
if (Date.now() - lastEventAt < HEARTBEAT_TIMEOUT_MS) return
attempt?.abort()
})
})
onCleanup(() => {
stop()
abort.abort()
if (abort.signal.aborted || !started) return
await wait(RECONNECT_DELAY_MS)
}
})().finally(() => {
run = undefined
flush()
})
return run
}
const sdk = createSdkForServer({
server: server.current.http,
fetch: platform.fetch,
throwOnError: true,
const stop = () => {
started = false
attempt?.abort()
clearHeartbeat()
}
onMount(() => {
makeEventListener(document, "visibilitychange", () => {
if (document.visibilityState !== "visible") return
if (!started) return
if (Date.now() - lastEventAt < HEARTBEAT_TIMEOUT_MS) return
attempt?.abort()
})
})
const dirSyncContexts = new Map<string, ReturnType<typeof createDirSdkContext>>()
const dirSdkContextRefCounts = new Map<string, number>()
onCleanup(() => {
stop()
abort.abort()
flush()
})
const sdk = createSdkForServer({
server: server.http,
fetch: platform.fetch,
throwOnError: true,
})
return {
url: server.http.url,
client: sdk,
event: {
on: emitter.on.bind(emitter),
listen: emitter.listen.bind(emitter),
start,
},
createClient(opts: Omit<Parameters<typeof createSdkForServer>[0], "server" | "fetch">) {
return createSdkForServer({
server: server.http,
fetch: platform.fetch,
...opts,
})
},
}
}
export const { use: useServerSDK, provider: ServerSDKProvider } = createSimpleContext({
name: "ServerSDK",
init: () => {
const language = useLanguage()
const server = useServer()
if (!server.current) throw new Error(language.t("error.serverSDK.noServerAvailable"))
const sdk = createServerSdkContext(server.current)
return {
url: currentServer.http.url,
client: sdk,
event: {
on: emitter.on.bind(emitter),
listen: emitter.listen.bind(emitter),
start,
},
createClient(opts: Omit<Parameters<typeof createSdkForServer>[0], "server" | "fetch">) {
const s = server.current
if (!s) throw new Error(language.t("error.serverSDK.serverNotAvailable"))
return createSdkForServer({
server: s.http,
fetch: platform.fetch,
...opts,
})
},
createDirSyncContext: (directory: string) => {
onCleanup(() => {
dirSdkContextRefCounts.set(directory, (dirSdkContextRefCounts.get(directory) ?? 0) - 1)
if (dirSdkContextRefCounts.get(directory) === 0) {
dirSyncContexts.delete(directory)
dirSdkContextRefCounts.delete(directory)
}
})
const cached = dirSyncContexts.get(directory)
if (cached) {
dirSdkContextRefCounts.set(directory, (dirSdkContextRefCounts.get(directory) ?? 0) + 1)
return cached
}
const ctx = createDirSdkContext(directory)
dirSyncContexts.set(directory, ctx)
dirSdkContextRefCounts.set(directory, 1)
return ctx
},
...sdk,
createDirSdkContext: createRefCountMap((dir) => createDirSdkContext(dir, sdk)),
}
},
})
@ -280,9 +263,7 @@ type SDKEventMap = {
[key in Event["type"]]: Extract<Event, { type: key }>
}
function createDirSdkContext(directory: string) {
const serverSDK = useServerSDK()
function createDirSdkContext(directory: string, serverSDK: ReturnType<typeof createServerSdkContext>) {
const client = serverSDK.createClient({
directory,
throwOnError: true,

View file

@ -29,7 +29,8 @@ import { createRefreshQueue } from "./global-sync/queue"
import { directoryKey } from "./global-sync/utils"
import { PathKey } from "@/utils/path-key"
import { createDirSyncContext } from "./directory-sync"
import { NormalizedProviderListResponse } from "@opencode-ai/ui/context"
import { createSimpleContext, NormalizedProviderListResponse } from "@opencode-ai/ui/context"
import { createRefCountMap } from "@/utils/refcount"
type GlobalStore = {
ready: boolean
@ -72,7 +73,7 @@ function makeQueryOptionsApi(serverSDK: () => OpencodeClient, sdkFor: (dir: Path
}
export type QueryOptionsApi = ReturnType<typeof makeQueryOptionsApi>
function createServerSyncContext() {
export function createServerSyncContext() {
const serverSDK = useServerSDK()
const language = useLanguage()
const owner = getOwner()
@ -425,9 +426,6 @@ function createServerSyncContext() {
},
}))
const dirSyncContexts = new Map<string, ReturnType<typeof createDirSyncContext>>()
const dirSyncContextRefCounts = new Map<string, number>()
return {
data: globalStore,
set,
@ -446,41 +444,20 @@ function createServerSyncContext() {
todo: {
set: setSessionTodo,
},
createDirSyncContext: (directory: string) => {
onCleanup(() => {
dirSyncContextRefCounts.set(directory, (dirSyncContextRefCounts.get(directory) ?? 0) - 1)
if (dirSyncContextRefCounts.get(directory) === 0) {
dirSyncContexts.delete(directory)
dirSyncContextRefCounts.delete(directory)
}
})
const cached = dirSyncContexts.get(directory)
if (cached) {
dirSyncContextRefCounts.set(directory, (dirSyncContextRefCounts.get(directory) ?? 0) + 1)
return cached
}
const ctx = createDirSyncContext(serverSDK.createClient({ directory, throwOnError: true }), directory)
dirSyncContexts.set(directory, ctx)
dirSyncContextRefCounts.set(directory, 1)
return ctx
},
}
}
const ServerSyncContext = createContext<ReturnType<typeof createServerSyncContext>>()
export const { use: useServerSync, provider: ServerSyncProvider } = createSimpleContext({
name: "ServerSync",
init: () => {
const sync = createServerSyncContext()
export function ServerSyncProvider(props: ParentProps) {
const value = createServerSyncContext()
return <ServerSyncContext.Provider value={value}>{props.children}</ServerSyncContext.Provider>
}
export function useServerSync() {
const context = useContext(ServerSyncContext)
if (!context) throw new Error("useServerSync must be used within ServerSyncProvider")
return context
}
return {
...sync,
createDirSyncContext: createRefCountMap((dir) => createDirSyncContext(dir, sync)),
}
},
})
export function useQueryOptions() {
return useServerSync().queryOptions

View file

@ -0,0 +1,26 @@
import { onCleanup } from "solid-js"
export function createRefCountMap<T>(create: (key: string) => T) {
const items = new Map<string, T>()
const refCounts = new Map<string, number>()
return (key: string) => {
onCleanup(() => {
refCounts.set(key, (refCounts.get(key) ?? 0) - 1)
if (refCounts.get(key) === 0) {
items.delete(key)
refCounts.delete(key)
}
})
const cached = items.get(key)
if (cached) {
refCounts.set(key, (refCounts.get(key) ?? 0) + 1)
return cached
}
const item = create(key)
items.set(key, item)
refCounts.set(key, 1)
return item
}
}