Skip to content

Commit 8f1a29d

Browse files
committed
quic: complete the javascript implementation of QUIC
Signed-off-by: James M Snell <[email protected]> Assisted-by: Opencode:Opus 4.6
1 parent 5e2b4a3 commit 8f1a29d

9 files changed

Lines changed: 2802 additions & 446 deletions

File tree

lib/internal/blob.js

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
'use strict';
22

33
const {
4+
ArrayFrom,
5+
ArrayPrototypePush,
46
MathMax,
57
MathMin,
68
ObjectDefineProperties,
@@ -527,13 +529,82 @@ function createBlobReaderStream(reader) {
527529
}, { highWaterMark: 0 });
528530
}
529531

532+
// Maximum number of chunks to collect in a single batch to prevent
533+
// unbounded memory growth when the DataQueue has a large burst of data.
534+
const kMaxBatchChunks = 16;
535+
536+
async function* createBlobReaderIterable(reader, options = {}) {
537+
const { getReadError } = options;
538+
let wakeup = PromiseWithResolvers();
539+
reader.setWakeup(wakeup.resolve);
540+
541+
try {
542+
while (true) {
543+
const batch = [];
544+
let blocked = false;
545+
let eos = false;
546+
let error = null;
547+
548+
// Pull as many chunks as available synchronously.
549+
// reader.pull(callback) calls the callback synchronously via
550+
// MakeCallback, so we can collect multiple chunks per iteration
551+
// step without any async overhead.
552+
while (true) {
553+
let pullResult;
554+
reader.pull((status, buffer) => {
555+
pullResult = { status, buffer };
556+
});
557+
558+
if (pullResult.status === 0) {
559+
eos = true;
560+
break;
561+
}
562+
if (pullResult.status < 0) {
563+
error = typeof getReadError === 'function' ?
564+
getReadError(pullResult.status) :
565+
new ERR_INVALID_STATE('The reader is not readable');
566+
break;
567+
}
568+
if (pullResult.status === 2) {
569+
blocked = true;
570+
break;
571+
}
572+
ArrayPrototypePush(batch, new Uint8Array(pullResult.buffer));
573+
if (batch.length >= kMaxBatchChunks) break;
574+
}
575+
576+
if (batch.length > 0) {
577+
yield batch;
578+
}
579+
580+
if (eos) return;
581+
if (error) throw error;
582+
583+
if (blocked) {
584+
const fin = await wakeup.promise;
585+
wakeup = PromiseWithResolvers();
586+
reader.setWakeup(wakeup.resolve);
587+
// If the wakeup was triggered by FIN (EndReadable), the DataQueue
588+
// is capped. Continue the loop to pull again -- the next pull will
589+
// return EOS. Without this, a race between the data notification
590+
// and the FIN notification can leave the iterator waiting for a
591+
// wakeup that will never come.
592+
if (fin) continue;
593+
}
594+
}
595+
} finally {
596+
reader.setWakeup(undefined);
597+
}
598+
}
599+
530600
module.exports = {
531601
Blob,
532602
createBlob,
533603
createBlobFromFilePath,
604+
createBlobReaderIterable,
605+
createBlobReaderStream,
534606
isBlob,
535607
kHandle,
536608
resolveObjectURL,
537609
TransferableBlob,
538-
createBlobReaderStream,
539610
};

