Skip to content

Commit 1ac4bfe

Browse files
committed
worker: add connect and setConnectionsListener
1 parent 2333573 commit 1ac4bfe

14 files changed

Lines changed: 370 additions & 10 deletions

doc/api/errors.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3072,6 +3072,16 @@ added: v18.1.0
30723072
The `Response` that has been passed to `WebAssembly.compileStreaming` or to
30733073
`WebAssembly.instantiateStreaming` is not a valid WebAssembly response.
30743074

3075+
<a id="ERR_WORKER_CONNECTION_REFUSED"></a>
3076+
3077+
### `ERR_WORKER_CONNECTION_REFUSED`
3078+
3079+
<!-- YAML
3080+
added: REPLACEME
3081+
-->
3082+
3083+
The thread requested in [`connect()`][] refused the connection or has no connections listener provided.
3084+
30753085
<a id="ERR_WORKER_INIT_FAILED"></a>
30763086

30773087
### `ERR_WORKER_INIT_FAILED`
@@ -3085,6 +3095,16 @@ The `Worker` initialization failed.
30853095
The `execArgv` option passed to the `Worker` constructor contains
30863096
invalid flags.
30873097

3098+
<a id="ERR_WORKER_INVALID_ID"></a>
3099+
3100+
### `ERR_WORKER_INVALID_ID`
3101+
3102+
<!-- YAML
3103+
added: REPLACEME
3104+
-->
3105+
3106+
The thread id requested in [`connect()`][] is invalid.
3107+
30883108
<a id="ERR_WORKER_NOT_RUNNING"></a>
30893109

30903110
### `ERR_WORKER_NOT_RUNNING`
@@ -3104,6 +3124,16 @@ The `Worker` instance terminated because it reached its memory limit.
31043124
The path for the main script of a worker is neither an absolute path
31053125
nor a relative path starting with `./` or `../`.
31063126

3127+
<a id="ERR_WORKER_SAME_THREAD"></a>
3128+
3129+
### `ERR_WORKER_SAME_THREAD`
3130+
3131+
<!-- YAML
3132+
added: REPLACEME
3133+
-->
3134+
3135+
The thread id requested in [`connect()`][] is the current thread id.
3136+
31073137
<a id="ERR_WORKER_UNSERIALIZABLE_ERROR"></a>
31083138

31093139
### `ERR_WORKER_UNSERIALIZABLE_ERROR`
@@ -3999,6 +4029,7 @@ An error occurred trying to allocate memory. This should never happen.
39994029
[`Writable`]: stream.md#class-streamwritable
40004030
[`child_process`]: child_process.md
40014031
[`cipher.getAuthTag()`]: crypto.md#ciphergetauthtag
4032+
[`connect()`]: worker_threads.md#workerconnecttarget-data
40024033
[`crypto.getDiffieHellman()`]: crypto.md#cryptogetdiffiehellmangroupname
40034034
[`crypto.scrypt()`]: crypto.md#cryptoscryptpassword-salt-keylen-options-callback
40044035
[`crypto.scryptSync()`]: crypto.md#cryptoscryptsyncpassword-salt-keylen-options

doc/api/worker_threads.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,22 @@ Worker threads inherit non-process-specific options by default. Refer to
6161
[`Worker constructor options`][] to know how to customize worker thread options,
6262
specifically `argv` and `execArgv` options.
6363

64+
## `worker.connect(target, data)`
65+
66+
<!-- YAML
67+
added: REPLACEME
68+
-->
69+
70+
* `target` {number} The target thread id.
71+
* `data` {any} Any arbitrary, cloneable JavaScript value.
72+
* Returns: {MessagePort}
73+
74+
Establishes a connection to another worker thread in the same process, returning a
75+
`MessagePort` that can be used for the communication.
76+
77+
The target thread must have a connection listener setup via [`worker.setConnectionsListener()`][]
78+
otherwise the connection request will fail.
79+
6480
## `worker.getEnvironmentData(key)`
6581

