Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 197 additions & 64 deletions test/parallel/test-fastutf8stream-flush-mocks.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const common = require('../common');
require('../common');
const { test } = require('node:test');
Comment thread
mcollina marked this conversation as resolved.
const tmpdir = require('../common/tmpdir');
const {
ok,
Expand All @@ -11,10 +12,12 @@ const {
fsyncSync,
writeSync,
write,
mkdirSync,
} = require('node:fs');
const { join } = require('node:path');
const { Utf8Stream } = require('node:fs');
const { isMainThread } = require('node:worker_threads');
const { once } = require('node:events');

tmpdir.refresh();
if (isMainThread) {
Expand All @@ -24,82 +27,212 @@ if (isMainThread) {
let fileCounter = 0;

function getTempFile() {
return join(tmpdir.path, `fastutf8stream-${process.pid}-${Date.now()}-${fileCounter++}.log`);
const testDir = join(tmpdir.path, `test-${process.pid}-${process.hrtime.bigint()}`);
mkdirSync(testDir, { recursive: true });
return join(testDir, `fastutf8stream-${fileCounter++}.log`);
}

function createClosePromise(stream) {
return once(stream, 'close');
}

runTests(false);
runTests(true);

function runTests(sync) {

{
const dest = getTempFile();
const fd = openSync(dest, 'w');

const fsOverride = {
fsync: common.mustNotCall(),
fsyncSync: common.mustCall(() => fsyncSync(fd)),
};
if (sync) {
fsOverride.writeSync = common.mustCall((...args) => writeSync(...args));
fsOverride.write = common.mustNotCall();
} else {
fsOverride.write = common.mustCall((...args) => write(...args));
fsOverride.writeSync = common.mustNotCall();
}

const stream = new Utf8Stream({
fd,
sync,
fsync: true,
minLength: 4096,
fs: fsOverride,
});

stream.on('ready', common.mustCall(() => {
test('Utf8Stream sync flush with fsync success', async (t) => {
const dest = getTempFile();
const fd = openSync(dest, 'w');

let fsyncSyncCalled = false;
let writeSyncCalled = false;

const fsOverride = {
fsync: (fd, cb) => process.nextTick(cb),
fsyncSync: () => {
fsyncSyncCalled = true;
return fsyncSync(fd);
},
writeSync: (...args) => {
writeSyncCalled = true;
return writeSync(...args);
},
write: t.assert.fail,
close: (fd, cb) => process.nextTick(cb),
};

const stream = new Utf8Stream({
fd,
sync: true,
fsync: true,
minLength: 4096,
fs: fsOverride,
});

const closePromise = createClosePromise(stream);

await new Promise((resolve) => {
stream.on('ready', () => {
ok(stream.write('hello world\n'));

stream.flush(common.mustSucceed(() => stream.end()));
}));
}
stream.flush((err) => {
t.assert.ifError(err);
stream.end();
resolve();
});
});
});

ok(fsyncSyncCalled, 'fsyncSync should have been called');
ok(writeSyncCalled, 'writeSync should have been called');

await closePromise;

});

test('Utf8Stream async flush with fsync success', async (t) => {
const dest = getTempFile();
const fd = openSync(dest, 'w');

let fsyncSyncCalled = false;
let writeCalled = false;

const fsOverride = {
fsync: (fd, cb) => process.nextTick(cb),
fsyncSync: () => {
fsyncSyncCalled = true;
return fsyncSync(fd);
},
write: (...args) => {
writeCalled = true;
return write(...args);
},
writeSync: t.assert.fail,
close: (fd, cb) => process.nextTick(cb),
};

const stream = new Utf8Stream({
fd,
sync: false,
fsync: true,
minLength: 4096,
fs: fsOverride,
});

const closePromise = createClosePromise(stream);

await new Promise((resolve) => {
stream.on('ready', () => {
ok(stream.write('hello world\n'));

{
const dest = getTempFile();
const fd = openSync(dest, 'w');
stream.flush((err) => {
t.assert.ifError(err);
stream.end();
resolve();
});
});
});

const testError = new Error('fsync failed');
testError.code = 'ETEST';
ok(fsyncSyncCalled, 'fsyncSync should have been called');
ok(writeCalled, 'write should have been called');

const fsOverride = {
fsync: common.mustCall((fd, cb) => {
process.nextTick(() => cb(testError));
}, 2),
};
await closePromise;

if (sync) {
fsOverride.writeSync = common.mustCall((...args) => {
return writeSync(...args);
});
} else {
fsOverride.write = common.mustCall((...args) => {
return write(...args);
});
}
});

test('Utf8Stream sync flush with fsync error', async (t) => {
const dest = getTempFile();
const fd = openSync(dest, 'w');

const testError = new Error('fsync failed');
testError.code = 'ETEST';

let fsyncCallCount = 0;
let writeSyncCalled = false;

const stream = new Utf8Stream({
fd,
sync,
minLength: 4096,
fs: fsOverride,
const fsOverride = {
fsync: (fd, cb) => {
fsyncCallCount++;
process.nextTick(() => cb(testError));
},
writeSync: (...args) => {
writeSyncCalled = true;
return writeSync(...args);
},
close: (fd, cb) => process.nextTick(cb),
};

const stream = new Utf8Stream({
fd,
sync: true,
minLength: 4096,
fs: fsOverride,
});

const closePromise = createClosePromise(stream);

await new Promise((resolve) => {
stream.on('ready', () => {
ok(stream.write('hello world\n'));
stream.flush((err) => {
ok(err, 'flush should return an error');
strictEqual(err.code, 'ETEST');
stream.end();
resolve();
});
});
});

strictEqual(fsyncCallCount, 2, 'fsync should have been called twice');
ok(writeSyncCalled, 'writeSync should have been called');

await closePromise;

});

test('Utf8Stream async flush with fsync error', async (t) => {
const dest = getTempFile();
const fd = openSync(dest, 'w');

const testError = new Error('fsync failed');
testError.code = 'ETEST';

let fsyncCallCount = 0;
let writeCalled = false;

stream.on('ready', common.mustCall(() => {
const fsOverride = {
fsync: (fd, cb) => {
fsyncCallCount++;
process.nextTick(() => cb(testError));
},
write: (...args) => {
writeCalled = true;
return write(...args);
},
close: (fd, cb) => process.nextTick(cb),
};

const stream = new Utf8Stream({
fd,
sync: false,
minLength: 4096,
fs: fsOverride,
});

const closePromise = createClosePromise(stream);

await new Promise((resolve) => {
stream.on('ready', () => {
ok(stream.write('hello world\n'));
stream.flush(common.mustCall((err) => {
stream.flush((err) => {
ok(err, 'flush should return an error');
strictEqual(err.code, 'ETEST');
stream.end();
}));
}));
}
}
resolve();
});
});
});

strictEqual(fsyncCallCount, 2, 'fsync should have been called twice');
ok(writeCalled, 'write should have been called');

await closePromise;

});
Loading
Loading