diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index d6dba3fb40..a5baa32687 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -9,7 +9,8 @@ * [Introduction](guide/introduction.md) * [Connections](guide/connections.md) -* [Queues](guide/queues.md) +* [Queues](guide/queues/README.md) + * [Removing Jobs](guide/queues/removing-jobs.md) * [Workers](guide/workers/README.md) * [Concurrency](guide/workers/concurrency.md) * [Graceful shutdown](guide/workers/graceful-shutdown.md) @@ -24,6 +25,7 @@ * [Repeatable](guide/jobs/repeatable.md) * [Prioritized](guide/jobs/prioritized.md) * [Adding bulks](guide/jobs/adding-bulks.md) + * [Removing job](guide/jobs/removing-job.md) * [Stalled](guide/jobs/stalled.md) * [Getters](guide/jobs/getters.md) * [Flows](guide/flows/README.md) @@ -75,3 +77,4 @@ * [Compatibility class](bull-3.x-migration/compatibility-class.md) * [Migration](bull-3.x-migration/migration.md) + diff --git a/docs/gitbook/guide/jobs/removing-job.md b/docs/gitbook/guide/jobs/removing-job.md new file mode 100644 index 0000000000..ba0cc82253 --- /dev/null +++ b/docs/gitbook/guide/jobs/removing-job.md @@ -0,0 +1,32 @@ +# Removing job + +Sometimes it is necessary to remove a job. For example there could be a job that has bad data. + +```typescript +import { Queue } from 'bullmq'; + +const queue = new Queue('paint'); + +const job = await queue.add('wall', { color: 1 }); + +await job.remove(); +``` + +{% hint style="info" %} +Locked jobs (in active state) can not be removed. An error will be thrown. +{% endhint %} + +# Having a parent job + +There are 2 possible cases: + +1. There are not pending dependencies; in this case the parent is moved to wait status, we may try to process this job. +2. There are pending dependencies; in this case the parent is kept in waiting-children status. + +# Having pending dependencies + +We may try to remove all its pending descendents first. + +{% hint style="warning" %} +In case one of the children is locked, it will stop the deletion process. +{% endhint %} diff --git a/docs/gitbook/guide/queues.md b/docs/gitbook/guide/queues/README.md similarity index 100% rename from docs/gitbook/guide/queues.md rename to docs/gitbook/guide/queues/README.md diff --git a/docs/gitbook/guide/queues/removing-jobs.md b/docs/gitbook/guide/queues/removing-jobs.md new file mode 100644 index 0000000000..3a3467dfa4 --- /dev/null +++ b/docs/gitbook/guide/queues/removing-jobs.md @@ -0,0 +1,39 @@ +Currently we have 2 available methods in queue class: + +# Drain + +Removes all jobs that are waiting or delayed, but not active, completed or failed. + +```typescript +import { Queue } from 'bullmq'; + +const queue = new Queue('paint'); + +await queue.drain(); +``` + +{% hint style="warning" %} +Parent jobs that belong to the queue being drained will be kept in **waiting-children** status if they have pending children, but if they do not have any pending children they will just be removed. +{% endhint %} + +{% hint style="warning" %} +Parent jobs in queues different from the one being drained will either stay in **waiting-children** if they +have pending children in other queues, or just moved to wait. +{% endhint %} + +# Obliterate + +Completely obliterates a queue and all of its contents. + +```typescript +import { Queue } from 'bullmq'; + +const queue = new Queue('paint'); + +await queue.obliterate(); +``` + +{% hint style="warning" %} +Parent jobs in queues different from the one being obliterated will either stay in **waiting-children** if they +have pending children in other queues, or just moved to wait. +{% endhint %} diff --git a/src/classes/queue-getters.ts b/src/classes/queue-getters.ts index 8651ed775b..249c80136f 100644 --- a/src/classes/queue-getters.ts +++ b/src/classes/queue-getters.ts @@ -48,7 +48,12 @@ export class QueueGetters< Returns the number of jobs waiting to be processed. */ count(): Promise { - return this.getJobCountByTypes('waiting', 'paused', 'delayed'); + return this.getJobCountByTypes( + 'waiting', + 'paused', + 'delayed', + 'waiting-children', + ); } /** diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index dfb9cb64b9..1cc8cbf9d9 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -336,10 +336,10 @@ export class Scripts { } } - static drainArgs(queue: MinimalQueue, delayed: boolean): string[] { + static drainArgs(queue: MinimalQueue, delayed: boolean): (string | number)[] { const queueKeys = queue.keys; - const keys = [ + const keys: (string | number)[] = [ queueKeys.wait, queueKeys.paused, delayed ? queueKeys.delayed : '', diff --git a/src/commands/drain-4.lua b/src/commands/drain-4.lua index e0f75c3dc3..5db738c4fe 100644 --- a/src/commands/drain-4.lua +++ b/src/commands/drain-4.lua @@ -13,24 +13,11 @@ local rcall = redis.call local queueBaseKey = ARGV[1] -local function removeJobs (list) - for _, id in ipairs(list) do - rcall("DEL", queueBaseKey .. id) - end -end - -local wait_ids = rcall("LRANGE", KEYS[1] , 0, -1) -local paused_ids = rcall("LRANGE", KEYS[2] , 0, -1) +--- @include "includes/removeJobs" -removeJobs(wait_ids) -removeJobs(paused_ids) +removeListJobs(KEYS[1], true, queueBaseKey, 0) --wait +removeListJobs(KEYS[2], true, queueBaseKey, 0) --paused if KEYS[3] ~= "" then - local delayed_ids = rcall("ZRANGE", KEYS[3] , 0, -1) - removeJobs(delayed_ids) - rcall("DEL", KEYS[3]) + removeZSetJobs(KEYS[3], true, queueBaseKey, 0) --delayed end - -rcall("DEL", KEYS[1]) -rcall("DEL", KEYS[2]) -rcall("DEL", KEYS[4]) diff --git a/src/commands/includes/removeJobs.lua b/src/commands/includes/removeJobs.lua new file mode 100644 index 0000000000..b482e4dcee --- /dev/null +++ b/src/commands/includes/removeJobs.lua @@ -0,0 +1,40 @@ +--[[ + Functions remove jobs. +]] +local function getListItems(keyName, max) + return rcall('LRANGE', keyName, 0, max - 1) +end + +local function getZSetItems(keyName, max) + return rcall('ZRANGE', keyName, 0, max - 1) +end + +--- @include "removeParentDependencyKey" + +local function removeJobs(keys, hard, baseKey, max) + for i, key in ipairs(keys) do + local jobKey = baseKey .. key + removeParentDependencyKey(jobKey, hard, baseKey) + rcall("DEL", jobKey) + rcall("DEL", jobKey .. ':logs') + rcall("DEL", jobKey .. ':dependencies') + rcall("DEL", jobKey .. ':processed') + end + return max - #keys +end + +local function removeListJobs(keyName, hard, baseKey, max) + local jobs = getListItems(keyName, max) + local count = removeJobs(jobs, hard, baseKey, max) + rcall("LTRIM", keyName, #jobs, -1) + return count +end + +local function removeZSetJobs(keyName, hard, baseKey, max) + local jobs = getZSetItems(keyName, max) + local count = removeJobs(jobs, hard, baseKey, max) + if(#jobs > 0) then + rcall("ZREM", keyName, unpack(jobs)) + end + return count +end diff --git a/src/commands/includes/removeParentDependencyKey.lua b/src/commands/includes/removeParentDependencyKey.lua index 4fcbf0d3a0..393307cd31 100644 --- a/src/commands/includes/removeParentDependencyKey.lua +++ b/src/commands/includes/removeParentDependencyKey.lua @@ -6,25 +6,45 @@ --- @include "destructureJobKey" -local function removeParentDependencyKey(jobKey) +local function moveParentToWait(parentPrefix, parentId) + if rcall("HEXISTS", parentPrefix .. "meta", "paused") ~= 1 then + rcall("RPUSH", parentPrefix .. "wait", parentId) + else + rcall("RPUSH", parentPrefix .. "paused", parentId) + end + + local parentEventStream = parentPrefix .. "events" + rcall("XADD", parentEventStream, "*", "event", "active", "jobId", parentId, "prev", "waiting-children") +end + +local function removeParentDependencyKey(jobKey, hard, baseKey) local parentKey = rcall("HGET", jobKey, "parentKey") if( (type(parentKey) == "string") and parentKey ~= "" and (rcall("EXISTS", parentKey) == 1)) then - local parentDependenciesKey = parentKey .. ":dependencies" - local result = rcall("SREM", parentDependenciesKey, jobKey) - if result > 0 and rcall("SCARD", parentDependenciesKey) == 0 then - local parentId = getJobIdFromKey(parentKey) - local parentPrefix = getJobKeyPrefix(parentKey, parentId) + local parentDependenciesKey = parentKey .. ":dependencies" + local result = rcall("SREM", parentDependenciesKey, jobKey) + if result > 0 then + local pendingDependencies = rcall("SCARD", parentDependenciesKey) + if pendingDependencies == 0 then + local parentId = getJobIdFromKey(parentKey) + local parentPrefix = getJobKeyPrefix(parentKey, parentId) - rcall("ZREM", parentPrefix .. "waiting-children", parentId) + rcall("ZREM", parentPrefix .. "waiting-children", parentId) - if rcall("HEXISTS", parentPrefix .. "meta", "paused") ~= 1 then - rcall("RPUSH", parentPrefix .. "wait", parentId) + if hard then + if parentPrefix == baseKey then + removeParentDependencyKey(parentKey, hard, baseKey) + rcall("DEL", parentKey) + rcall("DEL", parentKey .. ':logs') + rcall("DEL", parentKey .. ':dependencies') + rcall("DEL", parentKey .. ':processed') else - rcall("RPUSH", parentPrefix .. "paused", parentId) + moveParentToWait(parentPrefix, parentId) end - - local parentEventStream = parentPrefix .. "events" - rcall("XADD", parentEventStream, "*", "event", "active", "jobId", parentId, "prev", "waiting-children") + else + moveParentToWait(parentPrefix, parentId) + end end + end end end + diff --git a/src/commands/obliterate-2.lua b/src/commands/obliterate-2.lua index 8734778129..3224158fd0 100644 --- a/src/commands/obliterate-2.lua +++ b/src/commands/obliterate-2.lua @@ -1,12 +1,12 @@ --[[ - Completely obliterates a queue and all of its contents - Input: - - KEYS[1] meta - KEYS[2] base - - ARGV[1] count - ARGV[2] force + Completely obliterates a queue and all of its contents + + Input: + KEYS[1] meta + KEYS[2] base + + ARGV[1] count + ARGV[2] force ]] -- This command completely destroys a queue including all of its jobs, current or past @@ -20,113 +20,70 @@ local maxCount = tonumber(ARGV[1]) local baseKey = KEYS[2] local rcall = redis.call -local function getListItems(keyName, max) - return rcall('LRANGE', keyName, 0, max - 1) -end -local function getZSetItems(keyName, max) - return rcall('ZRANGE', keyName, 0, max - 1) -end - -local function getSetItems(keyName, max) - return rcall('SMEMBERS', keyName, 0, max) -end - ---- @include "includes/removeParentDependencyKey" - -local function removeJobs(keys) - for i, key in ipairs(keys) do - local jobKey = baseKey .. key - removeParentDependencyKey(jobKey) - rcall("DEL", jobKey) - rcall("DEL", jobKey .. ':logs') - rcall("DEL", jobKey .. ':dependencies') - rcall("DEL", jobKey .. ':processed') - end - maxCount = maxCount - #keys -end - -local function removeListJobs(keyName, max) - local jobs = getListItems(keyName, max) - removeJobs(jobs) - rcall("LTRIM", keyName, #jobs, -1) -end - -local function removeZSetJobs(keyName, max) - local jobs = getZSetItems(keyName, max) - removeJobs(jobs) - if(#jobs > 0) then - rcall("ZREM", keyName, unpack(jobs)) - end -end +--- @include "includes/removeJobs" local function removeLockKeys(keys) - for i, key in ipairs(keys) do - rcall("DEL", baseKey .. key .. ':lock') - end + for i, key in ipairs(keys) do + rcall("DEL", baseKey .. key .. ':lock') + end end -- 1) Check if paused, if not return with error. if rcall("HEXISTS", KEYS[1], "paused") ~= 1 then - return -1 -- Error, NotPaused + return -1 -- Error, NotPaused end -- 2) Check if there are active jobs, if there are and not "force" return error. local activeKey = baseKey .. 'active' local activeJobs = getListItems(activeKey, maxCount) if (#activeJobs > 0) then - if(ARGV[2] == "") then - return -2 -- Error, ExistActiveJobs - end + if(ARGV[2] == "") then + return -2 -- Error, ExistActiveJobs + end end removeLockKeys(activeJobs) -removeJobs(activeJobs) +maxCount = removeJobs(activeJobs, true, baseKey, maxCount) rcall("LTRIM", activeKey, #activeJobs, -1) if(maxCount <= 0) then - return 1 + return 1 end local delayedKey = baseKey .. 'delayed' -removeZSetJobs(delayedKey, maxCount) +maxCount = removeZSetJobs(delayedKey, true, baseKey, maxCount) if(maxCount <= 0) then - return 1 + return 1 end local completedKey = baseKey .. 'completed' -removeZSetJobs(completedKey, maxCount) -if(maxCount <= 0) then - return 1 -end - -local failedKey = baseKey .. 'failed' -removeZSetJobs(failedKey, maxCount) +maxCount = removeZSetJobs(completedKey, true, baseKey, maxCount) if(maxCount <= 0) then - return 1 + return 1 end local waitKey = baseKey .. 'paused' -removeListJobs(waitKey, maxCount) +maxCount = removeListJobs(waitKey, true, baseKey, maxCount) if(maxCount <= 0) then - return 1 + return 1 end -local waitingChildrenKey = baseKey .. 'waiting-children' -removeZSetJobs(waitingChildrenKey, maxCount) +local failedKey = baseKey .. 'failed' +maxCount = removeZSetJobs(failedKey, true, baseKey, maxCount) if(maxCount <= 0) then - return 1 + return 1 end if(maxCount > 0) then - rcall("DEL", baseKey .. 'priority') - rcall("DEL", baseKey .. 'events') - rcall("DEL", baseKey .. 'delay') - rcall("DEL", baseKey .. 'stalled-check') - rcall("DEL", baseKey .. 'stalled') - rcall("DEL", baseKey .. 'id') - rcall("DEL", baseKey .. 'meta') - rcall("DEL", baseKey .. 'repeat') - return 0 + rcall("DEL", baseKey .. 'priority') + rcall("DEL", baseKey .. 'events') + rcall("DEL", baseKey .. 'delay') + rcall("DEL", baseKey .. 'stalled-check') + rcall("DEL", baseKey .. 'stalled') + rcall("DEL", baseKey .. 'id') + rcall("DEL", baseKey .. 'meta') + rcall("DEL", baseKey .. 'repeat') + return 0 else - return 1 + return 1 end diff --git a/tests/test_obliterate.ts b/tests/test_obliterate.ts index a6a3d4ec7b..9470e3bb05 100644 --- a/tests/test_obliterate.ts +++ b/tests/test_obliterate.ts @@ -47,7 +47,7 @@ describe('Obliterate', function () { let first = true; const worker = new Worker( queue.name, - async job => { + async () => { if (first) { first = false; throw new Error('failed first'); @@ -69,54 +69,214 @@ describe('Obliterate', function () { }); describe('when creating a flow', async () => { - it('obliterates a queue with jobs and its dependency keys', async () => { - await queue.waitUntilReady(); - const name = 'child-job'; - - let first = true; - const worker = new Worker( - queue.name, - async job => { - if (first) { - first = false; - throw new Error('failed first'); - } - return delay(10); - }, - { connection }, - ); - await worker.waitUntilReady(); + describe('when parent belongs to same queue', async () => { + describe('when parent has more than 1 pending children in the same queue', async () => { + it('removes parent record', async () => { + await queue.waitUntilReady(); + const name = 'child-job'; + + const flow = new FlowProducer({ connection }); + await flow.add({ + name: 'parent-job', + queueName, + data: {}, + children: [ + { name, data: { idx: 0, foo: 'bar' }, queueName }, + { name, data: { idx: 1, foo: 'baz' }, queueName }, + { name, data: { idx: 2, foo: 'qux' }, queueName }, + ], + }); + + const count = await queue.count(); + expect(count).to.be.eql(4); + + await queue.obliterate(); + + const client = await queue.client; + const keys = await client.keys(`bull:${queue.name}:*`); + + expect(keys.length).to.be.eql(0); + + const countAfterEmpty = await queue.count(); + expect(countAfterEmpty).to.be.eql(0); + + const failedCount = await queue.getJobCountByTypes('failed'); + expect(failedCount).to.be.eql(0); + }); + }); - const completing = new Promise((resolve, reject) => { - worker.on('completed', after(2, resolve)); + describe('when parent has only 1 pending child in the same queue', async () => { + it('obliterates a queue with jobs and its dependency keys', async () => { + await queue.waitUntilReady(); + const name = 'child-job'; + + let first = true; + const worker = new Worker( + queue.name, + async () => { + if (first) { + first = false; + throw new Error('failed first'); + } + return delay(10); + }, + { connection }, + ); + await worker.waitUntilReady(); + + const completing = new Promise((resolve, reject) => { + worker.on('completed', after(2, resolve)); + }); + + const failing = new Promise((resolve, reject) => { + worker.on('failed', resolve); + }); + + const flow = new FlowProducer({ connection }); + await flow.add({ + name: 'parent-job', + queueName, + data: {}, + children: [ + { name, data: { idx: 0, foo: 'bar' }, queueName }, + { name, data: { idx: 1, foo: 'baz' }, queueName }, + { name, data: { idx: 2, foo: 'qux' }, queueName }, + ], + }); + + await failing; + await completing; + await queue.obliterate(); + + const client = await queue.client; + const keys = await client.keys(`bull:${queue.name}:*`); + + expect(keys.length).to.be.eql(0); + + await worker.close(); + }); }); - const failing = new Promise((resolve, reject) => { - worker.on('failed', resolve); + describe('when parent has pending children in different queue', async () => { + it('keeps parent in waiting-children', async () => { + await queue.waitUntilReady(); + const childrenQueueName = `test-${v4()}`; + const childrenQueue = new Queue(childrenQueueName, { connection }); + await childrenQueue.waitUntilReady(); + const name = 'child-job'; + + const flow = new FlowProducer({ connection }); + await flow.add({ + name: 'parent-job', + queueName, + data: {}, + children: [ + { + name, + data: { idx: 0, foo: 'bar' }, + queueName: childrenQueueName, + }, + ], + }); + + const count = await queue.count(); + expect(count).to.be.eql(1); + + await queue.obliterate(); + + const client = await queue.client; + const keys = await client.keys(`bull:${queue.name}:*`); + + expect(keys.length).to.be.eql(3); + + const countAfterEmpty = await queue.count(); + expect(countAfterEmpty).to.be.eql(1); + }); }); + }); - const flow = new FlowProducer({ connection }); - await flow.add({ - name: 'parent-job', - queueName, - data: {}, - children: [ - { name, data: { idx: 0, foo: 'bar' }, queueName }, - { name, data: { idx: 1, foo: 'baz' }, queueName }, - { name, data: { idx: 2, foo: 'qux' }, queueName }, - ], + describe('when parent belongs to different queue', async () => { + describe('when parent has more than 1 pending children', async () => { + it('deletes each children until trying to move parent to wait', async () => { + await queue.waitUntilReady(); + const parentQueueName = `test-${v4()}`; + const parentQueue = new Queue(parentQueueName, { connection }); + await parentQueue.waitUntilReady(); + const name = 'child-job'; + + const flow = new FlowProducer({ connection }); + await flow.add({ + name: 'parent-job', + queueName: parentQueueName, + data: {}, + children: [ + { name, data: { idx: 0, foo: 'bar' }, queueName }, + { name, data: { idx: 1, foo: 'baz' }, queueName }, + { name, data: { idx: 2, foo: 'qux' }, queueName }, + ], + }); + + const count = await queue.count(); + expect(count).to.be.eql(3); + + await queue.obliterate(); + + const client = await queue.client; + const keys = await client.keys(`bull:${queue.name}:*`); + + expect(keys.length).to.be.eql(0); + + const countAfterEmpty = await queue.count(); + expect(countAfterEmpty).to.be.eql(0); + + const childrenFailedCount = await queue.getJobCountByTypes('failed'); + expect(childrenFailedCount).to.be.eql(0); + + const parentWaitCount = await parentQueue.getJobCountByTypes('wait'); + expect(parentWaitCount).to.be.eql(1); + await parentQueue.close(); + await removeAllQueueData(new IORedis(), parentQueueName); + }); }); - await failing; - await completing; - await queue.obliterate(); + describe('when parent has only 1 pending children', async () => { + it('moves parent to wait to try to process it', async () => { + await queue.waitUntilReady(); + const parentQueueName = `test-${v4()}`; + const parentQueue = new Queue(parentQueueName, { connection }); + await parentQueue.waitUntilReady(); + const name = 'child-job'; + + const flow = new FlowProducer({ connection }); + await flow.add({ + name: 'parent-job', + queueName: parentQueueName, + data: {}, + children: [{ name, data: { idx: 0, foo: 'bar' }, queueName }], + }); - const client = await queue.client; - const keys = await client.keys(`bull:${queue.name}:*`); + const count = await queue.count(); + expect(count).to.be.eql(1); - expect(keys.length).to.be.eql(0); + await queue.obliterate(); - await worker.close(); + const client = await queue.client; + const keys = await client.keys(`bull:${queue.name}:*`); + + expect(keys.length).to.be.eql(0); + + const countAfterEmpty = await queue.count(); + expect(countAfterEmpty).to.be.eql(0); + + const failedCount = await queue.getJobCountByTypes('failed'); + expect(failedCount).to.be.eql(0); + + const parentWaitCount = await parentQueue.getJobCountByTypes('wait'); + expect(parentWaitCount).to.be.eql(1); + await parentQueue.close(); + await removeAllQueueData(new IORedis(), parentQueueName); + }); + }); }); }); diff --git a/tests/test_queue.ts b/tests/test_queue.ts index 4f9c0b6390..2d0e8c5009 100644 --- a/tests/test_queue.ts +++ b/tests/test_queue.ts @@ -72,7 +72,7 @@ import * as IORedis from 'ioredis'; import { describe, beforeEach, it } from 'mocha'; import * as sinon from 'sinon'; import { v4 } from 'uuid'; -import { Queue, QueueScheduler } from '../src/classes'; +import { FlowProducer, Queue, QueueScheduler } from '../src/classes'; import { removeAllQueueData } from '../src/utils'; describe('queues', function () { @@ -112,6 +112,197 @@ describe('queues', function () { expect(countAfterEmpty).to.be.eql(0); }); + describe('when having a flow', async () => { + describe('when parent belongs to same queue', async () => { + describe('when parent has more than 1 pending children in the same queue', async () => { + it('deletes parent record', async () => { + await queue.waitUntilReady(); + const name = 'child-job'; + + const flow = new FlowProducer({ connection }); + await flow.add({ + name: 'parent-job', + queueName, + data: {}, + children: [ + { name, data: { idx: 0, foo: 'bar' }, queueName }, + { name, data: { idx: 1, foo: 'baz' }, queueName }, + { name, data: { idx: 2, foo: 'qux' }, queueName }, + ], + }); + + const count = await queue.count(); + expect(count).to.be.eql(4); + + await queue.drain(); + + const client = await queue.client; + const keys = await client.keys(`bull:${queue.name}:*`); + + expect(keys.length).to.be.eql(3); + + const countAfterEmpty = await queue.count(); + expect(countAfterEmpty).to.be.eql(0); + }); + }); + + describe('when parent has only 1 pending child in the same queue', async () => { + it('deletes parent record', async () => { + await queue.waitUntilReady(); + const name = 'child-job'; + + const flow = new FlowProducer({ connection }); + await flow.add({ + name: 'parent-job', + queueName, + data: {}, + children: [{ name, data: { idx: 0, foo: 'bar' }, queueName }], + }); + + const count = await queue.count(); + expect(count).to.be.eql(2); + + await queue.drain(); + + const client = await queue.client; + const keys = await client.keys(`bull:${queue.name}:*`); + + expect(keys.length).to.be.eql(3); + + const countAfterEmpty = await queue.count(); + expect(countAfterEmpty).to.be.eql(0); + }); + }); + + describe('when parent has pending children in different queue', async () => { + it('keeps parent in waiting-children', async () => { + await queue.waitUntilReady(); + const childrenQueueName = `test-${v4()}`; + const childrenQueue = new Queue(childrenQueueName, { connection }); + await childrenQueue.waitUntilReady(); + const name = 'child-job'; + + const flow = new FlowProducer({ connection }); + await flow.add({ + name: 'parent-job', + queueName, + data: {}, + children: [ + { + name, + data: { idx: 0, foo: 'bar' }, + queueName: childrenQueueName, + }, + ], + }); + + const count = await queue.count(); + expect(count).to.be.eql(1); + + await queue.drain(); + + const client = await queue.client; + const keys = await client.keys(`bull:${queue.name}:*`); + + expect(keys.length).to.be.eql(6); + + const countAfterEmpty = await queue.count(); + expect(countAfterEmpty).to.be.eql(1); + }); + }); + }); + + describe('when parent belongs to different queue', async () => { + describe('when parent has more than 1 pending children', async () => { + it('deletes each children until trying to move parent to wait', async () => { + await queue.waitUntilReady(); + const parentQueueName = `test-${v4()}`; + const parentQueue = new Queue(parentQueueName, { connection }); + await parentQueue.waitUntilReady(); + const name = 'child-job'; + + const flow = new FlowProducer({ connection }); + await flow.add({ + name: 'parent-job', + queueName: parentQueueName, + data: {}, + children: [ + { name, data: { idx: 0, foo: 'bar' }, queueName }, + { name, data: { idx: 1, foo: 'baz' }, queueName }, + { name, data: { idx: 2, foo: 'qux' }, queueName }, + ], + }); + + const count = await queue.count(); + expect(count).to.be.eql(3); + + await queue.drain(); + + const client = await queue.client; + const keys = await client.keys(`bull:${queue.name}:*`); + + expect(keys.length).to.be.eql(3); + + const countAfterEmpty = await queue.count(); + expect(countAfterEmpty).to.be.eql(0); + + const childrenFailedCount = await queue.getJobCountByTypes( + 'failed', + ); + expect(childrenFailedCount).to.be.eql(0); + + const parentWaitCount = await parentQueue.getJobCountByTypes( + 'wait', + ); + expect(parentWaitCount).to.be.eql(1); + await parentQueue.close(); + await removeAllQueueData(new IORedis(), parentQueueName); + }); + }); + + describe('when parent has only 1 pending children', async () => { + it('moves parent to wait to try to process it', async () => { + await queue.waitUntilReady(); + const parentQueueName = `test-${v4()}`; + const parentQueue = new Queue(parentQueueName, { connection }); + await parentQueue.waitUntilReady(); + const name = 'child-job'; + + const flow = new FlowProducer({ connection }); + await flow.add({ + name: 'parent-job', + queueName: parentQueueName, + data: {}, + children: [{ name, data: { idx: 0, foo: 'bar' }, queueName }], + }); + + const count = await queue.count(); + expect(count).to.be.eql(1); + + await queue.drain(); + + const client = await queue.client; + const keys = await client.keys(`bull:${queue.name}:*`); + + expect(keys.length).to.be.eql(3); + + const countAfterEmpty = await queue.count(); + expect(countAfterEmpty).to.be.eql(0); + + const failedCount = await queue.getJobCountByTypes('failed'); + expect(failedCount).to.be.eql(0); + + const parentWaitCount = await parentQueue.getJobCountByTypes( + 'wait', + ); + expect(parentWaitCount).to.be.eql(1); + await parentQueue.close(); + await removeAllQueueData(new IORedis(), parentQueueName); + }); + }); + }); + }); + describe('when delayed option is provided as false', () => { it('clean queue without delayed jobs', async () => { const maxJobs = 50;