diff --git a/src/classes/redis-connection.ts b/src/classes/redis-connection.ts index 3e752828c8..96db6bf322 100644 --- a/src/classes/redis-connection.ts +++ b/src/classes/redis-connection.ts @@ -24,6 +24,7 @@ const deprecationMessage = interface RedisCapabilities { canDoubleTimeout: boolean; + canBlockFor1Ms: boolean; } export interface RawCommand { @@ -39,6 +40,7 @@ export class RedisConnection extends EventEmitter { closing: boolean; capabilities: RedisCapabilities = { canDoubleTimeout: false, + canBlockFor1Ms: true, }; status: 'initializing' | 'ready' | 'closing' | 'closed' = 'initializing'; @@ -251,6 +253,7 @@ export class RedisConnection extends EventEmitter { this.capabilities = { canDoubleTimeout: !isRedisVersionLowerThan(this.version, '6.0.0'), + canBlockFor1Ms: !isRedisVersionLowerThan(this.version, '7.0.8'), }; this.status = 'ready'; diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 548be5b341..56a56fd6e1 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -592,6 +592,12 @@ export class Worker< ); } + get minimumBlockTimeout(): number { + return this.blockingConnection.capabilities.canBlockFor1Ms + ? minimumBlockTimeout + : 0.002; + } + protected async moveToActive( client: RedisClient, token: string, @@ -653,7 +659,7 @@ export class Worker< if (blockUntil) { const blockDelay = blockUntil - Date.now(); // when we reach the time to get new jobs - if (blockDelay < 1) { + if (blockDelay < this.minimumBlockTimeout * 1000) { blockTimeout = minimumBlockTimeout; } else { blockTimeout = blockDelay / 1000; @@ -664,7 +670,7 @@ export class Worker< // reference: https://github.com/taskforcesh/bullmq/issues/1658 blockTimeout = Math.min(blockTimeout, maximumBlockTimeout); } else { - blockTimeout = Math.max(opts.drainDelay, minimumBlockTimeout); + blockTimeout = Math.max(opts.drainDelay, this.minimumBlockTimeout); } return blockTimeout; diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 3e78c8b76e..cda9a64ded 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -816,6 +816,25 @@ describe('workers', function () { await worker.close(); }); + describe('when 0.002 is used as blocktimeout', () => { + it('should not block forever', async () => { + const worker = new Worker(queueName, async () => {}, { + connection, + prefix, + }); + await worker.waitUntilReady(); + const client = await worker.client; + if (isRedisVersionLowerThan(worker.redisVersion, '7.0.8')) { + await client.bzpopmin(`key`, 0.002); + } else { + await client.bzpopmin(`key`, 0.001); + } + + expect(true).to.be.true; + await worker.close(); + }); + }); + describe('when closing a worker', () => { it('process a job that throws an exception after worker close', async () => { const jobError = new Error('Job Failed');