Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(drain): consider checking parent jobs when draining #992

Merged
merged 15 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 4 additions & 17 deletions src/commands/drain-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,11 @@
local rcall = redis.call
local queueBaseKey = ARGV[1]

local function removeJobs (list)
for _, id in ipairs(list) do
rcall("DEL", queueBaseKey .. id)
end
end

local wait_ids = rcall("LRANGE", KEYS[1] , 0, -1)
local paused_ids = rcall("LRANGE", KEYS[2] , 0, -1)
--- @include "includes/removeJobs"

removeJobs(wait_ids)
removeJobs(paused_ids)
removeListJobs(KEYS[1], queueBaseKey, 0) --wait
removeListJobs(KEYS[2], queueBaseKey, 0) --paused

if KEYS[3] ~= "" then
local delayed_ids = rcall("ZRANGE", KEYS[3] , 0, -1)
removeJobs(delayed_ids)
rcall("DEL", KEYS[3])
removeZSetJobs(KEYS[3], queueBaseKey, 0) --delayed
end

rcall("DEL", KEYS[1])
rcall("DEL", KEYS[2])
rcall("DEL", KEYS[4])
40 changes: 40 additions & 0 deletions src/commands/includes/removeJobs.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
--[[
Functions remove jobs.
]]
local function getListItems(keyName, max)
return rcall('LRANGE', keyName, 0, max - 1)
end

local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
end

--- @include "removeParentDependencyKey"

local function removeJobs(keys, baseKey, max)
for i, key in ipairs(keys) do
local jobKey = baseKey .. key
removeParentDependencyKey(jobKey)
rcall("DEL", jobKey)
rcall("DEL", jobKey .. ':logs')
rcall("DEL", jobKey .. ':dependencies')
rcall("DEL", jobKey .. ':processed')
end
return max - #keys
end

