Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(drain): consider checking parent jobs when draining #992

Merged
merged 15 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/gitbook/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

* [Introduction](guide/introduction.md)
* [Connections](guide/connections.md)
* [Queues](guide/queues.md)
* [Queues](guide/queues/README.md)
* [Removing Jobs](guide/queues/removing-jobs.md)
* [Workers](guide/workers/README.md)
* [Concurrency](guide/workers/concurrency.md)
* [Graceful shutdown](guide/workers/graceful-shutdown.md)
Expand All @@ -24,6 +25,7 @@
* [Repeatable](guide/jobs/repeatable.md)
* [Prioritized](guide/jobs/prioritized.md)
* [Adding bulks](guide/jobs/adding-bulks.md)
* [Removing job](guide/jobs/removing-job.md)
* [Stalled](guide/jobs/stalled.md)
* [Getters](guide/jobs/getters.md)
* [Flows](guide/flows/README.md)
Expand Down Expand Up @@ -75,3 +77,4 @@

* [Compatibility class](bull-3.x-migration/compatibility-class.md)
* [Migration](bull-3.x-migration/migration.md)

32 changes: 32 additions & 0 deletions docs/gitbook/guide/jobs/removing-job.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Removing job

Sometimes it is necessary to remove a job. For example there could be a job that has bad data.

```typescript
import { Queue } from 'bullmq';

const queue = new Queue('paint');

const job = await queue.add('wall', { color: 1 });

await job.remove();
```

{% hint style="info" %}
Locked jobs (in active state) can not be removed. An error will be thrown.
{% endhint %}

# Having a parent job

There are 2 possible cases:

1. There are not pending dependencies; in this case the parent is moved to wait status, we may try to process this job.
2. There are pending dependencies; in this case the parent is kept in waiting-children status.

# Having pending dependencies

We may try to remove all its pending descendents first.

{% hint style="warning" %}
In case one of the children is locked, it will stop the deletion process.
{% endhint %}
File renamed without changes.
39 changes: 39 additions & 0 deletions docs/gitbook/guide/queues/removing-jobs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
Currently we have 2 available methods in queue class:

# Drain

Removes all jobs that are waiting or delayed, but not active, completed or failed.

```typescript
import { Queue } from 'bullmq';

const queue = new Queue('paint');

await queue.drain();
```

{% hint style="warning" %}
Parent jobs that belong to the queue being drained will be kept in **waiting-children** status if they have pending children, but if they do not have any pending children they will just be removed.
{% endhint %}

{% hint style="warning" %}
Parent jobs in queues different from the one being drained will either stay in **waiting-children** if they
have pending children in other queues, or just moved to wait.
{% endhint %}

# Obliterate

Completely obliterates a queue and all of its contents.

```typescript
import { Queue } from 'bullmq';

const queue = new Queue('paint');

await queue.obliterate();
```

{% hint style="warning" %}
Parent jobs in queues different from the one being obliterated will either stay in **waiting-children** if they
have pending children in other queues, or just moved to wait.
{% endhint %}
7 changes: 6 additions & 1 deletion src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ export class QueueGetters<
Returns the number of jobs waiting to be processed.
*/
count(): Promise<number> {
return this.getJobCountByTypes('waiting', 'paused', 'delayed');
return this.getJobCountByTypes(
'waiting',
'paused',
'delayed',
'waiting-children',
);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ export class Scripts {
}
}

