Skip to content

Commit 6434eb7

Browse files
committed
stream: cover synchronous Readable.toWeb eos path
1 parent 3d5bfbb commit 6434eb7

3 files changed

Lines changed: 33 additions & 6 deletions

File tree

lib/internal/streams/end-of-stream.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ function getEosOnCloseError(stream, readable, readableFinished, writable, writab
107107
return null;
108108
}
109109

110+
// Internal only: if eos() can settle immediately, invoke the callback before
111+
// returning cleanup. Callers must tolerate cleanup yet to be assigned.
110112
const kEosNodeSyncronousCallback = Symbol('kEosNodeSynchronousCallback');
111113

112114
function eos(stream, options, callback) {
@@ -199,11 +201,10 @@ function eos(stream, options, callback) {
199201
};
200202
if (immediateResult !== undefined) {
201203
if (options.error !== false) {
202-
const onerror = () => {};
203-
stream.on('error', onerror);
204+
stream.on('error', nop);
204205
cleanup = () => {
205206
callback = nop;
206-
stream.removeListener('error', onerror);
207+
stream.removeListener('error', nop);
207208
};
208209
}
209210
returnImmediately(immediateResult);

lib/internal/webstreams/adapters.js

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,8 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj
454454
return writable;
455455
}
456456

457+
const kErrorSentinelAttached = Symbol('kErrorSentinelAttached');
458+
457459
/**
458460
* @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
459461
* @param {Readable} streamReadable
@@ -521,9 +523,12 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
521523
// If eos calls the callback synchronously, cleanup is still a no-op here.
522524
cleanup();
523525

524-
// This is a protection against non-standard, legacy streams
525-
// that happen to emit an error event again after finished is called.
526-
streamReadable.on('error', noop);
526+
if (!(kErrorSentinelAttached in streamReadable)) {
527+
// This is a protection against non-standard, legacy streams
528+
// that happen to emit an error event again after finished is called.
529+
streamReadable.on('error', noop);
530+
streamReadable[kErrorSentinelAttached] = true;
531+
}
527532
if (wasCanceled) {
528533
return;
529534
}

test/parallel/test-whatwg-webstreams-adapters-to-readablestream.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,27 @@ const {
286286
);
287287
}
288288

289+
{
290+
const readable = new PassThrough();
291+
readable.end();
292+
readable.destroy();
293+
294+
(async () => {
295+
await new Promise((resolve) => readable.once('close', resolve));
296+
assert.strictEqual(readable.listenerCount('error'), 0);
297+
298+
const readableStream = newReadableStreamFromStreamReadable(readable);
299+
// Only one error listener from the adapter should be added
300+
assert.strictEqual(readable.listenerCount('error'), 1);
301+
newReadableStreamFromStreamReadable(readable);
302+
// No duplicate listeners should be added.
303+
assert.strictEqual(readable.listenerCount('error'), 1);
304+
305+
const readResult = await readableStream.getReader().read();
306+
assert.deepStrictEqual(readResult, { value: undefined, done: true });
307+
})().then(common.mustCall());
308+
}
309+
289310
{
290311
const duplex = new Duplex({ readable: false });
291312
duplex.destroy();

0 commit comments

Comments
 (0)