Skip to content

Commit dc1542b

Browse files
committed
stream: apply more stream/iter conformance fixes
1 parent 882b208 commit dc1542b

3 files changed

Lines changed: 100 additions & 60 deletions

File tree

lib/internal/streams/iter/consumers.js

Lines changed: 71 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,13 @@
1010
const {
1111
ArrayBufferPrototypeGetByteLength,
1212
ArrayBufferPrototypeSlice,
13-
ArrayPrototypeFilter,
1413
ArrayPrototypeMap,
1514
ArrayPrototypePush,
1615
ArrayPrototypeSlice,
16+
Promise,
1717
PromisePrototypeThen,
18-
PromiseResolve,
1918
SafePromiseAllReturnVoid,
20-
SafePromiseRace,
2119
SymbolAsyncIterator,
22-
SymbolIterator,
2320
TypedArrayPrototypeGetBuffer,
2421
TypedArrayPrototypeGetByteLength,
2522
TypedArrayPrototypeGetByteOffset,
@@ -372,11 +369,7 @@ function ondrain(drainable) {
372369
return null;
373370
}
374371

375-
try {
376-
return drainable[drainableProtocol]();
377-
} catch {
378-
return null;
379-
}
372+
return drainable[drainableProtocol]();
380373
}
381374

382375
// =============================================================================
@@ -403,87 +396,105 @@ function merge(...args) {
403396
validateAbortSignal(options.signal, 'options.signal');
404397
}
405398

