diff --git a/packages/coding-agent/CHANGELOG.md b/packages/coding-agent/CHANGELOG.md index 52b4ae91d..8027be1da 100644 --- a/packages/coding-agent/CHANGELOG.md +++ b/packages/coding-agent/CHANGELOG.md @@ -5,6 +5,7 @@ ### Fixed - Fixed `ctx.ui.setWorkingMessage()` to persist across loader recreation, matching the behavior of `ctx.ui.setWorkingIndicator()` ([#3566](https://github.com/badlogic/pi-mono/issues/3566)) +- Fixed coding-agent `fs.watch` error handling for theme and git-footer watchers to retry after transient watcher failures such as `EMFILE`, avoiding startup crashes in large repos ([#3564](https://github.com/badlogic/pi-mono/issues/3564)) ## [0.69.0] - 2026-04-22 diff --git a/packages/coding-agent/src/core/footer-data-provider.ts b/packages/coding-agent/src/core/footer-data-provider.ts index 120caa0b0..5d7292214 100644 --- a/packages/coding-agent/src/core/footer-data-provider.ts +++ b/packages/coding-agent/src/core/footer-data-provider.ts @@ -1,6 +1,7 @@ import { type ExecFileException, execFile, spawnSync } from "child_process"; -import { existsSync, type FSWatcher, readFileSync, statSync, unwatchFile, watch, watchFile } from "fs"; +import { existsSync, type FSWatcher, readFileSync, statSync, unwatchFile, watchFile } from "fs"; import { dirname, join, resolve } from "path"; +import { closeWatcher, FS_WATCH_RETRY_DELAY_MS, watchWithErrorHandler } from "../utils/fs-watch.js"; type GitPaths = { repoDir: string; @@ -97,6 +98,7 @@ export class FooterDataProvider { private branchChangeCallbacks = new Set<() => void>(); private availableProviderCount = 0; private refreshTimer: ReturnType | null = null; + private gitWatcherRetryTimer: ReturnType | null = null; private refreshInFlight = false; private refreshPending = false; private disposed = false; @@ -160,22 +162,7 @@ export class FooterDataProvider { clearTimeout(this.refreshTimer); this.refreshTimer = null; } - if (this.headWatcher) { - this.headWatcher.close(); - this.headWatcher = null; - } - if (this.reftableWatcher) { - this.reftableWatcher.close(); - this.reftableWatcher = null; - } - if (this.reftableTablesListWatcher) { - this.reftableTablesListWatcher.close(); - this.reftableTablesListWatcher = null; - } - if (this.reftableTablesListPath) { - unwatchFile(this.reftableTablesListPath); - this.reftableTablesListPath = null; - } + this.clearGitWatchers(); this.cachedBranch = undefined; this.gitPaths = findGitPaths(cwd); this.setupGitWatcher(); @@ -189,22 +176,7 @@ export class FooterDataProvider { clearTimeout(this.refreshTimer); this.refreshTimer = null; } - if (this.headWatcher) { - this.headWatcher.close(); - this.headWatcher = null; - } - if (this.reftableWatcher) { - this.reftableWatcher.close(); - this.reftableWatcher = null; - } - if (this.reftableTablesListWatcher) { - this.reftableTablesListWatcher.close(); - this.reftableTablesListWatcher = null; - } - if (this.reftableTablesListPath) { - unwatchFile(this.reftableTablesListPath); - this.reftableTablesListPath = null; - } + this.clearGitWatchers(); this.branchChangeCallbacks.clear(); } @@ -280,43 +252,86 @@ export class FooterDataProvider { } } + private clearGitWatchers(): void { + closeWatcher(this.headWatcher); + this.headWatcher = null; + closeWatcher(this.reftableWatcher); + this.reftableWatcher = null; + closeWatcher(this.reftableTablesListWatcher); + this.reftableTablesListWatcher = null; + if (this.reftableTablesListPath) { + unwatchFile(this.reftableTablesListPath); + this.reftableTablesListPath = null; + } + if (this.gitWatcherRetryTimer) { + clearTimeout(this.gitWatcherRetryTimer); + this.gitWatcherRetryTimer = null; + } + } + + private scheduleGitWatcherRetry(): void { + if (this.disposed || this.gitWatcherRetryTimer) { + return; + } + + this.gitWatcherRetryTimer = setTimeout(() => { + this.gitWatcherRetryTimer = null; + this.setupGitWatcher(); + }, FS_WATCH_RETRY_DELAY_MS); + } + + private handleGitWatcherError(): void { + this.clearGitWatchers(); + this.scheduleGitWatcherRetry(); + } + private setupGitWatcher(): void { + this.clearGitWatchers(); if (!this.gitPaths) return; // Watch the directory containing HEAD, not HEAD itself. // Git uses atomic writes (write temp, rename over HEAD), which changes the inode. // fs.watch on a file stops working after the inode changes. - try { - this.headWatcher = watch(dirname(this.gitPaths.headPath), (_eventType, filename) => { - if (!filename || filename.toString() === "HEAD") { + this.headWatcher = watchWithErrorHandler( + dirname(this.gitPaths.headPath), + (_eventType, filename) => { + if (!filename || filename === "HEAD") { this.scheduleRefresh(); } - }); - } catch { - // Silently fail if we can't watch + }, + () => this.handleGitWatcherError(), + ); + if (!this.headWatcher) { + return; } // In reftable repos, branch switches update files in the reftable directory // instead of HEAD. Watch it separately so the footer picks up those changes. const reftableDir = join(this.gitPaths.commonGitDir, "reftable"); if (existsSync(reftableDir)) { - try { - this.reftableWatcher = watch(reftableDir, () => { + this.reftableWatcher = watchWithErrorHandler( + reftableDir, + () => { this.scheduleRefresh(); - }); - } catch { - // Silently fail if we can't watch + }, + () => this.handleGitWatcherError(), + ); + if (!this.reftableWatcher) { + return; } const tablesListPath = join(reftableDir, "tables.list"); if (existsSync(tablesListPath)) { this.reftableTablesListPath = tablesListPath; - try { - this.reftableTablesListWatcher = watch(tablesListPath, () => { + this.reftableTablesListWatcher = watchWithErrorHandler( + tablesListPath, + () => { this.scheduleRefresh(); - }); - } catch { - // Silently fail if we can't watch + }, + () => this.handleGitWatcherError(), + ); + if (!this.reftableTablesListWatcher) { + return; } watchFile(tablesListPath, { interval: 250 }, (current, previous) => { if ( diff --git a/packages/coding-agent/src/modes/interactive/theme/theme.ts b/packages/coding-agent/src/modes/interactive/theme/theme.ts index 5159315b0..451d39b96 100644 --- a/packages/coding-agent/src/modes/interactive/theme/theme.ts +++ b/packages/coding-agent/src/modes/interactive/theme/theme.ts @@ -7,6 +7,7 @@ import { type Static, Type } from "typebox"; import { Compile } from "typebox/compile"; import { getCustomThemesDir, getThemesDir } from "../../../config.js"; import type { SourceInfo } from "../../../core/source-info.js"; +import { closeWatcher, watchWithErrorHandler } from "../../../utils/fs-watch.js"; // ============================================================================ // Types & Schema @@ -789,32 +790,27 @@ function startThemeWatcher(): void { }, 100); }; - try { - themeWatcher = fs.watch(customThemesDir, (_eventType, filename) => { - if (currentThemeName !== watchedThemeName) { - return; - } - if (!filename) { + themeWatcher = + watchWithErrorHandler( + customThemesDir, + (_eventType, filename) => { + if (currentThemeName !== watchedThemeName) { + return; + } + if (!filename) { + scheduleReload(); + return; + } + if (filename !== watchedFileName) { + return; + } scheduleReload(); - return; - } - const changedFile = String(filename); - if (changedFile !== watchedFileName) { - return; - } - scheduleReload(); - }); - themeWatcher.on("error", () => { - try { - themeWatcher?.close(); - } catch { - /* ignore */ - } - themeWatcher = undefined; - }); - } catch (_error) { - // Ignore errors starting watcher - } + }, + () => { + closeWatcher(themeWatcher); + themeWatcher = undefined; + }, + ) ?? undefined; } export function stopThemeWatcher(): void { @@ -822,10 +818,8 @@ export function stopThemeWatcher(): void { clearTimeout(themeReloadTimer); themeReloadTimer = undefined; } - if (themeWatcher) { - themeWatcher.close(); - themeWatcher = undefined; - } + closeWatcher(themeWatcher); + themeWatcher = undefined; } // ============================================================================ diff --git a/packages/coding-agent/src/utils/fs-watch.ts b/packages/coding-agent/src/utils/fs-watch.ts new file mode 100644 index 000000000..daaf80900 --- /dev/null +++ b/packages/coding-agent/src/utils/fs-watch.ts @@ -0,0 +1,30 @@ +import { type FSWatcher, type WatchListener, watch } from "node:fs"; + +export const FS_WATCH_RETRY_DELAY_MS = 5000; + +export function closeWatcher(watcher: FSWatcher | null | undefined): void { + if (!watcher) { + return; + } + + try { + watcher.close(); + } catch { + // Ignore watcher close errors + } +} + +export function watchWithErrorHandler( + path: string, + listener: WatchListener, + onError: () => void, +): FSWatcher | null { + try { + const watcher = watch(path, listener); + watcher.on("error", onError); + return watcher; + } catch { + onError(); + return null; + } +} diff --git a/packages/coding-agent/test/footer-data-provider.test.ts b/packages/coding-agent/test/footer-data-provider.test.ts index 8a999fab4..3a6e1d47a 100644 --- a/packages/coding-agent/test/footer-data-provider.test.ts +++ b/packages/coding-agent/test/footer-data-provider.test.ts @@ -1,5 +1,5 @@ import { execFile, spawnSync } from "child_process"; -import { existsSync, mkdirSync, mkdtempSync, rmSync, writeFileSync } from "fs"; +import { existsSync, type FSWatcher, mkdirSync, mkdtempSync, rmSync, writeFileSync } from "fs"; import { tmpdir } from "os"; import { join } from "path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; @@ -233,4 +233,33 @@ describe("FooterDataProvider reftable branch detection", () => { provider.dispose(); } }); + + it("retries git watchers 5 seconds after an async fs.watch error", async () => { + vi.useFakeTimers(); + const repoDir = createPlainRepo(tempDir); + process.chdir(repoDir); + + const provider = new FooterDataProvider(repoDir); + try { + const providerWithInternals = provider as unknown as { + headWatcher: FSWatcher | null; + }; + const originalWatcher = providerWithInternals.headWatcher; + expect(originalWatcher).not.toBeNull(); + expect(originalWatcher?.listenerCount("error")).toBeGreaterThan(0); + + originalWatcher?.emit("error", new Error("simulated EMFILE")); + expect(providerWithInternals.headWatcher).toBeNull(); + + await vi.advanceTimersByTimeAsync(4999); + expect(providerWithInternals.headWatcher).toBeNull(); + + await vi.advanceTimersByTimeAsync(1); + expect(providerWithInternals.headWatcher).not.toBeNull(); + expect(providerWithInternals.headWatcher).not.toBe(originalWatcher); + } finally { + provider.dispose(); + vi.useRealTimers(); + } + }); }); diff --git a/packages/mom/CHANGELOG.md b/packages/mom/CHANGELOG.md index 8f197eae1..0ec74d1f7 100644 --- a/packages/mom/CHANGELOG.md +++ b/packages/mom/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased] +### Fixed + +- Fixed Mom event-directory `fs.watch` error handling to retry after transient watcher failures such as `EMFILE`, avoiding startup crashes ([#3564](https://github.com/badlogic/pi-mono/issues/3564)) + ## [0.69.0] - 2026-04-22 ### Breaking Changes diff --git a/packages/mom/src/events.ts b/packages/mom/src/events.ts index 2bb099e4b..78603802d 100644 --- a/packages/mom/src/events.ts +++ b/packages/mom/src/events.ts @@ -1,7 +1,8 @@ import { Cron } from "croner"; -import { existsSync, type FSWatcher, mkdirSync, readdirSync, statSync, unlinkSync, watch } from "fs"; +import { existsSync, type FSWatcher, mkdirSync, readdirSync, statSync, unlinkSync } from "fs"; import { readFile } from "fs/promises"; import { join } from "path"; +import { closeWatcher, FS_WATCH_RETRY_DELAY_MS, watchWithErrorHandler } from "./fs-watch.js"; import * as log from "./log.js"; import type { SlackBot, SlackEvent } from "./slack.js"; @@ -46,7 +47,9 @@ export class EventsWatcher { private debounceTimers: Map = new Map(); private startTime: number; private watcher: FSWatcher | null = null; + private watcherRetryTimer: NodeJS.Timeout | null = null; private knownFiles: Set = new Set(); + private stopped = true; constructor( private eventsDir: string, @@ -59,6 +62,8 @@ export class EventsWatcher { * Start watching for events. Call this after SlackBot is ready. */ start(): void { + this.stopped = false; + // Ensure events directory exists if (!existsSync(this.eventsDir)) { mkdirSync(this.eventsDir, { recursive: true }); @@ -69,11 +74,7 @@ export class EventsWatcher { // Scan existing files this.scanExisting(); - // Watch for changes - this.watcher = watch(this.eventsDir, (_eventType, filename) => { - if (!filename || !filename.endsWith(".json")) return; - this.debounce(filename, () => this.handleFileChange(filename)); - }); + this.startFsWatcher(); log.logInfo(`Events watcher started, tracking ${this.knownFiles.size} files`); } @@ -82,10 +83,14 @@ export class EventsWatcher { * Stop watching and cancel all scheduled events. */ stop(): void { + this.stopped = true; + // Stop fs watcher - if (this.watcher) { - this.watcher.close(); - this.watcher = null; + closeWatcher(this.watcher); + this.watcher = null; + if (this.watcherRetryTimer) { + clearTimeout(this.watcherRetryTimer); + this.watcherRetryTimer = null; } // Cancel all debounce timers @@ -110,6 +115,40 @@ export class EventsWatcher { log.logInfo("Events watcher stopped"); } + private startFsWatcher(): void { + this.watcher = watchWithErrorHandler( + this.eventsDir, + (_eventType, filename) => { + if (!filename || !filename.endsWith(".json")) return; + this.debounce(filename, () => this.handleFileChange(filename)); + }, + () => this.handleFsWatcherError(), + ); + } + + private handleFsWatcherError(): void { + closeWatcher(this.watcher); + this.watcher = null; + this.scheduleFsWatcherRetry(); + } + + private scheduleFsWatcherRetry(): void { + if (this.stopped || this.watcherRetryTimer) { + return; + } + + this.watcherRetryTimer = setTimeout(() => { + this.watcherRetryTimer = null; + if (this.stopped) { + return; + } + this.startFsWatcher(); + if (this.watcher) { + this.rescanExisting(); + } + }, FS_WATCH_RETRY_DELAY_MS); + } + private debounce(filename: string, fn: () => void): void { const existing = this.debounceTimers.get(filename); if (existing) { @@ -138,6 +177,26 @@ export class EventsWatcher { } } + private rescanExisting(): void { + let files: string[]; + try { + files = readdirSync(this.eventsDir).filter((f) => f.endsWith(".json")); + } catch (err) { + log.logWarning("Failed to read events directory", String(err)); + return; + } + + const currentFiles = new Set(files); + for (const filename of files) { + this.handleFileChange(filename); + } + for (const filename of Array.from(this.knownFiles)) { + if (!currentFiles.has(filename)) { + this.handleDelete(filename); + } + } + } + private handleFileChange(filename: string): void { const filePath = join(this.eventsDir, filename); diff --git a/packages/mom/src/fs-watch.ts b/packages/mom/src/fs-watch.ts new file mode 100644 index 000000000..daaf80900 --- /dev/null +++ b/packages/mom/src/fs-watch.ts @@ -0,0 +1,30 @@ +import { type FSWatcher, type WatchListener, watch } from "node:fs"; + +export const FS_WATCH_RETRY_DELAY_MS = 5000; + +export function closeWatcher(watcher: FSWatcher | null | undefined): void { + if (!watcher) { + return; + } + + try { + watcher.close(); + } catch { + // Ignore watcher close errors + } +} + +export function watchWithErrorHandler( + path: string, + listener: WatchListener, + onError: () => void, +): FSWatcher | null { + try { + const watcher = watch(path, listener); + watcher.on("error", onError); + return watcher; + } catch { + onError(); + return null; + } +} diff --git a/packages/mom/test/events.test.ts b/packages/mom/test/events.test.ts new file mode 100644 index 000000000..f628ed183 --- /dev/null +++ b/packages/mom/test/events.test.ts @@ -0,0 +1,50 @@ +import { existsSync, type FSWatcher, mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { EventsWatcher } from "../src/events.js"; +import type { SlackBot, SlackEvent } from "../src/slack.js"; + +describe("EventsWatcher fs.watch error handling", () => { + let tempDir: string; + + beforeEach(() => { + tempDir = mkdtempSync(join(tmpdir(), "mom-events-")); + }); + + afterEach(() => { + if (tempDir && existsSync(tempDir)) { + rmSync(tempDir, { recursive: true, force: true }); + } + }); + + it("retries the events fs watcher 5 seconds after an async error", async () => { + vi.useFakeTimers(); + const slack = { + enqueueEvent: vi.fn((_event: SlackEvent) => true), + } as unknown as SlackBot; + const eventsDir = join(tempDir, "events"); + const watcher = new EventsWatcher(eventsDir, slack); + + try { + watcher.start(); + const watcherWithInternals = watcher as unknown as { watcher: FSWatcher | null }; + const originalWatcher = watcherWithInternals.watcher; + expect(originalWatcher).not.toBeNull(); + expect(originalWatcher?.listenerCount("error")).toBeGreaterThan(0); + + originalWatcher?.emit("error", new Error("simulated EMFILE")); + expect(watcherWithInternals.watcher).toBeNull(); + + await vi.advanceTimersByTimeAsync(4999); + expect(watcherWithInternals.watcher).toBeNull(); + + await vi.advanceTimersByTimeAsync(1); + expect(watcherWithInternals.watcher).not.toBeNull(); + expect(watcherWithInternals.watcher).not.toBe(originalWatcher); + } finally { + watcher.stop(); + vi.useRealTimers(); + } + }); +});