Skip to content

Commit

Permalink
Revert "module: have a single hooks thread for all workers"
Browse files Browse the repository at this point in the history
This reverts commit 22cb99d.

PR-URL: nodejs#53183
Reviewed-By: Geoffrey Booth <webadmin@geoffreybooth.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de>
  • Loading branch information
mcollina committed Jun 2, 2024
1 parent 881e196 commit 8c5c2c1
Show file tree
Hide file tree
Showing 23 changed files with 79 additions and 392 deletions.
2 changes: 0 additions & 2 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ port.on('message', (message) => {
filename,
hasStdin,
publicPort,
hooksPort,
workerData,
} = message;

Expand All @@ -110,7 +109,6 @@ port.on('message', (message) => {
}

require('internal/worker').assignEnvironmentData(environmentData);
require('internal/worker').hooksPort = hooksPort;

if (SharedArrayBuffer !== undefined) {
// The counter is only passed to the workers created by the main thread,
Expand Down
127 changes: 43 additions & 84 deletions lib/internal/modules/esm/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const {
const { exitCodes: { kUnsettledTopLevelAwait } } = internalBinding('errors');
const { URL } = require('internal/url');
const { canParse: URLCanParse } = internalBinding('url');
const { receiveMessageOnPort, isMainThread } = require('worker_threads');
const { receiveMessageOnPort } = require('worker_threads');
const {
isAnyArrayBuffer,
isArrayBufferView,
Expand Down Expand Up @@ -482,8 +482,6 @@ class HooksProxy {
*/
#worker;

#portToHooksThread;

/**
* The last notification ID received from the worker. This is used to detect
* if the worker has already sent a notification before putting the main
Expand All @@ -501,38 +499,26 @@ class HooksProxy {
#isReady = false;

constructor() {
const { InternalWorker, hooksPort } = require('internal/worker');
const { InternalWorker } = require('internal/worker');
MessageChannel ??= require('internal/worker/io').MessageChannel;

const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH);
this.#lock = new Int32Array(lock);

if (isMainThread) {
// Main thread is the only one that creates the internal single hooks worker
this.#worker = new InternalWorker(loaderWorkerId, {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: {
lock,
},
});
this.#worker.unref(); // ! Allows the process to eventually exit.
this.#worker.on('exit', process.exit);
this.#portToHooksThread = this.#worker;
} else {
this.#portToHooksThread = hooksPort;
}
this.#worker = new InternalWorker(loaderWorkerId, {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: {
lock,
},
});
this.#worker.unref(); // ! Allows the process to eventually exit.
this.#worker.on('exit', process.exit);
}

waitForWorker() {
// There is one Hooks instance for each worker thread. But only one of these Hooks instances
// has an InternalWorker. That was the Hooks instance created for the main thread.
// It means for all Hooks instances that are not on the main thread => they are ready because they
// delegate to the single InternalWorker anyway.
if (!isMainThread) {
return;
}

if (!this.#isReady) {
const { kIsOnline } = require('internal/worker');
if (!this.#worker[kIsOnline]) {
Expand All @@ -549,37 +535,6 @@ class HooksProxy {
}
}

#postMessageToWorker(method, type, transferList, args) {
this.waitForWorker();

MessageChannel ??= require('internal/worker/io').MessageChannel;

const {
port1: fromHooksThread,
port2: toHooksThread,
} = new MessageChannel();

// Pass work to the worker.
debug(`post ${type} message to worker`, { method, args, transferList });
const usedTransferList = [toHooksThread];
if (transferList) {
ArrayPrototypePushApply(usedTransferList, transferList);
}

this.#portToHooksThread.postMessage(
{
__proto__: null,
args,
lock: this.#lock,
method,
port: toHooksThread,
},
usedTransferList,
);

return fromHooksThread;
}

/**
* Invoke a remote method asynchronously.
* @param {string} method Method to invoke
Expand All @@ -588,7 +543,22 @@ class HooksProxy {
* @returns {Promise<any>}
*/
async makeAsyncRequest(method, transferList, ...args) {
const fromHooksThread = this.#postMessageToWorker(method, 'Async', transferList, args);
this.waitForWorker();

MessageChannel ??= require('internal/worker/io').MessageChannel;
const asyncCommChannel = new MessageChannel();

// Pass work to the worker.
debug('post async message to worker', { method, args, transferList });
const finalTransferList = [asyncCommChannel.port2];
if (transferList) {
ArrayPrototypePushApply(finalTransferList, transferList);
}
this.#worker.postMessage({
__proto__: null,
method, args,
port: asyncCommChannel.port2,
}, finalTransferList);

if (this.#numberOfPendingAsyncResponses++ === 0) {
// On the next lines, the main thread will await a response from the worker thread that might
Expand All @@ -597,11 +567,7 @@ class HooksProxy {
// However we want to keep the process alive until the worker thread responds (or until the
// event loop of the worker thread is also empty), so we ref the worker until we get all the
// responses back.
if (this.#worker) {
this.#worker.ref();
} else {
this.#portToHooksThread.ref();
}
this.#worker.ref();
}

let response;
Expand All @@ -610,26 +576,18 @@ class HooksProxy {
await AtomicsWaitAsync(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId).value;
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

response = receiveMessageOnPort(fromHooksThread);
response = receiveMessageOnPort(asyncCommChannel.port1);
} while (response == null);
debug('got async response from worker', { method, args }, this.#lock);

if (--this.#numberOfPendingAsyncResponses === 0) {
// We got all the responses from the worker, its job is done (until next time).
if (this.#worker) {
this.#worker.unref();
} else {
this.#portToHooksThread.unref();
}
}

if (response.message.status === 'exit') {
process.exit(response.message.body);
this.#worker.unref();
}

fromHooksThread.close();

return this.#unwrapMessage(response);
const body = this.#unwrapMessage(response);
asyncCommChannel.port1.close();
return body;
}

/**
Expand All @@ -640,7 +598,11 @@ class HooksProxy {
* @returns {any}
*/
makeSyncRequest(method, transferList, ...args) {
const fromHooksThread = this.#postMessageToWorker(method, 'Sync', transferList, args);
this.waitForWorker();

// Pass work to the worker.
debug('post sync message to worker', { method, args, transferList });
this.#worker.postMessage({ __proto__: null, method, args }, transferList);

let response;
do {
Expand All @@ -649,17 +611,14 @@ class HooksProxy {
AtomicsWait(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId);
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

response = receiveMessageOnPort(fromHooksThread);
response = this.#worker.receiveMessageSync();
} while (response == null);
debug('got sync response from worker', { method, args });
if (response.message.status === 'never-settle') {
process.exit(kUnsettledTopLevelAwait);
} else if (response.message.status === 'exit') {
process.exit(response.message.body);
}

fromHooksThread.close();

return this.#unwrapMessage(response);
}

Expand Down
19 changes: 3 additions & 16 deletions lib/internal/modules/esm/loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ const { ModuleWrap, kEvaluating, kEvaluated } = internalBinding('module_wrap');
const {
urlToFilename,
} = require('internal/modules/helpers');
const { isMainThread } = require('worker_threads');
let defaultResolve, defaultLoad, defaultLoadSync, importMetaInitializer;

/**
Expand Down Expand Up @@ -608,26 +607,21 @@ class CustomizedModuleLoader {
*/
constructor() {
getHooksProxy();
_hasCustomizations = true;
}

/**
* Register a loader specifier.
* Register some loader specifier.
* @param {string} originalSpecifier The specified URL path of the loader to
* be registered.
* @param {string} parentURL The parent URL from where the loader will be
* registered if using it package name as specifier
* @param {any} [data] Arbitrary data to be passed from the custom loader
* (user-land) to the worker.
* @param {any[]} [transferList] Objects in `data` that are changing ownership
* @returns {{ format: string, url: URL['href'] } | undefined}
* @returns {{ format: string, url: URL['href'] }}
*/
register(originalSpecifier, parentURL, data, transferList) {
if (isMainThread) {
// Only the main thread has a Hooks instance with worker thread. All other Worker threads
// delegate their hooks to the HooksThread of the main thread.
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
}
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
}

/**
Expand Down Expand Up @@ -725,12 +719,6 @@ function getHooksProxy() {
return hooksProxy;
}

let _hasCustomizations = false;
function hasCustomizations() {
return _hasCustomizations;
}


let cascadedLoader;

/**
Expand Down Expand Up @@ -792,7 +780,6 @@ function register(specifier, parentURL = undefined, options) {

module.exports = {
createModuleLoader,
hasCustomizations,
getHooksProxy,
getOrInitializeCascadedLoader,
register,
Expand Down
Loading

0 comments on commit 8c5c2c1

Please sign in to comment.