Skip to content

Commit

Permalink
fix(repeat): also consider startDate when using "every"
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Oct 7, 2024
1 parent 80ca463 commit 25bbaa8
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 19 deletions.
42 changes: 25 additions & 17 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { parseExpression } from 'cron-parser';
import { RedisClient, RepeatBaseOptions, RepeatOptions } from '../interfaces';
import { JobsOptions, RepeatStrategy } from '../types';
import { Job } from './job';
import { Scripts } from './scripts';
import { QueueBase } from './queue-base';
import { RedisConnection } from './redis-connection';

Expand Down Expand Up @@ -39,6 +38,20 @@ export class JobScheduler extends QueueBase {
opts: Omit<JobsOptions, 'jobId' | 'repeat' | 'delay'>,
{ override }: { override: boolean },
): Promise<Job<T, R, N> | undefined> {
const { every, pattern } = repeatOpts;

if (pattern && every) {
throw new Error(
'Both .pattern and .every options are defined for this repeatable job',
);
}

if (repeatOpts.immediately && repeatOpts.startDate) {
throw new Error(
'Both .immediately and .startDate options are defined for this repeatable job',
);
}

// Check if we reached the limit of the repeatable job's iterations
const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1;
if (
Expand All @@ -58,8 +71,14 @@ export class JobScheduler extends QueueBase {
const prevMillis = opts.prevMillis || 0;
now = prevMillis < now ? now : prevMillis;

// Check if we have a start date for the repeatable job
const { startDate } = repeatOpts;
if (startDate) {
const startMillis = new Date(startDate).getTime();
now = startMillis > now ? startMillis : now;
}

const nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
const { every, pattern } = repeatOpts;

const hasImmediately = Boolean(
(every || pattern) && repeatOpts.immediately,
Expand Down Expand Up @@ -215,24 +234,13 @@ export const getNextMillis = (
millis: number,
opts: RepeatOptions,
): number | undefined => {
const pattern = opts.pattern;
if (pattern && opts.every) {
throw new Error(
'Both .pattern and .every options are defined for this repeatable job',
);
}
const { every, pattern } = opts;

if (opts.every) {
return (
Math.floor(millis / opts.every) * opts.every +
(opts.immediately ? 0 : opts.every)
);
if (every) {
return Math.floor(millis / every) * every + (opts.immediately ? 0 : every);
}

const currentDate =
opts.startDate && new Date(opts.startDate) > new Date(millis)
? new Date(opts.startDate)
: new Date(millis);
const currentDate = new Date(millis);
const interval = parseExpression(pattern, {
...opts,
currentDate,
Expand Down
77 changes: 75 additions & 2 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1365,8 +1365,6 @@ describe('Job Scheduler', function () {
const job = delayed[0];
try {
await job.remove();
const delayed = await queue.getDelayed();
console.log({ delayed });
expect.fail(
'Should not be able to remove a delayed job that belongs to a repeatable job',
);
Expand Down Expand Up @@ -1546,6 +1544,69 @@ describe('Job Scheduler', function () {
delayStub.restore();
});

it('should repeat every 2 seconds with a startDate in the future', async function () {
this.timeout(10000);

// Set the initial system time
const initialDate = new Date('2024-01-01 10:00:00');
this.clock.setSystemTime(initialDate);

// Set the next tick (repeat interval) and the startDate in the future
const nextTick = 2 * ONE_SECOND + 500;
const startDate = new Date('2024-01-01 10:00:10'); // 10 seconds in the future

const worker = new Worker(
queueName,
async () => {
this.clock.tick(nextTick);
},
{ autorun: false, connection, prefix },
);
const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {});

// Schedule the job with the 'every' interval and a future startDate
await queue.upsertJobScheduler(
'test',
{
every: 2000, // every 2 seconds
startDate,
},
{ data: { foo: 'bar' } },
);

// Simulate the passage of time up to the startDate
const startDateDelay = startDate.getTime() - initialDate.getTime();
this.clock.tick(startDateDelay + nextTick);

let prev: Job;
let counter = 0;

// Promise to resolve when 5 iterations of the job are completed
const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async job => {
try {
if (prev) {
expect(prev.timestamp).to.be.lt(job.timestamp);
expect(job.timestamp - prev.timestamp).to.be.gte(2000); // Ensure it's repeating every 2 seconds
}
prev = job;
counter++;
if (counter == 5) {
resolve();
}
} catch (err) {
reject(err);
}
});
});

worker.run();

await completing;
await worker.close();
delayStub.restore();
});

it('should throw an error when using .pattern and .every simultaneously', async function () {
await expect(
queue.upsertJobScheduler('repeat', {
Expand All @@ -1557,6 +1618,18 @@ describe('Job Scheduler', function () {
);
});

it('should throw an error when using .immediately and .startDate simultaneously', async function () {
await expect(
queue.upsertJobScheduler('repeat', {
every: 5000,
immediately: true,
startDate: new Date(),
}),
).to.be.rejectedWith(
'Both .immediately and .startDate options are defined for this repeatable job',
);
});

it('should emit a waiting event when adding a repeatable job to the waiting list', async function () {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);
Expand Down

0 comments on commit 25bbaa8

Please sign in to comment.