From ed6f9e056e47e3987c7add51dca2a7d32c043369 Mon Sep 17 00:00:00 2001 From: chinesepowered Date: Fri, 17 Apr 2026 18:46:56 -0700 Subject: [PATCH] fix(sdk): settle pending next() promise in Stream.return() to prevent hangs (#2981) * fix(sdk): settle pending next() promise in Stream.return() to prevent hangs * test(sdk): add regression tests for Stream.return() pending-promise cleanup --- packages/sdk-typescript/src/utils/Stream.ts | 7 +++++++ .../sdk-typescript/test/unit/Stream.test.ts | 21 +++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/packages/sdk-typescript/src/utils/Stream.ts b/packages/sdk-typescript/src/utils/Stream.ts index 5fcb43525..df5ff9bb6 100644 --- a/packages/sdk-typescript/src/utils/Stream.ts +++ b/packages/sdk-typescript/src/utils/Stream.ts @@ -71,6 +71,13 @@ export class Stream implements AsyncIterable { return(): Promise> { this.isDone = true; + // Settle any pending next() promise so awaiters don't hang forever. + if (this.readResolve) { + const resolve = this.readResolve; + this.readResolve = undefined; + this.readReject = undefined; + resolve({ done: true, value: undefined }); + } if (this.returned) { this.returned(); } diff --git a/packages/sdk-typescript/test/unit/Stream.test.ts b/packages/sdk-typescript/test/unit/Stream.test.ts index 2113a2023..c367c2507 100644 --- a/packages/sdk-typescript/test/unit/Stream.test.ts +++ b/packages/sdk-typescript/test/unit/Stream.test.ts @@ -233,6 +233,27 @@ describe('Stream', () => { }); }); + describe('return() cleanup', () => { + it('settles a pending next() promise so awaiters do not hang', async () => { + // Consumer starts awaiting before any value is enqueued. + const pending = stream.next(); + // External code cancels the iterator (e.g., timeout race, manual close). + await stream.return(); + // The original pending promise must resolve cleanly with done:true. + const result = await pending; + expect(result).toEqual({ done: true, value: undefined }); + }); + + it('return() invokes the optional returned callback', async () => { + let called = 0; + const s = new Stream(() => { + called++; + }); + await s.return(); + expect(called).toBe(1); + }); + }); + describe('Iteration Restrictions', () => { it('should only allow iteration once', async () => { const stream = new Stream();