Skip to content

Commit 412ce49

Browse files
committed
stream: apply more stream/iter spec conformance fixes
1 parent 83355dd commit 412ce49

9 files changed

Lines changed: 531 additions & 345 deletions

File tree

doc/api/stream_iter.md

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -520,15 +520,19 @@ Including the `node:` prefix on the module specifier is optional.
520520
added: REPLACEME
521521
-->
522522

523-
* `input` {string|ArrayBuffer|ArrayBufferView|Iterable|AsyncIterable}
523+
* `input` {string|ArrayBuffer|ArrayBufferView|Iterable|AsyncIterable|Object}
524+
Must not be `null` or `undefined`.
524525
* Returns: {AsyncIterable\<Uint8Array\[]>}
525526

526527
Create an async byte stream from the given input. Strings are UTF-8 encoded.
527528
`ArrayBuffer` and `ArrayBufferView` values are wrapped as `Uint8Array`. Arrays
528529
and iterables are recursively flattened and normalized.
529530

530531
Objects implementing `Symbol.for('Stream.toAsyncStreamable')` or
531-
`Symbol.for('Stream.toStreamable')` are converted via those protocols.
532+
`Symbol.for('Stream.toStreamable')` are converted via those protocols. The
533+
`toAsyncStreamable` protocol takes precedence over `toStreamable`, which takes
534+
precedence over the iteration protocols (`Symbol.asyncIterator`,
535+
`Symbol.iterator`).
532536

533537
```mjs
534538
import { Buffer } from 'node:buffer';
@@ -556,11 +560,15 @@ run().catch(console.error);
556560
added: REPLACEME
557561
-->
558562

559-
* `input` {string|ArrayBuffer|ArrayBufferView|Iterable}
563+
* `input` {string|ArrayBuffer|ArrayBufferView|Iterable|Object}
564+
Must not be `null` or `undefined`.
560565
* Returns: {Iterable\<Uint8Array\[]>}
561566

562567
Synchronous version of [`from()`][]. Returns a sync iterable. Cannot accept
563-
async iterables or promises.
568+
async iterables or promises. Objects implementing
569+
`Symbol.for('Stream.toStreamable')` are converted via that protocol (takes
570+
precedence over `Symbol.iterator`). The `toAsyncStreamable` protocol is
571+
ignored entirely.
564572

565573
```mjs
566574
import { fromSync, textSync } from 'node:stream/iter';

lib/internal/streams/iter/broadcast.js

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ const {
2121
SafeSet,
2222
String,
2323
Symbol,
24-
SymbolAsyncDispose,
25-
SymbolAsyncIterator,
26-
SymbolDispose,
24+
SymbolAsyncDispose,
25+
SymbolAsyncIterator,
26+
SymbolDispose,
2727
TypedArrayPrototypeGetByteLength,
2828
} = primordials;
2929

