Skip to content

Commit 8a7adc0

Browse files
committed
stream: destroy duplex on early return in Duplex.from(asyncFn)
When `Duplex.from(asyncFn)` is given an `async function (gen)` that returns without consuming all of `gen`, the duplex never reached `final`, so the pipeline kept pumping writes that never completed and the upstream Readable was never destroyed. Track whether `final()` has been called via a `finalized` flag. When the user's promise resolves and `final` has not been called, call `destroyer(d)` on the duplex so the existing `eos` handler propagates destruction upstream. The async-iterable branch (`isIterable(value)`) and the rejection path are unmodified, so the regression that caused PR #55096 to be reverted by PR #56278 is preserved. Fixes: #55077 Signed-off-by: Maruthan G <[email protected]>
1 parent 21436f0 commit 8a7adc0

2 files changed

Lines changed: 53 additions & 0 deletions

File tree

lib/internal/streams/duplexify.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ module.exports = function duplexify(body, name) {
103103
const then = value?.then;
104104
if (typeof then === 'function') {
105105
let d;
106+
let finalized = false;
106107

107108
const promise = FunctionPrototypeCall(
108109
then,
@@ -111,6 +112,12 @@ module.exports = function duplexify(body, name) {
111112
if (val != null) {
112113
throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val);
113114
}
115+
// The async function returned without (fully) consuming the input.
116+
// Destroy the duplex so that pipeline propagates destruction
117+
// upstream. See https://github.com/nodejs/node/issues/55077.
118+
if (!finalized) {
119+
destroyer(d);
120+
}
114121
},
115122
(err) => {
116123
destroyer(d, err);
@@ -123,6 +130,7 @@ module.exports = function duplexify(body, name) {
123130
readable: false,
124131
write,
125132
final(cb) {
133+
finalized = true;
126134
final(async () => {
127135
try {
128136
await promise;

test/parallel/test-stream-duplex-from.js

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,3 +418,48 @@ function makeATestWritableStream(writeFunc) {
418418
}));
419419
r.destroy(expectedErr);
420420
}
421+
422+
// Regression tests for https://github.com/nodejs/node/issues/55077
423+
// When `Duplex.from(asyncFn)` is used and the async function returns
424+
// without (fully) consuming its async-iterable parameter, the upstream
425+
// readable should still be destroyed by the pipeline so resources are
426+
// released.
427+
{
428+
const r = Readable.from(['foo', 'bar', 'baz']);
429+
pipeline(
430+
r,
431+
Duplex.from(async function(asyncGenerator) {
432+
asyncGenerator.return();
433+
}),
434+
common.mustCall(() => {
435+
assert.strictEqual(r.destroyed, true);
436+
}),
437+
);
438+
}
439+
440+
{
441+
const r = Readable.from(['foo', 'bar', 'baz']);
442+
pipeline(
443+
r,
444+
Duplex.from(async function(asyncGenerator) {
445+
// eslint-disable-next-line no-unused-vars
446+
for await (const _ of asyncGenerator) { break; }
447+
}),
448+
common.mustCall(() => {
449+
assert.strictEqual(r.destroyed, true);
450+
}),
451+
);
452+
}
453+
454+
{
455+
const r = Readable.from(['foo', 'bar', 'baz']);
456+
pipeline(
457+
r,
458+
Duplex.from(async function(asyncGenerator) {
459+
// Resolve immediately without touching the generator at all.
460+
}),
461+
common.mustCall(() => {
462+
assert.strictEqual(r.destroyed, true);
463+
}),
464+
);
465+
}

0 commit comments

Comments
 (0)