lib/internal/errors.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1691,6 +1691,8 @@ E('ERR_QUIC_APPLICATION_ERROR', 'A QUIC application error occurred. %d [%s]', Er
16911691
E('ERR_QUIC_CONNECTION_FAILED', 'QUIC connection failed', Error);
16921692
E('ERR_QUIC_ENDPOINT_CLOSED', 'QUIC endpoint closed: %s (%d)', Error);
16931693
E('ERR_QUIC_OPEN_STREAM_FAILED', 'Failed to open QUIC stream', Error);
1694+
E('ERR_QUIC_STREAM_RESET',
1695+
'The QUIC stream was reset by the peer with error code %d', Error);
16941696
E('ERR_QUIC_TRANSPORT_ERROR', 'A QUIC transport error occurred. %d [%s]', Error);
16951697
E('ERR_QUIC_VERSION_NEGOTIATION_ERROR', 'The QUIC session requires version negotiation', Error);
16961698
E('ERR_REQUIRE_ASYNC_MODULE', function(filename, parentFilename) {

lib/internal/fs/promises.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1994,6 +1994,8 @@ module.exports = {
19941994
},
19951995

19961996
FileHandle,
1997+
kHandle,
1998+
kLocked,
19971999
kRef,
19982000
kUnref,
19992001
};

lib/internal/perf/observe.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const {
2727
NODE_PERFORMANCE_ENTRY_TYPE_HTTP,
2828
NODE_PERFORMANCE_ENTRY_TYPE_NET,
2929
NODE_PERFORMANCE_ENTRY_TYPE_DNS,
30+
NODE_PERFORMANCE_ENTRY_TYPE_QUIC,
3031
},
3132
installGarbageCollectionTracking,
3233
observerCounts,
@@ -87,6 +88,7 @@ const kSupportedEntryTypes = ObjectFreeze([
8788
'mark',
8889
'measure',
8990
'net',
91+
'quic',
9092
'resource',
9193
]);
9294

@@ -131,6 +133,7 @@ function getObserverType(type) {
131133
case 'http': return NODE_PERFORMANCE_ENTRY_TYPE_HTTP;
132134
case 'net': return NODE_PERFORMANCE_ENTRY_TYPE_NET;
133135
case 'dns': return NODE_PERFORMANCE_ENTRY_TYPE_DNS;
136+
case 'quic': return NODE_PERFORMANCE_ENTRY_TYPE_QUIC;
134137
}
135138
}
136139

lib/internal/quic/diagnostics.js

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
'use strict';
2+
3+
const dc = require('diagnostics_channel');
4+
5+
const onEndpointCreatedChannel = dc.channel('quic.endpoint.created');
6+
const onEndpointListeningChannel = dc.channel('quic.endpoint.listen');
7+
const onEndpointClosingChannel = dc.channel('quic.endpoint.closing');
8+
const onEndpointClosedChannel = dc.channel('quic.endpoint.closed');
9+
const onEndpointErrorChannel = dc.channel('quic.endpoint.error');
10+
const onEndpointBusyChangeChannel = dc.channel('quic.endpoint.busy.change');
11+
const onEndpointClientSessionChannel = dc.channel('quic.session.created.client');
12+
const onEndpointServerSessionChannel = dc.channel('quic.session.created.server');
13+
const onSessionOpenStreamChannel = dc.channel('quic.session.open.stream');
14+
const onSessionReceivedStreamChannel = dc.channel('quic.session.received.stream');
15+
const onSessionSendDatagramChannel = dc.channel('quic.session.send.datagram');
16+
const onSessionUpdateKeyChannel = dc.channel('quic.session.update.key');
17+
const onSessionClosingChannel = dc.channel('quic.session.closing');
18+
const onSessionClosedChannel = dc.channel('quic.session.closed');
19+
const onSessionReceiveDatagramChannel = dc.channel('quic.session.receive.datagram');
20+
const onSessionReceiveDatagramStatusChannel = dc.channel('quic.session.receive.datagram.status');
21+
const onSessionPathValidationChannel = dc.channel('quic.session.path.validation');
22+
const onSessionNewTokenChannel = dc.channel('quic.session.new.token');
23+
const onSessionTicketChannel = dc.channel('quic.session.ticket');
24+
const onSessionVersionNegotiationChannel = dc.channel('quic.session.version.negotiation');
25+
const onSessionOriginChannel = dc.channel('quic.session.receive.origin');
26+
const onSessionHandshakeChannel = dc.channel('quic.session.handshake');
27+
const onSessionGoawayChannel = dc.channel('quic.session.goaway');
28+
const onSessionEarlyRejectedChannel = dc.channel('quic.session.early.rejected');
29+
const onStreamClosedChannel = dc.channel('quic.stream.closed');
30+
const onStreamHeadersChannel = dc.channel('quic.stream.headers');
31+
const onStreamTrailersChannel = dc.channel('quic.stream.trailers');
32+
const onStreamInfoChannel = dc.channel('quic.stream.info');
33+
const onStreamResetChannel = dc.channel('quic.stream.reset');
34+
const onStreamBlockedChannel = dc.channel('quic.stream.blocked');
35+
const onSessionErrorChannel = dc.channel('quic.session.error');
36+
const onEndpointConnectChannel = dc.channel('quic.endpoint.connect');
37+
38+
module.exports = {
39+
onEndpointCreatedChannel,
40+
onEndpointListeningChannel,
41+
onEndpointClosingChannel,
42+
onEndpointClosedChannel,
43+
onEndpointErrorChannel,
44+
onEndpointBusyChangeChannel,
45+
onEndpointClientSessionChannel,
46+
onEndpointServerSessionChannel,
47+
onSessionOpenStreamChannel,
48+
onSessionReceivedStreamChannel,
49+
onSessionSendDatagramChannel,
50+
onSessionUpdateKeyChannel,
51+
onSessionClosingChannel,
52+
onSessionClosedChannel,
53+
onSessionReceiveDatagramChannel,
54+
onSessionReceiveDatagramStatusChannel,
55+
onSessionPathValidationChannel,
56+
onSessionNewTokenChannel,
57+
onSessionTicketChannel,
58+
onSessionVersionNegotiationChannel,
59+
onSessionOriginChannel,
60+
onSessionHandshakeChannel,
61+
onSessionGoawayChannel,
62+
onSessionEarlyRejectedChannel,
63+
onStreamClosedChannel,
64+
onStreamHeadersChannel,
65+
onStreamTrailersChannel,
66+
onStreamInfoChannel,
67+
onStreamResetChannel,
68+
onStreamBlockedChannel,
69+
onSessionErrorChannel,
70+
onEndpointConnectChannel,
71+
};

0 commit comments

Comments
 (0)