Skip to content

Commit 83355dd

Browse files
committed
stream: apply spec conformance fixes (part 1)
1 parent ecf1053 commit 83355dd

12 files changed

Lines changed: 459 additions & 209 deletions

doc/api/fs.md

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -946,11 +946,8 @@ added: REPLACEME
946946
* `options` {Object}
947947
* `signal` {AbortSignal} If the signal is already aborted, `end()`
948948
rejects with `AbortError` and the writer remains open.
949-
* `fail(reason)` {Function} Returns {Promise\<void>}. Puts the writer
950-
into a terminal error state.
951-
* `failSync(reason)` {Function} Returns {boolean}. Synchronous best-effort
952-
cleanup. Marks the writer as closed so subsequent writes fail immediately.
953-
Cannot honor `autoClose` (requires async I/O).
949+
* `fail(reason)` {Function} Puts the writer into a terminal error state.
950+
Synchronous. If the writer is already closed or errored, this is a no-op.
954951
955952
Return a [`node:stream/iter`][] writer backed by this file handle.
956953

doc/api/stream_iter.md

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ complete:
409409
if (!writer.writeSync(chunk)) await writer.write(chunk);
410410
if (!writer.writevSync(chunks)) await writer.writev(chunks);
411411
if (writer.endSync() < 0) await writer.end();
412-
if (!writer.failSync(err)) await writer.fail(err);
412+
writer.fail(err); // Always synchronous, no fallback needed
413413
```
414414

415415
### `writer.desiredSize`
@@ -446,18 +446,12 @@ if (result < 0) {
446446

447447
### `writer.fail(reason)`
448448

449-
* `reason` {Error}
450-
* Returns: {Promise\<void>}
451-
452-
Fail the stream with an error.
453-
454-
### `writer.failSync(reason)`
455-
456-
* `reason` {Error}
457-
* Returns: {boolean} `true` if the writer was failed, `false` if already
458-
errored.
449+
* `reason` {any}
459450

460-
Synchronous variant of `writer.fail()`.
451+
Put the writer into a terminal error state. If the writer is already closed
452+
or errored, this is a no-op. Unlike `write()` and `end()`, `fail()` is
453+
unconditionally synchronous because failing a writer is a pure state
454+
transition with no async work to perform.
461455

462456
### `writer.write(chunk[, options])`
463457

@@ -604,10 +598,10 @@ Pipe a source through transforms into a writer. If the writer has a
604598
scatter/gather I/O).
605599

606600
If the writer implements the optional `*Sync` methods (`writeSync`, `writevSync`,
607-
`endSync`, `failSync`), `pipeTo()` will attempt to use the synchronous methods
601+
`endSync`), `pipeTo()` will attempt to use the synchronous methods
608602
first as a fast path, and fall back to the async versions only when the sync
609603
methods indicate they cannot complete (e.g., backpressure or waiting for the
610-
next tick).
604+
next tick). `fail()` is always called synchronously.
611605

612606
```mjs
613607
import { from, pipeTo, compressGzip } from 'node:stream/iter';
@@ -655,7 +649,7 @@ Synchronous version of [`pipeTo()`][]. The `source`, all transforms, and the
655649
`writer` must be synchronous. Cannot accept async iterables or promises.
656650

657651
The `writer` must have the `*Sync` methods (`writeSync`, `writevSync`,
658-
`endSync`, `failSync`) for this to work.
652+
`endSync`) and `fail()` for this to work.
659653

660654
### `pull(source[, ...transforms][, options])`
661655

