diff --git a/src/classes/child-pool.ts b/src/classes/child-pool.ts index 61434c0c39..66147977aa 100644 --- a/src/classes/child-pool.ts +++ b/src/classes/child-pool.ts @@ -27,7 +27,7 @@ export class ChildPool { }; } - async retain(processFile: string): Promise { + async retain(processFile: string, exitHandler: any): Promise { let child = this.getFree(processFile).pop(); if (child) { @@ -40,7 +40,8 @@ export class ChildPool { workerForkOptions: this.opts.workerForkOptions, workerThreadsOptions: this.opts.workerThreadsOptions, }); - child.on('exit', this.remove.bind(this, child)); + + child.on('exit', exitHandler); try { await child.init(); diff --git a/src/classes/sandbox.ts b/src/classes/sandbox.ts index c07e14cb93..e9793714d3 100644 --- a/src/classes/sandbox.ts +++ b/src/classes/sandbox.ts @@ -1,5 +1,6 @@ import { ChildCommand, ParentCommand } from '../enums'; import { ChildMessage } from '../interfaces'; +import { Child } from './child'; import { ChildPool } from './child-pool'; import { Job } from './job'; @@ -8,65 +9,78 @@ const sandbox = ( childPool: ChildPool, ) => { return async function process(job: Job, token?: string): Promise { - const child = await childPool.retain(processFile); + let child: Child; let msgHandler: any; let exitHandler: any; + try { + const done: Promise = new Promise((resolve, reject) => { + const initChild = async () => { + try { + exitHandler = (exitCode: any, signal: any) => { + reject( + new Error( + 'Unexpected exit code: ' + exitCode + ' signal: ' + signal, + ), + ); + }; - await child.send({ - cmd: ChildCommand.Start, - job: job.asJSONSandbox(), - token, - }); + child = await childPool.retain(processFile, exitHandler); - const done: Promise = new Promise((resolve, reject) => { - msgHandler = async (msg: ChildMessage) => { - switch (msg.cmd) { - case ParentCommand.Completed: - resolve(msg.value); - break; - case ParentCommand.Failed: - case ParentCommand.Error: { - const err = new Error(); - Object.assign(err, msg.value); - reject(err); - break; - } - case ParentCommand.Progress: - await job.updateProgress(msg.value); - break; - case ParentCommand.Log: - await job.log(msg.value); - break; - case ParentCommand.MoveToDelayed: - await job.moveToDelayed(msg.value?.timestamp, msg.value?.token); - break; - case ParentCommand.Update: - await job.updateData(msg.value); - break; - } - }; + msgHandler = async (msg: ChildMessage) => { + switch (msg.cmd) { + case ParentCommand.Completed: + resolve(msg.value); + break; + case ParentCommand.Failed: + case ParentCommand.Error: { + const err = new Error(); + Object.assign(err, msg.value); + reject(err); + break; + } + case ParentCommand.Progress: + await job.updateProgress(msg.value); + break; + case ParentCommand.Log: + await job.log(msg.value); + break; + case ParentCommand.MoveToDelayed: + await job.moveToDelayed( + msg.value?.timestamp, + msg.value?.token, + ); + break; + case ParentCommand.Update: + await job.updateData(msg.value); + break; + } + }; - exitHandler = (exitCode: any, signal: any) => { - reject( - new Error('Unexpected exit code: ' + exitCode + ' signal: ' + signal), - ); - }; + child.on('message', msgHandler); - child.on('message', msgHandler); - child.on('exit', exitHandler); - }); + child.send({ + cmd: ChildCommand.Start, + job: job.asJSONSandbox(), + token, + }); + } catch (error) { + reject(error); + } + }; + initChild(); + }); - try { await done; return done; } finally { - child.off('message', msgHandler); - child.off('exit', exitHandler); - - if (child.exitCode !== null || /SIG.*/.test(`${child.signalCode}`)) { - childPool.remove(child); - } else { - childPool.release(child); + if (child) { + child.off('message', msgHandler); + child.off('exit', exitHandler); + if (child.exitCode !== null || /SIG.*/.test(`${child.signalCode}`)) { + childPool.remove(child); + } else { + childPool.release(child); + } } } }; diff --git a/tests/test_child-pool.ts b/tests/test_child-pool.ts index 99ec80064d..c53769a447 100644 --- a/tests/test_child-pool.ts +++ b/tests/test_child-pool.ts @@ -2,6 +2,7 @@ import { expect } from 'chai'; import { ChildPool } from '../src/classes'; import { join } from 'path'; +const NoopProc = () => {}; describe('Child pool for Child Processes', () => { sandboxProcessTests(); }); @@ -32,70 +33,70 @@ function sandboxProcessTests( it('should return same child if free', async () => { const processor = __dirname + '/fixtures/fixture_processor_bar.js'; - const child = await pool.retain(processor); + const child = await pool.retain(processor, NoopProc); expect(child).to.be.ok; pool.release(child); expect(pool.retained).to.be.empty; - const newChild = await pool.retain(processor); + const newChild = await pool.retain(processor, NoopProc); expect(child).to.be.eql(newChild); }); it('should return a new child if reused the last free one', async () => { const processor = __dirname + '/fixtures/fixture_processor_bar.js'; - let child = await pool.retain(processor); + let child = await pool.retain(processor, NoopProc); expect(child).to.be.ok; pool.release(child); expect(pool.retained).to.be.empty; - let newChild = await pool.retain(processor); + let newChild = await pool.retain(processor, NoopProc); expect(child).to.be.eql(newChild); child = newChild; - newChild = await pool.retain(processor); + newChild = await pool.retain(processor, NoopProc); expect(child).not.to.be.eql(newChild); }); it('should return a new child if none free', async () => { const processor = __dirname + '/fixtures/fixture_processor_bar.js'; - const child = await pool.retain(processor); + const child = await pool.retain(processor, NoopProc); expect(child).to.be.ok; expect(pool.retained).not.to.be.empty; - const newChild = await pool.retain(processor); + const newChild = await pool.retain(processor, NoopProc); expect(child).to.not.be.eql(newChild); }); it('should return a new child if killed', async () => { const processor = __dirname + '/fixtures/fixture_processor_bar.js'; - const child = await pool.retain(processor); + const child = await pool.retain(processor, NoopProc); expect(child).to.be.ok; await pool.kill(child); expect(pool.retained).to.be.empty; - const newChild = await pool.retain(processor); + const newChild = await pool.retain(processor, NoopProc); expect(child).to.not.be.eql(newChild); }); it('should return a new child if many retained and none free', async () => { const processor = __dirname + '/fixtures/fixture_processor_bar.js'; const children = await Promise.all([ - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), ]); expect(children).to.have.length(6); - const child = await pool.retain(processor); + const child = await pool.retain(processor, NoopProc); expect(children).not.to.include(child); }).timeout(10000); it('should return an old child if many retained and one free', async () => { const processor = __dirname + '/fixtures/fixture_processor_bar.js'; const children = await Promise.all([ - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), - pool.retain(processor), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), + pool.retain(processor, NoopProc), ]); expect(children).to.have.length(6); @@ -108,7 +109,7 @@ function sandboxProcessTests( const processor = __dirname + '/fixtures/fixture_processor_bar.js'; process.execArgv.push('--no-warnings'); - const child = await pool.retain(processor); + const child = await pool.retain(processor, NoopProc); expect(child).to.be.ok; if (!useWorkerThreads) { expect(child.childProcess.spawnargs).to.include('--no-warnings'); diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index da6e13e015..4cf155dafb 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -77,6 +77,44 @@ describe('Sandboxed process using child processes', () => { await worker.close(); }); + + it('should allow to pass workerForkOptions with timeout', async function () { + const processFile = __dirname + '/fixtures/fixture_processor.js'; + + const workerForkOptions = { + timeout: 50, + } as any; + const worker = new Worker(queueName, processFile, { + autorun: false, + connection, + prefix, + drainDelay: 1, + useWorkerThreads: false, + workerForkOptions, + }); + + const failing = new Promise((resolve, reject) => { + worker.on('failed', async (job, error) => { + try { + expect([ + 'Unexpected exit code: null signal: SIGTERM', + 'Unexpected exit code: 0 signal: null', + ]).to.include(error.message); + resolve(); + } catch (err) { + reject(err); + } + }); + }); + + await queue.add('test', { foo: 'bar' }); + + worker.run(); + + await failing; + + await worker.close(); + }); }); }); @@ -1019,12 +1057,20 @@ function sandboxProcessTests( useWorkerThreads, }); - const job = await queue.add('test', { exitCode: 1 }); + const failing = new Promise((resolve, reject) => { + worker.on('failed', async (job, error) => { + try { + expect(error.message).to.be.equal('Broken file processor'); + resolve(); + } catch (err) { + reject(err); + } + }); + }); - await expect(job.waitUntilFinished(queueEvents)).to.be.rejectedWith( - 'Broken file processor', - ); + await queue.add('test', { exitCode: 1 }); + await failing; await worker.close(); }); @@ -1050,7 +1096,7 @@ function sandboxProcessTests( }); }); - it('should remove exited process', async () => { + it('should release exited process', async () => { const processFile = __dirname + '/fixtures/fixture_processor_exit.js'; const worker = new Worker(queueName, processFile, { @@ -1072,7 +1118,7 @@ function sandboxProcessTests( expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf( 0, ); - expect(worker['childPool'].getAllFree()).to.have.lengthOf(0); + expect(worker['childPool'].getAllFree()).to.have.lengthOf(1); resolve(); } catch (err) { reject(err); @@ -1097,7 +1143,10 @@ function sandboxProcessTests( }); // acquire and release a child here so we know it has it's full termination handler setup - const initializedChild = await worker['childPool'].retain(processFile); + const initializedChild = await worker['childPool'].retain( + processFile, + () => {}, + ); await worker['childPool'].release(initializedChild); // await this After we've added the job