Skip to content

Commit 88964ab

Browse files
committed
stream: handle sync iterables in merge() multi-source path
The multi-source merge path called `source[Symbol.asyncIterator]()` directly, which throws for sync-only iterables. The single-source fast path handled this via for-await-of, but passing multiple sync sources would fail. Wrap sync iterators with `PromiseResolve` so they participate in the `SafePromiseRace` pattern. The wrapping overhead is negligible relative to the per-iteration promise race.
1 parent eec6f0a commit 88964ab

2 files changed

Lines changed: 35 additions & 5 deletions

File tree

lib/internal/streams/iter/consumers.js

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ const {
1616
ArrayPrototypePush,
1717
ArrayPrototypeSlice,
1818
PromisePrototypeThen,
19+
PromiseResolve,
1920
SafePromiseAllReturnVoid,
2021
SafePromiseRace,
2122
SymbolAsyncIterator,
23+
SymbolIterator,
2224
TypedArrayPrototypeGetBuffer,
2325
TypedArrayPrototypeGetByteLength,
2426
TypedArrayPrototypeGetByteOffset,
@@ -394,11 +396,26 @@ function merge(...args) {
394396
}
395397

396398
// Multiple sources - race them
397-
const states = ArrayPrototypeMap(sources, (source) => ({
398-
iterator: source[SymbolAsyncIterator](),
399-
done: false,
400-
pending: null,
401-
}));
399+
const states = ArrayPrototypeMap(sources, (source) => {
400+
let iterator;
401+
if (source[SymbolAsyncIterator]) {
402+
iterator = source[SymbolAsyncIterator]();
403+
} else if (source[SymbolIterator]) {
404+
// Wrap sync iterator to async
405+
const syncIter = source[SymbolIterator]();
406+
iterator = {
407+
next() { return PromiseResolve(syncIter.next()); },
408+
return() {
409+
return PromiseResolve(syncIter.return?.() ??
410+
{ __proto__: null, done: true, value: undefined });
411+
},
412+
};
413+
} else {
414+
throw new ERR_INVALID_ARG_TYPE(
415+
'source', ['AsyncIterable', 'Iterable'], source);
416+
}
417+
return { iterator, done: false, pending: null };
418+
});
402419

403420
const startIterator = (state, index) => {
404421
if (!state.done && !state.pending) {

test/parallel/test-stream-iter-consumers.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,23 @@ Promise.all([
317317
testMergeSingleSource(),
318318
testMergeEmpty(),
319319
testMergeWithAbortSignal(),
320+
testMergeSyncSources(),
320321
testConsumersNonArrayBatch(),
321322
testConsumersNonArrayBatchSync(),
322323
]).then(common.mustCall());
323324

325+
// Regression test: merge() with sync iterable sources
326+
async function testMergeSyncSources() {
327+
const s1 = fromSync('abc');
328+
const s2 = fromSync('def');
329+
const result = await text(merge(s1, s2));
330+
// Both sources should be fully consumed; order may vary
331+
assert.strictEqual(result.length, 6);
332+
for (const ch of 'abcdef') {
333+
assert.ok(result.includes(ch), `missing '${ch}' in '${result}'`);
334+
}
335+
}
336+
324337
// Regression test: consumers should tolerate sources that yield raw
325338
// Uint8Array or string values instead of Uint8Array[] batches.
326339
async function testConsumersNonArrayBatch() {

0 commit comments

Comments
 (0)