From 8a7adc0c150c06c44c36db11b0754b85f9bfdbc0 Mon Sep 17 00:00:00 2001 From: Maruthan G Date: Sat, 25 Apr 2026 16:44:32 +0530 Subject: [PATCH] stream: destroy duplex on early return in Duplex.from(asyncFn) When `Duplex.from(asyncFn)` is given an `async function (gen)` that returns without consuming all of `gen`, the duplex never reached `final`, so the pipeline kept pumping writes that never completed and the upstream Readable was never destroyed. Track whether `final()` has been called via a `finalized` flag. When the user's promise resolves and `final` has not been called, call `destroyer(d)` on the duplex so the existing `eos` handler propagates destruction upstream. The async-iterable branch (`isIterable(value)`) and the rejection path are unmodified, so the regression that caused PR #55096 to be reverted by PR #56278 is preserved. Fixes: https://github.com/nodejs/node/issues/55077 Signed-off-by: Maruthan G --- lib/internal/streams/duplexify.js | 8 +++++ test/parallel/test-stream-duplex-from.js | 45 ++++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index 0c6701fa271110..45fc5ae660ef25 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -103,6 +103,7 @@ module.exports = function duplexify(body, name) { const then = value?.then; if (typeof then === 'function') { let d; + let finalized = false; const promise = FunctionPrototypeCall( then, @@ -111,6 +112,12 @@ module.exports = function duplexify(body, name) { if (val != null) { throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val); } + // The async function returned without (fully) consuming the input. + // Destroy the duplex so that pipeline propagates destruction + // upstream. See https://github.com/nodejs/node/issues/55077. + if (!finalized) { + destroyer(d); + } }, (err) => { destroyer(d, err); @@ -123,6 +130,7 @@ module.exports = function duplexify(body, name) { readable: false, write, final(cb) { + finalized = true; final(async () => { try { await promise; diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index e12599fed17c14..b353d089a0c712 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -418,3 +418,48 @@ function makeATestWritableStream(writeFunc) { })); r.destroy(expectedErr); } + +// Regression tests for https://github.com/nodejs/node/issues/55077 +// When `Duplex.from(asyncFn)` is used and the async function returns +// without (fully) consuming its async-iterable parameter, the upstream +// readable should still be destroyed by the pipeline so resources are +// released. +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + asyncGenerator.return(); + }), + common.mustCall(() => { + assert.strictEqual(r.destroyed, true); + }), + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + // eslint-disable-next-line no-unused-vars + for await (const _ of asyncGenerator) { break; } + }), + common.mustCall(() => { + assert.strictEqual(r.destroyed, true); + }), + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + // Resolve immediately without touching the generator at all. + }), + common.mustCall(() => { + assert.strictEqual(r.destroyed, true); + }), + ); +}