Skip to content

Commit e39b40e

Browse files
committed
stream: updates to stream/new impl
Refactors the cancelation per updates in the design doc
1 parent 78eb337 commit e39b40e

14 files changed

Lines changed: 680 additions & 155 deletions

doc/api/fs.md

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -928,11 +928,26 @@ added: REPLACEME
928928
* `start` {number} Byte offset to start writing at. **Default:** current
929929
position (append).
930930
* Returns: {Object}
931-
* `write(chunk)` {Function} Returns {Promise\<void>}.
932-
* `writev(chunks)` {Function} Returns {Promise\<void>}. Uses scatter/gather
933-
I/O via a single `writev()` syscall.
934-
* `end()` {Function} Returns {Promise\<number>} total bytes written.
935-
* `abort(reason)` {Function} Returns {Promise\<void>}.
931+
* `write(chunk[, options])` {Function} Returns {Promise\<void>}.
932+
* `chunk` {Buffer|TypedArray|DataView}
933+
* `options` {Object}
934+
* `signal` {AbortSignal} If the signal is already aborted, the write
935+
rejects with `AbortError` without performing I/O.
936+
* `writev(chunks[, options])` {Function} Returns {Promise\<void>}. Uses
937+
scatter/gather I/O via a single `writev()` syscall.
938+
* `chunks` {Buffer\[]|TypedArray\[]|DataView\[]}
939+
* `options` {Object}
940+
* `signal` {AbortSignal} If the signal is already aborted, the write
941+
rejects with `AbortError` without performing I/O.
942+
* `end([options])` {Function} Returns {Promise\<number>} total bytes written.
943+
* `options` {Object}
944+
* `signal` {AbortSignal} If the signal is already aborted, `end()`
945+
rejects with `AbortError` and the writer remains open.
946+
* `fail(reason)` {Function} Returns {Promise\<void>}. Puts the writer
947+
into a terminal error state.
948+
* `failSync(reason)` {Function} Returns {boolean}. Synchronous best-effort
949+
cleanup. Marks the writer as closed so subsequent writes fail immediately.
950+
Cannot honor `autoClose` (requires async I/O).
936951
937952
Return a [`node:stream/new`][] writer backed by this file handle.
938953

doc/api/stream_new.md

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,22 @@ async function run() {
111111

112112
Transforms come in two forms:
113113

114-
* **Stateless** -- a function `(chunks) => result` called once per batch.
115-
Receives `Uint8Array[]` (or `null` as the flush signal). Returns
116-
`Uint8Array[]`, `null`, or an iterable of chunks.
114+
* **Stateless** -- a function `(chunks, options) => result` called once per
115+
batch. Receives `Uint8Array[]` (or `null` as the flush signal) and an
116+
`options` object. Returns `Uint8Array[]`, `null`, or an iterable of chunks.
117117

118-
* **Stateful** -- an object `{ transform(source) }` where `transform` is a
119-
generator (sync or async) that receives the entire upstream iterable and
120-
yields output. This form is used for compression, encryption, and any
121-
transform that needs to buffer across batches.
118+
* **Stateful** -- an object `{ transform(source, options) }` where `transform`
119+
is a generator (sync or async) that receives the entire upstream iterable
120+
and an `options` object, and yields output. This form is used for
121+
compression, encryption, and any transform that needs to buffer across
122+
batches.
123+
124+
Both forms receive an `options` parameter with the following property:
125+
126+
* `options.signal` {AbortSignal} An AbortSignal that fires when the pipeline
127+
is cancelled, encounters an error, or the consumer stops reading. Transforms
128+
can check `signal.aborted` or listen for the `'abort'` event to perform
129+
early cleanup.
122130

123131
The flush signal (`null`) is sent after the source ends, giving transforms
124132
a chance to emit trailing data (e.g., compression footers).
@@ -169,7 +177,7 @@ The API supports two models:
169177

170178
A writer is any object with a `write(chunk)` method. Writers optionally
171179
support `writev(chunks)` for batch writes (mapped to scatter/gather I/O where
172-
available), `end()` to signal completion, and `abort(reason)` to signal
180+
available), `end()` to signal completion, and `fail(reason)` to signal
173181
failure.
174182

175183
## `require('node:stream/new')`
@@ -269,7 +277,7 @@ added: REPLACEME
269277
* `signal` {AbortSignal} Abort the pipeline.
270278
* `preventClose` {boolean} If `true`, do not call `writer.end()` when
271279
the source ends. **Default:** `false`.
272-
* `preventAbort` {boolean} If `true`, do not call `writer.abort()` on
280+
* `preventFail` {boolean} If `true`, do not call `writer.fail()` on
273281
error. **Default:** `false`.
274282
* Returns: {Promise\<number>} Total bytes written.
275283

@@ -316,7 +324,7 @@ added: REPLACEME
316324
* `writer` {Object} Destination with `write(chunk)` method.
317325
* `options` {Object}
318326
* `preventClose` {boolean} **Default:** `false`.
319-
* `preventAbort` {boolean} **Default:** `false`.
327+
* `preventFail` {boolean} **Default:** `false`.
320328
* Returns: {number} Total bytes written.
321329

