Skip to content

Commit d3a3038

Browse files
committed
stream: add diagnostics_channel event for completion
Signed-off-by: Bryan English <[email protected]>
1 parent a962e72 commit d3a3038

2 files changed

Lines changed: 82 additions & 2 deletions

File tree

lib/internal/webstreams/readablestream.js

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,14 @@ const { Buffer } = require('buffer');
136136

137137
const assert = require('internal/assert');
138138

139+
let streamDoneChannel;
140+
function getStreamDoneChannel() {
141+
if (streamDoneChannel === undefined) {
142+
streamDoneChannel = require('diagnostics_channel').channel('stream.web.done');
143+
}
144+
return streamDoneChannel;
145+
}
146+
139147
const kCancel = Symbol('kCancel');
140148
const kClose = Symbol('kClose');
141149
const kChunk = Symbol('kChunk');
@@ -463,6 +471,9 @@ class ReadableStream {
463471
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
464472
const preventCancel = !!(options?.preventCancel);
465473

474+
const stream = this;
475+
const channel = getStreamDoneChannel();
476+
466477
// eslint-disable-next-line no-use-before-define
467478
const reader = new ReadableStreamDefaultReader(this);
468479

@@ -491,7 +502,7 @@ class ReadableStream {
491502
const promise = PromiseWithResolvers();
492503

493504
// eslint-disable-next-line no-use-before-define
494-
readableStreamDefaultReaderRead(reader, new ReadableStreamAsyncIteratorReadRequest(reader, state, promise));
505+
readableStreamDefaultReaderRead(reader, new ReadableStreamAsyncIteratorReadRequest(reader, state, promise, stream, channel));
495506
return promise.promise;
496507
}
497508

@@ -509,10 +520,16 @@ class ReadableStream {
509520
const result = readableStreamReaderGenericCancel(reader, value);
510521
readableStreamReaderGenericRelease(reader);
511522
await result;
523+
if (channel.hasSubscribers) {
524+
channel.publish({ stream });
525+
}
512526
return { done: true, value }; // eslint-disable-line node-core/avoid-prototype-pollution
513527
}
514528

515529
readableStreamReaderGenericRelease(reader);
530+
if (channel.hasSubscribers) {
531+
channel.publish({ stream });
532+
}
516533
return { done: true, value }; // eslint-disable-line node-core/avoid-prototype-pollution
517534
}
518535

@@ -760,10 +777,12 @@ function createReadableStreamBYOBRequest(controller, view) {
760777
}
761778

762779
class ReadableStreamAsyncIteratorReadRequest {
763-
constructor(reader, state, promise) {
780+
constructor(reader, state, promise, stream, channel) {
764781
this.reader = reader;
765782
this.state = state;
766783
this.promise = promise;
784+
this.stream = stream;
785+
this.channel = channel;
767786
}
768787

769788
[kChunk](chunk) {
@@ -775,6 +794,9 @@ class ReadableStreamAsyncIteratorReadRequest {
775794
this.state.current = undefined;
776795
this.state.done = true;
777796
readableStreamReaderGenericRelease(this.reader);
797+
if (this.channel.hasSubscribers) {
798+
this.channel.publish({ stream: this.stream });
799+
}
778800
this.promise.resolve({ done: true, value: undefined });
779801
}
780802

@@ -888,6 +910,13 @@ class ReadableStreamDefaultReader {
888910
// Slow path: create request and go through normal flow
889911
const readRequest = new DefaultReadRequest();
890912
readableStreamDefaultReaderRead(this, readRequest);
913+
if (getStreamDoneChannel().hasSubscribers) {
914+
const stream = this[kState].stream;
915+
PromisePrototypeThen(readRequest.promise, ({ done }) => {
916+
if (done)
917+
getStreamDoneChannel().publish({ stream });
918+
});
919+
}
891920
return readRequest.promise;
892921
}
893922

@@ -1028,6 +1057,13 @@ class ReadableStreamBYOBReader {
10281057
}
10291058
const readIntoRequest = new ReadIntoRequest();
10301059
readableStreamBYOBReaderRead(this, view, min, readIntoRequest);
1060+
if (getStreamDoneChannel().hasSubscribers) {
1061+
const stream = this[kState].stream;
1062+
PromisePrototypeThen(readIntoRequest.promise, ({ done }) => {
1063+
if (done)
1064+
getStreamDoneChannel().publish({ stream });
1065+
});
1066+
}
10311067
return readIntoRequest.promise;
10321068
}
10331069

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Flags: --expose-internals
2+
import * as common from '../common/index.mjs';
3+
import assert from 'assert';
4+
5+
import util from 'internal/webstreams/util';
6+
7+
import { Readable } from 'stream';
8+
9+
import * as dc from 'diagnostics_channel';
10+
11+
{
12+
const readable = Readable.toWeb(Readable.from([1]));
13+
14+
const channel = dc.channel('stream.web.done');
15+
const subscriber = common.mustCall(({ stream }) => {
16+
assert.strictEqual(readable, stream);
17+
assert.strictEqual(readable[util.kState].state, 'closed');
18+
});
19+
channel.subscribe(subscriber);
20+
21+
const reader = readable.getReader();
22+
let result;
23+
24+
while (!result?.done) {
25+
result = await reader.read();
26+
}
27+
28+
channel.unsubscribe(subscriber);
29+
}
30+
31+
{
32+
const readable = Readable.toWeb(Readable.from([1]));
33+
34+
const channel = dc.channel('stream.web.done');
35+
const subscriber = common.mustCall(({ stream }) => {
36+
assert.strictEqual(readable, stream);
37+
assert.strictEqual(readable[util.kState].state, 'closed');
38+
});
39+
channel.subscribe(subscriber);
40+
41+
for await (const _ of readable) {}
42+
43+
channel.unsubscribe(subscriber);
44+
}

0 commit comments

Comments
 (0)