diff --git a/packages/sdk-typescript/src/utils/Stream.ts b/packages/sdk-typescript/src/utils/Stream.ts index 5fcb43525..df5ff9bb6 100644 --- a/packages/sdk-typescript/src/utils/Stream.ts +++ b/packages/sdk-typescript/src/utils/Stream.ts @@ -71,6 +71,13 @@ export class Stream implements AsyncIterable { return(): Promise> { this.isDone = true; + // Settle any pending next() promise so awaiters don't hang forever. + if (this.readResolve) { + const resolve = this.readResolve; + this.readResolve = undefined; + this.readReject = undefined; + resolve({ done: true, value: undefined }); + } if (this.returned) { this.returned(); } diff --git a/packages/sdk-typescript/test/unit/Stream.test.ts b/packages/sdk-typescript/test/unit/Stream.test.ts index 2113a2023..c367c2507 100644 --- a/packages/sdk-typescript/test/unit/Stream.test.ts +++ b/packages/sdk-typescript/test/unit/Stream.test.ts @@ -233,6 +233,27 @@ describe('Stream', () => { }); }); + describe('return() cleanup', () => { + it('settles a pending next() promise so awaiters do not hang', async () => { + // Consumer starts awaiting before any value is enqueued. + const pending = stream.next(); + // External code cancels the iterator (e.g., timeout race, manual close). + await stream.return(); + // The original pending promise must resolve cleanly with done:true. + const result = await pending; + expect(result).toEqual({ done: true, value: undefined }); + }); + + it('return() invokes the optional returned callback', async () => { + let called = 0; + const s = new Stream(() => { + called++; + }); + await s.return(); + expect(called).toBe(1); + }); + }); + describe('Iteration Restrictions', () => { it('should only allow iteration once', async () => { const stream = new Stream();