Skip to content

Commit 350bda3

Browse files
committed
quic: address pimterry review comments
1 parent 4be1717 commit 350bda3

1 file changed

Lines changed: 155 additions & 38 deletions

File tree

lib/internal/quic/quic.js

Lines changed: 155 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ const {
158158
} = require('internal/fs/promises');
159159

160160
const {
161+
validateAbortSignal,
161162
validateBoolean,
162163
validateFunction,
163164
validateInteger,
@@ -859,7 +860,11 @@ function parseHeaderPairs(pairs) {
859860
const block = { __proto__: null };
860861
for (let n = 0; n + 1 < pairs.length; n += 2) {
861862
if (block[pairs[n]] !== undefined) {
862-
block[pairs[n]] = [block[pairs[n]], pairs[n + 1]];
863+
if (ArrayIsArray(block[pairs[n]])) {
864+
ArrayPrototypePush(block[pairs[n]], pairs[n + 1]);
865+
} else {
866+
block[pairs[n]] = [block[pairs[n]], pairs[n + 1]];
867+
}
863868
} else {
864869
block[pairs[n]] = pairs[n + 1];
865870
}
@@ -990,6 +995,23 @@ function configureOutbound(handle, stream, body, depth = 0) {
990995
);
991996
}
992997

998+
// Sets the high water mark and initial writeDesiredSize for a streaming
999+
// outbound source. Called after handle.initStreamingSource() for both
1000+
// body-source and writer paths. One-shot body sources (string, Uint8Array,
1001+
// Blob, FileHandle, etc.) do not use this -- they go through attachSource
1002+
// and are not subject to backpressure.
1003+
function initStreamingBackpressure(stream) {
1004+
const state = getQuicStreamState(stream);
1005+
// Only set defaults if the user hasn't already configured them
1006+
// (e.g., via createBidirectionalStream({ highWaterMark: N })).
1007+
if (state.highWaterMark === 0) {
1008+
state.highWaterMark = kDefaultHighWaterMark;
1009+
}
1010+
if (state.writeDesiredSize === 0) {
1011+
state.writeDesiredSize = state.highWaterMark;
1012+
}
1013+
}
1014+
9931015
// Waits for the stream's drain callback to fire, indicating the
9941016
// outbound has capacity for more data.
9951017
function waitForDrain(stream) {
@@ -1004,18 +1026,39 @@ function waitForDrain(stream) {
10041026

10051027
// Writes a batch to the handle, awaiting drain if backpressured.
10061028
// Returns true if the stream was destroyed during the wait.
1029+
// Checks writeDesiredSize before writing to enforce backpressure
1030+
// against the outbound DataQueue's uncommitted bytes.
10071031
async function writeBatchWithDrain(handle, stream, batch) {
1032+
const state = getQuicStreamState(stream);
1033+
1034+
// Calculate total batch size for the capacity check.
1035+
let len = 0;
1036+
for (const chunk of batch) len += TypedArrayPrototypeGetByteLength(chunk);
1037+
1038+
// If insufficient capacity, wait for the C++ drain signal which
1039+
// fires when writeDesiredSize transitions from 0 to > 0 (i.e.,
1040+
// ngtcp2 has consumed data from the outbound DataQueue).
1041+
if (len > state.writeDesiredSize) {
1042+
await waitForDrain(stream);
1043+
if (stream.destroyed) return true;
1044+
}
1045+
1046+
// Write the batch. The return value is the total queued byte count
1047+
// on success, or undefined on failure (e.g., DataQueue append
1048+
// rejected). Guard against silent data loss.
10081049
const result = handle.write(batch);
1009-
if (result !== undefined) return false;
1010-
// Write rejected (flow control) - wait for drain
1011-
await waitForDrain(stream);
1012-
if (stream.destroyed) return true;
1013-
handle.write(batch);
1050+
if (result === undefined) {
1051+
if (!stream.destroyed) {
1052+
stream.destroy(new ERR_INVALID_STATE('Stream write failed'));
1053+
}
1054+
return true;
1055+
}
10141056
return false;
10151057
}
10161058

10171059
async function consumeAsyncSource(handle, stream, source) {
10181060
handle.initStreamingSource();
1061+
initStreamingBackpressure(stream);
10191062
try {
10201063
// Normalize to AsyncIterable<Uint8Array[]>
10211064
const normalized = streamFrom(source);
@@ -1033,6 +1076,7 @@ async function consumeAsyncSource(handle, stream, source) {
10331076

10341077
async function consumeSyncSource(handle, stream, source) {
10351078
handle.initStreamingSource();
1079+
initStreamingBackpressure(stream);
10361080
// Normalize to Iterable<Uint8Array[]>. Manually iterate so we can
10371081
// pause between next() calls when backpressure hits.
10381082
const normalized = streamFromSync(source);
@@ -1045,9 +1089,13 @@ async function consumeSyncSource(handle, stream, source) {
10451089
if (await writeBatchWithDrain(handle, stream, batch)) return;
10461090
}
10471091
handle.endWrite();
1048-
} catch {
1092+
} catch (err) {
10491093
if (!stream.destroyed) {
1050-
handle.resetStream(0n);
1094+
stream.destroy(err);
1095+
} else {
1096+
// If the stream is already destroyed, rethrow the error to avoid
1097+
// silently swallowing it. Tho in practice this shouldn't happen.
1098+
throw err;
10511099
}
10521100
}
10531101
}
@@ -1598,10 +1646,22 @@ class QuicStream {
15981646
}
15991647
};
16001648

1649+
// A note on backpressure handling: per the stream/iter spec, the default
1650+
// backpressure policy for writers is strict, meaning that if the stream
1651+
// signals backpressure additional writes are rejected until the buffer has
1652+
// capacity again.
1653+
16011654
function writeSync(chunk) {
1602-
if (closed || errored || stream.#state.writeEnded) return false;
1655+
// If the stream is closed, errored, or write-ended, we cannot accept
1656+
// more data. Refuse the sync write.
1657+
// If a drain is already pending, another operation is waiting
1658+
// for capacity. Refuse the sync write.
1659+
if (closed || errored || stream.#state.writeEnded || drainWakeup != null) {
1660+
return false;
1661+
}
16031662
chunk = toUint8Array(chunk);
1604-
const len = chunk.byteLength;
1663+
const len = TypedArrayPrototypeGetByteLength(chunk);
1664+
if (len === 0) return true;
16051665
// Refuse the write if the chunk doesn't fit in the available
16061666
// buffer capacity. The caller should wait for drain and retry.
16071667
if (len > stream.#state.writeDesiredSize) return false;
@@ -1611,85 +1671,136 @@ class QuicStream {
16111671
return true;
16121672
}
16131673

1614-
async function write(chunk, options) {
1615-
if (options?.signal?.aborted) {
1616-
throw options.signal.reason;
1674+
async function write(chunk, options = kEmptyObject) {
1675+
validateObject(options, 'options');
1676+
const { signal } = options;
1677+
if (signal !== undefined) {
1678+
validateAbortSignal(signal, 'options.signal');
1679+
signal.throwIfAborted();
16171680
}
16181681
if (errored) throw error;
16191682
if (closed || stream.#state.writeEnded) {
16201683
throw new ERR_INVALID_STATE('Writer is closed');
16211684
}
1622-
chunk = toUint8Array(chunk);
1623-
const len = chunk.byteLength;
1624-
// Reject if the chunk doesn't fit in the available buffer capacity.
1625-
if (len > stream.#state.writeDesiredSize) {
1685+
// If a drain is already pending, another operation is waiting
1686+
// for capacity. Under strict policy, reject immediately.
1687+
// Later, if we add support for other backpressure policies,
1688+
// we could instead await the existing drain before proceeding.
1689+
if (drainWakeup != null) {
16261690
throw new ERR_INVALID_STATE('Stream write buffer is full');
16271691
}
1628-
const result = handle.write([chunk]);
1629-
if (result === undefined) {
1692+
1693+
if (!writeSync(chunk)) {
16301694
throw new ERR_INVALID_STATE('Stream write buffer is full');
16311695
}
1632-
totalBytesWritten += len;
16331696
}
16341697

16351698
function writevSync(chunks) {
1636-
if (closed || errored || stream.#state.writeEnded) return false;
1699+
if (closed || errored || stream.#state.writeEnded || drainWakeup != null) {
1700+
return false;
1701+
}
16371702
chunks = convertChunks(chunks);
16381703
let len = 0;
16391704
for (const c of chunks) len += TypedArrayPrototypeGetByteLength(c);
1705+
if (len === 0) return true;
16401706
if (len > stream.#state.writeDesiredSize) return false;
16411707
const result = handle.write(chunks);
16421708
if (result === undefined) return false;
16431709
totalBytesWritten += len;
16441710
return true;
16451711
}
16461712

1647-
async function writev(chunks, options) {
1648-
if (options?.signal?.aborted) {
1649-
throw options.signal.reason;
1713+
async function writev(chunks, options = kEmptyObject) {
1714+
validateObject(options, 'options');
1715+
const { signal } = options;
1716+
if (signal !== undefined) {
1717+
validateAbortSignal(signal, 'options.signal');
1718+
signal.throwIfAborted();
16501719
}
1720+
16511721
if (errored) throw error;
16521722
if (closed || stream.#state.writeEnded) {
16531723
throw new ERR_INVALID_STATE('Writer is closed');
16541724
}
1655-
chunks = convertChunks(chunks);
1656-
let len = 0;
1657-
for (const c of chunks) len += TypedArrayPrototypeGetByteLength(c);
1658-
if (len > stream.#state.writeDesiredSize) {
1725+
1726+
// If a drain is already pending, another operation is waiting
1727+
// for capacity. Under strict policy, reject immediately.
1728+
// Later, if we add support for other backpressure policies,
1729+
// we could instead await the existing drain before proceeding.
1730+
if (drainWakeup != null) {
16591731
throw new ERR_INVALID_STATE('Stream write buffer is full');
16601732
}
1661-
const result = handle.write(chunks);
1662-
if (result === undefined) {
1733+
1734+
if (!writevSync(chunks)) {
16631735
throw new ERR_INVALID_STATE('Stream write buffer is full');
16641736
}
1665-
totalBytesWritten += len;
16661737
}
16671738

16681739
function endSync() {
1740+
// Per the streams/iter spec, endSync and end follow a try-fallback
1741+
// pattern. That is, callers should try endSync first and if it returns
1742+
// -1, then they should call and await end(). This is a signal that sync
1743+
// end is not currently possible. However, we always support sync end
1744+
// here unless the stream is already errored.
16691745
if (errored) return -1;
1746+
1747+
// If we're already closed, just return the total bytes written.
16701748
if (closed) return totalBytesWritten;
1749+
1750+
// If we are waiting for drain to complete, we cannot end synchronously.
1751+
if (drainWakeup != null) return -1;
1752+
1753+
// Fantastic, we can end synchronously!
16711754
handle.endWrite();
16721755
closed = true;
16731756
return totalBytesWritten;
16741757
}
16751758

1676-
async function end(options) {
1759+
async function end(options = kEmptyObject) {
1760+
validateObject(options, 'options');
1761+
const { signal } = options;
1762+
if (signal !== undefined) {
1763+
validateAbortSignal(signal, 'options.signal');
1764+
signal.throwIfAborted();
1765+
// TODO(@jasnell): The stream/iter spec allows individual sync end
1766+
// calls to be canceled via an AbortSignal. We currently do not support
1767+
// this, but we can add before the impl is graduated from experimental.
1768+
// At most we do here is check for signal abort at the start of the call.
1769+
}
1770+
1771+
// Per the streams/iter spec, endSync and end follow a try-fallback
1772+
// pattern. That is, callers should try endSync first and if it returns
1773+
// -1, then they should call and await end(). This is a signal that sync
1774+
// end is not currently possible. However, we always support sync end
1775+
// here unless the stream is already errored.
1776+
// While the user should have already called endSync, we call it again
1777+
// here to actually process the end request. At worst it's called twice.
16771778
const n = endSync();
1779+
1780+
// A return value of -1 indicates that endSync was not yet able to
1781+
// process the end request, either because we are errored or because we
1782+
// are awaiting drain. If we're errored, throw the error. If we're waiting
1783+
// for drain, await it and then try ending again.
1784+
16781785
if (n >= 0) return n;
16791786
if (errored) throw error;
1680-
drainWakeup = PromiseWithResolvers();
1681-
await drainWakeup.promise;
1682-
drainWakeup = null;
1787+
1788+
drainWakeup ??= PromiseWithResolvers();
1789+
try {
1790+
await drainWakeup.promise;
1791+
} finally {
1792+
drainWakeup = null;
1793+
}
16831794
return endSync();
16841795
}
16851796

16861797
function fail(reason) {
16871798
if (closed || errored) return;
16881799
errored = true;
1689-
error = reason;
1800+
error = reason ?? new ERR_INVALID_STATE('Failed');
16901801
handle.resetStream(0n);
1691-
if (drainWakeup) {
1692-
drainWakeup.reject(reason);
1802+
if (drainWakeup != null) {
1803+
drainWakeup.reject(error);
16931804
drainWakeup = null;
16941805
}
16951806
}
@@ -1709,6 +1820,8 @@ class QuicStream {
17091820
fail,
17101821
[drainableProtocol]() {
17111822
if (closed || errored) return null;
1823+
// If a drain is already pending, return the existing promise.
1824+
if (drainWakeup != null) return drainWakeup.promise;
17121825
if (stream.#state.writeDesiredSize > 0) return null;
17131826
drainWakeup = PromiseWithResolvers();
17141827
return drainWakeup.promise;
@@ -1734,6 +1847,7 @@ class QuicStream {
17341847

17351848
// Initialize the outbound DataQueue for streaming writes
17361849
handle.initStreamingSource();
1850+
initStreamingBackpressure(this);
17371851

17381852
this.#writer = writer;
17391853
return this.#writer;
@@ -1912,6 +2026,9 @@ class QuicStream {
19122026
this.#stats[kFinishClose]();
19132027
this.#state[kFinishClose]();
19142028
this.#session[kRemoveStream](this);
2029+
if (this.#writer !== undefined) {
2030+
this.#writer.fail(error);
2031+
}
19152032
this.#session = undefined;
19162033
this.#pendingClose.reject = undefined;
19172034
this.#pendingClose.resolve = undefined;

0 commit comments

Comments
 (0)