Skip to content

Commit

Permalink
fix: do not remove delayed jobs that belongs to a job scheduler when …
Browse files Browse the repository at this point in the history
…using drain and clean
  • Loading branch information
manast committed Oct 5, 2024
1 parent bd6ed2a commit fb3748c
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 115 deletions.
2 changes: 2 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ export class Scripts {
queueKeys.paused,
delayed ? queueKeys.delayed : '',
queueKeys.prioritized,
queueKeys.repeat,
];

const args = [queueKeys['']];
Expand Down Expand Up @@ -1043,6 +1044,7 @@ export class Scripts {
return (<any>client).cleanJobsInSet([
this.queue.toKey(set),
this.queue.toKey('events'),
this.queue.toKey('repeat'),
this.queue.toKey(''),
timestamp,
limit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Input:
KEYS[1] set key,
KEYS[2] events stream key
KEYS[3] job schedulers key
ARGV[1] jobKey prefix
ARGV[2] timestamp
Expand Down Expand Up @@ -32,21 +33,21 @@ end

local result
if ARGV[4] == "active" then
result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false)
result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false --[[ hasFinished ]])
elseif ARGV[4] == "delayed" then
rangeEnd = "+inf"
result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
{"processedOn", "timestamp"}, false)
{"processedOn", "timestamp"}, false --[[ hasFinished ]], KEYS[3])
elseif ARGV[4] == "prioritized" then
rangeEnd = "+inf"
result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
{"timestamp"}, false)
{"timestamp"}, false --[[ hasFinished ]])
elseif ARGV[4] == "wait" or ARGV[4] == "paused" then
result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true)
result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true --[[ hasFinished ]])
else
rangeEnd = ARGV[2]
result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
{"finishedOn"}, true)
{"finishedOn"}, true --[[ hasFinished ]])
end

rcall("XADD", KEYS[2], "*", "event", "cleaned", "count", result[2])
Expand Down
26 changes: 0 additions & 26 deletions src/commands/drain-4.lua

This file was deleted.

41 changes: 41 additions & 0 deletions src/commands/drain-5.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
--[[
Drains the queue, removes all jobs that are waiting
or delayed, but not active, completed or failed
Input:
KEYS[1] 'wait',
KEYS[2] 'paused'
KEYS[3] 'delayed'
KEYS[4] 'prioritized'
KEYS[5] 'jobschedulers' (repeat)
ARGV[1] queue key prefix
]]
local rcall = redis.call
local queueBaseKey = ARGV[1]

--- @include "includes/removeListJobs"
--- @include "includes/removeZSetJobs"

removeListJobs(KEYS[1], true, queueBaseKey, 0) -- wait
removeListJobs(KEYS[2], true, queueBaseKey, 0) -- paused

if KEYS[3] ~= "" then

-- We must not remove delayed jobs if they are associated to a job scheduler.
local scheduledJobs = {}
local jobSchedulers = rcall("ZRANGE", KEYS[5], 0, -1, "WITHSCORES")

-- For every job scheduler, get the current delayed job id.
for i = 1, #jobSchedulers, 2 do
local jobSchedulerId = jobSchedulers[i]
local jobSchedulerMillis = jobSchedulers[i + 1]

local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. jobSchedulerMillis
scheduledJobs[delayedJobId] = true
end

removeZSetJobs(KEYS[3], true, queueBaseKey, 0, scheduledJobs) -- delayed
end

removeZSetJobs(KEYS[4], true, queueBaseKey, 0) -- prioritized
79 changes: 50 additions & 29 deletions src/commands/includes/cleanSet.lua
Original file line number Diff line number Diff line change
@@ -1,45 +1,66 @@
--[[
Function to clean job set.
Returns jobIds and deleted count number.
]]
]]

-- Includes
--- @include "batches"
--- @include "getJobsInZset"
--- @include "getTimestamp"
--- @include "removeJob"

local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attributes, isFinished)
local jobs = getJobsInZset(setKey, rangeEnd, limit)
local deleted = {}
local deletedCount = 0
local jobTS
for i, job in ipairs(jobs) do
if limit > 0 and deletedCount >= limit then
break
local function isJobSchedulerJob(jobId, jobSchedulersKey)
if jobSchedulersKey then
local jobSchedulerId = jobId:match("repeat:(.*):%d+")
if jobSchedulerId then
return rcall("ZSCORE", jobSchedulersKey, jobSchedulerId)
end
end
return false
end

local function cleanSet(
setKey,
jobKeyPrefix,
rangeEnd,
timestamp,
limit,
attributes,
isFinished,
jobSchedulersKey)
local jobs = getJobsInZset(setKey, rangeEnd, limit)
local deleted = {}
local deletedCount = 0
local jobTS
for i, job in ipairs(jobs) do
if limit > 0 and deletedCount >= limit then
break
end

