From 9495ecd536740dcf310440859fafb52dc3bf9382 Mon Sep 17 00:00:00 2001 From: Brendan Allan <14191578+Brendonovich@users.noreply.github.com> Date: Mon, 25 May 2026 10:00:00 +0800 Subject: [PATCH] refactor(app): extract refcount utility and clean up server sdk context (#29155) --- packages/app/src/context/directory-sync.ts | 8 +- packages/app/src/context/sdk.tsx | 2 +- packages/app/src/context/server-sdk.tsx | 457 ++++++++++----------- packages/app/src/context/server-sync.tsx | 49 +-- packages/app/src/utils/refcount.ts | 26 ++ 5 files changed, 264 insertions(+), 278 deletions(-) create mode 100644 packages/app/src/utils/refcount.ts diff --git a/packages/app/src/context/directory-sync.ts b/packages/app/src/context/directory-sync.ts index baf3183d85..a54701f0db 100644 --- a/packages/app/src/context/directory-sync.ts +++ b/packages/app/src/context/directory-sync.ts @@ -8,10 +8,11 @@ import { getSessionPrefetchPromise, setSessionPrefetch, } from "./global-sync/session-prefetch" -import { useServerSync } from "./server-sync" +import { createServerSyncContext, useServerSync } from "./server-sync" import type { Message, OpencodeClient, Part } from "@opencode-ai/sdk/v2/client" import { SESSION_CACHE_LIMIT, dropSessionCaches, pickSessionCacheEvictions } from "./global-sync/session-cache" import { diffs as list, message as clean } from "@/utils/diffs" +import { useServerSDK } from "./server-sdk" const SKIP_PARTS = new Set(["patch", "step-start", "step-finish"]) @@ -164,8 +165,9 @@ function setOptimisticRemove(setStore: (...args: unknown[]) => void, input: Opti }) } -export const createDirSyncContext = (client: OpencodeClient, directory: string) => { - const serverSync = useServerSync() +export const createDirSyncContext = (directory: string, serverSync: ReturnType) => { + const serverSDK = useServerSDK() + const client = serverSDK.createClient({ directory, throwOnError: true }) type Child = ReturnType<(typeof serverSync)["child"]> type Setter = Child[1] diff --git a/packages/app/src/context/sdk.tsx b/packages/app/src/context/sdk.tsx index d2c2630531..46e8c1b213 100644 --- a/packages/app/src/context/sdk.tsx +++ b/packages/app/src/context/sdk.tsx @@ -6,6 +6,6 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({ init: (props: { directory: string }) => { const serverSDK = useServerSDK() - return serverSDK.createDirSyncContext(props.directory) + return serverSDK.createDirSdkContext(props.directory) }, }) diff --git a/packages/app/src/context/server-sdk.tsx b/packages/app/src/context/server-sdk.tsx index 41181e9736..47446b2d86 100644 --- a/packages/app/src/context/server-sdk.tsx +++ b/packages/app/src/context/server-sdk.tsx @@ -6,272 +6,255 @@ import { batch, onCleanup, onMount } from "solid-js" import { createSdkForServer } from "@/utils/server" import { useLanguage } from "./language" import { usePlatform } from "./platform" -import { useServer } from "./server" +import { ServerConnection, useServer } from "./server" +import { createRefCountMap } from "@/utils/refcount" const isAbortError = (error: unknown) => error !== null && typeof error === "object" && "name" in error && error.name === "AbortError" -export const { use: useServerSDK, provider: ServerSDKProvider } = createSimpleContext({ - name: "GlobalSDK", - init: () => { - const language = useLanguage() - const server = useServer() - const platform = usePlatform() - const abort = new AbortController() +function createServerSdkContext(server: ServerConnection.Any) { + const platform = usePlatform() + const abort = new AbortController() - const eventFetch = (() => { - if (!platform.fetch || !server.current) return - try { - const url = new URL(server.current.http.url) - const loopback = url.hostname === "localhost" || url.hostname === "127.0.0.1" || url.hostname === "::1" - if (url.protocol === "http:" && !loopback) return platform.fetch - } catch { - return - } - })() - - const currentServer = server.current - if (!currentServer) throw new Error(language.t("error.serverSDK.noServerAvailable")) - - const eventSdk = createSdkForServer({ - signal: abort.signal, - fetch: eventFetch, - server: currentServer.http, - }) - const emitter = createGlobalEmitter<{ - [key: string]: Event - }>() - - type Queued = { directory: string; payload: Event } - const FLUSH_FRAME_MS = 16 - const STREAM_YIELD_MS = 8 - const RECONNECT_DELAY_MS = 250 - - let queue: Queued[] = [] - let buffer: Queued[] = [] - const coalesced = new Map() - const staleDeltas = new Set() - let timer: ReturnType | undefined - let last = 0 - - const deltaKey = (directory: string, messageID: string, partID: string) => `${directory}:${messageID}:${partID}` - - const key = (directory: string, payload: Event) => { - if (payload.type === "session.status") return `session.status:${directory}:${payload.properties.sessionID}` - if (payload.type === "lsp.updated") return `lsp.updated:${directory}` - if (payload.type === "message.part.updated") { - const part = payload.properties.part - return `message.part.updated:${directory}:${part.messageID}:${part.id}` - } + const eventFetch = (() => { + if (!platform.fetch || !server) return + try { + const url = new URL(server.http.url) + const loopback = url.hostname === "localhost" || url.hostname === "127.0.0.1" || url.hostname === "::1" + if (url.protocol === "http:" && !loopback) return platform.fetch + } catch { + return } + })() - const flush = () => { - if (timer) clearTimeout(timer) - timer = undefined + const eventSdk = createSdkForServer({ + signal: abort.signal, + fetch: eventFetch, + server: server.http, + }) + const emitter = createGlobalEmitter<{ + [key: string]: Event + }>() - if (queue.length === 0) return + type Queued = { directory: string; payload: Event } + const FLUSH_FRAME_MS = 16 + const STREAM_YIELD_MS = 8 + const RECONNECT_DELAY_MS = 250 - const events = queue - const skip = staleDeltas.size > 0 ? new Set(staleDeltas) : undefined - queue = buffer - buffer = events - queue.length = 0 - coalesced.clear() - staleDeltas.clear() + let queue: Queued[] = [] + let buffer: Queued[] = [] + const coalesced = new Map() + const staleDeltas = new Set() + let timer: ReturnType | undefined + let last = 0 - last = Date.now() - batch(() => { - for (const event of events) { - if (skip && event.payload.type === "message.part.delta") { - const props = event.payload.properties - if (skip.has(deltaKey(event.directory, props.messageID, props.partID))) continue - } - emitter.emit(event.directory, event.payload) + const deltaKey = (directory: string, messageID: string, partID: string) => `${directory}:${messageID}:${partID}` + + const key = (directory: string, payload: Event) => { + if (payload.type === "session.status") return `session.status:${directory}:${payload.properties.sessionID}` + if (payload.type === "lsp.updated") return `lsp.updated:${directory}` + if (payload.type === "message.part.updated") { + const part = payload.properties.part + return `message.part.updated:${directory}:${part.messageID}:${part.id}` + } + } + + const flush = () => { + if (timer) clearTimeout(timer) + timer = undefined + + if (queue.length === 0) return + + const events = queue + const skip = staleDeltas.size > 0 ? new Set(staleDeltas) : undefined + queue = buffer + buffer = events + queue.length = 0 + coalesced.clear() + staleDeltas.clear() + + last = Date.now() + batch(() => { + for (const event of events) { + if (skip && event.payload.type === "message.part.delta") { + const props = event.payload.properties + if (skip.has(deltaKey(event.directory, props.messageID, props.partID))) continue } - }) + emitter.emit(event.directory, event.payload) + } + }) - buffer.length = 0 - } + buffer.length = 0 + } - const schedule = () => { - if (timer) return - const elapsed = Date.now() - last - timer = setTimeout(flush, Math.max(0, FLUSH_FRAME_MS - elapsed)) - } + const schedule = () => { + if (timer) return + const elapsed = Date.now() - last + timer = setTimeout(flush, Math.max(0, FLUSH_FRAME_MS - elapsed)) + } - let streamErrorLogged = false - const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) - const aborted = isAbortError + let streamErrorLogged = false + const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) + const aborted = isAbortError - let attempt: AbortController | undefined - let run: Promise | undefined - let started = false - const HEARTBEAT_TIMEOUT_MS = 15_000 - let lastEventAt = Date.now() - let heartbeat: ReturnType | undefined - const resetHeartbeat = () => { - lastEventAt = Date.now() - if (heartbeat) clearTimeout(heartbeat) - heartbeat = setTimeout(() => { - attempt?.abort() - }, HEARTBEAT_TIMEOUT_MS) - } - const clearHeartbeat = () => { - if (!heartbeat) return - clearTimeout(heartbeat) - heartbeat = undefined - } + let attempt: AbortController | undefined + let run: Promise | undefined + let started = false + const HEARTBEAT_TIMEOUT_MS = 15_000 + let lastEventAt = Date.now() + let heartbeat: ReturnType | undefined + const resetHeartbeat = () => { + lastEventAt = Date.now() + if (heartbeat) clearTimeout(heartbeat) + heartbeat = setTimeout(() => { + attempt?.abort() + }, HEARTBEAT_TIMEOUT_MS) + } + const clearHeartbeat = () => { + if (!heartbeat) return + clearTimeout(heartbeat) + heartbeat = undefined + } - const start = () => { - if (started) return run - started = true - run = (async () => { - // oxlint-disable-next-line no-unmodified-loop-condition -- `started` is set to false by stop() which also aborts; both flags are checked to allow graceful exit - while (!abort.signal.aborted && started) { - attempt = new AbortController() - lastEventAt = Date.now() - const onAbort = () => { - attempt?.abort() - } - abort.signal.addEventListener("abort", onAbort) - try { - const events = await eventSdk.global.event({ - signal: attempt.signal, - onSseError: (error) => { - if (aborted(error)) return - if (streamErrorLogged) return - streamErrorLogged = true - console.error("[global-sdk] event stream error", { - url: currentServer.http.url, - fetch: eventFetch ? "platform" : "webview", - error, - }) - }, - }) - let yielded = Date.now() - resetHeartbeat() - for await (const event of events.stream) { - resetHeartbeat() - streamErrorLogged = false - const directory = event.directory ?? "global" - if (event.payload.type === "sync") { - continue - } - - const payload = event.payload as Event - - const k = key(directory, payload) - if (k) { - const i = coalesced.get(k) - if (i !== undefined) { - queue[i] = { directory, payload } - if (payload.type === "message.part.updated") { - const part = payload.properties.part - staleDeltas.add(deltaKey(directory, part.messageID, part.id)) - } - continue - } - coalesced.set(k, queue.length) - } - queue.push({ directory, payload }) - schedule() - - if (Date.now() - yielded < STREAM_YIELD_MS) continue - yielded = Date.now() - await wait(0) - } - } catch (error) { - if (!aborted(error) && !streamErrorLogged) { + const start = () => { + if (started) return run + started = true + run = (async () => { + // oxlint-disable-next-line no-unmodified-loop-condition -- `started` is set to false by stop() which also aborts; both flags are checked to allow graceful exit + while (!abort.signal.aborted && started) { + attempt = new AbortController() + lastEventAt = Date.now() + const onAbort = () => { + attempt?.abort() + } + abort.signal.addEventListener("abort", onAbort) + try { + const events = await eventSdk.global.event({ + signal: attempt.signal, + onSseError: (error) => { + if (aborted(error)) return + if (streamErrorLogged) return streamErrorLogged = true - console.error("[global-sdk] event stream failed", { - url: currentServer.http.url, + console.error("[global-sdk] event stream error", { + url: server.http.url, fetch: eventFetch ? "platform" : "webview", error, }) + }, + }) + let yielded = Date.now() + resetHeartbeat() + for await (const event of events.stream) { + resetHeartbeat() + streamErrorLogged = false + const directory = event.directory ?? "global" + if (event.payload.type === "sync") { + continue } - } finally { - abort.signal.removeEventListener("abort", onAbort) - attempt = undefined - clearHeartbeat() + + const payload = event.payload as Event + + const k = key(directory, payload) + if (k) { + const i = coalesced.get(k) + if (i !== undefined) { + queue[i] = { directory, payload } + if (payload.type === "message.part.updated") { + const part = payload.properties.part + staleDeltas.add(deltaKey(directory, part.messageID, part.id)) + } + continue + } + coalesced.set(k, queue.length) + } + queue.push({ directory, payload }) + schedule() + + if (Date.now() - yielded < STREAM_YIELD_MS) continue + yielded = Date.now() + await wait(0) } - - if (abort.signal.aborted || !started) return - await wait(RECONNECT_DELAY_MS) + } catch (error) { + if (!aborted(error) && !streamErrorLogged) { + streamErrorLogged = true + console.error("[global-sdk] event stream failed", { + url: server.http.url, + fetch: eventFetch ? "platform" : "webview", + error, + }) + } + } finally { + abort.signal.removeEventListener("abort", onAbort) + attempt = undefined + clearHeartbeat() } - })().finally(() => { - run = undefined - flush() - }) - return run - } - const stop = () => { - started = false - attempt?.abort() - clearHeartbeat() - } - - onMount(() => { - makeEventListener(document, "visibilitychange", () => { - if (document.visibilityState !== "visible") return - if (!started) return - if (Date.now() - lastEventAt < HEARTBEAT_TIMEOUT_MS) return - attempt?.abort() - }) - }) - - onCleanup(() => { - stop() - abort.abort() + if (abort.signal.aborted || !started) return + await wait(RECONNECT_DELAY_MS) + } + })().finally(() => { + run = undefined flush() }) + return run + } - const sdk = createSdkForServer({ - server: server.current.http, - fetch: platform.fetch, - throwOnError: true, + const stop = () => { + started = false + attempt?.abort() + clearHeartbeat() + } + + onMount(() => { + makeEventListener(document, "visibilitychange", () => { + if (document.visibilityState !== "visible") return + if (!started) return + if (Date.now() - lastEventAt < HEARTBEAT_TIMEOUT_MS) return + attempt?.abort() }) + }) - const dirSyncContexts = new Map>() - const dirSdkContextRefCounts = new Map() + onCleanup(() => { + stop() + abort.abort() + flush() + }) + const sdk = createSdkForServer({ + server: server.http, + fetch: platform.fetch, + throwOnError: true, + }) + + return { + url: server.http.url, + client: sdk, + event: { + on: emitter.on.bind(emitter), + listen: emitter.listen.bind(emitter), + start, + }, + createClient(opts: Omit[0], "server" | "fetch">) { + return createSdkForServer({ + server: server.http, + fetch: platform.fetch, + ...opts, + }) + }, + } +} + +export const { use: useServerSDK, provider: ServerSDKProvider } = createSimpleContext({ + name: "ServerSDK", + init: () => { + const language = useLanguage() + const server = useServer() + + if (!server.current) throw new Error(language.t("error.serverSDK.noServerAvailable")) + const sdk = createServerSdkContext(server.current) return { - url: currentServer.http.url, - client: sdk, - event: { - on: emitter.on.bind(emitter), - listen: emitter.listen.bind(emitter), - start, - }, - createClient(opts: Omit[0], "server" | "fetch">) { - const s = server.current - if (!s) throw new Error(language.t("error.serverSDK.serverNotAvailable")) - return createSdkForServer({ - server: s.http, - fetch: platform.fetch, - ...opts, - }) - }, - createDirSyncContext: (directory: string) => { - onCleanup(() => { - dirSdkContextRefCounts.set(directory, (dirSdkContextRefCounts.get(directory) ?? 0) - 1) - if (dirSdkContextRefCounts.get(directory) === 0) { - dirSyncContexts.delete(directory) - dirSdkContextRefCounts.delete(directory) - } - }) - - const cached = dirSyncContexts.get(directory) - if (cached) { - dirSdkContextRefCounts.set(directory, (dirSdkContextRefCounts.get(directory) ?? 0) + 1) - return cached - } - const ctx = createDirSdkContext(directory) - dirSyncContexts.set(directory, ctx) - dirSdkContextRefCounts.set(directory, 1) - - return ctx - }, + ...sdk, + createDirSdkContext: createRefCountMap((dir) => createDirSdkContext(dir, sdk)), } }, }) @@ -280,9 +263,7 @@ type SDKEventMap = { [key in Event["type"]]: Extract } -function createDirSdkContext(directory: string) { - const serverSDK = useServerSDK() - +function createDirSdkContext(directory: string, serverSDK: ReturnType) { const client = serverSDK.createClient({ directory, throwOnError: true, diff --git a/packages/app/src/context/server-sync.tsx b/packages/app/src/context/server-sync.tsx index 7016d5888c..8b177e96c1 100644 --- a/packages/app/src/context/server-sync.tsx +++ b/packages/app/src/context/server-sync.tsx @@ -29,7 +29,8 @@ import { createRefreshQueue } from "./global-sync/queue" import { directoryKey } from "./global-sync/utils" import { PathKey } from "@/utils/path-key" import { createDirSyncContext } from "./directory-sync" -import { NormalizedProviderListResponse } from "@opencode-ai/ui/context" +import { createSimpleContext, NormalizedProviderListResponse } from "@opencode-ai/ui/context" +import { createRefCountMap } from "@/utils/refcount" type GlobalStore = { ready: boolean @@ -72,7 +73,7 @@ function makeQueryOptionsApi(serverSDK: () => OpencodeClient, sdkFor: (dir: Path } export type QueryOptionsApi = ReturnType -function createServerSyncContext() { +export function createServerSyncContext() { const serverSDK = useServerSDK() const language = useLanguage() const owner = getOwner() @@ -425,9 +426,6 @@ function createServerSyncContext() { }, })) - const dirSyncContexts = new Map>() - const dirSyncContextRefCounts = new Map() - return { data: globalStore, set, @@ -446,41 +444,20 @@ function createServerSyncContext() { todo: { set: setSessionTodo, }, - createDirSyncContext: (directory: string) => { - onCleanup(() => { - dirSyncContextRefCounts.set(directory, (dirSyncContextRefCounts.get(directory) ?? 0) - 1) - if (dirSyncContextRefCounts.get(directory) === 0) { - dirSyncContexts.delete(directory) - dirSyncContextRefCounts.delete(directory) - } - }) - - const cached = dirSyncContexts.get(directory) - if (cached) { - dirSyncContextRefCounts.set(directory, (dirSyncContextRefCounts.get(directory) ?? 0) + 1) - return cached - } - const ctx = createDirSyncContext(serverSDK.createClient({ directory, throwOnError: true }), directory) - dirSyncContexts.set(directory, ctx) - dirSyncContextRefCounts.set(directory, 1) - - return ctx - }, } } -const ServerSyncContext = createContext>() +export const { use: useServerSync, provider: ServerSyncProvider } = createSimpleContext({ + name: "ServerSync", + init: () => { + const sync = createServerSyncContext() -export function ServerSyncProvider(props: ParentProps) { - const value = createServerSyncContext() - return {props.children} -} - -export function useServerSync() { - const context = useContext(ServerSyncContext) - if (!context) throw new Error("useServerSync must be used within ServerSyncProvider") - return context -} + return { + ...sync, + createDirSyncContext: createRefCountMap((dir) => createDirSyncContext(dir, sync)), + } + }, +}) export function useQueryOptions() { return useServerSync().queryOptions diff --git a/packages/app/src/utils/refcount.ts b/packages/app/src/utils/refcount.ts new file mode 100644 index 0000000000..6877d82f2e --- /dev/null +++ b/packages/app/src/utils/refcount.ts @@ -0,0 +1,26 @@ +import { onCleanup } from "solid-js" + +export function createRefCountMap(create: (key: string) => T) { + const items = new Map() + const refCounts = new Map() + + return (key: string) => { + onCleanup(() => { + refCounts.set(key, (refCounts.get(key) ?? 0) - 1) + if (refCounts.get(key) === 0) { + items.delete(key) + refCounts.delete(key) + } + }) + + const cached = items.get(key) + if (cached) { + refCounts.set(key, (refCounts.get(key) ?? 0) + 1) + return cached + } + const item = create(key) + items.set(key, item) + refCounts.set(key, 1) + return item + } +}