Skip to content

Commit 9e62322

Browse files
committed
stream: add diagnostics_channel event for completion
1 parent a2d86f6 commit 9e62322

3 files changed

Lines changed: 85 additions & 14 deletions

File tree

lib/internal/webstreams/readablestream.js

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,12 @@ const {
129129

130130
const assert = require('internal/assert');
131131

132+
const dc = require('diagnostics_channel');
133+
let streamDoneChannel;
134+
function getStreamDoneChannel () {
135+
return streamDoneChannel ||= dc.channel('stream.web.done');
136+
}
137+
132138
const kCancel = Symbol('kCancel');
133139
const kClose = Symbol('kClose');
134140
const kChunk = Symbol('kChunk');
@@ -424,6 +430,9 @@ class ReadableStream {
424430
preventCancel = false,
425431
} = options;
426432

433+
const stream = this;
434+
const channel = getStreamDoneChannel();
435+
427436
const reader = new ReadableStreamDefaultReader(this);
428437
let done = false;
429438
let started = false;
@@ -455,6 +464,9 @@ class ReadableStream {
455464
current = undefined;
456465
done = true;
457466
readableStreamReaderGenericRelease(reader);
467+
if (channel.hasSubscribers) {
468+
channel.publish({ stream });
469+
}
458470
promise.resolve({ done: true, value: undefined });
459471
},
460472
[kError](error) {
@@ -468,23 +480,25 @@ class ReadableStream {
468480
}
469481

470482
async function returnSteps(value) {
471-
if (done)
472-
return { done: true, value };
473-
done = true;
483+
if (!done) {
484+
done = true;
474485

475-
if (reader[kState].stream === undefined) {
476-
throw new ERR_INVALID_STATE.TypeError(
477-
'The reader is not bound to a ReadableStream');
486+
if (reader[kState].stream === undefined) {
487+
throw new ERR_INVALID_STATE.TypeError(
488+
'The reader is not bound to a ReadableStream');
489+
}
490+
assert(!reader[kState].readRequests.length);
491+
if (!preventCancel) {
492+
const result = readableStreamReaderGenericCancel(reader, value);
493+
readableStreamReaderGenericRelease(reader);
494+
await result;
495+
} else {
496+
readableStreamReaderGenericRelease(reader);
497+
}
478498
}
479-
assert(!reader[kState].readRequests.length);
480-
if (!preventCancel) {
481-
const result = readableStreamReaderGenericCancel(reader, value);
482-
readableStreamReaderGenericRelease(reader);
483-
await result;
484-
return { done: true, value };
499+
if (channel.hasSubscribers) {
500+
channel.publish({ stream });
485501
}
486-
487-
readableStreamReaderGenericRelease(reader);
488502
return { done: true, value };
489503
}
490504

@@ -789,6 +803,12 @@ class ReadableStreamDefaultReader {
789803
}
790804
const readRequest = new DefaultReadRequest();
791805
readableStreamDefaultReaderRead(this, readRequest);
806+
if (getStreamDoneChannel().hasSubscribers) {
807+
readRequest.promise.then(({ done }) => {
808+
if (done)
809+
getStreamDoneChannel().publish({ stream: this[kState].stream });
810+
})
811+
}
792812
return readRequest.promise;
793813
}
794814

@@ -906,6 +926,12 @@ class ReadableStreamBYOBReader {
906926
}
907927
const readIntoRequest = new ReadIntoRequest();
908928
readableStreamBYOBReaderRead(this, view, readIntoRequest);
929+
if (getStreamDoneChannel().hasSubscribers) {
930+
readIntoRequest.promise.then(({ done }) => {
931+
if (done)
932+
getStreamDoneChannel().publish({ stream: this[kState].stream });
933+
})
934+
}
909935
return readIntoRequest.promise;
910936
}
911937

test/parallel/test-bootstrap-modules.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ const expectedModules = new Set([
4444
'Internal Binding v8',
4545
'Internal Binding worker',
4646
'NativeModule buffer',
47+
'NativeModule diagnostics_channel',
4748
'NativeModule events',
4849
'NativeModule fs',
4950
'NativeModule internal/abort_controller',
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Flags: --expose-internals
2+
import * as common from '../common/index.mjs';
3+
import assert from 'assert';
4+
5+
import util from 'internal/webstreams/util';
6+
7+
import { Readable } from 'stream';
8+
9+
import * as dc from 'diagnostics_channel';
10+
11+
{
12+
const readable = Readable.toWeb(Readable.from([1]));
13+
14+
const channel = dc.channel('stream.web.done');
15+
const subscriber = common.mustCall(({ stream }) => {
16+
assert.strictEqual(readable, stream);
17+
assert.strictEqual(readable[util.kState].state, 'closed');
18+
});
19+
channel.subscribe(subscriber);
20+
21+
const reader = readable.getReader();
22+
let result;
23+
24+
while (!result?.done) {
25+
result = await reader.read();
26+
}
27+
28+
channel.unsubscribe(subscriber);
29+
}
30+
31+
{
32+
const readable = Readable.toWeb(Readable.from([1]));
33+
34+
const channel = dc.channel('stream.web.done');
35+
const subscriber = common.mustCall(({ stream }) => {
36+
assert.strictEqual(readable, stream);
37+
assert.strictEqual(readable[util.kState].state, 'closed');
38+
});
39+
channel.subscribe(subscriber);
40+
41+
for await (const _ of readable) {}
42+
43+
channel.unsubscribe(subscriber);
44+
}

0 commit comments

Comments
 (0)