From 25bbaa81af87f9944a64bc4fb7e0c76ef223ada4 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Mon, 7 Oct 2024 15:41:32 +0200 Subject: [PATCH 1/3] fix(repeat): also consider startDate when using "every" --- src/classes/job-scheduler.ts | 42 ++++++++++++-------- tests/test_job_scheduler.ts | 77 +++++++++++++++++++++++++++++++++++- 2 files changed, 100 insertions(+), 19 deletions(-) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 75819c6223..24e7466d02 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -2,7 +2,6 @@ import { parseExpression } from 'cron-parser'; import { RedisClient, RepeatBaseOptions, RepeatOptions } from '../interfaces'; import { JobsOptions, RepeatStrategy } from '../types'; import { Job } from './job'; -import { Scripts } from './scripts'; import { QueueBase } from './queue-base'; import { RedisConnection } from './redis-connection'; @@ -39,6 +38,20 @@ export class JobScheduler extends QueueBase { opts: Omit, { override }: { override: boolean }, ): Promise | undefined> { + const { every, pattern } = repeatOpts; + + if (pattern && every) { + throw new Error( + 'Both .pattern and .every options are defined for this repeatable job', + ); + } + + if (repeatOpts.immediately && repeatOpts.startDate) { + throw new Error( + 'Both .immediately and .startDate options are defined for this repeatable job', + ); + } + // Check if we reached the limit of the repeatable job's iterations const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1; if ( @@ -58,8 +71,14 @@ export class JobScheduler extends QueueBase { const prevMillis = opts.prevMillis || 0; now = prevMillis < now ? now : prevMillis; + // Check if we have a start date for the repeatable job + const { startDate } = repeatOpts; + if (startDate) { + const startMillis = new Date(startDate).getTime(); + now = startMillis > now ? startMillis : now; + } + const nextMillis = await this.repeatStrategy(now, repeatOpts, jobName); - const { every, pattern } = repeatOpts; const hasImmediately = Boolean( (every || pattern) && repeatOpts.immediately, @@ -215,24 +234,13 @@ export const getNextMillis = ( millis: number, opts: RepeatOptions, ): number | undefined => { - const pattern = opts.pattern; - if (pattern && opts.every) { - throw new Error( - 'Both .pattern and .every options are defined for this repeatable job', - ); - } + const { every, pattern } = opts; - if (opts.every) { - return ( - Math.floor(millis / opts.every) * opts.every + - (opts.immediately ? 0 : opts.every) - ); + if (every) { + return Math.floor(millis / every) * every + (opts.immediately ? 0 : every); } - const currentDate = - opts.startDate && new Date(opts.startDate) > new Date(millis) - ? new Date(opts.startDate) - : new Date(millis); + const currentDate = new Date(millis); const interval = parseExpression(pattern, { ...opts, currentDate, diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index f22f80f253..b851cfaf31 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -1365,8 +1365,6 @@ describe('Job Scheduler', function () { const job = delayed[0]; try { await job.remove(); - const delayed = await queue.getDelayed(); - console.log({ delayed }); expect.fail( 'Should not be able to remove a delayed job that belongs to a repeatable job', ); @@ -1546,6 +1544,69 @@ describe('Job Scheduler', function () { delayStub.restore(); }); + it('should repeat every 2 seconds with a startDate in the future', async function () { + this.timeout(10000); + + // Set the initial system time + const initialDate = new Date('2024-01-01 10:00:00'); + this.clock.setSystemTime(initialDate); + + // Set the next tick (repeat interval) and the startDate in the future + const nextTick = 2 * ONE_SECOND + 500; + const startDate = new Date('2024-01-01 10:00:10'); // 10 seconds in the future + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { autorun: false, connection, prefix }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + + // Schedule the job with the 'every' interval and a future startDate + await queue.upsertJobScheduler( + 'test', + { + every: 2000, // every 2 seconds + startDate, + }, + { data: { foo: 'bar' } }, + ); + + // Simulate the passage of time up to the startDate + const startDateDelay = startDate.getTime() - initialDate.getTime(); + this.clock.tick(startDateDelay + nextTick); + + let prev: Job; + let counter = 0; + + // Promise to resolve when 5 iterations of the job are completed + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + try { + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(2000); // Ensure it's repeating every 2 seconds + } + prev = job; + counter++; + if (counter == 5) { + resolve(); + } + } catch (err) { + reject(err); + } + }); + }); + + worker.run(); + + await completing; + await worker.close(); + delayStub.restore(); + }); + it('should throw an error when using .pattern and .every simultaneously', async function () { await expect( queue.upsertJobScheduler('repeat', { @@ -1557,6 +1618,18 @@ describe('Job Scheduler', function () { ); }); + it('should throw an error when using .immediately and .startDate simultaneously', async function () { + await expect( + queue.upsertJobScheduler('repeat', { + every: 5000, + immediately: true, + startDate: new Date(), + }), + ).to.be.rejectedWith( + 'Both .immediately and .startDate options are defined for this repeatable job', + ); + }); + it('should emit a waiting event when adding a repeatable job to the waiting list', async function () { const date = new Date('2017-02-07 9:24:00'); this.clock.setSystemTime(date); From ca8207b24beac765b757b7273c24dcfd976d44af Mon Sep 17 00:00:00 2001 From: semantic-release-bot Date: Mon, 7 Oct 2024 16:38:03 +0000 Subject: [PATCH 2/3] chore(release): 5.17.1 [skip ci] ## [5.17.1](https://github.com/taskforcesh/bullmq/compare/v5.17.0...v5.17.1) (2024-10-07) ### Bug Fixes * **repeat:** also consider startDate when using "every" ([25bbaa8](https://github.com/taskforcesh/bullmq/commit/25bbaa81af87f9944a64bc4fb7e0c76ef223ada4)) --- docs/gitbook/changelog.md | 7 +++++++ package.json | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/gitbook/changelog.md b/docs/gitbook/changelog.md index 69eba39982..379c9df073 100644 --- a/docs/gitbook/changelog.md +++ b/docs/gitbook/changelog.md @@ -1,3 +1,10 @@ +## [5.17.1](https://github.com/taskforcesh/bullmq/compare/v5.17.0...v5.17.1) (2024-10-07) + + +### Bug Fixes + +* **repeat:** also consider startDate when using "every" ([25bbaa8](https://github.com/taskforcesh/bullmq/commit/25bbaa81af87f9944a64bc4fb7e0c76ef223ada4)) + # [5.17.0](https://github.com/taskforcesh/bullmq/compare/v5.16.0...v5.17.0) (2024-10-07) diff --git a/package.json b/package.json index cc1876f544..ff5aa0ce98 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bullmq", - "version": "5.17.0", + "version": "5.17.1", "description": "Queue for messages and jobs based on Redis", "homepage": "https://bullmq.io/", "main": "./dist/cjs/index.js", From 592938a7fa1c056ce262ca75760db993ae7106ef Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Mon, 7 Oct 2024 19:17:20 +0200 Subject: [PATCH 3/3] test: starts immediately after remove job scheduler --- tests/test_job_scheduler.ts | 71 +++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index b851cfaf31..2973f11512 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -751,6 +751,77 @@ describe('Job Scheduler', function () { delayStub.restore(); }); + it('should start immediately even after removing the job scheduler and adding it again', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const nextTick = 2 * ONE_SECOND; + + let worker: Worker; + const processing1 = new Promise((resolve, reject) => { + worker = new Worker( + queueName, + async (job: Job) => { + this.clock.tick(nextTick); + + try { + expect(job.opts.delay).to.be.eq(0); + resolve(); + } catch (error) { + reject(error); + } + }, + { connection, prefix }, + ); + }); + + await queue.upsertJobScheduler( + 'repeat', + { + every: 2000, + immediately: true, + }, + { data: { foo: 'bar' } }, + ); + + this.clock.tick(1265); + + await processing1; + + await worker!.close(); + + await queue.removeJobScheduler('repeat'); + + const processing2 = new Promise((resolve, reject) => { + worker = new Worker( + queueName, + async (job: Job) => { + this.clock.tick(nextTick); + + try { + expect(job.opts.delay).to.be.eq(0); + resolve(); + } catch (error) { + reject(error); + } + }, + { connection, prefix }, + ); + }); + + await queue.upsertJobScheduler( + 'repeat', + { + every: 2000, + immediately: true, + }, + { data: { foo: 'bar' } }, + ); + + await processing2; + + await worker!.close(); + }); + it('should repeat once a day for 5 days and start immediately using endDate', async function () { this.timeout(8000);