lib/internal/fs/promises.js

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ if (getOptionValue('--experimental-stream-iter')) {
541541
* autoClose?: boolean;
542542
* start?: number;
543543
* }} [options]
544-
* @returns {{ write, writev, end, fail, failSync }}
544+
* @returns {{ write, writev, end, fail }}
545545
*/
546546
FileHandle.prototype.writer = function writer(options) {
547547
if (this[kFd] === -1)
@@ -558,6 +558,7 @@ if (getOptionValue('--experimental-stream-iter')) {
558558
let pos = options?.start ?? -1;
559559
let totalBytesWritten = 0;
560560
let closed = false;
561+
let error = null;
561562

562563
if (pos !== -1) {
563564
validateInteger(pos, 'options.start', 0);
@@ -643,9 +644,12 @@ if (getOptionValue('--experimental-stream-iter')) {
643644
return {
644645
__proto__: null,
645646
write(chunk, options) {
647+
if (error) {
648+
return PromiseReject(error);
649+
}
646650
if (closed) {
647651
return PromiseReject(
648-
new ERR_INVALID_STATE('The writer is closed'));
652+
new ERR_INVALID_STATE.TypeError('The writer is closed'));
649653
}
650654
if (options?.signal?.aborted) {
651655
return PromiseReject(
@@ -657,9 +661,12 @@ if (getOptionValue('--experimental-stream-iter')) {
657661
},
658662

659663
writev(chunks, options) {
664+
if (error) {
665+
return PromiseReject(error);
666+
}
660667
if (closed) {
661668
return PromiseReject(
662-
new ERR_INVALID_STATE('The writer is closed'));
669+
new ERR_INVALID_STATE.TypeError('The writer is closed'));
663670
}
664671
if (options?.signal?.aborted) {
665672
return PromiseReject(
@@ -682,19 +689,12 @@ if (getOptionValue('--experimental-stream-iter')) {
682689
return totalBytesWritten;
683690
},
684691

685-
async fail(reason) {
686-
await cleanup();
687-
},
688-
689-
failSync(reason) {
690-
// Synchronous cleanup is best-effort for file handles.
691-
// Mark as closed so subsequent writes fail immediately.
692-
if (closed) return true;
692+
fail(reason) {
693+
if (closed || error) return;
694+
error = reason ?? new ERR_INVALID_STATE('Failed');
693695
closed = true;
694696
handle[kLocked] = false;
695697
handle[kUnref]();
696-
// autoClose cannot be handled synchronously - skip it.
697-
return true;
698698
},
699699

700700
async [SymbolAsyncDispose]() {

lib/internal/streams/iter/broadcast.js

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ const {
2121
SafeSet,
2222
String,
2323
Symbol,
24-
SymbolAsyncIterator,
25-
SymbolDispose,
24+
SymbolAsyncDispose,
25+
SymbolAsyncIterator,
26+
SymbolDispose,
2627
TypedArrayPrototypeGetByteLength,
2728
} = primordials;
2829

@@ -565,26 +566,23 @@ class BroadcastWriter {
565566
return this.#totalBytes;
566567
}
567568

568-
fail(reason) {
569-
if (this.#aborted) return kResolvedPromise;
570-
this.#aborted = true;
571-
this.#closed = true;
572-
const error = reason ?? new ERR_INVALID_STATE('Failed');
573-
this.#rejectPendingWrites(error);
574-
this.#rejectPendingDrains(error);
575-
this.#broadcast[kAbort](error);
576-
return kResolvedPromise;
569+
fail(reason) {
570+
if (this.#aborted || this.#closed) return;
571+
this.#aborted = true;
572+
this.#closed = true;
573+
const error = reason ?? new ERR_INVALID_STATE('Failed');
574+
this.#rejectPendingWrites(error);
575+
this.#rejectPendingDrains(error);
576+
this.#broadcast[kAbort](error);
577+
}
578+
579+
[SymbolAsyncDispose]() {
580+
this.fail();
581+
return PromiseResolve();
577582
}
578583

579-
failSync(reason) {
580-
if (this.#aborted) return true;
581-
this.#aborted = true;
582-
this.#closed = true;
583-
const error = reason ?? new ERR_INVALID_STATE('Failed');
584-
this.#rejectPendingWrites(error);
585-
this.#rejectPendingDrains(error);
586-
this.#broadcast[kAbort](error);
587-
return true;
584+
[SymbolDispose]() {
585+
this.fail();
588586
}
589587

590588
[kCancelWriter]() {
@@ -778,9 +776,7 @@ const Broadcast = {
778776
} catch (error) {
779777
const err = isError(error) ? error :
780778
new ERR_OPERATION_FAILED(String(error));
781-
if (!w.failSync(err)) {
782-
await w.fail(err);
783-
}
779+
w.fail(err);
784780
}
785781
};
786782
PromisePrototypeThen(pump(), undefined, () => {});

lib/internal/streams/iter/pull.js

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -588,9 +588,8 @@ function pipeToSync(source, ...args) {
588588
const hasWriteSync = typeof writer.writeSync === 'function';
589589
const hasWritevSync = typeof writer.writevSync === 'function';
590590
const hasEndSync = typeof writer.endSync === 'function';
591-
const hasFailSync = typeof writer.failSync === 'function';
592591

593-
try {
592+
try {
594593
for (const batch of pipeline) {
595594
if (hasWritevSync && batch.length > 1) {
596595
writer.writevSync(batch);
@@ -616,17 +615,15 @@ function pipeToSync(source, ...args) {
616615
}
617616
}
618617
} catch (error) {
619-
if (!options?.preventFail) {
620-
const err = isError(error) ? error :
621-
new ERR_OPERATION_FAILED(String(error));
622-
if (!hasFailSync || !writer.failSync(err)) {
623-
writer.fail?.(err);
624-
}
625-
}
626-
throw error;
627-
}
628-
629-
return totalBytes;
618+
if (!options?.preventFail) {
619+
const err = isError(error) ? error :
620+
new ERR_OPERATION_FAILED(String(error));
621+
writer.fail(err);
622+
}
623+
throw error;
624+
}
625+
626+
return totalBytes;
630627
}
631628

632629
/**
@@ -656,8 +653,7 @@ async function pipeTo(source, ...args) {
656653
const hasWriteSync = typeof writer.writeSync === 'function';
657654
const hasWritevSync = typeof writer.writevSync === 'function';
658655
const hasEndSync = typeof writer.endSync === 'function';
659-
const hasFailSync = typeof writer.failSync === 'function';
660-
// Write a batch using try-fallback: sync first, async if needed.
656+
// Write a batch using try-fallback: sync first, async if needed.
661657
async function writeBatch(batch) {
662658
if (hasWritev && batch.length > 1) {
663659
if (!hasWritevSync || !writer.writevSync(batch)) {
@@ -727,17 +723,15 @@ async function pipeTo(source, ...args) {
727723
}
728724
}
729725
} catch (error) {
730-
if (!options?.preventFail) {
731-
const err = isError(error) ? error :
732-
new ERR_OPERATION_FAILED(String(error));
733-
if (!hasFailSync || !writer.failSync(err)) {
734-
await writer.fail?.(err);
735-
}
736-
}
737-
throw error;
738-
}
739-
740-
return totalBytes;
726+
if (!options?.preventFail) {
727+
const err = isError(error) ? error :
728+
new ERR_OPERATION_FAILED(String(error));
729+
writer.fail(err);
730+
}
731+
throw error;
732+
}
733+
734+
return totalBytes;
741735
}
742736

743737
module.exports = {

0 commit comments

Comments
 (0)