Skip to content

Commit 8e33f22

Browse files
committed
quic: complete the javascript implementation of QUIC
Signed-off-by: James M Snell <[email protected]> Assisted-by: Opencode:Opus 4.6
1 parent 96c0a38 commit 8e33f22

9 files changed

Lines changed: 2801 additions & 446 deletions

File tree

lib/internal/blob.js

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

33
const {
4+
ArrayPrototypePush,
45
MathMax,
56
MathMin,
67
ObjectDefineProperties,
@@ -527,13 +528,82 @@ function createBlobReaderStream(reader) {
527528
}, { highWaterMark: 0 });
528529
}
529530

531+
// Maximum number of chunks to collect in a single batch to prevent
532+
// unbounded memory growth when the DataQueue has a large burst of data.
533+
const kMaxBatchChunks = 16;
534+
535+
async function* createBlobReaderIterable(reader, options = {}) {
536+
const { getReadError } = options;
537+
let wakeup = PromiseWithResolvers();
538+
reader.setWakeup(wakeup.resolve);
539+
540+
try {
541+
while (true) {
542+
const batch = [];
543+
let blocked = false;
544+
let eos = false;
545+
let error = null;
546+
547+
// Pull as many chunks as available synchronously.
548+
// reader.pull(callback) calls the callback synchronously via
549+
// MakeCallback, so we can collect multiple chunks per iteration
550+
// step without any async overhead.
551+
while (true) {
552+
let pullResult;
553+
reader.pull((status, buffer) => {
554+
pullResult = { status, buffer };
555+
});
556+
557+
if (pullResult.status === 0) {
558+
eos = true;
559+
break;
560+
}
561+
if (pullResult.status < 0) {
562+
error = typeof getReadError === 'function' ?
563+
getReadError(pullResult.status) :
564+
new ERR_INVALID_STATE('The reader is not readable');
565+
break;
566+
}
567+
if (pullResult.status === 2) {
568+
blocked = true;
569+
break;
570+
}
571+
ArrayPrototypePush(batch, new Uint8Array(pullResult.buffer));
572+
if (batch.length >= kMaxBatchChunks) break;
573+
}
574+
575+
if (batch.length > 0) {
576+
yield batch;
577+
}
578+
579+
if (eos) return;
580+
if (error) throw error;
581+
582+
if (blocked) {
583+
const fin = await wakeup.promise;
584+
wakeup = PromiseWithResolvers();
585+
reader.setWakeup(wakeup.resolve);
586+
// If the wakeup was triggered by FIN (EndReadable), the DataQueue
587+
// is capped. Continue the loop to pull again -- the next pull will
588+
// return EOS. Without this, a race between the data notification
589+
// and the FIN notification can leave the iterator waiting for a
590+
// wakeup that will never come.
591+
if (fin) continue;
592+
}
593+
}
594+
} finally {
595+
reader.setWakeup(undefined);
596+
}
597+
}
598+
530599
module.exports = {
531600
Blob,
532601
createBlob,
533602
createBlobFromFilePath,
603+
createBlobReaderIterable,
604+
createBlobReaderStream,
534605
isBlob,
535606
kHandle,
536607
resolveObjectURL,
537608
TransferableBlob,
538-
createBlobReaderStream,
539609
};

