Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -3564,6 +3564,22 @@ added: v18.1.0
The `Response` that has been passed to `WebAssembly.compileStreaming` or to
`WebAssembly.instantiateStreaming` is not a valid WebAssembly response.

<a id="ERR_WORKER_HANDLE_NOT_TRANSFERABLE"></a>

### `ERR_WORKER_HANDLE_NOT_TRANSFERABLE`

An attempt was made to transfer a `net.Socket` or `net.Server` to another thread
via a `worker_threads` `postMessage()` call while it was not in a transferable
state, for example because it had already started reading or had buffered data.

<a id="ERR_WORKER_HANDLE_TRANSFER_UNSUPPORTED"></a>

### `ERR_WORKER_HANDLE_TRANSFER_UNSUPPORTED`

An attempt was made to transfer a `net.Socket` or `net.Server` to another thread
on a platform where moving the underlying handle between event loops is not
supported (currently Windows).

<a id="ERR_WORKER_INIT_FAILED"></a>

### `ERR_WORKER_INIT_FAILED`
Expand Down
42 changes: 42 additions & 0 deletions doc/api/net.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,11 @@ added: v0.1.90

This class is used to create a TCP or [IPC][] server.

A listening TCP `net.Server` can be transferred to a worker thread by listing it
in the `transferList` of a [`worker_threads`][] `postMessage()` call. This moves
the underlying listening socket to the receiving thread, where it resumes
accepting connections. See [Transferring TCP handles to other threads][].

### `new net.Server([options][, connectionListener])`

* `options` {Object} See
Expand Down Expand Up @@ -751,6 +756,41 @@ is received. For example, it is passed to the listeners of a
[`'connection'`][] event emitted on a [`net.Server`][], so the user can use
it to interact with the client.

### Transferring TCP handles to other threads

A connected TCP `net.Socket` can be moved to another thread by listing it in the
`transferList` of a [`worker_threads`][] `postMessage()` call. After the
transfer, the source socket is destroyed on the sending thread (further use
fails with `ERR_STREAM_DESTROYED` rather than silently dropping data), and the
socket continues to work on the receiving thread. This makes it possible to
accept connections on one thread and distribute them across a pool of worker
threads, for example to build a `node:cluster`-like model on top of worker
threads.

The socket must be a freshly accepted or created TCP connection: it must still
be attached to a live handle, must not be connecting or destroyed, and must not
have started reading or have buffered data. Otherwise `postMessage()` throws
`ERR_WORKER_HANDLE_NOT_TRANSFERABLE`. Only TCP sockets are supported, and only
on Unix-like platforms; on Windows `postMessage()` throws
`ERR_WORKER_HANDLE_TRANSFER_UNSUPPORTED`.

```cjs
const net = require('node:net');
const { Worker } = require('node:worker_threads');

// worker.js receives `{ socket }` messages and handles each connection.
const worker = new Worker('./worker.js');

const server = net.createServer((socket) => {
// Hand the freshly accepted connection off to the worker thread.
worker.postMessage({ socket }, [socket]);
});
server.listen(8000);
```

A listening [`net.Server`][] can be transferred the same way, which moves the
listening socket itself (and its pending accept queue) to the receiving thread.

### `new net.Socket([options])`

<!-- YAML
Expand Down Expand Up @@ -2194,6 +2234,7 @@ net.isIPv6('fhqwhgads'); // returns false
[Identifying paths for IPC connections]: #identifying-paths-for-ipc-connections
[RFC 8305]: https://www.rfc-editor.org/rfc/rfc8305.txt
[Readable Stream]: stream.md#class-streamreadable
[Transferring TCP handles to other threads]: #transferring-tcp-handles-to-other-threads
[`'close'`]: #event-close
[`'connect'`]: #event-connect
[`'connection'`]: #event-connection
Expand Down Expand Up @@ -2250,6 +2291,7 @@ net.isIPv6('fhqwhgads'); // returns false
[`socket.setTimeout()`]: #socketsettimeouttimeout-callback
[`socket.setTimeout(timeout)`]: #socketsettimeouttimeout-callback
[`stream.getDefaultHighWaterMark()`]: stream.md#streamgetdefaulthighwatermarkobjectmode
[`worker_threads`]: worker_threads.md
[`writable.destroy()`]: stream.md#writabledestroyerror
[`writable.destroyed`]: stream.md#writabledestroyed
[`writable.end()`]: stream.md#writableendchunk-encoding-callback
Expand Down
21 changes: 15 additions & 6 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -1208,6 +1208,8 @@ In particular, the significant differences to `JSON` are:
* {KeyObject}s,
* {MessagePort}s,
* {net.BlockList}s,
* {net.Server}s (TCP only, when listed in `transferList`),
* {net.Socket}s (TCP only, when listed in `transferList`),
* {net.SocketAddress}es,
* {X509Certificate}s.