@@ -352,7 +352,7 @@ class BroadcastImpl {
352352
[kCanWrite]() {
353353
if (this.#ended || this.#cancelled) return false;
354354
if ((this.#options.backpressure === 'strict' ||
355-
this.#options.backpressure === 'block') &&
355+
this.#options.backpressure === 'block') &&
356356
this.#buffer.length >= this.#options.highWaterMark) {
357357
return false;
358358
}
@@ -566,15 +566,15 @@ class BroadcastWriter {
566566
return this.#totalBytes;
567567
}
568568

569-
fail(reason) {
570-
if (this.#aborted || this.#closed) return;
571-
this.#aborted = true;
572-
this.#closed = true;
573-
const error = reason ?? new ERR_INVALID_STATE('Failed');
574-
this.#rejectPendingWrites(error);
575-
this.#rejectPendingDrains(error);
576-
this.#broadcast[kAbort](error);
577-
}
569+
fail(reason) {
570+
if (this.#aborted || this.#closed) return;
571+
this.#aborted = true;
572+
this.#closed = true;
573+
const error = reason ?? new ERR_INVALID_STATE('Failed');
574+
this.#rejectPendingWrites(error);
575+
this.#rejectPendingDrains(error);
576+
this.#broadcast[kAbort](error);
577+
}
578578

579579
[SymbolAsyncDispose]() {
580580
this.fail();
@@ -632,7 +632,7 @@ class BroadcastWriter {
632632

633633
#resolvePendingWrites() {
634634
while (this.#pendingWrites.length > 0 &&
635-
this.#broadcast[kCanWrite]()) {
635+
this.#broadcast[kCanWrite]()) {
636636
const pending = this.#pendingWrites.shift();
637637
if (this.#broadcast[kWrite](pending.chunk)) {
638638
for (let i = 0; i < pending.chunk.length; i++) {
@@ -776,7 +776,7 @@ const Broadcast = {
776776
} catch (error) {
777777
const err = isError(error) ? error :
778778
new ERR_OPERATION_FAILED(String(error));
779-
w.fail(err);
779+
w.fail(err);
780780
}
781781
};
782782
PromisePrototypeThen(pump(), undefined, () => {});

lib/internal/streams/iter/from.js

Lines changed: 64 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ const {
1111
ArrayPrototypeEvery,
1212
ArrayPrototypePush,
1313
ArrayPrototypeSlice,
14+
DataViewPrototypeGetBuffer,
15+
DataViewPrototypeGetByteLength,
16+
DataViewPrototypeGetByteOffset,
1417
FunctionPrototypeCall,
15-
ObjectPrototypeToString,
1618
SymbolAsyncIterator,
1719
SymbolIterator,
18-
SymbolToPrimitive,
1920
TypedArrayPrototypeGetBuffer,
2021
TypedArrayPrototypeGetByteLength,
2122
TypedArrayPrototypeGetByteOffset,
@@ -30,6 +31,7 @@ const {
3031

3132
const {
3233
isArrayBuffer,
34+
isDataView,
3335
isPromise,
3436
isUint8Array,
3537
} = require('internal/util/types');
@@ -112,27 +114,6 @@ function isAsyncIterable(value) {
112114
);
113115
}
114116

115-
/**
116-
* Check if object has a custom toString() (not Object.prototype.toString).
117-
* @returns {boolean}
118-
*/
119-
function hasCustomToString(obj) {
120-
const toString = obj.toString;
121-
return typeof toString === 'function' &&
122-
toString !== ObjectPrototypeToString;
123-
}
124-
125-
/**
126-
* Check if object has Symbol.toPrimitive.
127-
* @returns {boolean}
128-
*/
129-
function hasToPrimitive(obj) {
130-
return (
131-
SymbolToPrimitive in obj &&
132-
typeof obj[SymbolToPrimitive] === 'function'
133-
);
134-
}
135-
136117
// =============================================================================
137118
// Primitive Conversion
138119
// =============================================================================
@@ -156,39 +137,20 @@ function primitiveToUint8Array(chunk) {
156137
return chunk;
157138
}
158139
// Other ArrayBufferView types (Int8Array, DataView, etc.)
140+
if (isDataView(chunk)) {
141+
return new Uint8Array(
142+
DataViewPrototypeGetBuffer(chunk),
143+
DataViewPrototypeGetByteOffset(chunk),
144+
DataViewPrototypeGetByteLength(chunk),
145+
);
146+
}
159147
return new Uint8Array(
160148
TypedArrayPrototypeGetBuffer(chunk),
161149
TypedArrayPrototypeGetByteOffset(chunk),
162150
TypedArrayPrototypeGetByteLength(chunk),
163151
);
164152
}
165153

166-
/**
167-
* Try to coerce an object to string using custom methods.
168-
* Returns null if object has no custom string coercion.
169-
* @returns {string|null}
170-
*/
171-
function tryStringCoercion(obj) {
172-
// Check for Symbol.toPrimitive first
173-
if (hasToPrimitive(obj)) {
174-
const toPrimitive = obj[SymbolToPrimitive];
175-
const result = FunctionPrototypeCall(toPrimitive, obj, 'string');
176-
if (typeof result === 'string') {
177-
return result;
178-
}
179-
// toPrimitive returned non-string, fall through to toString
180-
}
181-
182-
// Check for custom toString
183-
if (hasCustomToString(obj)) {
184-
const result = FunctionPrototypeCall(obj.toString, obj);
185-
if (typeof result === 'string') {
186-
return result;
187-
}
188-
}
189-
190-
return null;
191-
}
192154

193155
// =============================================================================
194156
// Sync Normalization (for fromSync and sync contexts)
@@ -229,19 +191,10 @@ function* normalizeSyncValue(value) {
229191
return;
230192
}
231193

232-
// Try string coercion for objects with custom toString/toPrimitive
233-
if (typeof value === 'object' && value !== null) {
234-
const str = tryStringCoercion(value);
235-
if (str !== null) {
236-
yield toUint8Array(str);
237-
return;
238-
}
239-
}
240-
241194
// Reject: no valid conversion
242195
throw new ERR_INVALID_ARG_TYPE(
243196
'value',
244-
['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable'],
197+
['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable', 'toStreamable'],
245198
value,
246199
);
247200
}
@@ -362,19 +315,11 @@ async function* normalizeAsyncValue(value) {
362315
return;
363316
}
364317

365-
// Try string coercion for objects with custom toString/toPrimitive
366-
if (typeof value === 'object' && value !== null) {
367-
const str = tryStringCoercion(value);
368-
if (str !== null) {
369-
yield toUint8Array(str);
370-
return;
371-
}
372-
}
373-
374318
// Reject: no valid conversion
375319
throw new ERR_INVALID_ARG_TYPE(
376320
'value',
377-
['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable', 'AsyncIterable'],
321+
['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable', 'AsyncIterable',
322+
'toStreamable', 'toAsyncStreamable'],
378323
value,
379324
);
380325
}
@@ -472,6 +417,10 @@ async function* normalizeAsyncSource(source) {
472417
* @returns {Iterable<Uint8Array[]>}
473418
*/
474419
function fromSync(input) {
420+
if (input == null) {
421+
throw new ERR_INVALID_ARG_TYPE('input', 'a non-null value', input);
422+
}
423+
475424
// Check for primitives first (ByteInput)
476425
if (isPrimitiveChunk(input)) {
477426
const chunk = primitiveToUint8Array(input);
@@ -518,11 +467,33 @@ function fromSync(input) {
518467
}
519468
}
520469

470+
// Check toStreamable protocol (takes precedence over iteration protocols).
471+
// toAsyncStreamable is ignored entirely in fromSync.
472+
if (typeof input[toStreamable] === 'function') {
473+
return fromSync(input[toStreamable]());
474+
}
475+
476+
// Reject explicit async inputs
477+
if (isAsyncIterable(input)) {
478+
throw new ERR_INVALID_ARG_TYPE(
479+
'input',
480+
'a synchronous input (not AsyncIterable)',
481+
input,
482+
);
483+
}
484+
if (typeof input === 'object' && input !== null && typeof input.then === 'function') {
485+
throw new ERR_INVALID_ARG_TYPE(
486+
'input',
487+
'a synchronous input (not Promise)',
488+
input,
489+
);
490+
}
491+
521492
// Must be a SyncStreamable
522493
if (!isSyncIterable(input)) {
523494
throw new ERR_INVALID_ARG_TYPE(
524495
'input',
525-
['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable'],
496+
['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable', 'toStreamable'],
526497
input,
527498
);
528499
}
@@ -541,6 +512,10 @@ function fromSync(input) {
541512
* @returns {AsyncIterable<Uint8Array[]>}
542513
*/
543514
function from(input) {
515+
if (input == null) {
516+
throw new ERR_INVALID_ARG_TYPE('input', 'a non-null value', input);
517+
}
518+
544519
// Check for primitives first (ByteInput)
545520
if (isPrimitiveChunk(input)) {
546521
const chunk = primitiveToUint8Array(input);
@@ -586,11 +561,29 @@ function from(input) {
586561
}
587562
}
588563

564+
// Check toAsyncStreamable protocol (takes precedence over toStreamable and
565+
// iteration protocols)
566+
if (typeof input[toAsyncStreamable] === 'function') {
567+
return {
568+
__proto__: null,
569+
async *[SymbolAsyncIterator]() {
570+
const result = await input[toAsyncStreamable]();
571+
yield* from(result)[SymbolAsyncIterator]();
572+
},
573+
};
574+
}
575+
576+
// Check toStreamable protocol (takes precedence over iteration protocols)
577+
if (typeof input[toStreamable] === 'function') {
578+
return from(input[toStreamable]());
579+
}
580+
589581
// Must be a Streamable (sync or async iterable)
590582
if (!isSyncIterable(input) && !isAsyncIterable(input)) {
591583
throw new ERR_INVALID_ARG_TYPE(
592584
'input',
593-
['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable', 'AsyncIterable'],
585+
['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable',
586+
'AsyncIterable', 'toStreamable', 'toAsyncStreamable'],
594587
input,
595588
);
596589
}

0 commit comments

Comments
 (0)