From 4b053cb72e680ce3323ac288ccc3575848bf2890 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Mon, 25 May 2026 12:28:33 -0700 Subject: [PATCH] stream: fix merge() idle source draining Defer pulling a merged source again until the consumer requests the next merged item. This prevents fast sources from being drained while the merged iterator is idle. Fixes: https://github.com/nodejs/node/issues/63566 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/iter/consumers.js | 150 ++++++++++++------ .../test-stream-iter-consumers-merge.js | 26 +++ 2 files changed, 129 insertions(+), 47 deletions(-) diff --git a/lib/internal/streams/iter/consumers.js b/lib/internal/streams/iter/consumers.js index bcfe5d5ab29749..eb7719038d96b0 100644 --- a/lib/internal/streams/iter/consumers.js +++ b/lib/internal/streams/iter/consumers.js @@ -17,6 +17,7 @@ const { ArrayPrototypeSlice, Promise, PromisePrototypeThen, + PromiseResolve, SafePromiseAllReturnVoid, SymbolAsyncIterator, TypedArrayPrototypeGetBuffer, @@ -412,19 +413,22 @@ function merge(...args) { return { __proto__: null, - async *[SymbolAsyncIterator]() { + [SymbolAsyncIterator]() { const signal = options?.signal; signal?.throwIfAborted(); - if (normalized.length === 0) return; + if (normalized.length === 0) { + return (async function*() {})(); + } if (normalized.length === 1) { - for await (const batch of normalized[0]) { - signal?.throwIfAborted(); - yield batch; - } - return; + return (async function*() { + for await (const batch of normalized[0]) { + signal?.throwIfAborted(); + yield batch; + } + })(); } // Multiple sources - use a ready queue so that batches that settle @@ -434,27 +438,22 @@ function merge(...args) { const ready = []; let activeCount = normalized.length; let waitResolve = null; + let lastIterator = null; + let started = false; + let closed = false; // Called when a source's .next() settles. Pushes the result into // the ready queue and wakes the consumer if it's waiting. const onSettled = (iterator, result) => { + if (closed) return; if (result.done) { activeCount--; } else { - ArrayPrototypePush(ready, result.value); - // Immediately request the next value from this source - // (at most one pending .next() per source) - PromisePrototypeThen( - iterator.next(), - (r) => onSettled(iterator, r), - (err) => { - ArrayPrototypePush(ready, { __proto__: null, error: err }); - if (waitResolve) { - waitResolve(); - waitResolve = null; - } - }, - ); + ArrayPrototypePush(ready, { + __proto__: null, + iterator, + value: result.value, + }); } if (waitResolve) { waitResolve(); @@ -462,15 +461,12 @@ function merge(...args) { } }; - // Start one .next() per source - const iterators = []; - for (let i = 0; i < normalized.length; i++) { - const iterator = normalized[i][SymbolAsyncIterator](); - ArrayPrototypePush(iterators, iterator); + const requestNext = (iterator) => { PromisePrototypeThen( iterator.next(), (r) => onSettled(iterator, r), (err) => { + if (closed) return; ArrayPrototypePush(ready, { __proto__: null, error: err }); if (waitResolve) { waitResolve(); @@ -478,30 +474,29 @@ function merge(...args) { } }, ); - } + }; - try { - while (activeCount > 0 || ready.length > 0) { - signal?.throwIfAborted(); + const iterators = []; - // Drain ready queue synchronously - while (ready.length > 0) { - const item = ArrayPrototypeShift(ready); - if (item?.error) { - throw item.error; - } - yield item; - } + const start = () => { + if (started) return; + started = true; + for (let i = 0; i < normalized.length; i++) { + const iterator = normalized[i][SymbolAsyncIterator](); + ArrayPrototypePush(iterators, iterator); + requestNext(iterator); + } + }; - // If sources are still active, wait for the next settlement - if (activeCount > 0) { - await new Promise((resolve) => { - waitResolve = resolve; - }); - } + const cleanup = async () => { + if (closed) { + return { __proto__: null, done: true, value: undefined }; + } + closed = true; + if (waitResolve) { + waitResolve(); + waitResolve = null; } - } finally { - // Clean up: return all iterators await SafePromiseAllReturnVoid(iterators, async (iterator) => { if (iterator.return) { try { @@ -511,7 +506,68 @@ function merge(...args) { } } }); - } + return { __proto__: null, done: true, value: undefined }; + }; + + const nextImpl = async () => { + try { + if (closed) { + return cleanup(); + } + + signal?.throwIfAborted(); + + start(); + + if (lastIterator !== null) { + requestNext(lastIterator); + lastIterator = null; + } + + while (activeCount > 0 || ready.length > 0) { + signal?.throwIfAborted(); + + if (ready.length > 0) { + const item = ArrayPrototypeShift(ready); + if (item?.error) { + await cleanup(); + throw item.error; + } + lastIterator = item.iterator; + return { __proto__: null, done: false, value: item.value }; + } + + await new Promise((resolve) => { waitResolve = resolve; }); + } + + return cleanup(); + } catch (err) { + await cleanup(); + throw err; + } + }; + + let nextQueue = PromiseResolve(); + const enqueue = (fn) => { + const result = PromisePrototypeThen(nextQueue, fn, fn); + nextQueue = PromisePrototypeThen(result, () => {}, () => {}); + return result; + }; + + return { + __proto__: null, + [SymbolAsyncIterator]() { + return this; + }, + + next() { + return enqueue(nextImpl); + }, + + return() { + return enqueue(cleanup); + }, + }; }, }; } diff --git a/test/parallel/test-stream-iter-consumers-merge.js b/test/parallel/test-stream-iter-consumers-merge.js index b777a3ee205fad..0b9bcadc95ca7f 100644 --- a/test/parallel/test-stream-iter-consumers-merge.js +++ b/test/parallel/test-stream-iter-consumers-merge.js @@ -131,6 +131,31 @@ async function testMergeConsumerBreak() { assert.strictEqual(source1Return && source2Return, true); } +async function testMergeDoesNotDrainIdleSources() { + function source(n) { + return { + pulls: 0, + async *[Symbol.asyncIterator]() { + while (this.pulls < n) { + yield [Buffer.from(`${++this.pulls}`)]; + } + }, + }; + } + + const source1 = source(5); + const source2 = source(5); + const iterator = merge(source1, source2)[Symbol.asyncIterator](); + + await iterator.next(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + assert.ok(source1.pulls <= 1); + assert.ok(source2.pulls <= 1); + + await iterator.return?.(); +} + async function testMergeSignalMidIteration() { const ac = new AbortController(); async function* slowSource() { @@ -190,6 +215,7 @@ Promise.all([ testMergeSyncSources(), testMergeSourceError(), testMergeConsumerBreak(), + testMergeDoesNotDrainIdleSources(), testMergeSignalMidIteration(), testMergeStringSources(), testMergeObjectLikeSources(),