feat(core): initial implementation of syncing (#17814)

This commit is contained in:
James Long 2026-03-25 10:47:40 -04:00 committed by GitHub
parent 0c0c6f3bdb
commit b0017bf1b9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
32 changed files with 4403 additions and 1760 deletions

View file

@ -60,6 +60,8 @@ function toolEvent(
const payload: EventMessagePartUpdated = {
type: "message.part.updated",
properties: {
sessionID: sessionId,
time: Date.now(),
part: {
id: `part_${opts.callID}`,
sessionID: sessionId,

View file

@ -74,11 +74,17 @@ delete process.env["SAMBANOVA_API_KEY"]
delete process.env["OPENCODE_SERVER_PASSWORD"]
delete process.env["OPENCODE_SERVER_USERNAME"]
// Use in-memory sqlite
process.env["OPENCODE_DB"] = ":memory:"
// Now safe to import from src/
const { Log } = await import("../src/util/log")
const { initProjectors } = await import("../src/server/projectors")
Log.init({
print: false,
dev: true,
level: "DEBUG",
})
initProjectors()

View file

@ -10,8 +10,8 @@ import { MessageID, PartID } from "../../src/session/schema"
const projectRoot = path.join(__dirname, "../..")
Log.init({ print: false })
describe("session.started event", () => {
test("should emit session.started event when session is created", async () => {
describe("session.created event", () => {
test("should emit session.created event when session is created", async () => {
await Instance.provide({
directory: projectRoot,
fn: async () => {
@ -41,14 +41,14 @@ describe("session.started event", () => {
})
})
test("session.started event should be emitted before session.updated", async () => {
test("session.created event should be emitted before session.updated", async () => {
await Instance.provide({
directory: projectRoot,
fn: async () => {
const events: string[] = []
const unsubStarted = Bus.subscribe(Session.Event.Created, () => {
events.push("started")
const unsubCreated = Bus.subscribe(Session.Event.Created, () => {
events.push("created")
})
const unsubUpdated = Bus.subscribe(Session.Event.Updated, () => {
@ -59,12 +59,12 @@ describe("session.started event", () => {
await new Promise((resolve) => setTimeout(resolve, 100))
unsubStarted()
unsubCreated()
unsubUpdated()
expect(events).toContain("started")
expect(events).toContain("created")
expect(events).toContain("updated")
expect(events.indexOf("started")).toBeLessThan(events.indexOf("updated"))
expect(events.indexOf("created")).toBeLessThan(events.indexOf("updated"))
await Session.remove(session.id)
},

View file

@ -6,14 +6,9 @@ import { Database } from "../../src/storage/db"
describe("Database.Path", () => {
test("returns database path for the current channel", () => {
const db = process.env["OPENCODE_DB"]
const expected = db
? path.isAbsolute(db)
? db
: path.join(Global.Path.data, db)
: ["latest", "beta"].includes(Installation.CHANNEL)
? path.join(Global.Path.data, "opencode.db")
: path.join(Global.Path.data, `opencode-${Installation.CHANNEL.replace(/[^a-zA-Z0-9._-]/g, "-")}.db`)
expect(Database.Path).toBe(expected)
const expected = ["latest", "beta"].includes(Installation.CHANNEL)
? path.join(Global.Path.data, "opencode.db")
: path.join(Global.Path.data, `opencode-${Installation.CHANNEL.replace(/[^a-zA-Z0-9._-]/g, "-")}.db`)
expect(Database.getChannelPath()).toBe(expected)
})
})

View file

@ -0,0 +1,187 @@
import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test"
import { tmpdir } from "../fixture/fixture"
import z from "zod"
import { Bus } from "../../src/bus"
import { Instance } from "../../src/project/instance"
import { SyncEvent } from "../../src/sync"
import { Database } from "../../src/storage/db"
import { EventTable } from "../../src/sync/event.sql"
import { Identifier } from "../../src/id/id"
import { Flag } from "../../src/flag/flag"
import { initProjectors } from "../../src/server/projectors"
const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
beforeEach(() => {
Database.close()
// @ts-expect-error don't do this normally, but it works
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
})
afterEach(() => {
// @ts-expect-error don't do this normally, but it works
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = original
})
function withInstance(fn: () => void | Promise<void>) {
return async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
await fn()
},
})
}
}
describe("SyncEvent", () => {
function setup() {
SyncEvent.reset()
const Created = SyncEvent.define({
type: "item.created",
version: 1,
aggregate: "id",
schema: z.object({ id: z.string(), name: z.string() }),
})
const Sent = SyncEvent.define({
type: "item.sent",
version: 1,
aggregate: "item_id",
schema: z.object({ item_id: z.string(), to: z.string() }),
})
SyncEvent.init({
projectors: [SyncEvent.project(Created, () => {}), SyncEvent.project(Sent, () => {})],
})
return { Created, Sent }
}
afterAll(() => {
SyncEvent.reset()
initProjectors()
})
describe("run", () => {
test(
"inserts event row",
withInstance(() => {
const { Created } = setup()
SyncEvent.run(Created, { id: "evt_1", name: "first" })
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].type).toBe("item.created.1")
expect(rows[0].aggregate_id).toBe("evt_1")
}),
)
test(
"increments seq per aggregate",
withInstance(() => {
const { Created } = setup()
SyncEvent.run(Created, { id: "evt_1", name: "first" })
SyncEvent.run(Created, { id: "evt_1", name: "second" })
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(2)
expect(rows[1].seq).toBe(rows[0].seq + 1)
}),
)
test(
"uses custom aggregate field from agg()",
withInstance(() => {
const { Sent } = setup()
SyncEvent.run(Sent, { item_id: "evt_1", to: "james" })
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].aggregate_id).toBe("evt_1")
}),
)
test(
"emits events",
withInstance(async () => {
const { Created } = setup()
const events: Array<{
type: string
properties: { id: string; name: string }
}> = []
const unsub = Bus.subscribeAll((event) => events.push(event))
SyncEvent.run(Created, { id: "evt_1", name: "test" })
expect(events).toHaveLength(1)
expect(events[0]).toEqual({
type: "item.created",
properties: {
id: "evt_1",
name: "test",
},
})
unsub()
}),
)
})
describe("replay", () => {
test(
"inserts event from external payload",
withInstance(() => {
const id = Identifier.descending("message")
SyncEvent.replay({
id: "evt_1",
type: "item.created.1",
seq: 0,
aggregateID: id,
data: { id, name: "replayed" },
})
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].aggregate_id).toBe(id)
}),
)
test(
"throws on sequence mismatch",
withInstance(() => {
const id = Identifier.descending("message")
SyncEvent.replay({
id: "evt_1",
type: "item.created.1",
seq: 0,
aggregateID: id,
data: { id, name: "first" },
})
expect(() =>
SyncEvent.replay({
id: "evt_1",
type: "item.created.1",
seq: 5,
aggregateID: id,
data: { id, name: "bad" },
}),
).toThrow(/Sequence mismatch/)
}),
)
test(
"throws on unknown event type",
withInstance(() => {
expect(() =>
SyncEvent.replay({
id: "evt_1",
type: "unknown.event.1",
seq: 0,
aggregateID: "x",
data: {},
}),
).toThrow(/Unknown event type/)
}),
)
})
})