Skip to content

Commit

Permalink
perf(stalled): remove jobId from stalled after removing lock when mov…
Browse files Browse the repository at this point in the history
…ed from active (#2512) (python)
  • Loading branch information
roggervalf authored Apr 7, 2024
1 parent f875ddd commit 64feec9
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 38 deletions.
11 changes: 7 additions & 4 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-9.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-7.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-4.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-7.lua")),
"promote": self.redisClient.register_script(self.getScript("promote-8.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-7.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-10.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-11.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")),
"saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")),
"updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")),
Expand Down Expand Up @@ -171,7 +171,8 @@ def moveToWaitingChildrenArgs(self, job_id, token, opts: dict = {}):
keys = [self.toKey(job_id) + ":lock",
self.keys['active'],
self.keys['waiting-children'],
self.toKey(job_id)]
self.toKey(job_id),
self.keys['stalled']]
child_key = opts.get("child") if opts else None
args = [token, get_parent_key(child_key) or "", round(time.time() * 1000), job_id,
"1" if opts.get("skipAttempt") else "0"]
Expand Down Expand Up @@ -251,6 +252,7 @@ def retryJobArgs(self, job_id: str, lifo: bool, token: str, opts: dict = {}):
keys.append(self.keys['prioritized'])
keys.append(self.keys['pc'])
keys.append(self.keys['marker'])
keys.append(self.keys['stalled'])

push_cmd = "RPUSH" if lifo else "LPUSH"

Expand All @@ -269,6 +271,7 @@ def moveToDelayedArgs(self, job_id: str, timestamp: int, token: str, delay: int
keys.append(self.toKey(job_id))
keys.append(self.keys['events'])
keys.append(self.keys['meta'])
keys.append(self.keys['stalled'])

args = [self.keys[''], round(time.time() * 1000), str(max_timestamp),
job_id, token, delay, "1" if opts.get("skipAttempt") else "0" ]
Expand Down
16 changes: 11 additions & 5 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ export class Scripts {
this.queue.toKey(jobId),
queueKeys.events,
queueKeys.meta,
queueKeys.stalled,
];

return keys.concat([
Expand Down Expand Up @@ -810,11 +811,15 @@ export class Scripts {

const childKey = getParentKey(opts.child);

const keys = [`${jobId}:lock`, 'active', 'waiting-children', jobId].map(
name => {
return this.queue.toKey(name);
},
);
const keys = [
`${jobId}:lock`,
'active',
'waiting-children',
jobId,
'stalled',
].map(name => {
return this.queue.toKey(name);
});

return keys.concat([
token,
Expand Down Expand Up @@ -919,6 +924,7 @@ export class Scripts {
this.queue.keys.prioritized,
this.queue.keys.pc,
this.queue.keys.marker,
this.queue.keys.stalled,
];

const pushCmd = (lifo ? 'R' : 'L') + 'PUSH';
Expand Down
19 changes: 19 additions & 0 deletions src/commands/includes/removeLock.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
local function removeLock(jobKey, stalledKey, token, jobId)
if token ~= "0" then
local lockKey = jobKey .. ':lock'
local lockToken = rcall("GET", lockKey)
if lockToken == token then
rcall("DEL", lockKey)
rcall("SREM", stalledKey, jobId)
else
if lockToken then
-- Lock exists but token does not match
return -6
else
-- Lock is missing completely
return -2
end
end
end
return 0
end
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
KEYS[5] job key
KEYS[6] events stream
KEYS[7] meta key
KEYS[8] stalled key
ARGV[1] key prefix
ARGV[2] timestamp
Expand All @@ -32,20 +33,18 @@ local rcall = redis.call
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/isQueuePaused"
--- @include "includes/removeLock"

local jobKey = KEYS[5]
local metaKey = KEYS[7]
local token = ARGV[5]
if rcall("EXISTS", jobKey) == 1 then
local delayedKey = KEYS[4]
if ARGV[5] ~= "0" then
local lockKey = jobKey .. ':lock'
if rcall("GET", lockKey) == ARGV[5] then
rcall("DEL", lockKey)
else
return -2
end
local errorCode = removeLock(jobKey, KEYS[8], token, ARGV[4])
if errorCode < 0 then
return errorCode
end

local delayedKey = KEYS[4]
local jobId = ARGV[4]
local score = tonumber(ARGV[3])
local delayedTimestamp = (score / 0x1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
KEYS[2] active key
KEYS[3] waitChildrenKey key
KEYS[4] job key
KEYS[5] stalled key
ARGV[1] token
ARGV[2] child key
Expand All @@ -21,16 +22,11 @@
]]
local rcall = redis.call

local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, timestamp,
lockKey, jobKey, token)
if token ~= "0" then
if rcall("GET", lockKey) == token then
rcall("DEL", lockKey)
else
return -2
end
end
-- Includes
--- @include "includes/removeLock"

local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId,
timestamp)
local score = tonumber(timestamp)

local numRemovedElements = rcall("LREM", activeKey, -1, jobId)
Expand All @@ -47,15 +43,21 @@ end
if rcall("EXISTS", KEYS[4]) == 1 then
if ARGV[2] ~= "" then
if rcall("SISMEMBER", KEYS[4] .. ":dependencies", ARGV[2]) ~= 0 then
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4],
ARGV[1])
local errorCode = removeLock(KEYS[4], KEYS[5], ARGV[1], ARGV[4])
if errorCode < 0 then
return errorCode
end
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3])
end

return 1
else
if rcall("SCARD", KEYS[4] .. ":dependencies") ~= 0 then
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4],
ARGV[1])
local errorCode = removeLock(KEYS[4], KEYS[5], ARGV[1], ARGV[4])
if errorCode < 0 then
return errorCode
end
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3])
end

return 1
Expand Down
13 changes: 5 additions & 8 deletions src/commands/retryJob-10.lua → src/commands/retryJob-11.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
KEYS[8] prioritized key
KEYS[9] 'pc' priority counter
KEYS[10] 'marker'
KEYS[11] 'stalled'
ARGV[1] key prefix
ARGV[2] timestamp
Expand All @@ -34,6 +35,7 @@ local rcall = redis.call
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getTargetQueueList"
--- @include "includes/promoteDelayedJobs"
--- @include "includes/removeLock"

local target, paused = getTargetQueueList(KEYS[5], KEYS[2], KEYS[3])
local markerKey = KEYS[10]
Expand All @@ -43,14 +45,9 @@ local markerKey = KEYS[10]
promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[8], KEYS[6], ARGV[1], ARGV[2], KEYS[9], paused)

if rcall("EXISTS", KEYS[4]) == 1 then

if ARGV[5] ~= "0" then
local lockKey = KEYS[4] .. ':lock'
if rcall("GET", lockKey) == ARGV[5] then
rcall("DEL", lockKey)
else
return -2
end
local errorCode = removeLock(KEYS[4], KEYS[11], ARGV[5], ARGV[4])
if errorCode < 0 then
return errorCode
end

rcall("LREM", KEYS[1], 0, ARGV[4])
Expand Down

0 comments on commit 64feec9

Please sign in to comment.