Skip to content

Commit 1af5e80

Browse files
committed
stream: performance optimizations for stream/iter pipeline and broadcast
Optimize the stream/iter implementation based on benchmark analysis comparing classic streams, web streams, and stream/iter.

- Eliminate `withFlushSignalAsync`/`withFlushSignalSync` generator wrappers from the stateless transform pipeline. Stateless transforms now handle their own flush (`null`) signal internally after the for-await loop, removing an entire async generator layer per pipeline. Stateful transforms retain the wrapper since their cost is dominated by the transform operation itself (compression, encryption, etc.).
- Hoist writer capability checks in `pipeTo`/`pipeToSync`. Property lookups for `writeSync`/`writevSync`/`endSync`/`failSync` are done once before the loop instead of per-chunk via optional chaining. Split signal/no-signal loops to avoid per-batch null checks. Added `writevSync` batch write support to `pipeToSync`.
- Optimize `isUint8ArrayBatch` with a single-element fast path and a plain `for` loop. Replaces `ArrayPrototypeEvery` (function call per element) with a direct indexed loop. Short-circuits on length 1 (most common from transforms) and checks first/last before iterating the middle.
- Make broadcast consumer `next()`/`return()`/`throw()` non-async. Returns `PromiseResolve()` directly on the fast path (data in buffer) instead of wrapping through async function machinery. Caches the done result.
- `RingBuffer`: replace modulo with bitwise AND. Capacity is always a power of 2, so index computation uses `& mask` instead of `% capacity`.
- `Broadcast`: incremental min-cursor tracking. Replaces the O(N) full scan of all consumers on every `next()` call with a cached min cursor that is only recomputed when dirty (consumer at the minimum advances or detaches). Eliminates O(N^2) per-write-cycle scaling.
- `Broadcast`: separate waiters list for `notifyConsumers`. Only iterates consumers with pending resolve callbacks instead of scanning all consumers on every write.
- `concatBytes`: cache per-chunk byte lengths to avoid reading `byteLength` twice per chunk (once for total, once for offset advance). Remove dead `totalByteLength` function.

Benchmark results (MB/s, higher is better):

| Benchmark        | classic   | webstream | iter       | iter-sync | iter vs classic |
| ---------------- | --------- | --------- | ---------- | --------- | --------------- |
| Identity 1MB     | 1,245     | 582       | **3,110**  | 16,658    | 2.5x            |
| Identity 64MB    | 31,410    | 14,980    | **33,894** | 62,111    | 1.1x            |
| Transform 1MB    | 287       | 227       | **325**    | 327       | 1.1x            |
| Transform 64MB   | 595       | 605       | **605**    | 573       | 1.0x            |
| Compression 1MB  | **123**   | 98        | 110        | --        | 0.9x            |
| Compression 64MB | **329**   | 303       | 308        | --        | 0.9x            |
| pipeTo 1MB       | 1,137     | 494       | **2,740**  | 13,611    | 2.4x            |
| pipeTo 64MB      | 22,081    | 15,377    | **30,036** | 60,976    | 1.4x            |
| Broadcast 1c 1MB | 1,365     | 521       | **1,991**  | --        | 1.5x            |
| Broadcast 2c 1MB | 1,285     | 439       | **1,962**  | --        | 1.5x            |
| Broadcast 4c 1MB | **1,217** | 322       | 750        | --        | 0.6x            |
| File read 16MB   | 1,469     | 537       | **1,639**  | --        | 1.1x            |

The creation benchmarks show the raw cost of constructing the various objects without any other activity. The `classic` Node.js streams are faster here simply because they do less work on actual creation.

| Creation (ops/sec) | classic   | webstream | iter      | iter vs classic |
| ------------------ | --------- | --------- | --------- | --------------- |
| readable           | 8,662,361 | 505,889   | 1,144,385 | 0.1x            |
| writable           | 3,856,139 | 269,950   | 1,285,210 | 0.3x            |
| pair               | 3,120,224 | 141,988   | 349,176   | 0.1x            |
1 parent 371dbe9 commit 1af5e80

5 files changed

Lines changed: 196 additions & 152 deletions

File tree

lib/internal/streams/iter/broadcast.js

