mirror of
https://github.com/AgentSeal/codeburn.git
synced 2026-05-19 16:13:56 +00:00
Add persistent disk cache for parsed session data
Some checks are pending
CI / semgrep (push) Waiting to run
Some checks are pending
CI / semgrep (push) Waiting to run
Cache normalized turns/calls to ~/.cache/codeburn/session-cache.json so the CLI skips re-parsing unchanged JSONL files on subsequent runs. File reconciliation uses dev+ino+mtime+size fingerprinting; cost, classification, and summaries are recomputed at query time. Atomic writes via temp+fsync+rename, deep structural validation on load, per-provider env fingerprinting, and best-effort save so cache failures never break the CLI. ~6x speedup on warm cache.
This commit is contained in:
parent
d568c8c103
commit
bd41fa3962
6 changed files with 1236 additions and 51 deletions
392
src/parser.ts
392
src/parser.ts
|
|
@ -6,6 +6,19 @@ import { discoverAllSessions, getProvider } from './providers/index.js'
|
|||
import { flushCodexCache } from './codex-cache.js'
|
||||
import { flushAntigravityCache } from './providers/antigravity.js'
|
||||
import { isSqliteBusyError } from './sqlite.js'
|
||||
import {
|
||||
type CachedCall,
|
||||
type CachedFile,
|
||||
type CachedTurn,
|
||||
type ProviderSection,
|
||||
type SessionCache,
|
||||
cleanupOrphanedTempFiles,
|
||||
computeEnvFingerprint,
|
||||
fingerprintFile,
|
||||
loadCache,
|
||||
reconcileFile,
|
||||
saveCache,
|
||||
} from './session-cache.js'
|
||||
import type { ParsedProviderCall } from './providers/types.js'
|
||||
import type {
|
||||
AssistantMessageContent,
|
||||
|
|
@ -995,6 +1008,7 @@ function parseApiCall(entry: JournalEntry): ParsedApiCall | null {
|
|||
timestamp: entry.timestamp ?? '',
|
||||
bashCommands: bashCmds,
|
||||
deduplicationKey: msg.id ?? `claude:${entry.timestamp}`,
|
||||
cacheCreationOneHourTokens: cacheCreation.oneHourTokens || undefined,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1321,31 +1335,115 @@ async function collectJsonlFiles(dirPath: string): Promise<string[]> {
|
|||
return jsonlFiles
|
||||
}
|
||||
|
||||
async function scanProjectDirs(dirs: Array<{ path: string; name: string }>, seenMsgIds: Set<string>, dateRange?: DateRange): Promise<ProjectSummary[]> {
|
||||
const projectMap = new Map<string, { project: string; projectPath: string; sessions: SessionSummary[] }>()
|
||||
async function scanProjectDirs(
|
||||
dirs: Array<{ path: string; name: string }>,
|
||||
seenMsgIds: Set<string>,
|
||||
diskCache: SessionCache,
|
||||
dateRange?: DateRange,
|
||||
): Promise<ProjectSummary[]> {
|
||||
const section = getOrCreateProviderSection(diskCache, 'claude')
|
||||
const allDiscoveredFiles = new Set<string>()
|
||||
|
||||
type FileInfo = { dirName: string; fp: NonNullable<Awaited<ReturnType<typeof fingerprintFile>>> }
|
||||
const unchangedFiles: Array<{ filePath: string; dirName: string; cached: CachedFile }> = []
|
||||
const changedFiles: Array<{ filePath: string; info: FileInfo }> = []
|
||||
|
||||
for (const { path: dirPath, name: dirName } of dirs) {
|
||||
const jsonlFiles = await collectJsonlFiles(dirPath)
|
||||
|
||||
for (const filePath of jsonlFiles) {
|
||||
const parsed = await parseSessionFile(filePath, dirName, seenMsgIds, dateRange)
|
||||
if (parsed && parsed.session.apiCalls > 0) {
|
||||
const projectPath = parsed.canonicalCwd ?? unsanitizePath(dirName)
|
||||
const projectKey = parsed.canonicalCwd ? normalizeProjectPathKey(parsed.canonicalCwd) : `slug:${dirName}`
|
||||
const existing = projectMap.get(projectKey)
|
||||
if (existing) {
|
||||
existing.sessions.push(parsed.session)
|
||||
} else {
|
||||
projectMap.set(projectKey, { project: dirName, projectPath, sessions: [parsed.session] })
|
||||
}
|
||||
allDiscoveredFiles.add(filePath)
|
||||
const fp = await fingerprintFile(filePath)
|
||||
if (!fp) continue
|
||||
|
||||
const action = reconcileFile(fp, section.files[filePath])
|
||||
if (action.action === 'unchanged') {
|
||||
unchangedFiles.push({ filePath, dirName, cached: section.files[filePath]! })
|
||||
} else {
|
||||
changedFiles.push({ filePath, info: { dirName, fp } })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If a slug has both cwd-keyed and slug-keyed entries (mixed sessions where
|
||||
// some carry a canonical cwd and some don't), fold the slug-keyed sessions
|
||||
// into the cwd-keyed entry so the canonical projectPath is preserved
|
||||
// regardless of file iteration order.
|
||||
// Pre-seed dedup set from cached (unchanged) files
|
||||
for (const { cached } of unchangedFiles) {
|
||||
for (const turn of cached.turns) {
|
||||
for (const call of turn.calls) {
|
||||
seenMsgIds.add(call.deduplicationKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Parse changed files, update cache
|
||||
for (const { filePath, info } of changedFiles) {
|
||||
// Clear stale entry before parse — if parse fails, file is excluded
|
||||
delete section.files[filePath]
|
||||
|
||||
const tracker = { lastCompleteLineOffset: 0 }
|
||||
const entries = await parseClaudeEntries(filePath, tracker)
|
||||
if (!entries) continue
|
||||
|
||||
const turns = groupIntoTurns(dedupeStreamingMessageIds(entries), seenMsgIds)
|
||||
section.files[filePath] = {
|
||||
fingerprint: info.fp,
|
||||
lastCompleteLineOffset: tracker.lastCompleteLineOffset,
|
||||
canonicalCwd: extractCanonicalCwd(entries),
|
||||
mcpInventory: extractMcpInventory(entries),
|
||||
turns: turns.map(parsedTurnToCachedTurn),
|
||||
}
|
||||
}
|
||||
|
||||
// Remove deleted files from cache
|
||||
for (const cachedPath of Object.keys(section.files)) {
|
||||
if (!allDiscoveredFiles.has(cachedPath)) {
|
||||
delete section.files[cachedPath]
|
||||
}
|
||||
}
|
||||
|
||||
// Query-time: derive ProjectSummary[] from all cached turns
|
||||
const projectMap = new Map<string, { project: string; projectPath: string; sessions: SessionSummary[] }>()
|
||||
|
||||
const allFiles = [
|
||||
...unchangedFiles.map(f => ({ filePath: f.filePath, dirName: f.dirName })),
|
||||
...changedFiles.map(f => ({ filePath: f.filePath, dirName: f.info.dirName })),
|
||||
]
|
||||
|
||||
for (const { filePath, dirName } of allFiles) {
|
||||
const cachedFile = section.files[filePath]
|
||||
if (!cachedFile || cachedFile.turns.length === 0) continue
|
||||
|
||||
let classifiedTurns = cachedFile.turns.map(cachedTurnToClassified)
|
||||
|
||||
if (dateRange) {
|
||||
classifiedTurns = classifiedTurns.filter(turn => {
|
||||
if (turn.assistantCalls.length === 0) return false
|
||||
const firstCallTs = turn.assistantCalls[0]!.timestamp
|
||||
if (!firstCallTs) return false
|
||||
const ts = new Date(firstCallTs)
|
||||
return ts >= dateRange.start && ts <= dateRange.end
|
||||
})
|
||||
}
|
||||
|
||||
if (classifiedTurns.length === 0) continue
|
||||
|
||||
const sessionId = basename(filePath, '.jsonl')
|
||||
const projectPath = cachedFile.canonicalCwd ?? unsanitizePath(dirName)
|
||||
const mcpInv = cachedFile.mcpInventory.length > 0 ? cachedFile.mcpInventory : undefined
|
||||
const session = buildSessionSummary(sessionId, dirName, classifiedTurns, mcpInv)
|
||||
|
||||
if (session.apiCalls > 0) {
|
||||
const projectKey = cachedFile.canonicalCwd
|
||||
? normalizeProjectPathKey(cachedFile.canonicalCwd)
|
||||
: `slug:${dirName}`
|
||||
const existing = projectMap.get(projectKey)
|
||||
if (existing) {
|
||||
existing.sessions.push(session)
|
||||
} else {
|
||||
projectMap.set(projectKey, { project: dirName, projectPath, sessions: [session] })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fold slug-keyed entries into cwd-keyed entries
|
||||
const cwdKeyByDirName = new Map<string, string>()
|
||||
for (const [key, entry] of projectMap) {
|
||||
if (!key.startsWith('slug:') && !cwdKeyByDirName.has(entry.project)) {
|
||||
|
|
@ -1411,6 +1509,136 @@ function providerCallToTurn(call: ParsedProviderCall): ParsedTurn {
|
|||
}
|
||||
}
|
||||
|
||||
// ── Cache Conversion ───────────────────────────────────────────────────
|
||||
|
||||
/**
 * Converts a parsed API call into the shape stored in the session cache.
 *
 * Only the raw facts of the call are copied. `costUSD` and the tool-derived
 * fields are left out; `cachedCallToApiCall` computes them again when the
 * cached call is read back.
 *
 * @param call - Parsed call to persist.
 * @returns The cacheable representation of `call`.
 */
function apiCallToCachedCall(call: ParsedApiCall): CachedCall {
  return {
    provider: call.provider,
    model: call.model,
    // The cached form carries the one-hour cache-creation count inside `usage`;
    // a missing value is stored as 0.
    usage: { ...call.usage, cacheCreationOneHourTokens: call.cacheCreationOneHourTokens ?? 0 },
    speed: call.speed,
    timestamp: call.timestamp,
    tools: call.tools,
    bashCommands: call.bashCommands,
    skills: call.skills,
    deduplicationKey: call.deduplicationKey,
  }
}
|
||||
|
||||
/**
 * Converts a parsed turn into the shape stored in the session cache.
 *
 * @param turn - Parsed turn to persist.
 * @returns A cached turn whose calls are the converted `assistantCalls`.
 */
function parsedTurnToCachedTurn(turn: ParsedTurn): CachedTurn {
  return {
    timestamp: turn.timestamp,
    sessionId: turn.sessionId,
    // Only the first 2000 UTF-16 code units of the user message are kept.
    userMessage: turn.userMessage.slice(0, 2000),
    calls: turn.assistantCalls.map(apiCallToCachedCall),
  }
}
|
||||
|
||||
/**
 * Wraps one provider call as a cached turn containing exactly one call.
 *
 * The turn takes its timestamp and session id from the call itself.
 *
 * @param call - Provider call to persist.
 * @returns A single-call cached turn.
 */
function providerCallToCachedTurn(call: ParsedProviderCall): CachedTurn {
  return {
    timestamp: call.timestamp,
    sessionId: call.sessionId,
    // Only the first 2000 UTF-16 code units of the user message are kept.
    userMessage: call.userMessage.slice(0, 2000),
    calls: [{
      provider: call.provider,
      model: call.model,
      usage: {
        inputTokens: call.inputTokens,
        outputTokens: call.outputTokens,
        cacheCreationInputTokens: call.cacheCreationInputTokens,
        cacheReadInputTokens: call.cacheReadInputTokens,
        cachedInputTokens: call.cachedInputTokens,
        reasoningTokens: call.reasoningTokens,
        webSearchRequests: call.webSearchRequests,
        // Provider calls are always stored with a zero one-hour cache-creation count.
        cacheCreationOneHourTokens: 0,
      },
      speed: call.speed,
      timestamp: call.timestamp,
      tools: call.tools,
      bashCommands: call.bashCommands,
      // Provider calls are always stored with an empty skills list.
      skills: [],
      deduplicationKey: call.deduplicationKey,
      // Kept per call so the project can be recovered when reading the cache back.
      project: call.project,
      projectPath: call.projectPath,
    }],
  }
}
|
||||
|
||||
/**
 * Rebuilds a `ParsedApiCall` from its cached form.
 *
 * Cost and the tool-derived fields (`mcpTools`, `hasAgentSpawn`,
 * `hasPlanMode`) are computed here on every call from the cached values.
 *
 * @param call - Cached call read from the session cache.
 * @returns The call with `costUSD` and derived fields filled in.
 */
function cachedCallToApiCall(call: CachedCall): ParsedApiCall {
  const u = call.usage
  // For every provider except 'claude', reasoning tokens are added to the
  // output tokens when pricing the call.
  const outputForCost = call.provider === 'claude'
    ? u.outputTokens
    : u.outputTokens + u.reasoningTokens
  const costUSD = calculateCost(
    call.model, u.inputTokens, outputForCost,
    u.cacheCreationInputTokens, u.cacheReadInputTokens,
    u.webSearchRequests, call.speed, u.cacheCreationOneHourTokens,
  )
  return {
    provider: call.provider,
    model: call.model,
    // `usage` is rebuilt field by field, so the one-hour count stored inside
    // the cached usage is surfaced only through the top-level field below.
    usage: {
      inputTokens: u.inputTokens,
      outputTokens: u.outputTokens,
      cacheCreationInputTokens: u.cacheCreationInputTokens,
      cacheReadInputTokens: u.cacheReadInputTokens,
      cachedInputTokens: u.cachedInputTokens,
      reasoningTokens: u.reasoningTokens,
      webSearchRequests: u.webSearchRequests,
    },
    costUSD,
    tools: call.tools,
    mcpTools: extractMcpTools(call.tools),
    skills: call.skills,
    hasAgentSpawn: call.tools.includes('Agent'),
    hasPlanMode: call.tools.includes('EnterPlanMode'),
    speed: call.speed,
    timestamp: call.timestamp,
    bashCommands: call.bashCommands,
    deduplicationKey: call.deduplicationKey,
    // A cached 0 maps back to `undefined`.
    cacheCreationOneHourTokens: u.cacheCreationOneHourTokens || undefined,
  }
}
|
||||
|
||||
/**
 * Rebuilds a parsed turn from its cached form and classifies it.
 *
 * @param turn - Cached turn read from the session cache.
 * @returns The result of `classifyTurn` on the rebuilt turn.
 */
function cachedTurnToClassified(turn: CachedTurn): ClassifiedTurn {
  const parsed: ParsedTurn = {
    userMessage: turn.userMessage,
    assistantCalls: turn.calls.map(cachedCallToApiCall),
    timestamp: turn.timestamp,
    sessionId: turn.sessionId,
  }
  return classifyTurn(parsed)
}
|
||||
|
||||
// ── Cache-Aware Parsing Helpers ────────────────────────────────────────
|
||||
|
||||
/**
 * Reads a session file line by line and collects its journal entries.
 *
 * Each line is passed to `parseJsonlLine`; lines for which it returns a falsy
 * value are skipped, and every kept entry goes through `compactEntry`.
 *
 * @param filePath - Path of the session file to read.
 * @param tracker - Passed to `readSessionLines` as `byteOffsetTracker`.
 *   NOTE(review): presumably the reader updates `lastCompleteLineOffset` as it
 *   goes — confirm against `readSessionLines`.
 * @returns The collected entries, or `null` when none were produced.
 */
async function parseClaudeEntries(
  filePath: string,
  tracker: { lastCompleteLineOffset: number },
): Promise<JournalEntry[] | null> {
  const entries: JournalEntry[] = []
  for await (const line of readSessionLines(filePath, undefined, {
    largeLineAsBuffer: true,
    byteOffsetTracker: tracker,
  })) {
    const entry = parseJsonlLine(line)
    if (entry) entries.push(compactEntry(entry))
  }
  // An empty list covers both "the file yielded no lines" and "no line parsed".
  return entries.length > 0 ? entries : null
}
|
||||
|
||||
/**
 * Returns the cache section for `provider`, creating it when needed.
 *
 * The existing section is reused only when its stored `envFingerprint` equals
 * the one computed now. Otherwise a new section with no files is stored in
 * `cache.providers[provider]`, replacing any previous section.
 *
 * @param cache - Session cache to read from and, if needed, update in place.
 * @param provider - Provider name used as the key in `cache.providers`.
 * @returns The reused or newly created section.
 */
function getOrCreateProviderSection(cache: SessionCache, provider: string): ProviderSection {
  const envFp = computeEnvFingerprint(provider)
  const existing = cache.providers[provider]
  if (existing && existing.envFingerprint === envFp) return existing
  // Annotated so the literal is checked against ProviderSection instead of
  // being inferred with `files: {}`.
  const section: ProviderSection = { envFingerprint: envFp, files: {} }
  cache.providers[provider] = section
  return section
}
|
||||
|
||||
const warnedProviderReadFailures = new Set<string>()
|
||||
|
||||
function warnProviderReadFailureOnce(providerName: string, err: unknown): void {
|
||||
|
|
@ -1428,47 +1656,66 @@ async function parseProviderSources(
|
|||
providerName: string,
|
||||
sources: Array<{ path: string; project: string }>,
|
||||
seenKeys: Set<string>,
|
||||
diskCache: SessionCache,
|
||||
dateRange?: DateRange,
|
||||
): Promise<ProjectSummary[]> {
|
||||
const provider = await getProvider(providerName)
|
||||
if (!provider) return []
|
||||
|
||||
const sessionMap = new Map<string, { project: string; projectPath?: string; turns: ClassifiedTurn[] }>()
|
||||
const section = getOrCreateProviderSection(diskCache, providerName)
|
||||
const allDiscoveredFiles = new Set<string>()
|
||||
|
||||
try {
|
||||
for (const source of sources) {
|
||||
if (dateRange) {
|
||||
try {
|
||||
const s = await stat(source.path)
|
||||
if (s.mtimeMs < dateRange.start.getTime()) continue
|
||||
} catch { /* fall through; treat unknown stat as "may contain data" */ }
|
||||
type SourceInfo = { source: { path: string; project: string }; fp: NonNullable<Awaited<ReturnType<typeof fingerprintFile>>> }
|
||||
const unchangedSources: Array<{ source: { path: string; project: string }; cached: CachedFile }> = []
|
||||
const changedSources: SourceInfo[] = []
|
||||
|
||||
for (const source of sources) {
|
||||
allDiscoveredFiles.add(source.path)
|
||||
const fp = await fingerprintFile(source.path)
|
||||
if (!fp) continue
|
||||
|
||||
const action = reconcileFile(fp, section.files[source.path])
|
||||
if (action.action === 'unchanged') {
|
||||
unchangedSources.push({ source, cached: section.files[source.path]! })
|
||||
} else {
|
||||
changedSources.push({ source, fp })
|
||||
}
|
||||
}
|
||||
|
||||
// Parser dedup: cross-provider keys + cached file keys.
|
||||
// Separate from seenKeys so parsing doesn't suppress query-time output.
|
||||
const parserDedup = new Set(seenKeys)
|
||||
for (const { cached } of unchangedSources) {
|
||||
for (const turn of cached.turns) {
|
||||
for (const call of turn.calls) {
|
||||
parserDedup.add(call.deduplicationKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Parse changed files, update cache
|
||||
let didParse = false
|
||||
try {
|
||||
for (const { source, fp } of changedSources) {
|
||||
if (dateRange) {
|
||||
if (fp.mtimeMs < dateRange.start.getTime()) continue
|
||||
}
|
||||
|
||||
// Clear stale entry before parse — if parse fails, file is excluded
|
||||
delete section.files[source.path]
|
||||
|
||||
const parser = provider.createSessionParser(
|
||||
{ path: source.path, project: source.project, provider: providerName },
|
||||
seenKeys,
|
||||
parserDedup,
|
||||
)
|
||||
|
||||
try {
|
||||
const turns: CachedTurn[] = []
|
||||
for await (const call of parser.parse()) {
|
||||
if (dateRange) {
|
||||
if (!call.timestamp) continue
|
||||
const ts = new Date(call.timestamp)
|
||||
if (ts < dateRange.start || ts > dateRange.end) continue
|
||||
}
|
||||
|
||||
const turn = providerCallToTurn(call)
|
||||
const classified = classifyTurn(turn)
|
||||
const project = call.project ?? source.project
|
||||
const key = `${providerName}:${call.sessionId}:${project}`
|
||||
|
||||
const existing = sessionMap.get(key)
|
||||
if (existing) {
|
||||
existing.turns.push(classified)
|
||||
if (!existing.projectPath && call.projectPath) existing.projectPath = call.projectPath
|
||||
} else {
|
||||
sessionMap.set(key, { project, projectPath: call.projectPath, turns: [classified] })
|
||||
}
|
||||
turns.push(providerCallToCachedTurn(call))
|
||||
}
|
||||
section.files[source.path] = { fingerprint: fp, mcpInventory: [], turns }
|
||||
didParse = true
|
||||
} catch (err) {
|
||||
if (isSqliteBusyError(err)) {
|
||||
warnProviderReadFailureOnce(providerName, err)
|
||||
|
|
@ -1478,13 +1725,57 @@ async function parseProviderSources(
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
if (providerName === 'codex') await flushCodexCache()
|
||||
if (providerName === 'antigravity') {
|
||||
if (didParse && providerName === 'codex') await flushCodexCache()
|
||||
if (didParse && providerName === 'antigravity') {
|
||||
const liveIds = new Set(sources.map(s => basename(s.path, '.pb')))
|
||||
await flushAntigravityCache(liveIds)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove deleted files from cache
|
||||
for (const cachedPath of Object.keys(section.files)) {
|
||||
if (!allDiscoveredFiles.has(cachedPath)) {
|
||||
delete section.files[cachedPath]
|
||||
}
|
||||
}
|
||||
|
||||
// Query-time: derive SessionSummary from all cached turns.
|
||||
// Uses seenKeys (shared across providers) for cross-provider dedup.
|
||||
const sessionMap = new Map<string, { project: string; projectPath?: string; turns: ClassifiedTurn[] }>()
|
||||
|
||||
for (const source of sources) {
|
||||
const cachedFile = section.files[source.path]
|
||||
if (!cachedFile) continue
|
||||
|
||||
for (const turn of cachedFile.turns) {
|
||||
const hasDup = turn.calls.some(c => seenKeys.has(c.deduplicationKey))
|
||||
if (hasDup) continue
|
||||
|
||||
for (const c of turn.calls) seenKeys.add(c.deduplicationKey)
|
||||
|
||||
if (dateRange) {
|
||||
const callTs = turn.calls[0]?.timestamp
|
||||
if (!callTs) continue
|
||||
const ts = new Date(callTs)
|
||||
if (ts < dateRange.start || ts > dateRange.end) continue
|
||||
}
|
||||
|
||||
const classified = cachedTurnToClassified(turn)
|
||||
const project = turn.calls[0]?.project ?? source.project
|
||||
const key = `${providerName}:${turn.sessionId}:${project}`
|
||||
|
||||
const existing = sessionMap.get(key)
|
||||
if (existing) {
|
||||
existing.turns.push(classified)
|
||||
if (!existing.projectPath && turn.calls[0]?.projectPath) {
|
||||
existing.projectPath = turn.calls[0]!.projectPath
|
||||
}
|
||||
} else {
|
||||
sessionMap.set(key, { project, projectPath: turn.calls[0]?.projectPath, turns: [classified] })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const projectMap = new Map<string, { projectPath?: string; sessions: SessionSummary[] }>()
|
||||
for (const [key, { project, projectPath, turns }] of sessionMap) {
|
||||
const sessionId = key.split(':')[1] ?? key
|
||||
|
|
@ -1602,6 +1893,9 @@ export async function parseAllSessions(dateRange?: DateRange, providerFilter?: s
|
|||
const cached = sessionCache.get(key)
|
||||
if (cached && Date.now() - cached.ts < CACHE_TTL_MS) return cached.data
|
||||
|
||||
const diskCache = await loadCache()
|
||||
await cleanupOrphanedTempFiles()
|
||||
|
||||
const seenMsgIds = new Set<string>()
|
||||
const seenKeys = new Set<string>()
|
||||
const allSources = await discoverAllSessions(providerFilter)
|
||||
|
|
@ -1610,7 +1904,7 @@ export async function parseAllSessions(dateRange?: DateRange, providerFilter?: s
|
|||
const nonClaudeSources = allSources.filter(s => s.provider !== 'claude')
|
||||
|
||||
const claudeDirs = claudeSources.map(s => ({ path: s.path, name: s.project }))
|
||||
const claudeProjects = await scanProjectDirs(claudeDirs, seenMsgIds, dateRange)
|
||||
const claudeProjects = await scanProjectDirs(claudeDirs, seenMsgIds, diskCache, dateRange)
|
||||
|
||||
const providerGroups = new Map<string, Array<{ path: string; project: string }>>()
|
||||
for (const source of nonClaudeSources) {
|
||||
|
|
@ -1621,10 +1915,12 @@ export async function parseAllSessions(dateRange?: DateRange, providerFilter?: s
|
|||
|
||||
const otherProjects: ProjectSummary[] = []
|
||||
for (const [providerName, sources] of providerGroups) {
|
||||
const projects = await parseProviderSources(providerName, sources, seenKeys, dateRange)
|
||||
const projects = await parseProviderSources(providerName, sources, seenKeys, diskCache, dateRange)
|
||||
otherProjects.push(...projects)
|
||||
}
|
||||
|
||||
try { await saveCache(diskCache) } catch {}
|
||||
|
||||
const mergedMap = new Map<string, ProjectSummary>()
|
||||
for (const p of [...claudeProjects, ...otherProjects]) {
|
||||
const existing = mergedMap.get(p.project)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue