Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions benchmark/http2/client-delayed-write.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
'use strict';

// Benchmark HTTP/2 client download throughput when the socket backing the
// session acknowledges writes with an artificial delay (DelayedWriteSocket).
const common = require('../common.js');
const net = require('node:net');
const { Duplex } = require('node:stream');

// 32 MiB flow-control window, large enough that HTTP/2 flow control does not
// throttle the transfers being measured.
const WINDOW_SIZE = 32 * 1024 * 1024;

// n: total number of requests; parallel: requests in flight per batch;
// size: response payload size in bytes; delay: extra milliseconds before each
// socket write callback fires (0 uses a plain TCP socket — see main()).
const bench = common.createBenchmark(main, {
  n: [500],
  parallel: [25],
  size: [384 * 1024],
  delay: [0, 5],
}, { flags: ['--no-warnings'] });

// A Duplex stream that proxies a plain TCP connection but defers every write
// completion callback by `delay` milliseconds, simulating a slow sink.
class DelayedWriteSocket extends Duplex {
  /**
   * @param {number} port - TCP port to connect to.
   * @param {number} delay - Milliseconds to wait before signalling that a
   *   write (or final flush) has completed.
   * @param {string} [host] - Host to connect to (defaults to loopback).
   */
  constructor(port, delay, host = '127.0.0.1') {
    super();
    this.delay = delay;
    this.inner = net.connect(port, host);

    const socket = this.inner;
    socket.on('data', (chunk) => {
      // Honor backpressure from our own readable side: stop the inner
      // socket until _read() resumes it.
      const keepFlowing = this.push(chunk);
      if (!keepFlowing) socket.pause();
    });
    socket.on('end', () => this.push(null));
    socket.on('error', (err) => this.destroy(err));
    socket.on('close', () => this.destroy());
  }

  // Invoke `callback` only after the configured artificial delay.
  #deferred(callback) {
    setTimeout(callback, this.delay);
  }

  _read() {
    this.inner.resume();
  }

  _write(chunk, encoding, callback) {
    const flushed = this.inner.write(chunk, encoding);
    if (flushed) {
      this.#deferred(callback);
    } else {
      // Wait for the inner socket to drain, then still apply the delay.
      this.inner.once('drain', () => this.#deferred(callback));
    }
  }

  _final(callback) {
    this.inner.end();
    this.#deferred(callback);
  }

  _destroy(err, callback) {
    this.inner.destroy();
    callback(err);
  }
}

/**
 * Waits for a single emission of `event` on `emitter`, rejecting if an
 * 'error' is emitted first. Unlike the naive version, whichever listener
 * fires removes the other, so repeated calls on a long-lived emitter (as
 * main() does for the server and client sessions) do not accumulate stale
 * 'error' listeners.
 * @param {import('node:events').EventEmitter} emitter
 * @param {string} event
 * @returns {Promise<*>} resolves with the first argument of the emission
 */
function once(emitter, event) {
  return new Promise((resolve, reject) => {
    const onEvent = (value) => {
      emitter.removeListener('error', onError);
      resolve(value);
    };
    const onError = (err) => {
      emitter.removeListener(event, onEvent);
      reject(err);
    };
    emitter.once(event, onEvent);
    emitter.once('error', onError);
  });
}

/**
 * Issues a single request for ':path' '/' on the given HTTP/2 session and
 * resolves with the total number of response body bytes received.
 * @param {import('node:http2').ClientHttp2Session} client
 * @returns {Promise<number>} total body length in bytes
 */
function fetchHttp2(client) {
  return new Promise((resolve, reject) => {
    let bytesReceived = 0;
    const request = client.request({ ':path': '/' });
    request
      .on('data', (chunk) => { bytesReceived += chunk.length; })
      .on('end', () => resolve(bytesReceived))
      .on('error', reject);
    request.end();
  });
}

/**
 * Benchmark entry point. Starts a local HTTP/2 server that serves a fixed
 * `size`-byte payload, then measures how long `n` client requests take when
 * issued in batches of `parallel`. When `delay` is non-zero, the session
 * runs over a DelayedWriteSocket whose write callbacks lag by `delay` ms.
 */
