From 4dd0542f8bf6ba0d83c31550f03ddee4ae7dd635 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 28 Apr 2026 12:19:02 +0200 Subject: [PATCH] http2: keep reading during small client writes Signed-off-by: Matteo Collina --- benchmark/http2/client-delayed-write.js | 116 ++++++++++++++++++++++++ src/node_http2.cc | 17 +++- src/node_http2.h | 2 + 3 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 benchmark/http2/client-delayed-write.js diff --git a/benchmark/http2/client-delayed-write.js b/benchmark/http2/client-delayed-write.js new file mode 100644 index 00000000000000..b98c044c39b27e --- /dev/null +++ b/benchmark/http2/client-delayed-write.js @@ -0,0 +1,116 @@ +'use strict'; + +const common = require('../common.js'); +const net = require('node:net'); +const { Duplex } = require('node:stream'); + +const WINDOW_SIZE = 32 * 1024 * 1024; + +const bench = common.createBenchmark(main, { + n: [500], + parallel: [25], + size: [384 * 1024], + delay: [0, 5], +}, { flags: ['--no-warnings'] }); + +class DelayedWriteSocket extends Duplex { + constructor(port, delay, host = '127.0.0.1') { + super(); + this.delay = delay; + this.inner = net.connect(port, host); + this.inner.on('data', (chunk) => { + if (!this.push(chunk)) this.inner.pause(); + }); + this.inner.on('end', () => this.push(null)); + this.inner.on('error', (err) => this.destroy(err)); + this.inner.on('close', () => this.destroy()); + } + + _read() { + this.inner.resume(); + } + + _write(chunk, encoding, callback) { + if (!this.inner.write(chunk, encoding)) { + this.inner.once('drain', () => setTimeout(callback, this.delay)); + return; + } + setTimeout(callback, this.delay); + } + + _final(callback) { + this.inner.end(); + setTimeout(callback, this.delay); + } + + _destroy(err, callback) { + this.inner.destroy(); + callback(err); + } +} + +function once(emitter, event) { + return new Promise((resolve, reject) => { + emitter.once(event, resolve); + emitter.once('error', reject); + }); +} + +function fetchHttp2(client) { + return new Promise((resolve, reject) => { + const req = client.request({ ':path': '/' }); + let total = 0; + req.on('data', (chunk) => { + total += chunk.length; + }); + req.on('end', () => resolve(total)); + req.on('error', reject); + req.end(); + }); +} + +async function main({ n, parallel, size, delay }) { + const http2 = require('node:http2'); + const payload = Buffer.alloc(size, 'x'); + const server = http2.createServer(); + + server.on('stream', (stream) => { + stream.respond({ + ':status': 200, + 'content-length': payload.length, + 'content-type': 'application/octet-stream', + }); + stream.end(payload); + }); + + server.listen(0, '127.0.0.1'); + await once(server, 'listening'); + + const port = server.address().port; + const client = http2.connect(`http://127.0.0.1:${port}`, { + settings: { initialWindowSize: WINDOW_SIZE }, + createConnection: delay === 0 ? undefined : () => { + return new DelayedWriteSocket(port, delay); + }, + }); + + try { + await once(client, 'connect'); + client.setLocalWindowSize(WINDOW_SIZE); + + // Warm up the session so connection establishment does not dominate. + await fetchHttp2(client); + + bench.start(); + for (let completed = 0; completed < n; completed += parallel) { + const batch = Math.min(parallel, n - completed); + await Promise.all( + Array.from({ length: batch }, () => fetchHttp2(client)), + ); + } + bench.end(n); + } finally { + client.close(); + server.close(); + } +} diff --git a/src/node_http2.cc b/src/node_http2.cc index 084a4773673e5b..b248bc46a73ba7 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -54,6 +54,11 @@ namespace http2 { namespace { const char zero_bytes_256[256] = {}; +// Small client-side control frame writes (e.g. WINDOW_UPDATE) should not +// immediately force HTTP/2 input processing to become effectively sequential +// if the write callback lingers for a short time. +constexpr size_t kMaxReadWhileWriteInProgress = 1024; +constexpr uint64_t kMinWriteAgeToKeepReading = 1'000'000; bool HasHttp2Observer(Environment* env) { AliasedUint32Array& observers = env->performance_state()->observers; @@ -1396,7 +1401,11 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle, // If we are currently waiting for a write operation to finish, we should // tell nghttp2 that we want to wait before we process more input data. - if (session->is_write_in_progress()) { + if (session->is_write_in_progress() && + (session->session_type_ == NGHTTP2_SESSION_SERVER || + session->current_write_size_ > kMaxReadWhileWriteInProgress || + uv_hrtime() - session->current_write_start_time_ < + kMinWriteAgeToKeepReading)) { CHECK(session->is_reading_stopped()); session->set_receive_paused(); Debug(session, "receive paused"); @@ -1774,6 +1783,8 @@ void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) { MaybeNotifyGracefulCloseComplete(); CHECK(is_write_in_progress()); set_write_in_progress(false); + current_write_size_ = 0; + current_write_start_time_ = 0; // Inform all pending writes about their completion. ClearOutgoing(status); @@ -1989,10 +2000,14 @@ uint8_t Http2Session::SendPendingData() { chunks_sent_since_last_write_++; CHECK(!is_write_in_progress()); + current_write_size_ = outgoing_length_; + current_write_start_time_ = uv_hrtime(); set_write_in_progress(); StreamWriteResult res = underlying_stream()->Write(*bufs, count); if (!res.async) { set_write_in_progress(false); + current_write_size_ = 0; + current_write_start_time_ = 0; ClearOutgoing(res.err); MaybeNotifyGracefulCloseComplete(); } diff --git a/src/node_http2.h b/src/node_http2.h index 259b71840b66d4..d46d4c746dc5b6 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -948,6 +948,8 @@ class Http2Session : public AsyncWrap, std::vector outgoing_buffers_; std::vector outgoing_storage_; size_t outgoing_length_ = 0; + size_t current_write_size_ = 0; + uint64_t current_write_start_time_ = 0; std::vector pending_rst_streams_; // Count streams that have been rejected while being opened. Exceeding a fixed // limit will result in the session being destroyed, as an indication of a