Skip to content

Commit 371dbe9

Browse files
committed
stream: add stream/iter benchmarks
1 parent 70de86d commit 371dbe9

7 files changed

Lines changed: 848 additions & 0 deletions

File tree

benchmark/streams/iter/creation.js

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
// Object creation overhead benchmark.
// Measures the cost of constructing stream infrastructure (no data flow).
'use strict';

const common = require('../../common.js');
const { Readable, Writable, Transform, PassThrough } = require('stream');

const bench = common.createBenchmark(main, {
  api: ['classic', 'webstream', 'iter'],
  type: ['readable', 'writable', 'transform', 'pair'],
  n: [1e5],
}, {
  flags: ['--experimental-stream-iter'],
  // Iter has no standalone Transform class; transforms are plain functions.
  combinationFilter({ api, type }) {
    return !(api === 'iter' && type === 'transform');
  },
});
// Dispatch the configured api to its benchmark implementation.
function main({ api, type, n }) {
  const runners = {
    classic: benchClassic,
    webstream: benchWebStream,
    iter: benchIter,
  };
  return runners[api](type, n);
}
// Construct n classic (require('stream')) objects of the requested type.
function benchClassic(type, n) {
  bench.start();
  if (type === 'readable') {
    for (let i = 0; i < n; i++) new Readable({ read() {} });
  } else if (type === 'writable') {
    for (let i = 0; i < n; i++) new Writable({ write(c, e, cb) { cb(); } });
  } else if (type === 'transform') {
    for (let i = 0; i < n; i++) {
      new Transform({ transform(c, e, cb) { cb(null, c); } });
    }
  } else if (type === 'pair') {
    for (let i = 0; i < n; i++) new PassThrough();
  }
  bench.end(n);
}
// Construct n WHATWG (web) stream objects of the requested type.
function benchWebStream(type, n) {
  bench.start();
  if (type === 'readable') {
    for (let i = 0; i < n; i++) new ReadableStream({ pull() {} });
  } else if (type === 'writable') {
    for (let i = 0; i < n; i++) new WritableStream({ write() {} });
  } else if (type === 'transform') {
    for (let i = 0; i < n; i++) {
      new TransformStream({
        transform(c, controller) { controller.enqueue(c); },
      });
    }
  } else if (type === 'pair') {
    // TransformStream gives a readable+writable pair.
    for (let i = 0; i < n; i++) new TransformStream();
  }
  bench.end(n);
}
// Construct n stream/iter objects of the requested type.
// (The 'transform' type is filtered out by combinationFilter above.)
function benchIter(type, n) {
  const { push, from, duplex } = require('stream/iter');

  bench.start();
  if (type === 'readable') {
    for (let i = 0; i < n; i++) from('x');
  } else if (type === 'writable') {
    // push() creates a writer+readable pair.
    for (let i = 0; i < n; i++) push();
  } else if (type === 'pair') {
    // duplex() creates a bidirectional channel pair.
    for (let i = 0; i < n; i++) duplex();
  }
  bench.end(n);
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
// File reading throughput benchmark.
// Reads a real file through the three stream APIs.
'use strict';

const common = require('../../common.js');
const fs = require('fs');
const { Writable, pipeline } = require('stream');
const tmpdir = require('../../../test/common/tmpdir');

tmpdir.refresh();
// Per-process fixture name so parallel benchmark runs don't collide.
const filename = tmpdir.resolve(`.removeme-bench-file-read-${process.pid}`);

const bench = common.createBenchmark(main, {
  api: ['classic', 'webstream', 'iter'],
  filesize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
  n: [5],
}, {
  flags: ['--experimental-stream-iter'],
});
/**
 * Entry point: writes a throwaway fixture file of `filesize` bytes, then
 * dispatches to the per-API benchmark. Throughput is reported in MiB
 * (filesize * n / 2**20) so results are comparable across file sizes.
 */
function main({ api, filesize, n }) {
  writeFixture(filesize);

  const totalOps = (filesize * n) / (1024 * 1024);

  switch (api) {
    case 'classic':
      return benchClassic(filesize, n, totalOps);
    case 'webstream':
      return benchWebStream(filesize, n, totalOps);
    case 'iter':
      return benchIter(filesize, n, totalOps);
  }
}

// Create the fixture file read by every run, using at most a 64 KiB
// buffer regardless of the target size. try/finally guarantees the fd
// is closed even if a write fails.
function writeFixture(filesize) {
  const chunk = Buffer.alloc(Math.min(filesize, 64 * 1024), 'abcdefghij');
  const fd = fs.openSync(filename, 'w');
  try {
    let remaining = filesize;
    while (remaining > 0) {
      const size = Math.min(remaining, chunk.length);
      fs.writeSync(fd, chunk, 0, size);
      remaining -= size;
    }
  } finally {
    fs.closeSync(fd);
  }
}
// Classic streams: fs.createReadStream piped via pipeline() into a
// no-op Writable. `done` (was `cb`) renamed to avoid being shadowed by
// the Writable's write(data, enc, cb) callback.
function benchClassic(filesize, n, totalOps) {
  function run(done) {
    const r = fs.createReadStream(filename);
    const w = new Writable({ write(data, enc, cb) { cb(); } });
    pipeline(r, w, done);
  }

  // One warmup run, then n timed runs chained through callbacks.
  run(() => {
    let i = 0;
    bench.start();
    (function next() {
      if (i++ >= n) {
        fs.unlinkSync(filename);
        return bench.end(totalOps);
      }
      run(next);
    })();
  });
}
// Web Streams: FileHandle.readableWebStream() piped to a no-op
// WritableStream, reopening the file for every iteration.
function benchWebStream(filesize, n, totalOps) {
  const fsp = require('fs/promises');

  const run = async () => {
    const fh = await fsp.open(filename, 'r');
    const sink = new WritableStream({ write() {} });
    await fh.readableWebStream().pipeTo(sink);
    await fh.close();
  };

  (async () => {
    // Warmup iteration, excluded from timing.
    await run();

    bench.start();
    for (let i = 0; i < n; i++) {
      await run();
    }
    fs.unlinkSync(filename);
    bench.end(totalOps);
  })();
}
// stream/iter: FileHandle.pull() source piped to a no-op sink,
// reopening the file for every iteration.
function benchIter(filesize, n, totalOps) {
  const fsp = require('fs/promises');
  const { pipeTo } = require('stream/iter');

  const run = async () => {
    const fh = await fsp.open(filename, 'r');
    await pipeTo(fh.pull(), { write() {} });
    await fh.close();
  };

  (async () => {
    // Warmup iteration, excluded from timing.
    await run();

    bench.start();
    for (let i = 0; i < n; i++) {
      await run();
    }
    fs.unlinkSync(filename);
    bench.end(totalOps);
  })();
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
// Throughput benchmark: fan-out data to N consumers simultaneously.
// Classic streams use PassThrough + pipe, Web Streams use tee() chains,
// stream/iter uses broadcast() with push() consumers.
'use strict';

const common = require('../../common.js');
const { PassThrough, Writable } = require('stream');

const bench = common.createBenchmark(main, {
  api: ['classic', 'webstream', 'iter'],
  consumers: [1, 2, 4],
  datasize: [1024 * 1024, 16 * 1024 * 1024],
  n: [5],
}, {
  flags: ['--experimental-stream-iter'],
});

// Size of each chunk pushed through the fan-out.
const CHUNK_SIZE = 64 * 1024;
// Entry point: builds the shared source chunk, then dispatches to the
// per-API benchmark. Throughput is reported in MiB written.
function main({ api, consumers, datasize, n }) {
  const chunk = Buffer.alloc(CHUNK_SIZE, 'abcdefghij');
  const totalOps = (datasize * n) / (1024 * 1024);

  if (api === 'classic')
    return benchClassic(chunk, consumers, datasize, n, totalOps);
  if (api === 'webstream')
    return benchWebStream(chunk, consumers, datasize, n, totalOps);
  if (api === 'iter')
    return benchIter(chunk, consumers, datasize, n, totalOps);
}
// Classic streams: one PassThrough piped into numConsumers no-op
// Writables; a run completes when every sink has emitted 'finish'.
// Fixes vs. original: removed the unused `sinks` array, and renamed the
// run callback to `done` so it isn't shadowed by write(data, enc, cb).
function benchClassic(chunk, numConsumers, datasize, n, totalOps) {
  function run(done) {
    const source = new PassThrough();
    let finished = 0;

    for (let c = 0; c < numConsumers; c++) {
      const w = new Writable({ write(data, enc, cb) { cb(); } });
      source.pipe(w);
      w.on('finish', () => {
        if (++finished === numConsumers) done();
      });
    }

    // Feed the source, honouring backpressure: stop when write() returns
    // false and resume on 'drain'; end() once all bytes are written.
    let remaining = datasize;
    function write() {
      let ok = true;
      while (ok && remaining > 0) {
        const size = Math.min(remaining, chunk.length);
        remaining -= size;
        const buf = size === chunk.length ? chunk : chunk.subarray(0, size);
        ok = source.write(buf);
      }
      if (remaining > 0) {
        source.once('drain', write);
      } else {
        source.end();
      }
    }
    write();
  }

  let i = 0;
  bench.start();
  (function next() {
    if (i++ >= n) return bench.end(totalOps);
    run(next);
  })();
}
// Web Streams: a pull-based ReadableStream fanned out via chained tee()
// calls, with each branch piped to its own no-op WritableStream.
function benchWebStream(chunk, numConsumers, datasize, n, totalOps) {
  async function run() {
    let remaining = datasize;
    const source = new ReadableStream({
      pull(controller) {
        if (remaining <= 0) {
          controller.close();
          return;
        }
        const size = Math.min(remaining, chunk.length);
        remaining -= size;
        controller.enqueue(
          size === chunk.length ? chunk : chunk.subarray(0, size));
      },
    });

    // tee() yields exactly two branches, so chain tee() calls until we
    // hold numConsumers leaves (covers the configured 1/2/4 values).
    const branches = [];
    if (numConsumers === 1) {
      branches.push(source);
    } else {
      const pending = [source];
      while (branches.length + pending.length < numConsumers) {
        const [left, right] = pending.shift().tee();
        pending.push(left, right);
      }
      branches.push(...pending);
    }

    const makeSink = () => new WritableStream({ write() {} });
    await Promise.all(branches.map((branch) => branch.pipeTo(makeSink())));
  }

  (async () => {
    bench.start();
    for (let i = 0; i < n; i++) await run();
    bench.end(totalOps);
  })();
}
// stream/iter: broadcast() fan-out; each consumer drains its own push()
// subscription while the writer feeds the channel synchronously.
function benchIter(chunk, numConsumers, datasize, n, totalOps) {
  const { broadcast } = require('stream/iter');

  // No-op consumer: iterate the source to completion, discarding data.
  async function drain(source) {
    // eslint-disable-next-line no-unused-vars
    for await (const _ of source) { /* drain */ }
  }

  async function run() {
    const { writer, broadcast: bc } = broadcast();

    const consumers = [];
    for (let c = 0; c < numConsumers; c++) {
      consumers.push(drain(bc.push()));
    }

    let remaining = datasize;
    while (remaining > 0) {
      const size = Math.min(remaining, chunk.length);
      remaining -= size;
      writer.writeSync(
        size === chunk.length ? chunk : chunk.subarray(0, size));
    }
    writer.endSync();

    await Promise.all(consumers);
  }

  (async () => {
    bench.start();
    for (let i = 0; i < n; i++) await run();
    bench.end(totalOps);
  })();
}

0 commit comments

Comments
 (0)