Skip to content

Commit 4dd0542

Browse files
committed
http2: keep reading during small client writes
Signed-off-by: Matteo Collina <[email protected]>
1 parent 58a8e1d commit 4dd0542

3 files changed

Lines changed: 134 additions & 1 deletion

File tree

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
'use strict';
2+
3+
const common = require('../common.js');
4+
const net = require('node:net');
5+
const { Duplex } = require('node:stream');
6+
7+
const WINDOW_SIZE = 32 * 1024 * 1024;
8+
9+
const bench = common.createBenchmark(main, {
10+
n: [500],
11+
parallel: [25],
12+
size: [384 * 1024],
13+
delay: [0, 5],
14+
}, { flags: ['--no-warnings'] });
15+
16+
class DelayedWriteSocket extends Duplex {
17+
constructor(port, delay, host = '127.0.0.1') {
18+
super();
19+
this.delay = delay;
20+
this.inner = net.connect(port, host);
21+
this.inner.on('data', (chunk) => {
22+
if (!this.push(chunk)) this.inner.pause();
23+
});
24+
this.inner.on('end', () => this.push(null));
25+
this.inner.on('error', (err) => this.destroy(err));
26+
this.inner.on('close', () => this.destroy());
27+
}
28+
29+
_read() {
30+
this.inner.resume();
31+
}
32+
33+
_write(chunk, encoding, callback) {
34+
if (!this.inner.write(chunk, encoding)) {
35+
this.inner.once('drain', () => setTimeout(callback, this.delay));
36+
return;
37+
}
38+
setTimeout(callback, this.delay);
39+
}
40+
41+
_final(callback) {
42+
this.inner.end();
43+
setTimeout(callback, this.delay);
44+
}
45+
46+
_destroy(err, callback) {
47+
this.inner.destroy();
48+
callback(err);
49+
}
50+
}
51+
52+
function once(emitter, event) {
53+
return new Promise((resolve, reject) => {
54+
emitter.once(event, resolve);
55+
emitter.once('error', reject);
56+
});
57+
}
58+
59+
function fetchHttp2(client) {
60+
return new Promise((resolve, reject) => {
61+
const req = client.request({ ':path': '/' });
62+
let total = 0;
63+
req.on('data', (chunk) => {
64+
total += chunk.length;
65+
});
66+
req.on('end', () => resolve(total));
67+
req.on('error', reject);
68+
req.end();
69+
});
70+
}
71+
72+
async function main({ n, parallel, size, delay }) {
73+
const http2 = require('node:http2');
74+
const payload = Buffer.alloc(size, 'x');
75+
const server = http2.createServer();
76+
77+
server.on('stream', (stream) => {
78+
stream.respond({
79+
':status': 200,
80+
'content-length': payload.length,
81+
'content-type': 'application/octet-stream',
82+
});
83+
stream.end(payload);
84+
});
85+
86+
server.listen(0, '127.0.0.1');
87+
await once(server, 'listening');
88+
89+
const port = server.address().port;
90+
const client = http2.connect(`http://127.0.0.1:${port}`, {
91+
settings: { initialWindowSize: WINDOW_SIZE },
92+
createConnection: delay === 0 ? undefined : () => {
93+
return new DelayedWriteSocket(port, delay);
94+
},
95+
});
96+
97+
try {
98+
await once(client, 'connect');
99+
client.setLocalWindowSize(WINDOW_SIZE);
100+
101+
// Warm up the session so connection establishment does not dominate.
102+
await fetchHttp2(client);
103+
104+
bench.start();
105+
for (let completed = 0; completed < n; completed += parallel) {
106+
const batch = Math.min(parallel, n - completed);
107+
await Promise.all(
108+
Array.from({ length: batch }, () => fetchHttp2(client)),
109+
);
110+
}
111+
bench.end(n);
112+
} finally {
113+
client.close();
114+
server.close();
115+
}
116+
}

src/node_http2.cc

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ namespace http2 {
5454
namespace {
5555

5656
const char zero_bytes_256[256] = {};
57+
// Small client-side control frame writes (e.g. WINDOW_UPDATE) should not
58+
// immediately force HTTP/2 input processing to become effectively sequential
59+
// if the write callback lingers for a short time.
60+
constexpr size_t kMaxReadWhileWriteInProgress = 1024;
61+
constexpr uint64_t kMinWriteAgeToKeepReading = 1'000'000;
5762

5863
bool HasHttp2Observer(Environment* env) {
5964
AliasedUint32Array& observers = env->performance_state()->observers;
@@ -1396,7 +1401,11 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
13961401

13971402
// If we are currently waiting for a write operation to finish, we should
13981403
// tell nghttp2 that we want to wait before we process more input data.
1399-
if (session->is_write_in_progress()) {
1404+
if (session->is_write_in_progress() &&
1405+
(session->session_type_ == NGHTTP2_SESSION_SERVER ||
1406+
session->current_write_size_ > kMaxReadWhileWriteInProgress ||
1407+
uv_hrtime() - session->current_write_start_time_ <
1408+
kMinWriteAgeToKeepReading)) {
14001409
CHECK(session->is_reading_stopped());
14011410
session->set_receive_paused();
14021411
Debug(session, "receive paused");
@@ -1774,6 +1783,8 @@ void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) {
17741783
MaybeNotifyGracefulCloseComplete();
17751784
CHECK(is_write_in_progress());
17761785
set_write_in_progress(false);
1786+
current_write_size_ = 0;
1787+
current_write_start_time_ = 0;
17771788

17781789
// Inform all pending writes about their completion.
17791790
ClearOutgoing(status);
@@ -1989,10 +2000,14 @@ uint8_t Http2Session::SendPendingData() {
19892000
chunks_sent_since_last_write_++;
19902001

19912002
CHECK(!is_write_in_progress());
2003+
current_write_size_ = outgoing_length_;
2004+
current_write_start_time_ = uv_hrtime();
19922005
set_write_in_progress();
19932006
StreamWriteResult res = underlying_stream()->Write(*bufs, count);
19942007
if (!res.async) {
19952008
set_write_in_progress(false);
2009+
current_write_size_ = 0;
2010+
current_write_start_time_ = 0;
19962011
ClearOutgoing(res.err);
19972012
MaybeNotifyGracefulCloseComplete();
19982013
}

src/node_http2.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -948,6 +948,8 @@ class Http2Session : public AsyncWrap,
948948
std::vector<NgHttp2StreamWrite> outgoing_buffers_;
949949
std::vector<uint8_t> outgoing_storage_;
950950
size_t outgoing_length_ = 0;
951+
size_t current_write_size_ = 0;
952+
uint64_t current_write_start_time_ = 0;
951953
std::vector<int32_t> pending_rst_streams_;
952954
// Count streams that have been rejected while being opened. Exceeding a fixed
953955
// limit will result in the session being destroyed, as an indication of a

0 commit comments

Comments
 (0)