Skip to content

Commit 4e30386

Browse files
committed
stream: deduplicate consumer sync/async iteration logic
Extract shared helpers from the consumer functions: - `collectSync()` for sync chunk collection with optional limit - `collectAsync()` for async/sync chunk collection with signal + limit - `toArrayBuffer()` for `Uint8Array`-to-`ArrayBuffer` conversion `bytesSync`, `bytes`, `arraySync`, `array`, `arrayBufferSync`, and arrayBu`ffer are now thin wrappers over these helpers, eliminating ~170 lines of nearly identical iteration code
1 parent 6b93fa8 commit 4e30386

1 file changed

Lines changed: 93 additions & 162 deletions

File tree

lib/internal/streams/iter/consumers.js

Lines changed: 93 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -67,79 +67,16 @@ function ensureBatch(batch) {
6767
}
6868

6969
// =============================================================================
70-
// Sync Consumers
70+
// Shared chunk collection helpers
7171
// =============================================================================
7272

7373
/**
74-
* Collect all bytes from a sync source.
75-
* @param {Iterable<Uint8Array[]>} source
76-
* @param {{ limit?: number }} [options]
77-
* @returns {Uint8Array}
78-
*/
79-
function bytesSync(source, options) {
80-
const limit = options?.limit;
81-
const chunks = [];
82-
let totalBytes = 0;
83-
84-
for (const raw of source) {
85-
const batch = ensureBatch(raw);
86-
for (let i = 0; i < batch.length; i++) {
87-
const chunk = batch[i];
88-
if (limit !== undefined) {
89-
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
90-
if (totalBytes > limit) {
91-
throw new ERR_OUT_OF_RANGE('totalBytes', `<= ${limit}`, totalBytes);
92-
}
93-
}
94-
ArrayPrototypePush(chunks, chunk);
95-
}
96-
}
97-
98-
return concatBytes(chunks);
99-
}
100-
101-
/**
102-
* Collect and decode text from a sync source.
103-
* @param {Iterable<Uint8Array[]>} source
104-
* @param {{ encoding?: string, limit?: number }} [options]
105-
* @returns {string}
106-
*/
107-
function textSync(source, options) {
108-
const data = bytesSync(source, options);
109-
const decoder = new TextDecoder(options?.encoding ?? 'utf-8', {
110-
fatal: true,
111-
ignoreBOM: true,
112-
});
113-
return decoder.decode(data);
114-
}
115-
116-
/**
117-
* Collect bytes as ArrayBuffer from a sync source.
74+
* Collect chunks from a sync source into an array.
11875
* @param {Iterable<Uint8Array[]>} source
119-
* @param {{ limit?: number }} [options]
120-
* @returns {ArrayBuffer}
121-
*/
122-
function arrayBufferSync(source, options) {
123-
const data = bytesSync(source, options);
124-
const byteOffset = TypedArrayPrototypeGetByteOffset(data);
125-
const byteLength = TypedArrayPrototypeGetByteLength(data);
126-
const buffer = TypedArrayPrototypeGetBuffer(data);
127-
if (byteOffset === 0 &&
128-
byteLength === ArrayBufferPrototypeGetByteLength(buffer)) {
129-
return buffer;
130-
}
131-
return ArrayBufferPrototypeSlice(buffer, byteOffset,
132-
byteOffset + byteLength);
133-
}
134-
135-
/**
136-
* Collect all chunks as an array from a sync source.
137-
* @param {Iterable<Uint8Array[]>} source
138-
* @param {{ limit?: number }} [options]
76+
* @param {number} [limit]
13977
* @returns {Uint8Array[]}
14078
*/
141-
function arraySync(source, options) {
142-
const limit = options?.limit;
79+
function collectSync(source, limit) {
14380
const chunks = [];
14481
let totalBytes = 0;
14582

@@ -160,20 +97,14 @@ function arraySync(source, options) {
16097
return chunks;
16198
}
16299

163-
// =============================================================================
164-
// Async Consumers
165-
// =============================================================================
166-
167100
/**
168-
* Collect all bytes from an async or sync source.
101+
* Collect chunks from an async or sync source into an array.
169102
* @param {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>} source
170-
* @param {{ signal?: AbortSignal, limit?: number }} [options]
171-
* @returns {Promise<Uint8Array>}
103+
* @param {AbortSignal} [signal]
104+
* @param {number} [limit]
105+
* @returns {Promise<Uint8Array[]>}
172106
*/
173-
async function bytes(source, options) {
174-
const signal = options?.signal;
175-
const limit = options?.limit;
176-
107+
async function collectAsync(source, signal, limit) {
177108
if (signal?.aborted) {
178109
throw signal.reason ?? lazyDOMException('Aborted', 'AbortError');
179110
}
@@ -199,7 +130,7 @@ async function bytes(source, options) {
199130
} else {
200131
throw new ERR_INVALID_ARG_TYPE('source', ['AsyncIterable', 'Iterable'], source);
201132
}
202-
return concatBytes(chunks);
133+
return chunks;
203134
}
204135

205136
// Slow path: with signal or limit checks
@@ -243,6 +174,87 @@ async function bytes(source, options) {
243174
throw new ERR_INVALID_ARG_TYPE('source', ['AsyncIterable', 'Iterable'], source);
244175
}
245176

177+
return chunks;
178+
}
179+
180+
/**
181+
* Convert a Uint8Array to its backing ArrayBuffer, slicing if necessary.
182+
* @param {Uint8Array} data
183+
* @returns {ArrayBuffer}
184+
*/
185+
function toArrayBuffer(data) {
186+
const byteOffset = TypedArrayPrototypeGetByteOffset(data);
187+
const byteLength = TypedArrayPrototypeGetByteLength(data);
188+
const buffer = TypedArrayPrototypeGetBuffer(data);
189+
if (byteOffset === 0 &&
190+
byteLength === ArrayBufferPrototypeGetByteLength(buffer)) {
191+
return buffer;
192+
}
193+
return ArrayBufferPrototypeSlice(buffer, byteOffset,
194+
byteOffset + byteLength);
195+
}
196+
197+
// =============================================================================
198+
// Sync Consumers
199+
// =============================================================================
200+
201+
/**
202+
* Collect all bytes from a sync source.
203+
* @param {Iterable<Uint8Array[]>} source
204+
* @param {{ limit?: number }} [options]
205+
* @returns {Uint8Array}
206+
*/
207+
function bytesSync(source, options) {
208+
return concatBytes(collectSync(source, options?.limit));
209+
}
210+
211+
/**
212+
* Collect and decode text from a sync source.
213+
* @param {Iterable<Uint8Array[]>} source
214+
* @param {{ encoding?: string, limit?: number }} [options]
215+
* @returns {string}
216+
*/
217+
function textSync(source, options) {
218+
const data = bytesSync(source, options);
219+
const decoder = new TextDecoder(options?.encoding ?? 'utf-8', {
220+
fatal: true,
221+
ignoreBOM: true,
222+
});
223+
return decoder.decode(data);
224+
}
225+
226+
/**
227+
* Collect bytes as ArrayBuffer from a sync source.
228+
* @param {Iterable<Uint8Array[]>} source
229+
* @param {{ limit?: number }} [options]
230+
* @returns {ArrayBuffer}
231+
*/
232+
function arrayBufferSync(source, options) {
233+
return toArrayBuffer(bytesSync(source, options));
234+
}
235+
236+
/**
237+
* Collect all chunks as an array from a sync source.
238+
* @param {Iterable<Uint8Array[]>} source
239+
* @param {{ limit?: number }} [options]
240+
* @returns {Uint8Array[]}
241+
*/
242+
function arraySync(source, options) {
243+
return collectSync(source, options?.limit);
244+
}
245+
246+
// =============================================================================
247+
// Async Consumers
248+
// =============================================================================
249+
250+
/**
251+
* Collect all bytes from an async or sync source.
252+
* @param {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>} source
253+
* @param {{ signal?: AbortSignal, limit?: number }} [options]
254+
* @returns {Promise<Uint8Array>}
255+
*/
256+
async function bytes(source, options) {
257+
const chunks = await collectAsync(source, options?.signal, options?.limit);
246258
return concatBytes(chunks);
247259
}
248260

@@ -268,16 +280,7 @@ async function text(source, options) {
268280
* @returns {Promise<ArrayBuffer>}
269281
*/
270282
async function arrayBuffer(source, options) {
271-
const data = await bytes(source, options);
272-
const byteOffset = TypedArrayPrototypeGetByteOffset(data);
273-
const byteLength = TypedArrayPrototypeGetByteLength(data);
274-
const buffer = TypedArrayPrototypeGetBuffer(data);
275-
if (byteOffset === 0 &&
276-
byteLength === ArrayBufferPrototypeGetByteLength(buffer)) {
277-
return buffer;
278-
}
279-
return ArrayBufferPrototypeSlice(buffer, byteOffset,
280-
byteOffset + byteLength);
283+
return toArrayBuffer(await bytes(source, options));
281284
}
282285

283286
/**
@@ -287,79 +290,7 @@ async function arrayBuffer(source, options) {
287290
* @returns {Promise<Uint8Array[]>}
288291
*/
289292
async function array(source, options) {
290-
const signal = options?.signal;
291-
const limit = options?.limit;
292-
293-
if (signal?.aborted) {
294-
throw signal.reason ?? lazyDOMException('Aborted', 'AbortError');
295-
}
296-
297-
const chunks = [];
298-
299-
// Fast path: no signal and no limit
300-
if (!signal && limit === undefined) {
301-
if (isAsyncIterable(source)) {
302-
for await (const raw of source) {
303-
const batch = ensureBatch(raw);
304-
for (let i = 0; i < batch.length; i++) {
305-
ArrayPrototypePush(chunks, batch[i]);
306-
}
307-
}
308-
} else if (isSyncIterable(source)) {
309-
for (const raw of source) {
310-
const batch = ensureBatch(raw);
311-
for (let i = 0; i < batch.length; i++) {
312-
ArrayPrototypePush(chunks, batch[i]);
313-
}
314-
}
315-
} else {
316-
throw new ERR_INVALID_ARG_TYPE('source', ['AsyncIterable', 'Iterable'], source);
317-
}
318-
return chunks;
319-
}
320-
321-
// Slow path
322-
let totalBytes = 0;
323-
324-
if (isAsyncIterable(source)) {
325-
for await (const raw of source) {
326-
const batch = ensureBatch(raw);
327-
if (signal?.aborted) {
328-
throw signal.reason ?? lazyDOMException('Aborted', 'AbortError');
329-
}
330-
for (let i = 0; i < batch.length; i++) {
331-
const chunk = batch[i];
332-
if (limit !== undefined) {
333-
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
334-
if (totalBytes > limit) {
335-
throw new ERR_OUT_OF_RANGE('totalBytes', `<= ${limit}`, totalBytes);
336-
}
337-
}
338-
ArrayPrototypePush(chunks, chunk);
339-
}
340-
}
341-
} else if (isSyncIterable(source)) {
342-
for (const raw of source) {
343-
const batch = ensureBatch(raw);
344-
if (signal?.aborted) {
345-
throw signal.reason ?? lazyDOMException('Aborted', 'AbortError');
346-
}
347-
for (let i = 0; i < batch.length; i++) {
348-
const chunk = batch[i];
349-
if (limit !== undefined) {
350-
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
351-
if (totalBytes > limit) {
352-
throw new ERR_OUT_OF_RANGE('totalBytes', `<= ${limit}`, totalBytes);
353-
}
354-
}
355-
ArrayPrototypePush(chunks, chunk);
356-
}
357-
}
358-
} else {
359-
throw new ERR_INVALID_ARG_TYPE('source', ['AsyncIterable', 'Iterable'], source);
360-
}
361-
362-
return chunks;
293+
return collectAsync(source, options?.signal, options?.limit);
363294
}
364295

365296
// =============================================================================

0 commit comments

Comments
 (0)