Skip to content

Commit f177ef3

Browse files
committed
stream: make multiple updates, cleanups, and fixes
1 parent 888fced commit f177ef3

12 files changed

Lines changed: 311 additions & 182 deletions

File tree

lib/internal/streams/iter/broadcast.js

Lines changed: 50 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ const {
1212
ArrayPrototypePush,
1313
ArrayPrototypeSlice,
1414
MathMax,
15-
Promise,
1615
PromisePrototypeThen,
1716
PromiseResolve,
1817
PromiseWithResolvers,
@@ -29,6 +28,7 @@ const { isError, lazyDOMException } = require('internal/util');
2928
const {
3029
codes: {
3130
ERR_INVALID_ARG_TYPE,
31+
ERR_INVALID_RETURN_VALUE,
3232
ERR_INVALID_STATE,
3333
},
3434
} = require('internal/errors');
@@ -81,16 +81,17 @@ function isPushStreamOptions(value) {
8181

8282
function parsePushArgs(args) {
8383
if (args.length === 0) {
84-
return { transforms: [], options: undefined };
84+
return { __proto__: null, transforms: [], options: undefined };
8585
}
8686
const last = args[args.length - 1];
8787
if (isPushStreamOptions(last)) {
8888
return {
89+
__proto__: null,
8990
transforms: ArrayPrototypeSlice(args, 0, -1),
9091
options: last,
9192
};
9293
}
93-
return { transforms: args, options: undefined };
94+
return { __proto__: null, transforms: args, options: undefined };
9495
}
9596

9697
// =============================================================================
@@ -140,7 +141,7 @@ class BroadcastImpl {
140141
if (transforms.length > 0) {
141142
if (options?.signal) {
142143
return pullWithTransforms(
143-
rawConsumer, ...transforms, { signal: options.signal });
144+
rawConsumer, ...transforms, { __proto__: null, signal: options.signal });
144145
}
145146
return pullWithTransforms(rawConsumer, ...transforms);
146147
}
@@ -149,6 +150,7 @@ class BroadcastImpl {
149150

150151
#createRawConsumer() {
151152
const state = {
153+
__proto__: null,
152154
cursor: this.#bufferStart + this.#buffer.length,
153155
resolve: null,
154156
reject: null,
@@ -399,9 +401,9 @@ class BroadcastWriter {
399401
const desired = this.desiredSize;
400402
if (desired === null) return null;
401403
if (desired > 0) return PromiseResolve(true);
402-
return new Promise((resolve, reject) => {
403-
ArrayPrototypePush(this.#pendingDrains, { resolve, reject });
404-
});
404+
const { promise, resolve, reject } = PromiseWithResolvers();
405+
ArrayPrototypePush(this.#pendingDrains, { __proto__: null, resolve, reject });
406+
return promise;
405407
}
406408

407409
get desiredSize() {
@@ -442,10 +444,8 @@ class BroadcastWriter {
442444
async #writevSlow(chunks, options) {
443445
const signal = options?.signal;
444446

445-
// Check for pre-aborted signal
446-
if (signal?.aborted) {
447-
throw signal.reason ?? lazyDOMException('Aborted', 'AbortError');
448-
}
447+
// Check for pre-aborted
448+
signal?.throwIfAborted();
449449

450450
if (this.#closed || this.#aborted) {
451451
throw new ERR_INVALID_STATE('Writer is closed');
@@ -558,35 +558,37 @@ class BroadcastWriter {
558558
* Create a pending write promise, optionally racing against a signal.
559559
* If the signal fires, the entry is removed from pendingWrites and the
560560
* promise rejects. Signal listeners are cleaned up on normal resolution.
561+
* @returns {Promise<void>}
561562
*/
562563
#createPendingWrite(chunk, signal) {
563-
return new Promise((resolve, reject) => {
564-
const entry = { chunk, resolve, reject };
565-
this.#pendingWrites.push(entry);
566-
567-
if (!signal) return;
568-
569-
const onAbort = () => {
570-
// Remove from queue so it doesn't occupy a slot
571-
const idx = this.#pendingWrites.indexOf(entry);
572-
if (idx !== -1) this.#pendingWrites.removeAt(idx);
573-
reject(signal.reason ?? lazyDOMException('Aborted', 'AbortError'));
574-
};
575-
576-
// Wrap resolve/reject to clean up signal listener
577-
const origResolve = entry.resolve;
578-
const origReject = entry.reject;
579-
entry.resolve = function() {
580-
signal.removeEventListener('abort', onAbort);
581-
origResolve();
582-
};
583-
entry.reject = function(reason) {
584-
signal.removeEventListener('abort', onAbort);
585-
origReject(reason);
586-
};
587-
588-
signal.addEventListener('abort', onAbort, { once: true });
589-
});
564+
const { promise, resolve, reject } = PromiseWithResolvers();
565+
const entry = { __proto__: null, chunk, resolve, reject };
566+
this.#pendingWrites.push(entry);
567+
568+
if (!signal) return promise;
569+
570+
const onAbort = () => {
571+
// Remove from queue so it doesn't occupy a slot
572+
const idx = this.#pendingWrites.indexOf(entry);
573+
if (idx !== -1) this.#pendingWrites.removeAt(idx);
574+
reject(signal.reason ?? lazyDOMException('Aborted', 'AbortError'));
575+
};
576+
577+
// Wrap resolve/reject to clean up signal listener
578+
const origResolve = entry.resolve;
579+
const origReject = entry.reject;
580+
entry.resolve = function() {
581+
signal.removeEventListener('abort', onAbort);
582+
origResolve();
583+
};
584+
entry.reject = function(reason) {
585+
signal.removeEventListener('abort', onAbort);
586+
origReject(reason);
587+
};
588+
589+
signal.addEventListener('abort', onAbort, { __proto__: null, once: true });
590+
591+
return promise;
590592
}
591593

592594
#resolvePendingWrites() {
@@ -639,6 +641,7 @@ class BroadcastWriter {
639641
*/
640642
function broadcast(options) {
641643
const opts = {
644+
__proto__: null,
642645
highWaterMark: MathMax(1, options?.highWaterMark ?? 16),
643646
backpressure: options?.backpressure ?? 'strict',
644647
signal: options?.signal,
@@ -654,11 +657,11 @@ function broadcast(options) {
654657
} else {
655658
opts.signal.addEventListener('abort', () => {
656659
broadcastImpl.cancel();
657-
}, { once: true });
660+
}, { __proto__: null, once: true });
658661
}
659662
}
660663

661-
return { writer, broadcast: broadcastImpl };
664+
return { __proto__: null, writer, broadcast: broadcastImpl };
662665
}
663666

664667
function isBroadcastable(value) {
@@ -674,7 +677,11 @@ const Broadcast = {
674677
from(input, options) {
675678
if (isBroadcastable(input)) {
676679
const bc = input[broadcastProtocol](options);
677-
return { writer: {}, broadcast: bc };
680+
if (bc === null || typeof bc !== 'object') {
681+
throw new ERR_INVALID_RETURN_VALUE(
682+
'an object', '[Symbol.for(\'Stream.broadcastProtocol\')]', bc);
683+
}
684+
return { __proto__: null, writer: { __proto__: null }, broadcast: bc };
678685
}
679686

680687
const result = broadcast(options);
@@ -685,10 +692,7 @@ const Broadcast = {
685692
try {
686693
if (isAsyncIterable(input)) {
687694
for await (const chunks of input) {
688-
if (signal?.aborted) {
689-
throw signal.reason ??
690-
lazyDOMException('Aborted', 'AbortError');
691-
}
695+
signal?.throwIfAborted();
692696
if (ArrayIsArray(chunks)) {
693697
if (!w.writevSync(chunks)) {
694698
await w.writev(chunks, signal ? { signal } : undefined);
@@ -699,10 +703,7 @@ const Broadcast = {
699703
}
700704
} else if (isSyncIterable(input)) {
701705
for (const chunks of input) {
702-
if (signal?.aborted) {
703-
throw signal.reason ??
704-
lazyDOMException('Aborted', 'AbortError');
705-
}
706+
signal?.throwIfAborted();
706707
if (ArrayIsArray(chunks)) {
707708
if (!w.writevSync(chunks)) {
708709
await w.writev(chunks, signal ? { signal } : undefined);

lib/internal/streams/iter/consumers.js

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ const {
3333
},
3434
} = require('internal/errors');
3535
const { TextDecoder } = require('internal/encoding');
36-
const { lazyDOMException } = require('internal/util');
3736

3837
const {
3938
isAsyncIterable,
@@ -107,9 +106,7 @@ function collectSync(source, limit) {
107106
* @returns {Promise<Uint8Array[]>}
108107
*/
109108
async function collectAsync(source, signal, limit) {
110-
if (signal?.aborted) {
111-
throw signal.reason ?? lazyDOMException('Aborted', 'AbortError');
112-
}
109+
signal?.throwIfAborted();
113110

114111
const chunks = [];
115112

@@ -141,9 +138,7 @@ async function collectAsync(source, signal, limit) {
141138
if (isAsyncIterable(source)) {
142139
for await (const raw of source) {
143140
const batch = ensureBatch(raw);
144-
if (signal?.aborted) {
145-
throw signal.reason ?? lazyDOMException('Aborted', 'AbortError');
146-
}
141+
signal?.throwIfAborted();
147142
for (let i = 0; i < batch.length; i++) {
148143
const chunk = batch[i];
149144
if (limit !== undefined) {
@@ -158,9 +153,7 @@ async function collectAsync(source, signal, limit) {
158153
} else if (isSyncIterable(source)) {
159154
for (const raw of source) {
160155
const batch = ensureBatch(raw);
161-
if (signal?.aborted) {
162-
throw signal.reason ?? lazyDOMException('Aborted', 'AbortError');
163-
}
156+
signal?.throwIfAborted();
164157
for (let i = 0; i < batch.length; i++) {
165158
const chunk = batch[i];
166159
if (limit !== undefined) {
@@ -219,6 +212,7 @@ function bytesSync(source, options) {
219212
function textSync(source, options) {
220213
const data = bytesSync(source, options);
221214
const decoder = new TextDecoder(options?.encoding ?? 'utf-8', {
215+
__proto__: null,
222216
fatal: true,
223217
ignoreBOM: true,
224218
});
@@ -269,6 +263,7 @@ async function bytes(source, options) {
269263
async function text(source, options) {
270264
const data = await bytes(source, options);
271265
const decoder = new TextDecoder(options?.encoding ?? 'utf-8', {
266+
__proto__: null,
272267
fatal: true,
273268
ignoreBOM: true,
274269
});
@@ -376,20 +371,17 @@ function merge(...args) {
376371
}
377372

378373
return {
374+
__proto__: null,
379375
async *[SymbolAsyncIterator]() {
380376
const signal = options?.signal;
381377

382-
if (signal?.aborted) {
383-
throw signal.reason ?? lazyDOMException('Aborted', 'AbortError');
384-
}
378+
signal?.throwIfAborted();
385379

386380
if (sources.length === 0) return;
387381

388382
if (sources.length === 1) {
389383
for await (const batch of sources[0]) {
390-
if (signal?.aborted) {
391-
throw signal.reason ?? lazyDOMException('Aborted', 'AbortError');
392-
}
384+
signal?.throwIfAborted();
393385
yield batch;
394386
}
395387
return;
@@ -404,6 +396,7 @@ function merge(...args) {
404396
// Wrap sync iterator to async
405397
const syncIter = source[SymbolIterator]();
406398
iterator = {
399+
__proto__: null,
407400
next() { return PromiseResolve(syncIter.next()); },
408401
return() {
409402
return PromiseResolve(syncIter.return?.() ??
@@ -414,7 +407,7 @@ function merge(...args) {
414407
throw new ERR_INVALID_ARG_TYPE(
415408
'source', ['AsyncIterable', 'Iterable'], source);
416409
}
417-
return { iterator, done: false, pending: null };
410+
return { __proto__: null, iterator, done: false, pending: null };
418411
});
419412

420413
const startIterator = (state, index) => {
@@ -432,9 +425,7 @@ function merge(...args) {
432425

433426
try {
434427
while (true) {
435-
if (signal?.aborted) {
436-
throw signal.reason ?? lazyDOMException('Aborted', 'AbortError');
437-
}
428+
signal?.throwIfAborted();
438429

439430
const pending = ArrayPrototypeFilter(
440431
ArrayPrototypeMap(states,

lib/internal/streams/iter/duplex.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ function duplex(options) {
4040
let bWriterRef = bWriter;
4141

4242
const channelA = {
43+
__proto__: null,
4344
get writer() { return aWriter; },
4445
readable: aReadable,
4546
async close() {
@@ -56,6 +57,7 @@ function duplex(options) {
5657
};
5758

5859
const channelB = {
60+
__proto__: null,
5961
get writer() { return bWriter; },
6062
readable: bReadable,
6163
async close() {

lib/internal/streams/iter/from.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,7 @@ function fromSync(input) {
468468
if (isPrimitiveChunk(input)) {
469469
const chunk = primitiveToUint8Array(input);
470470
return {
471+
__proto__: null,
471472
*[SymbolIterator]() {
472473
yield [chunk];
473474
},
@@ -482,6 +483,7 @@ function fromSync(input) {
482483
if (ArrayIsArray(input)) {
483484
if (input.length === 0) {
484485
return {
486+
__proto__: null,
485487
*[SymbolIterator]() {
486488
// Empty - yield nothing
487489
},
@@ -493,6 +495,7 @@ function fromSync(input) {
493495
if (allUint8) {
494496
const batch = input;
495497
return {
498+
__proto__: null,
496499
*[SymbolIterator]() {
497500
if (batch.length <= FROM_BATCH_SIZE) {
498501
yield batch;
@@ -517,6 +520,7 @@ function fromSync(input) {
517520
}
518521

519522
return {
523+
__proto__: null,
520524
*[SymbolIterator]() {
521525
yield* normalizeSyncSource(input);
522526
},
@@ -533,6 +537,7 @@ function from(input) {
533537
if (isPrimitiveChunk(input)) {
534538
const chunk = primitiveToUint8Array(input);
535539
return {
540+
__proto__: null,
536541
async *[SymbolAsyncIterator]() {
537542
yield [chunk];
538543
},
@@ -547,6 +552,7 @@ function from(input) {
547552
if (ArrayIsArray(input)) {
548553
if (input.length === 0) {
549554
return {
555+
__proto__: null,
550556
async *[SymbolAsyncIterator]() {
551557
// Empty - yield nothing
552558
},
@@ -557,6 +563,7 @@ function from(input) {
557563
if (allUint8) {
558564
const batch = input;
559565
return {
566+
__proto__: null,
560567
async *[SymbolAsyncIterator]() {
561568
if (batch.length <= FROM_BATCH_SIZE) {
562569
yield batch;
@@ -581,6 +588,7 @@ function from(input) {
581588
}
582589

583590
return {
591+
__proto__: null,
584592
async *[SymbolAsyncIterator]() {
585593
yield* normalizeAsyncSource(input);
586594
},

0 commit comments

Comments
 (0)