static drainArgs(queue: MinimalQueue, delayed: boolean): string[] {
static drainArgs(queue: MinimalQueue, delayed: boolean): (string | number)[] {
const queueKeys = queue.keys;

const keys = [
const keys: (string | number)[] = [
queueKeys.wait,
queueKeys.paused,
delayed ? queueKeys.delayed : '',
Expand Down
21 changes: 4 additions & 17 deletions src/commands/drain-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,11 @@
local rcall = redis.call
local queueBaseKey = ARGV[1]

local function removeJobs (list)
for _, id in ipairs(list) do
rcall("DEL", queueBaseKey .. id)
end
end

local wait_ids = rcall("LRANGE", KEYS[1] , 0, -1)
local paused_ids = rcall("LRANGE", KEYS[2] , 0, -1)
--- @include "includes/removeJobs"

removeJobs(wait_ids)
removeJobs(paused_ids)
removeListJobs(KEYS[1], true, queueBaseKey, 0) --wait
removeListJobs(KEYS[2], true, queueBaseKey, 0) --paused

if KEYS[3] ~= "" then
local delayed_ids = rcall("ZRANGE", KEYS[3] , 0, -1)
removeJobs(delayed_ids)
rcall("DEL", KEYS[3])
removeZSetJobs(KEYS[3], true, queueBaseKey, 0) --delayed
end

rcall("DEL", KEYS[1])
rcall("DEL", KEYS[2])
rcall("DEL", KEYS[4])
40 changes: 40 additions & 0 deletions src/commands/includes/removeJobs.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
--[[
Functions remove jobs.
]]
local function getListItems(keyName, max)
return rcall('LRANGE', keyName, 0, max - 1)
end

local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
end

--- @include "removeParentDependencyKey"

local function removeJobs(keys, hard, baseKey, max)
for i, key in ipairs(keys) do
local jobKey = baseKey .. key
removeParentDependencyKey(jobKey, hard, baseKey)
rcall("DEL", jobKey)
rcall("DEL", jobKey .. ':logs')
rcall("DEL", jobKey .. ':dependencies')
rcall("DEL", jobKey .. ':processed')
end
return max - #keys
end

local function removeListJobs(keyName, hard, baseKey, max)
local jobs = getListItems(keyName, max)
local count = removeJobs(jobs, hard, baseKey, max)
rcall("LTRIM", keyName, #jobs, -1)
return count
end

local function removeZSetJobs(keyName, hard, baseKey, max)
local jobs = getZSetItems(keyName, max)
local count = removeJobs(jobs, hard, baseKey, max)
if(#jobs > 0) then
rcall("ZREM", keyName, unpack(jobs))
end
return count
end
46 changes: 33 additions & 13 deletions src/commands/includes/removeParentDependencyKey.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,45 @@

--- @include "destructureJobKey"

local function removeParentDependencyKey(jobKey)
local function moveParentToWait(parentPrefix, parentId)
if rcall("HEXISTS", parentPrefix .. "meta", "paused") ~= 1 then
rcall("RPUSH", parentPrefix .. "wait", parentId)
else
rcall("RPUSH", parentPrefix .. "paused", parentId)
end

local parentEventStream = parentPrefix .. "events"
rcall("XADD", parentEventStream, "*", "event", "active", "jobId", parentId, "prev", "waiting-children")
end

local function removeParentDependencyKey(jobKey, hard, baseKey)
local parentKey = rcall("HGET", jobKey, "parentKey")
if( (type(parentKey) == "string") and parentKey ~= "" and (rcall("EXISTS", parentKey) == 1)) then
local parentDependenciesKey = parentKey .. ":dependencies"
local result = rcall("SREM", parentDependenciesKey, jobKey)
if result > 0 and rcall("SCARD", parentDependenciesKey) == 0 then
local parentId = getJobIdFromKey(parentKey)
local parentPrefix = getJobKeyPrefix(parentKey, parentId)
local parentDependenciesKey = parentKey .. ":dependencies"
local result = rcall("SREM", parentDependenciesKey, jobKey)
if result > 0 then
local pendingDependencies = rcall("SCARD", parentDependenciesKey)
if pendingDependencies == 0 then
local parentId = getJobIdFromKey(parentKey)
local parentPrefix = getJobKeyPrefix(parentKey, parentId)

rcall("ZREM", parentPrefix .. "waiting-children", parentId)
rcall("ZREM", parentPrefix .. "waiting-children", parentId)

if rcall("HEXISTS", parentPrefix .. "meta", "paused") ~= 1 then
rcall("RPUSH", parentPrefix .. "wait", parentId)
if hard then
if parentPrefix == baseKey then
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should only delete those records that belongs to the same queue that runs this script, on the contrary case, move it to wait or paused

removeParentDependencyKey(parentKey, hard, baseKey)
rcall("DEL", parentKey)
rcall("DEL", parentKey .. ':logs')
rcall("DEL", parentKey .. ':dependencies')
rcall("DEL", parentKey .. ':processed')
else
rcall("RPUSH", parentPrefix .. "paused", parentId)
moveParentToWait(parentPrefix, parentId)
end

local parentEventStream = parentPrefix .. "events"
rcall("XADD", parentEventStream, "*", "event", "active", "jobId", parentId, "prev", "waiting-children")
else
moveParentToWait(parentPrefix, parentId)
end
end
end
end
end

Loading