Skip to content

Commit

Permalink
feat(job): expose priority value (#2804)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Oct 9, 2024
1 parent 592938a commit 9abec3d
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 113 deletions.
21 changes: 17 additions & 4 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import {
lengthInUtf8Bytes,
parseObjectValues,
tryCatch,
finishedErrors,
} from '../utils';
import { Backoffs } from './backoffs';
import { Scripts, raw2NextJobData } from './scripts';
Expand Down Expand Up @@ -95,7 +94,15 @@ export class Job<
* An amount of milliseconds to wait until this job can be processed.
* @defaultValue 0
*/
delay: number;
delay = 0;

/**
* Ranges from 0 (highest priority) to 2 097 152 (lowest priority). Note that
* using priorities has a slight impact on performance,
* so do not use it if not required.
* @defaultValue 0
*/
priority = 0;

/**
* Timestamp when the job was created (unless overridden with job options).
Expand Down Expand Up @@ -201,6 +208,8 @@ export class Job<

this.delay = this.opts.delay;

this.priority = this.opts.priority || 0;

this.repeatJobKey = repeatJobKey;

this.timestamp = opts.timestamp ? opts.timestamp : Date.now();
Expand All @@ -214,7 +223,9 @@ export class Job<
: undefined;

this.debounceId = opts.debounce ? opts.debounce.id : undefined;
this.deduplicationId = opts.deduplication ? opts.deduplication.id : this.debounceId;
this.deduplicationId = opts.deduplication
? opts.deduplication.id
: this.debounceId;

this.toKey = queue.toKey.bind(queue);
this.setScripts();
Expand Down Expand Up @@ -737,7 +748,7 @@ export class Job<

const result = results[results.length - 1][1] as number;
if (result < 0) {
throw finishedErrors({
throw this.scripts.finishedErrors({
code: result,
jobId: this.id,
command,
Expand Down Expand Up @@ -840,13 +851,15 @@ export class Job<
/**
* Change job priority.
*
* @param opts - options containing priority and lifo values.
* @returns void
*/
async changePriority(opts: {
priority?: number;
lifo?: boolean;
}): Promise<void> {
await this.scripts.changePriority(this.id, opts.priority, opts.lifo);
this.priority = opts.priority || 0;
}

/**
Expand Down
76 changes: 57 additions & 19 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ import {
RedisJobOptions,
} from '../types';
import { ErrorCode } from '../enums';
import {
array2obj,
finishedErrors,
getParentKey,
isRedisVersionLowerThan,
} from '../utils';
import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils';
import { ChainableCommander } from 'ioredis';

export type JobData = [JobJsonRaw | number, string?];
Expand Down Expand Up @@ -225,7 +220,7 @@ export class Scripts {
}

if (<number>result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: <number>result,
parentKey: parentOpts.parentKey,
command: 'addJob',
Expand Down Expand Up @@ -414,7 +409,7 @@ export class Scripts {
);

if (result == ErrorCode.JobBelongsToJobScheduler) {
throw finishedErrors({
throw this.finishedErrors({
code: ErrorCode.JobBelongsToJobScheduler,
jobId,
command: 'remove',
Expand Down Expand Up @@ -453,7 +448,7 @@ export class Scripts {
const result = await (<any>client).updateData(keys.concat([dataJson]));

if (result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId: job.id,
command: 'updateData',
Expand All @@ -479,7 +474,7 @@ export class Scripts {
);

if (result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
command: 'updateProgress',
Expand All @@ -504,7 +499,7 @@ export class Scripts {
);

if (result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
command: 'addLog',
Expand Down Expand Up @@ -588,7 +583,7 @@ export class Scripts {

const result = await (<any>client).moveToFinished(args);
if (result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
command: 'moveToFinished',
Expand All @@ -601,6 +596,49 @@ export class Scripts {
}
}

finishedErrors = ({
code,
jobId,
parentKey,
command,
state,
}: {
code: number;
jobId?: string;
parentKey?: string;
command: string;
state?: string;
}): Error => {
switch (code) {
case ErrorCode.JobNotExist:
return new Error(`Missing key for job ${jobId}. ${command}`);
case ErrorCode.JobLockNotExist:
return new Error(`Missing lock for job ${jobId}. ${command}`);
case ErrorCode.JobNotInState:
return new Error(
`Job ${jobId} is not in the ${state} state. ${command}`,
);
case ErrorCode.JobPendingDependencies:
return new Error(`Job ${jobId} has pending dependencies. ${command}`);
case ErrorCode.ParentJobNotExist:
return new Error(`Missing key for parent job ${parentKey}. ${command}`);
case ErrorCode.JobLockMismatch:
return new Error(
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`,
);
case ErrorCode.ParentJobCannotBeReplaced:
return new Error(
`The parent job ${parentKey} cannot be replaced. ${command}`,
);
case ErrorCode.JobBelongsToJobScheduler:
return new Error(
`Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`,
);
default:
return new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
}
};

private drainArgs(delayed: boolean): (string | number)[] {
const queueKeys = this.queue.keys;

Expand Down Expand Up @@ -652,7 +690,7 @@ export class Scripts {
case 1:
return false;
default:
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
parentKey,
Expand Down Expand Up @@ -816,7 +854,7 @@ export class Scripts {
const args = this.changeDelayArgs(jobId, delay);
const result = await (<any>client).changeDelay(args);
if (result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
command: 'changeDelay',
Expand Down Expand Up @@ -853,7 +891,7 @@ export class Scripts {
const args = this.changePriorityArgs(jobId, priority, lifo);
const result = await (<any>client).changePriority(args);
if (result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
command: 'changePriority',
Expand Down Expand Up @@ -971,7 +1009,7 @@ export class Scripts {
const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts);
const result = await (<any>client).moveToDelayed(args);
if (result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
command: 'moveToDelayed',
Expand Down Expand Up @@ -1007,7 +1045,7 @@ export class Scripts {
case 1:
return false;
default:
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
command: 'moveToWaitingChildren',
Expand Down Expand Up @@ -1166,7 +1204,7 @@ export class Scripts {
case 1:
return;
default:
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId: job.id,
command: 'reprocessJob',
Expand Down Expand Up @@ -1230,7 +1268,7 @@ export class Scripts {

const code = await (<any>client).promote(keys.concat(args));
if (code < 0) {
throw finishedErrors({
throw this.finishedErrors({
code,
jobId,
command: 'promote',
Expand Down
1 change: 0 additions & 1 deletion src/commands/includes/deduplicateJob.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,3 @@ local function deduplicateJob(prefixKey, deduplicationOpts, jobId, deduplication
end
end
end

3 changes: 2 additions & 1 deletion src/interfaces/base-job-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ export interface DefaultJobOptions {
timestamp?: number;

/**
* Ranges from 1 (highest priority) to 2 097 152 (lowest priority). Note that
* Ranges from 0 (highest priority) to 2 097 152 (lowest priority). Note that
* using priorities has a slight impact on performance,
* so do not use it if not required.
* @defaultValue 0
*/
priority?: number;

Expand Down
42 changes: 0 additions & 42 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import { CONNECTION_CLOSED_ERROR_MSG } from 'ioredis/built/utils';
import { ChildMessage, RedisClient } from './interfaces';
import { EventEmitter } from 'events';
import * as semver from 'semver';
import { ErrorCode } from './enums';

export const errorObject: { [index: string]: any } = { value: null };

Expand Down Expand Up @@ -248,44 +247,3 @@ export const toString = (value: any): string => {
};

export const QUEUE_EVENT_SUFFIX = ':qe';

export const finishedErrors = ({
code,
jobId,
parentKey,
command,
state,
}: {
code: number;
jobId?: string;
parentKey?: string;
command: string;
state?: string;
}): Error => {
switch (code) {
case ErrorCode.JobNotExist:
return new Error(`Missing key for job ${jobId}. ${command}`);
case ErrorCode.JobLockNotExist:
return new Error(`Missing lock for job ${jobId}. ${command}`);
case ErrorCode.JobNotInState:
return new Error(`Job ${jobId} is not in the ${state} state. ${command}`);
case ErrorCode.JobPendingDependencies:
return new Error(`Job ${jobId} has pending dependencies. ${command}`);
case ErrorCode.ParentJobNotExist:
return new Error(`Missing key for parent job ${parentKey}. ${command}`);
case ErrorCode.JobLockMismatch:
return new Error(
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`,
);
case ErrorCode.ParentJobCannotBeReplaced:
return new Error(
`The parent job ${parentKey} cannot be replaced. ${command}`,
);
case ErrorCode.JobBelongsToJobScheduler:
return new Error(
`Job ${jobId} belongs to a job scheduler and cannot be removed directly`,
);
default:
return new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
}
};
Loading

0 comments on commit 9abec3d

Please sign in to comment.