Lines changed: 62 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const {
1515
MathMin,
1616
NumberMAX_SAFE_INTEGER,
1717
PromisePrototypeThen,
18+
PromiseReject,
1819
PromiseResolve,
1920
PromiseWithResolvers,
2021
SafeSet,
@@ -111,11 +112,14 @@ class BroadcastImpl {
111112
#buffer = new RingBuffer();
112113
#bufferStart = 0;
113114
#consumers = new SafeSet();
115+
#waiters = []; // Consumers with pending resolve (subset of #consumers)
114116
#ended = false;
115117
#error = null;
116118
#cancelled = false;
117119
#options;
118120
#writer = null;
121+
#cachedMinCursor = 0;
122+
#minCursorDirty = false;
119123

120124
constructor(options) {
121125
this.#options = options;
@@ -166,62 +170,76 @@ class BroadcastImpl {
166170
};
167171

168172
this.#consumers.add(state);
173+
// New consumer starts at the latest position; min cursor unchanged
174+
// unless this is the first consumer.
175+
if (this.#consumers.size === 1) {
176+
this.#cachedMinCursor = state.cursor;
177+
this.#minCursorDirty = false;
178+
}
169179
const self = this;
170180

181+
const kDone = PromiseResolve(
182+
{ __proto__: null, done: true, value: undefined });
183+
184+
function detach() {
185+
state.detached = true;
186+
state.resolve = null;
187+
state.reject = null;
188+
self.#consumers.delete(state);
189+
self.#minCursorDirty = true;
190+
self.#tryTrimBuffer();
191+
}
192+
171193
return {
172194
__proto__: null,
173195
[SymbolAsyncIterator]() {
174196
return {
175197
__proto__: null,
176-
async next() {
198+
next() {
177199
if (state.detached) {
178-
// If detached due to an error, throw the error
179-
if (self.#error) throw self.#error;
180-
return { __proto__: null, done: true, value: undefined };
200+
if (self.#error) return PromiseReject(self.#error);
201+
return kDone;
181202
}
182203

183204
const bufferIndex = state.cursor - self.#bufferStart;
184205
if (bufferIndex < self.#buffer.length) {
185206
const chunk = self.#buffer.get(bufferIndex);
207+
// If this consumer was at the min cursor, mark dirty
208+
if (state.cursor <= self.#cachedMinCursor) {
209+
self.#minCursorDirty = true;
210+
}
186211
state.cursor++;
187212
self.#tryTrimBuffer();
188-
return { __proto__: null, done: false, value: chunk };
213+
return PromiseResolve(
214+
{ __proto__: null, done: false, value: chunk });
189215
}
190216

191217
if (self.#error) {
192218
state.detached = true;
193219
self.#consumers.delete(state);
194-
throw self.#error;
220+
return PromiseReject(self.#error);
195221
}
196222

197223
if (self.#ended || self.#cancelled) {
198-
state.detached = true;
199-
self.#consumers.delete(state);
200-
return { __proto__: null, done: true, value: undefined };
224+
detach();
225+
return kDone;
201226
}
202227

203228
const { promise, resolve, reject } = PromiseWithResolvers();
204229
state.resolve = resolve;
205230
state.reject = reject;
231+
ArrayPrototypePush(self.#waiters, state);
206232
return promise;
207233
},
208234

209-
async return() {
210-
state.detached = true;
211-
state.resolve = null;
212-
state.reject = null;
213-
self.#consumers.delete(state);
214-
self.#tryTrimBuffer();
215-
return { __proto__: null, done: true, value: undefined };
235+
return() {
236+
detach();
237+
return kDone;
216238
},
217239

218-
async throw() {
219-
state.detached = true;
220-
state.resolve = null;
221-
state.reject = null;
222-
self.#consumers.delete(state);
223-
self.#tryTrimBuffer();
224-
return { __proto__: null, done: true, value: undefined };
240+
throw() {
241+
detach();
242+
return kDone;
225243
},
226244
};
227245
},
@@ -342,23 +360,26 @@ class BroadcastImpl {
342360

343361
// Private methods
344362

345-
#getMinCursor() {
363+
#recomputeMinCursor() {
346364
let min = Infinity;
347365
for (const consumer of this.#consumers) {
348366
if (consumer.cursor < min) {
349367
min = consumer.cursor;
350368
}
351369
}
352-
return min === Infinity ?
370+
this.#cachedMinCursor = min === Infinity ?
353371
this.#bufferStart + this.#buffer.length : min;
372+
this.#minCursorDirty = false;
354373
}
355374

356375
#tryTrimBuffer() {
357-
const minCursor = this.#getMinCursor();
358-
const trimCount = minCursor - this.#bufferStart;
376+
if (this.#minCursorDirty) {
377+
this.#recomputeMinCursor();
378+
}
379+
const trimCount = this.#cachedMinCursor - this.#bufferStart;
359380
if (trimCount > 0) {
360381
this.#buffer.trimFront(trimCount);
361-
this.#bufferStart = minCursor;
382+
this.#bufferStart = this.#cachedMinCursor;
362383

363384
if (this[kOnBufferDrained] &&
364385
this.#buffer.length < this.#options.highWaterMark) {
@@ -368,16 +389,28 @@ class BroadcastImpl {
368389
}
369390

370391
#notifyConsumers() {
371-
for (const consumer of this.#consumers) {
392+
const waiters = this.#waiters;
393+
if (waiters.length === 0) return;
394+
// Swap out the waiters list so consumers that re-wait during
395+
// resolve don't get processed twice in this cycle.
396+
this.#waiters = [];
397+
for (let i = 0; i < waiters.length; i++) {
398+
const consumer = waiters[i];
372399
if (consumer.resolve) {
373400
const bufferIndex = consumer.cursor - this.#bufferStart;
374401
if (bufferIndex < this.#buffer.length) {
375402
const chunk = this.#buffer.get(bufferIndex);
403+
if (consumer.cursor <= this.#cachedMinCursor) {
404+
this.#minCursorDirty = true;
405+
}
376406
consumer.cursor++;
377407
const resolve = consumer.resolve;
378408
consumer.resolve = null;
379409
consumer.reject = null;
380410
resolve({ __proto__: null, done: false, value: chunk });
411+
} else {
412+
// Still waiting -- put back
413+
ArrayPrototypePush(this.#waiters, consumer);
381414
}
382415
}
383416
}

lib/internal/streams/iter/from.js

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,17 @@ function* normalizeSyncValue(value) {
255255
*/
256256
function isUint8ArrayBatch(value) {
257257
if (!ArrayIsArray(value)) return false;
258-
if (value.length === 0) return true;
259-
return ArrayPrototypeEvery(value, isUint8Array);
258+
const len = value.length;
259+
if (len === 0) return true;
260+
// Fast path: single-element batch (most common from transforms)
261+
if (len === 1) return isUint8Array(value[0]);
262+
// Check first and last before iterating all elements
263+
if (!isUint8Array(value[0]) || !isUint8Array(value[len - 1])) return false;
264+
if (len === 2) return true;
265+
for (let i = 1; i < len - 1; i++) {
266+
if (!isUint8Array(value[i])) return false;
267+
}
268+
return true;
260269
}
261270

262271
/**

0 commit comments

Comments
 (0)