Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
josdejong committed Feb 24, 2023
2 parents dfc71fa + a353d72 commit 68b9863
Show file tree
Hide file tree
Showing 12 changed files with 264 additions and 42 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ The following options are available:
- In case of `'web'`, a Web Worker will be used. Only available in a browser environment.
- In case of `'process'`, `child_process` will be used. Only available in a node.js environment.
- In case of `'thread'`, `worker_threads` will be used. If `worker_threads` are not available, an error is thrown. Only available in a node.js environment.
- `workerTerminateTimeout: number`. The timeout in milliseconds to wait for a worker to cleanup it's resources on termination before stopping it forcefully. Default value is `1000`.
- `forkArgs: String[]`. For `process` worker type. An array passed as `args` to [child_process.fork](https://nodejs.org/api/child_process.html#child_processforkmodulepath-args-options)
- `forkOpts: Object`. For `process` worker type. An object passed as `options` to [child_process.fork](https://nodejs.org/api/child_process.html#child_processforkmodulepath-args-options). See nodejs documentation for available options.
- `workerThreadOpts: Object`. For `worker` worker type. An object passed to [worker_threads.options](https://nodejs.org/api/worker_threads.html#new-workerfilename-options). See nodejs documentation for available options.
Expand Down Expand Up @@ -320,10 +321,15 @@ const pool3 = workerpool.pool({ maxWorkers: 7 });

A worker is constructed as:

`workerpool.worker([methods: Object.<String, Function>])`
`workerpool.worker([methods: Object.<String, Function>] [, options: Object])`

Argument `methods` is optional can can be an object with functions available in the worker. Registered functions will be available via the worker pool.

The following options are available:

- `onTerminate: ([code: number]) => Promise.<void> | void`. A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The difference with pool's `onTerminateWorker` is that this callback runs in the worker context, while `onTerminateWorker` is executed on the main thread.


Example usage:

```js
Expand Down
36 changes: 36 additions & 0 deletions examples/cleanup.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
const workerpool = require("..");

// create a worker pool
const pool = workerpool.pool(__dirname + "/workers/cleanupWorker.js", {
// cleanup is only supported for threads or processes
workerType: "thread",
// maximum time to wait for worker to cleanup it's resources
// on termination before forcefully stopping the worker
workerTerminateTimeout: 1000,
});

const main = async () => {
const timeout = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

let c = 0;
const task = () =>
pool
.exec("asyncAdd", [c++, 4.1])
.then(function (result) {
console.log(result);
})
.catch(function (err) {
console.error(err);
});

const tasks = [
task(),
timeout(50).then(() => task()),
timeout(100).then(() => task()),
];

// Will print `Inside worker cleanup finished (code = 0)` three times
await Promise.all(tasks).then(() => pool.terminate());
};

main();
32 changes: 32 additions & 0 deletions examples/workers/cleanupWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Example of a worker that cleans up when it is terminated.

var workerpool = require("../..");

function asyncAdd(a, b) {
return new Promise(function (resolve, reject) {
setTimeout(function () {
resolve(a + b);
}, 500);
});
}

// create a worker and register public functions
workerpool.worker(
{
asyncAdd: asyncAdd,
},
{
// This function is called when the worker is terminated.
// It can be used to clean up any open connections or resources.
// May return a promise, in such case make sure that pool's option
// `workerTerminateTimeout` is set to a value larger than the time it takes to clean up.
onTerminate: function (code) {
return new Promise(function (resolve, reject) {
setTimeout(function () {
console.log("Inside worker cleanup finished (code = " + code + ")");
resolve();
}, 500);
});
},
}
);
4 changes: 3 additions & 1 deletion src/Pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ function Pool(script, options) {
this.nodeWorker = options.nodeWorker;
this.workerType = options.workerType || options.nodeWorker || 'auto'
this.maxQueueSize = options.maxQueueSize || Infinity;
this.workerTerminateTimeout = options.workerTerminateTimeout || 1000;

this.onCreateWorker = options.onCreateWorker || (() => null);
this.onTerminateWorker = options.onTerminateWorker || (() => null);
Expand Down Expand Up @@ -400,7 +401,8 @@ Pool.prototype._createWorkerHandler = function () {
forkOpts: overridenParams.forkOpts || this.forkOpts,
workerThreadOpts: overridenParams.workerThreadOpts || this.workerThreadOpts,
debugPort: DEBUG_PORT_ALLOCATOR.nextAvailableStartingAt(this.debugPortStart),
workerType: this.workerType
workerType: this.workerType,
workerTerminateTimeout: this.workerTerminateTimeout,
});
}

Expand Down
57 changes: 26 additions & 31 deletions src/WorkerHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ var requireFoolWebpack = require('./requireFoolWebpack');
*/
var TERMINATE_METHOD_ID = '__workerpool-terminate__';

/**
* If sending `TERMINATE_METHOD_ID` does not cause the child process to exit in this many milliseconds,
* force-kill the child process.
*/
var CHILD_PROCESS_EXIT_TIMEOUT = 1000;

function ensureWorkerThreads() {
var WorkerThreads = tryRequireWorkerThreads()
if (!WorkerThreads) {
Expand Down Expand Up @@ -216,6 +210,7 @@ function WorkerHandler(script, _options) {
this.forkOpts = options.forkOpts;
this.forkArgs = options.forkArgs;
this.workerThreadOpts = options.workerThreadOpts
this.workerTerminateTimeout = options.workerTerminateTimeout;

// The ready message is only sent if the worker.add method is called (And the default script is not used)
if (!script) {
Expand Down Expand Up @@ -305,6 +300,7 @@ function WorkerHandler(script, _options) {

this.terminating = false;
this.terminated = false;
this.cleaning = false;
this.terminationHandler = null;
this.lastId = 0;
}
Expand Down Expand Up @@ -381,11 +377,11 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) {
};

/**
* Test whether the worker is working or not
* Test whether the worker is processing any tasks or cleaning up before termination.
* @return {boolean} Returns true if the worker is busy
*/
WorkerHandler.prototype.busy = function () {
return Object.keys(this.processing).length > 0;
return this.cleaning || Object.keys(this.processing).length > 0;
};

/**
Expand Down Expand Up @@ -415,6 +411,7 @@ WorkerHandler.prototype.terminate = function (force, callback) {
// all tasks are finished. kill the worker
var cleanup = function(err) {
me.terminated = true;
me.cleaning = false;
if (me.worker != null && me.worker.removeAllListeners) {
// removeAllListeners is only available for child_process
me.worker.removeAllListeners('message');
Expand All @@ -435,32 +432,30 @@ WorkerHandler.prototype.terminate = function (force, callback) {
return;
}

if (this.worker.isChildProcess) {
var cleanExitTimeout = setTimeout(function() {
if (me.worker) {
me.worker.kill();
}
}, CHILD_PROCESS_EXIT_TIMEOUT);

this.worker.once('exit', function() {
clearTimeout(cleanExitTimeout);
if (me.worker) {
me.worker.killed = true;
}
cleanup();
});

if (this.worker.ready) {
this.worker.send(TERMINATE_METHOD_ID);
} else {
this.requestQueue.push(TERMINATE_METHOD_ID)
// child process and worker threads
var cleanExitTimeout = setTimeout(function() {
if (me.worker) {
me.worker.kill();
}
}, this.workerTerminateTimeout);

this.worker.once('exit', function() {
clearTimeout(cleanExitTimeout);
if (me.worker) {
me.worker.killed = true;
}
} else {
// worker_thread
this.worker.kill();
this.worker.killed = true;
cleanup();
});

if (this.worker.ready) {
this.worker.send(TERMINATE_METHOD_ID);
} else {
this.requestQueue.push(TERMINATE_METHOD_ID);
}

// mark that the worker is cleaning up resources
// to prevent new tasks from being executed
this.cleaning = true;
return;
}
else if (typeof this.worker.terminate === 'function') {
Expand Down
2 changes: 1 addition & 1 deletion src/generated/embeddedWorker.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ exports.pool = function pool(script, options) {
/**
* Create a worker and optionally register a set of methods to the worker.
* @param {Object} [methods]
* @param {WorkerRegisterOptions} [options]
*/
exports.worker = function worker(methods) {
exports.worker = function worker(methods, options) {
var worker = require('./worker');
worker.add(methods);
worker.add(methods, options);
};

/**
Expand Down
6 changes: 6 additions & 0 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* @property {number} [maxWorkers]
* @property {number} [maxQueueSize]
* @property {'auto' | 'web' | 'process' | 'thread'} [workerType]
* @property {number} [workerTerminateTimeout]
* @property {*} [forkArgs]
* @property {*} [forkOpts]
* @property {Function} [onCreateWorker]
Expand All @@ -15,3 +16,8 @@
* @property {(payload: any) => unknown} [on]
* @property {Object[]} [transfer]
*/

/**
* @typedef {Object} WorkerRegisterOptions
* @property {(code: number | undefined) => Promise | void} [onTerminate]
*/
40 changes: 36 additions & 4 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ else if (typeof process !== 'undefined') {
var parentPort = WorkerThreads.parentPort;
worker.send = parentPort.postMessage.bind(parentPort);
worker.on = parentPort.on.bind(parentPort);
worker.exit = process.exit.bind(process);
} else {
worker.on = process.on.bind(process);
// ignore transfer argument since it is not supported by process
Expand Down Expand Up @@ -113,11 +114,38 @@ worker.methods.methods = function methods() {
return Object.keys(worker.methods);
};

/**
* Custom handler for when the worker is terminated.
*/
worker.terminationHandler = undefined;

/**
* Cleanup and exit the worker.
* @param {Number} code
* @returns
*/
worker.cleanupAndExit = function(code) {
var _exit = function() {
worker.exit(code);
}

if(!worker.terminationHandler) {
return _exit();
}

var result = worker.terminationHandler(code);
if (isPromise(result)) {
result.then(_exit, _exit);
} else {
_exit();
}
}

var currentRequestId = null;

worker.on('message', function (request) {
if (request === TERMINATE_METHOD_ID) {
return worker.exit(0);
return worker.cleanupAndExit(0);
}
try {
var method = worker.methods[request.method];
Expand Down Expand Up @@ -190,9 +218,10 @@ worker.on('message', function (request) {

/**
* Register methods to the worker
* @param {Object} methods
* @param {Object} [methods]
* @param {WorkerRegisterOptions} [options]
*/
worker.register = function (methods) {
worker.register = function (methods, options) {

if (methods) {
for (var name in methods) {
Expand All @@ -202,8 +231,11 @@ worker.register = function (methods) {
}
}

worker.send('ready');
if (options) {
worker.terminationHandler = options.onTerminate;
}

worker.send('ready');
};

worker.emit = function (payload) {
Expand Down
Loading

0 comments on commit 68b9863

Please sign in to comment.