Skip to content

Commit

Permalink
refactor: remove paused key on moveToFinished
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Sep 13, 2024
1 parent c3acbb9 commit e973ec5
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 17 deletions.
4 changes: 2 additions & 2 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-10.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-13.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-7.lua")),
Expand Down Expand Up @@ -512,7 +512,7 @@ def moveToFinishedArgs(self, job: Job, val: Any, propVal: str, shouldRemove, tar
metricsKey = self.toKey('metrics:' + target)

keys = self.getKeys(['wait', 'active', 'prioritized', 'events',
'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', target])
'stalled', 'limiter', 'delayed', 'meta', 'pc', target])
keys.append(self.toKey(job.id))
keys.append(metricsKey)
keys.append(self.keys['marker'])
Expand Down
1 change: 0 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ export class Scripts {
queueKeys.stalled,
queueKeys.limiter,
queueKeys.delayed,
queueKeys.paused,
queueKeys.meta,
queueKeys.pc,
undefined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
KEYS[6] rate limiter key
KEYS[7] delayed key
KEYS[8] paused key // TODO remove
KEYS[9] meta key
KEYS[10] pc priority counter
KEYS[8] meta key
KEYS[9] pc priority counter
KEYS[11] completed/failed key
KEYS[12] jobId key
KEYS[13] metrics key
KEYS[14] marker key
KEYS[10] completed/failed key
KEYS[11] jobId key
KEYS[12] metrics key
KEYS[13] marker key
ARGV[1] jobId
ARGV[2] timestamp
Expand Down Expand Up @@ -70,7 +69,7 @@ local rcall = redis.call
--- @include "includes/trimEvents"
--- @include "includes/updateParentDepsIfNeeded"

local jobIdKey = KEYS[12]
local jobIdKey = KEYS[11]
if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
local opts = cmsgpack.unpack(ARGV[8])

Expand Down Expand Up @@ -101,7 +100,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
if (numRemovedElements < 1) then return -3 end

local eventStreamKey = KEYS[4]
local metaKey = KEYS[9]
local metaKey = KEYS[8]
-- Trim events before emiting them to avoid trimming events emitted in this script
trimEvents(metaKey, eventStreamKey)

Expand Down Expand Up @@ -137,7 +136,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists

-- Remove job?
if maxCount ~= 0 then
local targetSet = KEYS[11]
local targetSet = KEYS[10]
-- Add to complete/failed set
rcall("ZADD", targetSet, timestamp, jobId)
rcall("HMSET", jobIdKey, ARGV[3], ARGV[4], "finishedOn", timestamp)
Expand Down Expand Up @@ -173,7 +172,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists

-- Collect metrics
if maxMetricsSize ~= "" then
collectMetrics(KEYS[13], KEYS[13] .. ':data', maxMetricsSize, timestamp)
collectMetrics(KEYS[12], KEYS[12] .. ':data', maxMetricsSize, timestamp)
end

-- Try to get next job to avoid an extra roundtrip if the queue is not closing,
Expand All @@ -183,8 +182,8 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, KEYS[2])

-- Check if there are delayed jobs that can be promoted
promoteDelayedJobs(KEYS[7], KEYS[14], KEYS[1], KEYS[3], eventStreamKey, prefix,
timestamp, KEYS[10], isPausedOrMaxed)
promoteDelayedJobs(KEYS[7], KEYS[13], KEYS[1], KEYS[3], eventStreamKey, prefix,
timestamp, KEYS[9], isPausedOrMaxed)

local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max'])
-- Check if we are rate limited first.
Expand All @@ -201,7 +200,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId,
timestamp, maxJobs, opts)
else
jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10])
jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[9])
if jobId then
return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId,
timestamp, maxJobs,
Expand Down

0 comments on commit e973ec5

Please sign in to comment.