Skip to content

Commit 882b208

Browse files
committed
stream: apply more stream/iter conformance fixes
1 parent b6c57f0 commit 882b208

4 files changed

Lines changed: 92 additions & 56 deletions

File tree

lib/internal/streams/iter/consumers.js

Lines changed: 23 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
const {
1111
ArrayBufferPrototypeGetByteLength,
1212
ArrayBufferPrototypeSlice,
13-
ArrayIsArray,
1413
ArrayPrototypeFilter,
1514
ArrayPrototypeMap,
1615
ArrayPrototypePush,
@@ -42,6 +41,8 @@ const {
4241
} = require('internal/validators');
4342

4443
const {
44+
from,
45+
fromSync,
4546
isAsyncIterable,
4647
isSyncIterable,
4748
} = require('internal/streams/iter/from');
@@ -69,11 +70,6 @@ function isMergeOptions(value) {
6970

7071
// Normalize a yielded value to a Uint8Array[] batch. Sources should yield
7172
// Uint8Array[] but a raw Uint8Array or string is tolerated by wrapping it.
72-
function ensureBatch(batch) {
73-
if (ArrayIsArray(batch)) return batch;
74-
return [batch];
75-
}
76-
7773
// =============================================================================
7874
// Shared chunk collection helpers
7975
// =============================================================================
@@ -85,11 +81,12 @@ function ensureBatch(batch) {
8581
* @returns {Uint8Array[]}
8682
*/
8783
function collectSync(source, limit) {
84+
// Normalize source via fromSync() - accepts strings, ArrayBuffers, protocols, etc.
85+
const normalized = fromSync(source);
8886
const chunks = [];
8987
let totalBytes = 0;
9088

91-
for (const raw of source) {
92-
const batch = ensureBatch(raw);
89+
for (const batch of normalized) {
9390
for (let i = 0; i < batch.length; i++) {
9491
const chunk = batch[i];
9592
if (limit !== undefined) {
@@ -115,65 +112,35 @@ function collectSync(source, limit) {
115112
async function collectAsync(source, signal, limit) {
116113
signal?.throwIfAborted();
117114

115+
// Normalize source via from() - accepts strings, ArrayBuffers, protocols, etc.
116+
const normalized = from(source);
118117
const chunks = [];
119118

120119
// Fast path: no signal and no limit
121120
if (!signal && limit === undefined) {
122-
if (isAsyncIterable(source)) {
123-
for await (const raw of source) {
124-
const batch = ensureBatch(raw);
125-
for (let i = 0; i < batch.length; i++) {
126-
ArrayPrototypePush(chunks, batch[i]);
127-
}
128-
}
129-
} else if (isSyncIterable(source)) {
130-
for (const raw of source) {
131-
const batch = ensureBatch(raw);
132-
for (let i = 0; i < batch.length; i++) {
133-
ArrayPrototypePush(chunks, batch[i]);
134-
}
121+
for await (const batch of normalized) {
122+
for (let i = 0; i < batch.length; i++) {
123+
ArrayPrototypePush(chunks, batch[i]);
135124
}
136-
} else {
137-
throw new ERR_INVALID_ARG_TYPE('source', ['AsyncIterable', 'Iterable'], source);
138125
}
139126
return chunks;
140127
}
141128

142129
// Slow path: with signal or limit checks
143130
let totalBytes = 0;
144131

145-
if (isAsyncIterable(source)) {
146-
for await (const raw of source) {
147-
const batch = ensureBatch(raw);
148-
signal?.throwIfAborted();
149-
for (let i = 0; i < batch.length; i++) {
150-
const chunk = batch[i];
151-
if (limit !== undefined) {
152-
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
153-
if (totalBytes > limit) {
154-
throw new ERR_OUT_OF_RANGE('totalBytes', `<= ${limit}`, totalBytes);
155-
}
156-
}
157-
ArrayPrototypePush(chunks, chunk);
158-
}
159-
}
160-
} else if (isSyncIterable(source)) {
161-
for (const raw of source) {
162-
const batch = ensureBatch(raw);
163-
signal?.throwIfAborted();
164-
for (let i = 0; i < batch.length; i++) {
165-
const chunk = batch[i];
166-
if (limit !== undefined) {
167-
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
168-
if (totalBytes > limit) {
169-
throw new ERR_OUT_OF_RANGE('totalBytes', `<= ${limit}`, totalBytes);
170-
}
132+
for await (const batch of normalized) {
133+
signal?.throwIfAborted();
134+
for (let i = 0; i < batch.length; i++) {
135+
const chunk = batch[i];
136+
if (limit !== undefined) {
137+
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
138+
if (totalBytes > limit) {
139+
throw new ERR_OUT_OF_RANGE('totalBytes', `<= ${limit}`, totalBytes);
171140
}
172-
ArrayPrototypePush(chunks, chunk);
173141
}
142+
ArrayPrototypePush(chunks, chunk);
174143
}
175-
} else {
176-
throw new ERR_INVALID_ARG_TYPE('source', ['AsyncIterable', 'Iterable'], source);
177144
}
178145

179146
return chunks;
@@ -216,7 +183,8 @@ function validateConsumerOptions(options) {
216183
try {
217184
new TextDecoder(options.encoding);
218185
} catch {
219-
throw new ERR_INVALID_ARG_VALUE('options.encoding', options.encoding);
186+
throw new ERR_INVALID_ARG_VALUE.RangeError(
187+
'options.encoding', options.encoding);
220188
}
221189
}
222190
}
@@ -234,7 +202,8 @@ function validateSyncConsumerOptions(options) {
234202
try {
235203
new TextDecoder(options.encoding);
236204
} catch {
237-
throw new ERR_INVALID_ARG_VALUE('options.encoding', options.encoding);
205+
throw new ERR_INVALID_ARG_VALUE.RangeError(
206+
'options.encoding', options.encoding);
238207
}
239208
}
240209
}
@@ -266,7 +235,6 @@ function textSync(source, options = { __proto__: null }) {
266235
const decoder = new TextDecoder(options.encoding ?? 'utf-8', {
267236
__proto__: null,
268237
fatal: true,
269-
ignoreBOM: true,
270238
});
271239
return decoder.decode(data);
272240
}
@@ -322,7 +290,6 @@ async function text(source, options = { __proto__: null }) {
322290
const decoder = new TextDecoder(options.encoding ?? 'utf-8', {
323291
__proto__: null,
324292
fatal: true,
325-
ignoreBOM: true,
326293
});
327294
return decoder.decode(data);
328295
}

lib/internal/streams/iter/utils.js

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

33
const {
4+
ArrayBufferPrototypeGetByteLength,
45
ArrayPrototypeSlice,
56
TypedArrayPrototypeGetBuffer,
67
TypedArrayPrototypeGetByteLength,
@@ -70,6 +71,21 @@ function allUint8Array(chunks) {
7071
* @returns {Uint8Array}
7172
*/
7273
function concatBytes(chunks) {
74+
// Empty stream: return zero-length Uint8Array
75+
if (chunks.length === 0) {
76+
return new Uint8Array(0);
77+
}
78+
// Single chunk: return it directly when it spans its entire backing buffer
79+
if (chunks.length === 1) {
80+
const chunk = chunks[0];
81+
const buf = TypedArrayPrototypeGetBuffer(chunk);
82+
if (TypedArrayPrototypeGetByteOffset(chunk) === 0 &&
83+
TypedArrayPrototypeGetByteLength(chunk) ===
84+
ArrayBufferPrototypeGetByteLength(buf)) {
85+
return chunk;
86+
}
87+
}
88+
// Multiple chunks or shared buffer: concatenate
7389
const buf = Buffer.concat(chunks);
7490
return new Uint8Array(
7591
TypedArrayPrototypeGetBuffer(buf),

test/parallel/test-stream-iter-consumers-bytes.js

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,23 @@ async function testConsumersNonArrayBatchSync() {
182182
assert.strictEqual(arr.length, 2);
183183
}
184184

185+
// Consumers accept string sources directly (normalized via from/fromSync)
186+
async function testBytesStringSource() {
187+
const result = await bytes('hello-bytes');
188+
assert.strictEqual(new TextDecoder().decode(result), 'hello-bytes');
189+
}
190+
191+
function testBytesSyncStringSource() {
192+
const result = bytesSync('hello-sync');
193+
assert.strictEqual(new TextDecoder().decode(result), 'hello-sync');
194+
}
195+
196+
async function testTextStringSource() {
197+
const { text } = require('stream/iter');
198+
const result = await text('direct-string');
199+
assert.strictEqual(result, 'direct-string');
200+
}
201+
185202
Promise.all([
186203
testBytesSyncBasic(),
187204
testBytesSyncLimit(),
@@ -197,4 +214,7 @@ Promise.all([
197214
testArrayAsyncLimit(),
198215
testConsumersNonArrayBatch(),
199216
testConsumersNonArrayBatchSync(),
217+
testBytesStringSource(),
218+
testBytesSyncStringSource(),
219+
testTextStringSource(),
200220
]).then(common.mustCall());

test/parallel/test-stream-iter-consumers-text.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,35 @@ async function testTextMultiChunkSplitCodepoint() {
116116
assert.strictEqual(result, '€');
117117
}
118118

119+
// BOM should be stripped (ignoreBOM defaults to false per spec)
120+
async function testTextBOMStripped() {
121+
// UTF-8 BOM: 0xEF, 0xBB, 0xBF followed by 'hi'
122+
const withBOM = new Uint8Array([0xEF, 0xBB, 0xBF, 0x68, 0x69]);
123+
const result = await text(from(withBOM));
124+
assert.strictEqual(result, 'hi');
125+
}
126+
127+
async function testTextSyncBOMStripped() {
128+
const withBOM = new Uint8Array([0xEF, 0xBB, 0xBF, 0x68, 0x69]);
129+
const result = textSync(fromSync(withBOM));
130+
assert.strictEqual(result, 'hi');
131+
}
132+
133+
// Unsupported encoding throws RangeError
134+
async function testTextUnsupportedEncodingThrowsRangeError() {
135+
await assert.rejects(
136+
() => text(from('hello'), { encoding: 'not-a-real-encoding' }),
137+
{ name: 'RangeError' },
138+
);
139+
}
140+
141+
function testTextSyncUnsupportedEncodingThrowsRangeError() {
142+
assert.throws(
143+
() => textSync(fromSync('hello'), { encoding: 'not-a-real-encoding' }),
144+
{ name: 'RangeError' },
145+
);
146+
}
147+
119148
Promise.all([
120149
testTextSyncBasic(),
121150
testTextAsync(),
@@ -128,4 +157,8 @@ Promise.all([
128157
testTextEmpty(),
129158
testTextWithSignal(),
130159
testTextMultiChunkSplitCodepoint(),
160+
testTextBOMStripped(),
161+
testTextSyncBOMStripped(),
162+
testTextUnsupportedEncodingThrowsRangeError(),
163+
testTextSyncUnsupportedEncodingThrowsRangeError(),
131164
]).then(common.mustCall());

0 commit comments

Comments
 (0)