From 8ea0e1f76baf36dab94a66657c0f432492cb9999 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Wed, 27 Dec 2023 12:41:46 -0500 Subject: [PATCH] feat(repeatable): allow saving custom key (#1824) --- src/classes/repeat.ts | 53 +++++++++++++++++------------- src/interfaces/repeat-options.ts | 9 ++++++ tests/test_pause.ts | 2 +- tests/test_repeat.ts | 55 ++++++++++++++++++++++++++++++++ 4 files changed, 96 insertions(+), 23 deletions(-) diff --git a/src/classes/repeat.ts b/src/classes/repeat.ts index 1bfe97b4ea..1138789455 100644 --- a/src/classes/repeat.ts +++ b/src/classes/repeat.ts @@ -114,12 +114,13 @@ export class Repeat extends QueueBase { // // Generate unique job id for this iteration. // - const jobId = this.getRepeatJobId( + const jobId = this.getRepeatJobId({ name, nextMillis, - this.hash(repeatJobKey), - opts.repeat.jobId, - ); + namespace: this.hash(repeatJobKey), + jobId: opts.repeat.jobId, + key: opts.repeat.key, + }); const now = Date.now(); const delay = nextMillis + (opts.repeat.offset ? opts.repeat.offset : 0) - now; @@ -146,12 +147,13 @@ export class Repeat extends QueueBase { jobId?: string, ): Promise { const repeatJobKey = getRepeatKey(name, { ...repeat, jobId }); - const repeatJobId = this.getRepeatJobId( + const repeatJobId = this.getRepeatJobId({ name, - '', - this.hash(repeatJobKey), - jobId || repeat.jobId, - ); + nextMillis: '', + namespace: this.hash(repeatJobKey), + jobId: jobId ?? repeat.jobId, + key: repeat.key, + }); return this.scripts.removeRepeatable(repeatJobId, repeatJobKey); } @@ -159,12 +161,12 @@ export class Repeat extends QueueBase { async removeRepeatableByKey(repeatJobKey: string): Promise { const data = this.keyToData(repeatJobKey); - const repeatJobId = this.getRepeatJobId( - data.name, - '', - this.hash(repeatJobKey), - data.id, - ); + const repeatJobId = this.getRepeatJobId({ + name: data.name, + nextMillis: '', + namespace: this.hash(repeatJobKey), + jobId: data.id, + }); return this.scripts.removeRepeatable(repeatJobId, repeatJobKey); } @@ -212,13 +214,20 @@ export class Repeat extends QueueBase { return createHash(this.repeatKeyHashAlgorithm).update(str).digest('hex'); } - private getRepeatJobId( - name: string, - nextMillis: number | string, - namespace: string, - jobId?: string, - ) { - const checksum = this.hash(`${name}${jobId || ''}${namespace}`); + private getRepeatJobId({ + name, + nextMillis, + namespace, + jobId, + key, + }: { + name?: string; + nextMillis: number | string; + namespace?: string; + jobId?: string; + key?: string; + }) { + const checksum = key ?? this.hash(`${name}${jobId || ''}${namespace}`); return `repeat:${checksum}:${nextMillis}`; // return `repeat:${jobId || ''}:${name}:${namespace}:${nextMillis}`; //return `repeat:${name}:${namespace}:${nextMillis}`; diff --git a/src/interfaces/repeat-options.ts b/src/interfaces/repeat-options.ts index 4e64a05841..4936c8cf16 100644 --- a/src/interfaces/repeat-options.ts +++ b/src/interfaces/repeat-options.ts @@ -10,6 +10,15 @@ export interface RepeatOptions extends Omit { * A repeat pattern */ pattern?: string; + + /** + * Custom repeatable key. This is the key that holds the "metadata" + * of a given repeatable job. This key is normally auto-generated but + * it is sometimes useful to specify a custom key for easier retrieval + * of repeatable jobs. + */ + key?: string; + /** * Number of times the job should repeat at max. */ diff --git a/tests/test_pause.ts b/tests/test_pause.ts index b060769581..f10bc3729f 100644 --- a/tests/test_pause.ts +++ b/tests/test_pause.ts @@ -179,7 +179,7 @@ describe('Pause', function () { await worker.resume(); await processPromise; - worker.close(); + await worker.close(); }); it('should wait until active jobs are finished before resolving pause', async () => { diff --git a/tests/test_repeat.ts b/tests/test_repeat.ts index 2a70b520ca..cec55869c7 100644 --- a/tests/test_repeat.ts +++ b/tests/test_repeat.ts @@ -1252,6 +1252,61 @@ describe('repeat', function () { delayStub.restore(); }); + describe('when custom key is provided', function () { + it('should allow removing a repeatable job by custom key', async function () { + const numJobs = 4; + const date = new Date('2017-02-07 9:24:00'); + let prev: Job; + let counter = 0; + let processor; + const key = 'xxxx'; + + this.clock.setSystemTime(date); + + const nextTick = 2 * ONE_SECOND + 10; + const repeat = { pattern: '*/2 * * * * *', key }; + + await queue.add('test', { foo: 'bar' }, { repeat }); + + this.clock.tick(nextTick); + + const processing = new Promise((resolve, reject) => { + processor = async () => { + counter++; + if (counter == numJobs) { + try { + await queue.removeRepeatable('test', repeat); + this.clock.tick(nextTick); + const delayed = await queue.getDelayed(); + expect(delayed).to.be.empty; + resolve(); + } catch (err) { + reject(err); + } + } else if (counter > numJobs) { + reject(Error(`should not repeat more than ${numJobs} times`)); + } + }; + }); + + const worker = new Worker(queueName, processor, { connection, prefix }); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + await worker.waitUntilReady(); + + worker.on('completed', job => { + this.clock.tick(nextTick); + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(2000); + } + prev = job; + }); + + await processing; + delayStub.restore(); + }); + }); + // This test is flaky and too complex we need something simpler that tests the same thing it.skip('should not re-add a repeatable job after it has been removed', async function () { const repeat = await queue.repeat;