fix(session): remap compaction tail_start_id when forking (#24898)

Co-authored-by: spark4862 <spark4862@users.noreply.github.com>
Co-authored-by: Aiden Cline <aidenpcline@gmail.com>
This commit is contained in:
spark4862 2026-04-29 13:23:56 +08:00 committed by GitHub
parent 504ca3d3d8
commit d71b827d8c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 75 additions and 2 deletions

View file

@ -616,12 +616,16 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
})
for (const part of msg.parts) {
yield* updatePart({
const p: MessageV2.Part = {
...part,
id: PartID.ascending(),
messageID: cloned.id,
sessionID: session.id,
})
}
if (p.type === "compaction" && p.tail_start_id) {
p.tail_start_id = idMap.get(p.tail_start_id)
}
yield* updatePart(p)
}
}
return session

View file

@ -29,6 +29,9 @@ const svc = {
updatePart<T extends MessageV2.Part>(part: T) {
return run(SessionNs.Service.use((svc) => svc.updatePart(part)))
},
fork(input: { sessionID: SessionID; messageID?: MessageID }) {
return run(SessionNs.Service.use((svc) => svc.fork(input)))
},
}
async function fill(sessionID: SessionID, count: number, time = (i: number) => Date.now() + i) {
@ -837,6 +840,72 @@ describe("MessageV2.filterCompacted", () => {
})
})
test("fork remaps compaction tail_start_id for filterCompacted", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await svc.create({})
const u1 = await addUser(session.id, "first")
const a1 = await addAssistant(session.id, u1, { finish: "end_turn" })
await svc.updatePart({
id: PartID.ascending(),
sessionID: session.id,
messageID: a1,
type: "text",
text: "first reply",
})
const u2 = await addUser(session.id, "second")
const a2 = await addAssistant(session.id, u2, { finish: "end_turn" })
await svc.updatePart({
id: PartID.ascending(),
sessionID: session.id,
messageID: a2,
type: "text",
text: "second reply",
})
const c1 = await addUser(session.id)
await addCompactionPart(session.id, c1, u2)
const s1 = await addAssistant(session.id, c1, { summary: true, finish: "end_turn" })
await svc.updatePart({
id: PartID.ascending(),
sessionID: session.id,
messageID: s1,
type: "text",
text: "summary",
})
const u3 = await addUser(session.id, "third")
const a3 = await addAssistant(session.id, u3, { finish: "end_turn" })
await svc.updatePart({
id: PartID.ascending(),
sessionID: session.id,
messageID: a3,
type: "text",
text: "third reply",
})
const parentFiltered = MessageV2.filterCompacted(MessageV2.stream(session.id))
expect(parentFiltered.map((item) => item.info.id)).toEqual([u2, a2, c1, s1, u3, a3])
const forked = await svc.fork({ sessionID: session.id })
const childFiltered = MessageV2.filterCompacted(MessageV2.stream(forked.id))
expect(childFiltered).toHaveLength(parentFiltered.length)
const tailPart = childFiltered.flatMap((m) => m.parts).find((p) => p.type === "compaction")
expect(tailPart?.type).toBe("compaction")
if (!tailPart || tailPart.type !== "compaction") throw new Error("Expected forked compaction part")
expect(tailPart.tail_start_id).toBeDefined()
expect(childFiltered.some((m) => m.info.id === tailPart.tail_start_id)).toBe(true)
await svc.remove(forked.id)
await svc.remove(session.id)
},
})
})
test("retains an assistant tail when compaction starts inside a turn", async () => {
await Instance.provide({
directory: root,