diff --git a/src/fs-utils.ts b/src/fs-utils.ts index 823a630..bf25c50 100644 --- a/src/fs-utils.ts +++ b/src/fs-utils.ts @@ -93,3 +93,29 @@ export async function* readSessionLines(filePath: string): AsyncGenerator { + let size: number + try { + size = (await stat(filePath)).size + } catch (err) { + warn(`stat failed for ${filePath}: ${(err as NodeJS.ErrnoException).code ?? 'unknown'}`) + return + } + + if (size > MAX_SESSION_FILE_BYTES) { + warn(`skipped oversize file ${filePath} (${size} bytes > cap ${MAX_SESSION_FILE_BYTES})`) + return + } + + const stream = createReadStream(filePath, { + encoding: 'utf-8', + start: Math.max(0, startOffset), + }) + const rl = createInterface({ input: stream, crlfDelay: Infinity }) + try { + for await (const line of rl) yield line + } catch (err) { + warn(`stream read failed for ${filePath}: ${(err as NodeJS.ErrnoException).code ?? 'unknown'}`) + } +} diff --git a/src/parser.ts b/src/parser.ts index 14b50d8..e02bb9e 100644 --- a/src/parser.ts +++ b/src/parser.ts @@ -1,6 +1,7 @@ +import { createHash } from 'crypto' import { readdir, stat } from 'fs/promises' import { basename, join } from 'path' -import { readSessionFile } from './fs-utils.js' +import { readSessionFile, readSessionLinesFromOffset } from './fs-utils.js' import { calculateCost, getShortModelName } from './models.js' import { discoverAllSessions, getProvider } from './providers/index.js' import type { ParsedProviderCall, Provider, SessionSource } from './providers/types.js' @@ -317,7 +318,7 @@ function filterSessionSummaryToRange(session: SessionSummary, dateRange?: DateRa return buildSessionSummary(session.sessionId, session.project, turns) } -function addSeenKeysFromSessions(sessions: SessionSummary[], seenKeys: Set) { +function addSeenDeduplicationKeysFromSessions(sessions: SessionSummary[], seenKeys: Set) { for (const session of sessions) { for (const turn of session.turns) { for (const call of turn.assistantCalls) { @@ -327,6 +328,45 @@ function addSeenKeysFromSessions(sessions: SessionSummary[], seenKeys: Set, + sessionIdFallback: string, + dateRange?: DateRange, +): SessionSummary | null { + if (entries.length === 0) return null + + let filteredEntries = entries + if (dateRange) { + filteredEntries = entries.filter(entry => { + if (!entry.timestamp) return entry.type === 'user' + const ts = new Date(entry.timestamp) + return ts >= dateRange.start && ts <= dateRange.end + }) + if (filteredEntries.length === 0) return null + } + + const sessionId = entries.find(entry => typeof entry.sessionId === 'string')?.sessionId ?? sessionIdFallback + const turns = groupIntoTurns(filteredEntries, seenMsgIds) + if (turns.length === 0) return null + + return buildSessionSummary(sessionId, project, turns.map(classifyTurn)) +} + +function buildClaudeSessionSummaryFromLines( + lines: string[], + project: string, + seenMsgIds: Set, + sessionIdFallback: string, + dateRange?: DateRange, +): SessionSummary | null { + const entries = lines + .map(parseJsonlLine) + .filter((entry): entry is JournalEntry => entry !== null) + return buildSessionSummaryFromEntries(entries, project, seenMsgIds, sessionIdFallback, dateRange) +} + async function parseSessionFile( filePath: string, project: string, @@ -345,30 +385,7 @@ async function parseSessionFile( const content = await readSessionFile(filePath) if (content === null) return null const lines = content.split('\n').filter(l => l.trim()) - const entries: JournalEntry[] = [] - - for (const line of lines) { - const entry = parseJsonlLine(line) - if (entry) entries.push(entry) - } - - if (entries.length === 0) return null - - let filteredEntries = entries - if (dateRange) { - filteredEntries = entries.filter(e => { - if (!e.timestamp) return e.type === 'user' - const ts = new Date(e.timestamp) - return ts >= dateRange.start && ts <= dateRange.end - }) - if (filteredEntries.length === 0) return null - } - - const sessionId = basename(filePath, '.jsonl') - const turns = groupIntoTurns(filteredEntries, seenMsgIds) - const classified = turns.map(classifyTurn) - - return buildSessionSummary(sessionId, project, classified) + return buildClaudeSessionSummaryFromLines(lines, project, seenMsgIds, basename(filePath, '.jsonl'), dateRange) } async function collectJsonlFiles(dirPath: string): Promise { @@ -387,18 +404,168 @@ async function collectJsonlFiles(dirPath: string): Promise { return jsonlFiles } -async function scanProjectDirs(dirs: Array<{ path: string; name: string }>, seenMsgIds: Set, dateRange?: DateRange): Promise { - const projectMap = new Map() +type ClaudeCacheUnit = { + path: string + project: string + progressLabel: string +} - for (const { path: dirPath, name: dirName } of dirs) { - const jsonlFiles = await collectJsonlFiles(dirPath) +async function listClaudeCacheUnits(dirPath: string, dirName: string): Promise { + const jsonlFiles = await collectJsonlFiles(dirPath) + return jsonlFiles.map(filePath => ({ + path: filePath, + project: dirName, + progressLabel: filePath.split(/[\\/]/).slice(-2).join('/'), + })) +} - for (const filePath of jsonlFiles) { - const session = await parseSessionFile(filePath, dirName, seenMsgIds, dateRange) - if (session) addSessionToProjectMap(projectMap, session) +function appendStateTailHash(session: SessionSummary): string { + return createHash('sha1').update(session.lastTimestamp).digest('hex') +} + +function fingerprintsMatch( + left: { mtimeMs: number; sizeBytes: number }, + right: { mtimeMs: number; sizeBytes: number }, +): boolean { + return left.mtimeMs === right.mtimeMs && left.sizeBytes === right.sizeBytes +} + +async function refreshClaudeCacheUnit( + manifest: Awaited>, + unit: ClaudeCacheUnit, + seenMsgIds: Set, + parserVersion: string, + options: ParseOptions, +): Promise<{ session: SessionSummary | null; wrote: boolean; refreshed: boolean }> { + let reportedRefresh = false + const cached = options.noCache + ? null + : await readSourceCacheEntry(manifest, 'claude', unit.path, { allowStaleFingerprint: true }) + const fingerprint = await computeFileFingerprint(unit.path) + + if ( + cached + && cached.parserVersion === parserVersion + && cached.cacheStrategy === 'append-jsonl' + && fingerprintsMatch(fingerprint, cached.fingerprint) + ) { + addSeenDeduplicationKeysFromSessions(cached.sessions, seenMsgIds) + return { session: cached.sessions[0] ?? null, wrote: false, refreshed: false } + } + + if ( + cached + && cached.parserVersion === parserVersion + && cached.cacheStrategy === 'append-jsonl' + && cached.appendState + && fingerprint.sizeBytes > cached.fingerprint.sizeBytes + ) { + reportedRefresh = true + options.progress?.advance(unit.progressLabel) + addSeenDeduplicationKeysFromSessions(cached.sessions, seenMsgIds) + const appendedLines: string[] = [] + for await (const line of readSessionLinesFromOffset(unit.path, cached.appendState.endOffset)) { + if (line.trim()) appendedLines.push(line) + } + + const appended = buildClaudeSessionSummaryFromLines( + appendedLines, + unit.project, + seenMsgIds, + cached.sessions[0]?.sessionId ?? basename(unit.path, '.jsonl'), + ) + + if (appended && cached.sessions[0]) { + const merged = buildSessionSummary( + cached.sessions[0].sessionId, + unit.project, + [...cached.sessions[0].turns, ...appended.turns], + ) + await writeSourceCacheEntry(manifest, { + version: SOURCE_CACHE_VERSION, + provider: 'claude', + logicalPath: unit.path, + fingerprintPath: unit.path, + cacheStrategy: 'append-jsonl', + parserVersion, + fingerprint, + sessions: [merged], + appendState: { + endOffset: fingerprint.sizeBytes, + tailHash: appendStateTailHash(merged), + }, + }) + return { session: merged, wrote: true, refreshed: true } } } + if (!reportedRefresh) options.progress?.advance(unit.progressLabel) + const session = await parseSessionFile(unit.path, unit.project, seenMsgIds) + if (!session) return { session: null, wrote: false, refreshed: true } + + await writeSourceCacheEntry(manifest, { + version: SOURCE_CACHE_VERSION, + provider: 'claude', + logicalPath: unit.path, + fingerprintPath: unit.path, + cacheStrategy: 'append-jsonl', + parserVersion, + fingerprint, + sessions: [session], + appendState: { + endOffset: fingerprint.sizeBytes, + tailHash: appendStateTailHash(session), + }, + }) + return { session, wrote: true, refreshed: true } +} + +async function scanClaudeDirsWithCache( + dirs: Array<{ path: string; name: string }>, + seenMsgIds: Set, + dateRange?: DateRange, + options: ParseOptions = {}, +): Promise { + const projectMap = new Map() + const manifest = await loadSourceCacheManifest() + const parserVersion = 'claude:v1' + const units = (await Promise.all( + dirs.map(dir => listClaudeCacheUnits(dir.path, dir.name)), + )).flat() + const refreshStates = await Promise.all(units.map(async unit => { + const cached = options.noCache + ? null + : await readSourceCacheEntry(manifest, 'claude', unit.path, { allowStaleFingerprint: true }) + const fingerprint = await computeFileFingerprint(unit.path).catch(() => null) + const reusable = !!( + cached + && fingerprint + && cached.parserVersion === parserVersion + && cached.cacheStrategy === 'append-jsonl' + && fingerprintsMatch(fingerprint, cached.fingerprint) + ) + return { unit, refreshed: !reusable } + })) + + const refreshCount = refreshStates.filter(state => state.refreshed).length + let wroteManifest = false + + if (refreshCount > 0) options.progress?.start('Updating cache', refreshCount) + + try { + for (const { unit } of refreshStates) { + const { session, wrote } = await refreshClaudeCacheUnit(manifest, unit, seenMsgIds, parserVersion, options) + if (wrote) wroteManifest = true + if (!session) continue + + const filtered = filterSessionSummaryToRange(session, dateRange) + if (filtered) addSessionToProjectMap(projectMap, filtered) + } + } finally { + if (refreshCount > 0) options.progress?.finish() + } + + if (wroteManifest) await saveSourceCacheManifest(manifest) return buildProjects(projectMap) } @@ -470,7 +637,7 @@ async function parseProviderSources( let fullSessions = state.cachedSessions if (fullSessions) { - addSeenKeysFromSessions(fullSessions, seenKeys) + addSeenDeduplicationKeysFromSessions(fullSessions, seenKeys) } else { provider ??= await getProvider(providerName) if (!provider) continue @@ -518,23 +685,42 @@ function cacheKey(dateRange?: DateRange, providerFilter?: string, noCache = fals async function sourceSignatureForCache(sources: SessionSource[]): Promise { const fingerprints = await Promise.all(sources.map(async source => { + if (source.provider === 'claude') { + const jsonlFiles = await collectJsonlFiles(source.path) + return Promise.all(jsonlFiles.map(async filePath => { + try { + const meta = await stat(filePath) + return [ + source.provider, + source.project, + filePath, + filePath, + String(meta.mtimeMs), + String(meta.size), + ].join(':') + } catch { + return [source.provider, source.project, filePath, filePath, 'missing'].join(':') + } + })) + } + const fingerprintPath = source.fingerprintPath ?? source.path try { const meta = await stat(fingerprintPath) - return [ + return [[ source.provider, source.project, source.path, fingerprintPath, String(meta.mtimeMs), String(meta.size), - ].join(':') + ].join(':')] } catch { - return [source.provider, source.project, source.path, fingerprintPath, 'missing'].join(':') + return [[source.provider, source.project, source.path, fingerprintPath, 'missing'].join(':')] } })) - return fingerprints.sort().join('|') + return fingerprints.flat().sort().join('|') } function cachePut(key: string, data: ProjectSummary[], sourceSignature: string) { @@ -622,7 +808,7 @@ export async function parseAllSessions( const nonClaudeSources = allSources.filter(s => s.provider !== 'claude') const claudeDirs = claudeSources.map(s => ({ path: s.path, name: s.project })) - const claudeProjects = await scanProjectDirs(claudeDirs, seenMsgIds, dateRange) + const claudeProjects = await scanClaudeDirsWithCache(claudeDirs, seenMsgIds, dateRange, options) const providerGroups = new Map() for (const source of nonClaudeSources) { diff --git a/src/source-cache.ts b/src/source-cache.ts index bd65dcf..3dc652f 100644 --- a/src/source-cache.ts +++ b/src/source-cache.ts @@ -37,6 +37,10 @@ export type SourceCacheManifest = { entries: Record } +export type ReadSourceCacheEntryOptions = { + allowStaleFingerprint?: boolean +} + function isPlainObject(value: unknown): value is Record { return !!value && typeof value === 'object' && !Array.isArray(value) } @@ -246,6 +250,7 @@ export async function readSourceCacheEntry( manifest: SourceCacheManifest, provider: string, logicalPath: string, + options: ReadSourceCacheEntryOptions = {}, ): Promise { const meta = manifest.entries[sourceKey(provider, logicalPath)] if (!meta) return null @@ -260,12 +265,14 @@ export async function readSourceCacheEntry( if (!isSourceCacheEntry(entry) || entry.version !== SOURCE_CACHE_VERSION) return null if (entry.provider !== provider || entry.logicalPath !== logicalPath) return null - const currentFingerprint = await computeFileFingerprint(entry.fingerprintPath) - if ( - currentFingerprint.mtimeMs !== entry.fingerprint.mtimeMs - || currentFingerprint.sizeBytes !== entry.fingerprint.sizeBytes - ) { - return null + if (!options.allowStaleFingerprint) { + const currentFingerprint = await computeFileFingerprint(entry.fingerprintPath) + if ( + currentFingerprint.mtimeMs !== entry.fingerprint.mtimeMs + || currentFingerprint.sizeBytes !== entry.fingerprint.sizeBytes + ) { + return null + } } return entry diff --git a/tests/fs-utils.test.ts b/tests/fs-utils.test.ts index 6510900..b23941a 100644 --- a/tests/fs-utils.test.ts +++ b/tests/fs-utils.test.ts @@ -8,6 +8,7 @@ import { STREAM_THRESHOLD_BYTES, readSessionFile, readSessionLines, + readSessionLinesFromOffset, } from '../src/fs-utils.js' describe('readSessionFile', () => { @@ -96,3 +97,33 @@ describe('readSessionLines', () => { await gen.return(undefined) }) }) + +describe('readSessionLinesFromOffset', () => { + const tmpDirs: string[] = [] + + afterEach(async () => { + while (tmpDirs.length > 0) { + const d = tmpDirs.pop() + if (d) await rm(d, { recursive: true, force: true }) + } + }) + + async function tmpPath(content: string): Promise { + const base = await mkdtemp(join(tmpdir(), 'codeburn-fs-offset-')) + tmpDirs.push(base) + const p = join(base, 'offset.txt') + await writeFile(p, content, 'utf-8') + return p + } + + it('starts at the requested byte offset', async () => { + const p = await tmpPath('alpha\nbeta\ngamma\n') + const lines: string[] = [] + + for await (const line of readSessionLinesFromOffset(p, Buffer.byteLength('alpha\n', 'utf-8'))) { + lines.push(line) + } + + expect(lines).toEqual(['beta', 'gamma']) + }) +}) diff --git a/tests/parser-cache.test.ts b/tests/parser-cache.test.ts index 58459fb..446d629 100644 --- a/tests/parser-cache.test.ts +++ b/tests/parser-cache.test.ts @@ -1,5 +1,5 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' -import { mkdtemp, readFile, rm, writeFile } from 'fs/promises' +import { appendFile, mkdir, mkdtemp, readFile, rm, writeFile } from 'fs/promises' import { tmpdir } from 'os' import { join } from 'path' @@ -8,6 +8,8 @@ import type { ParsedProviderCall, Provider, SessionSource } from '../src/provide let root = '' let sourcePath = '' let parseCalls = 0 +let claudeRoot = '' +let claudeSessionPath = '' function makeCall(index: number): ParsedProviderCall { const second = String(index).padStart(2, '0') @@ -35,13 +37,38 @@ function makeCall(index: number): ParsedProviderCall { beforeEach(async () => { root = await mkdtemp(join(tmpdir(), 'codeburn-parser-cache-')) sourcePath = join(root, 'fake.jsonl') + claudeRoot = join(root, '.claude') + claudeSessionPath = join(claudeRoot, 'projects', 'demo-project', 'session.jsonl') parseCalls = 0 process.env['CODEBURN_CACHE_DIR'] = join(root, 'cache') + process.env['CLAUDE_CONFIG_DIR'] = claudeRoot await writeFile(sourcePath, 'one\n', 'utf-8') + await mkdir(join(claudeRoot, 'projects', 'demo-project'), { recursive: true }) + await writeFile(claudeSessionPath, [ + JSON.stringify({ + type: 'user', + timestamp: '2026-04-20T09:00:00.000Z', + sessionId: 'sess-1', + message: { role: 'user', content: 'first' }, + }), + JSON.stringify({ + type: 'assistant', + timestamp: '2026-04-20T09:00:01.000Z', + message: { + id: 'msg-1', + model: 'claude-sonnet-4-6', + role: 'assistant', + type: 'message', + content: [], + usage: { input_tokens: 10, output_tokens: 20 }, + }, + }), + ].join('\n') + '\n', 'utf-8') }) afterEach(async () => { delete process.env['CODEBURN_CACHE_DIR'] + delete process.env['CLAUDE_CONFIG_DIR'] await rm(root, { recursive: true, force: true }) vi.resetModules() vi.clearAllMocks() @@ -150,4 +177,37 @@ describe('parseAllSessions source cache', () => { expect(onlyFirstDay[0]?.totalApiCalls).toBe(1) }) + + it('refreshes appended Claude log entries on the next run', async () => { + vi.doUnmock('../src/providers/index.js') + vi.resetModules() + const { parseAllSessions } = await import('../src/parser.js') + + const first = await parseAllSessions(undefined, 'claude') + expect(first.find(project => project.project === 'demo-project')?.totalApiCalls).toBe(1) + + await appendFile(claudeSessionPath, [ + JSON.stringify({ + type: 'user', + timestamp: '2026-04-20T09:05:00.000Z', + sessionId: 'sess-1', + message: { role: 'user', content: 'second' }, + }), + JSON.stringify({ + type: 'assistant', + timestamp: '2026-04-20T09:05:01.000Z', + message: { + id: 'msg-2', + model: 'claude-sonnet-4-6', + role: 'assistant', + type: 'message', + content: [], + usage: { input_tokens: 11, output_tokens: 21 }, + }, + }), + ].join('\n') + '\n', 'utf-8') + + const second = await parseAllSessions(undefined, 'claude') + expect(second.find(project => project.project === 'demo-project')?.totalApiCalls).toBe(2) + }) })