From 64feec91b0b034fe640a846166bd95b546ff6d71 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Sun, 7 Apr 2024 16:07:22 -0600 Subject: [PATCH] perf(stalled): remove jobId from stalled after removing lock when moved from active (#2512) (python) --- python/bullmq/scripts.py | 11 +++++--- src/classes/scripts.ts | 16 +++++++---- src/commands/includes/removeLock.lua | 19 +++++++++++++ ...oveToDelayed-7.lua => moveToDelayed-8.lua} | 15 +++++----- ...dren-4.lua => moveToWaitingChildren-5.lua} | 28 ++++++++++--------- .../{retryJob-10.lua => retryJob-11.lua} | 13 ++++----- 6 files changed, 64 insertions(+), 38 deletions(-) create mode 100644 src/commands/includes/removeLock.lua rename src/commands/{moveToDelayed-7.lua => moveToDelayed-8.lua} (89%) rename src/commands/{moveToWaitingChildren-4.lua => moveToWaitingChildren-5.lua} (74%) rename src/commands/{retryJob-10.lua => retryJob-11.lua} (90%) diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index e44d25cd81..0333f89961 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -45,15 +45,15 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection "isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")), "moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-9.lua")), "moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")), - "moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-7.lua")), + "moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")), "moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")), - "moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-4.lua")), + "moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")), "obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")), "pause": self.redisClient.register_script(self.getScript("pause-7.lua")), "promote": self.redisClient.register_script(self.getScript("promote-8.lua")), "removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")), "reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-7.lua")), - "retryJob": self.redisClient.register_script(self.getScript("retryJob-10.lua")), + "retryJob": self.redisClient.register_script(self.getScript("retryJob-11.lua")), "moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")), "saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")), "updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")), @@ -171,7 +171,8 @@ def moveToWaitingChildrenArgs(self, job_id, token, opts: dict = {}): keys = [self.toKey(job_id) + ":lock", self.keys['active'], self.keys['waiting-children'], - self.toKey(job_id)] + self.toKey(job_id), + self.keys['stalled']] child_key = opts.get("child") if opts else None args = [token, get_parent_key(child_key) or "", round(time.time() * 1000), job_id, "1" if opts.get("skipAttempt") else "0"] @@ -251,6 +252,7 @@ def retryJobArgs(self, job_id: str, lifo: bool, token: str, opts: dict = {}): keys.append(self.keys['prioritized']) keys.append(self.keys['pc']) keys.append(self.keys['marker']) + keys.append(self.keys['stalled']) push_cmd = "RPUSH" if lifo else "LPUSH" @@ -269,6 +271,7 @@ def moveToDelayedArgs(self, job_id: str, timestamp: int, token: str, delay: int keys.append(self.toKey(job_id)) keys.append(self.keys['events']) keys.append(self.keys['meta']) + keys.append(self.keys['stalled']) args = [self.keys[''], round(time.time() * 1000), str(max_timestamp), job_id, token, delay, "1" if opts.get("skipAttempt") else "0" ] diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 5a34adebe9..0c2a9830d3 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -778,6 +778,7 @@ export class Scripts { this.queue.toKey(jobId), queueKeys.events, queueKeys.meta, + queueKeys.stalled, ]; return keys.concat([ @@ -810,11 +811,15 @@ export class Scripts { const childKey = getParentKey(opts.child); - const keys = [`${jobId}:lock`, 'active', 'waiting-children', jobId].map( - name => { - return this.queue.toKey(name); - }, - ); + const keys = [ + `${jobId}:lock`, + 'active', + 'waiting-children', + jobId, + 'stalled', + ].map(name => { + return this.queue.toKey(name); + }); return keys.concat([ token, @@ -919,6 +924,7 @@ export class Scripts { this.queue.keys.prioritized, this.queue.keys.pc, this.queue.keys.marker, + this.queue.keys.stalled, ]; const pushCmd = (lifo ? 'R' : 'L') + 'PUSH'; diff --git a/src/commands/includes/removeLock.lua b/src/commands/includes/removeLock.lua new file mode 100644 index 0000000000..d7335f6350 --- /dev/null +++ b/src/commands/includes/removeLock.lua @@ -0,0 +1,19 @@ +local function removeLock(jobKey, stalledKey, token, jobId) + if token ~= "0" then + local lockKey = jobKey .. ':lock' + local lockToken = rcall("GET", lockKey) + if lockToken == token then + rcall("DEL", lockKey) + rcall("SREM", stalledKey, jobId) + else + if lockToken then + -- Lock exists but token does not match + return -6 + else + -- Lock is missing completely + return -2 + end + end + end + return 0 +end diff --git a/src/commands/moveToDelayed-7.lua b/src/commands/moveToDelayed-8.lua similarity index 89% rename from src/commands/moveToDelayed-7.lua rename to src/commands/moveToDelayed-8.lua index 8cfcfd2f34..3e3f14c802 100644 --- a/src/commands/moveToDelayed-7.lua +++ b/src/commands/moveToDelayed-8.lua @@ -9,6 +9,7 @@ KEYS[5] job key KEYS[6] events stream KEYS[7] meta key + KEYS[8] stalled key ARGV[1] key prefix ARGV[2] timestamp @@ -32,20 +33,18 @@ local rcall = redis.call --- @include "includes/addDelayMarkerIfNeeded" --- @include "includes/getOrSetMaxEvents" --- @include "includes/isQueuePaused" +--- @include "includes/removeLock" local jobKey = KEYS[5] local metaKey = KEYS[7] +local token = ARGV[5] if rcall("EXISTS", jobKey) == 1 then - local delayedKey = KEYS[4] - if ARGV[5] ~= "0" then - local lockKey = jobKey .. ':lock' - if rcall("GET", lockKey) == ARGV[5] then - rcall("DEL", lockKey) - else - return -2 - end + local errorCode = removeLock(jobKey, KEYS[8], token, ARGV[4]) + if errorCode < 0 then + return errorCode end + local delayedKey = KEYS[4] local jobId = ARGV[4] local score = tonumber(ARGV[3]) local delayedTimestamp = (score / 0x1000) diff --git a/src/commands/moveToWaitingChildren-4.lua b/src/commands/moveToWaitingChildren-5.lua similarity index 74% rename from src/commands/moveToWaitingChildren-4.lua rename to src/commands/moveToWaitingChildren-5.lua index 5766e0c22a..a7519faeb0 100644 --- a/src/commands/moveToWaitingChildren-4.lua +++ b/src/commands/moveToWaitingChildren-5.lua @@ -6,6 +6,7 @@ KEYS[2] active key KEYS[3] waitChildrenKey key KEYS[4] job key + KEYS[5] stalled key ARGV[1] token ARGV[2] child key @@ -21,16 +22,11 @@ ]] local rcall = redis.call -local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, timestamp, - lockKey, jobKey, token) - if token ~= "0" then - if rcall("GET", lockKey) == token then - rcall("DEL", lockKey) - else - return -2 - end - end +-- Includes +--- @include "includes/removeLock" +local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, + timestamp) local score = tonumber(timestamp) local numRemovedElements = rcall("LREM", activeKey, -1, jobId) @@ -47,15 +43,21 @@ end if rcall("EXISTS", KEYS[4]) == 1 then if ARGV[2] ~= "" then if rcall("SISMEMBER", KEYS[4] .. ":dependencies", ARGV[2]) ~= 0 then - return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4], - ARGV[1]) + local errorCode = removeLock(KEYS[4], KEYS[5], ARGV[1], ARGV[4]) + if errorCode < 0 then + return errorCode + end + return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3]) end return 1 else if rcall("SCARD", KEYS[4] .. ":dependencies") ~= 0 then - return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4], - ARGV[1]) + local errorCode = removeLock(KEYS[4], KEYS[5], ARGV[1], ARGV[4]) + if errorCode < 0 then + return errorCode + end + return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3]) end return 1 diff --git a/src/commands/retryJob-10.lua b/src/commands/retryJob-11.lua similarity index 90% rename from src/commands/retryJob-10.lua rename to src/commands/retryJob-11.lua index c07e98e49b..c563da8d98 100644 --- a/src/commands/retryJob-10.lua +++ b/src/commands/retryJob-11.lua @@ -12,6 +12,7 @@ KEYS[8] prioritized key KEYS[9] 'pc' priority counter KEYS[10] 'marker' + KEYS[11] 'stalled' ARGV[1] key prefix ARGV[2] timestamp @@ -34,6 +35,7 @@ local rcall = redis.call --- @include "includes/getOrSetMaxEvents" --- @include "includes/getTargetQueueList" --- @include "includes/promoteDelayedJobs" +--- @include "includes/removeLock" local target, paused = getTargetQueueList(KEYS[5], KEYS[2], KEYS[3]) local markerKey = KEYS[10] @@ -43,14 +45,9 @@ local markerKey = KEYS[10] promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[8], KEYS[6], ARGV[1], ARGV[2], KEYS[9], paused) if rcall("EXISTS", KEYS[4]) == 1 then - - if ARGV[5] ~= "0" then - local lockKey = KEYS[4] .. ':lock' - if rcall("GET", lockKey) == ARGV[5] then - rcall("DEL", lockKey) - else - return -2 - end + local errorCode = removeLock(KEYS[4], KEYS[11], ARGV[5], ARGV[4]) + if errorCode < 0 then + return errorCode end rcall("LREM", KEYS[1], 0, ARGV[4])