Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions doc/api/diagnostics_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,24 @@ Emitted when [`child_process.spawn()`][] encounters an error.

Emitted when [`process.execve()`][] is invoked.

#### Streams

> Stability: 1 - Experimental

<!-- YAML
added: REPLACEME
-->

##### Event: `'stream.web.done'`

* `stream` {ReadableStream} The stream that finished.

Emitted when a [`ReadableStream`][] has finished, either because all data has
been read or because the stream was cancelled. The event fires once per stream,
after the stream has transitioned to the `'closed'` state. It is emitted for
both the async iterator (`for await (...)`) and the direct reader
(`reader.read()`, including BYOB readers) consumption paths.

#### Web Locks

> Stability: 1 - Experimental
Expand Down Expand Up @@ -1917,6 +1935,7 @@ Emitted when a new thread is created.
[TracingChannel Channels]: #tracingchannel-channels
[`'uncaughtException'`]: process.md#event-uncaughtexception
[`BoundedChannel`]: #class-boundedchannel
[`ReadableStream`]: webstreams.md#class-readablestream
[`TracingChannel`]: #class-tracingchannel
[`asyncEnd` event]: #asyncendevent
[`asyncStart` event]: #asyncstartevent
Expand Down
45 changes: 42 additions & 3 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ const { Buffer } = require('buffer');

const assert = require('internal/assert');

let streamDoneChannel;
function getStreamDoneChannel() {
if (streamDoneChannel === undefined) {
streamDoneChannel = require('diagnostics_channel').channel('stream.web.done');
}
return streamDoneChannel;
}

const kCancel = Symbol('kCancel');
const kClose = Symbol('kClose');
const kChunk = Symbol('kChunk');
Expand Down Expand Up @@ -463,6 +471,9 @@ class ReadableStream {
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
const preventCancel = !!(options?.preventCancel);

const stream = this;
const channel = getStreamDoneChannel();

// eslint-disable-next-line no-use-before-define
const reader = new ReadableStreamDefaultReader(this);

Expand Down Expand Up @@ -490,8 +501,11 @@ class ReadableStream {
}
const promise = PromiseWithResolvers();

// eslint-disable-next-line no-use-before-define
readableStreamDefaultReaderRead(reader, new ReadableStreamAsyncIteratorReadRequest(reader, state, promise));
readableStreamDefaultReaderRead(
reader,
// eslint-disable-next-line no-use-before-define
new ReadableStreamAsyncIteratorReadRequest(reader, state, promise, stream, channel),
);
return promise.promise;
}

Expand All @@ -509,10 +523,16 @@ class ReadableStream {
const result = readableStreamReaderGenericCancel(reader, value);
readableStreamReaderGenericRelease(reader);
await result;
if (channel.hasSubscribers) {
channel.publish({ stream });
}
return { done: true, value }; // eslint-disable-line node-core/avoid-prototype-pollution
}

readableStreamReaderGenericRelease(reader);
if (channel.hasSubscribers) {
channel.publish({ stream });
}
return { done: true, value }; // eslint-disable-line node-core/avoid-prototype-pollution
}

Expand Down Expand Up @@ -760,10 +780,12 @@ function createReadableStreamBYOBRequest(controller, view) {
}

class ReadableStreamAsyncIteratorReadRequest {
constructor(reader, state, promise) {
constructor(reader, state, promise, stream, channel) {
this.reader = reader;
this.state = state;
this.promise = promise;
this.stream = stream;
this.channel = channel;
}

[kChunk](chunk) {
Expand All @@ -775,6 +797,9 @@ class ReadableStreamAsyncIteratorReadRequest {
this.state.current = undefined;
this.state.done = true;
readableStreamReaderGenericRelease(this.reader);
if (this.channel.hasSubscribers) {
this.channel.publish({ stream: this.stream });
}
this.promise.resolve({ done: true, value: undefined });
}

Expand Down Expand Up @@ -888,6 +913,13 @@ class ReadableStreamDefaultReader {
// Slow path: create request and go through normal flow
const readRequest = new DefaultReadRequest();
readableStreamDefaultReaderRead(this, readRequest);
if (getStreamDoneChannel().hasSubscribers) {
const stream = this[kState].stream;
PromisePrototypeThen(readRequest.promise, ({ done }) => {
if (done)
getStreamDoneChannel().publish({ stream });
});
}
return readRequest.promise;
}

Expand Down Expand Up @@ -1028,6 +1060,13 @@ class ReadableStreamBYOBReader {
}
const readIntoRequest = new ReadIntoRequest();
readableStreamBYOBReaderRead(this, view, min, readIntoRequest);
if (getStreamDoneChannel().hasSubscribers) {
const stream = this[kState].stream;
PromisePrototypeThen(readIntoRequest.promise, ({ done }) => {
if (done)
getStreamDoneChannel().publish({ stream });
});
}
return readIntoRequest.promise;
}

Expand Down
70 changes: 70 additions & 0 deletions test/parallel/test-whatwg-webstreams-dc-events.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Flags: --expose-internals
import * as common from '../common/index.mjs';
import assert from 'assert';

import util from 'internal/webstreams/util';

import { Readable } from 'stream';

import * as dc from 'diagnostics_channel';

{
const readable = Readable.toWeb(Readable.from([1]));

const channel = dc.channel('stream.web.done');
const subscriber = common.mustCall(({ stream }) => {
assert.strictEqual(readable, stream);
assert.strictEqual(readable[util.kState].state, 'closed');
});
channel.subscribe(subscriber);

const reader = readable.getReader();
let result;

while (!result?.done) {
result = await reader.read();
}

channel.unsubscribe(subscriber);
}

{
const readable = Readable.toWeb(Readable.from([1]));

const channel = dc.channel('stream.web.done');
const subscriber = common.mustCall(({ stream }) => {
assert.strictEqual(readable, stream);
assert.strictEqual(readable[util.kState].state, 'closed');
});
channel.subscribe(subscriber);

// eslint-disable-next-line no-unused-vars
for await (const _ of readable) { /* drain */ }

channel.unsubscribe(subscriber);
}

{
const readable = new ReadableStream({
type: 'bytes',
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.close();
},
});

const channel = dc.channel('stream.web.done');
const subscriber = common.mustCall(({ stream }) => {
assert.strictEqual(readable, stream);
assert.strictEqual(readable[util.kState].state, 'closed');
});
channel.subscribe(subscriber);

const reader = readable.getReader({ mode: 'byob' });
let result;
while (!result?.done) {
result = await reader.read(new Uint8Array(8));
}

channel.unsubscribe(subscriber);
}
Loading