Skip to content

Commit

Permalink
Merge branch 'master' into fix-every-immediately
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Oct 7, 2024
2 parents 3e3c4ae + 592938a commit 9b4bf02
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 4 deletions.
7 changes: 7 additions & 0 deletions docs/gitbook/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## [5.17.1](https://github.com/taskforcesh/bullmq/compare/v5.17.0...v5.17.1) (2024-10-07)


### Bug Fixes

* **repeat:** also consider startDate when using "every" ([25bbaa8](https://github.com/taskforcesh/bullmq/commit/25bbaa81af87f9944a64bc4fb7e0c76ef223ada4))

# [5.17.0](https://github.com/taskforcesh/bullmq/compare/v5.16.0...v5.17.0) (2024-10-07)


Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bullmq",
"version": "5.17.0",
"version": "5.17.1",
"description": "Queue for messages and jobs based on Redis",
"homepage": "https://bullmq.io/",
"main": "./dist/cjs/index.js",
Expand Down
22 changes: 21 additions & 1 deletion src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ export class JobScheduler extends QueueBase {
opts: Omit<JobsOptions, 'jobId' | 'repeat' | 'delay'>,
{ override }: { override: boolean },
): Promise<Job<T, R, N> | undefined> {
const { every, pattern } = repeatOpts;

if (pattern && every) {
throw new Error(
'Both .pattern and .every options are defined for this repeatable job',
);
}

if (repeatOpts.immediately && repeatOpts.startDate) {
throw new Error(
'Both .immediately and .startDate options are defined for this repeatable job',
);
}

// Check if we reached the limit of the repeatable job's iterations
const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1;
if (
Expand All @@ -57,8 +71,14 @@ export class JobScheduler extends QueueBase {
const prevMillis = opts.prevMillis || 0;
now = prevMillis < now ? now : prevMillis;

// Check if we have a start date for the repeatable job
const { startDate } = repeatOpts;
if (startDate) {
const startMillis = new Date(startDate).getTime();
now = startMillis > now ? startMillis : now;
}

const nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
const { every, pattern } = repeatOpts;

const hasImmediately = Boolean(
(every || pattern) && repeatOpts.immediately,
Expand Down
148 changes: 146 additions & 2 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,77 @@ describe('Job Scheduler', function () {
delayStub.restore();
});

it('should start immediately even after removing the job scheduler and adding it again', async function () {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);
const nextTick = 2 * ONE_SECOND;

let worker: Worker;
const processing1 = new Promise<void>((resolve, reject) => {
worker = new Worker(
queueName,
async (job: Job) => {
this.clock.tick(nextTick);

try {
expect(job.opts.delay).to.be.eq(0);
resolve();
} catch (error) {
reject(error);
}
},
{ connection, prefix },
);
});

await queue.upsertJobScheduler(
'repeat',
{
every: 2000,
immediately: true,
},
{ data: { foo: 'bar' } },
);

this.clock.tick(1265);

await processing1;

await worker!.close();

await queue.removeJobScheduler('repeat');

const processing2 = new Promise<void>((resolve, reject) => {
worker = new Worker(
queueName,
async (job: Job) => {
this.clock.tick(nextTick);

try {
expect(job.opts.delay).to.be.eq(0);
resolve();
} catch (error) {
reject(error);
}
},
{ connection, prefix },
);
});

await queue.upsertJobScheduler(
'repeat',
{
every: 2000,
immediately: true,
},
{ data: { foo: 'bar' } },
);

await processing2;

await worker!.close();
});

it('should repeat once a day for 5 days and start immediately using endDate', async function () {
this.timeout(8000);

Expand Down Expand Up @@ -1359,8 +1430,6 @@ describe('Job Scheduler', function () {
const job = delayed[0];
try {
await job.remove();
const delayed = await queue.getDelayed();
console.log({ delayed });
expect.fail(
'Should not be able to remove a delayed job that belongs to a repeatable job',
);
Expand Down Expand Up @@ -1540,6 +1609,69 @@ describe('Job Scheduler', function () {
delayStub.restore();
});

it('should repeat every 2 seconds with a startDate in the future', async function () {
this.timeout(10000);

// Set the initial system time
const initialDate = new Date('2024-01-01 10:00:00');
this.clock.setSystemTime(initialDate);

// Set the next tick (repeat interval) and the startDate in the future
const nextTick = 2 * ONE_SECOND + 500;
const startDate = new Date('2024-01-01 10:00:10'); // 10 seconds in the future

const worker = new Worker(
queueName,
async () => {
this.clock.tick(nextTick);
},
{ autorun: false, connection, prefix },
);
const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {});

// Schedule the job with the 'every' interval and a future startDate
await queue.upsertJobScheduler(
'test',
{
every: 2000, // every 2 seconds
startDate,
},
{ data: { foo: 'bar' } },
);

// Simulate the passage of time up to the startDate
const startDateDelay = startDate.getTime() - initialDate.getTime();
this.clock.tick(startDateDelay + nextTick);

let prev: Job;
let counter = 0;

// Promise to resolve when 5 iterations of the job are completed
const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async job => {
try {
if (prev) {
expect(prev.timestamp).to.be.lt(job.timestamp);
expect(job.timestamp - prev.timestamp).to.be.gte(2000); // Ensure it's repeating every 2 seconds
}
prev = job;
counter++;
if (counter == 5) {
resolve();
}
} catch (err) {
reject(err);
}
});
});

worker.run();

await completing;
await worker.close();
delayStub.restore();
});

it('should throw an error when using .pattern and .every simultaneously', async function () {
await expect(
queue.upsertJobScheduler('repeat', {
Expand All @@ -1551,6 +1683,18 @@ describe('Job Scheduler', function () {
);
});

it('should throw an error when using .immediately and .startDate simultaneously', async function () {
await expect(
queue.upsertJobScheduler('repeat', {
every: 5000,
immediately: true,
startDate: new Date(),
}),
).to.be.rejectedWith(
'Both .immediately and .startDate options are defined for this repeatable job',
);
});

it('should emit a waiting event when adding a repeatable job to the waiting list', async function () {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);
Expand Down

0 comments on commit 9b4bf02

Please sign in to comment.