Skip to content

Commit

Permalink
feat(repeatable): allow saving custom key (#1824)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Dec 27, 2023
1 parent dddd2c8 commit 8ea0e1f
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 23 deletions.
53 changes: 31 additions & 22 deletions src/classes/repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,13 @@ export class Repeat extends QueueBase {
//
// Generate unique job id for this iteration.
//
const jobId = this.getRepeatJobId(
const jobId = this.getRepeatJobId({
name,
nextMillis,
this.hash(repeatJobKey),
opts.repeat.jobId,
);
namespace: this.hash(repeatJobKey),
jobId: opts.repeat.jobId,
key: opts.repeat.key,
});
const now = Date.now();
const delay =
nextMillis + (opts.repeat.offset ? opts.repeat.offset : 0) - now;
Expand All @@ -146,25 +147,26 @@ export class Repeat extends QueueBase {
jobId?: string,
): Promise<number> {
const repeatJobKey = getRepeatKey(name, { ...repeat, jobId });
const repeatJobId = this.getRepeatJobId(
const repeatJobId = this.getRepeatJobId({
name,
'',
this.hash(repeatJobKey),
jobId || repeat.jobId,
);
nextMillis: '',
namespace: this.hash(repeatJobKey),
jobId: jobId ?? repeat.jobId,
key: repeat.key,
});

return this.scripts.removeRepeatable(repeatJobId, repeatJobKey);
}

async removeRepeatableByKey(repeatJobKey: string): Promise<number> {
const data = this.keyToData(repeatJobKey);

const repeatJobId = this.getRepeatJobId(
data.name,
'',
this.hash(repeatJobKey),
data.id,
);
const repeatJobId = this.getRepeatJobId({
name: data.name,
nextMillis: '',
namespace: this.hash(repeatJobKey),
jobId: data.id,
});

return this.scripts.removeRepeatable(repeatJobId, repeatJobKey);
}
Expand Down Expand Up @@ -212,13 +214,20 @@ export class Repeat extends QueueBase {
return createHash(this.repeatKeyHashAlgorithm).update(str).digest('hex');
}

private getRepeatJobId(
name: string,
nextMillis: number | string,
namespace: string,
jobId?: string,
) {
const checksum = this.hash(`${name}${jobId || ''}${namespace}`);
private getRepeatJobId({
name,
nextMillis,
namespace,
jobId,
key,
}: {
name?: string;
nextMillis: number | string;
namespace?: string;
jobId?: string;
key?: string;
}) {
const checksum = key ?? this.hash(`${name}${jobId || ''}${namespace}`);
return `repeat:${checksum}:${nextMillis}`;
// return `repeat:${jobId || ''}:${name}:${namespace}:${nextMillis}`;
//return `repeat:${name}:${namespace}:${nextMillis}`;
Expand Down
9 changes: 9 additions & 0 deletions src/interfaces/repeat-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ export interface RepeatOptions extends Omit<ParserOptions, 'iterator'> {
* A repeat pattern
*/
pattern?: string;

/**
* Custom repeatable key. This is the key that holds the "metadata"
* of a given repeatable job. This key is normally auto-generated but
* it is sometimes useful to specify a custom key for easier retrieval
* of repeatable jobs.
*/
key?: string;

/**
* Number of times the job should repeat at max.
*/
Expand Down
2 changes: 1 addition & 1 deletion tests/test_pause.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ describe('Pause', function () {
await worker.resume();

await processPromise;
worker.close();
await worker.close();
});

it('should wait until active jobs are finished before resolving pause', async () => {
Expand Down
55 changes: 55 additions & 0 deletions tests/test_repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,61 @@ describe('repeat', function () {
delayStub.restore();
});

describe('when custom key is provided', function () {
it('should allow removing a repeatable job by custom key', async function () {
const numJobs = 4;
const date = new Date('2017-02-07 9:24:00');
let prev: Job;
let counter = 0;
let processor;
const key = 'xxxx';

this.clock.setSystemTime(date);

const nextTick = 2 * ONE_SECOND + 10;
const repeat = { pattern: '*/2 * * * * *', key };

await queue.add('test', { foo: 'bar' }, { repeat });

this.clock.tick(nextTick);

const processing = new Promise<void>((resolve, reject) => {
processor = async () => {
counter++;
if (counter == numJobs) {
try {
await queue.removeRepeatable('test', repeat);
this.clock.tick(nextTick);
const delayed = await queue.getDelayed();
expect(delayed).to.be.empty;
resolve();
} catch (err) {
reject(err);
}
} else if (counter > numJobs) {
reject(Error(`should not repeat more than ${numJobs} times`));
}
};
});

const worker = new Worker(queueName, processor, { connection, prefix });
const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {});
await worker.waitUntilReady();

worker.on('completed', job => {
this.clock.tick(nextTick);
if (prev) {
expect(prev.timestamp).to.be.lt(job.timestamp);
expect(job.timestamp - prev.timestamp).to.be.gte(2000);
}
prev = job;
});

await processing;
delayStub.restore();
});
});

// This test is flaky and too complex we need something simpler that tests the same thing
it.skip('should not re-add a repeatable job after it has been removed', async function () {
const repeat = await queue.repeat;
Expand Down

0 comments on commit 8ea0e1f

Please sign in to comment.