Skip to content

Commit 4bece45

Browse files
committed
worker: add connect and setConnectionsListener
1 parent 6cb940a commit 4bece45

15 files changed

Lines changed: 652 additions & 12 deletions

doc/api/errors.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3072,6 +3072,16 @@ added: v18.1.0
30723072
The `Response` that has been passed to `WebAssembly.compileStreaming` or to
30733073
`WebAssembly.instantiateStreaming` is not a valid WebAssembly response.
30743074

3075+
<a id="ERR_WORKER_CONNECTION_REFUSED"></a>
3076+
3077+
### `ERR_WORKER_CONNECTION_REFUSED`
3078+
3079+
<!-- YAML
3080+
added: REPLACEME
3081+
-->
3082+
3083+
The thread requested in [`connect()`][] refused the connection or has no connections listener provided.
3084+
30753085
<a id="ERR_WORKER_INIT_FAILED"></a>
30763086

30773087
### `ERR_WORKER_INIT_FAILED`
@@ -3085,6 +3095,16 @@ The `Worker` initialization failed.
30853095
The `execArgv` option passed to the `Worker` constructor contains
30863096
invalid flags.
30873097

3098+
<a id="ERR_WORKER_INVALID_ID"></a>
3099+
3100+
### `ERR_WORKER_INVALID_ID`
3101+
3102+
<!-- YAML
3103+
added: REPLACEME
3104+
-->
3105+
3106+
The thread id requested in [`connect()`][] is invalid.
3107+
30883108
<a id="ERR_WORKER_NOT_RUNNING"></a>
30893109

30903110
### `ERR_WORKER_NOT_RUNNING`
@@ -3104,6 +3124,16 @@ The `Worker` instance terminated because it reached its memory limit.
31043124
The path for the main script of a worker is neither an absolute path
31053125
nor a relative path starting with `./` or `../`.
31063126

3127+
<a id="ERR_WORKER_SAME_THREAD"></a>
3128+
3129+
### `ERR_WORKER_SAME_THREAD`
3130+
3131+
<!-- YAML
3132+
added: REPLACEME
3133+
-->
3134+
3135+
The thread id requested in [`connect()`][] is the current thread id.
3136+
31073137
<a id="ERR_WORKER_UNSERIALIZABLE_ERROR"></a>
31083138

31093139
### `ERR_WORKER_UNSERIALIZABLE_ERROR`
@@ -3999,6 +4029,7 @@ An error occurred trying to allocate memory. This should never happen.
39994029
[`Writable`]: stream.md#class-streamwritable
40004030
[`child_process`]: child_process.md
40014031
[`cipher.getAuthTag()`]: crypto.md#ciphergetauthtag
4032+
[`connect()`]: worker_threads.md#workerconnecttarget-data-timeout
40024033
[`crypto.getDiffieHellman()`]: crypto.md#cryptogetdiffiehellmangroupname
40034034
[`crypto.scrypt()`]: crypto.md#cryptoscryptpassword-salt-keylen-options-callback
40044035
[`crypto.scryptSync()`]: crypto.md#cryptoscryptsyncpassword-salt-keylen-options

doc/api/worker_threads.md

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,113 @@ Worker threads inherit non-process-specific options by default. Refer to
6161
[`Worker constructor options`][] to know how to customize worker thread options,
6262
specifically `argv` and `execArgv` options.
6363

