Skip to content

Commit

Permalink
fix(drain): consider checking parent jobs when draining (#992)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jan 28, 2022
1 parent e55ff35 commit 81b7221
Show file tree
Hide file tree
Showing 12 changed files with 588 additions and 154 deletions.
5 changes: 4 additions & 1 deletion docs/gitbook/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

* [Introduction](guide/introduction.md)
* [Connections](guide/connections.md)
* [Queues](guide/queues.md)
* [Queues](guide/queues/README.md)
* [Removing Jobs](guide/queues/removing-jobs.md)
* [Workers](guide/workers/README.md)
* [Concurrency](guide/workers/concurrency.md)
* [Graceful shutdown](guide/workers/graceful-shutdown.md)
Expand All @@ -24,6 +25,7 @@
* [Repeatable](guide/jobs/repeatable.md)
* [Prioritized](guide/jobs/prioritized.md)
* [Adding bulks](guide/jobs/adding-bulks.md)
* [Removing job](guide/jobs/removing-job.md)
* [Stalled](guide/jobs/stalled.md)
* [Getters](guide/jobs/getters.md)
* [Flows](guide/flows/README.md)
Expand Down Expand Up @@ -75,3 +77,4 @@

* [Compatibility class](bull-3.x-migration/compatibility-class.md)
* [Migration](bull-3.x-migration/migration.md)

32 changes: 32 additions & 0 deletions docs/gitbook/guide/jobs/removing-job.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Removing job

Sometimes it is necessary to remove a job. For example there could be a job that has bad data.

```typescript
import { Queue } from 'bullmq';

const queue = new Queue('paint');

const job = await queue.add('wall', { color: 1 });

await job.remove();
```

{% hint style="info" %}
Locked jobs (in active state) can not be removed. An error will be thrown.
{% endhint %}

# Having a parent job

There are 2 possible cases:

1. There are not pending dependencies; in this case the parent is moved to wait status, we may try to process this job.
2. There are pending dependencies; in this case the parent is kept in waiting-children status.

# Having pending dependencies

We may try to remove all its pending descendents first.

{% hint style="warning" %}
In case one of the children is locked, it will stop the deletion process.
{% endhint %}
File renamed without changes.
39 changes: 39 additions & 0 deletions docs/gitbook/guide/queues/removing-jobs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
Currently we have 2 available methods in queue class:

# Drain

Removes all jobs that are waiting or delayed, but not active, completed or failed.

```typescript
import { Queue } from 'bullmq';

const queue = new Queue('paint');

await queue.drain();
```

{% hint style="warning" %}
Parent jobs that belong to the queue being drained will be kept in **waiting-children** status if they have pending children, but if they do not have any pending children they will just be removed.
{% endhint %}

{% hint style="warning" %}
Parent jobs in queues different from the one being drained will either stay in **waiting-children** if they
have pending children in other queues, or just moved to wait.
{% endhint %}

# Obliterate

Completely obliterates a queue and all of its contents.

```typescript
import { Queue } from 'bullmq';

const queue = new Queue('paint');

await queue.obliterate();
```

{% hint style="warning" %}
Parent jobs in queues different from the one being obliterated will either stay in **waiting-children** if they
have pending children in other queues, or just moved to wait.
{% endhint %}
7 changes: 6 additions & 1 deletion src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ export class QueueGetters<
Returns the number of jobs waiting to be processed.
*/
count(): Promise<number> {
return this.getJobCountByTypes('waiting', 'paused', 'delayed');
return this.getJobCountByTypes(
'waiting',
'paused',
'delayed',
'waiting-children',
);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ export class Scripts {
}
}

static drainArgs(queue: MinimalQueue, delayed: boolean): string[] {
static drainArgs(queue: MinimalQueue, delayed: boolean): (string | number)[] {
const queueKeys = queue.keys;

const keys = [
const keys: (string | number)[] = [
queueKeys.wait,
queueKeys.paused,
delayed ? queueKeys.delayed : '',
Expand Down
21 changes: 4 additions & 17 deletions src/commands/drain-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,11 @@
local rcall = redis.call
local queueBaseKey = ARGV[1]

local function removeJobs (list)
for _, id in ipairs(list) do
rcall("DEL", queueBaseKey .. id)
end
end

local wait_ids = rcall("LRANGE", KEYS[1] , 0, -1)
local paused_ids = rcall("LRANGE", KEYS[2] , 0, -1)
--- @include "includes/removeJobs"

removeJobs(wait_ids)
removeJobs(paused_ids)
removeListJobs(KEYS[1], true, queueBaseKey, 0) --wait
removeListJobs(KEYS[2], true, queueBaseKey, 0) --paused

if KEYS[3] ~= "" then
local delayed_ids = rcall("ZRANGE", KEYS[3] , 0, -1)
removeJobs(delayed_ids)
rcall("DEL", KEYS[3])
removeZSetJobs(KEYS[3], true, queueBaseKey, 0) --delayed
end

rcall("DEL", KEYS[1])
rcall("DEL", KEYS[2])
rcall("DEL", KEYS[4])
40 changes: 40 additions & 0 deletions src/commands/includes/removeJobs.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
--[[
Functions remove jobs.
]]
local function getListItems(keyName, max)
return rcall('LRANGE', keyName, 0, max - 1)
end

local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
end

--- @include "removeParentDependencyKey"

local function removeJobs(keys, hard, baseKey, max)
for i, key in ipairs(keys) do
local jobKey = baseKey .. key
removeParentDependencyKey(jobKey, hard, baseKey)
rcall("DEL", jobKey)
rcall("DEL", jobKey .. ':logs')
rcall("DEL", jobKey .. ':dependencies')
rcall("DEL", jobKey .. ':processed')
end
return max - #keys
end

local function removeListJobs(keyName, hard, baseKey, max)
local jobs = getListItems(keyName, max)
local count = removeJobs(jobs, hard, baseKey, max)
rcall("LTRIM", keyName, #jobs, -1)
return count
end

local function removeZSetJobs(keyName, hard, baseKey, max)
local jobs = getZSetItems(keyName, max)
local count = removeJobs(jobs, hard, baseKey, max)
if(#jobs > 0) then
rcall("ZREM", keyName, unpack(jobs))
end
return count
end
46 changes: 33 additions & 13 deletions src/commands/includes/removeParentDependencyKey.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,45 @@

--- @include "destructureJobKey"

local function removeParentDependencyKey(jobKey)
local function moveParentToWait(parentPrefix, parentId)
if rcall("HEXISTS", parentPrefix .. "meta", "paused") ~= 1 then
rcall("RPUSH", parentPrefix .. "wait", parentId)
else
rcall("RPUSH", parentPrefix .. "paused", parentId)
end

local parentEventStream = parentPrefix .. "events"
rcall("XADD", parentEventStream, "*", "event", "active", "jobId", parentId, "prev", "waiting-children")
end

local function removeParentDependencyKey(jobKey, hard, baseKey)
local parentKey = rcall("HGET", jobKey, "parentKey")
if( (type(parentKey) == "string") and parentKey ~= "" and (rcall("EXISTS", parentKey) == 1)) then
local parentDependenciesKey = parentKey .. ":dependencies"
local result = rcall("SREM", parentDependenciesKey, jobKey)
if result > 0 and rcall("SCARD", parentDependenciesKey) == 0 then
local parentId = getJobIdFromKey(parentKey)
local parentPrefix = getJobKeyPrefix(parentKey, parentId)
local parentDependenciesKey = parentKey .. ":dependencies"
local result = rcall("SREM", parentDependenciesKey, jobKey)
if result > 0 then
local pendingDependencies = rcall("SCARD", parentDependenciesKey)
if pendingDependencies == 0 then
local parentId = getJobIdFromKey(parentKey)
local parentPrefix = getJobKeyPrefix(parentKey, parentId)

rcall("ZREM", parentPrefix .. "waiting-children", parentId)
rcall("ZREM", parentPrefix .. "waiting-children", parentId)

if rcall("HEXISTS", parentPrefix .. "meta", "paused") ~= 1 then
rcall("RPUSH", parentPrefix .. "wait", parentId)
if hard then
if parentPrefix == baseKey then
removeParentDependencyKey(parentKey, hard, baseKey)
rcall("DEL", parentKey)
rcall("DEL", parentKey .. ':logs')
rcall("DEL", parentKey .. ':dependencies')
rcall("DEL", parentKey .. ':processed')
else
rcall("RPUSH", parentPrefix .. "paused", parentId)
moveParentToWait(parentPrefix, parentId)
end

local parentEventStream = parentPrefix .. "events"
rcall("XADD", parentEventStream, "*", "event", "active", "jobId", parentId, "prev", "waiting-children")
else
moveParentToWait(parentPrefix, parentId)
end
end
end
end
end

Loading

0 comments on commit 81b7221

Please sign in to comment.