Skip to content

Commit

Permalink
feat(worker-thread): allow passing Worker options (#2791) ref #1555
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Sep 30, 2024
1 parent 4cbfb8d commit 6a1f7a9
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 3 deletions.
10 changes: 7 additions & 3 deletions src/classes/child.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ChildProcess, fork } from 'child_process';
import { Worker } from 'worker_threads';
import { AddressInfo, createServer } from 'net';
import { Worker, WorkerOptions as WorkerThreadsOptions } from 'worker_threads';
import { ChildCommand, ParentCommand } from '../enums';
import { EventEmitter } from 'events';

Expand Down Expand Up @@ -40,8 +40,11 @@ export class Child extends EventEmitter {
constructor(
private mainFile: string,
public processFile: string,
private opts = {
useWorkerThreads: false,
private opts: {
useWorkerThreads: boolean;
workerThreadsOptions?: WorkerThreadsOptions;
} = {
useWorkerThreads: false
},
) {
super();
Expand Down Expand Up @@ -83,6 +86,7 @@ export class Child extends EventEmitter {
stdin: true,
stdout: true,
stderr: true,
...(this.opts.workerThreadsOptions ? this.opts.workerThreadsOptions : {})
});
} else {
this.childProcess = parent = fork(this.mainFile, [], {
Expand Down
9 changes: 9 additions & 0 deletions src/interfaces/worker-options.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { WorkerOptions as WorkerThreadsOptions } from 'worker_threads';
import { Job } from '../classes/job';
import { AdvancedOptions } from './advanced-options';
import { QueueBaseOptions } from './queue-options';
Expand Down Expand Up @@ -144,6 +145,14 @@ export interface WorkerOptions extends QueueBaseOptions {
* @default false
*/
useWorkerThreads?: boolean;

/**
* Support passing Worker Threads Options.
* Note: This option can only be used when specifying
* a file for the processor argument and useWorkerThreads is passed as true.
* @see {@link https://nodejs.org/api/worker_threads.html#new-workerfilename-options}
*/
workerThreadsOptions?: WorkerThreadsOptions;
}

export interface GetNextJobOptions {
Expand Down
69 changes: 69 additions & 0 deletions tests/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,75 @@ describe('Sandboxed process using child processes', () => {

describe('Sandboxed process using worker threads', () => {
sandboxProcessTests({ useWorkerThreads: true });

describe('custom cases', () => {
const redisHost = process.env.REDIS_HOST || 'localhost';
const prefix = process.env.BULLMQ_TEST_PREFIX || 'bull';
let queue: Queue;
let queueEvents: QueueEvents;
let queueName: string;

let connection;
before(async function () {
connection = new IORedis(redisHost, { maxRetriesPerRequest: null });
});

beforeEach(async function () {
queueName = `test-${v4()}`;
queue = new Queue(queueName, { connection, prefix });
queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();
});

afterEach(async function () {
await queue.close();
await queueEvents.close();
await removeAllQueueData(new IORedis(), queueName);
});

afterAll(async function () {
await connection.quit();
});

it('should allow to pass workerThreadsOptions', async () => {
const processFile = __dirname + '/fixtures/fixture_processor.js';

const worker = new Worker(queueName, processFile, {
connection,
prefix,
drainDelay: 1,
useWorkerThreads: true,
workerThreadsOptions: {
resourceLimits: {
maxOldGenerationSizeMb: 1
}
}
});

const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async (job: Job, value: any) => {
try {
expect(job.returnvalue).to.be.eql(42);
expect(job.data).to.be.eql({ foo: 'bar' });
expect(value).to.be.eql(42);
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(
0,
);
expect(worker['childPool'].free[processFile]).to.have.lengthOf(1);
resolve();
} catch (err) {
reject(err);
}
});
});

await queue.add('test', { foo: 'bar' });

await completing;

await worker.close();
});
});
});

function sandboxProcessTests(
Expand Down

0 comments on commit 6a1f7a9

Please sign in to comment.