64+
## `worker.connect(target[, data][, timeout])`
65+
66+
<!-- YAML
67+
added: REPLACEME
68+
-->
69+
70+
> Stability: 1.1 - Active development
71+
72+
* `target` {number} The target thread id.
73+
* `data` {any} Any arbitrary, cloneable JavaScript value.
74+
The data will be passed as the second argument to the callback provided in the
75+
target thread via [`worker.setConnectionsListener()`][].
76+
* `timeout` {number} Time to wait for the communication port to the target thread,
77+
in milliseconds. By default it's `undefined`, which means wait forever.
78+
* Returns: {Promise} A promise for a `MessagePort`.
79+
80+
Establishes a connection to another worker thread in the same process, returning a
81+
`MessagePort` that can be used for the communication.
82+
83+
The target thread must have a connection listener setup via [`worker.setConnectionsListener()`][]
84+
otherwise the connection request will fail.
85+
86+
This method should be used when the target thread is not the direct parent or children of the current thread.
87+
The example below creates 10 nested threads, the last one will try to communicate with the third one.
88+
89+
```mjs
90+
import { fileURLToPath } from 'node:url';
91+
import {
92+
Worker,
93+
connect,
94+
setConnectionsListener,
95+
threadId,
96+
workerData,
97+
} from 'node:worker_threads';
98+
99+
const level = workerData?.level ?? 0;
100+
const targetThread =
101+
workerData?.targetThread ?? (level === 2 ? threadId : undefined);
102+
103+
if (level < 10) {
104+
const worker = new Worker(fileURLToPath(import.meta.url), {
105+
workerData: { level: level + 1, targetThread },
106+
});
107+
}
108+
109+
if (level === 2) {
110+
setConnectionsListener((sender, port, data) => {
111+
port.on('message', (message) => {
112+
console.log(`${sender} -> ${threadId}`, message);
113+
port.postMessage({ message: 'pong', data });
114+
});
115+
116+
return true;
117+
});
118+
} else if (level === 10) {
119+
const port = await connect(targetThread, { foo: 'bar' });
120+
121+
port.on('message', (message) => {
122+
console.log(`${targetThread} -> ${threadId}`, message);
123+
port.close();
124+
});
125+
126+
port.postMessage({ message: 'ping' });
127+
}
128+
```
129+
130+
```cjs
131+
const {
132+
Worker,
133+
connect,
134+
setConnectionsListener,
135+
threadId,
136+
workerData,
137+
} = require('node:worker_threads');
138+
139+
const level = workerData?.level ?? 0;
140+
const targetThread =
141+
workerData?.targetThread ?? (level === 2 ? threadId : undefined);
142+
143+
if (level < 10) {
144+
const worker = new Worker(__filename, {
145+
workerData: { level: level + 1, targetThread },
146+
});
147+
}
148+
149+
if (level === 2) {
150+
setConnectionsListener((sender, port, data) => {
151+
port.on('message', (message) => {
152+
console.log(`${sender} -> ${threadId}`, message);
153+
port.postMessage({ message: 'pong', data });
154+
});
155+
156+
return true;
157+
});
158+
} else if (level === 10) {
159+
connect(targetThread, { foo: 'bar' }).then((port) => {
160+
port.on('message', (message) => {
161+
console.log(`${targetThread} -> ${threadId}`, message);
162+
port.close();
163+
});
164+
165+
port.postMessage({ message: 'ping' });
166+
167+
});
168+
}
169+
```
170+
64171
## `worker.getEnvironmentData(key)`
65172
66173
<!-- YAML
@@ -325,6 +432,32 @@ new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
325432
});
326433
```
327434
435+
## `worker.setConnectionsListener(fn)`
436+
437+
<!-- YAML
438+
added: REPLACEME
439+
-->
440+
441+
> Stability: 1.1 - Active development
442+
443+
* `fn` {Function} A callback to be executed when [`worker.connect()`][] is called from another thread.
444+
The function will receive the following arguments:
445+
446+
* `sender` {number} The other thread id.
447+
* `port` {MessagePort} The port than can be used to communicate with the other thread.
448+
* `data` {any} The data passed as second argument to [`worker.connect()`][].
449+
450+
The function must return `true` to accept the connection or any other value to
451+
refuse the connection. If the function returns a `Promise`, it will be awaited.
452+
453+
Sets the callback that handles connection from other worker threads in the same process.
454+
If the callback is `null` or `undefined` then the current listener is removed.
455+
456+
When no listeners are present (the default) all connection requests are immediately
457+
refused.
458+
459+
See the example in [`worker.connect()`][] for more info on how to use this function and its callback.
460+
328461
## `worker.setEnvironmentData(key[, value])`
329462
330463
<!-- YAML
@@ -1437,8 +1570,10 @@ thread spawned will spawn another until the application crashes.
14371570
[`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshotoptions
14381571
[`vm`]: vm.md
14391572
[`worker.SHARE_ENV`]: #workershare_env
1573+
[`worker.connect()`]: #workerconnecttarget-data-timeout
14401574
[`worker.on('message')`]: #event-message_1
14411575
[`worker.postMessage()`]: #workerpostmessagevalue-transferlist
1576+
[`worker.setConnectionsListener()`]: #workersetconnectionslistenerfn
14421577
[`worker.terminate()`]: #workerterminate
14431578
[`worker.threadId`]: #workerthreadid_1
14441579
[async-resource-worker-pool]: async_context.md#using-asyncresource-for-a-worker-thread-pool

lib/internal/errors.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1858,10 +1858,12 @@ E('ERR_VM_MODULE_NOT_MODULE',
18581858
E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
18591859
E('ERR_WASI_ALREADY_STARTED', 'WASI instance has already started', Error);
18601860
E('ERR_WEBASSEMBLY_RESPONSE', 'WebAssembly response %s', TypeError);
1861+
E('ERR_WORKER_CONNECTION_REFUSED', 'Connection refused from worker', Error);
18611862
E('ERR_WORKER_INIT_FAILED', 'Worker initialization failure: %s', Error);
18621863
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
18631864
`Initiated Worker with ${msg}: ${ArrayPrototypeJoin(errors, ', ')}`,
18641865
Error);
1866+
E('ERR_WORKER_INVALID_ID', 'Invalid worker id %d', Error);
18651867
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
18661868
E('ERR_WORKER_OUT_OF_MEMORY',
18671869
'Worker terminated due to reaching memory limit: %s', Error);
@@ -1876,6 +1878,7 @@ E('ERR_WORKER_PATH', (filename) =>
18761878
) +
18771879
` Received "${filename}"`,
18781880
TypeError);
1881+
E('ERR_WORKER_SAME_THREAD', 'Cannot connect to the same thread', Error);
18791882
E('ERR_WORKER_UNSERIALIZABLE_ERROR',
18801883
'Serializing an uncaught exception failed', Error);
18811884
E('ERR_WORKER_UNSUPPORTED_OPERATION',

lib/internal/main/worker_thread.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const {
2828
getEnvMessagePort,
2929
} = internalBinding('worker');
3030

31+
const { processConnectionRequest } = require('internal/worker');
3132
const workerIo = require('internal/worker/io');
3233
const {
3334
messageTypes: {
@@ -40,6 +41,7 @@ const {
4041
// Messages that may be either received or posted
4142
STDIO_PAYLOAD,
4243
STDIO_WANTS_MORE_DATA,
44+
CONNECT,
4345
},
4446
kStdioWantsMoreDataCallback,
4547
} = workerIo;
@@ -182,6 +184,8 @@ port.on('message', (message) => {
182184
break;
183185
}
184186
}
187+
} else if (message.type === CONNECT) {
188+
processConnectionRequest(message);
185189
} else if (message.type === STDIO_PAYLOAD) {
186190
const { stream, chunks } = message;
187191
ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {

0 commit comments

Comments
 (0)