From 6a1f7a9f0303561d6ec7b2005ba0227132b89e07 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Mon, 30 Sep 2024 01:03:45 -0600 Subject: [PATCH] feat(worker-thread): allow passing Worker options (#2791) ref #1555 --- src/classes/child.ts | 10 +++-- src/interfaces/worker-options.ts | 9 +++++ tests/test_sandboxed_process.ts | 69 ++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/src/classes/child.ts b/src/classes/child.ts index 456354e72c..5804855e46 100644 --- a/src/classes/child.ts +++ b/src/classes/child.ts @@ -1,6 +1,6 @@ import { ChildProcess, fork } from 'child_process'; -import { Worker } from 'worker_threads'; import { AddressInfo, createServer } from 'net'; +import { Worker, WorkerOptions as WorkerThreadsOptions } from 'worker_threads'; import { ChildCommand, ParentCommand } from '../enums'; import { EventEmitter } from 'events'; @@ -40,8 +40,11 @@ export class Child extends EventEmitter { constructor( private mainFile: string, public processFile: string, - private opts = { - useWorkerThreads: false, + private opts: { + useWorkerThreads: boolean; + workerThreadsOptions?: WorkerThreadsOptions; + } = { + useWorkerThreads: false }, ) { super(); @@ -83,6 +86,7 @@ export class Child extends EventEmitter { stdin: true, stdout: true, stderr: true, + ...(this.opts.workerThreadsOptions ? this.opts.workerThreadsOptions : {}) }); } else { this.childProcess = parent = fork(this.mainFile, [], { diff --git a/src/interfaces/worker-options.ts b/src/interfaces/worker-options.ts index 77a204a23f..5cfb94d93a 100644 --- a/src/interfaces/worker-options.ts +++ b/src/interfaces/worker-options.ts @@ -1,3 +1,4 @@ +import { WorkerOptions as WorkerThreadsOptions } from 'worker_threads'; import { Job } from '../classes/job'; import { AdvancedOptions } from './advanced-options'; import { QueueBaseOptions } from './queue-options'; @@ -144,6 +145,14 @@ export interface WorkerOptions extends QueueBaseOptions { * @default false */ useWorkerThreads?: boolean; + + /** + * Support passing Worker Threads Options. + * Note: This option can only be used when specifying + * a file for the processor argument and useWorkerThreads is passed as true. + * @see {@link https://nodejs.org/api/worker_threads.html#new-workerfilename-options} + */ + workerThreadsOptions?: WorkerThreadsOptions; } export interface GetNextJobOptions { diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index df2595d8da..8f1fe238fb 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -15,6 +15,75 @@ describe('Sandboxed process using child processes', () => { describe('Sandboxed process using worker threads', () => { sandboxProcessTests({ useWorkerThreads: true }); + + describe('custom cases', () => { + const redisHost = process.env.REDIS_HOST || 'localhost'; + const prefix = process.env.BULLMQ_TEST_PREFIX || 'bull'; + let queue: Queue; + let queueEvents: QueueEvents; + let queueName: string; + + let connection; + before(async function () { + connection = new IORedis(redisHost, { maxRetriesPerRequest: null }); + }); + + beforeEach(async function () { + queueName = `test-${v4()}`; + queue = new Queue(queueName, { connection, prefix }); + queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queueEvents.waitUntilReady(); + }); + + afterEach(async function () { + await queue.close(); + await queueEvents.close(); + await removeAllQueueData(new IORedis(), queueName); + }); + + afterAll(async function () { + await connection.quit(); + }); + + it('should allow to pass workerThreadsOptions', async () => { + const processFile = __dirname + '/fixtures/fixture_processor.js'; + + const worker = new Worker(queueName, processFile, { + connection, + prefix, + drainDelay: 1, + useWorkerThreads: true, + workerThreadsOptions: { + resourceLimits: { + maxOldGenerationSizeMb: 1 + } + } + }); + + const completing = new Promise((resolve, reject) => { + worker.on('completed', async (job: Job, value: any) => { + try { + expect(job.returnvalue).to.be.eql(42); + expect(job.data).to.be.eql({ foo: 'bar' }); + expect(value).to.be.eql(42); + expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf( + 0, + ); + expect(worker['childPool'].free[processFile]).to.have.lengthOf(1); + resolve(); + } catch (err) { + reject(err); + } + }); + }); + + await queue.add('test', { foo: 'bar' }); + + await completing; + + await worker.close(); + }); + }); }); function sandboxProcessTests(