local function removeListJobs(keyName, baseKey, max)
local jobs = getListItems(keyName, max)
local count = removeJobs(jobs, baseKey, max)
rcall("LTRIM", keyName, #jobs, -1)
return count
end

local function removeZSetJobs(keyName, baseKey, max)
local jobs = getZSetItems(keyName, max)
local count = removeJobs(jobs, baseKey, max)
if(#jobs > 0) then
rcall("ZREM", keyName, unpack(jobs))
end
return count
end
28 changes: 14 additions & 14 deletions src/commands/includes/removeParentDependencyKey.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@
local function removeParentDependencyKey(jobKey)
local parentKey = rcall("HGET", jobKey, "parentKey")
if( (type(parentKey) == "string") and parentKey ~= "" and (rcall("EXISTS", parentKey) == 1)) then
local parentDependenciesKey = parentKey .. ":dependencies"
local result = rcall("SREM", parentDependenciesKey, jobKey)
if result > 0 and rcall("SCARD", parentDependenciesKey) == 0 then
local parentId = getJobIdFromKey(parentKey)
local parentPrefix = getJobKeyPrefix(parentKey, parentId)
local parentDependenciesKey = parentKey .. ":dependencies"
local result = rcall("SREM", parentDependenciesKey, jobKey)
if result > 0 and rcall("SCARD", parentDependenciesKey) == 0 then
local parentId = getJobIdFromKey(parentKey)
local parentPrefix = getJobKeyPrefix(parentKey, parentId)

rcall("ZREM", parentPrefix .. "waiting-children", parentId)
rcall("ZREM", parentPrefix .. "waiting-children", parentId)

if rcall("HEXISTS", parentPrefix .. "meta", "paused") ~= 1 then
rcall("RPUSH", parentPrefix .. "wait", parentId)
else
rcall("RPUSH", parentPrefix .. "paused", parentId)
end

local parentEventStream = parentPrefix .. "events"
rcall("XADD", parentEventStream, "*", "event", "active", "jobId", parentId, "prev", "waiting-children")
if rcall("HEXISTS", parentPrefix .. "meta", "paused") ~= 1 then
rcall("RPUSH", parentPrefix .. "wait", parentId)
else
rcall("RPUSH", parentPrefix .. "paused", parentId)
end

local parentEventStream = parentPrefix .. "events"
rcall("XADD", parentEventStream, "*", "event", "active", "jobId", parentId, "prev", "waiting-children")
end
end
end
113 changes: 38 additions & 75 deletions src/commands/obliterate-2.lua
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
--[[
Completely obliterates a queue and all of its contents
Input:

KEYS[1] meta
KEYS[2] base
ARGV[1] count
ARGV[2] force
Completely obliterates a queue and all of its contents

Input:
KEYS[1] meta
KEYS[2] base

ARGV[1] count
ARGV[2] force
]]

-- This command completely destroys a queue including all of its jobs, current or past
Expand All @@ -20,113 +20,76 @@ local maxCount = tonumber(ARGV[1])
local baseKey = KEYS[2]

local rcall = redis.call
local function getListItems(keyName, max)
return rcall('LRANGE', keyName, 0, max - 1)
end

local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
end

local function getSetItems(keyName, max)
return rcall('SMEMBERS', keyName, 0, max)
end

--- @include "includes/removeParentDependencyKey"

local function removeJobs(keys)
for i, key in ipairs(keys) do
local jobKey = baseKey .. key
removeParentDependencyKey(jobKey)
rcall("DEL", jobKey)
rcall("DEL", jobKey .. ':logs')
rcall("DEL", jobKey .. ':dependencies')
rcall("DEL", jobKey .. ':processed')
end
maxCount = maxCount - #keys
end

local function removeListJobs(keyName, max)
local jobs = getListItems(keyName, max)
removeJobs(jobs)
rcall("LTRIM", keyName, #jobs, -1)
end

local function removeZSetJobs(keyName, max)
local jobs = getZSetItems(keyName, max)
removeJobs(jobs)
if(#jobs > 0) then
rcall("ZREM", keyName, unpack(jobs))
end
end
--- @include "includes/removeJobs"

local function removeLockKeys(keys)
for i, key in ipairs(keys) do
rcall("DEL", baseKey .. key .. ':lock')
end
for i, key in ipairs(keys) do
rcall("DEL", baseKey .. key .. ':lock')
end
end

-- 1) Check if paused, if not return with error.
if rcall("HEXISTS", KEYS[1], "paused") ~= 1 then
return -1 -- Error, NotPaused
return -1 -- Error, NotPaused
end

-- 2) Check if there are active jobs, if there are and not "force" return error.
local activeKey = baseKey .. 'active'
local activeJobs = getListItems(activeKey, maxCount)
if (#activeJobs > 0) then
if(ARGV[2] == "") then
return -2 -- Error, ExistActiveJobs
end
if(ARGV[2] == "") then
return -2 -- Error, ExistActiveJobs
end
end

removeLockKeys(activeJobs)
removeJobs(activeJobs)
maxCount = removeJobs(activeJobs, baseKey, maxCount)
rcall("LTRIM", activeKey, #activeJobs, -1)
if(maxCount <= 0) then
return 1
return 1
end

local delayedKey = baseKey .. 'delayed'
removeZSetJobs(delayedKey, maxCount)
maxCount = removeZSetJobs(delayedKey, baseKey, maxCount)
if(maxCount <= 0) then
return 1
return 1
end

local completedKey = baseKey .. 'completed'
removeZSetJobs(completedKey, maxCount)
maxCount = removeZSetJobs(completedKey, baseKey, maxCount)
if(maxCount <= 0) then
return 1
return 1
end

local failedKey = baseKey .. 'failed'
removeZSetJobs(failedKey, maxCount)
maxCount = removeZSetJobs(failedKey, baseKey, maxCount)
if(maxCount <= 0) then
return 1
return 1
end

local waitKey = baseKey .. 'paused'
removeListJobs(waitKey, maxCount)
maxCount = removeListJobs(waitKey, baseKey, maxCount)
if(maxCount <= 0) then
return 1
return 1
end

local waitingChildrenKey = baseKey .. 'waiting-children'
removeZSetJobs(waitingChildrenKey, maxCount)
maxCount = removeZSetJobs(waitingChildrenKey, baseKey, maxCount)
if(maxCount <= 0) then
return 1
return 1
end

if(maxCount > 0) then
rcall("DEL", baseKey .. 'priority')
rcall("DEL", baseKey .. 'events')
rcall("DEL", baseKey .. 'delay')
rcall("DEL", baseKey .. 'stalled-check')
rcall("DEL", baseKey .. 'stalled')
rcall("DEL", baseKey .. 'id')
rcall("DEL", baseKey .. 'meta')
rcall("DEL", baseKey .. 'repeat')
return 0
rcall("DEL", baseKey .. 'priority')
rcall("DEL", baseKey .. 'events')
rcall("DEL", baseKey .. 'delay')
rcall("DEL", baseKey .. 'stalled-check')
rcall("DEL", baseKey .. 'stalled')
rcall("DEL", baseKey .. 'id')
rcall("DEL", baseKey .. 'meta')
rcall("DEL", baseKey .. 'repeat')
return 0
else
return 1
return 1
end
34 changes: 33 additions & 1 deletion tests/test_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ import * as IORedis from 'ioredis';
import { describe, beforeEach, it } from 'mocha';
import * as sinon from 'sinon';
import { v4 } from 'uuid';
import { Queue, QueueScheduler } from '../src/classes';
import { FlowProducer, Queue, QueueScheduler } from '../src/classes';
import { removeAllQueueData } from '../src/utils';

describe('queues', function () {
Expand Down Expand Up @@ -112,6 +112,38 @@ describe('queues', function () {
expect(countAfterEmpty).to.be.eql(0);
});

describe('when having a flow', async () => {
it('drains paused, waiting and delayed jobs, also move parent jobs to wait if necessary', async () => {
await queue.waitUntilReady();
const name = 'child-job';

const flow = new FlowProducer({ connection });
await flow.add({
name: 'parent-job',
queueName,
data: {},
children: [
{ name, data: { idx: 0, foo: 'bar' }, queueName },
{ name, data: { idx: 1, foo: 'baz' }, queueName },
{ name, data: { idx: 2, foo: 'qux' }, queueName },
],
});

const count = await queue.count();
expect(count).to.be.eql(3);

await queue.drain();

const client = await queue.client;
const keys = await client.keys(`bull:${queue.name}:*`);

expect(keys.length).to.be.eql(5);

const countAfterEmpty = await queue.count();
expect(countAfterEmpty).to.be.eql(1);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this job is the parent that was moved to wait state, before it was in waiting-children state

});
});

describe('when delayed option is provided as false', () => {
it('clean queue without delayed jobs', async () => {
const maxJobs = 50;
Expand Down