Skip to content

Commit 70de86d

Browse files
committed
stream: expand and split stream/iter tests
1 parent 577e78f commit 70de86d

24 files changed

Lines changed: 2331 additions & 1413 deletions
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Flags: --experimental-stream-iter
2+
'use strict';
3+
4+
const common = require('../common');
5+
const assert = require('assert');
6+
const { broadcast, text } = require('stream/iter');
7+
8+
// =============================================================================
9+
// Backpressure policies
10+
// =============================================================================
11+
12+
async function testDropOldest() {
13+
const { writer, broadcast: bc } = broadcast({
14+
highWaterMark: 2,
15+
backpressure: 'drop-oldest',
16+
});
17+
const consumer = bc.push();
18+
19+
writer.writeSync('first');
20+
writer.writeSync('second');
21+
// This should drop 'first'
22+
writer.writeSync('third');
23+
writer.endSync();
24+
25+
const data = await text(consumer);
26+
assert.strictEqual(data, 'secondthird');
27+
}
28+
29+
async function testDropNewest() {
30+
const { writer, broadcast: bc } = broadcast({
31+
highWaterMark: 1,
32+
backpressure: 'drop-newest',
33+
});
34+
const consumer = bc.push();
35+
36+
writer.writeSync('kept');
37+
// This should be silently dropped
38+
writer.writeSync('dropped');
39+
writer.endSync();
40+
41+
const data = await text(consumer);
42+
assert.strictEqual(data, 'kept');
43+
}
44+
45+
// =============================================================================
46+
// Block backpressure
47+
// =============================================================================
48+
49+
async function testBlockBackpressure() {
50+
const { writer, broadcast: bc } = broadcast({
51+
highWaterMark: 1,
52+
backpressure: 'block',
53+
});
54+
const consumer = bc.push();
55+
writer.writeSync('a');
56+
57+
// Next write should block
58+
let writeResolved = false;
59+
const writePromise = writer.write('b').then(() => { writeResolved = true; });
60+
await new Promise((r) => setTimeout(r, 10));
61+
assert.strictEqual(writeResolved, false);
62+
63+
// Drain consumer
64+
const iter = consumer[Symbol.asyncIterator]();
65+
await iter.next();
66+
await new Promise((r) => setTimeout(r, 10));
67+
assert.strictEqual(writeResolved, true);
68+
writer.endSync();
69+
await writePromise;
70+
}
71+
72+
Promise.all([
73+
testDropOldest(),
74+
testDropNewest(),
75+
testBlockBackpressure(),
76+
]).then(common.mustCall());

test/parallel/test-stream-iter-broadcast.js renamed to test/parallel/test-stream-iter-broadcast-basic.js

Lines changed: 14 additions & 198 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
const common = require('../common');
55
const assert = require('assert');
6-
const { broadcast, Broadcast, from, text } = require('stream/iter');
6+
const { broadcast, text } = require('stream/iter');
77

88
// =============================================================================
99
// Basic broadcast
@@ -126,43 +126,6 @@ async function testWriterFail() {
126126
);
127127
}
128128

129-
// =============================================================================
130-
// Backpressure policies
131-
// =============================================================================
132-
133-
async function testDropOldest() {
134-
const { writer, broadcast: bc } = broadcast({
135-
highWaterMark: 2,
136-
backpressure: 'drop-oldest',
137-
});
138-
const consumer = bc.push();
139-
140-
writer.writeSync('first');
141-
writer.writeSync('second');
142-
// This should drop 'first'
143-
writer.writeSync('third');
144-
writer.endSync();
145-
146-
const data = await text(consumer);
147-
assert.strictEqual(data, 'secondthird');
148-
}
149-
150-
async function testDropNewest() {
151-
const { writer, broadcast: bc } = broadcast({
152-
highWaterMark: 1,
153-
backpressure: 'drop-newest',
154-
});
155-
const consumer = bc.push();
156-
157-
writer.writeSync('kept');
158-
// This should be silently dropped
159-
writer.writeSync('dropped');
160-
writer.endSync();
161-
162-
const data = await text(consumer);
163-
assert.strictEqual(data, 'kept');
164-
}
165-
166129
// =============================================================================
167130
// Cancel
168131
// =============================================================================
@@ -197,125 +160,6 @@ async function testCancelWithReason() {
197160
assert.strictEqual(result.message, 'cancelled');
198161
}
199162

