Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
const { URL, urlToHttpOptions, isURL } = require('internal/url');
const {
kIsClientRequest,
kOutHeaders,
kNeedDrain,
isTraceHTTPEnabled,
Expand Down Expand Up @@ -192,6 +193,7 @@ function rewriteForProxiedHttp(req, reqOptions) {

function ClientRequest(input, options, cb) {
OutgoingMessage.call(this);
this[kIsClientRequest] = true;

if (typeof input === 'string') {
const urlStr = input;
Expand Down
12 changes: 12 additions & 0 deletions lib/_http_common.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const {
Uint8Array,
} = primordials;
const { setImmediate } = require('timers');
const dc = require('diagnostics_channel');

const { methods, allMethods, HTTPParser } = internalBinding('http_parser');
const { getOptionValue } = require('internal/options');
Expand All @@ -50,6 +51,9 @@ const kOnMessageComplete = HTTPParser.kOnMessageComplete | 0;
const kOnExecute = HTTPParser.kOnExecute | 0;
const kOnTimeout = HTTPParser.kOnTimeout | 0;

const onClientResponseBodyChunkReceivedChannel =
dc.channel('http.client.response.bodyChunkReceived');

const MAX_HEADER_PAIRS = 2000;

// Only called in the slow case where slow means
Expand Down Expand Up @@ -120,6 +124,7 @@ function parserOnHeadersComplete(versionMajor, versionMinor, headers, method,
// client only
incoming.statusCode = statusCode;
incoming.statusMessage = statusMessage;
incoming.req = socket?._httpMessage;
}

return parser.onIncoming(incoming, shouldKeepAlive);
Expand All @@ -134,6 +139,13 @@ function parserOnBody(b) {

// Pretend this was the result of a stream._read call.
if (!stream._dumped) {
if (stream.req && onClientResponseBodyChunkReceivedChannel.hasSubscribers) {
onClientResponseBodyChunkReceivedChannel.publish({
request: stream.req,
response: stream,
chunk: b,
});
}
const ret = stream.push(b);
if (!ret)
readStop(this.socket);
Expand Down
26 changes: 25 additions & 1 deletion lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ const {

const { getDefaultHighWaterMark } = require('internal/streams/state');
const assert = require('internal/assert');
const dc = require('diagnostics_channel');
const EE = require('events');
const Stream = require('stream');
const { kOutHeaders, utcDate, kNeedDrain } = require('internal/http');
const { kIsClientRequest, kOutHeaders, utcDate, kNeedDrain } = require('internal/http');
const { Buffer } = require('buffer');
const {
_checkIsHttpToken: checkIsHttpToken,
Expand Down Expand Up @@ -86,6 +87,12 @@ const kBytesWritten = Symbol('kBytesWritten');
const kErrored = Symbol('errored');
const kHighWaterMark = Symbol('kHighWaterMark');
const kRejectNonStandardBodyWrites = Symbol('kRejectNonStandardBodyWrites');
const kClientRequestBodyChunksWritten = Symbol('kClientRequestBodyChunksWritten');

const onClientRequestBodyChunkSentChannel =
dc.channel('http.client.request.bodyChunkSent');
const onClientRequestBodySentChannel =
dc.channel('http.client.request.bodySent');

const nop = () => {};

Expand Down Expand Up @@ -950,6 +957,17 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
}
}

if (msg[kIsClientRequest]) {
msg[kClientRequestBodyChunksWritten] = true;
if (onClientRequestBodyChunkSentChannel.hasSubscribers) {
onClientRequestBodyChunkSentChannel.publish({
request: msg,
chunk,
encoding,
});
}
}

if (!fromEnd && msg.socket && !msg.socket.writableCorked) {
msg.socket.cork();
process.nextTick(connectionCorkNT, msg.socket);
Expand Down Expand Up @@ -1103,6 +1121,12 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {

this.finished = true;

if (this[kIsClientRequest] &&
this[kClientRequestBodyChunksWritten] &&
onClientRequestBodySentChannel.hasSubscribers) {
onClientRequestBodySentChannel.publish({ request: this });
}

// There is the first message on the outgoing queue, and we've sent
// everything to the socket.
debug('outgoing message end.');
Expand Down
1 change: 1 addition & 0 deletions lib/internal/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ function getGlobalAgent(proxyEnv, Agent) {
}

module.exports = {
kIsClientRequest: Symbol('kIsClientRequest'),
kOutHeaders: Symbol('kOutHeaders'),
kNeedDrain: Symbol('kNeedDrain'),
kProxyConfig: Symbol('kProxyConfig'),
Expand Down
71 changes: 59 additions & 12 deletions lib/internal/inspector/network_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const {
sniffMimeType,
} = require('internal/inspector/network');
const { Network } = require('inspector');
const EventEmitter = require('events');
const { Buffer } = require('buffer');

const kRequestUrl = Symbol('kRequestUrl');

Expand Down Expand Up @@ -95,6 +95,61 @@ function onClientRequestError({ request, error }) {
});
}

/**
* When a chunk of the request body is being sent, cache it until
* `getRequestPostData` request.
* https://chromedevtools.github.io/devtools-protocol/1-3/Network/#method-getRequestPostData
* @param {{ request: import('http').ClientRequest, chunk: Uint8Array | string, encoding?: string }} event
*/
function onClientRequestBodyChunkSent({ request, chunk, encoding }) {
if (typeof request[kInspectorRequestId] !== 'string') {
return;
}

const buffer = typeof chunk === 'string' ? Buffer.from(chunk, encoding) : Buffer.from(chunk);
Network.dataSent({
requestId: request[kInspectorRequestId],
timestamp: getMonotonicTime(),
dataLength: buffer.byteLength,
data: buffer,
});
}

/**
* Mark a request body as fully sent.
* @param {{ request: import('http').ClientRequest }} event
*/
function onClientRequestBodySent({ request }) {
if (typeof request[kInspectorRequestId] !== 'string') {
return;
}

Network.dataSent({
requestId: request[kInspectorRequestId],
finished: true,
});
}

/**
* When a chunk of the response body is received, cache the raw bytes until
* `getResponseBody` request.
* https://chromedevtools.github.io/devtools-protocol/1-3/Network/#method-getResponseBody
* @param {{ request: import('http').ClientRequest, chunk: Uint8Array }} event
*/
function onClientResponseBodyChunkReceived({ request, chunk }) {
if (typeof request[kInspectorRequestId] !== 'string') {
return;
}

Network.dataReceived({
requestId: request[kInspectorRequestId],
timestamp: getMonotonicTime(),
dataLength: chunk.byteLength,
encodedDataLength: chunk.byteLength,
data: chunk,
});
}

/**
* When response headers are received, emit Network.responseReceived event.
* https://chromedevtools.github.io/devtools-protocol/1-3/Network/#event-responseReceived
Expand All @@ -121,17 +176,6 @@ function onClientResponseFinish({ request, response }) {
},
});

// Unlike response.on('data', ...), this does not put the stream into flowing mode.
EventEmitter.prototype.on.call(response, 'data', (chunk) => {
Network.dataReceived({
requestId: request[kInspectorRequestId],
timestamp: getMonotonicTime(),
dataLength: chunk.byteLength,
encodedDataLength: chunk.byteLength,
data: chunk,
});
});

// Wait until the response body is consumed by user code.
response.once('end', () => {
Network.loadingFinished({
Expand All @@ -143,6 +187,9 @@ function onClientResponseFinish({ request, response }) {

module.exports = registerDiagnosticChannels([
['http.client.request.created', onClientRequestCreated],
['http.client.request.bodyChunkSent', onClientRequestBodyChunkSent],
['http.client.request.bodySent', onClientRequestBodySent],
['http.client.request.error', onClientRequestError],
['http.client.response.bodyChunkReceived', onClientResponseBodyChunkReceived],
['http.client.response.finish', onClientResponseFinish],
]);
78 changes: 67 additions & 11 deletions test/parallel/test-diagnostics-channel-http.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,40 @@ const isError = (error) => error instanceof Error;

dc.subscribe('http.client.request.start', common.mustCall(({ request }) => {
assert.strictEqual(isOutgoingMessage(request), true);
}, 4));

dc.subscribe('http.client.request.bodyChunkSent', common.mustCall(({ request, chunk, encoding }) => {
assert.strictEqual(isOutgoingMessage(request), true);
assert.ok(typeof chunk === 'string' || chunk instanceof Uint8Array);
assert.strictEqual(typeof encoding === 'string' || encoding == null, true);
}, 3));

dc.subscribe('http.client.request.bodySent', common.mustCall(({ request }) => {
assert.strictEqual(isOutgoingMessage(request), true);
}, 2));

dc.subscribe('http.client.request.error', common.mustCall(({ request, error }) => {
assert.strictEqual(isOutgoingMessage(request), true);
assert.strictEqual(isError(error), true);
}));

dc.subscribe('http.client.response.bodyChunkReceived', common.mustCall(({
request,
response,
chunk,
}) => {
assert.strictEqual(isOutgoingMessage(request), true);
assert.strictEqual(isIncomingMessage(response), true);
assert.ok(chunk instanceof Uint8Array);
}, 3));

dc.subscribe('http.client.response.finish', common.mustCall(({
request,
response
}) => {
assert.strictEqual(isOutgoingMessage(request), true);
assert.strictEqual(isIncomingMessage(response), true);
}));
}, 3));

dc.subscribe('http.server.request.start', common.mustCall(({
request,
Expand All @@ -39,7 +59,7 @@ dc.subscribe('http.server.request.start', common.mustCall(({
assert.strictEqual(isOutgoingMessage(response), true);
assert.strictEqual(isNetSocket(socket), true);
assert.strictEqual(isHTTPServer(server), true);
}));
}, 3));

dc.subscribe('http.server.response.finish', common.mustCall(({
request,
Expand All @@ -51,24 +71,37 @@ dc.subscribe('http.server.response.finish', common.mustCall(({
assert.strictEqual(isOutgoingMessage(response), true);
assert.strictEqual(isNetSocket(socket), true);
assert.strictEqual(isHTTPServer(server), true);
}));
}, 3));

dc.subscribe('http.server.response.created', common.mustCall(({
request,
response,
}) => {
assert.strictEqual(isIncomingMessage(request), true);
assert.strictEqual(isOutgoingMessage(response), true);
}));
}, 3));

dc.subscribe('http.client.request.created', common.mustCall(({ request }) => {
assert.strictEqual(isOutgoingMessage(request), true);
assert.strictEqual(isHTTPServer(server), true);
}, 2));
}, 4));

const server = http.createServer(common.mustCall((req, res) => {
res.end('done');
}));
const chunks = [];
req.on('data', (chunk) => chunks.push(chunk));
req.on('end', common.mustCall(() => {
if (req.method === 'POST' && req.url === '/string-body') {
assert.strictEqual(Buffer.concat(chunks).toString(), 'foobar');
} else if (req.method === 'POST' && req.url === '/binary-body') {
assert.deepStrictEqual(Buffer.concat(chunks), Buffer.from([0, 1, 2, 3]));
} else {
assert.strictEqual(req.method, 'GET');
assert.strictEqual(req.url, '/');
assert.strictEqual(Buffer.concat(chunks).byteLength, 0);
}
res.end('done');
}));
}, 3));

server.listen(async () => {
const { port } = server.address();
Expand All @@ -78,10 +111,33 @@ server.listen(async () => {
await new Promise((resolve) => {
invalidRequest.on('error', resolve);
});
http.get(`http://localhost:${port}`, (res) => {
res.resume();
res.on('end', () => {
server.close();
await new Promise((resolve, reject) => {
http.get(`http://localhost:${port}`, (res) => {
res.setEncoding('utf8');
res.resume();
res.on('end', resolve);
}).on('error', reject);
});
await new Promise((resolve, reject) => {
const req = http.request(`http://localhost:${port}/string-body`, {
method: 'POST',
}, (res) => {
res.resume();
res.on('end', resolve);
});
req.on('error', reject);
req.write('foo');
req.end('bar');
});
await new Promise((resolve, reject) => {
const req = http.request(`http://localhost:${port}/binary-body`, {
method: 'POST',
}, (res) => {
res.resume();
res.on('end', resolve);
});
req.on('error', reject);
req.end(Buffer.from([0, 1, 2, 3]));
});
server.close();
});
Loading
Loading