From d3a303826aee3c73dbf1be790c28a43c8b5fd15e Mon Sep 17 00:00:00 2001 From: Bryan English Date: Thu, 21 Apr 2022 18:03:10 -0400 Subject: [PATCH 1/3] stream: add diagnostics_channel event for completion Signed-off-by: Bryan English --- lib/internal/webstreams/readablestream.js | 40 ++++++++++++++++- .../test-whatwg-webstreams-dc-events.mjs | 44 +++++++++++++++++++ 2 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 test/parallel/test-whatwg-webstreams-dc-events.mjs diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 597135778a1e0f..36038ac456667f 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -136,6 +136,14 @@ const { Buffer } = require('buffer'); const assert = require('internal/assert'); +let streamDoneChannel; +function getStreamDoneChannel() { + if (streamDoneChannel === undefined) { + streamDoneChannel = require('diagnostics_channel').channel('stream.web.done'); + } + return streamDoneChannel; +} + const kCancel = Symbol('kCancel'); const kClose = Symbol('kClose'); const kChunk = Symbol('kChunk'); @@ -463,6 +471,9 @@ class ReadableStream { validateObject(options, 'options', kValidateObjectAllowObjectsAndNull); const preventCancel = !!(options?.preventCancel); + const stream = this; + const channel = getStreamDoneChannel(); + // eslint-disable-next-line no-use-before-define const reader = new ReadableStreamDefaultReader(this); @@ -491,7 +502,7 @@ class ReadableStream { const promise = PromiseWithResolvers(); // eslint-disable-next-line no-use-before-define - readableStreamDefaultReaderRead(reader, new ReadableStreamAsyncIteratorReadRequest(reader, state, promise)); + readableStreamDefaultReaderRead(reader, new ReadableStreamAsyncIteratorReadRequest(reader, state, promise, stream, channel)); return promise.promise; } @@ -509,10 +520,16 @@ class ReadableStream { const result = readableStreamReaderGenericCancel(reader, value); readableStreamReaderGenericRelease(reader); await result; + if (channel.hasSubscribers) { + channel.publish({ stream }); + } return { done: true, value }; // eslint-disable-line node-core/avoid-prototype-pollution } readableStreamReaderGenericRelease(reader); + if (channel.hasSubscribers) { + channel.publish({ stream }); + } return { done: true, value }; // eslint-disable-line node-core/avoid-prototype-pollution } @@ -760,10 +777,12 @@ function createReadableStreamBYOBRequest(controller, view) { } class ReadableStreamAsyncIteratorReadRequest { - constructor(reader, state, promise) { + constructor(reader, state, promise, stream, channel) { this.reader = reader; this.state = state; this.promise = promise; + this.stream = stream; + this.channel = channel; } [kChunk](chunk) { @@ -775,6 +794,9 @@ class ReadableStreamAsyncIteratorReadRequest { this.state.current = undefined; this.state.done = true; readableStreamReaderGenericRelease(this.reader); + if (this.channel.hasSubscribers) { + this.channel.publish({ stream: this.stream }); + } this.promise.resolve({ done: true, value: undefined }); } @@ -888,6 +910,13 @@ class ReadableStreamDefaultReader { // Slow path: create request and go through normal flow const readRequest = new DefaultReadRequest(); readableStreamDefaultReaderRead(this, readRequest); + if (getStreamDoneChannel().hasSubscribers) { + const stream = this[kState].stream; + PromisePrototypeThen(readRequest.promise, ({ done }) => { + if (done) + getStreamDoneChannel().publish({ stream }); + }); + } return readRequest.promise; } @@ -1028,6 +1057,13 @@ class ReadableStreamBYOBReader { } const readIntoRequest = new ReadIntoRequest(); readableStreamBYOBReaderRead(this, view, min, readIntoRequest); + if (getStreamDoneChannel().hasSubscribers) { + const stream = this[kState].stream; + PromisePrototypeThen(readIntoRequest.promise, ({ done }) => { + if (done) + getStreamDoneChannel().publish({ stream }); + }); + } return readIntoRequest.promise; } diff --git a/test/parallel/test-whatwg-webstreams-dc-events.mjs b/test/parallel/test-whatwg-webstreams-dc-events.mjs new file mode 100644 index 00000000000000..244fa488ab2692 --- /dev/null +++ b/test/parallel/test-whatwg-webstreams-dc-events.mjs @@ -0,0 +1,44 @@ +// Flags: --expose-internals +import * as common from '../common/index.mjs'; +import assert from 'assert'; + +import util from 'internal/webstreams/util'; + +import { Readable } from 'stream'; + +import * as dc from 'diagnostics_channel'; + +{ + const readable = Readable.toWeb(Readable.from([1])); + + const channel = dc.channel('stream.web.done'); + const subscriber = common.mustCall(({ stream }) => { + assert.strictEqual(readable, stream); + assert.strictEqual(readable[util.kState].state, 'closed'); + }); + channel.subscribe(subscriber); + + const reader = readable.getReader(); + let result; + + while (!result?.done) { + result = await reader.read(); + } + + channel.unsubscribe(subscriber); +} + +{ + const readable = Readable.toWeb(Readable.from([1])); + + const channel = dc.channel('stream.web.done'); + const subscriber = common.mustCall(({ stream }) => { + assert.strictEqual(readable, stream); + assert.strictEqual(readable[util.kState].state, 'closed'); + }); + channel.subscribe(subscriber); + + for await (const _ of readable) {} + + channel.unsubscribe(subscriber); +} From cb76bc7236f98e07b9039fc66beb5b8b51a3d491 Mon Sep 17 00:00:00 2001 From: Bryan English Date: Wed, 29 Apr 2026 15:14:06 -0400 Subject: [PATCH 2/3] fixup! stream: add diagnostics_channel event for completion Signed-off-by: Bryan English --- lib/internal/webstreams/readablestream.js | 7 +++++-- test/parallel/test-whatwg-webstreams-dc-events.mjs | 3 ++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 36038ac456667f..59a4da3cf5ba4d 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -501,8 +501,11 @@ class ReadableStream { } const promise = PromiseWithResolvers(); - // eslint-disable-next-line no-use-before-define - readableStreamDefaultReaderRead(reader, new ReadableStreamAsyncIteratorReadRequest(reader, state, promise, stream, channel)); + readableStreamDefaultReaderRead( + reader, + // eslint-disable-next-line no-use-before-define + new ReadableStreamAsyncIteratorReadRequest(reader, state, promise, stream, channel), + ); return promise.promise; } diff --git a/test/parallel/test-whatwg-webstreams-dc-events.mjs b/test/parallel/test-whatwg-webstreams-dc-events.mjs index 244fa488ab2692..39488cb9e4feb6 100644 --- a/test/parallel/test-whatwg-webstreams-dc-events.mjs +++ b/test/parallel/test-whatwg-webstreams-dc-events.mjs @@ -38,7 +38,8 @@ import * as dc from 'diagnostics_channel'; }); channel.subscribe(subscriber); - for await (const _ of readable) {} + // eslint-disable-next-line no-unused-vars + for await (const _ of readable) { /* drain */ } channel.unsubscribe(subscriber); } From f1d80b17a90a928df2055f9b28f9f80a8d79b4b1 Mon Sep 17 00:00:00 2001 From: Bryan English Date: Wed, 29 Apr 2026 16:23:55 -0400 Subject: [PATCH 3/3] fixup! stream: add diagnostics_channel event for completion Signed-off-by: Bryan English --- doc/api/diagnostics_channel.md | 19 ++++++++++++++ .../test-whatwg-webstreams-dc-events.mjs | 25 +++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/doc/api/diagnostics_channel.md b/doc/api/diagnostics_channel.md index b47f98ce64211c..919e3446c47236 100644 --- a/doc/api/diagnostics_channel.md +++ b/doc/api/diagnostics_channel.md @@ -1853,6 +1853,24 @@ Emitted when [`child_process.spawn()`][] encounters an error. Emitted when [`process.execve()`][] is invoked. +#### Streams + +> Stability: 1 - Experimental + + + +##### Event: `'stream.web.done'` + +* `stream` {ReadableStream} The stream that finished. + +Emitted when a [`ReadableStream`][] has finished, either because all data has +been read or because the stream was cancelled. The event fires once per stream, +after the stream has transitioned to the `'closed'` state. It is emitted for +both the async iterator (`for await (...)`) and the direct reader +(`reader.read()`, including BYOB readers) consumption paths. + #### Web Locks > Stability: 1 - Experimental @@ -1917,6 +1935,7 @@ Emitted when a new thread is created. [TracingChannel Channels]: #tracingchannel-channels [`'uncaughtException'`]: process.md#event-uncaughtexception [`BoundedChannel`]: #class-boundedchannel +[`ReadableStream`]: webstreams.md#class-readablestream [`TracingChannel`]: #class-tracingchannel [`asyncEnd` event]: #asyncendevent [`asyncStart` event]: #asyncstartevent diff --git a/test/parallel/test-whatwg-webstreams-dc-events.mjs b/test/parallel/test-whatwg-webstreams-dc-events.mjs index 39488cb9e4feb6..908e16c75d879f 100644 --- a/test/parallel/test-whatwg-webstreams-dc-events.mjs +++ b/test/parallel/test-whatwg-webstreams-dc-events.mjs @@ -43,3 +43,28 @@ import * as dc from 'diagnostics_channel'; channel.unsubscribe(subscriber); } + +{ + const readable = new ReadableStream({ + type: 'bytes', + start(controller) { + controller.enqueue(new Uint8Array([1, 2, 3])); + controller.close(); + }, + }); + + const channel = dc.channel('stream.web.done'); + const subscriber = common.mustCall(({ stream }) => { + assert.strictEqual(readable, stream); + assert.strictEqual(readable[util.kState].state, 'closed'); + }); + channel.subscribe(subscriber); + + const reader = readable.getReader({ mode: 'byob' }); + let result; + while (!result?.done) { + result = await reader.read(new Uint8Array(8)); + } + + channel.unsubscribe(subscriber); +}