Skip to content

Commit

Permalink
feat(move-to-delayed): allow passing token (taskforcesh#1213)
Browse files Browse the repository at this point in the history
fix(move-to-waiting-children): delete lock
  • Loading branch information
roggervalf authored Apr 26, 2022
1 parent 3e995f6 commit 14f0e4a
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 21 deletions.
5 changes: 3 additions & 2 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -840,10 +840,11 @@ export class Job<
* Moves the job to the delay set.
*
* @param timestamp - timestamp where the job should be moved back to "wait"
* @param token - token to check job is locked by current worker
* @returns
*/
moveToDelayed(timestamp: number): Promise<void> {
return Scripts.moveToDelayed(this.queue, this.id, timestamp);
moveToDelayed(timestamp: number, token?: string): Promise<void> {
return Scripts.moveToDelayed(this.queue, this.id, timestamp, token);
}

/**
Expand Down
4 changes: 1 addition & 3 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,9 @@ export class Scripts {
opts: MoveToChildrenOpts = {},
): Promise<boolean> {
const client = await queue.client;
const multi = client.multi();

const args = this.moveToWaitingChildrenArgs(queue, jobId, token, opts);
(<any>multi).moveToWaitingChildren(args);
const [[err, result]] = (await multi.exec()) as [[null | Error, number]];
const result = await (<any>client).moveToWaitingChildren(args);

switch (result) {
case 0:
Expand Down
4 changes: 2 additions & 2 deletions src/commands/moveToDelayed-5.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ if rcall("EXISTS", KEYS[3]) == 1 then
local lockKey = KEYS[3] .. ':lock'
if rcall("GET", lockKey) == ARGV[3] then
rcall("DEL", lockKey)
end
else
else
return -2
end
end

local jobId = ARGV[2]
Expand Down
3 changes: 2 additions & 1 deletion src/commands/moveToFinished-12.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
Events:
'completed/failed'
]] local rcall = redis.call
]]
local rcall = redis.call

--- Includes
--- @include "includes/destructureJobKey"
Expand Down
20 changes: 11 additions & 9 deletions src/commands/moveToWaitingChildren-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@
]]
local rcall = redis.call

local function move_to_waiting_children (activeKey, waitingChildrenKey, jobId, timestamp)
local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, timestamp, lockKey, token)
if token ~= "0" then
if rcall("GET", lockKey) == token then
rcall("DEL", lockKey)
else
return -2
end
end

local score = tonumber(timestamp)

local numRemovedElements = rcall("LREM", activeKey, -1, jobId)
Expand All @@ -35,22 +43,16 @@ local function move_to_waiting_children (activeKey, waitingChildrenKey, jobId, t
return 0
end

if ARGV[1] ~= "0" then
if rcall("GET", KEYS[1]) ~= ARGV[1] then
return -2
end
end

if rcall("EXISTS", KEYS[4]) == 1 then
if ARGV[2] ~= "" then
if rcall("SISMEMBER", KEYS[4] .. ":dependencies", ARGV[2]) ~= 0 then
return move_to_waiting_children(KEYS[2], KEYS[3], ARGV[4], ARGV[3])
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], ARGV[1])
end

return 1
else
if rcall("SCARD", KEYS[4] .. ":dependencies") ~= 0 then
return move_to_waiting_children(KEYS[2], KEYS[3], ARGV[4], ARGV[3])
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], ARGV[1])
end

return 1
Expand Down
89 changes: 85 additions & 4 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1832,6 +1832,69 @@ describe('workers', function () {
await queueScheduler.close();
});

describe('when moving job to delayed in one step', () => {
it('should retry job after a delay time, keeping the current step', async function () {
this.timeout(8000);

const queueScheduler = new QueueScheduler(queueName, { connection });
await queueScheduler.waitUntilReady();

enum Step {
Initial,
Second,
Finish,
}

const worker = new Worker(
queueName,
async (job, token) => {
let step = job.data.step;
while (step !== Step.Finish) {
switch (step) {
case Step.Initial: {
await job.moveToDelayed(Date.now() + 200, token);
await job.update({
step: Step.Second,
});
step = Step.Second;
return;
}
case Step.Second: {
await job.update({
step: Step.Finish,
});
step = Step.Finish;
return Step.Finish;
}
default: {
throw new Error('invalid step');
}
}
}
},
{ connection },
);

await worker.waitUntilReady();

const start = Date.now();
await queue.add('test', { step: Step.Initial });

await new Promise<void>(resolve => {
worker.on('completed', job => {
const elapse = Date.now() - start;
expect(elapse).to.be.greaterThan(200);
expect(job.returnvalue).to.be.eql(Step.Finish);
expect(job.attemptsMade).to.be.eql(2);
resolve();
});
});

await worker.close();
await queueScheduler.close();
});
});

describe('when creating children at runtime', () => {
it('should wait children as one step of the parent job', async function () {
this.timeout(8000);
Expand Down Expand Up @@ -2510,7 +2573,9 @@ describe('workers', function () {
{ idx: 1, baz: 'something' },
{ idx: 2, qux: 'something' },
];
const client = await queue.client;
const parentToken = 'parent-token';
const parentToken2 = 'parent-token2';
const childToken = 'child-token';

const parentQueueName = `parent-queue-${v4()}`;
Expand Down Expand Up @@ -2568,6 +2633,8 @@ describe('workers', function () {
},
);

const token = await client.get(`bull:${queueName}:${parent.id}:lock`);
expect(token).to.be.null;
expect(processed2).to.deep.equal({
[`bull:${queueName}:${child1.id}`]: 'return value1',
});
Expand Down Expand Up @@ -2602,17 +2669,20 @@ describe('workers', function () {
const { processed: processed4, unprocessed: unprocessed4 } =
await parent.getDependencies();
const isWaitingChildren2 = await parent.isWaitingChildren();
const movedToWaitingChildren2 = await parent.moveToWaitingChildren(
parentToken,
);

expect(isWaitingChildren2).to.be.false;
const updatedParent = (await parentWorker.getNextJob(
parentToken2,
)) as Job;
const movedToWaitingChildren2 =
await updatedParent.moveToWaitingChildren(parentToken2);

expect(processed4).to.deep.equal({
[`bull:${queueName}:${child1.id}`]: 'return value1',
[`bull:${queueName}:${child2.id}`]: 'return value2',
[`bull:${queueName}:${child3.id}`]: 'return value3',
});
expect(unprocessed4).to.have.length(0);
expect(isWaitingChildren2).to.be.false;
expect(movedToWaitingChildren2).to.be.false;

await childrenWorker.close();
Expand Down Expand Up @@ -2678,6 +2748,17 @@ describe('workers', function () {
queue: 'bull:' + queueName,
},
}),
).to.be.rejectedWith(
`Missing lock for job ${parent.id}. moveToWaitingChildren`,
);

await expect(
parent.moveToWaitingChildren('0', {
child: {
id: child1.id,
queue: 'bull:' + queueName,
},
}),
).to.be.rejectedWith(
`Job ${parent.id} is not in the active state. moveToWaitingChildren`,
);
Expand Down

0 comments on commit 14f0e4a

Please sign in to comment.