Merge pull request #201 from getagentseal/fix/streaming-dedup
Some checks are pending
CI / semgrep (push) Waiting to run

Fix streaming dedup: keep last message.id occurrence for accurate tokens and tools
This commit is contained in:
Resham Joshi 2026-05-02 22:30:48 -07:00 committed by GitHub
commit 95585febf4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -121,6 +121,30 @@ function parseApiCall(entry: JournalEntry): ParsedApiCall | null {
}
}
function dedupeStreamingMessageIds(entries: JournalEntry[]): JournalEntry[] {
const firstIdxById = new Map<string, number>()
const lastIdxById = new Map<string, number>()
for (let i = 0; i < entries.length; i++) {
const id = getMessageId(entries[i]!)
if (!id) continue
if (!firstIdxById.has(id)) firstIdxById.set(id, i)
lastIdxById.set(id, i)
}
if (lastIdxById.size === 0) return entries
const result: JournalEntry[] = []
for (let i = 0; i < entries.length; i++) {
const id = getMessageId(entries[i]!)
if (id && lastIdxById.get(id) !== i) continue
if (id && firstIdxById.get(id) !== i) {
const firstTs = entries[firstIdxById.get(id)!]!.timestamp
result.push({ ...entries[i]!, timestamp: firstTs ?? entries[i]!.timestamp })
continue
}
result.push(entries[i]!)
}
return result
}
function groupIntoTurns(entries: JournalEntry[], seenMsgIds: Set<string>): ParsedTurn[] {
const turns: ParsedTurn[] = []
let currentUserMessage = ''
@ -291,7 +315,8 @@ async function parseSessionFile(
if (entries.length === 0) return null
const sessionId = basename(filePath, '.jsonl')
let turns = groupIntoTurns(entries, seenMsgIds)
const dedupedEntries = dedupeStreamingMessageIds(entries)
let turns = groupIntoTurns(dedupedEntries, seenMsgIds)
if (dateRange) {
// Bucket a turn by the timestamp of its first assistant call (when the cost was
// actually incurred). Filtering entries directly produced orphan assistant calls