Expand Down Expand Up @@ -1235,12 +1237,20 @@ circularData.foo = circularData;
port2.postMessage(circularData);
```

`transferList` may be a list of {ArrayBuffer}, [`MessagePort`][], and
[`FileHandle`][] objects.
`transferList` may be a list of {ArrayBuffer}, [`MessagePort`][],
[`FileHandle`][], {net.Server}, and {net.Socket} objects.
After transferring, they are not usable on the sending side of the channel
anymore (even if they are not contained in `value`). Unlike with
[child processes][], transferring handles such as network sockets is currently
not supported.
anymore (even if they are not contained in `value`).

Transferring a {net.Server} moves its listening socket — together with any
pending connections in the accept queue — to the receiving thread's event loop.
Transferring a {net.Socket} moves a single connection; the socket must be a
freshly accepted or created TCP connection that has not yet started reading and
has no buffered data, otherwise `postMessage()` throws
`ERR_WORKER_HANDLE_NOT_TRANSFERABLE`. This makes it possible to accept
connections on one thread and distribute them across a pool of worker threads.
Only TCP handles are supported, and only on Unix-like platforms; on Windows
`postMessage()` throws `ERR_WORKER_HANDLE_TRANSFER_UNSUPPORTED`.

If `value` contains {SharedArrayBuffer} instances, those are accessible
from either thread. They cannot be listed in `transferList`.
Expand Down Expand Up @@ -2276,7 +2286,6 @@ thread spawned will spawn another until the application crashes.
[async-resource-worker-pool]: async_context.md#using-asyncresource-for-a-worker-thread-pool
[browser `LockManager`]: https://developer.mozilla.org/en-US/docs/Web/API/LockManager
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
[child processes]: child_process.md
[contextified]: vm.md#what-does-it-mean-to-contextify-an-object
[locks.request()]: #locksrequestname-options-callback
[v8.serdes]: v8.md#serialization-api
6 changes: 6 additions & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1948,6 +1948,12 @@ E('ERR_WEBASSEMBLY_NOT_SUPPORTED',
'WebAssembly is not supported in this environment, but is required for %s',
Error);
E('ERR_WEBASSEMBLY_RESPONSE', 'WebAssembly response %s', TypeError);
E('ERR_WORKER_HANDLE_NOT_TRANSFERABLE',
'%s cannot be transferred in its current state; it must be a freshly ' +
'created or accepted handle that has not started reading and has no ' +
'pending writes', Error);
E('ERR_WORKER_HANDLE_TRANSFER_UNSUPPORTED',
'Transferring a %s to another thread is not supported on this platform', Error);
E('ERR_WORKER_INIT_FAILED', 'Worker initialization failure: %s', Error);
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
`Initiated Worker with ${msg}: ${ArrayPrototypeJoin(errors, ', ')}`,
Expand Down
122 changes: 122 additions & 0 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,17 @@ const {
ERR_SOCKET_CLOSED_BEFORE_CONNECTION,
ERR_SOCKET_CONNECTION_TIMEOUT,
ERR_SOCKET_HANDLE_ADOPTED,
ERR_WORKER_HANDLE_NOT_TRANSFERABLE,
ERR_WORKER_HANDLE_TRANSFER_UNSUPPORTED,
},
genericNodeError,
} = require('internal/errors');
const {
markTransferMode,
kDeserialize,
kTransfer,
kTransferList,
} = require('internal/worker/js_transferable');
const { isUint8Array } = require('internal/util/types');
const { queueMicrotask } = require('internal/process/task_queues');
const {
Expand Down Expand Up @@ -484,6 +492,9 @@ class BoundSocket {

function Socket(options) {
if (!(this instanceof Socket)) return new Socket(options);
// A connected TCP Socket can be moved to another thread by listing it in the
// transferList of a worker_threads postMessage() call. See [kTransfer]().
markTransferMode(this, false, true);
if (options?.objectMode) {
throw new ERR_INVALID_ARG_VALUE(
'options.objectMode',
Expand Down Expand Up @@ -1501,6 +1512,58 @@ Socket.prototype[kReinitializeHandle] = function reinitializeHandle(handle) {
initSocketHandle(this);
};

// A Socket can be transferred to another thread only while it is a freshly
// accepted/created TCP connection: still attached to a live handle, not
// connecting or destroyed, and with no data already buffered in either
// direction (which would otherwise be lost on the sending side).
function assertTransferableSocket(socket) {
if (isWindows) {
throw new ERR_WORKER_HANDLE_TRANSFER_UNSUPPORTED('net.Socket');
}
const handle = socket._handle;
if (handle == null || !(handle instanceof TCP) ||
socket.destroyed || socket.connecting ||
socket.bytesRead > 0 || socket.bytesWritten > 0 ||
socket.readableLength > 0 || socket.writableLength > 0 ||
socket.readableEncoding != null) {
throw new ERR_WORKER_HANDLE_NOT_TRANSFERABLE('net.Socket');
}
}

Socket.prototype[kTransferList] = function() {
assertTransferableSocket(this);
return [this._handle];
};

Socket.prototype[kTransfer] = function() {
assertTransferableSocket(this);
const handle = this._handle;
const data = {
handle,
allowHalfOpen: this.allowHalfOpen,
};
// Detach the handle from this source socket; the messaging layer takes
// ownership of it via TCPWrap::TransferForMessaging(). Destroy the source so
// any further use on the sending side fails cleanly (the socket is now owned
// by the receiving thread) instead of silently dropping data.
this._handle = null;
this.destroy();
return {
data,
deserializeInfo: 'net:Socket',
};
};

Socket.prototype[kDeserialize] = function(data) {
const handle = data?.handle;
if (handle == null || !(handle instanceof TCP)) {
throw new ERR_WORKER_HANDLE_NOT_TRANSFERABLE('net.Socket');
}
this.allowHalfOpen = Boolean(data.allowHalfOpen);
this[kReinitializeHandle](handle);
this.readable = this.writable = true;
};

function socketToDnsFamily(family) {
switch (family) {
case 'IPv4':
Expand Down Expand Up @@ -1993,6 +2056,10 @@ function Server(options, connectionListener) {

EventEmitter.call(this);

// A listening TCP Server can be moved to another thread by listing it in the
// transferList of a worker_threads postMessage() call. See [kTransfer]().
markTransferMode(this, false, true);

if (typeof options === 'function') {
connectionListener = options;
options = kEmptyObject;
Expand Down Expand Up @@ -2199,6 +2266,61 @@ function setupListenHandle(address, port, addressType, backlog, fd, flags) {

Server.prototype._listen2 = setupListenHandle; // legacy alias

// A listening TCP Server can be transferred to another thread, which moves the
// underlying listening socket (and its pending accept queue) to that thread's
// event loop. Only a server bound to a live TCP handle can be transferred.
function assertTransferableServer(server) {
if (isWindows) {
throw new ERR_WORKER_HANDLE_TRANSFER_UNSUPPORTED('net.Server');
}
if (server._handle == null || !(server._handle instanceof TCP)) {
throw new ERR_WORKER_HANDLE_NOT_TRANSFERABLE('net.Server');
}
}

Server.prototype[kTransferList] = function() {
assertTransferableServer(this);
return [this._handle];
};

Server.prototype[kTransfer] = function() {
assertTransferableServer(this);
const handle = this._handle;
const data = {
handle,
// Construction-time options that govern accepted sockets, so the receiving
// server reproduces the same behaviour.
allowHalfOpen: this.allowHalfOpen,
pauseOnConnect: this.pauseOnConnect,
noDelay: this.noDelay,
keepAlive: this.keepAlive,
keepAliveInitialDelay: this.keepAliveInitialDelay * 1000,
highWaterMark: this.highWaterMark,
};
// Detach so the source server no longer references the handle being moved.
this._handle = null;
return {
data,
deserializeInfo: 'net:Server',
};
};

Server.prototype[kDeserialize] = function(data) {
const { handle, ...options } = data ?? {};
if (handle == null || !(handle instanceof TCP)) {
throw new ERR_WORKER_HANDLE_NOT_TRANSFERABLE('net.Server');
}
this.allowHalfOpen = Boolean(options.allowHalfOpen);
this.pauseOnConnect = Boolean(options.pauseOnConnect);
this.noDelay = Boolean(options.noDelay);
this.keepAlive = Boolean(options.keepAlive);
this.keepAliveInitialDelay = ~~(options.keepAliveInitialDelay / 1000);
this.highWaterMark = options.highWaterMark ?? getDefaultHighWaterMark();
// Adopt the transferred listening handle (mirrors the child_process server
// hand-off path), which re-arms accept() on this thread's event loop.
this.listen(handle);
};

function emitErrorNT(self, err) {
self.emit('error', err);
}
Expand Down
Loading
Loading