Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/internal/streams/duplexify.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ module.exports = function duplexify(body, name) {
const then = value?.then;
if (typeof then === 'function') {
let d;
let finalized = false;

const promise = FunctionPrototypeCall(
then,
Expand All @@ -111,6 +112,12 @@ module.exports = function duplexify(body, name) {
if (val != null) {
throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val);
}
// The async function returned without (fully) consuming the input.
// Destroy the duplex so that pipeline propagates destruction
// upstream. See https://github.com/nodejs/node/issues/55077.
if (!finalized) {
destroyer(d);
}
},
(err) => {
destroyer(d, err);
Expand All @@ -123,6 +130,7 @@ module.exports = function duplexify(body, name) {
readable: false,
write,
final(cb) {
finalized = true;
final(async () => {
try {
await promise;
Expand Down
45 changes: 45 additions & 0 deletions test/parallel/test-stream-duplex-from.js
Original file line number Diff line number Diff line change
Expand Up @@ -418,3 +418,48 @@ function makeATestWritableStream(writeFunc) {
}));
r.destroy(expectedErr);
}

// Regression tests for https://github.com/nodejs/node/issues/55077
// When `Duplex.from(asyncFn)` is used and the async function returns
// without (fully) consuming its async-iterable parameter, the upstream
// readable should still be destroyed by the pipeline so resources are
// released.
{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
asyncGenerator.return();
}),
common.mustCall(() => {
assert.strictEqual(r.destroyed, true);
}),
);
}

{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
// eslint-disable-next-line no-unused-vars
for await (const _ of asyncGenerator) { break; }
}),
common.mustCall(() => {
assert.strictEqual(r.destroyed, true);
}),
);
}

{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
// Resolve immediately without touching the generator at all.
}),
common.mustCall(() => {
assert.strictEqual(r.destroyed, true);
}),
);
}
Loading