mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-20 01:12:15 +00:00
fix(bus): acquire PubSub subscription eagerly to close /event race (#27959)
This commit is contained in:
parent
5bfd7fd16c
commit
cb35493242
9 changed files with 659 additions and 43 deletions
|
|
@ -37,8 +37,16 @@ export interface Interface {
|
|||
properties: BusProperties<D>,
|
||||
options?: { id?: string },
|
||||
) => Effect.Effect<void>
|
||||
readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
|
||||
readonly subscribeAll: () => Stream.Stream<Payload>
|
||||
// subscribe / subscribeAll are eager: the underlying PubSub subscription is
|
||||
// acquired in the caller's Scope at `yield*` time. Any publish after the
|
||||
// yield is delivered, even if stream consumption starts later. The previous
|
||||
// Stream-returning shape acquired the subscription lazily on first pull,
|
||||
// opening a race window during which publishes were lost — see
|
||||
// test/bus/bus-effect.test.ts RACE tests.
|
||||
readonly subscribe: <D extends BusEvent.Definition>(
|
||||
def: D,
|
||||
) => Effect.Effect<Stream.Stream<Payload<D>>, never, Scope.Scope>
|
||||
readonly subscribeAll: () => Effect.Effect<Stream.Stream<Payload>, never, Scope.Scope>
|
||||
readonly subscribeCallback: <D extends BusEvent.Definition>(
|
||||
def: D,
|
||||
callback: (event: Payload<D>) => unknown,
|
||||
|
|
@ -109,26 +117,26 @@ export const layer = Layer.effect(
|
|||
})
|
||||
}
|
||||
|
||||
function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
|
||||
log.info("subscribing", { type: def.type })
|
||||
return Stream.unwrap(
|
||||
Effect.gen(function* () {
|
||||
const s = yield* InstanceState.get(state)
|
||||
const ps = yield* getOrCreate(s, def)
|
||||
return Stream.fromPubSub(ps)
|
||||
}),
|
||||
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
|
||||
}
|
||||
const subscribe = <D extends BusEvent.Definition>(
|
||||
def: D,
|
||||
): Effect.Effect<Stream.Stream<Payload<D>>, never, Scope.Scope> =>
|
||||
Effect.gen(function* () {
|
||||
log.info("subscribing", { type: def.type })
|
||||
const s = yield* InstanceState.get(state)
|
||||
const ps = yield* getOrCreate(s, def)
|
||||
const subscription = yield* PubSub.subscribe(ps)
|
||||
yield* Effect.addFinalizer(() => Effect.sync(() => log.info("unsubscribing", { type: def.type })))
|
||||
return Stream.fromSubscription(subscription)
|
||||
})
|
||||
|
||||
function subscribeAll(): Stream.Stream<Payload> {
|
||||
log.info("subscribing", { type: "*" })
|
||||
return Stream.unwrap(
|
||||
Effect.gen(function* () {
|
||||
const s = yield* InstanceState.get(state)
|
||||
return Stream.fromPubSub(s.wildcard)
|
||||
}),
|
||||
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))))
|
||||
}
|
||||
const subscribeAll = (): Effect.Effect<Stream.Stream<Payload>, never, Scope.Scope> =>
|
||||
Effect.gen(function* () {
|
||||
log.info("subscribing", { type: "*" })
|
||||
const s = yield* InstanceState.get(state)
|
||||
const subscription = yield* PubSub.subscribe(s.wildcard)
|
||||
yield* Effect.addFinalizer(() => Effect.sync(() => log.info("unsubscribing", { type: "*" })))
|
||||
return Stream.fromSubscription(subscription)
|
||||
})
|
||||
|
||||
function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
|
||||
return Effect.gen(function* () {
|
||||
|
|
|
|||
|
|
@ -243,7 +243,7 @@ export const layer = Layer.effect(
|
|||
}
|
||||
|
||||
// Subscribe to bus events, fiber interrupted when scope closes
|
||||
yield* bus.subscribeAll().pipe(
|
||||
yield* (yield* bus.subscribeAll()).pipe(
|
||||
Stream.runForEach((input) =>
|
||||
Effect.sync(() => {
|
||||
for (const hook of hooks) {
|
||||
|
|
|
|||
|
|
@ -425,7 +425,7 @@ export const layer: Layer.Layer<
|
|||
|
||||
const initState = yield* InstanceState.make(
|
||||
Effect.fn("Project.initState")(function* (ctx) {
|
||||
yield* bus.subscribe(Command.Event.Executed).pipe(
|
||||
yield* (yield* bus.subscribe(Command.Event.Executed)).pipe(
|
||||
Stream.runForEach((payload) =>
|
||||
payload.properties.name === Command.Default.INIT ? setInitialized(ctx.project.id) : Effect.void,
|
||||
),
|
||||
|
|
|
|||
|
|
@ -298,7 +298,7 @@ export const layer: Layer.Layer<Service, never, Git.Service | Bus.Service> = Lay
|
|||
const value = { current, root }
|
||||
log.info("initialized", { branch: value.current, default_branch: value.root?.name })
|
||||
|
||||
yield* bus.subscribe(FileWatcher.Event.Updated).pipe(
|
||||
yield* (yield* bus.subscribe(FileWatcher.Event.Updated)).pipe(
|
||||
Stream.filter((evt) => evt.properties.file.endsWith("HEAD")),
|
||||
Stream.runForEach((_evt) =>
|
||||
Effect.gen(function* () {
|
||||
|
|
|
|||
|
|
@ -20,10 +20,11 @@ function eventData(data: unknown): Sse.Event {
|
|||
|
||||
function eventResponse(bus: Bus.Interface) {
|
||||
return Effect.gen(function* () {
|
||||
const context = yield* Effect.context()
|
||||
|
||||
const events = bus.subscribeAll().pipe(
|
||||
Stream.provideContext(context),
|
||||
// Subscribe eagerly: the bus subscription is acquired in the request scope
|
||||
// at this yield, so any publish from now on is queued for the body-pump
|
||||
// fiber to drain — closing the race where Stream.concat(server.connected,
|
||||
// lazy-subscribe) used to drop publishes in the prefix-consume window.
|
||||
const events = (yield* bus.subscribeAll()).pipe(
|
||||
Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type),
|
||||
)
|
||||
const heartbeat = Stream.tick("10 seconds").pipe(
|
||||
|
|
|
|||
|
|
@ -168,16 +168,20 @@ export const layer = Layer.effect(
|
|||
fn: (evt: { properties: any }) => Effect.Effect<void, unknown>,
|
||||
) =>
|
||||
bus.subscribe(def as never).pipe(
|
||||
Stream.runForEach((evt) =>
|
||||
fn(evt).pipe(
|
||||
Effect.catchCause((cause) =>
|
||||
Effect.sync(() => {
|
||||
log.error("share subscriber failed", { type: def.type, cause })
|
||||
}),
|
||||
Effect.flatMap((stream) =>
|
||||
stream.pipe(
|
||||
Stream.runForEach((evt) =>
|
||||
fn(evt).pipe(
|
||||
Effect.catchCause((cause) =>
|
||||
Effect.sync(() => {
|
||||
log.error("share subscriber failed", { type: def.type, cause })
|
||||
}),
|
||||
),
|
||||
),
|
||||
),
|
||||
Effect.forkScoped,
|
||||
),
|
||||
),
|
||||
Effect.forkScoped,
|
||||
)
|
||||
|
||||
yield* watch(Session.Event.Updated, (evt) =>
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ describe("Bus (Effect-native)", () => {
|
|||
const done = yield* Deferred.make<void>()
|
||||
const ready = yield* Latch.make()
|
||||
|
||||
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
|
||||
yield* Stream.runForEach(yield* bus.subscribe(TestEvent.Ping), (evt) =>
|
||||
Effect.gen(function* () {
|
||||
if (evt.properties.value < 0) {
|
||||
yield* ready.open
|
||||
|
|
@ -71,7 +71,7 @@ describe("Bus (Effect-native)", () => {
|
|||
const done = yield* Deferred.make<void>()
|
||||
const ready = yield* Latch.make()
|
||||
|
||||
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
|
||||
yield* Stream.runForEach(yield* bus.subscribe(TestEvent.Ping), (evt) =>
|
||||
Effect.gen(function* () {
|
||||
if (evt.properties.value < 0) {
|
||||
yield* ready.open
|
||||
|
|
@ -98,7 +98,7 @@ describe("Bus (Effect-native)", () => {
|
|||
const done = yield* Deferred.make<void>()
|
||||
const ready = yield* Latch.make()
|
||||
|
||||
yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
|
||||
yield* Stream.runForEach(yield* bus.subscribeAll(), (evt) =>
|
||||
Effect.gen(function* () {
|
||||
if (evt.type === TestEvent.Warmup.type) {
|
||||
yield* ready.open
|
||||
|
|
@ -129,7 +129,7 @@ describe("Bus (Effect-native)", () => {
|
|||
const readyA = yield* Latch.make()
|
||||
const readyB = yield* Latch.make()
|
||||
|
||||
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
|
||||
yield* Stream.runForEach(yield* bus.subscribe(TestEvent.Ping), (evt) =>
|
||||
Effect.gen(function* () {
|
||||
if (evt.properties.value < 0) {
|
||||
yield* readyA.open
|
||||
|
|
@ -140,7 +140,7 @@ describe("Bus (Effect-native)", () => {
|
|||
}),
|
||||
).pipe(Effect.forkScoped)
|
||||
|
||||
yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
|
||||
yield* Stream.runForEach(yield* bus.subscribe(TestEvent.Ping), (evt) =>
|
||||
Effect.gen(function* () {
|
||||
if (evt.properties.value < 0) {
|
||||
yield* readyB.open
|
||||
|
|
@ -162,6 +162,92 @@ describe("Bus (Effect-native)", () => {
|
|||
}),
|
||||
)
|
||||
|
||||
// RACE 1: eager subscription means publishing immediately after yield*
|
||||
// bus.subscribe is delivered. Regression for the old lazy `Stream.unwrap`
|
||||
// shape where PubSub.subscribe ran on first pull and missed any publish
|
||||
// in the hand-off window.
|
||||
it.instance("eager subscribe: publish after yield* is delivered without consumer-activation race", () =>
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
const stream = yield* bus.subscribe(TestEvent.Ping)
|
||||
|
||||
// Hand-off window: subscription is alive (we yielded). Publish goes
|
||||
// straight into the subscription queue, even with no consumer running.
|
||||
yield* bus.publish(TestEvent.Ping, { value: 99 })
|
||||
|
||||
const collected = yield* stream.pipe(
|
||||
Stream.take(1),
|
||||
Stream.runCollect,
|
||||
Effect.timeout("400 millis"),
|
||||
Effect.option,
|
||||
)
|
||||
|
||||
expect(collected._tag).toBe("Some")
|
||||
if (collected._tag === "Some") {
|
||||
const arr = Array.from(collected.value)
|
||||
expect(arr[0].properties.value).toBe(99)
|
||||
}
|
||||
}),
|
||||
)
|
||||
|
||||
// RACE 2: same property for subscribeAll.
|
||||
it.instance("eager subscribeAll: publish after yield* is delivered", () =>
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
const stream = yield* bus.subscribeAll()
|
||||
|
||||
yield* bus.publish(TestEvent.Ping, { value: 42 })
|
||||
|
||||
const collected = yield* stream.pipe(
|
||||
Stream.take(1),
|
||||
Stream.runCollect,
|
||||
Effect.timeout("400 millis"),
|
||||
Effect.option,
|
||||
)
|
||||
|
||||
expect(collected._tag).toBe("Some")
|
||||
if (collected._tag === "Some") {
|
||||
const arr = Array.from(collected.value)
|
||||
expect(arr[0].type).toBe(TestEvent.Ping.type)
|
||||
}
|
||||
}),
|
||||
)
|
||||
|
||||
// RACE 3: the /event-handler shape exactly. With eager subscription, the
|
||||
// bus subscription is alive before Stream.concat ever starts. Publishes
|
||||
// during the prefix consumption window are queued and delivered.
|
||||
it.instance("eager subscribe: Stream.concat(initial, subscribe) delivers publish during prefix", () =>
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
const sawInitial = yield* Deferred.make<void>()
|
||||
const sawPublish = yield* Deferred.make<number>()
|
||||
|
||||
type Frame = { marker?: "initial"; value?: number }
|
||||
const subscriptionStream = yield* bus.subscribe(TestEvent.Ping)
|
||||
const handlerStream: Stream.Stream<Frame> = Stream.make({ marker: "initial" } as Frame).pipe(
|
||||
Stream.concat(subscriptionStream.pipe(Stream.map((evt): Frame => ({ value: evt.properties.value })))),
|
||||
)
|
||||
|
||||
yield* Stream.runForEach(handlerStream, (frame) =>
|
||||
Effect.gen(function* () {
|
||||
if (frame.marker === "initial") {
|
||||
Deferred.doneUnsafe(sawInitial, Effect.void)
|
||||
return
|
||||
}
|
||||
if (frame.value !== undefined) Deferred.doneUnsafe(sawPublish, Effect.succeed(frame.value))
|
||||
}),
|
||||
).pipe(Effect.forkScoped)
|
||||
|
||||
yield* Deferred.await(sawInitial).pipe(Effect.timeout("1 second"))
|
||||
|
||||
yield* bus.publish(TestEvent.Ping, { value: 7 })
|
||||
|
||||
const got = yield* Deferred.await(sawPublish).pipe(Effect.timeout("1 second"), Effect.option)
|
||||
expect(got._tag).toBe("Some")
|
||||
if (got._tag === "Some") expect(got.value).toBe(7)
|
||||
}),
|
||||
)
|
||||
|
||||
it.live("subscribeAll stream sees InstanceDisposed on disposal", () =>
|
||||
Effect.gen(function* () {
|
||||
const dir = yield* tmpdirScoped()
|
||||
|
|
@ -174,7 +260,7 @@ describe("Bus (Effect-native)", () => {
|
|||
yield* Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
|
||||
yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
|
||||
yield* Stream.runForEach(yield* bus.subscribeAll(), (evt) =>
|
||||
Effect.gen(function* () {
|
||||
if (evt.type === TestEvent.Warmup.type) {
|
||||
yield* ready.open
|
||||
|
|
|
|||
453
packages/opencode/test/server/httpapi-event-diagnostics.test.ts
Normal file
453
packages/opencode/test/server/httpapi-event-diagnostics.test.ts
Normal file
|
|
@ -0,0 +1,453 @@
|
|||
// Diagnostic suite for /event SSE delivery.
|
||||
//
|
||||
// Each test isolates ONE variable in the publisher chain while keeping the
|
||||
// subscriber path constant (raw `app().request` reading the SSE body — no SDK
|
||||
// consumer involvement). The pass/fail pattern across tests tells us where the
|
||||
// bug lives:
|
||||
//
|
||||
// D1 (baseline): publish via Bus.Service.use via AppRuntime — mirror of the
|
||||
// existing httpapi-event.test.ts test 3. Confirms /event SSE delivery
|
||||
// works for a SOME publish path.
|
||||
//
|
||||
// D2: publish N times in quick succession via Bus.Service.use. If the bus
|
||||
// subscription is acquired correctly there should be no message loss.
|
||||
//
|
||||
// D3: publish via SyncEvent.use.run via AppRuntime — exercises the same path
|
||||
// the HTTP handlers use (Session.updatePart → sync.run → bus.publish)
|
||||
// without the HTTP roundtrip. Tells us whether the sync path itself can
|
||||
// deliver in-process.
|
||||
//
|
||||
// D4: publish via SyncEvent.use.run from a fresh `Effect.provide` scope
|
||||
// (mimicking what happens if a handler's layer was scoped per-request).
|
||||
//
|
||||
// D5: in-process Bus.Service callback subscriber AND raw /event SSE subscriber
|
||||
// receive the same publish. If both receive: no bug. If only the
|
||||
// callback receives: the /event handler has an acquisition race.
|
||||
import { afterEach, describe, expect, test } from "bun:test"
|
||||
import { Bus } from "../../src/bus"
|
||||
import { AppRuntime } from "../../src/effect/app-runtime"
|
||||
import { InstanceRef } from "../../src/effect/instance-ref"
|
||||
import { Server } from "../../src/server/server"
|
||||
import { EventPaths } from "../../src/server/routes/instance/httpapi/groups/event"
|
||||
import { Event as ServerEvent } from "../../src/server/event"
|
||||
import { SyncEvent } from "../../src/sync"
|
||||
import { MessageV2 } from "../../src/session/message-v2"
|
||||
import { MessageID, PartID, SessionID } from "../../src/session/schema"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { Effect, Schema } from "effect"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
import { disposeAllInstances, reloadTestInstance, tmpdir } from "../fixture/fixture"
|
||||
|
||||
void Log.init({ print: false })
|
||||
|
||||
function app() {
|
||||
return Server.Default().app
|
||||
}
|
||||
|
||||
const EventData = Schema.Struct({
|
||||
id: Schema.optional(Schema.String),
|
||||
type: Schema.String,
|
||||
properties: Schema.Record(Schema.String, Schema.Any),
|
||||
})
|
||||
|
||||
type SseEvent = Schema.Schema.Type<typeof EventData>
|
||||
|
||||
async function readChunk(reader: ReadableStreamDefaultReader<Uint8Array>, timeoutMs = 3_000) {
|
||||
let timeout: ReturnType<typeof setTimeout> | undefined
|
||||
try {
|
||||
return await Promise.race([
|
||||
reader.read(),
|
||||
new Promise<never>((_, reject) => {
|
||||
timeout = setTimeout(() => reject(new Error(`timed out after ${timeoutMs}ms`)), timeoutMs)
|
||||
}),
|
||||
])
|
||||
} finally {
|
||||
if (timeout) clearTimeout(timeout)
|
||||
}
|
||||
}
|
||||
|
||||
const textDecoder = new TextDecoder()
|
||||
|
||||
function decodeFrame(value: Uint8Array): SseEvent[] {
|
||||
// SSE frames are separated by blank lines and each starts with "data: ".
|
||||
// For our happy-path tests one chunk == one frame, but be defensive.
|
||||
const text = textDecoder.decode(value)
|
||||
return text
|
||||
.split(/\n\n+/)
|
||||
.map((part) => part.trim())
|
||||
.filter((part) => part.length > 0)
|
||||
.map((part) => {
|
||||
const payload = part.replace(/^data: /, "")
|
||||
return Schema.decodeUnknownSync(EventData)(JSON.parse(payload))
|
||||
})
|
||||
}
|
||||
|
||||
async function readNextEvent(reader: ReadableStreamDefaultReader<Uint8Array>, timeoutMs = 3_000): Promise<SseEvent> {
|
||||
const result = await readChunk(reader, timeoutMs)
|
||||
if (result.done || !result.value) throw new Error("event stream closed")
|
||||
const frames = decodeFrame(result.value)
|
||||
if (frames.length === 0) throw new Error("empty SSE frame")
|
||||
return frames[0]
|
||||
}
|
||||
|
||||
async function collectUntil(
|
||||
reader: ReadableStreamDefaultReader<Uint8Array>,
|
||||
predicate: (event: SseEvent) => boolean,
|
||||
timeoutMs = 3_000,
|
||||
): Promise<SseEvent[]> {
|
||||
const events: SseEvent[] = []
|
||||
const deadline = Date.now() + timeoutMs
|
||||
while (Date.now() < deadline) {
|
||||
const remaining = deadline - Date.now()
|
||||
const result = await readChunk(reader, remaining).catch((cause) => {
|
||||
throw new Error(`collectUntil timed out after ${events.length} events: ${cause}`)
|
||||
})
|
||||
if (result.done || !result.value) throw new Error("event stream closed mid-collect")
|
||||
for (const event of decodeFrame(result.value)) {
|
||||
events.push(event)
|
||||
if (predicate(event)) return events
|
||||
}
|
||||
}
|
||||
throw new Error(`collectUntil deadline exceeded; collected ${events.length}: ${JSON.stringify(events)}`)
|
||||
}
|
||||
|
||||
afterEach(async () => {
|
||||
await disposeAllInstances()
|
||||
await resetDatabase()
|
||||
})
|
||||
|
||||
describe("/event SSE delivery diagnostics", () => {
|
||||
// Sanity: baseline same as httpapi-event.test.ts test 3 (already known to pass)
|
||||
// but explicit about timing — publish happens with NO wait after reading
|
||||
// server.connected. If this fails we have a deeper problem than just sync.
|
||||
test("D1: delivers a single bus event published right after server.connected", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
const reader = response.body.getReader()
|
||||
try {
|
||||
const first = await readNextEvent(reader)
|
||||
expect(first.type).toBe("server.connected")
|
||||
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
// NO wait — publish immediately
|
||||
await AppRuntime.runPromise(
|
||||
Bus.Service.use((svc) => svc.publish(ServerEvent.Connected, {})).pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
const next = await readNextEvent(reader)
|
||||
expect(next.type).toBe("server.connected") // ServerEvent.Connected.type === "server.connected"
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
}
|
||||
})
|
||||
|
||||
// If D1 passes but D2 fails, we have a queue-drain or partial-loss issue.
|
||||
test("D2: delivers all N bus events published in rapid succession", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
const reader = response.body.getReader()
|
||||
try {
|
||||
const first = await readNextEvent(reader)
|
||||
expect(first.type).toBe("server.connected")
|
||||
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
const N = 5
|
||||
for (let i = 0; i < N; i++) {
|
||||
await AppRuntime.runPromise(
|
||||
Bus.Service.use((svc) => svc.publish(ServerEvent.Connected, {})).pipe(
|
||||
Effect.provideService(InstanceRef, ctx),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
const received: SseEvent[] = []
|
||||
for (let i = 0; i < N; i++) {
|
||||
received.push(await readNextEvent(reader))
|
||||
}
|
||||
expect(received).toHaveLength(N)
|
||||
for (const event of received) expect(event.type).toBe("server.connected")
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
}
|
||||
})
|
||||
|
||||
// The critical test. If D1 passes but this fails, the bus-identity fix is
|
||||
// incomplete OR the sync.run publish path doesn't reach the same bus
|
||||
// /event subscribes to, even within the same AppRuntime.
|
||||
test("D3: delivers a SyncEvent published via SyncEvent.use.run after server.connected", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
const reader = response.body.getReader()
|
||||
try {
|
||||
const first = await readNextEvent(reader)
|
||||
expect(first.type).toBe("server.connected")
|
||||
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`)
|
||||
const messageID = MessageID.ascending()
|
||||
const partID = PartID.ascending()
|
||||
const part: MessageV2.Part = {
|
||||
id: partID,
|
||||
sessionID,
|
||||
messageID,
|
||||
type: "text",
|
||||
text: "diag",
|
||||
}
|
||||
|
||||
await AppRuntime.runPromise(
|
||||
SyncEvent.use
|
||||
.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID,
|
||||
part: structuredClone(part) as MessageV2.Part,
|
||||
time: Date.now(),
|
||||
})
|
||||
.pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
const collected = await collectUntil(
|
||||
reader,
|
||||
(event) => event.type === MessageV2.Event.PartUpdated.type,
|
||||
4_000,
|
||||
)
|
||||
const updated = collected.find((event) => event.type === MessageV2.Event.PartUpdated.type)
|
||||
expect(updated).toBeDefined()
|
||||
expect((updated as any).properties.part.id).toBe(partID)
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
}
|
||||
})
|
||||
|
||||
// If D3 passes but D5 (the SDK E2E in httpapi-sdk.test.ts) fails, then the
|
||||
// bug is specifically in the cross-request / cross-fiber HTTP path, not in
|
||||
// the publish itself. If D3 also fails, the publish chain is broken.
|
||||
//
|
||||
// D4: ensure the publish reaches an in-process Bus subscriber too. Confirms
|
||||
// pub/sub identity end-to-end without involving /event SSE.
|
||||
test("D4: SyncEvent.use.run publish reaches an in-process Bus.Service.use callback", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
|
||||
let resolveReceived: (event: { id: string; type: string; properties: unknown }) => void
|
||||
const received = new Promise<{ id: string; type: string; properties: unknown }>(
|
||||
(resolve) => (resolveReceived = resolve as typeof resolveReceived),
|
||||
)
|
||||
|
||||
const dispose = await AppRuntime.runPromise(
|
||||
Bus.Service.use((svc) =>
|
||||
svc.subscribeAllCallback((event) => {
|
||||
if (event.type === MessageV2.Event.PartUpdated.type) resolveReceived(event)
|
||||
}),
|
||||
).pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
try {
|
||||
const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`)
|
||||
const messageID = MessageID.ascending()
|
||||
const partID = PartID.ascending()
|
||||
const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d4" }
|
||||
|
||||
await AppRuntime.runPromise(
|
||||
SyncEvent.use
|
||||
.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID,
|
||||
part: structuredClone(part) as MessageV2.Part,
|
||||
time: Date.now(),
|
||||
})
|
||||
.pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
const event = await Promise.race([
|
||||
received,
|
||||
new Promise<never>((_, reject) => setTimeout(() => reject(new Error("D4 timed out")), 3_000)),
|
||||
])
|
||||
expect(event.type).toBe(MessageV2.Event.PartUpdated.type)
|
||||
expect((event.properties as any).part.id).toBe(partID)
|
||||
} finally {
|
||||
dispose()
|
||||
}
|
||||
})
|
||||
|
||||
// D5: BOTH subscribers attached simultaneously. Trigger ONE publish via
|
||||
// SyncEvent.use.run. Both subscribers should receive it. If only one does
|
||||
// we know exactly which side of the chain is failing.
|
||||
test("D5: same SyncEvent.use.run publish reaches BOTH /event SSE and in-process callback", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
|
||||
// In-process callback subscriber
|
||||
let resolveCallback: (event: { type: string; properties: unknown }) => void
|
||||
const callbackReceived = new Promise<{ type: string; properties: unknown }>(
|
||||
(resolve) => (resolveCallback = resolve as typeof resolveCallback),
|
||||
)
|
||||
const dispose = await AppRuntime.runPromise(
|
||||
Bus.Service.use((svc) =>
|
||||
svc.subscribeAllCallback((event) => {
|
||||
if (event.type === MessageV2.Event.PartUpdated.type) resolveCallback(event)
|
||||
}),
|
||||
).pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
// SSE subscriber via raw HTTP
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
const reader = response.body.getReader()
|
||||
|
||||
try {
|
||||
const first = await readNextEvent(reader)
|
||||
expect(first.type).toBe("server.connected")
|
||||
|
||||
const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`)
|
||||
const messageID = MessageID.ascending()
|
||||
const partID = PartID.ascending()
|
||||
const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d5" }
|
||||
|
||||
await AppRuntime.runPromise(
|
||||
SyncEvent.use
|
||||
.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID,
|
||||
part: structuredClone(part) as MessageV2.Part,
|
||||
time: Date.now(),
|
||||
})
|
||||
.pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
const sseCollected = await collectUntil(
|
||||
reader,
|
||||
(event) => event.type === MessageV2.Event.PartUpdated.type,
|
||||
4_000,
|
||||
).catch((err) => err as Error)
|
||||
const callbackResult = await Promise.race([
|
||||
callbackReceived,
|
||||
new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 1_000)),
|
||||
])
|
||||
|
||||
const sseSaw =
|
||||
Array.isArray(sseCollected) &&
|
||||
sseCollected.some((event) => event.type === MessageV2.Event.PartUpdated.type)
|
||||
const callbackSaw = callbackResult !== "timeout"
|
||||
|
||||
// Both should see it. The reason we use a single assert with the boolean
|
||||
// pair is so the test failure message tells us exactly which side broke.
|
||||
expect({ sseSaw, callbackSaw }).toEqual({ sseSaw: true, callbackSaw: true })
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
dispose()
|
||||
}
|
||||
})
|
||||
|
||||
// D7: like D5 but the "second subscriber" is a NO-OP AppRuntime.runPromise
|
||||
// call (no PubSub.subscribe). If D7 passes, the specific subscribeAllCallback
|
||||
// is what breaks SSE — not arbitrary AppRuntime usage. If D7 fails, anything
|
||||
// running through AppRuntime concurrently with /event SSE breaks delivery.
|
||||
test("D7: SSE receives sync.run publish even with concurrent no-op AppRuntime activity", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
|
||||
// No-op: just touches the runtime, no bus interaction
|
||||
await AppRuntime.runPromise(Effect.void)
|
||||
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
const reader = response.body.getReader()
|
||||
try {
|
||||
const first = await readNextEvent(reader)
|
||||
expect(first.type).toBe("server.connected")
|
||||
|
||||
const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`)
|
||||
const messageID = MessageID.ascending()
|
||||
const partID = PartID.ascending()
|
||||
const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d7" }
|
||||
|
||||
await AppRuntime.runPromise(
|
||||
SyncEvent.use
|
||||
.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID,
|
||||
part: structuredClone(part) as MessageV2.Part,
|
||||
time: Date.now(),
|
||||
})
|
||||
.pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
const collected = await collectUntil(
|
||||
reader,
|
||||
(event) => event.type === MessageV2.Event.PartUpdated.type,
|
||||
4_000,
|
||||
)
|
||||
const updated = collected.find((event) => event.type === MessageV2.Event.PartUpdated.type)
|
||||
expect(updated).toBeDefined()
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
}
|
||||
})
|
||||
|
||||
// D6: same as D5 but the callback subscriber is attached AFTER /event SSE
|
||||
// subscription is established. If D5 fails and D6 passes, the order of
|
||||
// subscriber setup is the determining factor.
|
||||
test("D6: /event SSE receives sync.run publish when callback is attached AFTER /event opens", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
|
||||
// Open SSE FIRST
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
const reader = response.body.getReader()
|
||||
|
||||
try {
|
||||
const first = await readNextEvent(reader)
|
||||
expect(first.type).toBe("server.connected")
|
||||
|
||||
// THEN attach callback subscriber
|
||||
let resolveCallback: (event: { type: string; properties: unknown }) => void
|
||||
const callbackReceived = new Promise<{ type: string; properties: unknown }>(
|
||||
(resolve) => (resolveCallback = resolve as typeof resolveCallback),
|
||||
)
|
||||
const dispose = await AppRuntime.runPromise(
|
||||
Bus.Service.use((svc) =>
|
||||
svc.subscribeAllCallback((event) => {
|
||||
if (event.type === MessageV2.Event.PartUpdated.type) resolveCallback(event)
|
||||
}),
|
||||
).pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
try {
|
||||
const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`)
|
||||
const messageID = MessageID.ascending()
|
||||
const partID = PartID.ascending()
|
||||
const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d6" }
|
||||
|
||||
await AppRuntime.runPromise(
|
||||
SyncEvent.use
|
||||
.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID,
|
||||
part: structuredClone(part) as MessageV2.Part,
|
||||
time: Date.now(),
|
||||
})
|
||||
.pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
const sseCollected = await collectUntil(
|
||||
reader,
|
||||
(event) => event.type === MessageV2.Event.PartUpdated.type,
|
||||
4_000,
|
||||
).catch((err) => err as Error)
|
||||
const callbackResult = await Promise.race([
|
||||
callbackReceived,
|
||||
new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 1_000)),
|
||||
])
|
||||
|
||||
const sseSaw =
|
||||
Array.isArray(sseCollected) &&
|
||||
sseCollected.some((event) => event.type === MessageV2.Event.PartUpdated.type)
|
||||
const callbackSaw = callbackResult !== "timeout"
|
||||
expect({ sseSaw, callbackSaw }).toEqual({ sseSaw: true, callbackSaw: true })
|
||||
} finally {
|
||||
dispose()
|
||||
}
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
}
|
||||
})
|
||||
})
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
import { afterEach, describe, expect } from "bun:test"
|
||||
import { ConfigProvider, Effect, Layer } from "effect"
|
||||
import { ConfigProvider, Deferred, Effect, Layer } from "effect"
|
||||
import type * as Scope from "effect/Scope"
|
||||
import { HttpRouter } from "effect/unstable/http"
|
||||
import { ChildProcessSpawner } from "effect/unstable/process"
|
||||
|
|
@ -22,7 +22,7 @@ import { TestLLMServer } from "../lib/llm-server"
|
|||
import path from "path"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
import { disposeAllInstances, TestInstance, tmpdirScoped } from "../fixture/fixture"
|
||||
import { testEffect } from "../lib/effect"
|
||||
import { awaitWithTimeout, testEffect } from "../lib/effect"
|
||||
|
||||
const noopBootstrap = Layer.succeed(InstanceBootstrap.Service, InstanceBootstrap.Service.of({ run: Effect.void }))
|
||||
const it = testEffect(
|
||||
|
|
@ -671,6 +671,70 @@ describe("HttpApi SDK", () => {
|
|||
),
|
||||
)
|
||||
|
||||
// Regression: SyncEvent must publish on the same ProjectBus the /event handler
|
||||
// subscribes to, AND the /event stream must forward handler ALS/context into the
|
||||
// body-pump fiber. Drives the full SDK → /event → Session.updatePart → sync.run →
|
||||
// bus.publish → SDK subscriber path. Goes red if either the publisher uses a
|
||||
// different bus instance (Bug 2 / pre-#27825) or the stream loses context (Bug 1 /
|
||||
// pre-#27425).
|
||||
serverPathParity("streams sync-backed part updates to /event subscribers", (serverPath) =>
|
||||
withStandardProject(serverPath, ({ sdk, directory }) =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* capture(() => sdk.session.create({ title: "sync-backed part event" }))
|
||||
const sessionID = String(record(session.data).id)
|
||||
const seeded = yield* seedMessage(directory, sessionID)
|
||||
|
||||
const controller = new AbortController()
|
||||
yield* Effect.addFinalizer(() => Effect.sync(() => controller.abort()))
|
||||
const events = yield* call(() => sdk.event.subscribe(undefined, { signal: controller.signal }))
|
||||
yield* Effect.addFinalizer(() =>
|
||||
call(async () => void (await events.stream.return?.(undefined))).pipe(Effect.ignore),
|
||||
)
|
||||
|
||||
const ready = yield* Deferred.make<void>()
|
||||
const received = yield* Deferred.make<unknown>()
|
||||
|
||||
yield* call(async () => {
|
||||
for await (const event of events.stream) {
|
||||
const payload = record(event).payload ?? event
|
||||
const type = record(payload).type
|
||||
if (type === "server.connected") {
|
||||
Deferred.doneUnsafe(ready, Effect.void)
|
||||
continue
|
||||
}
|
||||
if (type === MessageV2.Event.PartUpdated.type) {
|
||||
Deferred.doneUnsafe(received, Effect.succeed(payload))
|
||||
return
|
||||
}
|
||||
}
|
||||
}).pipe(Effect.forkScoped)
|
||||
|
||||
yield* awaitWithTimeout(Deferred.await(ready), "timed out waiting for /event server.connected", "2 seconds")
|
||||
|
||||
const updated = yield* capture(() =>
|
||||
sdk.part.update({
|
||||
sessionID,
|
||||
messageID: seeded.message.id,
|
||||
partID: seeded.part.id,
|
||||
part: { ...seeded.part, text: "updated via sync" } as NonNullable<
|
||||
Parameters<Sdk["part"]["update"]>[0]["part"]
|
||||
>,
|
||||
}),
|
||||
)
|
||||
expect(updated.status).toBe(200)
|
||||
|
||||
const event = yield* awaitWithTimeout(
|
||||
Deferred.await(received),
|
||||
"timed out waiting for message.part.updated bus payload over /event",
|
||||
"5 seconds",
|
||||
)
|
||||
const properties = record(record(event).properties)
|
||||
expect(record(properties.part)).toMatchObject({ id: seeded.part.id, type: "text" })
|
||||
return { type: record(event).type, partType: record(properties.part).type }
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
serverPathParity("matches generated SDK prompt no-reply routes", (serverPath) =>
|
||||
withStandardProject(serverPath, ({ sdk }) =>
|
||||
Effect.gen(function* () {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue