Skip to content

Commit 888fced

Browse files
committed
stream: add pull/writer methods to FileHandle conditionally
1 parent 23a822a commit 888fced

2 files changed

Lines changed: 94 additions & 83 deletions

File tree

doc/api/fs.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,9 @@ If transforms are provided, they are applied via [`stream/iter pull()`][].
400400
The file handle is locked while the iterable is being consumed and unlocked
401401
when iteration completes.
402402
403+
This function is only available when the `--experimental-stream-iter` flag is
404+
enabled.
405+
403406
```mjs
404407
import { open } from 'node:fs/promises';
405408
import { text, compressGzip } from 'node:stream/iter';
@@ -954,6 +957,9 @@ Return a [`node:stream/iter`][] writer backed by this file handle.
954957
The writer supports `Symbol.asyncDispose`, so it can be used with
955958
`await using`.
956959
960+
This function is only available when the `--experimental-stream-iter` flag is
961+
enabled.
962+
957963
```mjs
958964
import { open } from 'node:fs/promises';
959965
import { from, pipeTo, compressGzip } from 'node:stream/iter';

lib/internal/fs/promises.js

Lines changed: 88 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ const {
9595
isWindows,
9696
isMacOS,
9797
} = require('internal/util');
98+
const { getOptionValue } = require('internal/options');
9899
const EventEmitter = require('events');
99100
const { StringDecoder } = require('string_decoder');
100101
const { kFSWatchStart, watch } = require('internal/fs/watchers');
@@ -354,6 +355,87 @@ class FileHandle extends EventEmitter {
354355
return readable;
355356
}
356357

358+
/**
359+
* @typedef {import('./streams').ReadStream
360+
* } ReadStream
361+
* @param {{
362+
* encoding?: string;
363+
* autoClose?: boolean;
364+
* emitClose?: boolean;
365+
* start: number;
366+
* end?: number;
367+
* highWaterMark?: number;
368+
* }} [options]
369+
* @returns {ReadStream}
370+
*/
371+
createReadStream(options = undefined) {
372+
const { ReadStream } = lazyFsStreams();
373+
return new ReadStream(undefined, { ...options, fd: this });
374+
}
375+
376+
/**
377+
* @typedef {import('./streams').WriteStream
378+
* } WriteStream
379+
* @param {{
380+
* encoding?: string;
381+
* autoClose?: boolean;
382+
* emitClose?: boolean;
383+
* start: number;
384+
* highWaterMark?: number;
385+
* flush?: boolean;
386+
* }} [options]
387+
* @returns {WriteStream}
388+
*/
389+
createWriteStream(options = undefined) {
390+
const { WriteStream } = lazyFsStreams();
391+
return new WriteStream(undefined, { ...options, fd: this });
392+
}
393+
394+
[kTransfer]() {
395+
if (this[kClosePromise] || this[kRefs] > 1) {
396+
throw lazyDOMException('Cannot transfer FileHandle while in use',
397+
'DataCloneError');
398+
}
399+
400+
const handle = this[kHandle];
401+
this[kFd] = -1;
402+
this[kCloseReason] = 'The FileHandle has been transferred';
403+
this[kHandle] = null;
404+
this[kRefs] = 0;
405+
406+
return {
407+
data: { handle },
408+
deserializeInfo: 'internal/fs/promises:FileHandle',
409+
};
410+
}
411+
412+
[kTransferList]() {
413+
return [ this[kHandle] ];
414+
}
415+
416+
[kDeserialize]({ handle }) {
417+
this[kHandle] = handle;
418+
this[kFd] = handle.fd;
419+
}
420+
421+
[kRef]() {
422+
this[kRefs]++;
423+
}
424+
425+
[kUnref]() {
426+
this[kRefs]--;
427+
if (this[kRefs] === 0) {
428+
this[kFd] = -1;
429+
PromisePrototypeThen(
430+
this[kHandle].close(),
431+
this[kCloseResolve],
432+
this[kCloseReject],
433+
);
434+
}
435+
}
436+
}
437+
438+
if (getOptionValue('--experimental-stream-iter')) {
357439
/**
358440
* Return the file contents as an AsyncIterable<Uint8Array[]> using the
359441
* new streams pull model. Optional transforms and options (including
@@ -362,7 +444,7 @@ class FileHandle extends EventEmitter {
362444
* @param {...(Function|object)} args - Optional transforms and/or options
363445
* @returns {AsyncIterable<Uint8Array[]>}
364446
*/
365-
pull(...args) {
447+
FileHandle.prototype.pull = function pull(...args) {
366448
if (this[kFd] === -1)
367449
throw new ERR_INVALID_STATE('The FileHandle is closed');
368450
if (this[kClosePromise])
@@ -380,6 +462,7 @@ class FileHandle extends EventEmitter {
380462
const signal = options?.signal;
381463

382464
const source = {
465+
__proto__: null,
383466
async *[SymbolAsyncIterator]() {
384467
handle[kRef]();
385468
const readSize = 65536;
@@ -445,7 +528,7 @@ class FileHandle extends EventEmitter {
445528
return newStreamsPull(source, ...pullArgs);
446529
}
447530
return source;
448-
}
531+
};
449532

450533
/**
451534
* Return a new-streams Writer backed by this file handle.
@@ -460,7 +543,7 @@ class FileHandle extends EventEmitter {
460543
* }} [options]
461544
* @returns {{ write, writev, end, fail, failSync }}
462545
*/
463-
writer(options) {
546+
FileHandle.prototype.writer = function writer(options) {
464547
if (this[kFd] === -1)
465548
throw new ERR_INVALID_STATE('The FileHandle is closed');
466549
if (this[kClosePromise])
@@ -558,6 +641,7 @@ class FileHandle extends EventEmitter {
558641
}
559642

560643
return {
644+
__proto__: null,
561645
write(chunk, options) {
562646
if (closed) {
563647
return PromiseReject(
@@ -617,86 +701,7 @@ class FileHandle extends EventEmitter {
617701
await cleanup();
618702
},
619703
};
620-
}
621-
622-
/**
623-
* @typedef {import('./streams').ReadStream
624-
* } ReadStream
625-
* @param {{
626-
* encoding?: string;
627-
* autoClose?: boolean;
628-
* emitClose?: boolean;
629-
* start: number;
630-
* end?: number;
631-
* highWaterMark?: number;
632-
* }} [options]
633-
* @returns {ReadStream}
634-
*/
635-
createReadStream(options = undefined) {
636-
const { ReadStream } = lazyFsStreams();
637-
return new ReadStream(undefined, { ...options, fd: this });
638-
}
639-
640-
/**
641-
* @typedef {import('./streams').WriteStream
642-
* } WriteStream
643-
* @param {{
644-
* encoding?: string;
645-
* autoClose?: boolean;
646-
* emitClose?: boolean;
647-
* start: number;
648-
* highWaterMark?: number;
649-
* flush?: boolean;
650-
* }} [options]
651-
* @returns {WriteStream}
652-
*/
653-
createWriteStream(options = undefined) {
654-
const { WriteStream } = lazyFsStreams();
655-
return new WriteStream(undefined, { ...options, fd: this });
656-
}
657-
658-
[kTransfer]() {
659-
if (this[kClosePromise] || this[kRefs] > 1) {
660-
throw lazyDOMException('Cannot transfer FileHandle while in use',
661-
'DataCloneError');
662-
}
663-
664-
const handle = this[kHandle];
665-
this[kFd] = -1;
666-
this[kCloseReason] = 'The FileHandle has been transferred';
667-
this[kHandle] = null;
668-
this[kRefs] = 0;
669-
670-
return {
671-
data: { handle },
672-
deserializeInfo: 'internal/fs/promises:FileHandle',
673-
};
674-
}
675-
676-
[kTransferList]() {
677-
return [ this[kHandle] ];
678-
}
679-
680-
[kDeserialize]({ handle }) {
681-
this[kHandle] = handle;
682-
this[kFd] = handle.fd;
683-
}
684-
685-
[kRef]() {
686-
this[kRefs]++;
687-
}
688-
689-
[kUnref]() {
690-
this[kRefs]--;
691-
if (this[kRefs] === 0) {
692-
this[kFd] = -1;
693-
PromisePrototypeThen(
694-
this[kHandle].close(),
695-
this[kCloseResolve],
696-
this[kCloseReject],
697-
);
698-
}
699-
}
704+
};
700705
}
701706

702707
async function handleFdClose(fileOpPromise, closeFunc) {

0 commit comments

Comments
 (0)