Skip to content

Commit

Permalink
Merge branch 'master' into feat-job-expose-priority
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Oct 9, 2024
2 parents a3677d5 + 592938a commit 1303c19
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 18 deletions.
7 changes: 7 additions & 0 deletions docs/gitbook/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## [5.17.1](https://github.com/taskforcesh/bullmq/compare/v5.17.0...v5.17.1) (2024-10-07)


### Bug Fixes

* **repeat:** also consider startDate when using "every" ([25bbaa8](https://github.com/taskforcesh/bullmq/commit/25bbaa81af87f9944a64bc4fb7e0c76ef223ada4))

# [5.17.0](https://github.com/taskforcesh/bullmq/compare/v5.16.0...v5.17.0) (2024-10-07)


Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bullmq",
"version": "5.17.0",
"version": "5.17.1",
"description": "Queue for messages and jobs based on Redis",
"homepage": "https://bullmq.io/",
"main": "./dist/cjs/index.js",
Expand Down
42 changes: 25 additions & 17 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { parseExpression } from 'cron-parser';
import { RedisClient, RepeatBaseOptions, RepeatOptions } from '../interfaces';
import { JobsOptions, RepeatStrategy } from '../types';
import { Job } from './job';
import { Scripts } from './scripts';
import { QueueBase } from './queue-base';
import { RedisConnection } from './redis-connection';

Expand Down Expand Up @@ -39,6 +38,20 @@ export class JobScheduler extends QueueBase {
opts: Omit<JobsOptions, 'jobId' | 'repeat' | 'delay'>,
{ override }: { override: boolean },
): Promise<Job<T, R, N> | undefined> {
const { every, pattern } = repeatOpts;

if (pattern && every) {
throw new Error(
'Both .pattern and .every options are defined for this repeatable job',
);
}

if (repeatOpts.immediately && repeatOpts.startDate) {
throw new Error(
'Both .immediately and .startDate options are defined for this repeatable job',
);
}

// Check if we reached the limit of the repeatable job's iterations
const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1;
if (
Expand All @@ -58,8 +71,14 @@ export class JobScheduler extends QueueBase {
const prevMillis = opts.prevMillis || 0;
now = prevMillis < now ? now : prevMillis;

// Check if we have a start date for the repeatable job
const { startDate } = repeatOpts;
if (startDate) {
const startMillis = new Date(startDate).getTime();
now = startMillis > now ? startMillis : now;
}

const nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
const { every, pattern } = repeatOpts;

const hasImmediately = Boolean(
(every || pattern) && repeatOpts.immediately,
Expand Down Expand Up @@ -215,24 +234,13 @@ export const getNextMillis = (
millis: number,
opts: RepeatOptions,
): number | undefined => {
const pattern = opts.pattern;
if (pattern && opts.every) {
throw new Error(
'Both .pattern and .every options are defined for this repeatable job',
);
}
const { every, pattern } = opts;

if (opts.every) {
return (
Math.floor(millis / opts.every) * opts.every +
(opts.immediately ? 0 : opts.every)
);
if (every) {
return Math.floor(millis / every) * every + (opts.immediately ? 0 : every);
}

const currentDate =
opts.startDate && new Date(opts.startDate) > new Date(millis)
? new Date(opts.startDate)
: new Date(millis);
const currentDate = new Date(millis);
const interval = parseExpression(pattern, {
...opts,
currentDate,
Expand Down
146 changes: 146 additions & 0 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,77 @@ describe('Job Scheduler', function () {
delayStub.restore();
});

it('should start immediately even after removing the job scheduler and adding it again', async function () {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);
const nextTick = 2 * ONE_SECOND;

let worker: Worker;
const processing1 = new Promise<void>((resolve, reject) => {
worker = new Worker(
queueName,
async (job: Job) => {
this.clock.tick(nextTick);

try {
expect(job.opts.delay).to.be.eq(0);
resolve();
} catch (error) {
reject(error);
}
},
{ connection, prefix },
);
});

await queue.upsertJobScheduler(
'repeat',
{
every: 2000,
immediately: true,
},
{ data: { foo: 'bar' } },
);

this.clock.tick(1265);

await processing1;

await worker!.close();

await queue.removeJobScheduler('repeat');

const processing2 = new Promise<void>((resolve, reject) => {
worker = new Worker(
queueName,
async (job: Job) => {
this.clock.tick(nextTick);

try {
expect(job.opts.delay).to.be.eq(0);
resolve();
} catch (error) {
reject(error);
}
},
{ connection, prefix },
);
});

await queue.upsertJobScheduler(
'repeat',
{
every: 2000,
immediately: true,
},
{ data: { foo: 'bar' } },
);

await processing2;

await worker!.close();
});

it('should repeat once a day for 5 days and start immediately using endDate', async function () {
this.timeout(8000);

Expand Down Expand Up @@ -1532,6 +1603,69 @@ describe('Job Scheduler', function () {
delayStub.restore();
});

it('should repeat every 2 seconds with a startDate in the future', async function () {
this.timeout(10000);

// Set the initial system time
const initialDate = new Date('2024-01-01 10:00:00');
this.clock.setSystemTime(initialDate);

// Set the next tick (repeat interval) and the startDate in the future
const nextTick = 2 * ONE_SECOND + 500;
const startDate = new Date('2024-01-01 10:00:10'); // 10 seconds in the future

const worker = new Worker(
queueName,
async () => {
this.clock.tick(nextTick);
},
{ autorun: false, connection, prefix },
);
const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {});

// Schedule the job with the 'every' interval and a future startDate
await queue.upsertJobScheduler(
'test',
{
every: 2000, // every 2 seconds
startDate,
},
{ data: { foo: 'bar' } },
);

// Simulate the passage of time up to the startDate
const startDateDelay = startDate.getTime() - initialDate.getTime();
this.clock.tick(startDateDelay + nextTick);

let prev: Job;
let counter = 0;

// Promise to resolve when 5 iterations of the job are completed
const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async job => {
try {
if (prev) {
expect(prev.timestamp).to.be.lt(job.timestamp);
expect(job.timestamp - prev.timestamp).to.be.gte(2000); // Ensure it's repeating every 2 seconds
}
prev = job;
counter++;
if (counter == 5) {
resolve();
}
} catch (err) {
reject(err);
}
});
});

worker.run();

await completing;
await worker.close();
delayStub.restore();
});

it('should throw an error when using .pattern and .every simultaneously', async function () {
await expect(
queue.upsertJobScheduler('repeat', {
Expand All @@ -1543,6 +1677,18 @@ describe('Job Scheduler', function () {
);
});

it('should throw an error when using .immediately and .startDate simultaneously', async function () {
await expect(
queue.upsertJobScheduler('repeat', {
every: 5000,
immediately: true,
startDate: new Date(),
}),
).to.be.rejectedWith(
'Both .immediately and .startDate options are defined for this repeatable job',
);
});

it('should emit a waiting event when adding a repeatable job to the waiting list', async function () {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);
Expand Down

0 comments on commit 1303c19

Please sign in to comment.