399+
// Normalize each source via from()
400+
const normalized = ArrayPrototypeMap(sources, (source) => from(source));
401+
406402
return {
407403
__proto__: null,
408404
async *[SymbolAsyncIterator]() {
409405
const signal = options?.signal;
410406

411407
signal?.throwIfAborted();
412408

413-
if (sources.length === 0) return;
409+
if (normalized.length === 0) return;
414410

415-
if (sources.length === 1) {
416-
for await (const batch of sources[0]) {
411+
if (normalized.length === 1) {
412+
for await (const batch of normalized[0]) {
417413
signal?.throwIfAborted();
418414
yield batch;
419415
}
420416
return;
421417
}
422418

423-
// Multiple sources - race them
424-
const states = ArrayPrototypeMap(sources, (source) => {
425-
let iterator;
426-
if (source[SymbolAsyncIterator]) {
427-
iterator = source[SymbolAsyncIterator]();
428-
} else if (source[SymbolIterator]) {
429-
// Wrap sync iterator to async
430-
const syncIter = source[SymbolIterator]();
431-
iterator = {
432-
__proto__: null,
433-
next() { return PromiseResolve(syncIter.next()); },
434-
return() {
435-
return PromiseResolve(syncIter.return?.() ??
436-
{ __proto__: null, done: true, value: undefined });
437-
},
438-
};
419+
// Multiple sources - use a ready queue so that batches that settle
420+
// between consumer pulls are drained synchronously without an extra
421+
// async tick per batch. Each source has at most one pending .next()
422+
// at a time. Every batch from every source is preserved.
423+
const ready = [];
424+
let activeCount = normalized.length;
425+
let waitResolve = null;
426+
427+
// Called when a source's .next() settles. Pushes the result into
428+
// the ready queue and wakes the consumer if it's waiting.
429+
const onSettled = (iterator, result) => {
430+
if (result.done) {
431+
activeCount--;
439432
} else {
440-
throw new ERR_INVALID_ARG_TYPE(
441-
'source', ['AsyncIterable', 'Iterable'], source);
433+
ArrayPrototypePush(ready, result.value);
434+
// Immediately request the next value from this source
435+
// (at most one pending .next() per source)
436+
PromisePrototypeThen(
437+
iterator.next(),
438+
(r) => onSettled(iterator, r),
439+
(err) => {
440+
ArrayPrototypePush(ready, { __proto__: null, error: err });
441+
if (waitResolve) {
442+
waitResolve();
443+
waitResolve = null;
444+
}
445+
},
446+
);
442447
}
443-
return { __proto__: null, iterator, done: false, pending: null };
444-
});
445-
446-
const startIterator = (state, index) => {
447-
if (!state.done && !state.pending) {
448-
state.pending = PromisePrototypeThen(
449-
state.iterator.next(),
450-
(result) => ({ __proto__: null, index, result }));
448+
if (waitResolve) {
449+
waitResolve();
450+
waitResolve = null;
451451
}
452452
};
453453

454-
// Start all
455-
for (let i = 0; i < states.length; i++) {
456-
startIterator(states[i], i);
454+
// Start one .next() per source
455+
const iterators = [];
456+
for (let i = 0; i < normalized.length; i++) {
457+
const iterator = normalized[i][SymbolAsyncIterator]();
458+
ArrayPrototypePush(iterators, iterator);
459+
PromisePrototypeThen(
460+
iterator.next(),
461+
(r) => onSettled(iterator, r),
462+
(err) => {
463+
ArrayPrototypePush(ready, { __proto__: null, error: err });
464+
if (waitResolve) {
465+
waitResolve();
466+
waitResolve = null;
467+
}
468+
},
469+
);
457470
}
458471

459472
try {
460-
while (true) {
473+
while (activeCount > 0 || ready.length > 0) {
461474
signal?.throwIfAborted();
462475

463-
const pending = ArrayPrototypeFilter(
464-
ArrayPrototypeMap(states,
465-
(state) => state.pending),
466-
(p) => p !== null);
467-
468-
if (pending.length === 0) break;
469-
470-
const { index, result } = await SafePromiseRace(pending);
471-
472-
states[index].pending = null;
476+
// Drain ready queue synchronously
477+
while (ready.length > 0) {
478+
const item = ready.shift();
479+
if (item?.error) {
480+
throw item.error;
481+
}
482+
yield item;
483+
}
473484

474-
if (result.done) {
475-
states[index].done = true;
476-
} else {
477-
yield result.value;
478-
startIterator(states[index], index);
485+
// If sources are still active, wait for the next settlement
486+
if (activeCount > 0) {
487+
await new Promise((resolve) => {
488+
waitResolve = resolve;
489+
});
479490
}
480491
}
481492
} finally {
482493
// Clean up: return all iterators
483-
await SafePromiseAllReturnVoid(states, async (state) => {
484-
if (!state.done && state.iterator.return) {
494+
await SafePromiseAllReturnVoid(iterators, async (iterator) => {
495+
if (iterator.return) {
485496
try {
486-
await state.iterator.return();
497+
await iterator.return();
487498
} catch {
488499
// Ignore return errors
489500
}

test/parallel/test-stream-iter-consumers-merge.js

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,21 @@ async function testMergeSignalMidIteration() {
147147
await assert.rejects(() => iter.next(), { name: 'AbortError' });
148148
}
149149

150+
// merge() accepts string sources (normalized via from())
151+
async function testMergeStringSources() {
152+
const batches = [];
153+
for await (const batch of merge('hello', 'world')) {
154+
batches.push(batch);
155+
}
156+
// Each string becomes a single-batch source
157+
assert.strictEqual(batches.length >= 2, true);
158+
const combined = new TextDecoder().decode(
159+
Buffer.concat(batches.flat()));
160+
// Both strings should appear (order may vary)
161+
assert.ok(combined.includes('hello'));
162+
assert.ok(combined.includes('world'));
163+
}
164+
150165
Promise.all([
151166
testMergeTwoSources(),
152167
testMergeSingleSource(),
@@ -156,4 +171,5 @@ Promise.all([
156171
testMergeSourceError(),
157172
testMergeConsumerBreak(),
158173
testMergeSignalMidIteration(),
174+
testMergeStringSources(),
159175
]).then(common.mustCall());

test/parallel/test-stream-iter-push-writer.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,18 @@ async function testOndrainNonDrainable() {
2626
assert.strictEqual(ondrain('string'), null);
2727
}
2828

29+
async function testOndrainProtocolErrorPropagates() {
30+
const badDrainable = {
31+
[Symbol.for('Stream.drainableProtocol')]() {
32+
throw new Error('protocol error');
33+
},
34+
};
35+
assert.throws(
36+
() => ondrain(badDrainable),
37+
{ message: 'protocol error' },
38+
);
39+
}
40+
2941
async function testWriteWithSignalRejects() {
3042
const { writer, readable } = push({ highWaterMark: 1 });
3143

@@ -331,6 +343,7 @@ Promise.all([
331343
testWritevMixedTypes(),
332344
testWriteAfterEnd(),
333345
testWriteAfterFail(),
346+
testOndrainProtocolErrorPropagates(),
334347
testFail(),
335348
testEndAsyncReturnValue(),
336349
testWriteUint8Array(),

0 commit comments

Comments
 (0)