6682
<!-- YAML
@@ -325,6 +341,28 @@ new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
325341
});
326342
```
327343

344+
## `worker.setConnectionsListener(fn)`
345+
346+
<!-- YAML
347+
added: REPLACEME
348+
-->
349+
350+
* `fn` {Function} A callback to be executed when [`worker.connect()`][] is called from another thread.
351+
The function will received the following arguments:
352+
353+
* `sender` {number} The other thread id.
354+
* `port` {MessagePort} The port than can be used to communicate with the other thread.
355+
* `data` {any} The data passed to [`worker.connect()`][].
356+
357+
The function must return `true` to accept the connection or any other value to
358+
refuse the connection.
359+
360+
Sets the callback that handles connection from other worker threads in the same process.
361+
If the callback is `null` or `undefined` then the current listener is removed.
362+
363+
When no listeners are present (the default) all connection requests are immediately
364+
refused.
365+
328366
## `worker.setEnvironmentData(key[, value])`
329367

330368
<!-- YAML
@@ -1437,8 +1475,10 @@ thread spawned will spawn another until the application crashes.
14371475
[`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshotoptions
14381476
[`vm`]: vm.md
14391477
[`worker.SHARE_ENV`]: #workershare_env
1478+
[`worker.connect()`]: #workerconnecttarget-data
14401479
[`worker.on('message')`]: #event-message_1
14411480
[`worker.postMessage()`]: #workerpostmessagevalue-transferlist
1481+
[`worker.setConnectionsListener()`]: #workersetconnectionslistenerfn
14421482
[`worker.terminate()`]: #workerterminate
14431483
[`worker.threadId`]: #workerthreadid_1
14441484
[async-resource-worker-pool]: async_context.md#using-asyncresource-for-a-worker-thread-pool

