Skip to content

Commit 2629f80

Browse files
committed
module: allow module.register from workers
1 parent c0c598d commit 2629f80

9 files changed

Lines changed: 52 additions & 18 deletions

File tree

lib/internal/modules/esm/hooks.js

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
'use strict';
22

33
const {
4+
ArrayPrototypeFilter,
5+
ArrayPrototypeMap,
46
ArrayPrototypePush,
57
ArrayPrototypePushApply,
8+
ArrayPrototypeReduce,
69
AtomicsLoad,
710
AtomicsWait,
811
AtomicsWaitAsync,
@@ -35,7 +38,7 @@ const {
3538
const { exitCodes: { kUnsettledTopLevelAwait } } = internalBinding('errors');
3639
const { URL } = require('internal/url');
3740
const { canParse: URLCanParse } = internalBinding('url');
38-
const { receiveMessageOnPort, isMainThread } = require('worker_threads');
41+
const { receiveMessageOnPort } = require('worker_threads');
3942
const {
4043
isAnyArrayBuffer,
4144
isArrayBufferView,
@@ -164,6 +167,18 @@ class Hooks {
164167
* @returns {any | Promise<any>} User data, ignored unless it's a promise, in which case it will be awaited.
165168
*/
166169
addCustomLoader(url, exports, data) {
170+
const alreadyKnown = ArrayPrototypeReduce(
171+
ArrayPrototypeMap(['initialize', 'resolve', 'load'], (hookName) => {
172+
if (this.#chains[hookName]) {
173+
const res2 = ArrayPrototypeFilter(this.#chains[hookName], (el) => el.url === url);
174+
return res2.length;
175+
}
176+
}), (acc, val) => acc || (val === 1), false);
177+
178+
if (alreadyKnown) {
179+
return undefined;
180+
}
181+
167182
const {
168183
initialize,
169184
resolve,
@@ -499,13 +514,14 @@ class HooksProxy {
499514
#numberOfPendingAsyncResponses = 0;
500515

501516
#isReady = false;
517+
#isWorkerOwner = false;
502518

503519
constructor() {
504-
const { InternalWorker, hooksPort } = require('internal/worker');
520+
const { InternalWorker, hooksPort, hasHooksThread } = require('internal/worker');
505521
const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH);
506522
this.#lock = new Int32Array(lock);
507523

508-
if (isMainThread) {
524+
if (!hasHooksThread()) {
509525
// Main thread is the only one that creates the internal single hooks worker
510526
this.#worker = new InternalWorker(loaderWorkerId, {
511527
stderr: false,
@@ -518,6 +534,7 @@ class HooksProxy {
518534
});
519535
this.#worker.unref(); // ! Allows the process to eventually exit.
520536
this.#worker.on('exit', process.exit);
537+
this.#isWorkerOwner = true;
521538
this.#portToHooksThread = this.#worker;
522539
} else {
523540
this.#portToHooksThread = hooksPort;
@@ -529,7 +546,7 @@ class HooksProxy {
529546
// has an InternalWorker. That was the Hooks instance created for the main thread.
530547
// It means for all Hooks instances that are not on the main thread => they are ready because they
531548
// delegate to the single InternalWorker anyway.
532-
if (!isMainThread) {
549+
if (!this.#isWorkerOwner) {
533550
return;
534551
}
535552

lib/internal/modules/esm/loader.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ const { ModuleWrap, kEvaluating, kEvaluated } = internalBinding('module_wrap');
4141
const {
4242
urlToFilename,
4343
} = require('internal/modules/helpers');
44-
const { isMainThread } = require('worker_threads');
4544
let defaultResolve, defaultLoad, defaultLoadSync, importMetaInitializer;
4645

4746
/**
@@ -623,11 +622,9 @@ class CustomizedModuleLoader {
623622
* @returns {{ format: string, url: URL['href'] } | undefined}
624623
*/
625624
register(originalSpecifier, parentURL, data, transferList) {
626-
if (isMainThread) {
627-
// Only the main thread has a Hooks instance with worker thread. All other Worker threads
628-
// delegate their hooks to the HooksThread of the main thread.
629-
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
630-
}
625+
// Only the main thread has a Hooks instance with worker thread. All other Worker threads
626+
// delegate their hooks to the HooksThread of the main thread.
627+
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
631628
}
632629

633630
/**
@@ -640,6 +637,9 @@ class CustomizedModuleLoader {
640637
* @returns {{ format: string, url: URL['href'] }}
641638
*/
642639
resolve(originalSpecifier, parentURL, importAttributes) {
640+
// const FS = require('fs');
641+
// const UTIL = require('util');
642+
// FS.writeFileSync(1, `resolve(${originalSpecifier}). ${Error().stack}\n`);
643643
return hooksProxy.makeAsyncRequest('resolve', undefined, originalSpecifier, parentURL, importAttributes);
644644
}
645645

lib/internal/worker.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ const {
6969
resourceLimits: resourceLimitsRaw,
7070
threadId,
7171
Worker: WorkerImpl,
72+
hasHooksThread,
7273
kMaxYoungGenerationSizeMb,
7374
kMaxOldGenerationSizeMb,
7475
kCodeRangeSizeMb,
@@ -562,6 +563,7 @@ module.exports = {
562563
isMainThread,
563564
SHARE_ENV,
564565
hooksPort: undefined,
566+
hasHooksThread,
565567
resourceLimits:
566568
!isMainThread ? makeResourceLimits(resourceLimitsRaw) : {},
567569
setEnvironmentData,

src/node_worker.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ namespace node {
4646
namespace worker {
4747

4848
constexpr double kMB = 1024 * 1024;
49+
std::atomic_bool Worker::internalExists{false};
4950

5051
Worker::Worker(Environment* env,
5152
Local<Object> wrap,
@@ -489,6 +490,8 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {
489490
if (is_internal->IsFalse()) {
490491
THROW_IF_INSUFFICIENT_PERMISSIONS(
491492
env, permission::PermissionScope::kWorkerThreads, "");
493+
} else {
494+
internalExists = true;
492495
}
493496
Isolate* isolate = args.GetIsolate();
494497

@@ -903,6 +906,10 @@ void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
903906
args.GetReturnValue().Set(loop_start_time / 1e6);
904907
}
905908

909+
void Worker::HasHooksThreadAlready(const FunctionCallbackInfo<Value>& args) {
910+
args.GetReturnValue().Set(Worker::internalExists);
911+
}
912+
906913
namespace {
907914

908915
// Return the MessagePort that is global for this Environment and communicates
@@ -940,6 +947,7 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
940947
SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
941948

942949
SetConstructorFunction(isolate, target, "Worker", w);
950+
SetMethodNoSideEffect(isolate, target, "hasHooksThread", Worker::HasHooksThread);
943951
}
944952

945953
{
@@ -1011,6 +1019,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
10111019
registry->Register(Worker::TakeHeapSnapshot);
10121020
registry->Register(Worker::LoopIdleTime);
10131021
registry->Register(Worker::LoopStartTime);
1022+
registry->Register(Worker::HasHooksThreadAlready);
10141023
}
10151024

10161025
} // anonymous namespace

src/node_worker.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include <optional>
77
#include <unordered_map>
8+
#include <atomic>
89
#include "node_exit_code.h"
910
#include "node_messaging.h"
1011
#include "uv.h"
@@ -76,6 +77,7 @@ class Worker : public AsyncWrap {
7677
static void TakeHeapSnapshot(const v8::FunctionCallbackInfo<v8::Value>& args);
7778
static void LoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>& args);
7879
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
80+
static void HasHooksThreadAlready(const v8::FunctionCallbackInfo<v8::Value>& args);
7981

8082
private:
8183
bool CreateEnvMessagePort(Environment* env);
@@ -102,6 +104,7 @@ class Worker : public AsyncWrap {
102104
uintptr_t stack_base_ = 0;
103105
// Optional name used for debugging in inspector and trace events.
104106
std::string name_;
107+
static std::atomic_bool internalExists;
105108

106109
// Custom resource constraints:
107110
double resource_limits_[kTotalResourceLimitCount];

test/es-module/test-esm-loader-hooks.mjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,7 @@ describe('Loader hooks', { concurrency: !process.env.TEST_PARALLEL }, () => {
612612
]);
613613

614614
assert.strictEqual(stderr, '');
615-
assert.deepStrictEqual(stdout.split('\n'), ['resolve passthru', 'resolve passthru', '']);
615+
assert.deepStrictEqual(stdout.split('\n'), ['resolve passthru', '']);
616616
assert.strictEqual(code, 0);
617617
assert.strictEqual(signal, null);
618618
});

test/es-module/test-esm-loader-programmatically.mjs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,10 @@ describe('ESM: programmatically register loaders', { concurrency: !process.env.T
163163
const lines = stdout.split('\n');
164164

165165
assert.match(lines[0], /resolve passthru/);
166-
assert.match(lines[1], /resolve passthru/);
167-
assert.match(lines[2], /load passthru/);
168-
assert.match(lines[3], /load passthru/);
169-
assert.match(lines[4], /Hello from dynamic import/);
166+
assert.match(lines[1], /load passthru/);
167+
assert.match(lines[2], /Hello from dynamic import/);
170168

171-
assert.strictEqual(lines[5], '');
169+
assert.strictEqual(lines[3], '');
172170
});
173171

174172
it('works registering loaders as package name', async () => {

test/es-module/test-esm-loader-threads.mjs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ describe('off-thread hooks', { concurrency: true }, () => {
1616
fixtures.path('es-module-loaders/workers-spawned.mjs'),
1717
]);
1818

19+
console.log(stderr);
20+
console.log(stdout);
21+
1922
strictEqual(stderr, '');
2023
strictEqual(stdout.split('\n').filter((line) => line.startsWith('initialize')).length, 1);
2124
strictEqual(stdout.split('\n').filter((line) => line === 'foo').length, 2);
@@ -30,7 +33,9 @@ describe('off-thread hooks', { concurrency: true }, () => {
3033
// 6x module-named-exports.mjs => 2x worker-log.mjs + 4x worker-log-again.mjs
3134
// ===========================
3235
// 16 calls to resolve + 16 calls to load hook for the registered custom loader
33-
strictEqual(stdout.split('\n').filter((line) => line.startsWith('hooked resolve')).length, 16);
36+
// 6 additional calls to resolve because of the modeul.register being allowed from worker threads (happens
37+
// implicitly because of the --import on the main thread)
38+
strictEqual(stdout.split('\n').filter((line) => line.startsWith('hooked resolve')).length, 22);
3439
strictEqual(stdout.split('\n').filter((line) => line.startsWith('hooked load')).length, 16);
3540
strictEqual(code, 0);
3641
strictEqual(signal, null);

test/fixtures/es-module-loaders/hooks-log.mjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ export function initialize() {
99
}
1010

1111
export function resolve(specifier, context, next) {
12-
writeFileSync(1, `hooked resolve ${++resolveCount} ${specifier}\n`);
12+
writeFileSync(1, `hooked resolve ${++resolveCount} ${specifier} \n`);
1313
return next(specifier, context);
1414
}
1515

0 commit comments

Comments
 (0)