Skip to content

Commit 7466249

Browse files
watch: track worker entry files in watch mode
Currently, --watch mode only tracks dependencies from the main module graph (require/import). Worker thread entry points created via new Worker() are not included, so changes to worker files do not trigger restarts. This change hooks into Worker initialization and registers the worker entry file with watch mode, ensuring restarts when worker files change. Fixes: #62275 Signed-off-by: SudhansuBandha <[email protected]> PR-URL: #62368 Reviewed-By: Moshe Atlow <[email protected]>
1 parent 1ad06ff commit 7466249

4 files changed

Lines changed: 336 additions & 1 deletion

File tree

lib/internal/modules/cjs/loader.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,28 @@ function reportModuleNotFoundToWatchMode(basePath, extensions) {
329329
}
330330
}
331331

332+
/**
333+
* Tell the watch mode that a module was required, from within a worker thread.
334+
* @param {string} filename Absolute path of the module
335+
* @returns {void}
336+
*/
337+
function reportModuleToWatchModeFromWorker(filename) {
338+
if (!shouldReportRequiredModules()) {
339+
return;
340+
}
341+
const { isMainThread } = internalBinding('worker');
342+
if (isMainThread) {
343+
return;
344+
}
345+
// Lazy require to avoid circular dependency: worker_threads is loaded after
346+
// the CJS loader is fully set up.
347+
const { parentPort } = require('worker_threads');
348+
if (!parentPort) {
349+
return;
350+
}
351+
parentPort.postMessage({ 'watch:require': [filename] });
352+
}
353+
332354
/**
333355
* Create a new module instance.
334356
* @param {string} id
@@ -1245,6 +1267,7 @@ Module._load = function(request, parent, isMain, internalResolveOptions = kEmpty
12451267
relResolveCacheIdentifier = `${parent.path}\x00${request}`;
12461268
const filename = relativeResolveCache[relResolveCacheIdentifier];
12471269
reportModuleToWatchMode(filename);
1270+
reportModuleToWatchModeFromWorker(filename);
12481271
if (filename !== undefined) {
12491272
const cachedModule = Module._cache[filename];
12501273
if (cachedModule !== undefined) {
@@ -1335,6 +1358,7 @@ Module._load = function(request, parent, isMain, internalResolveOptions = kEmpty
13351358
}
13361359

13371360
reportModuleToWatchMode(filename);
1361+
reportModuleToWatchModeFromWorker(filename);
13381362
Module._cache[filename] = module;
13391363
module[kIsCachedByESMLoader] = false;
13401364
// If there are resolve hooks, carry the context information into the

lib/internal/modules/esm/loader.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,16 @@ class ModuleLoader {
518518
const type = requestType === kRequireInImportedCJS ? 'require' : 'import';
519519
process.send({ [`watch:${type}`]: [url] });
520520
}
521+
// Relay Events from worker to main thread
522+
if (process.env.WATCH_REPORT_DEPENDENCIES && !process.send) {
523+
const { isMainThread } = internalBinding('worker');
524+
if (!isMainThread) {
525+
const { parentPort } = require('worker_threads');
526+
if (parentPort) {
527+
parentPort.postMessage({ 'watch:import': [url] });
528+
}
529+
}
530+
}
521531

522532
// TODO(joyeecheung): update the module requests to use importAttributes as property names.
523533
const importAttributes = resolveResult.importAttributes ?? request.attributes;

lib/internal/worker.js

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

33
const {
4+
ArrayIsArray,
45
ArrayPrototypeForEach,
56
ArrayPrototypeMap,
67
ArrayPrototypePush,
@@ -337,9 +338,28 @@ class Worker extends EventEmitter {
337338

338339
this[kPublicPort] = publicPortToParent;
339340
ArrayPrototypeForEach(['message', 'messageerror'], (event) => {
340-
this[kPublicPort].on(event, (message) => this.emit(event, message));
341+
this[kPublicPort].on(event, (message) => {
342+
// Extract watch messages first if needed and relay events from worker thread to watcher
343+
if (
344+
event === 'message' &&
345+
process.env.WATCH_REPORT_DEPENDENCIES &&
346+
process.send
347+
) {
348+
const { isMainThread } = internalBinding('worker');
349+
if (isMainThread) {
350+
if (ArrayIsArray(message?.['watch:require'])) {
351+
process.send({ 'watch:require': message['watch:require'] });
352+
}
353+
if (ArrayIsArray(message?.['watch:import'])) {
354+
process.send({ 'watch:import': message['watch:import'] });
355+
}
356+
}
357+
}
358+
this.emit(event, message);
359+
});
341360
});
342361
setupPortReferencing(this[kPublicPort], this, 'message');
362+
343363
this[kPort].postMessage({
344364
argv,
345365
type: messageTypes.LOAD_SCRIPT,
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
import * as common from '../common/index.mjs';
2+
import tmpdir from '../common/tmpdir.js';
3+
import assert from 'node:assert';
4+
import path from 'node:path';
5+
import { execPath } from 'node:process';
6+
import { describe, it } from 'node:test';
7+
import { spawn } from 'node:child_process';
8+
import { writeFileSync, readFileSync } from 'node:fs';
9+
import { inspect } from 'node:util';
10+
import { pathToFileURL } from 'node:url';
11+
import { createInterface } from 'node:readline';
12+
13+
if (common.isIBMi)
14+
common.skip('IBMi does not support `fs.watch()`');
15+
16+
function restart(file, content = readFileSync(file)) {
17+
writeFileSync(file, content);
18+
const timer = setInterval(() => writeFileSync(file, content), common.platformTimeout(250));
19+
return () => clearInterval(timer);
20+
}
21+
22+
let tmpFiles = 0;
23+
function createTmpFile(content = 'console.log(\'running\');', ext = '.js', basename = tmpdir.path) {
24+
const file = path.join(basename, `${tmpFiles++}${ext}`);
25+
writeFileSync(file, content);
26+
return file;
27+
}
28+
29+
async function runWriteSucceed({
30+
file,
31+
watchedFile,
32+
watchFlag = '--watch',
33+
args = [file],
34+
completed = 'Completed running',
35+
restarts = 2,
36+
options = {},
37+
shouldFail = false,
38+
}) {
39+
args.unshift('--no-warnings');
40+
if (watchFlag !== null) args.unshift(watchFlag);
41+
42+
const child = spawn(execPath, args, { encoding: 'utf8', stdio: 'pipe', ...options });
43+
44+
let completes = 0;
45+
let cancelRestarts = () => {};
46+
let stderr = '';
47+
const stdout = [];
48+
49+
child.stderr.on('data', (data) => {
50+
stderr += data;
51+
});
52+
53+
try {
54+
for await (const data of createInterface({ input: child.stdout })) {
55+
if (!data.startsWith('Waiting for graceful termination') &&
56+
!data.startsWith('Gracefully restarted')) {
57+
stdout.push(data);
58+
}
59+
60+
if (data.startsWith(completed)) {
61+
completes++;
62+
63+
if (completes === restarts) break;
64+
65+
if (completes === 1) {
66+
cancelRestarts = restart(watchedFile);
67+
}
68+
}
69+
70+
if (!shouldFail && data.startsWith('Failed running')) break;
71+
}
72+
} finally {
73+
child.kill();
74+
cancelRestarts();
75+
}
76+
77+
return { stdout, stderr, pid: child.pid };
78+
}
79+
80+
tmpdir.refresh();
81+
const dir = tmpdir.path;
82+
83+
describe('watch mode', { concurrency: !process.env.TEST_PARALLEL, timeout: 60_000 }, () => {
84+
it('should watch changes to worker - cjs', async () => {
85+
const worker = path.join(dir, 'worker.js');
86+
87+
writeFileSync(worker, `
88+
console.log('worker running');
89+
`);
90+
91+
const file = createTmpFile(`
92+
const { Worker } = require('node:worker_threads');
93+
const w = new Worker(${JSON.stringify(worker)});
94+
`, '.js', dir);
95+
96+
const { stderr, stdout } = await runWriteSucceed({
97+
file,
98+
watchedFile: worker,
99+
});
100+
101+
assert.strictEqual(stderr, '');
102+
assert.deepStrictEqual(stdout, [
103+
'worker running',
104+
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
105+
`Restarting ${inspect(file)}`,
106+
'worker running',
107+
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
108+
]);
109+
});
110+
111+
it('should watch changes to worker dependencies - cjs', async () => {
112+
const dep = path.join(dir, 'dep.js');
113+
const worker = path.join(dir, 'worker.js');
114+
115+
writeFileSync(dep, `
116+
module.exports = 'dep v1';
117+
`);
118+
119+
writeFileSync(worker, `
120+
const dep = require('./dep.js');
121+
console.log(dep);
122+
`);
123+
124+
const file = createTmpFile(`
125+
const { Worker } = require('node:worker_threads');
126+
const w = new Worker(${JSON.stringify(worker)});
127+
`, '.js', dir);
128+
129+
const { stderr, stdout } = await runWriteSucceed({
130+
file,
131+
watchedFile: dep,
132+
});
133+
134+
assert.strictEqual(stderr, '');
135+
assert.deepStrictEqual(stdout, [
136+
'dep v1',
137+
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
138+
`Restarting ${inspect(file)}`,
139+
'dep v1',
140+
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
141+
]);
142+
});
143+
144+
it('should watch changes to nested worker dependencies - cjs', async () => {
145+
const subDep = path.join(dir, 'sub-dep.js');
146+
const dep = path.join(dir, 'dep.js');
147+
const worker = path.join(dir, 'worker.js');
148+
149+
writeFileSync(subDep, `
150+
module.exports = 'sub-dep v1';
151+
`);
152+
153+
writeFileSync(dep, `
154+
const subDep = require('./sub-dep.js');
155+
console.log(subDep);
156+
module.exports = 'dep v1';
157+
`);
158+
159+
writeFileSync(worker, `
160+
const dep = require('./dep.js');
161+
`);
162+
163+
const file = createTmpFile(`
164+
const { Worker } = require('node:worker_threads');
165+
const w = new Worker(${JSON.stringify(worker)});
166+
`, '.js', dir);
167+
168+
const { stderr, stdout } = await runWriteSucceed({
169+
file,
170+
watchedFile: subDep,
171+
});
172+
173+
assert.strictEqual(stderr, '');
174+
assert.deepStrictEqual(stdout, [
175+
'sub-dep v1',
176+
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
177+
`Restarting ${inspect(file)}`,
178+
'sub-dep v1',
179+
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
180+
]);
181+
});
182+
183+
it('should watch changes to worker - esm', async () => {
184+
const worker = path.join(dir, 'worker.mjs');
185+
186+
writeFileSync(worker, `
187+
console.log('worker running');
188+
`);
189+
190+
const file = createTmpFile(`
191+
import { Worker } from 'node:worker_threads';
192+
new Worker(new URL(${JSON.stringify(pathToFileURL(worker))}));
193+
`, '.mjs', dir);
194+
195+
const { stderr, stdout } = await runWriteSucceed({
196+
file,
197+
watchedFile: worker,
198+
});
199+
200+
assert.strictEqual(stderr, '');
201+
assert.deepStrictEqual(stdout, [
202+
'worker running',
203+
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
204+
`Restarting ${inspect(file)}`,
205+
'worker running',
206+
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
207+
]);
208+
});
209+
210+
it('should watch changes to worker dependencies - esm', async () => {
211+
const dep = path.join(dir, 'dep.mjs');
212+
const worker = path.join(dir, 'worker.mjs');
213+
214+
writeFileSync(dep, `
215+
export default 'dep v1';
216+
`);
217+
218+
writeFileSync(worker, `
219+
import dep from ${JSON.stringify(pathToFileURL(dep))};
220+
console.log(dep);
221+
`);
222+
223+
const file = createTmpFile(`
224+
import { Worker } from 'node:worker_threads';
225+
new Worker(new URL(${JSON.stringify(pathToFileURL(worker))}));
226+
`, '.mjs', dir);
227+
228+
const { stderr, stdout } = await runWriteSucceed({
229+
file,
230+
watchedFile: dep,
231+
});
232+
233+
assert.strictEqual(stderr, '');
234+
assert.deepStrictEqual(stdout, [
235+
'dep v1',
236+
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
237+
`Restarting ${inspect(file)}`,
238+
'dep v1',
239+
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
240+
]);
241+
});
242+
243+
it('should watch changes to nested worker dependencies - esm', async () => {
244+
const subDep = path.join(dir, 'sub-dep.mjs');
245+
const dep = path.join(dir, 'dep.mjs');
246+
const worker = path.join(dir, 'worker.mjs');
247+
248+
writeFileSync(subDep, `
249+
export default 'sub-dep v1';
250+
`);
251+
252+
writeFileSync(dep, `
253+
import subDep from ${JSON.stringify(pathToFileURL(subDep))};
254+
console.log(subDep);
255+
export default 'dep v1';
256+
`);
257+
258+
writeFileSync(worker, `
259+
import dep from ${JSON.stringify(pathToFileURL(dep))};
260+
`);
261+
262+
const file = createTmpFile(`
263+
import { Worker } from 'node:worker_threads';
264+
new Worker(new URL(${JSON.stringify(pathToFileURL(worker))}));
265+
`, '.mjs', dir);
266+
267+
const { stderr, stdout } = await runWriteSucceed({
268+
file,
269+
watchedFile: subDep,
270+
});
271+
272+
assert.strictEqual(stderr, '');
273+
assert.deepStrictEqual(stdout, [
274+
'sub-dep v1',
275+
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
276+
`Restarting ${inspect(file)}`,
277+
'sub-dep v1',
278+
`Completed running ${inspect(file)}. Waiting for file changes before restarting...`,
279+
]);
280+
});
281+
});

0 commit comments

Comments
 (0)