322330
Synchronous version of [`pipeTo()`][].
@@ -451,12 +459,18 @@ run().catch(console.error);
451459

452460
The writer returned by `push()` has the following methods:
453461

454-
##### `writer.abort(reason)`
462+
##### `writer.fail(reason)`
455463

456464
* `reason` {Error}
457465
* Returns: {Promise\<void>}
458466

459-
Abort the stream with an error.
467+
Fail the stream with an error.
468+
469+
##### `writer.failSync(reason)`
470+
471+
* `reason` {Error}
472+
473+
Synchronously fail the stream with an error. Does not return a promise.
460474

461475
##### `writer.desiredSize`
462476

@@ -465,15 +479,21 @@ Abort the stream with an error.
465479
The number of buffer slots available before the high water mark is reached.
466480
Returns `null` if the writer is closed or the consumer has disconnected.
467481

468-
##### `writer.end()`
482+
##### `writer.end([options])`
469483

484+
* `options` {Object}
485+
* `signal` {AbortSignal} Cancel just this operation. The signal cancels only
486+
the pending `end()` call; it does not fail the writer itself.
470487
* Returns: {Promise\<number>} Total bytes written.
471488

472489
Signal that no more data will be written.
473490

474-
##### `writer.write(chunk)`
491+
##### `writer.write(chunk[, options])`
475492

476493
* `chunk` {Uint8Array|string}
494+
* `options` {Object}
495+
* `signal` {AbortSignal} Cancel just this write operation. The signal cancels
496+
only the pending `write()` call; it does not fail the writer itself.
477497
* Returns: {Promise\<void>}
478498

479499
Write a chunk. The promise resolves when buffer space is available.
@@ -486,9 +506,12 @@ Write a chunk. The promise resolves when buffer space is available.
486506

487507
Synchronous write. Does not block; returns `false` if backpressure is active.
488508

489-
##### `writer.writev(chunks)`
509+
##### `writer.writev(chunks[, options])`
490510

491511
* `chunks` {Uint8Array\[]|string\[]}
512+
* `options` {Object}
513+
* `signal` {AbortSignal} Cancel just this write operation. The signal cancels
514+
only the pending `writev()` call; it does not fail the writer itself.
492515
* Returns: {Promise\<void>}
493516

494517
Write multiple chunks as a single batch.

lib/internal/fs/promises.js

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ class FileHandle extends EventEmitter {
458458
* autoClose?: boolean;
459459
* start?: number;
460460
* }} [options]
461-
* @returns {{ write, writev, end, abort }}
461+
* @returns {{ write, writev, end, fail, failSync }}
462462
*/
463463
writer(options) {
464464
if (this[kFd] === -1)
@@ -558,21 +558,29 @@ class FileHandle extends EventEmitter {
558558
}
559559

560560
return {
561-
write(chunk) {
561+
write(chunk, options) {
562562
if (closed) {
563563
return PromiseReject(
564564
new ERR_INVALID_STATE('The writer is closed'));
565565
}
566+
if (options?.signal?.aborted) {
567+
return PromiseReject(
568+
new AbortError(undefined, { cause: options.signal.reason }));
569+
}
566570
const position = pos;
567571
if (pos >= 0) pos += chunk.byteLength;
568572
return writeAll(chunk, 0, chunk.byteLength, position);
569573
},
570574

571-
writev(chunks) {
575+
writev(chunks, options) {
572576
if (closed) {
573577
return PromiseReject(
574578
new ERR_INVALID_STATE('The writer is closed'));
575579
}
580+
if (options?.signal?.aborted) {
581+
return PromiseReject(
582+
new AbortError(undefined, { cause: options.signal.reason }));
583+
}
576584
const position = pos;
577585
if (pos >= 0) {
578586
for (let i = 0; i < chunks.length; i++) {
@@ -582,15 +590,29 @@ class FileHandle extends EventEmitter {
582590
return writevAll(chunks, position);
583591
},
584592

585-
async end() {
593+
async end(options) {
594+
if (options?.signal?.aborted) {
595+
throw new AbortError(undefined, { cause: options.signal.reason });
596+
}
586597
await cleanup();
587598
return totalBytesWritten;
588599
},
589600

590-
async abort(reason) {
601+
async fail(reason) {
591602
await cleanup();
592603
},
593604

605+
failSync(reason) {
606+
// Synchronous cleanup is best-effort for file handles.
607+
// Mark as closed so subsequent writes fail immediately.
608+
if (closed) return true;
609+
closed = true;
610+
handle[kLocked] = false;
611+
handle[kUnref]();
612+
// autoClose cannot be handled synchronously - skip it.
613+
return true;
614+
},
615+
594616
async [SymbolAsyncDispose]() {
595617
await cleanup();
596618
},

0 commit comments

Comments
 (0)