Skip to content

Commit 02e6c2c

Browse files
watch: updated event relays for workers to handle race issues
1 parent 662287e commit 02e6c2c

1 file changed

Lines changed: 96 additions & 87 deletions

File tree

lib/internal/worker.js

Lines changed: 96 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -59,22 +59,20 @@ const {
5959
ReadableWorkerStdio,
6060
WritableWorkerStdio,
6161
} = workerIo;
62-
const { createMainThreadPort, destroyMainThreadPort } = require('internal/worker/messaging');
62+
const {
63+
createMainThreadPort,
64+
destroyMainThreadPort,
65+
} = require('internal/worker/messaging');
6366
const { deserializeError } = require('internal/error_serdes');
6467
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
65-
const {
66-
constructSharedArrayBuffer,
67-
kEmptyObject,
68-
} = require('internal/util');
68+
const { constructSharedArrayBuffer, kEmptyObject } = require('internal/util');
6969
const {
7070
validateArray,
7171
validateString,
7272
validateObject,
7373
validateNumber,
7474
} = require('internal/validators');
75-
const {
76-
throwIfBuildingSnapshot,
77-
} = require('internal/v8/startup_snapshot');
75+
const { throwIfBuildingSnapshot } = require('internal/v8/startup_snapshot');
7876
const {
7977
ownsProcessState,
8078
isMainThread,
@@ -119,17 +117,15 @@ const environmentData = new SafeMap();
119117
if (isMainThread) {
120118
cwdCounter = new Uint32Array(constructSharedArrayBuffer(4));
121119
const originalChdir = process.chdir;
122-
process.chdir = function(path) {
120+
process.chdir = function (path) {
123121
originalChdir(path);
124122
AtomicsAdd(cwdCounter, 0, 1);
125123
};
126124
}
127125

128126
function setEnvironmentData(key, value) {
129-
if (value === undefined)
130-
environmentData.delete(key);
131-
else
132-
environmentData.set(key, value);
127+
if (value === undefined) environmentData.delete(key);
128+
else environmentData.set(key, value);
133129
}
134130

135131
function getEnvironmentData(key) {
@@ -158,16 +154,16 @@ class CPUProfileHandle {
158154
return this.#promise;
159155
}
160156
const stopTaker = this.#worker[kHandle]?.stopCpuProfile(this.#id);
161-
return this.#promise = new Promise((resolve, reject) => {
157+
return (this.#promise = new Promise((resolve, reject) => {
162158
if (!stopTaker) return reject(new ERR_WORKER_NOT_RUNNING());
163159
stopTaker.ondone = (err, profile) => {
164160
if (err) {
165161
return reject(err);
166162
}
167163
resolve(profile);
168164
};
169-
});
170-
};
165+
}));
166+
}
171167

172168
async [SymbolAsyncDispose]() {
173169
await this.stop();
@@ -187,16 +183,16 @@ class HeapProfileHandle {
187183
return this.#promise;
188184
}
189185
const stopTaker = this.#worker[kHandle]?.stopHeapProfile();
190-
return this.#promise = new Promise((resolve, reject) => {
186+
return (this.#promise = new Promise((resolve, reject) => {
191187
if (!stopTaker) return reject(new ERR_WORKER_NOT_RUNNING());
192188
stopTaker.ondone = (err, profile) => {
193189
if (err) {
194190
return reject(err);
195191
}
196192
resolve(profile);
197193
};
198-
});
199-
};
194+
}));
195+
}
200196

