From 76b90e3fb1ad708a75a29877f9b3a51dd0c48abc Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 2 Apr 2026 13:08:23 -0400 Subject: [PATCH] debug: add dev memory telemetry scaffolding --- packages/opencode/src/cli/cmd/tui/thread.ts | 3 + packages/opencode/src/cli/cmd/tui/worker.ts | 4 + packages/opencode/src/debug/memory.ts | 122 ++++++++++++++++++ packages/opencode/src/server/routes/event.ts | 3 +- packages/opencode/src/server/routes/global.ts | 9 +- packages/opencode/src/util/queue.ts | 72 ++++++++++- 6 files changed, 206 insertions(+), 7 deletions(-) create mode 100644 packages/opencode/src/debug/memory.ts diff --git a/packages/opencode/src/cli/cmd/tui/thread.ts b/packages/opencode/src/cli/cmd/tui/thread.ts index 3bb56937a6..58095babed 100644 --- a/packages/opencode/src/cli/cmd/tui/thread.ts +++ b/packages/opencode/src/cli/cmd/tui/thread.ts @@ -16,6 +16,7 @@ import { win32DisableProcessedInput, win32InstallCtrlCGuard } from "./win32" import { TuiConfig } from "@/config/tui" import { Instance } from "@/project/instance" import { writeHeapSnapshot } from "v8" +import { Memory } from "@/debug/memory" declare global { const OPENCODE_WORKER_PATH: string @@ -129,6 +130,7 @@ export const TuiThreadCommand = cmd({ return } const cwd = Filesystem.resolve(process.cwd()) + const stopMem = Memory.start("tui") const worker = new Worker(file, { env: Object.fromEntries( @@ -161,6 +163,7 @@ export const TuiThreadCommand = cmd({ process.off("uncaughtException", error) process.off("unhandledRejection", error) process.off("SIGUSR2", reload) + stopMem() await withTimeout(client.call("shutdown", undefined), 5000).catch((error) => { Log.Default.warn("worker shutdown failed", { error: errorMessage(error), diff --git a/packages/opencode/src/cli/cmd/tui/worker.ts b/packages/opencode/src/cli/cmd/tui/worker.ts index a83645d892..bc98b7fbc8 100644 --- a/packages/opencode/src/cli/cmd/tui/worker.ts +++ b/packages/opencode/src/cli/cmd/tui/worker.ts @@ -13,6 +13,7 @@ import { Flag } from "@/flag/flag" import { setTimeout as sleep } from "node:timers/promises" import { writeHeapSnapshot } from "node:v8" import { WorkspaceID } from "@/control-plane/schema" +import { Memory } from "@/debug/memory" await Log.init({ print: process.argv.includes("--print-logs"), @@ -35,6 +36,8 @@ process.on("uncaughtException", (e) => { }) }) +const stopMem = Memory.start("server") + // Subscribe to global events and forward them via RPC GlobalBus.on("event", (event) => { Rpc.emit("global.event", event) @@ -156,6 +159,7 @@ export const rpc = { }, async shutdown() { Log.Default.info("worker shutting down") + stopMem() if (eventStream.abort) eventStream.abort.abort() await Instance.disposeAll() if (server) await server.stop(true) diff --git a/packages/opencode/src/debug/memory.ts b/packages/opencode/src/debug/memory.ts new file mode 100644 index 0000000000..1f149802f9 --- /dev/null +++ b/packages/opencode/src/debug/memory.ts @@ -0,0 +1,122 @@ +import { Global } from "@/global" +import { Installation } from "@/installation" +import { stats } from "@/util/queue" +import { Log } from "@/util/log" +import { Filesystem } from "@/util/filesystem" +import { appendFile, mkdir } from "fs/promises" +import { writeHeapSnapshot } from "node:v8" +import path from "path" + +const log = Log.create({ service: "memory" }) + +const root = process.env.OPENCODE_DEBUG_DIR ?? path.join(Global.Path.state, "debug") +const file = path.join(root, "memory.jsonl") +const snap = path.join(root, "snapshots") + +export namespace Memory { + export function start(name: string) { + if (!Installation.isLocal()) return () => {} + + let busy = false + let last = 0 + const every = num("OPENCODE_DEBUG_MEMORY_INTERVAL_MS") ?? 10_000 + const limit = (num("OPENCODE_DEBUG_MEMORY_RSS_MB") ?? 1_500) * 1024 * 1024 + const cool = num("OPENCODE_DEBUG_MEMORY_COOLDOWN_MS") ?? 5 * 60 * 1000 + + const tick = async (kind: "start" | "sample") => { + if (busy) return + busy = true + + try { + const now = Date.now() + const mem = process.memoryUsage() + const q = stats() + .filter((item) => item.size > 0 || item.max > 0) + .sort((a, b) => b.size - a.size || b.max - a.max) + .slice(0, 10) + const row = { + kind: "sample", + time: new Date(now).toISOString(), + name, + pid: process.pid, + rss: mem.rss, + heap: mem.heapUsed, + ext: mem.external, + buf: mem.arrayBuffers, + queues: q, + } + + await line(row) + + if (kind === "start" || mem.rss < limit || now - last < cool) return + + last = now + const tag = stamp(now) + const heap = path.join(snap, `${tag}-${name}.heapsnapshot`) + await mkdir(snap, { recursive: true }) + writeHeapSnapshot(heap) + + const meta = { + kind: "snapshot", + time: row.time, + name, + pid: process.pid, + trigger: { + type: "rss", + limit, + value: mem.rss, + }, + memory: mem, + queues: q, + } + + await Filesystem.writeJson(path.join(snap, `${tag}-${name}.json`), meta) + await line({ ...meta, heap }) + log.warn("memory snapshot written", { + name, + pid: process.pid, + rss_mb: mb(mem.rss), + limit_mb: mb(limit), + heap, + }) + } catch (err) { + log.warn("memory monitor failed", { + name, + error: err instanceof Error ? err.message : String(err), + }) + } finally { + busy = false + } + } + + const timer = setInterval(() => { + void tick("sample") + }, every) + timer.unref?.() + void tick("start") + + return () => { + clearInterval(timer) + } + } +} + +async function line(input: unknown) { + await mkdir(root, { recursive: true }) + await appendFile(file, JSON.stringify(input) + "\n") +} + +function num(key: string) { + const value = process.env[key] + if (!value) return undefined + const parsed = Number(value) + return Number.isFinite(parsed) && parsed > 0 ? parsed : undefined +} + +function mb(value: number) { + return Math.round((value / 1024 / 1024) * 10) / 10 +} + +function stamp(now: number) { + return new Date(now).toISOString().replaceAll(":", "-").replaceAll(".", "-") +} diff --git a/packages/opencode/src/server/routes/event.ts b/packages/opencode/src/server/routes/event.ts index 989b857710..b4d76efd3d 100644 --- a/packages/opencode/src/server/routes/event.ts +++ b/packages/opencode/src/server/routes/event.ts @@ -32,7 +32,7 @@ export const EventRoutes = () => c.header("X-Accel-Buffering", "no") c.header("X-Content-Type-Options", "nosniff") return streamSSE(c, async (stream) => { - const q = new AsyncQueue() + const q = new AsyncQueue({ name: "sse:event" }) let done = false q.push( @@ -58,6 +58,7 @@ export const EventRoutes = () => clearInterval(heartbeat) unsub() q.push(null) + q.done() log.info("event disconnected") } diff --git a/packages/opencode/src/server/routes/global.ts b/packages/opencode/src/server/routes/global.ts index 16b9e559f2..8e19fffff3 100644 --- a/packages/opencode/src/server/routes/global.ts +++ b/packages/opencode/src/server/routes/global.ts @@ -17,9 +17,9 @@ const log = Log.create({ service: "server" }) export const GlobalDisposedEvent = BusEvent.define("global.disposed", z.object({})) -async function streamEvents(c: Context, subscribe: (q: AsyncQueue) => () => void) { +async function streamEvents(c: Context, name: string, subscribe: (q: AsyncQueue) => () => void) { return streamSSE(c, async (stream) => { - const q = new AsyncQueue() + const q = new AsyncQueue({ name }) let done = false q.push( @@ -49,6 +49,7 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue clearInterval(heartbeat) unsub() q.push(null) + q.done() log.info("global event disconnected") } @@ -122,7 +123,7 @@ export const GlobalRoutes = lazy(() => c.header("X-Accel-Buffering", "no") c.header("X-Content-Type-Options", "nosniff") - return streamEvents(c, (q) => { + return streamEvents(c, "sse:global", (q) => { async function handler(event: any) { q.push(JSON.stringify(event)) } @@ -161,7 +162,7 @@ export const GlobalRoutes = lazy(() => c.header("Cache-Control", "no-cache, no-transform") c.header("X-Accel-Buffering", "no") c.header("X-Content-Type-Options", "nosniff") - return streamEvents(c, (q) => { + return streamEvents(c, "sse:sync", (q) => { return SyncEvent.subscribeAll(({ def, event }) => { // TODO: don't pass def, just pass the type (and it should // be versioned) diff --git a/packages/opencode/src/util/queue.ts b/packages/opencode/src/util/queue.ts index a1af53fe8f..73bf6a5969 100644 --- a/packages/opencode/src/util/queue.ts +++ b/packages/opencode/src/util/queue.ts @@ -1,21 +1,89 @@ +type Stat = { + id: number + name: string + size: number + max: number + push: number + pull: number + wait: number +} + +const all = new Map() +let next = 0 + +export function stats() { + return [...all.values()].map((item) => ({ ...item })) +} + export class AsyncQueue implements AsyncIterable { private queue: T[] = [] private resolvers: ((value: T) => void)[] = [] + private id: number | undefined + + constructor(input?: { name?: string }) { + if (!input?.name) return + this.id = ++next + all.set(this.id, { + id: this.id, + name: input.name, + size: 0, + max: 0, + push: 0, + pull: 0, + wait: 0, + }) + } push(item: T) { + const itemStat = this.item() + if (itemStat) itemStat.push++ const resolve = this.resolvers.shift() if (resolve) resolve(item) else this.queue.push(item) + this.sync() } async next(): Promise { - if (this.queue.length > 0) return this.queue.shift()! - return new Promise((resolve) => this.resolvers.push(resolve)) + if (this.queue.length > 0) { + const value = this.queue.shift()! + const itemStat = this.item() + if (itemStat) itemStat.pull++ + this.sync() + return value + } + + return new Promise((resolve) => { + this.resolvers.push((value) => { + const itemStat = this.item() + if (itemStat) itemStat.pull++ + this.sync() + resolve(value) + }) + this.sync() + }) + } + + done() { + if (this.id === undefined) return + all.delete(this.id) } async *[Symbol.asyncIterator]() { while (true) yield await this.next() } + + private item() { + if (this.id === undefined) return + return all.get(this.id) + } + + private sync() { + const itemStat = this.item() + if (!itemStat) return + itemStat.size = this.queue.length + itemStat.max = Math.max(itemStat.max, itemStat.size) + itemStat.wait = this.resolvers.length + } } export async function work(concurrency: number, items: T[], fn: (item: T) => Promise) {