async function main({ n, parallel, size, delay }) {
  const http2 = require('node:http2');
  const payload = Buffer.alloc(size, 'x');

  const server = http2.createServer();
  server.on('stream', (stream) => {
    stream.respond({
      ':status': 200,
      'content-length': payload.length,
      'content-type': 'application/octet-stream',
    });
    stream.end(payload);
  });

  server.listen(0, '127.0.0.1');
  await once(server, 'listening');

  const { port } = server.address();
  const client = http2.connect(`http://127.0.0.1:${port}`, {
    settings: { initialWindowSize: WINDOW_SIZE },
    createConnection:
      delay === 0 ? undefined : () => new DelayedWriteSocket(port, delay),
  });

  try {
    await once(client, 'connect');
    client.setLocalWindowSize(WINDOW_SIZE);

    // One throwaway request so session establishment is excluded from the
    // measured interval.
    await fetchHttp2(client);

    bench.start();
    let remaining = n;
    while (remaining > 0) {
      const batch = Math.min(parallel, remaining);
      const inFlight = [];
      for (let i = 0; i < batch; i++) inFlight.push(fetchHttp2(client));
      await Promise.all(inFlight);
      remaining -= batch;
    }
    bench.end(n);
  } finally {
    client.close();
    server.close();
  }
}
17 changes: 16 additions & 1 deletion src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ namespace http2 {
namespace {

const char zero_bytes_256[256] = {};
// Small client-side control frame writes (e.g. WINDOW_UPDATE) should not
// immediately force HTTP/2 input processing to become effectively sequential
// if the write callback lingers for a short time.
constexpr size_t kMaxReadWhileWriteInProgress = 1024;
constexpr uint64_t kMinWriteAgeToKeepReading = 1'000'000;

bool HasHttp2Observer(Environment* env) {
AliasedUint32Array& observers = env->performance_state()->observers;
Expand Down Expand Up @@ -1396,7 +1401,11 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,

// If we are currently waiting for a write operation to finish, we should
// tell nghttp2 that we want to wait before we process more input data.
if (session->is_write_in_progress()) {
if (session->is_write_in_progress() &&
(session->session_type_ == NGHTTP2_SESSION_SERVER ||
session->current_write_size_ > kMaxReadWhileWriteInProgress ||
uv_hrtime() - session->current_write_start_time_ <
kMinWriteAgeToKeepReading)) {
CHECK(session->is_reading_stopped());
session->set_receive_paused();
Debug(session, "receive paused");
Expand Down Expand Up @@ -1774,6 +1783,8 @@ void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) {
MaybeNotifyGracefulCloseComplete();
CHECK(is_write_in_progress());
set_write_in_progress(false);
current_write_size_ = 0;
current_write_start_time_ = 0;

// Inform all pending writes about their completion.
ClearOutgoing(status);
Expand Down Expand Up @@ -1989,10 +2000,14 @@ uint8_t Http2Session::SendPendingData() {
chunks_sent_since_last_write_++;

CHECK(!is_write_in_progress());
current_write_size_ = outgoing_length_;
current_write_start_time_ = uv_hrtime();
set_write_in_progress();
StreamWriteResult res = underlying_stream()->Write(*bufs, count);
if (!res.async) {
set_write_in_progress(false);
current_write_size_ = 0;
current_write_start_time_ = 0;
ClearOutgoing(res.err);
MaybeNotifyGracefulCloseComplete();
}
Expand Down
2 changes: 2 additions & 0 deletions src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,8 @@ class Http2Session : public AsyncWrap,
std::vector<NgHttp2StreamWrite> outgoing_buffers_;
std::vector<uint8_t> outgoing_storage_;
size_t outgoing_length_ = 0;
size_t current_write_size_ = 0;
uint64_t current_write_start_time_ = 0;
std::vector<int32_t> pending_rst_streams_;
// Count streams that have been rejected while being opened. Exceeding a fixed
// limit will result in the session being destroyed, as an indication of a
Expand Down
Loading