lib/internal/errors.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1691,6 +1691,8 @@ E('ERR_QUIC_APPLICATION_ERROR', 'A QUIC application error occurred. %d [%s]', Er
16911691
E('ERR_QUIC_CONNECTION_FAILED', 'QUIC connection failed', Error);
16921692
E('ERR_QUIC_ENDPOINT_CLOSED', 'QUIC endpoint closed: %s (%d)', Error);
16931693
E('ERR_QUIC_OPEN_STREAM_FAILED', 'Failed to open QUIC stream', Error);
1694+
E('ERR_QUIC_STREAM_RESET',
1695+
'The QUIC stream was reset by the peer with error code %d', Error);
16941696
E('ERR_QUIC_TRANSPORT_ERROR', 'A QUIC transport error occurred. %d [%s]', Error);
16951697
E('ERR_QUIC_VERSION_NEGOTIATION_ERROR', 'The QUIC session requires version negotiation', Error);
16961698
E('ERR_REQUIRE_ASYNC_MODULE', function(filename, parentFilename) {

lib/internal/fs/promises.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1994,6 +1994,8 @@ module.exports = {
19941994
},
19951995

19961996
FileHandle,
1997+
kHandle,
1998+
kLocked,
19971999
kRef,
19982000
kUnref,
19992001
};

lib/internal/perf/observe.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const {
2727
NODE_PERFORMANCE_ENTRY_TYPE_HTTP,
2828
NODE_PERFORMANCE_ENTRY_TYPE_NET,
2929
NODE_PERFORMANCE_ENTRY_TYPE_DNS,
30+
NODE_PERFORMANCE_ENTRY_TYPE_QUIC,
3031
},
3132
installGarbageCollectionTracking,
3233
observerCounts,
@@ -87,6 +88,7 @@ const kSupportedEntryTypes = ObjectFreeze([
8788
'mark',
8889
'measure',
8990
'net',
91+
'quic',
9092
'resource',
9193
]);
9294

@@ -131,6 +133,7 @@ function getObserverType(type) {
131133
case 'http': return NODE_PERFORMANCE_ENTRY_TYPE_HTTP;
132134
case 'net': return NODE_PERFORMANCE_ENTRY_TYPE_NET;
133135
case 'dns': return NODE_PERFORMANCE_ENTRY_TYPE_DNS;
136+
case 'quic': return NODE_PERFORMANCE_ENTRY_TYPE_QUIC;
134137
}
135138
}
136139

lib/internal/quic/diagnostics.js

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
'use strict';
2+
3+
const dc = require('diagnostics_channel');
4+
5+
const onEndpointCreatedChannel = dc.channel('quic.endpoint.created');
6+
const onEndpointListeningChannel = dc.channel('quic.endpoint.listen');
7+
const onEndpointClosingChannel = dc.channel('quic.endpoint.closing');
8+
const onEndpointClosedChannel = dc.channel('quic.endpoint.closed');
9+
const onEndpointErrorChannel = dc.channel('quic.endpoint.error');
10+
const onEndpointBusyChangeChannel = dc.channel('quic.endpoint.busy.change');
11+
const onEndpointClientSessionChannel = dc.channel('quic.session.created.client');
12+
const onEndpointServerSessionChannel = dc.channel('quic.session.created.server');
13+
const onSessionOpenStreamChannel = dc.channel('quic.session.open.stream');
14+
const onSessionReceivedStreamChannel = dc.channel('quic.session.received.stream');
15+
const onSessionSendDatagramChannel = dc.channel('quic.session.send.datagram');
16+
const onSessionUpdateKeyChannel = dc.channel('quic.session.update.key');
17+
const onSessionClosingChannel = dc.channel('quic.session.closing');
18+
const onSessionClosedChannel = dc.channel('quic.session.closed');
19+
const onSessionReceiveDatagramChannel = dc.channel('quic.session.receive.datagram');
20+
const onSessionReceiveDatagramStatusChannel = dc.channel('quic.session.receive.datagram.status');
21+
const onSessionPathValidationChannel = dc.channel('quic.session.path.validation');
22+
const onSessionNewTokenChannel = dc.channel('quic.session.new.token');
23+
const onSessionTicketChannel = dc.channel('quic.session.ticket');
24+
const onSessionVersionNegotiationChannel = dc.channel('quic.session.version.negotiation');
25+
const onSessionOriginChannel = dc.channel('quic.session.receive.origin');
26+
const onSessionHandshakeChannel = dc.channel('quic.session.handshake');
27+
const onSessionGoawayChannel = dc.channel('quic.session.goaway');
28+
const onSessionEarlyRejectedChannel = dc.channel('quic.session.early.rejected');
29+
const onStreamClosedChannel = dc.channel('quic.stream.closed');
30+
const onStreamHeadersChannel = dc.channel('quic.stream.headers');
31+
const onStreamTrailersChannel = dc.channel('quic.stream.trailers');
32+
const onStreamInfoChannel = dc.channel('quic.stream.info');
33+
const onStreamResetChannel = dc.channel('quic.stream.reset');
34+
const onStreamBlockedChannel = dc.channel('quic.stream.blocked');
35+
const onSessionErrorChannel = dc.channel('quic.session.error');
36+
const onEndpointConnectChannel = dc.channel('quic.endpoint.connect');
37+
38+
module.exports = {
39+
onEndpointCreatedChannel,
40+
onEndpointListeningChannel,
41+
onEndpointClosingChannel,
42+
onEndpointClosedChannel,
43+
onEndpointErrorChannel,
44+
onEndpointBusyChangeChannel,
45+
onEndpointClientSessionChannel,
46+
onEndpointServerSessionChannel,
47+
onSessionOpenStreamChannel,
48+
onSessionReceivedStreamChannel,
49+
onSessionSendDatagramChannel,
50+
onSessionUpdateKeyChannel,
51+
onSessionClosingChannel,
52+
onSessionClosedChannel,
53+
onSessionReceiveDatagramChannel,
54+
onSessionReceiveDatagramStatusChannel,
55+
onSessionPathValidationChannel,
56+
onSessionNewTokenChannel,
57+
onSessionTicketChannel,
58+
onSessionVersionNegotiationChannel,
59+
onSessionOriginChannel,
60+
onSessionHandshakeChannel,
61+
onSessionGoawayChannel,
62+
onSessionEarlyRejectedChannel,
63+
onStreamClosedChannel,
64+
onStreamHeadersChannel,
65+
onStreamTrailersChannel,
66+
onStreamInfoChannel,
67+
onStreamResetChannel,
68+
onStreamBlockedChannel,
69+
onSessionErrorChannel,
70+
onEndpointConnectChannel,
71+
};

0 commit comments

Comments
 (0)