Skip to content

Commit 3d5bfbb

Browse files
committed
stream: let Readable.toWeb reuse eos immediate path
1 parent 7c9c069 commit 3d5bfbb

2 files changed

Lines changed: 134 additions & 143 deletions

File tree

lib/internal/streams/end-of-stream.js

Lines changed: 90 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ function bindAsyncResource(fn, type) {
6767
};
6868
}
6969

70-
const kEosImmediateClose = Symbol('kEosImmediateClose');
71-
7270
/**
7371
* Returns the current stream error tracked by eos(), if any.
7472
* @param {import('stream').Stream} stream
@@ -87,7 +85,7 @@ function getEosErrored(stream) {
8785
* @param {boolean | null} readableFinished
8886
* @param {boolean} writable
8987
* @param {boolean | null} writableFinished
90-
* @returns {Error | typeof kEosImmediateClose}
88+
* @returns {Error | null}
9189
*/
9290
function getEosOnCloseError(stream, readable, readableFinished, writable, writableFinished) {
9391
const errored = getEosErrored(stream);
@@ -106,56 +104,57 @@ function getEosOnCloseError(stream, readable, readableFinished, writable, writab
106104
}
107105
}
108106

109-
return kEosImmediateClose;
107+
return null;
110108
}
111109

112-
function getEosInitialState(stream, options = kEmptyObject) {
110+
const kEosNodeSyncronousCallback = Symbol('kEosNodeSynchronousCallback');
111+
112+
function eos(stream, options, callback) {
113+
if (arguments.length === 2) {
114+
callback = options;
115+
options = kEmptyObject;
116+
} else if (options == null) {
117+
options = kEmptyObject;
118+
} else {
119+
validateObject(options, 'options');
120+
}
121+
validateFunction(callback, 'callback');
122+
validateAbortSignal(options.signal, 'options.signal');
123+
124+
if (isReadableStream(stream) || isWritableStream(stream)) {
125+
return eosWeb(stream, options, callback);
126+
}
127+
128+
if (!isNodeStream(stream)) {
129+
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
130+
}
131+
113132
const readable = options.readable ?? isReadableNodeStream(stream);
114133
const writable = options.writable ?? isWritableNodeStream(stream);
115134

116135
// TODO (ronag): Improve soft detection to include core modules and
117136
// common ecosystem modules that do properly emit 'close' but fail
118137
// this generic check.
119-
const willEmitClose = (
138+
let willEmitClose = (
120139
_willEmitClose(stream) &&
121140
isReadableNodeStream(stream) === readable &&
122141
isWritableNodeStream(stream) === writable
123142
);
143+
let writableFinished = isWritableFinished(stream, false);
144+
let readableFinished = isReadableFinished(stream, false);
124145

125-
return {
126-
readable,
127-
writable,
128-
willEmitClose,
129-
writableFinished: isWritableFinished(stream, false),
130-
readableFinished: isReadableFinished(stream, false),
131-
closed: isClosed(stream),
132-
};
133-
}
134-
135-
/**
136-
* Classifies whether eos() can synchronously determine the result for the
137-
* current stream snapshot, or if it must defer to future events.
138-
* @param {import('stream').Stream} stream
139-
* @param {object} [options]
140-
* @param {boolean} [options.readable]
141-
* @param {boolean} [options.writable]
142-
* @param {ReturnType<typeof getEosInitialState>} [state]
143-
* @returns {Error | typeof kEosImmediateClose | null}
144-
*/
145-
function getEosImmediateResult(stream, options = kEmptyObject, state = getEosInitialState(stream, options)) {
146-
const {
147-
readable,
148-
writable,
149-
willEmitClose,
150-
writableFinished,
151-
readableFinished,
152-
closed,
153-
} = state;
154146
const wState = stream._writableState;
155147
const rState = stream._readableState;
156148

157-
if (closed) {
158-
return getEosOnCloseError(
149+
/**
150+
* @type {Error | null | undefined}
151+
* undefined: to be determined
152+
* null: no error
153+
* Error: an error occurred
154+
*/
155+
let immediateResult;
156+
if (isClosed(stream)) {
157+
immediateResult = getEosOnCloseError(
159158
stream,
160159
readable,
161160
readableFinished,
@@ -164,39 +163,57 @@ function getEosImmediateResult(stream, options = kEmptyObject, state = getEosIni
164163
);
165164
} else if (wState?.errorEmitted || rState?.errorEmitted) {
166165
if (!willEmitClose) {
167-
return getEosErrored(stream) ?? kEosImmediateClose;
166+
immediateResult = getEosErrored(stream);
168167
}
169168
} else if (
170169
!readable &&
171170
(!willEmitClose || isReadable(stream)) &&
172171
(writableFinished || isWritable(stream) === false) &&
173172
(wState == null || wState.pendingcb === undefined || wState.pendingcb === 0)
174173
) {
175-
return getEosErrored(stream) ?? kEosImmediateClose;
174+
immediateResult = getEosErrored(stream);
176175
} else if (
177176
!writable &&
178177
(!willEmitClose || isWritable(stream)) &&
179178
(readableFinished || isReadable(stream) === false)
180179
) {
181-
return getEosErrored(stream) ?? kEosImmediateClose;
180+
immediateResult = getEosErrored(stream);
182181
} else if ((rState && stream.req && stream.aborted)) {
183-
return getEosErrored(stream) ?? kEosImmediateClose;
182+
immediateResult = getEosErrored(stream);
183+
}
184+
const returnImmediately = (result) => {
185+
const args = result === null ? [] : [result];
186+
if (options[kEosNodeSyncronousCallback]) {
187+
callback.call(stream, ...args);
188+
} else {
189+
if (AsyncContextFrame.current() || enabledHooksExist()) {
190+
// Avoid AsyncResource.bind() because it calls ObjectDefineProperties which
191+
// is a bottleneck here.
192+
callback = bindAsyncResource(callback, 'STREAM_END_OF_STREAM');
193+
}
194+
process.nextTick(() => callback.call(stream, ...args));
195+
}
196+
};
197+
let cleanup = () => {
198+
callback = nop;
199+
};
200+
if (immediateResult !== undefined) {
201+
if (options.error !== false) {
202+
const onerror = () => {};
203+
stream.on('error', onerror);
204+
cleanup = () => {
205+
callback = nop;
206+
stream.removeListener('error', onerror);
207+
};
208+
}
209+
returnImmediately(immediateResult);
210+
return cleanup;
184211
}
185212

186-
return null;
187-
}
188-
189-
function eos(stream, options, callback) {
190-
if (arguments.length === 2) {
191-
callback = options;
192-
options = kEmptyObject;
193-
} else if (options == null) {
194-
options = kEmptyObject;
195-
} else {
196-
validateObject(options, 'options');
213+
if (options.signal?.aborted) {
214+
returnImmediately(new AbortError(undefined, { cause: options.signal.reason }));
215+
return cleanup;
197216
}
198-
validateFunction(callback, 'callback');
199-
validateAbortSignal(options.signal, 'options.signal');
200217

201218
if (AsyncContextFrame.current() || enabledHooksExist()) {
202219
// Avoid AsyncResource.bind() because it calls ObjectDefineProperties which
@@ -206,30 +223,12 @@ function eos(stream, options, callback) {
206223
callback = once(callback);
207224
}
208225

209-
if (isReadableStream(stream) || isWritableStream(stream)) {
210-
return eosWeb(stream, options, callback);
211-
}
212-
213-
if (!isNodeStream(stream)) {
214-
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
215-
}
216-
217-
const eosState = getEosInitialState(stream, options);
218-
const wState = stream._writableState;
219-
220226
const onlegacyfinish = () => {
221227
if (!stream.writable) {
222228
onfinish();
223229
}
224230
};
225231

226-
const { readable, writable } = eosState;
227-
let {
228-
willEmitClose,
229-
writableFinished,
230-
readableFinished,
231-
closed,
232-
} = eosState;
233232
const onfinish = () => {
234233
writableFinished = true;
235234
// Stream should not be destroyed here. If it is that
@@ -271,10 +270,8 @@ function eos(stream, options, callback) {
271270
};
272271

273272
const onclose = () => {
274-
closed = true;
275-
276273
const error = getEosOnCloseError(stream, readable, readableFinished, writable, writableFinished);
277-
if (error === kEosImmediateClose) {
274+
if (error === null) {
278275
callback.call(stream);
279276
} else {
280277
callback.call(stream, error);
@@ -312,16 +309,7 @@ function eos(stream, options, callback) {
312309
}
313310
stream.on('close', onclose);
314311

315-
const immediateResult = getEosImmediateResult(stream, options, eosState);
316-
if (immediateResult !== null) {
317-
if (immediateResult === kEosImmediateClose) {
318-
process.nextTick(() => callback.call(stream));
319-
} else {
320-
process.nextTick(() => callback.call(stream, immediateResult));
321-
}
322-
}
323-
324-
const cleanup = () => {
312+
cleanup = () => {
325313
callback = nop;
326314
stream.removeListener('aborted', onclose);
327315
stream.removeListener('complete', onfinish);
@@ -336,7 +324,7 @@ function eos(stream, options, callback) {
336324
stream.removeListener('close', onclose);
337325
};
338326

339-
if (options.signal && !closed) {
327+
if (options.signal) {
340328
const abort = () => {
341329
// Keep it because cleanup removes it.
342330
const endCallback = callback;
@@ -345,23 +333,27 @@ function eos(stream, options, callback) {
345333
stream,
346334
new AbortError(undefined, { cause: options.signal.reason }));
347335
};
348-
if (options.signal.aborted) {
349-
process.nextTick(abort);
350-
} else {
351-
addAbortListener ??= require('internal/events/abort_listener').addAbortListener;
352-
const disposable = addAbortListener(options.signal, abort);
353-
const originalCallback = callback;
354-
callback = once((...args) => {
355-
disposable[SymbolDispose]();
356-
ReflectApply(originalCallback, stream, args);
357-
});
358-
}
336+
addAbortListener ??= require('internal/events/abort_listener').addAbortListener;
337+
const disposable = addAbortListener(options.signal, abort);
338+
const originalCallback = callback;
339+
callback = once((...args) => {
340+
disposable[SymbolDispose]();
341+
ReflectApply(originalCallback, stream, args);
342+
});
359343
}
360344

361345
return cleanup;
362346
}
363347

364348
function eosWeb(stream, options, callback) {
349+
if (AsyncContextFrame.current() || enabledHooksExist()) {
350+
// Avoid AsyncResource.bind() because it calls ObjectDefineProperties which
351+
// is a bottleneck here.
352+
callback = once(bindAsyncResource(callback, 'STREAM_END_OF_STREAM'));
353+
} else {
354+
callback = once(callback);
355+
}
356+
365357
let isAborted = false;
366358
let abort = nop;
367359
if (options.signal) {
@@ -420,6 +412,5 @@ function finished(stream, opts) {
420412
module.exports = {
421413
eos,
422414
finished,
423-
getEosImmediateResult,
424-
kEosImmediateClose,
415+
kEosNodeSyncronousCallback,
425416
};

0 commit comments

Comments
 (0)