200-
// =============================================================================
201-
// Broadcast.from
202-
// =============================================================================
203-
204-
async function testBroadcastFromAsyncIterable() {
205-
const source = from('broadcast-from');
206-
const { broadcast: bc } = Broadcast.from(source);
207-
const consumer = bc.push();
208-
209-
const data = await text(consumer);
210-
assert.strictEqual(data, 'broadcast-from');
211-
}
212-
213-
async function testBroadcastFromNonArrayChunks() {
214-
// Source that yields single Uint8Array chunks (not arrays)
215-
async function* singleChunkSource() {
216-
yield new TextEncoder().encode('hello');
217-
yield new TextEncoder().encode(' world');
218-
}
219-
const { broadcast: bc } = Broadcast.from(singleChunkSource());
220-
const consumer = bc.push();
221-
const data = await text(consumer);
222-
assert.strictEqual(data, 'hello world');
223-
}
224-
225-
async function testBroadcastFromStringChunks() {
226-
// Source that yields bare strings (not arrays)
227-
async function* stringSource() {
228-
yield 'foo';
229-
yield 'bar';
230-
}
231-
const { broadcast: bc } = Broadcast.from(stringSource());
232-
const consumer = bc.push();
233-
const data = await text(consumer);
234-
assert.strictEqual(data, 'foobar');
235-
}
236-
237-
async function testBroadcastFromMultipleConsumers() {
238-
const source = from('shared-data');
239-
const { broadcast: bc } = Broadcast.from(source);
240-
241-
const c1 = bc.push();
242-
const c2 = bc.push();
243-
244-
const [data1, data2] = await Promise.all([
245-
text(c1),
246-
text(c2),
247-
]);
248-
249-
assert.strictEqual(data1, 'shared-data');
250-
assert.strictEqual(data2, 'shared-data');
251-
}
252-
253-
// =============================================================================
254-
// AbortSignal
255-
// =============================================================================
256-
257-
async function testAbortSignal() {
258-
const ac = new AbortController();
259-
const { broadcast: bc } = broadcast({ signal: ac.signal });
260-
const consumer = bc.push();
261-
262-
ac.abort();
263-
264-
const batches = [];
265-
for await (const batch of consumer) {
266-
batches.push(batch);
267-
}
268-
assert.strictEqual(batches.length, 0);
269-
}
270-
271-
async function testAlreadyAbortedSignal() {
272-
const ac = new AbortController();
273-
ac.abort();
274-
275-
const { broadcast: bc } = broadcast({ signal: ac.signal });
276-
const consumer = bc.push();
277-
278-
const batches = [];
279-
for await (const batch of consumer) {
280-
batches.push(batch);
281-
}
282-
assert.strictEqual(batches.length, 0);
283-
}
284-
285-
// =============================================================================
286-
// Broadcast.from() hang fix - cancel while write blocked on backpressure
287-
// =============================================================================
288-
289-
async function testBroadcastFromCancelWhileBlocked() {
290-
// Create a slow async source that blocks between yields
291-
let sourceFinished = false;
292-
async function* slowSource() {
293-
yield [new TextEncoder().encode('chunk1')];
294-
// Simulate a long delay - the cancel should unblock this
295-
await new Promise((resolve) => setTimeout(resolve, 10000));
296-
yield [new TextEncoder().encode('chunk2')];
297-
sourceFinished = true;
298-
}
299-
300-
const { broadcast: bc } = Broadcast.from(slowSource());
301-
const consumer = bc.push();
302-
303-
// Read the first chunk
304-
const iter = consumer[Symbol.asyncIterator]();
305-
const first = await iter.next();
306-
assert.strictEqual(first.done, false);
307-
308-
// Cancel while the source is blocked waiting to yield the next chunk
309-
bc.cancel();
310-
311-
// The iteration should complete (not hang)
312-
const next = await iter.next();
313-
assert.strictEqual(next.done, true);
314-
315-
// Source should NOT have finished (we cancelled before chunk2)
316-
assert.strictEqual(sourceFinished, false);
317-
}
318-
319163
// =============================================================================
320164
// Writer fail detaches consumers
321165
// =============================================================================
@@ -354,37 +198,20 @@ async function testFailDetachesConsumers() {
354198
}
355199

356200
// =============================================================================
357-
// Protocol validation
201+
// Writer failSync
358202
// =============================================================================
359203

360-
function testBroadcastProtocolReturnsNull() {
361-
const obj = {
362-
[Symbol.for('Stream.broadcastProtocol')]() { return null; },
363-
};
364-
assert.throws(
365-
() => Broadcast.from(obj),
366-
{ code: 'ERR_INVALID_RETURN_VALUE' },
367-
);
368-
}
369-
370-
function testBroadcastProtocolReturnsString() {
371-
const obj = {
372-
[Symbol.for('Stream.broadcastProtocol')]() { return 'bad'; },
373-
};
374-
assert.throws(
375-
() => Broadcast.from(obj),
376-
{ code: 'ERR_INVALID_RETURN_VALUE' },
377-
);
378-
}
379-
380-
function testBroadcastProtocolReturnsUndefined() {
381-
const obj = {
382-
[Symbol.for('Stream.broadcastProtocol')]() { },
383-
};
384-
assert.throws(
385-
() => Broadcast.from(obj),
386-
{ code: 'ERR_INVALID_RETURN_VALUE' },
387-
);
204+
async function testWriterFailSync() {
205+
const { writer, broadcast: bc } = broadcast();
206+
const consumer = bc.push();
207+
writer.writeSync('hello');
208+
assert.strictEqual(writer.failSync(new Error('fail!')), true);
209+
// Second call still returns true (idempotent)
210+
assert.strictEqual(writer.failSync(new Error('fail2')), true);
211+
await assert.rejects(async () => {
212+
// eslint-disable-next-line no-unused-vars
213+
for await (const _ of consumer) { /* consume */ }
214+
}, { message: 'fail!' });
388215
}
389216

390217
Promise.all([
@@ -395,19 +222,8 @@ Promise.all([
395222
testWritevSync(),
396223
testWriterEnd(),
397224
testWriterFail(),
398-
testDropOldest(),
399-
testDropNewest(),
400225
testCancelWithoutReason(),
401226
testCancelWithReason(),
402-
testBroadcastFromAsyncIterable(),
403-
testBroadcastFromNonArrayChunks(),
404-
testBroadcastFromStringChunks(),
405-
testBroadcastFromMultipleConsumers(),
406-
testAbortSignal(),
407-
testAlreadyAbortedSignal(),
408-
testBroadcastFromCancelWhileBlocked(),
409227
testFailDetachesConsumers(),
410-
testBroadcastProtocolReturnsNull(),
411-
testBroadcastProtocolReturnsString(),
412-
testBroadcastProtocolReturnsUndefined(),
228+
testWriterFailSync(),
413229
]).then(common.mustCall());

0 commit comments

Comments
 (0)