mirror of
https://github.com/badlogic/pi-mono.git
synced 2026-04-28 06:19:43 +00:00
fix(coding-agent,mom): retry fs watchers on async errors closes #3564
Some checks are pending
CI / build-check-test (push) Waiting to run
Some checks are pending
CI / build-check-test (push) Waiting to run
This commit is contained in:
parent
cc76e73c05
commit
f3a2c9d05e
9 changed files with 300 additions and 88 deletions
|
|
@ -5,6 +5,7 @@
|
|||
### Fixed
|
||||
|
||||
- Fixed `ctx.ui.setWorkingMessage()` to persist across loader recreation, matching the behavior of `ctx.ui.setWorkingIndicator()` ([#3566](https://github.com/badlogic/pi-mono/issues/3566))
|
||||
- Fixed coding-agent `fs.watch` error handling for theme and git-footer watchers to retry after transient watcher failures such as `EMFILE`, avoiding startup crashes in large repos ([#3564](https://github.com/badlogic/pi-mono/issues/3564))
|
||||
|
||||
## [0.69.0] - 2026-04-22
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import { type ExecFileException, execFile, spawnSync } from "child_process";
|
||||
import { existsSync, type FSWatcher, readFileSync, statSync, unwatchFile, watch, watchFile } from "fs";
|
||||
import { existsSync, type FSWatcher, readFileSync, statSync, unwatchFile, watchFile } from "fs";
|
||||
import { dirname, join, resolve } from "path";
|
||||
import { closeWatcher, FS_WATCH_RETRY_DELAY_MS, watchWithErrorHandler } from "../utils/fs-watch.js";
|
||||
|
||||
type GitPaths = {
|
||||
repoDir: string;
|
||||
|
|
@ -97,6 +98,7 @@ export class FooterDataProvider {
|
|||
private branchChangeCallbacks = new Set<() => void>();
|
||||
private availableProviderCount = 0;
|
||||
private refreshTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
private gitWatcherRetryTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
private refreshInFlight = false;
|
||||
private refreshPending = false;
|
||||
private disposed = false;
|
||||
|
|
@ -160,22 +162,7 @@ export class FooterDataProvider {
|
|||
clearTimeout(this.refreshTimer);
|
||||
this.refreshTimer = null;
|
||||
}
|
||||
if (this.headWatcher) {
|
||||
this.headWatcher.close();
|
||||
this.headWatcher = null;
|
||||
}
|
||||
if (this.reftableWatcher) {
|
||||
this.reftableWatcher.close();
|
||||
this.reftableWatcher = null;
|
||||
}
|
||||
if (this.reftableTablesListWatcher) {
|
||||
this.reftableTablesListWatcher.close();
|
||||
this.reftableTablesListWatcher = null;
|
||||
}
|
||||
if (this.reftableTablesListPath) {
|
||||
unwatchFile(this.reftableTablesListPath);
|
||||
this.reftableTablesListPath = null;
|
||||
}
|
||||
this.clearGitWatchers();
|
||||
this.cachedBranch = undefined;
|
||||
this.gitPaths = findGitPaths(cwd);
|
||||
this.setupGitWatcher();
|
||||
|
|
@ -189,22 +176,7 @@ export class FooterDataProvider {
|
|||
clearTimeout(this.refreshTimer);
|
||||
this.refreshTimer = null;
|
||||
}
|
||||
if (this.headWatcher) {
|
||||
this.headWatcher.close();
|
||||
this.headWatcher = null;
|
||||
}
|
||||
if (this.reftableWatcher) {
|
||||
this.reftableWatcher.close();
|
||||
this.reftableWatcher = null;
|
||||
}
|
||||
if (this.reftableTablesListWatcher) {
|
||||
this.reftableTablesListWatcher.close();
|
||||
this.reftableTablesListWatcher = null;
|
||||
}
|
||||
if (this.reftableTablesListPath) {
|
||||
unwatchFile(this.reftableTablesListPath);
|
||||
this.reftableTablesListPath = null;
|
||||
}
|
||||
this.clearGitWatchers();
|
||||
this.branchChangeCallbacks.clear();
|
||||
}
|
||||
|
||||
|
|
@ -280,43 +252,86 @@ export class FooterDataProvider {
|
|||
}
|
||||
}
|
||||
|
||||
private clearGitWatchers(): void {
|
||||
closeWatcher(this.headWatcher);
|
||||
this.headWatcher = null;
|
||||
closeWatcher(this.reftableWatcher);
|
||||
this.reftableWatcher = null;
|
||||
closeWatcher(this.reftableTablesListWatcher);
|
||||
this.reftableTablesListWatcher = null;
|
||||
if (this.reftableTablesListPath) {
|
||||
unwatchFile(this.reftableTablesListPath);
|
||||
this.reftableTablesListPath = null;
|
||||
}
|
||||
if (this.gitWatcherRetryTimer) {
|
||||
clearTimeout(this.gitWatcherRetryTimer);
|
||||
this.gitWatcherRetryTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
private scheduleGitWatcherRetry(): void {
|
||||
if (this.disposed || this.gitWatcherRetryTimer) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.gitWatcherRetryTimer = setTimeout(() => {
|
||||
this.gitWatcherRetryTimer = null;
|
||||
this.setupGitWatcher();
|
||||
}, FS_WATCH_RETRY_DELAY_MS);
|
||||
}
|
||||
|
||||
private handleGitWatcherError(): void {
|
||||
this.clearGitWatchers();
|
||||
this.scheduleGitWatcherRetry();
|
||||
}
|
||||
|
||||
private setupGitWatcher(): void {
|
||||
this.clearGitWatchers();
|
||||
if (!this.gitPaths) return;
|
||||
|
||||
// Watch the directory containing HEAD, not HEAD itself.
|
||||
// Git uses atomic writes (write temp, rename over HEAD), which changes the inode.
|
||||
// fs.watch on a file stops working after the inode changes.
|
||||
try {
|
||||
this.headWatcher = watch(dirname(this.gitPaths.headPath), (_eventType, filename) => {
|
||||
if (!filename || filename.toString() === "HEAD") {
|
||||
this.headWatcher = watchWithErrorHandler(
|
||||
dirname(this.gitPaths.headPath),
|
||||
(_eventType, filename) => {
|
||||
if (!filename || filename === "HEAD") {
|
||||
this.scheduleRefresh();
|
||||
}
|
||||
});
|
||||
} catch {
|
||||
// Silently fail if we can't watch
|
||||
},
|
||||
() => this.handleGitWatcherError(),
|
||||
);
|
||||
if (!this.headWatcher) {
|
||||
return;
|
||||
}
|
||||
|
||||
// In reftable repos, branch switches update files in the reftable directory
|
||||
// instead of HEAD. Watch it separately so the footer picks up those changes.
|
||||
const reftableDir = join(this.gitPaths.commonGitDir, "reftable");
|
||||
if (existsSync(reftableDir)) {
|
||||
try {
|
||||
this.reftableWatcher = watch(reftableDir, () => {
|
||||
this.reftableWatcher = watchWithErrorHandler(
|
||||
reftableDir,
|
||||
() => {
|
||||
this.scheduleRefresh();
|
||||
});
|
||||
} catch {
|
||||
// Silently fail if we can't watch
|
||||
},
|
||||
() => this.handleGitWatcherError(),
|
||||
);
|
||||
if (!this.reftableWatcher) {
|
||||
return;
|
||||
}
|
||||
|
||||
const tablesListPath = join(reftableDir, "tables.list");
|
||||
if (existsSync(tablesListPath)) {
|
||||
this.reftableTablesListPath = tablesListPath;
|
||||
try {
|
||||
this.reftableTablesListWatcher = watch(tablesListPath, () => {
|
||||
this.reftableTablesListWatcher = watchWithErrorHandler(
|
||||
tablesListPath,
|
||||
() => {
|
||||
this.scheduleRefresh();
|
||||
});
|
||||
} catch {
|
||||
// Silently fail if we can't watch
|
||||
},
|
||||
() => this.handleGitWatcherError(),
|
||||
);
|
||||
if (!this.reftableTablesListWatcher) {
|
||||
return;
|
||||
}
|
||||
watchFile(tablesListPath, { interval: 250 }, (current, previous) => {
|
||||
if (
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import { type Static, Type } from "typebox";
|
|||
import { Compile } from "typebox/compile";
|
||||
import { getCustomThemesDir, getThemesDir } from "../../../config.js";
|
||||
import type { SourceInfo } from "../../../core/source-info.js";
|
||||
import { closeWatcher, watchWithErrorHandler } from "../../../utils/fs-watch.js";
|
||||
|
||||
// ============================================================================
|
||||
// Types & Schema
|
||||
|
|
@ -789,32 +790,27 @@ function startThemeWatcher(): void {
|
|||
}, 100);
|
||||
};
|
||||
|
||||
try {
|
||||
themeWatcher = fs.watch(customThemesDir, (_eventType, filename) => {
|
||||
if (currentThemeName !== watchedThemeName) {
|
||||
return;
|
||||
}
|
||||
if (!filename) {
|
||||
themeWatcher =
|
||||
watchWithErrorHandler(
|
||||
customThemesDir,
|
||||
(_eventType, filename) => {
|
||||
if (currentThemeName !== watchedThemeName) {
|
||||
return;
|
||||
}
|
||||
if (!filename) {
|
||||
scheduleReload();
|
||||
return;
|
||||
}
|
||||
if (filename !== watchedFileName) {
|
||||
return;
|
||||
}
|
||||
scheduleReload();
|
||||
return;
|
||||
}
|
||||
const changedFile = String(filename);
|
||||
if (changedFile !== watchedFileName) {
|
||||
return;
|
||||
}
|
||||
scheduleReload();
|
||||
});
|
||||
themeWatcher.on("error", () => {
|
||||
try {
|
||||
themeWatcher?.close();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
themeWatcher = undefined;
|
||||
});
|
||||
} catch (_error) {
|
||||
// Ignore errors starting watcher
|
||||
}
|
||||
},
|
||||
() => {
|
||||
closeWatcher(themeWatcher);
|
||||
themeWatcher = undefined;
|
||||
},
|
||||
) ?? undefined;
|
||||
}
|
||||
|
||||
export function stopThemeWatcher(): void {
|
||||
|
|
@ -822,10 +818,8 @@ export function stopThemeWatcher(): void {
|
|||
clearTimeout(themeReloadTimer);
|
||||
themeReloadTimer = undefined;
|
||||
}
|
||||
if (themeWatcher) {
|
||||
themeWatcher.close();
|
||||
themeWatcher = undefined;
|
||||
}
|
||||
closeWatcher(themeWatcher);
|
||||
themeWatcher = undefined;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
|
|
|
|||
30
packages/coding-agent/src/utils/fs-watch.ts
Normal file
30
packages/coding-agent/src/utils/fs-watch.ts
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
import { type FSWatcher, type WatchListener, watch } from "node:fs";
|
||||
|
||||
export const FS_WATCH_RETRY_DELAY_MS = 5000;
|
||||
|
||||
export function closeWatcher(watcher: FSWatcher | null | undefined): void {
|
||||
if (!watcher) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
watcher.close();
|
||||
} catch {
|
||||
// Ignore watcher close errors
|
||||
}
|
||||
}
|
||||
|
||||
export function watchWithErrorHandler(
|
||||
path: string,
|
||||
listener: WatchListener<string>,
|
||||
onError: () => void,
|
||||
): FSWatcher | null {
|
||||
try {
|
||||
const watcher = watch(path, listener);
|
||||
watcher.on("error", onError);
|
||||
return watcher;
|
||||
} catch {
|
||||
onError();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
import { execFile, spawnSync } from "child_process";
|
||||
import { existsSync, mkdirSync, mkdtempSync, rmSync, writeFileSync } from "fs";
|
||||
import { existsSync, type FSWatcher, mkdirSync, mkdtempSync, rmSync, writeFileSync } from "fs";
|
||||
import { tmpdir } from "os";
|
||||
import { join } from "path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
|
@ -233,4 +233,33 @@ describe("FooterDataProvider reftable branch detection", () => {
|
|||
provider.dispose();
|
||||
}
|
||||
});
|
||||
|
||||
it("retries git watchers 5 seconds after an async fs.watch error", async () => {
|
||||
vi.useFakeTimers();
|
||||
const repoDir = createPlainRepo(tempDir);
|
||||
process.chdir(repoDir);
|
||||
|
||||
const provider = new FooterDataProvider(repoDir);
|
||||
try {
|
||||
const providerWithInternals = provider as unknown as {
|
||||
headWatcher: FSWatcher | null;
|
||||
};
|
||||
const originalWatcher = providerWithInternals.headWatcher;
|
||||
expect(originalWatcher).not.toBeNull();
|
||||
expect(originalWatcher?.listenerCount("error")).toBeGreaterThan(0);
|
||||
|
||||
originalWatcher?.emit("error", new Error("simulated EMFILE"));
|
||||
expect(providerWithInternals.headWatcher).toBeNull();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(4999);
|
||||
expect(providerWithInternals.headWatcher).toBeNull();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
expect(providerWithInternals.headWatcher).not.toBeNull();
|
||||
expect(providerWithInternals.headWatcher).not.toBe(originalWatcher);
|
||||
} finally {
|
||||
provider.dispose();
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -2,6 +2,10 @@
|
|||
|
||||
## [Unreleased]
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed Mom event-directory `fs.watch` error handling to retry after transient watcher failures such as `EMFILE`, avoiding startup crashes ([#3564](https://github.com/badlogic/pi-mono/issues/3564))
|
||||
|
||||
## [0.69.0] - 2026-04-22
|
||||
|
||||
### Breaking Changes
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
import { Cron } from "croner";
|
||||
import { existsSync, type FSWatcher, mkdirSync, readdirSync, statSync, unlinkSync, watch } from "fs";
|
||||
import { existsSync, type FSWatcher, mkdirSync, readdirSync, statSync, unlinkSync } from "fs";
|
||||
import { readFile } from "fs/promises";
|
||||
import { join } from "path";
|
||||
import { closeWatcher, FS_WATCH_RETRY_DELAY_MS, watchWithErrorHandler } from "./fs-watch.js";
|
||||
import * as log from "./log.js";
|
||||
import type { SlackBot, SlackEvent } from "./slack.js";
|
||||
|
||||
|
|
@ -46,7 +47,9 @@ export class EventsWatcher {
|
|||
private debounceTimers: Map<string, NodeJS.Timeout> = new Map();
|
||||
private startTime: number;
|
||||
private watcher: FSWatcher | null = null;
|
||||
private watcherRetryTimer: NodeJS.Timeout | null = null;
|
||||
private knownFiles: Set<string> = new Set();
|
||||
private stopped = true;
|
||||
|
||||
constructor(
|
||||
private eventsDir: string,
|
||||
|
|
@ -59,6 +62,8 @@ export class EventsWatcher {
|
|||
* Start watching for events. Call this after SlackBot is ready.
|
||||
*/
|
||||
start(): void {
|
||||
this.stopped = false;
|
||||
|
||||
// Ensure events directory exists
|
||||
if (!existsSync(this.eventsDir)) {
|
||||
mkdirSync(this.eventsDir, { recursive: true });
|
||||
|
|
@ -69,11 +74,7 @@ export class EventsWatcher {
|
|||
// Scan existing files
|
||||
this.scanExisting();
|
||||
|
||||
// Watch for changes
|
||||
this.watcher = watch(this.eventsDir, (_eventType, filename) => {
|
||||
if (!filename || !filename.endsWith(".json")) return;
|
||||
this.debounce(filename, () => this.handleFileChange(filename));
|
||||
});
|
||||
this.startFsWatcher();
|
||||
|
||||
log.logInfo(`Events watcher started, tracking ${this.knownFiles.size} files`);
|
||||
}
|
||||
|
|
@ -82,10 +83,14 @@ export class EventsWatcher {
|
|||
* Stop watching and cancel all scheduled events.
|
||||
*/
|
||||
stop(): void {
|
||||
this.stopped = true;
|
||||
|
||||
// Stop fs watcher
|
||||
if (this.watcher) {
|
||||
this.watcher.close();
|
||||
this.watcher = null;
|
||||
closeWatcher(this.watcher);
|
||||
this.watcher = null;
|
||||
if (this.watcherRetryTimer) {
|
||||
clearTimeout(this.watcherRetryTimer);
|
||||
this.watcherRetryTimer = null;
|
||||
}
|
||||
|
||||
// Cancel all debounce timers
|
||||
|
|
@ -110,6 +115,40 @@ export class EventsWatcher {
|
|||
log.logInfo("Events watcher stopped");
|
||||
}
|
||||
|
||||
private startFsWatcher(): void {
|
||||
this.watcher = watchWithErrorHandler(
|
||||
this.eventsDir,
|
||||
(_eventType, filename) => {
|
||||
if (!filename || !filename.endsWith(".json")) return;
|
||||
this.debounce(filename, () => this.handleFileChange(filename));
|
||||
},
|
||||
() => this.handleFsWatcherError(),
|
||||
);
|
||||
}
|
||||
|
||||
private handleFsWatcherError(): void {
|
||||
closeWatcher(this.watcher);
|
||||
this.watcher = null;
|
||||
this.scheduleFsWatcherRetry();
|
||||
}
|
||||
|
||||
private scheduleFsWatcherRetry(): void {
|
||||
if (this.stopped || this.watcherRetryTimer) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.watcherRetryTimer = setTimeout(() => {
|
||||
this.watcherRetryTimer = null;
|
||||
if (this.stopped) {
|
||||
return;
|
||||
}
|
||||
this.startFsWatcher();
|
||||
if (this.watcher) {
|
||||
this.rescanExisting();
|
||||
}
|
||||
}, FS_WATCH_RETRY_DELAY_MS);
|
||||
}
|
||||
|
||||
private debounce(filename: string, fn: () => void): void {
|
||||
const existing = this.debounceTimers.get(filename);
|
||||
if (existing) {
|
||||
|
|
@ -138,6 +177,26 @@ export class EventsWatcher {
|
|||
}
|
||||
}
|
||||
|
||||
private rescanExisting(): void {
|
||||
let files: string[];
|
||||
try {
|
||||
files = readdirSync(this.eventsDir).filter((f) => f.endsWith(".json"));
|
||||
} catch (err) {
|
||||
log.logWarning("Failed to read events directory", String(err));
|
||||
return;
|
||||
}
|
||||
|
||||
const currentFiles = new Set(files);
|
||||
for (const filename of files) {
|
||||
this.handleFileChange(filename);
|
||||
}
|
||||
for (const filename of Array.from(this.knownFiles)) {
|
||||
if (!currentFiles.has(filename)) {
|
||||
this.handleDelete(filename);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private handleFileChange(filename: string): void {
|
||||
const filePath = join(this.eventsDir, filename);
|
||||
|
||||
|
|
|
|||
30
packages/mom/src/fs-watch.ts
Normal file
30
packages/mom/src/fs-watch.ts
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
import { type FSWatcher, type WatchListener, watch } from "node:fs";
|
||||
|
||||
export const FS_WATCH_RETRY_DELAY_MS = 5000;
|
||||
|
||||
export function closeWatcher(watcher: FSWatcher | null | undefined): void {
|
||||
if (!watcher) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
watcher.close();
|
||||
} catch {
|
||||
// Ignore watcher close errors
|
||||
}
|
||||
}
|
||||
|
||||
export function watchWithErrorHandler(
|
||||
path: string,
|
||||
listener: WatchListener<string>,
|
||||
onError: () => void,
|
||||
): FSWatcher | null {
|
||||
try {
|
||||
const watcher = watch(path, listener);
|
||||
watcher.on("error", onError);
|
||||
return watcher;
|
||||
} catch {
|
||||
onError();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
50
packages/mom/test/events.test.ts
Normal file
50
packages/mom/test/events.test.ts
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
import { existsSync, type FSWatcher, mkdtempSync, rmSync } from "node:fs";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { EventsWatcher } from "../src/events.js";
|
||||
import type { SlackBot, SlackEvent } from "../src/slack.js";
|
||||
|
||||
describe("EventsWatcher fs.watch error handling", () => {
|
||||
let tempDir: string;
|
||||
|
||||
beforeEach(() => {
|
||||
tempDir = mkdtempSync(join(tmpdir(), "mom-events-"));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
if (tempDir && existsSync(tempDir)) {
|
||||
rmSync(tempDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("retries the events fs watcher 5 seconds after an async error", async () => {
|
||||
vi.useFakeTimers();
|
||||
const slack = {
|
||||
enqueueEvent: vi.fn((_event: SlackEvent) => true),
|
||||
} as unknown as SlackBot;
|
||||
const eventsDir = join(tempDir, "events");
|
||||
const watcher = new EventsWatcher(eventsDir, slack);
|
||||
|
||||
try {
|
||||
watcher.start();
|
||||
const watcherWithInternals = watcher as unknown as { watcher: FSWatcher | null };
|
||||
const originalWatcher = watcherWithInternals.watcher;
|
||||
expect(originalWatcher).not.toBeNull();
|
||||
expect(originalWatcher?.listenerCount("error")).toBeGreaterThan(0);
|
||||
|
||||
originalWatcher?.emit("error", new Error("simulated EMFILE"));
|
||||
expect(watcherWithInternals.watcher).toBeNull();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(4999);
|
||||
expect(watcherWithInternals.watcher).toBeNull();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
expect(watcherWithInternals.watcher).not.toBeNull();
|
||||
expect(watcherWithInternals.watcher).not.toBe(originalWatcher);
|
||||
} finally {
|
||||
watcher.stop();
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
Loading…
Add table
Add a link
Reference in a new issue