fix(sdk): settle pending next() promise in Stream.return() to prevent hangs (#2981)

* fix(sdk): settle pending next() promise in Stream.return() to prevent hangs

* test(sdk): add regression tests for Stream.return() pending-promise cleanup
This commit is contained in:
chinesepowered 2026-04-17 18:46:56 -07:00 committed by GitHub
parent b82ad2bd4c
commit ed6f9e056e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 28 additions and 0 deletions

View file

@ -71,6 +71,13 @@ export class Stream<T> implements AsyncIterable<T> {
return(): Promise<IteratorResult<T>> {
this.isDone = true;
// Settle any pending next() promise so awaiters don't hang forever.
if (this.readResolve) {
const resolve = this.readResolve;
this.readResolve = undefined;
this.readReject = undefined;
resolve({ done: true, value: undefined });
}
if (this.returned) {
this.returned();
}

View file

@ -233,6 +233,27 @@ describe('Stream', () => {
});
});
describe('return() cleanup', () => {
it('settles a pending next() promise so awaiters do not hang', async () => {
// Consumer starts awaiting before any value is enqueued.
const pending = stream.next();
// External code cancels the iterator (e.g., timeout race, manual close).
await stream.return();
// The original pending promise must resolve cleanly with done:true.
const result = await pending;
expect(result).toEqual({ done: true, value: undefined });
});
it('return() invokes the optional returned callback', async () => {
let called = 0;
const s = new Stream<string>(() => {
called++;
});
await s.return();
expect(called).toBe(1);
});
});
describe('Iteration Restrictions', () => {
it('should only allow iteration once', async () => {
const stream = new Stream<string>();