local jobKey = jobKeyPrefix .. job
if isFinished then
removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
deletedCount = deletedCount + 1
table.insert(deleted, job)
else
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
jobTS = getTimestamp(jobKey, attributes)
if (not jobTS or jobTS <= timestamp) then
removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
deletedCount = deletedCount + 1
table.insert(deleted, job)
end
-- Extract a Job Scheduler Id from jobId ("repeat:job-scheduler-id:millis")
-- and check if it is in the scheduled jobs
if not isJobSchedulerJob(job, jobSchedulersKey) then
local jobKey = jobKeyPrefix .. job
if isFinished then
removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]] )
deletedCount = deletedCount + 1
table.insert(deleted, job)
else
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
jobTS = getTimestamp(jobKey, attributes)
if (not jobTS or jobTS <= timestamp) then
removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]] )
deletedCount = deletedCount + 1
table.insert(deleted, job)
end
end
end
end
end

if(#deleted > 0) then
for from, to in batches(#deleted, 7000) do
rcall("ZREM", setKey, unpack(deleted, from, to))
if (#deleted > 0) then
for from, to in batches(#deleted, 7000) do
rcall("ZREM", setKey, unpack(deleted, from, to))
end
end
end

return {deleted, deletedCount}
return {deleted, deletedCount}
end
14 changes: 13 additions & 1 deletion src/commands/includes/removeZSetJobs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,20 @@
--- @include "getZSetItems"
--- @include "removeJobs"

local function removeZSetJobs(keyName, hard, baseKey, max)
local function removeZSetJobs(keyName, hard, baseKey, max, jobsToIgnore)
local jobs = getZSetItems(keyName, max)

-- filter out jobs to ignore
if jobsToIgnore then
local filteredJobs = {}
for i = 1, #jobs do
if not jobsToIgnore[jobs[i]] then
table.insert(filteredJobs, jobs[i])
end
end
jobs = filteredJobs
end

local count = removeJobs(jobs, hard, baseKey, max)
if(#jobs > 0) then
for from, to in batches(#jobs, 7000) do
Expand Down
88 changes: 34 additions & 54 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1217,60 +1217,6 @@ describe('Job Scheduler', function () {
});
});

it('should allow removing a customId repeatable job', async function () {
const numJobs = 4;
const date = new Date('2017-02-07 9:24:00');
let prev: Job;
let counter = 0;
let processor;
const jobId = 'xxxx';

this.clock.setSystemTime(date);

const nextTick = 2 * ONE_SECOND + 10;
const repeat = { pattern: '*/2 * * * * *' };

await queue.add('test', { foo: 'bar' }, { repeat, jobId });

this.clock.tick(nextTick);

const processing = new Promise<void>((resolve, reject) => {
processor = async () => {
counter++;
if (counter == numJobs) {
try {
await queue.removeRepeatable('test', repeat, jobId);
this.clock.tick(nextTick);
const delayed = await queue.getDelayed();
expect(delayed).to.be.empty;
resolve();
} catch (err) {
reject(err);
}
} else if (counter > numJobs) {
reject(Error(`should not repeat more than ${numJobs} times`));
}
};
});

const worker = new Worker(queueName, processor, { connection, prefix });
const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {});
await worker.waitUntilReady();

worker.on('completed', job => {
this.clock.tick(nextTick);
if (prev) {
expect(prev.timestamp).to.be.lt(job.timestamp);
expect(job.timestamp - prev.timestamp).to.be.gte(2000);
}
prev = job;
});

await processing;
await worker.close();
delayStub.restore();
});

it('should keep only one delayed job if adding a new repeatable job with the same id', async function () {
const date = new Date('2017-02-07 9:24:00');
const key = 'mykey';
Expand Down Expand Up @@ -1435,6 +1381,40 @@ describe('Job Scheduler', function () {
}
});

it('should not remove delayed jobs if they belong to a repeatable job when using drain', async function () {
await queue.upsertJobScheduler('myTestJob', { every: 5000 });
await queue.add('test', { foo: 'bar' }, { delay: 1000 });

// Get delayed jobs
let delayed = await queue.getDelayed();
expect(delayed.length).to.be.eql(2);

// Drain the queue
await queue.drain(true);

delayed = await queue.getDelayed();
expect(delayed.length).to.be.eql(1);

expect(delayed[0].name).to.be.eql('myTestJob');
});

it('should not remove delayed jobs if they belong to a repeatable job when using clean', async function () {
await queue.upsertJobScheduler('myTestJob', { every: 5000 });
await queue.add('test', { foo: 'bar' }, { delay: 1000 });

// Get delayed jobs
let delayed = await queue.getDelayed();
expect(delayed.length).to.be.eql(2);

// Clean delayed jobs
await queue.clean(0, 100, 'delayed');

delayed = await queue.getDelayed();
expect(delayed.length).to.be.eql(1);

expect(delayed[0].name).to.be.eql('myTestJob');
});

it("should keep one delayed job if updating a repeatable job's every option", async function () {
await queue.upsertJobScheduler('myTestJob', { every: 5000 });
await queue.upsertJobScheduler('myTestJob', { every: 4000 });
Expand Down

0 comments on commit fb3748c

Please sign in to comment.