Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/internal/streams/add-abort-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const {
const {
isNodeStream,
isWebStream,
kControllerErrorFunction,
kControllerAbortFunction,
} = require('internal/streams/utils');

const { eos } = require('internal/streams/end-of-stream');
Expand Down Expand Up @@ -47,7 +47,7 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) {
stream.destroy(new AbortError(undefined, { cause: signal.reason }));
} :
() => {
stream[kControllerErrorFunction](new AbortError(undefined, { cause: signal.reason }));
stream[kControllerAbortFunction](new AbortError(undefined, { cause: signal.reason }));
};
if (signal.aborted) {
onAbort();
Expand Down
4 changes: 2 additions & 2 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const kIsDisturbed = SymbolFor('nodejs.stream.disturbed');
const kOnConstructed = Symbol('kOnConstructed');

const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');
const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction');
const kControllerAbortFunction = SymbolFor('nodejs.webstream.controllerAbortFunction');

const kState = Symbol('kState');
const kObjectMode = 1 << 0;
Expand Down Expand Up @@ -326,7 +326,7 @@ module.exports = {
isReadable,
kIsReadable,
kIsClosedPromise,
kControllerErrorFunction,
kControllerAbortFunction,
kIsWritable,
isClosed,
isDuplexNodeStream,
Expand Down
33 changes: 21 additions & 12 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ const {
kIsErrored,
kIsReadable,
kIsClosedPromise,
kControllerErrorFunction,
kControllerAbortFunction,
} = require('internal/streams/utils');

const {
Expand Down Expand Up @@ -254,7 +254,6 @@ class ReadableStream {
this[kState] = createReadableStreamState();

this[kIsClosedPromise] = PromiseWithResolvers();
this[kControllerErrorFunction] = () => {};

// The spec requires handling of the strategy first
// here. Specifically, if getting the size and
Expand Down Expand Up @@ -284,6 +283,13 @@ class ReadableStream {
}
}

[kControllerAbortFunction](error) {
if (!isReadableStream(this))
throw new ERR_INVALID_THIS('ReadableStream');
if (this[kState].state === 'readable')
setPromiseHandled(readableStreamCancel(this, error, 'errored'));
}

get [kIsDisturbed]() {
return this[kState].disturbed;
}
Expand Down Expand Up @@ -2077,22 +2083,26 @@ function isReadableStreamLocked(stream) {
return stream[kState].reader !== undefined;
}

function readableStreamCancel(stream, reason) {
function readableStreamCancel(stream, reason, finalState = 'closed') {
stream[kState].disturbed = true;
switch (stream[kState].state) {
case 'closed':
return PromiseResolve();
case 'errored':
return PromiseReject(stream[kState].storedError);
}
readableStreamClose(stream);
const {
reader,
} = stream[kState];
if (reader !== undefined && readableStreamHasBYOBReader(stream)) {
for (let n = 0; n < reader[kState].readIntoRequests.length; n++)
reader[kState].readIntoRequests[n][kClose]();
reader[kState].readIntoRequests = [];
if (finalState === 'errored') {
readableStreamError(stream, reason);
} else {
readableStreamClose(stream);
const {
reader,
} = stream[kState];
if (reader !== undefined && readableStreamHasBYOBReader(stream)) {
for (let n = 0; n < reader[kState].readIntoRequests.length; n++)
reader[kState].readIntoRequests[n][kClose]();
reader[kState].readIntoRequests = [];
}
}

return PromisePrototypeThen(
Expand Down Expand Up @@ -2540,7 +2550,6 @@ function setupReadableStreamDefaultController(
stream,
};
stream[kState].controller = controller;
stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);

const startResult = startAlgorithm();

Expand Down
11 changes: 8 additions & 3 deletions lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const {
kIsClosedPromise,
kIsErrored,
kIsWritable,
kControllerErrorFunction,
kControllerAbortFunction,
} = require('internal/streams/utils');

const {
Expand Down Expand Up @@ -173,7 +173,6 @@ class WritableStream {
this[kState] = createWritableStreamState();

this[kIsClosedPromise] = PromiseWithResolvers();
this[kControllerErrorFunction] = () => {};

const size = extractSizeAlgorithm(strategy?.size);
const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1);
Expand All @@ -185,6 +184,13 @@ class WritableStream {
size);
}

[kControllerAbortFunction](reason) {
if (!isWritableStream(this))
throw new ERR_INVALID_THIS('WritableStream');
if (this[kState].state === 'writable')
setPromiseHandled(writableStreamAbort(this, reason));
}

get [kIsErrored]() {
return this[kState].state === 'errored';
}
Expand Down Expand Up @@ -1322,7 +1328,6 @@ function setupWritableStreamDefaultController(
writeAlgorithm,
};
stream[kState].controller = controller;
stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);

writableStreamUpdateBackpressure(
stream,
Expand Down
Loading