diff --git a/src/classes/job.ts b/src/classes/job.ts index 1772affc5f..642214f46a 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -840,10 +840,11 @@ export class Job< * Moves the job to the delay set. * * @param timestamp - timestamp where the job should be moved back to "wait" + * @param token - token to check job is locked by current worker * @returns */ - moveToDelayed(timestamp: number): Promise { - return Scripts.moveToDelayed(this.queue, this.id, timestamp); + moveToDelayed(timestamp: number, token?: string): Promise { + return Scripts.moveToDelayed(this.queue, this.id, timestamp, token); } /** diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 8ef92310ce..6378fae252 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -594,11 +594,9 @@ export class Scripts { opts: MoveToChildrenOpts = {}, ): Promise { const client = await queue.client; - const multi = client.multi(); const args = this.moveToWaitingChildrenArgs(queue, jobId, token, opts); - (multi).moveToWaitingChildren(args); - const [[err, result]] = (await multi.exec()) as [[null | Error, number]]; + const result = await (client).moveToWaitingChildren(args); switch (result) { case 0: diff --git a/src/commands/moveToDelayed-5.lua b/src/commands/moveToDelayed-5.lua index ee34e3c840..0fe834a48c 100644 --- a/src/commands/moveToDelayed-5.lua +++ b/src/commands/moveToDelayed-5.lua @@ -28,9 +28,9 @@ if rcall("EXISTS", KEYS[3]) == 1 then local lockKey = KEYS[3] .. ':lock' if rcall("GET", lockKey) == ARGV[3] then rcall("DEL", lockKey) - end - else + else return -2 + end end local jobId = ARGV[2] diff --git a/src/commands/moveToFinished-12.lua b/src/commands/moveToFinished-12.lua index 9840a4d700..96bf4a454a 100644 --- a/src/commands/moveToFinished-12.lua +++ b/src/commands/moveToFinished-12.lua @@ -51,7 +51,8 @@ Events: 'completed/failed' -]] local rcall = redis.call +]] +local rcall = redis.call --- Includes --- @include "includes/destructureJobKey" diff --git a/src/commands/moveToWaitingChildren-4.lua b/src/commands/moveToWaitingChildren-4.lua index f2b23a9d5b..d987302a2f 100644 --- a/src/commands/moveToWaitingChildren-4.lua +++ b/src/commands/moveToWaitingChildren-4.lua @@ -21,7 +21,15 @@ ]] local rcall = redis.call -local function move_to_waiting_children (activeKey, waitingChildrenKey, jobId, timestamp) +local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, timestamp, lockKey, token) + if token ~= "0" then + if rcall("GET", lockKey) == token then + rcall("DEL", lockKey) + else + return -2 + end + end + local score = tonumber(timestamp) local numRemovedElements = rcall("LREM", activeKey, -1, jobId) @@ -35,22 +43,16 @@ local function move_to_waiting_children (activeKey, waitingChildrenKey, jobId, t return 0 end -if ARGV[1] ~= "0" then - if rcall("GET", KEYS[1]) ~= ARGV[1] then - return -2 - end -end - if rcall("EXISTS", KEYS[4]) == 1 then if ARGV[2] ~= "" then if rcall("SISMEMBER", KEYS[4] .. ":dependencies", ARGV[2]) ~= 0 then - return move_to_waiting_children(KEYS[2], KEYS[3], ARGV[4], ARGV[3]) + return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], ARGV[1]) end return 1 else if rcall("SCARD", KEYS[4] .. ":dependencies") ~= 0 then - return move_to_waiting_children(KEYS[2], KEYS[3], ARGV[4], ARGV[3]) + return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], ARGV[1]) end return 1 diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 0c1000922b..a7aecbed8d 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -1832,6 +1832,69 @@ describe('workers', function () { await queueScheduler.close(); }); + describe('when moving job to delayed in one step', () => { + it('should retry job after a delay time, keeping the current step', async function () { + this.timeout(8000); + + const queueScheduler = new QueueScheduler(queueName, { connection }); + await queueScheduler.waitUntilReady(); + + enum Step { + Initial, + Second, + Finish, + } + + const worker = new Worker( + queueName, + async (job, token) => { + let step = job.data.step; + while (step !== Step.Finish) { + switch (step) { + case Step.Initial: { + await job.moveToDelayed(Date.now() + 200, token); + await job.update({ + step: Step.Second, + }); + step = Step.Second; + return; + } + case Step.Second: { + await job.update({ + step: Step.Finish, + }); + step = Step.Finish; + return Step.Finish; + } + default: { + throw new Error('invalid step'); + } + } + } + }, + { connection }, + ); + + await worker.waitUntilReady(); + + const start = Date.now(); + await queue.add('test', { step: Step.Initial }); + + await new Promise(resolve => { + worker.on('completed', job => { + const elapse = Date.now() - start; + expect(elapse).to.be.greaterThan(200); + expect(job.returnvalue).to.be.eql(Step.Finish); + expect(job.attemptsMade).to.be.eql(2); + resolve(); + }); + }); + + await worker.close(); + await queueScheduler.close(); + }); + }); + describe('when creating children at runtime', () => { it('should wait children as one step of the parent job', async function () { this.timeout(8000); @@ -2510,7 +2573,9 @@ describe('workers', function () { { idx: 1, baz: 'something' }, { idx: 2, qux: 'something' }, ]; + const client = await queue.client; const parentToken = 'parent-token'; + const parentToken2 = 'parent-token2'; const childToken = 'child-token'; const parentQueueName = `parent-queue-${v4()}`; @@ -2568,6 +2633,8 @@ describe('workers', function () { }, ); + const token = await client.get(`bull:${queueName}:${parent.id}:lock`); + expect(token).to.be.null; expect(processed2).to.deep.equal({ [`bull:${queueName}:${child1.id}`]: 'return value1', }); @@ -2602,9 +2669,13 @@ describe('workers', function () { const { processed: processed4, unprocessed: unprocessed4 } = await parent.getDependencies(); const isWaitingChildren2 = await parent.isWaitingChildren(); - const movedToWaitingChildren2 = await parent.moveToWaitingChildren( - parentToken, - ); + + expect(isWaitingChildren2).to.be.false; + const updatedParent = (await parentWorker.getNextJob( + parentToken2, + )) as Job; + const movedToWaitingChildren2 = + await updatedParent.moveToWaitingChildren(parentToken2); expect(processed4).to.deep.equal({ [`bull:${queueName}:${child1.id}`]: 'return value1', @@ -2612,7 +2683,6 @@ describe('workers', function () { [`bull:${queueName}:${child3.id}`]: 'return value3', }); expect(unprocessed4).to.have.length(0); - expect(isWaitingChildren2).to.be.false; expect(movedToWaitingChildren2).to.be.false; await childrenWorker.close(); @@ -2678,6 +2748,17 @@ describe('workers', function () { queue: 'bull:' + queueName, }, }), + ).to.be.rejectedWith( + `Missing lock for job ${parent.id}. moveToWaitingChildren`, + ); + + await expect( + parent.moveToWaitingChildren('0', { + child: { + id: child1.id, + queue: 'bull:' + queueName, + }, + }), ).to.be.rejectedWith( `Job ${parent.id} is not in the active state. moveToWaitingChildren`, );