Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 22 additions & 27 deletions lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@

const {
ArrayPrototypeFilter,
ArrayPrototypeMap,
Boolean,
ObjectEntries,
ObjectKeys,
PromisePrototypeThen,
PromiseResolve,
PromiseWithResolvers,
SafePromiseAll,
SafePromiseAllReturnVoid,
SafePromisePrototypeFinally,
SafeSet,
StringPrototypeStartsWith,
Expand Down Expand Up @@ -74,6 +72,7 @@ const {
getDeprecationWarningEmitter,
kEmptyObject,
normalizeEncoding,
setOwnProperty,
} = require('internal/util');

const {
Expand All @@ -93,6 +92,7 @@ const {

const { eos } = require('internal/streams/end-of-stream');

const { zlib } = internalBinding('constants');
const { UV_EOF } = internalBinding('uv');

const encoder = new TextEncoder();
Expand All @@ -101,37 +101,34 @@ const kValidateChunk = Symbol('kValidateChunk');
const kDestroyOnSyncError = Symbol('kDestroyOnSyncError');

// Collect all negative (error) ZLIB codes and Z_NEED_DICT
const ZLIB_FAILURES = new SafeSet([
...ArrayPrototypeFilter(
ArrayPrototypeMap(
ObjectEntries(internalBinding('constants').zlib),
({ 0: code, 1: value }) => (value < 0 ? code : null),
),
Boolean,
const ZLIB_FAILURES = new SafeSet(
ArrayPrototypeFilter(
ObjectKeys(zlib),
(code) => code === 'Z_NEED_DICT' || zlib[code] < 0,
),
'Z_NEED_DICT',
]);
);

/**
* @param {Error|null} cause
* @returns {Error|null}
*/
function handleKnownInternalErrors(cause) {
const causeCode = cause?.code;
switch (true) {
case cause?.code === 'ERR_STREAM_PREMATURE_CLOSE': {
case causeCode === 'ERR_STREAM_PREMATURE_CLOSE': {
return new AbortError(undefined, { cause });
}
case ZLIB_FAILURES.has(cause?.code):
case ZLIB_FAILURES.has(causeCode):
// Brotli decoder error codes are formatted as 'ERR_' +
// BrotliDecoderErrorString(), where the latter returns strings like
// '_ERROR_FORMAT_...', '_ERROR_ALLOC_...', '_ERROR_UNREACHABLE', etc.
// The resulting JS error codes all start with 'ERR__ERROR_'.
// Falls through
case cause?.code != null &&
StringPrototypeStartsWith(cause.code, 'ERR__ERROR_'): {
case causeCode != null &&
StringPrototypeStartsWith(causeCode, 'ERR__ERROR_'): {
// eslint-disable-next-line no-restricted-syntax
const error = new TypeError(undefined, { cause });
error.code = cause.code;
setOwnProperty(error, 'code', causeCode);
return error;
}
default:
Expand Down Expand Up @@ -190,8 +187,7 @@ function newWritableStreamFromStreamWritable(streamWritable, options = kEmptyObj
let closed;

function onDrain() {
if (backpressurePromise !== undefined)
backpressurePromise.resolve();
backpressurePromise?.resolve();
}

const cleanup = eos(streamWritable, (error) => {
Expand All @@ -202,8 +198,7 @@ function newWritableStreamFromStreamWritable(streamWritable, options = kEmptyObj
// that happen to emit an error event again after finished is called.
streamWritable.on('error', () => {});
if (error != null) {
if (backpressurePromise !== undefined)
backpressurePromise.reject(error);
backpressurePromise?.reject(error);
// If closed is not undefined, the error is happening
// after the WritableStream close has already started.
// We need to reject it here.
Expand Down Expand Up @@ -330,10 +325,10 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj
writer.ready,
() => {
return PromisePrototypeThen(
SafePromiseAll(
SafePromiseAllReturnVoid(
chunks,
(data) => writer.write(data.chunk)),
() => done(),
done,
done);
},
done);
Expand Down Expand Up @@ -802,10 +797,10 @@ function newStreamDuplexFromReadableWritablePair(pair = kEmptyObject, options =
writer.ready,
() => {
return PromisePrototypeThen(
SafePromiseAll(
SafePromiseAllReturnVoid(
chunks,
(data) => writer.write(data.chunk)),
() => done(),
done,
done);
},
done);
Expand Down Expand Up @@ -907,7 +902,7 @@ function newStreamDuplexFromReadableWritablePair(pair = kEmptyObject, options =

if (!writableClosed || !readableClosed) {
PromisePrototypeThen(
SafePromiseAll([
SafePromiseAllReturnVoid([
closeWriter(),
closeReader(),
]),
Expand Down
Loading