Skip to content

Commit 577e78f

Browse files
committed
stream: make additional fixups to stream/iter arg validation
1 parent dadd875 commit 577e78f

8 files changed

Lines changed: 35 additions & 36 deletions

File tree

lib/internal/streams/iter/broadcast.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ const {
1212
ArrayPrototypePush,
1313
ArrayPrototypeSlice,
1414
MathMax,
15+
MathMin,
16+
NumberMAX_SAFE_INTEGER,
1517
PromisePrototypeThen,
1618
PromiseResolve,
1719
PromiseWithResolvers,
@@ -30,6 +32,7 @@ const {
3032
ERR_INVALID_ARG_TYPE,
3133
ERR_INVALID_RETURN_VALUE,
3234
ERR_INVALID_STATE,
35+
ERR_OPERATION_FAILED,
3336
},
3437
} = require('internal/errors');
3538
const {
@@ -115,7 +118,6 @@ class BroadcastImpl {
115118
#writer = null;
116119

117120
constructor(options) {
118-
validateBackpressure(options.backpressure);
119121
this.#options = options;
120122
this[kOnBufferDrained] = null;
121123
}
@@ -652,15 +654,15 @@ function broadcast(options = { __proto__: null }) {
652654
backpressure = 'strict',
653655
signal,
654656
} = options;
655-
validateInteger(highWaterMark, 'options.highWaterMark', 1);
657+
validateInteger(highWaterMark, 'options.highWaterMark');
656658
validateBackpressure(backpressure);
657659
if (signal !== undefined) {
658660
validateAbortSignal(signal, 'options.signal');
659661
}
660662

661663
const opts = {
662664
__proto__: null,
663-
highWaterMark: MathMax(1, highWaterMark),
665+
highWaterMark: MathMax(1, MathMin(NumberMAX_SAFE_INTEGER, highWaterMark)),
664666
backpressure,
665667
signal,
666668
};
@@ -741,7 +743,7 @@ const Broadcast = {
741743
}
742744
} catch (error) {
743745
const err = isError(error) ? error :
744-
new ERR_INVALID_ARG_TYPE('error', 'Error', String(error));
746+
new ERR_OPERATION_FAILED(String(error));
745747
if (!w.failSync(err)) {
746748
await w.fail(err);
747749
}

lib/internal/streams/iter/consumers.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ function validateSyncConsumerOptions(options) {
251251
*/
252252
function bytesSync(source, options = { __proto__: null }) {
253253
validateSyncConsumerOptions(options);
254-
return concatBytes(collectSync(source, options?.limit));
254+
return concatBytes(collectSync(source, options.limit));
255255
}
256256

257257
/**

lib/internal/streams/iter/from.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,9 @@ function tryStringCoercion(obj) {
185185
// Check for custom toString
186186
if (hasCustomToString(obj)) {
187187
const result = FunctionPrototypeCall(obj.toString, obj);
188-
return result;
188+
if (typeof result === 'string') {
189+
return result;
190+
}
189191
}
190192

191193
return null;

lib/internal/streams/iter/pull.js

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -595,16 +595,15 @@ function pipeToSync(source, ...args) {
595595
const { transforms, writer, options } = parsePipeToArgs(args);
596596

597597
// Handle transform-writer
598-
const finalTransforms = ArrayPrototypeSlice(transforms);
599598
if (isTransformObject(writer)) {
600-
ArrayPrototypePush(finalTransforms, writer);
599+
ArrayPrototypePush(transforms, writer);
601600
}
602601

603602
// Create pipeline
604-
const pipeline = finalTransforms.length > 0 ?
603+
const pipeline = transforms.length > 0 ?
605604
createSyncPipeline(
606605
{ [SymbolIterator]: () => source[SymbolIterator]() },
607-
finalTransforms) :
606+
transforms) :
608607
source;
609608

610609
let totalBytes = 0;
@@ -652,9 +651,8 @@ async function pipeTo(source, ...args) {
652651
}
653652

654653
// Handle transform-writer
655-
const finalTransforms = ArrayPrototypeSlice(transforms);
656654
if (isTransformObject(writer)) {
657-
ArrayPrototypePush(finalTransforms, writer);
655+
ArrayPrototypePush(transforms, writer);
658656
}
659657

660658
const signal = options?.signal;
@@ -691,7 +689,7 @@ async function pipeTo(source, ...args) {
691689

692690
try {
693691
// Fast path: no transforms - iterate directly
694-
if (finalTransforms.length === 0) {
692+
if (transforms.length === 0) {
695693
if (isAsyncIterable(source)) {
696694
for await (const batch of source) {
697695
signal?.throwIfAborted();
@@ -710,7 +708,7 @@ async function pipeTo(source, ...args) {
710708
{ [SymbolIterator]: () => source[SymbolIterator]() };
711709

712710
const pipeline = createAsyncPipeline(
713-
streamableSource, finalTransforms, signal);
711+
streamableSource, transforms, signal);
714712

715713
for await (const batch of pipeline) {
716714
signal?.throwIfAborted();

lib/internal/streams/iter/push.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ const {
1010
ArrayPrototypePush,
1111
ArrayPrototypeSlice,
1212
MathMax,
13+
MathMin,
14+
NumberMAX_SAFE_INTEGER,
1315
PromiseResolve,
1416
PromiseWithResolvers,
1517
SymbolAsyncIterator,
@@ -79,18 +81,18 @@ class PushQueue {
7981
#signal;
8082
#abortHandler;
8183

82-
constructor(options = {}) {
84+
constructor(options = { __proto__: null }) {
8385
const {
8486
highWaterMark = kPushDefaultHWM,
8587
backpressure = 'strict',
8688
signal,
8789
} = options;
88-
validateInteger(highWaterMark, 'options.highWaterMark', 1);
90+
validateInteger(highWaterMark, 'options.highWaterMark');
8991
validateBackpressure(backpressure);
9092
if (signal !== undefined) {
9193
validateAbortSignal(signal, 'options.signal');
9294
}
93-
this.#highWaterMark = MathMax(1, highWaterMark);
95+
this.#highWaterMark = MathMax(1, MathMin(NumberMAX_SAFE_INTEGER, highWaterMark));
9496
this.#backpressure = backpressure;
9597
this.#signal = signal;
9698
this.#abortHandler = undefined;

lib/internal/streams/iter/share.js

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
const {
99
ArrayPrototypePush,
1010
MathMax,
11+
MathMin,
12+
NumberMAX_SAFE_INTEGER,
1113
PromisePrototypeThen,
1214
PromiseResolve,
1315
PromiseWithResolvers,
@@ -158,7 +160,7 @@ class ShareImpl {
158160
}
159161

160162
// Need to pull from source - check buffer limit
161-
const canPull = await self.#waitForBufferSpace(state);
163+
const canPull = await self.#waitForBufferSpace();
162164
if (!canPull) {
163165
state.detached = true;
164166
self.#consumers.delete(state);
@@ -230,7 +232,7 @@ class ShareImpl {
230232

231233
// Internal methods
232234

233-
async #waitForBufferSpace(_state) {
235+
async #waitForBufferSpace() {
234236
while (this.#buffer.length >= this.#options.highWaterMark) {
235237
if (this.#cancelled || this.#sourceError || this.#sourceExhausted) {
236238
return !this.#cancelled;
@@ -363,7 +365,6 @@ class SyncShareImpl {
363365
#cancelled = false;
364366

365367
constructor(source, options) {
366-
validateBackpressure(options.backpressure);
367368
this.#source = source;
368369
this.#options = options;
369370
}
@@ -387,6 +388,7 @@ class SyncShareImpl {
387388

388389
#createRawConsumer() {
389390
const state = {
391+
__proto__: null,
390392
cursor: this.#bufferStart,
391393
detached: false,
392394
};
@@ -577,15 +579,15 @@ function share(source, options = { __proto__: null }) {
577579
backpressure = 'strict',
578580
signal,
579581
} = options;
580-
validateInteger(highWaterMark, 'options.highWaterMark', 1);
582+
validateInteger(highWaterMark, 'options.highWaterMark');
581583
validateBackpressure(backpressure);
582584
if (signal !== undefined) {
583585
validateAbortSignal(signal, 'options.signal');
584586
}
585587

586588
const opts = {
587589
__proto__: null,
588-
highWaterMark: MathMax(1, highWaterMark),
590+
highWaterMark: MathMax(1, MathMin(NumberMAX_SAFE_INTEGER, highWaterMark)),
589591
backpressure,
590592
signal,
591593
};
@@ -614,12 +616,12 @@ function shareSync(source, options = { __proto__: null }) {
614616
highWaterMark = kMultiConsumerDefaultHWM,
615617
backpressure = 'strict',
616618
} = options;
617-
validateInteger(highWaterMark, 'options.highWaterMark', 1);
619+
validateInteger(highWaterMark, 'options.highWaterMark');
618620
validateBackpressure(backpressure);
619621

620622
const opts = {
621623
__proto__: null,
622-
highWaterMark: MathMax(1, highWaterMark),
624+
highWaterMark: MathMax(1, MathMin(NumberMAX_SAFE_INTEGER, highWaterMark)),
623625
backpressure,
624626
};
625627

lib/internal/streams/iter/utils.js

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,6 @@ function isPullOptions(value) {
110110
);
111111
}
112112

113-
/**
114-
* Parse variadic arguments for pull/pullSync.
115-
* Returns { transforms, options }
116-
* @param {Array} args
117-
* @returns {{ transforms: Array, options: object|undefined }}
118-
*/
119113
/**
120114
* Check if a value is a valid transform (function or transform object).
121115
* @param {unknown} value

test/parallel/test-stream-iter-validation.js

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ const {
2020

2121
// HighWaterMark must be integer >= 1
2222
assert.throws(() => push({ highWaterMark: 'bad' }), { code: 'ERR_INVALID_ARG_TYPE' });
23-
assert.throws(() => push({ highWaterMark: 0 }), { code: 'ERR_OUT_OF_RANGE' });
24-
assert.throws(() => push({ highWaterMark: -1 }), { code: 'ERR_OUT_OF_RANGE' });
2523
assert.throws(() => push({ highWaterMark: 1.5 }), { code: 'ERR_OUT_OF_RANGE' });
24+
// Values < 1 are clamped to 1
25+
push({ highWaterMark: 0 }).writer.endSync();
26+
push({ highWaterMark: -1 }).writer.endSync();
27+
// Values above MAX_SAFE_INTEGER are clamped
28+
push({ highWaterMark: Number.MAX_SAFE_INTEGER }).writer.endSync();
2629

2730
// Signal must be AbortSignal
2831
assert.throws(() => push({ signal: 'bad' }), { code: 'ERR_INVALID_ARG_TYPE' });
@@ -80,7 +83,6 @@ assert.throws(() => pullSync(fromSync('a'), 42), { code: 'ERR_INVALID_ARG_TYPE'
8083
// =============================================================================
8184

8285
assert.throws(() => broadcast({ highWaterMark: 'bad' }), { code: 'ERR_INVALID_ARG_TYPE' });
83-
assert.throws(() => broadcast({ highWaterMark: 0 }), { code: 'ERR_OUT_OF_RANGE' });
8486
assert.throws(() => broadcast({ signal: {} }), { code: 'ERR_INVALID_ARG_TYPE' });
8587
assert.throws(() => broadcast({ backpressure: 'bad' }), { code: 'ERR_INVALID_ARG_VALUE' });
8688

@@ -94,15 +96,12 @@ assert.throws(() => Broadcast.from('bad'), { code: 'ERR_INVALID_ARG_TYPE' });
9496

9597
assert.throws(() => share(42), { code: 'ERR_INVALID_ARG_TYPE' });
9698
assert.throws(() => share(from('a'), { highWaterMark: 'bad' }), { code: 'ERR_INVALID_ARG_TYPE' });
97-
assert.throws(() => share(from('a'), { highWaterMark: 0 }), { code: 'ERR_OUT_OF_RANGE' });
9899
assert.throws(() => share(from('a'), { signal: {} }), { code: 'ERR_INVALID_ARG_TYPE' });
99100
assert.throws(() => share(from('a'), { backpressure: 'bad' }), { code: 'ERR_INVALID_ARG_VALUE' });
100101

101102
assert.throws(() => shareSync(42), { code: 'ERR_INVALID_ARG_TYPE' });
102103
assert.throws(() => shareSync(fromSync('a'), { highWaterMark: 'bad' }),
103104
{ code: 'ERR_INVALID_ARG_TYPE' });
104-
assert.throws(() => shareSync(fromSync('a'), { highWaterMark: 0 }),
105-
{ code: 'ERR_OUT_OF_RANGE' });
106105

107106
// Share.from / SyncShare.fromSync reject non-iterable
108107
assert.throws(() => Share.from(42), { code: 'ERR_INVALID_ARG_TYPE' });

0 commit comments

Comments
 (0)