201197
async [SymbolAsyncDispose]() {
202198
await this.stop();
@@ -214,8 +210,7 @@ class Worker extends EventEmitter {
214210
options,
215211
`isInternal: ${isInternal}`,
216212
);
217-
if (options.execArgv)
218-
validateArray(options.execArgv, 'options.execArgv');
213+
if (options.execArgv) validateArray(options.execArgv, 'options.execArgv');
219214

220215
let argv;
221216
if (options.argv) {
@@ -232,7 +227,7 @@ class Worker extends EventEmitter {
232227
throw new ERR_INVALID_ARG_VALUE(
233228
'options.eval',
234229
options.eval,
235-
'must be false when \'filename\' is not a string',
230+
"must be false when 'filename' is not a string",
236231
);
237232
}
238233
url = null;
@@ -247,13 +242,11 @@ class Worker extends EventEmitter {
247242
url = filename;
248243
filename = fileURLToPath(filename);
249244
} else if (typeof filename !== 'string') {
250-
throw new ERR_INVALID_ARG_TYPE(
251-
'filename',
252-
['string', 'URL'],
253-
filename,
254-
);
255-
} else if (path.isAbsolute(filename) ||
256-
RegExpPrototypeExec(/^\.\.?[\\/]/, filename) !== null) {
245+
throw new ERR_INVALID_ARG_TYPE('filename', ['string', 'URL'], filename);
246+
} else if (
247+
path.isAbsolute(filename) ||
248+
RegExpPrototypeExec(/^\.\.?[\\/]/, filename) !== null
249+
) {
257250
filename = path.resolve(filename);
258251
url = pathToFileURL(filename);
259252
} else {
@@ -266,15 +259,18 @@ class Worker extends EventEmitter {
266259
env = { __proto__: null };
267260
ArrayPrototypeForEach(
268261
ObjectEntries(options.env),
269-
({ 0: key, 1: value }) => { env[key] = `${value}`; },
262+
({ 0: key, 1: value }) => {
263+
env[key] = `${value}`;
264+
},
270265
);
271266
} else if (options.env == null) {
272267
env = process.env;
273268
} else if (options.env !== SHARE_ENV) {
274269
throw new ERR_INVALID_ARG_TYPE(
275270
'options.env',
276271
['object', 'undefined', 'null', 'worker_threads.SHARE_ENV'],
277-
options.env);
272+
options.env,
273+
);
278274
}
279275

280276
let name = 'WorkerThread';
@@ -285,19 +281,23 @@ class Worker extends EventEmitter {
285281

286282
debug('instantiating Worker.', `url: ${url}`, `doEval: ${doEval}`);
287283
// Set up the C++ handle for the worker, as well as some internal wiring.
288-
this[kHandle] = new WorkerImpl(url,
289-
env === process.env ? null : env,
290-
options.execArgv,
291-
parseResourceLimits(options.resourceLimits),
292-
!!(options.trackUnmanagedFds ?? true),
293-
isInternal,
294-
name);
284+
this[kHandle] = new WorkerImpl(
285+
url,
286+
env === process.env ? null : env,
287+
options.execArgv,
288+
parseResourceLimits(options.resourceLimits),
289+
!!(options.trackUnmanagedFds ?? true),
290+
isInternal,
291+
name,
292+
);
295293
if (this[kHandle].invalidExecArgv) {
296294
throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv);
297295
}
298296
if (this[kHandle].invalidNodeOptions) {
299297
throw new ERR_WORKER_INVALID_EXEC_ARGV(
300-
this[kHandle].invalidNodeOptions, 'invalid NODE_OPTIONS env variable');
298+
this[kHandle].invalidNodeOptions,
299+
'invalid NODE_OPTIONS env variable',
300+
);
301301
}
302302
this[kHandle].onexit = (code, customErr, customErrReason) => {
303303
this[kOnExit](code, customErr, customErrReason);
@@ -310,8 +310,7 @@ class Worker extends EventEmitter {
310310
debug(`[${threadId}] created Worker with ID ${this.threadId}`);
311311

312312
let stdin = null;
313-
if (options.stdin)
314-
stdin = new WritableWorkerStdio(this[kPort], 'stdin');
313+
if (options.stdin) stdin = new WritableWorkerStdio(this[kPort], 'stdin');
315314
const stdout = new ReadableWorkerStdio(this[kPort], 'stdout');
316315
if (!options.stdout) {
317316
stdout[kIncrementsPortRef] = false;
@@ -326,46 +325,56 @@ class Worker extends EventEmitter {
326325
this[kParentSideStdio] = { stdin, stdout, stderr };
327326

328327
const mainThreadPortToWorker = createMainThreadPort(this.threadId);
329-
const {
330-
port1: publicPortToParent,
331-
port2: publicPortToWorker,
332-
} = new MessageChannel();
328+
const { port1: publicPortToParent, port2: publicPortToWorker } =
329+
new MessageChannel();
333330
const transferList = [mainThreadPortToWorker, publicPortToWorker];
334331
// If transferList is provided.
335332
if (options.transferList)
336-
ArrayPrototypePush(transferList,
337-
...new SafeArrayIterator(options.transferList));
333+
ArrayPrototypePush(
334+
transferList,
335+
...new SafeArrayIterator(options.transferList),
336+
);
338337

339338
this[kPublicPort] = publicPortToParent;
340339
ArrayPrototypeForEach(['message', 'messageerror'], (event) => {
341-
this[kPublicPort].on(event, (message) => this.emit(event, message));
340+
this[kPublicPort].on(event, (message) => {
341+
// Extract watch messages first if needed and relay events from worker thread to watcher
342+
if (
343+
event === 'message' &&
344+
process.env.WATCH_REPORT_DEPENDENCIES &&
345+
process.send
346+
) {
347+
const { isMainThread } = internalBinding('worker');
348+
if (isMainThread) {
349+
if (ArrayIsArray(message?.['watch:require'])) {
350+
process.send({ 'watch:require': message['watch:require'] });
351+
}
352+
if (ArrayIsArray(message?.['watch:import'])) {
353+
process.send({ 'watch:import': message['watch:import'] });
354+
}
355+
}
356+
}
357+
this.emit(event, message);
358+
});
342359
});
343360
setupPortReferencing(this[kPublicPort], this, 'message');
344361

345-
// Relay events from worker thread to watcher
346-
if (process.env.WATCH_REPORT_DEPENDENCIES && process.send) {
347-
this[kPublicPort].on('message', (message) => {
348-
if (ArrayIsArray(message?.['watch:require'])) {
349-
process.send({ 'watch:require': message['watch:require'] });
350-
}
351-
if (ArrayIsArray(message?.['watch:import'])) {
352-
process.send({ 'watch:import': message['watch:import'] });
353-
}
354-
});
355-
}
356-
this[kPort].postMessage({
357-
argv,
358-
type: messageTypes.LOAD_SCRIPT,
359-
filename,
360-
doEval,
361-
isInternal,
362-
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
363-
workerData: options.workerData,
364-
environmentData,
365-
hasStdin: !!options.stdin,
366-
publicPort: publicPortToWorker,
367-
mainThreadPort: mainThreadPortToWorker,
368-
}, transferList);
362+
this[kPort].postMessage(
363+
{
364+
argv,
365+
type: messageTypes.LOAD_SCRIPT,
366+
filename,
367+
doEval,
368+
isInternal,
369+
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
370+
workerData: options.workerData,
371+
environmentData,
372+
hasStdin: !!options.stdin,
373+
publicPort: publicPortToWorker,
374+
mainThreadPort: mainThreadPortToWorker,
375+
},
376+
transferList,
377+
);
369378
// Use this to cache the Worker's loopStart value once available.
370379
this[kLoopStartTime] = -1;
371380
this[kIsOnline] = false;
@@ -421,8 +430,7 @@ class Worker extends EventEmitter {
421430
return this[kOnCouldNotSerializeErr]();
422431
case messageTypes.ERROR_MESSAGE:
423432
return this[kOnErrorMessage](message.error);
424-
case messageTypes.STDIO_PAYLOAD:
425-
{
433+
case messageTypes.STDIO_PAYLOAD: {
426434
const { stream, chunks } = message;
427435
const readable = this[kParentSideStdio][stream];
428436
// This is a hot path, use a for(;;) loop
@@ -432,8 +440,7 @@ class Worker extends EventEmitter {
432440
}
433441
return;
434442
}
435-
case messageTypes.STDIO_WANTS_MORE_DATA:
436-
{
443+
case messageTypes.STDIO_WANTS_MORE_DATA: {
437444
const { stream } = message;
438445
return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback]();
439446
}
@@ -564,7 +571,9 @@ class Worker extends EventEmitter {
564571
validateNumber(prev.system, 'prev.system', 0, NumberMAX_SAFE_INTEGER);
565572
}
566573
if (process.platform === 'sunos') {
567-
throw new ERR_OPERATION_FAILED('worker.cpuUsage() is not available on SunOS');
574+
throw new ERR_OPERATION_FAILED(
575+
'worker.cpuUsage() is not available on SunOS',
576+
);
568577
}
569578
const taker = this[kHandle]?.cpuUsage();
570579
return new Promise((resolve, reject) => {
@@ -591,13 +600,12 @@ class Worker extends EventEmitter {
591600
startCpuProfile(options) {
592601
normalizeCpuProfileOptions ??=
593602
require('internal/v8/cpu_profiler').normalizeCpuProfileOptions;
594-
const {
595-
samplingIntervalMicros,
596-
maxSamples,
597-
} = normalizeCpuProfileOptions(options);
603+
const { samplingIntervalMicros, maxSamples } =
604+
normalizeCpuProfileOptions(options);
598605
const startTaker = this[kHandle]?.startCpuProfile(
599606
samplingIntervalMicros,
600-
maxSamples);
607+
maxSamples,
608+
);
601609
return new Promise((resolve, reject) => {
602610
if (!startTaker) return reject(new ERR_WORKER_NOT_RUNNING());
603611
startTaker.ondone = (err, id) => {
@@ -624,7 +632,10 @@ class Worker extends EventEmitter {
624632
const { sampleInterval, stackDepth, flags } =
625633
normalizeHeapProfileOptions(options);
626634
const startTaker = this[kHandle]?.startHeapProfile(
627-
sampleInterval, stackDepth, flags);
635+
sampleInterval,
636+
stackDepth,
637+
flags,
638+
);
628639
return new Promise((resolve, reject) => {
629640
if (!startTaker) return reject(new ERR_WORKER_NOT_RUNNING());
630641
startTaker.ondone = (err) => {
@@ -675,8 +686,7 @@ function parseResourceLimits(obj) {
675686
ret[kMaxYoungGenerationSizeMb] = obj.maxYoungGenerationSizeMb;
676687
if (typeof obj.codeRangeSizeMb === 'number')
677688
ret[kCodeRangeSizeMb] = obj.codeRangeSizeMb;
678-
if (typeof obj.stackSizeMb === 'number')
679-
ret[kStackSizeMb] = obj.stackSizeMb;
689+
if (typeof obj.stackSizeMb === 'number') ret[kStackSizeMb] = obj.stackSizeMb;
680690
return ret;
681691
}
682692

@@ -719,8 +729,7 @@ module.exports = {
719729
isMainThread,
720730
isInternalThread,
721731
SHARE_ENV,
722-
resourceLimits:
723-
!isMainThread ? makeResourceLimits(resourceLimitsRaw) : {},
732+
resourceLimits: !isMainThread ? makeResourceLimits(resourceLimitsRaw) : {},
724733
setEnvironmentData,
725734
getEnvironmentData,
726735
assignEnvironmentData,

0 commit comments

Comments
 (0)