lib/internal/errors.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1858,10 +1858,12 @@ E('ERR_VM_MODULE_NOT_MODULE',
18581858
E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
18591859
E('ERR_WASI_ALREADY_STARTED', 'WASI instance has already started', Error);
18601860
E('ERR_WEBASSEMBLY_RESPONSE', 'WebAssembly response %s', TypeError);
1861+
E('ERR_WORKER_CONNECTION_REFUSED', 'Connection refused from worker', Error);
18611862
E('ERR_WORKER_INIT_FAILED', 'Worker initialization failure: %s', Error);
18621863
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
18631864
`Initiated Worker with ${msg}: ${ArrayPrototypeJoin(errors, ', ')}`,
18641865
Error);
1866+
E('ERR_WORKER_INVALID_ID', 'Invalid worker id', Error);
18651867
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
18661868
E('ERR_WORKER_OUT_OF_MEMORY',
18671869
'Worker terminated due to reaching memory limit: %s', Error);
@@ -1876,6 +1878,7 @@ E('ERR_WORKER_PATH', (filename) =>
18761878
) +
18771879
` Received "${filename}"`,
18781880
TypeError);
1881+
E('ERR_WORKER_SAME_THREAD', 'Cannot connect to the same thread', Error);
18791882
E('ERR_WORKER_UNSERIALIZABLE_ERROR',
18801883
'Serializing an uncaught exception failed', Error);
18811884
E('ERR_WORKER_UNSUPPORTED_OPERATION',

lib/internal/main/worker_thread.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ const {
4040
// Messages that may be either received or posted
4141
STDIO_PAYLOAD,
4242
STDIO_WANTS_MORE_DATA,
43+
CONNECT,
4344
},
4445
kStdioWantsMoreDataCallback,
4546
} = workerIo;
@@ -182,6 +183,8 @@ port.on('message', (message) => {
182183
break;
183184
}
184185
}
186+
} else if (message.type === CONNECT) {
187+
require('internal/worker').processConnectionRequest(message);
185188
} else if (message.type === STDIO_PAYLOAD) {
186189
const { stream, chunks } = message;
187190
ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {

lib/internal/worker.js

Lines changed: 97 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@ const {
55
ArrayPrototypeMap,
66
ArrayPrototypePush,
77
AtomicsAdd,
8+
AtomicsNotify,
9+
AtomicsStore,
10+
AtomicsWait,
811
Float64Array,
912
FunctionPrototypeBind,
13+
Int32Array,
1014
JSONStringify,
1115
MathMax,
1216
ObjectEntries,
@@ -34,12 +38,14 @@ const {
3438

3539
const errorCodes = require('internal/errors').codes;
3640
const {
41+
ERR_INVALID_ARG_TYPE,
42+
ERR_INVALID_ARG_VALUE,
43+
ERR_WORKER_CONNECTION_REFUSED,
44+
ERR_WORKER_INVALID_EXEC_ARGV,
3745
ERR_WORKER_NOT_RUNNING,
3846
ERR_WORKER_PATH,
47+
ERR_WORKER_SAME_THREAD,
3948
ERR_WORKER_UNSERIALIZABLE_ERROR,
40-
ERR_WORKER_INVALID_EXEC_ARGV,
41-
ERR_INVALID_ARG_TYPE,
42-
ERR_INVALID_ARG_VALUE,
4349
} = errorCodes;
4450

4551
const workerIo = require('internal/worker/io');
@@ -59,7 +65,7 @@ const {
5965
const { deserializeError } = require('internal/error_serdes');
6066
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
6167
const { kEmptyObject } = require('internal/util');
62-
const { validateArray, validateString } = require('internal/validators');
68+
const { validateArray, validateFunction, validateString } = require('internal/validators');
6369
const {
6470
throwIfBuildingSnapshot,
6571
} = require('internal/v8/startup_snapshot');
@@ -74,6 +80,8 @@ const {
7480
kCodeRangeSizeMb,
7581
kStackSizeMb,
7682
kTotalResourceLimitCount,
83+
sendToWorker,
84+
setMainPort,
7785
} = internalBinding('worker');
7886

7987
const kHandle = Symbol('kHandle');
@@ -100,6 +108,14 @@ let cwdCounter;
100108

101109
const environmentData = new SafeMap();
102110

111+
// ShareadArrayBuffer must always be Int32, so it's * 4.
112+
// We need one for the operation status (performing / performed) and one for the result (success / failure).
113+
const WORKER_MESSAGING_SHARED_DATA = 2 * 4;
114+
const WORKER_MESSAGING_STATUS_INDEX = 0;
115+
const WORKER_MESSAGING_RESULT_INDEX = 1;
116+
let connectionsListener = null;
117+
let mainPortWasSetup = false;
118+
103119
// SharedArrayBuffers can be disabled with --enable-sharedarraybuffer-per-context.
104120
if (isMainThread && SharedArrayBuffer !== undefined) {
105121
cwdCounter = new Uint32Array(new SharedArrayBuffer(4));
@@ -527,6 +543,79 @@ function eventLoopUtilization(util1, util2) {
527543
);
528544
}
529545

546+
function setConnectionsListener(fn) {
547+
if (isMainThread && !mainPortWasSetup) {
548+
setupMainPort();
549+
mainPortWasSetup = true;
550+
}
551+
552+
if (typeof fn === 'undefined' || fn === null) {
553+
connectionsListener = null;
554+
return;
555+
}
556+
557+
validateFunction(fn, 'fn');
558+
connectionsListener = fn;
559+
}
560+
561+
function processConnectionRequest(request) {
562+
const status = new Int32Array(request.memory);
563+
564+
if (connectionsListener?.(request.sender, request.port, request.data) !== true) {
565+
AtomicsStore(status, WORKER_MESSAGING_RESULT_INDEX, 1);
566+
AtomicsStore(status, WORKER_MESSAGING_STATUS_INDEX, 1);
567+
AtomicsNotify(status, WORKER_MESSAGING_STATUS_INDEX);
568+
return;
569+
}
570+
571+
AtomicsStore(status, WORKER_MESSAGING_RESULT_INDEX, 0);
572+
AtomicsStore(status, WORKER_MESSAGING_STATUS_INDEX, 1);
573+
AtomicsNotify(status, WORKER_MESSAGING_STATUS_INDEX);
574+
}
575+
576+
function connect(target, data) {
577+
if (target === threadId) {
578+
throw new ERR_WORKER_SAME_THREAD();
579+
}
580+
581+
// Create a shared array to exchange the status and the result
582+
const memory = new SharedArrayBuffer(WORKER_MESSAGING_SHARED_DATA);
583+
const status = new Int32Array(memory);
584+
585+
// Create the channel and send it to the other thred
586+
const { port1, port2 } = new MessageChannel();
587+
sendToWorker(target, { type: messageTypes.CONNECT, sender: threadId, port: port2, memory, data }, [port2]);
588+
589+
// Wait for the response
590+
AtomicsWait(status, 0, 0);
591+
592+
if (status[1] === 1) {
593+
port1.close();
594+
port2.close();
595+
throw new ERR_WORKER_CONNECTION_REFUSED();
596+
}
597+
598+
return port1;
599+
}
600+
601+
function setupMainPort() {
602+
const { port1, port2 } = new MessageChannel();
603+
setMainPort(port2);
604+
605+
// Set message management
606+
port1.on('message', (message) => {
607+
if (message.type === messageTypes.CONNECT) {
608+
processConnectionRequest(message);
609+
} else {
610+
assert(message.type === messageTypes.CONNECT, `Unknown worker message type ${message.type}`);
611+
}
612+
});
613+
614+
// Never block the process on this channel
615+
port1.unref();
616+
port2.unref();
617+
}
618+
530619
module.exports = {
531620
ownsProcessState,
532621
kIsOnline,
@@ -537,6 +626,10 @@ module.exports = {
537626
setEnvironmentData,
538627
getEnvironmentData,
539628
assignEnvironmentData,
629+
setConnectionsListener,
630+
setupMainPort,
631+
processConnectionRequest,
632+
connect,
540633
threadId,
541634
InternalWorker,
542635
Worker,

lib/internal/worker/io.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ const messageTypes = {
8383
STDIO_PAYLOAD: 'stdioPayload',
8484
STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
8585
LOAD_SCRIPT: 'loadScript',
86+
CONNECT: 'connect',
8687
};
8788

8889
// createFastMessageEvent skips webidl argument validation when the arguments

lib/worker_threads.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ const {
66
resourceLimits,
77
setEnvironmentData,
88
getEnvironmentData,
9+
connect,
910
threadId,
1011
Worker,
12+
setConnectionsListener,
1113
} = require('internal/worker');
1214

1315
const {
@@ -40,4 +42,6 @@ module.exports = {
4042
BroadcastChannel,
4143
setEnvironmentData,
4244
getEnvironmentData,
45+
setConnectionsListener,
46+
connect,
4347
};

src/node_errors.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ void OOMErrorHandler(const char* location, const v8::OOMDetails& details);
102102
V(ERR_VM_MODULE_LINK_FAILURE, Error) \
103103
V(ERR_WASI_NOT_STARTED, Error) \
104104
V(ERR_WORKER_INIT_FAILED, Error) \
105+
V(ERR_WORKER_INVALID_ID, Error) \
105106
V(ERR_PROTO_ACCESS, Error)
106107

107108
#define V(code, type) \

src/node_messaging.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ class MessagePort;
1919

2020
typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList;
2121

22+
bool GetTransferList(Environment* env,
23+
v8::Local<v8::Context> context,
24+
v8::Local<v8::Value> transfer_list_v,
25+
TransferList* transfer_list_out);
26+
2227
// Used to represent the in-flight structure of an object that is being
2328
// transferred or cloned using postMessage().
2429
class TransferData : public MemoryRetainer {

0 commit comments

Comments
 (0)