Mirror of https://github.com/QwenLM/qwen-code.git, synced 2026-05-05 07:10:55 +00:00
perf(core): cut runtime sync I/O on tool hot path by 91% (#3581)
* perf(core): make chat recording writes async
Every recorded chat event (user message, assistant turn, tool call,
tool result, slash command, etc.) was issuing 4 sync fs syscalls on
the main event loop: existsSync(dir) + mkdirSync(dir) + existsSync(file)
+ appendFileSync(file). For a tool-heavy prompt this added ~88 sync
I/O calls per session, blocking the UI render and keypress handler
during each one.
- chatRecordingService.appendRecord: cache ensure-flags so dir/file
creation runs once per session, then enqueue the actual write on a
per-instance promise chain (writeChain; see the sketch below).
lastRecordUuid is updated synchronously so chained createBaseRecord
still sees the right parentUuid without waiting for the previous write.
- chatRecordingService.flush: drains the chain — wired into
Config.shutdown so no records are lost on exit.
- jsonl-utils.writeLine: now actually async (fs.promises.mkdir +
fs.promises.appendFile) with per-dir mkdir cache. The existing
per-file mutex still serializes writes correctly.
- Tests updated to await flush() before assertions.
Trace measurement on a single tool-heavy prompt: 110 → 20 sync I/O
calls (-82%), with chatRecordingService dropping from 88 to 0.
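As a rough sketch of that write-chain shape (appendRecord, flush, writeChain
and lastRecordUuid are named above; the class layout, filePath and dirEnsured
details are assumed for illustration, not the actual service):
import { appendFile, mkdir } from 'node:fs/promises';
import * as path from 'node:path';

class ChatRecordingSketch {
  private writeChain: Promise<void> = Promise.resolve();
  private dirEnsured = false;
  lastRecordUuid: string | undefined;

  constructor(private readonly filePath: string) {}

  appendRecord(record: { uuid: string }): void {
    // Synchronous bookkeeping: the next record can reference this uuid as
    // its parentUuid without waiting for the previous write to hit disk.
    this.lastRecordUuid = record.uuid;
    this.writeChain = this.writeChain
      .then(async () => {
        if (!this.dirEnsured) {
          await mkdir(path.dirname(this.filePath), { recursive: true });
          this.dirEnsured = true; // only cached once mkdir succeeded
        }
        await appendFile(this.filePath, JSON.stringify(record) + '\n');
      })
      .catch(() => {
        // Best effort: one failed write must not poison later records.
      });
  }

  // Wired into shutdown so queued JSONL lines are not lost on exit.
  flush(): Promise<void> {
    return this.writeChain;
  }
}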
* perf(core): cache repeated fs lookups on tool hot path
Each tool invocation went through validatePath → isPathWithinWorkspace
→ fullyResolvedPath, plus its own existence/dir checks. The same paths
got re-resolved across back-to-back tool calls, and ripGrep re-discovered
.qwenignore on every Grep.
- workspaceContext.fullyResolvedPath: bounded LRU on input path
(1024, FIFO; see the sketch below). Failed resolutions are NOT cached
so retries work.
- paths.validatePath: cache positive isDirectory results; ENOENT
falls through every time so a freshly created file is picked up
immediately.
- ripGrep: module-level caches for searchPath-is-dir and per-dir
.qwenignore presence (256 each, FIFO).
- fileUtils.processSingleFileContent: drop the existsSync gate;
let fs.promises.stat throw ENOENT and convert to FILE_NOT_FOUND
in catch.
Trace: 20 → 10 sync I/O calls. Cumulative reduction since the
chat-recording change: 110 → 10, -91%. All 6057 core tests pass.
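A minimal sketch of the bounded, FIFO-evicting cache described in the first
bullet above; the 1024-entry cap and the no-caching-of-failures rule come from
the bullet, while the function and variable names here are illustrative rather
than the real workspaceContext API:
const MAX_ENTRIES = 1024;
const resolvedPathCache = new Map<string, string>();

function cachedFullyResolve(
  inputPath: string,
  resolve: (p: string) => string | undefined,
): string | undefined {
  const hit = resolvedPathCache.get(inputPath);
  if (hit !== undefined) return hit;

  const resolved = resolve(inputPath);
  // Failed resolutions are deliberately not cached, so a retry after the
  // path comes into existence still goes through the real resolver.
  if (resolved === undefined) return undefined;

  if (resolvedPathCache.size >= MAX_ENTRIES) {
    // FIFO eviction: a Map iterates in insertion order, so the first key
    // is the oldest entry.
    const oldest = resolvedPathCache.keys().next().value;
    if (oldest !== undefined) resolvedPathCache.delete(oldest);
  }
  resolvedPathCache.set(inputPath, resolved);
  return resolved;
}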
* test(core): cache reset hooks + regression-guards from audit
Self-review pass on the previous two perf commits surfaced a few
follow-ups worth pinning down before they bite:
- Module-level caches (paths.isDirectoryCache, ripGrep dirIsDir/qwenIgnore,
jsonl-utils.ensuredDirs) silently persisted across vitest cases. Added
underscore-prefixed `_reset*ForTest` exports (shape sketched below) and
wired one into the validatePath describe block so future cases mutating
the same absolute paths can't pass by accident.
- Documented the parentUuid-chain tradeoff on chatRecordingService
.appendRecord: when the async write rejects, lastRecordUuid was
already set sync, so subsequent records reference an absent
ancestor — readers like sessionService.reconstructHistory then
silently drop those descendants. Same observable failure mode as
the prior sync code's caught-and-logged throw.
- Documented the dir<->file mutation and mid-session .qwenignore
staleness windows for the validatePath / ripGrep caches.
- Added regression tests:
* validatePath does NOT cache ENOENT (Edit-then-Read works)
* validatePath skips re-stat on cache hit (perf assertion)
* flush() resolves immediately on a fresh service
* a rejected writeLine does not block the next record
Full core suite: 6061 pass, 2 skipped — no regressions.
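The reset-hook convention those notes refer to, as a small sketch against a
hypothetical module-level cache (the real `_reset*ForTest` exports live next
to the paths, ripGrep and jsonl-utils caches):
// Module-level state that would otherwise persist across vitest cases.
const isDirectoryCache = new Map<string, boolean>();

// Test-only escape hatch, wired into the relevant describe block via
//   beforeEach(() => _resetIsDirectoryCacheForTest());
export function _resetIsDirectoryCacheForTest(): void {
  isDirectoryCache.clear();
}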
* fix(core): cache chatsDirEnsured only on mkdir success
Pre-fix, the flag flipped to true even when mkdirSync threw, so a single
transient failure (NFS EACCES, sandbox mount race, parent dir briefly
missing) would short-circuit every subsequent appendRecord and silently
drop the rest of the session's transcript with no error surfaced.
Reported by zhangxy-zju on #3581.
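A minimal sketch of the corrected ordering (chatsDirEnsured is the flag named
above; the wrapper function and its signature are assumed for illustration):
import { mkdirSync } from 'node:fs';

let chatsDirEnsured = false;

function ensureChatsDir(chatsDir: string): void {
  if (chatsDirEnsured) return;
  // mkdirSync throws on failure, so the flag below is reached only on
  // success; a transient error is retried on the next appendRecord
  // instead of silently dropping the rest of the transcript.
  mkdirSync(chatsDir, { recursive: true });
  chatsDirEnsured = true;
}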
* fix(cli): destroy stdout instead of process.exit on EPIPE
Routine CLI patterns like `qwen -p ... | head -1` / `| less` / `| grep -m1`
close the downstream pipe and trigger EPIPE. The previous handler called
process.exit(0), which bypassed the caller's runExitCleanup -> Config
.shutdown -> chat-recording flush() chain and silently dropped queued
JSONL writes (most recent assistant turn + tool results).
Destroying stdout instead lets writes fail fast and the natural function
return drive cleanup. We deliberately do not also abortController.abort()
here: the abort path runs handleCancellationError which itself calls
process.exit(130), re-introducing the same bypass.
Reported by zhangxy-zju on #3581.
* fix(cli): bound runExitCleanup with per-fn + wall-clock timeouts
Pre-fix, runExitCleanup was an unbounded series of awaits. After the
async-jsonl change moved chat-recording writes off the calling thread
(Config.shutdown now `await flush()`s the queue), any hung syscall
(slow disk, dead NFS mount, stuck MCP socket, telemetry HTTP stall)
would hang process exit indefinitely — sync writes were inherently
bounded by syscall return; async writes are not.
Adds per-cleanup 2s + overall 5s wall-clock failsafes, following the same
shape as Claude Code's gracefulShutdown.ts. Also replaces dead
test-isolation code (`global['cleanupFunctions']` was never on global,
the array is module-private) with a `_resetCleanupFunctionsForTest`
hook matching the convention from d6485964c.
Follow-up flagged by zhangxy-zju on #3581.
---------
Co-authored-by: wenshao <wenshao@U-K7F6PQY3-2157.local>
parent 5e4ff3755c
commit 609b4324f6
15 changed files with 659 additions and 145 deletions
@@ -273,6 +273,38 @@ describe('runNonInteractive', () => {
     expect(mockShutdownTelemetry).toHaveBeenCalled();
   });

+  it('on EPIPE, destroys stdout and returns normally instead of process.exit', async () => {
+    // Regression: process.exit(0) on EPIPE bypassed runExitCleanup → flush()
+    // and dropped queued JSONL writes for `qwen -p ... | head -1` patterns.
+    // process.exit is mocked to throw in beforeEach, so reaching the
+    // assertion also proves the bypass route is gone.
+    setupMetricsMock();
+    const stdoutDestroySpy = vi
+      .spyOn(process.stdout, 'destroy')
+      .mockReturnValue(process.stdout);
+
+    mockGeminiClient.sendMessageStream.mockImplementation(
+      async function* mockStream(): AsyncGenerator<ServerGeminiStreamEvent> {
+        process.stdout.emit(
+          'error',
+          Object.assign(new Error('EPIPE'), { code: 'EPIPE' }),
+        );
+        yield { type: GeminiEventType.Content, value: 'Hello' };
+        yield {
+          type: GeminiEventType.Finished,
+          value: {
+            reason: undefined,
+            usageMetadata: { totalTokenCount: 0 },
+          },
+        };
+      },
+    );
+
+    await runNonInteractive(mockConfig, mockSettings, 'test', 'p1');
+
+    expect(stdoutDestroySpy).toHaveBeenCalled();
+  });
+
   it('should handle a single tool call and respond', async () => {
     setupMetricsMock();
     const toolCallEvent: ServerGeminiStreamEvent = {
@@ -175,16 +175,22 @@ export async function runNonInteractive(
   let totalApiDurationMs = 0;
   const startTime = Date.now();

-  const stdoutErrorHandler = (err: NodeJS.ErrnoException) => {
-    if (err.code === 'EPIPE') {
-      process.stdout.removeListener('error', stdoutErrorHandler);
-      process.exit(0);
-    }
-  };
-
   const geminiClient = config.getGeminiClient();
   const abortController = options.abortController ?? new AbortController();

+  // EPIPE: don't process.exit here — that bypasses the caller's
+  // runExitCleanup → flush() and drops queued JSONL writes. Destroy
+  // stdout instead and let the natural return drive cleanup. (Aborting
+  // is also wrong: the abort path runs handleCancellationError → exit
+  // 130 and re-introduces the same bypass.)
+  let pipeBroken = false;
+  const stdoutErrorHandler = (err: NodeJS.ErrnoException) => {
+    if (err.code === 'EPIPE' && !pipeBroken) {
+      pipeBroken = true;
+      process.stdout.destroy();
+    }
+  };
+
   // Setup signal handlers for graceful shutdown
   const shutdownHandler = () => {
     debugLogger.debug('[runNonInteractive] Shutdown signal received');
@@ -5,19 +5,19 @@
  */

 import { vi } from 'vitest';
-import { registerCleanup, runExitCleanup } from './cleanup';
+import {
+  _resetCleanupFunctionsForTest,
+  registerCleanup,
+  runExitCleanup,
+} from './cleanup';

 describe('cleanup', () => {
-  const originalCleanupFunctions = global['cleanupFunctions'];
-
   beforeEach(() => {
-    // Isolate cleanup functions for each test
-    global['cleanupFunctions'] = [];
-  });
-
-  afterAll(() => {
-    // Restore original cleanup functions
-    global['cleanupFunctions'] = originalCleanupFunctions;
+    // The previous `global['cleanupFunctions'] = []` setup was dead code —
+    // the array is module-private, not on `global`. Tests passed by accident
+    // because `runExitCleanup` itself clears at the end. A test that throws
+    // before reaching `runExitCleanup` would leak state into the next case.
+    _resetCleanupFunctionsForTest();
   });

   it('should run a registered synchronous function', async () => {
@@ -65,4 +65,79 @@ describe('cleanup', () => {
     expect(errorFn).toHaveBeenCalledTimes(1);
     expect(successFn).toHaveBeenCalledTimes(1);
   });
+
+  describe('timeout failsafes', () => {
+    // Without these the async-jsonl flush() could hang exit forever on slow
+    // disks / dead sockets — sync writes were inherently bounded, async aren't.
+
+    it('caps a hung cleanup at the per-fn timeout and proceeds to the next one', async () => {
+      const hangFn = vi.fn(() => new Promise<void>(() => {}));
+      const nextFn = vi.fn();
+
+      registerCleanup(hangFn);
+      registerCleanup(nextFn);
+
+      const start = Date.now();
+      await runExitCleanup({
+        _testPerFnTimeoutMs: 50,
+        _testOverallTimeoutMs: 5_000,
+      });
+      const elapsed = Date.now() - start;
+
+      expect(hangFn).toHaveBeenCalledTimes(1);
+      expect(nextFn).toHaveBeenCalledTimes(1);
+      expect(elapsed).toBeLessThan(500);
+    });
+
+    it('caps overall wall-clock time when many cleanups all hang', async () => {
+      // 100 × 50ms perFn ≈ 5000ms drain — structurally impossible for "drain
+      // finished naturally" to satisfy < 800ms, so the upper bound proves
+      // wallClock actually fired. Lower bound proves we waited for it and
+      // didn't short-circuit. 800ms slack absorbs CI scheduler jitter.
+      for (let i = 0; i < 100; i++) {
+        registerCleanup(() => new Promise<void>(() => {}));
+      }
+
+      const start = Date.now();
+      await runExitCleanup({
+        _testPerFnTimeoutMs: 50,
+        _testOverallTimeoutMs: 100,
+      });
+      const elapsed = Date.now() - start;
+
+      expect(elapsed).toBeLessThan(800);
+      expect(elapsed).toBeGreaterThanOrEqual(80);
+    });
+
+    it('still calls fast cleanups normally when timeouts are configured', async () => {
+      const fastFn = vi.fn().mockResolvedValue(undefined);
+      registerCleanup(fastFn);
+
+      await runExitCleanup({
+        _testPerFnTimeoutMs: 1_000,
+        _testOverallTimeoutMs: 2_000,
+      });
+
+      expect(fastFn).toHaveBeenCalledTimes(1);
+    });
+
+    it('does not let a rejected cleanup poison the chain', async () => {
+      // The original `for…await` already swallowed sync throws; this guards
+      // the new withTimeout wrapper against rejected-async-cleanup leaks.
+      const rejectFn = vi.fn().mockRejectedValue(new Error('boom'));
+      const nextFn = vi.fn();
+
+      registerCleanup(rejectFn);
+      registerCleanup(nextFn);
+
+      await expect(
+        runExitCleanup({
+          _testPerFnTimeoutMs: 50,
+          _testOverallTimeoutMs: 1_000,
+        }),
+      ).resolves.toBeUndefined();
+      expect(rejectFn).toHaveBeenCalledTimes(1);
+      expect(nextFn).toHaveBeenCalledTimes(1);
+    });
+  });
 });
@@ -14,15 +14,93 @@ export function registerCleanup(fn: (() => void) | (() => Promise<void>)) {
   cleanupFunctions.push(fn);
 }

-export async function runExitCleanup() {
-  for (const fn of cleanupFunctions) {
-    try {
-      await fn();
-    } catch (_) {
-      // Ignore errors during cleanup.
+/**
+ * Per-cleanup ceiling. Caps any single hung cleanup (slow disk on
+ * `chatRecording.flush`, MCP disconnect on a dead socket, telemetry HTTP
+ * stall) so it can't starve the rest of the cleanup chain.
+ */
+const PER_CLEANUP_TIMEOUT_MS = 2_000;
+
+/**
+ * Wall-clock ceiling for the whole cleanup pass. Pre-async-jsonl, sync
+ * fs writes were inherently bounded by their syscall return; with the
+ * write queue moved off-thread, an unbounded `await flush()` could now
+ * hang exit indefinitely. This ceiling guarantees the process always
+ * exits within a bounded time, even if a cleanup never resolves.
+ */
+const OVERALL_CLEANUP_TIMEOUT_MS = 5_000;
+
+/**
+ * Awaits `promise`, but resolves to `undefined` if `ms` elapses first.
+ * Rejection collapses to the same undefined resolution — caller treats
+ * cleanup errors as best-effort. Timer is unrefed so it can't keep the
+ * event loop alive on its own.
+ */
+function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T | void> {
+  return new Promise((resolve) => {
+    const timer = setTimeout(() => resolve(undefined), ms);
+    timer.unref?.();
+    promise.then(
+      (value) => {
+        clearTimeout(timer);
+        resolve(value);
+      },
+      () => {
+        clearTimeout(timer);
+        resolve(undefined);
+      },
+    );
+  });
+}
+
+export interface RunExitCleanupOptions {
+  /** TEST ONLY — override per-cleanup-function timeout (default 2s). */
+  _testPerFnTimeoutMs?: number;
+  /** TEST ONLY — override overall wall-clock timeout (default 5s). */
+  _testOverallTimeoutMs?: number;
+}
+
+export async function runExitCleanup(
+  options: RunExitCleanupOptions = {},
+): Promise<void> {
+  const perFn = options._testPerFnTimeoutMs ?? PER_CLEANUP_TIMEOUT_MS;
+  const overall = options._testOverallTimeoutMs ?? OVERALL_CLEANUP_TIMEOUT_MS;
+
+  const drain = (async () => {
+    for (const fn of cleanupFunctions) {
+      try {
+        await withTimeout(Promise.resolve().then(fn), perFn);
+      } catch (_) {
+        // Ignore errors during cleanup.
+      }
+    }
+  })();
+
+  // clearTimeout when drain wins; unref keeps the handle from blocking exit.
+  let wallClockTimer: NodeJS.Timeout | undefined;
+  const wallClock = new Promise<void>((resolve) => {
+    wallClockTimer = setTimeout(() => resolve(), overall);
+    wallClockTimer.unref?.();
+  });
+
+  try {
+    await Promise.race([drain, wallClock]);
+  } finally {
+    if (wallClockTimer) clearTimeout(wallClockTimer);
+    cleanupFunctions.length = 0; // Clear the array
+  }
   cleanupFunctions.length = 0; // Clear the array
 }

+/**
+ * Test-only: clear the registered cleanup functions array. Module-private
+ * state otherwise leaks across vitest cases — the previous test isolation
+ * via `global['cleanupFunctions']` was a no-op (the array isn't on global)
+ * and only happened to work because `runExitCleanup` itself clears at the
+ * end. Naming follows the `_reset*ForTest` convention from
+ * d6485964c (paths, jsonl-utils, ripGrep).
+ */
+export function _resetCleanupFunctionsForTest(): void {
+  cleanupFunctions.length = 0;
+}
+
 export async function cleanupCheckpoints() {