diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index 0c6701fa271110..45fc5ae660ef25 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -103,6 +103,7 @@ module.exports = function duplexify(body, name) { const then = value?.then; if (typeof then === 'function') { let d; + let finalized = false; const promise = FunctionPrototypeCall( then, @@ -111,6 +112,12 @@ module.exports = function duplexify(body, name) { if (val != null) { throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val); } + // The async function returned without (fully) consuming the input. + // Destroy the duplex so that pipeline propagates destruction + // upstream. See https://github.com/nodejs/node/issues/55077. + if (!finalized) { + destroyer(d); + } }, (err) => { destroyer(d, err); @@ -123,6 +130,7 @@ module.exports = function duplexify(body, name) { readable: false, write, final(cb) { + finalized = true; final(async () => { try { await promise; diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index e12599fed17c14..b353d089a0c712 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -418,3 +418,48 @@ function makeATestWritableStream(writeFunc) { })); r.destroy(expectedErr); } + +// Regression tests for https://github.com/nodejs/node/issues/55077 +// When `Duplex.from(asyncFn)` is used and the async function returns +// without (fully) consuming its async-iterable parameter, the upstream +// readable should still be destroyed by the pipeline so resources are +// released. +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + asyncGenerator.return(); + }), + common.mustCall(() => { + assert.strictEqual(r.destroyed, true); + }), + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + // eslint-disable-next-line no-unused-vars + for await (const _ of asyncGenerator) { break; } + }), + common.mustCall(() => { + assert.strictEqual(r.destroyed, true); + }), + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + // Resolve immediately without touching the generator at all. + }), + common.mustCall(() => { + assert.strictEqual(r.destroyed, true); + }), + ); +}