Skip to content

Commit

Permalink
refactor(scripts): create class instance to handle scripts call (#1240)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored May 14, 2022
1 parent a0b179c commit 87a16f0
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 248 deletions.
75 changes: 32 additions & 43 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ export class Job<

private discarded: boolean;

protected scripts: Scripts;

constructor(
protected queue: MinimalQueue,
/**
Expand Down Expand Up @@ -146,6 +148,7 @@ export class Job<
: undefined;

this.toKey = queue.toKey.bind(queue);
this.scripts = new Scripts(queue);
}

/**
Expand Down Expand Up @@ -297,13 +300,17 @@ export class Job<
const jobData = await client.hgetall(queue.toKey(jobId));
return isEmpty(jobData)
? undefined
: Job.fromJSON<T, R, N>(queue, (<unknown>jobData) as JobJsonRaw, jobId);
: this.fromJSON<T, R, N>(
queue,
(<unknown>jobData) as JobJsonRaw,
jobId,
);
}
}

toJSON() {
const { queue, ...withoutQueue } = this;
return withoutQueue;
const { queue, scripts, ...withoutQueueAndScripts } = this;
return withoutQueueAndScripts;
}

/**
Expand Down Expand Up @@ -348,11 +355,7 @@ export class Job<
update(data: DataType): Promise<void> {
this.data = data;

return Scripts.updateData<DataType, ReturnType, NameType>(
this.queue,
this,
data,
);
return this.scripts.updateData<DataType, ReturnType, NameType>(this, data);
}

/**
Expand All @@ -362,7 +365,7 @@ export class Job<
*/
updateProgress(progress: number | object): Promise<void> {
this.progress = progress;
return Scripts.updateProgress(this.queue, this, progress);
return this.scripts.updateProgress(this, progress);
}

/**
Expand All @@ -387,7 +390,7 @@ export class Job<
const queue = this.queue;
const job = this;

const removed = await Scripts.remove(queue, job.id);
const removed = await this.scripts.remove(job.id);
if (removed) {
queue.emit('removed', job);
} else {
Expand All @@ -402,7 +405,7 @@ export class Job<
* @param duration - lock duration in milliseconds
*/
extendLock(token: string, duration: number): Promise<number> {
return Scripts.extendLock(this.queue, this.id, token, duration);
return this.scripts.extendLock(this.id, token, duration);
}

/**
Expand Down Expand Up @@ -430,8 +433,7 @@ export class Job<
throw errorObject.value;
}

return Scripts.moveToCompleted(
this.queue,
return this.scripts.moveToCompleted(
this,
stringifiedReturnValue,
this.opts.removeOnComplete,
Expand Down Expand Up @@ -486,8 +488,7 @@ export class Job<
if (delay === -1) {
moveToFailed = true;
} else if (delay) {
const args = Scripts.moveToDelayedArgs(
queue,
const args = this.scripts.moveToDelayedArgs(
this.id,
Date.now() + delay,
token,
Expand All @@ -496,7 +497,7 @@ export class Job<
command = 'delayed';
} else {
// Retry immediately
(<any>multi).retryJob(Scripts.retryJobArgs(queue, this));
(<any>multi).retryJob(this.scripts.retryJobArgs(this));
command = 'retry';
}
} else {
Expand All @@ -505,8 +506,7 @@ export class Job<
}

if (moveToFailed) {
const args = Scripts.moveToFailedArgs(
queue,
const args = this.scripts.moveToFailedArgs(
this,
message,
this.opts.removeOnFail,
Expand All @@ -520,7 +520,7 @@ export class Job<
const results = await multi.exec();
const code = results[results.length - 1][1];
if (code < 0) {
throw Scripts.finishedErrors(code, this.id, command, 'active');
throw this.scripts.finishedErrors(code, this.id, command, 'active');
}
}

Expand Down Expand Up @@ -584,7 +584,7 @@ export class Job<
* 'completed', 'failed', 'delayed', 'active', 'waiting', 'waiting-children', 'unknown'.
*/
getState(): Promise<JobState | 'unknown'> {
return Scripts.getState(this.queue, this.id);
return this.scripts.getState(this.id);
}

/**
Expand All @@ -594,7 +594,7 @@ export class Job<
* @returns void
*/
changeDelay(delay: number): Promise<void> {
return Scripts.changeDelay(this.queue, this.id, delay);
return this.scripts.changeDelay(this.id, delay);
}

/**
Expand Down Expand Up @@ -824,11 +824,10 @@ export class Job<
// that has already happened. We block checking the job until the queue events object is actually listening to
// Redis so there's no chance that it will miss events.
await queueEvents.waitUntilReady();
const [status, result] = (await Scripts.isFinished(
this.queue,
jobId,
true,
)) as [number, string];
const [status, result] = (await this.scripts.isFinished(jobId, true)) as [
number,
string,
];
const finished = status != 0;
if (finished) {
if (status == -5 || status == 2) {
Expand All @@ -848,7 +847,7 @@ export class Job<
* @returns
*/
moveToDelayed(timestamp: number, token?: string): Promise<void> {
return Scripts.moveToDelayed(this.queue, this.id, timestamp, token);
return this.scripts.moveToDelayed(this.id, timestamp, token);
}

/**
Expand All @@ -862,19 +861,18 @@ export class Job<
token: string,
opts: MoveToChildrenOpts = {},
): Promise<boolean> {
return Scripts.moveToWaitingChildren(this.queue, this.id, token, opts);
return this.scripts.moveToWaitingChildren(this.id, token, opts);
}

/**
* Promotes a delayed job so that it starts to be processed as soon as possible.
*/
async promote(): Promise<void> {
const queue = this.queue;
const jobId = this.id;

const code = await Scripts.promote(queue, jobId);
const code = await this.scripts.promote(jobId);
if (code < 0) {
throw Scripts.finishedErrors(code, this.id, 'promote', 'delayed');
throw this.scripts.finishedErrors(code, this.id, 'promote', 'delayed');
}
}

Expand All @@ -892,7 +890,7 @@ export class Job<
this.processedOn = null;
this.returnvalue = null;

return Scripts.reprocessJob(this.queue, this, state);
return this.scripts.reprocessJob(this, state);
}

/**
Expand All @@ -910,7 +908,7 @@ export class Job<
}

private async isInList(list: string): Promise<boolean> {
return Scripts.isJobInList(this.queue, this.queue.toKey(list), this.id);
return this.scripts.isJobInList(this.queue.toKey(list), this.id);
}

/**
Expand All @@ -921,8 +919,6 @@ export class Job<
* @returns
*/
addJob(client: RedisClient, parentOpts?: ParentOpts): Promise<string> {
const queue = this.queue;

const jobData = this.asJSON();

const exceedLimit =
Expand All @@ -939,14 +935,7 @@ export class Job<
throw new Error(`Delay and repeat options could not be used together`);
}

return Scripts.addJob(
client,
queue,
jobData,
this.opts,
this.id,
parentOpts,
);
return this.scripts.addJob(client, jobData, this.opts, this.id, parentOpts);
}

private saveStacktrace(multi: Pipeline, err: Error) {
Expand Down
3 changes: 3 additions & 0 deletions src/classes/queue-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import { QueueBaseOptions, RedisClient } from '../interfaces';
import { delay, DELAY_TIME_5, isNotConnectionError } from '../utils';
import { RedisConnection } from './redis-connection';
import { KeysMap, QueueKeys } from './queue-keys';
import { Scripts } from './scripts';

export class QueueBase extends EventEmitter {
toKey: (type: string) => string;
keys: KeysMap;
closing: Promise<void>;

protected scripts: Scripts;
protected connection: RedisConnection;

constructor(
Expand Down Expand Up @@ -49,6 +51,7 @@ export class QueueBase extends EventEmitter {
const queueKeys = new QueueKeys(opts.prefix);
this.keys = queueKeys.getKeys(name);
this.toKey = (type: string) => queueKeys.toKey(name, type);
this.scripts = new Scripts(this);
}

get client(): Promise<RedisClient> {
Expand Down
5 changes: 2 additions & 3 deletions src/classes/queue-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
QUEUE_SCHEDULER_SUFFIX,
} from '../utils';
import { QueueBase } from './queue-base';
import { Scripts } from './scripts';
import { RedisConnection } from './redis-connection';

export interface QueueSchedulerListener {
Expand Down Expand Up @@ -251,7 +250,7 @@ export class QueueScheduler extends QueueBase {
private async updateDelaySet(timestamp: number): Promise<[number, string]> {
if (!this.closing) {
const result = await this.checkConnectionError(() =>
Scripts.updateDelaySet(this, timestamp),
this.scripts.updateDelaySet(timestamp),
);

if (!result) {
Expand All @@ -265,7 +264,7 @@ export class QueueScheduler extends QueueBase {

private async moveStalledJobsToWait() {
if (!this.closing) {
const [failed, stalled] = await Scripts.moveStalledJobsToWait(this);
const [failed, stalled] = await this.scripts.moveStalledJobsToWait();

failed.forEach((jobId: string) =>
this.emit(
Expand Down
26 changes: 15 additions & 11 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { isRedisInstance, jobIdForGroup } from '../utils';
import { BulkJobOptions, Job } from './job';
import { QueueGetters } from './queue-getters';
import { Repeat } from './repeat';
import { Scripts } from './scripts';
import { RedisConnection } from './redis-connection';
import { FinishedStatus } from '../types';

Expand Down Expand Up @@ -201,7 +200,7 @@ export class Queue<
} else {
const jobId = jobIdForGroup(opts, data, { limiter: this.limiter });

const job = await Job.create<DataType, ResultType, NameType>(
const job = await this.Job.create<DataType, ResultType, NameType>(
this,
name,
data,
Expand All @@ -216,6 +215,13 @@ export class Queue<
}
}

/**
* Helper to easily extend Job class calls.
*/
protected get Job(): typeof Job {
return Job;
}

/**
* Adds an array of jobs to the queue.
*
Expand Down Expand Up @@ -251,7 +257,7 @@ export class Queue<
* and in that case it will add it there instead of the wait list.
*/
async pause(): Promise<void> {
await Scripts.pause(this, true);
await this.scripts.pause(true);
this.emit('paused');
}

Expand All @@ -270,7 +276,7 @@ export class Queue<
* queue.
*/
async resume(): Promise<void> {
await Scripts.pause(this, false);
await this.scripts.pause(false);
this.emit('resumed');
}

Expand Down Expand Up @@ -322,7 +328,7 @@ export class Queue<
* any of its dependencies was locked.
*/
remove(jobId: string): Promise<number> {
return Scripts.remove(this, jobId);
return this.scripts.remove(jobId);
}

/**
Expand All @@ -333,7 +339,7 @@ export class Queue<
* delayed jobs.
*/
drain(delayed = false): Promise<void> {
return Scripts.drain(this, delayed);
return this.scripts.drain(delayed);
}

/**
Expand All @@ -357,8 +363,7 @@ export class Queue<
| 'delayed'
| 'failed' = 'completed',
): Promise<string[]> {
const jobs = await Scripts.cleanJobsInSet(
this,
const jobs = await this.scripts.cleanJobsInSet(
type,
Date.now() - grace,
limit,
Expand All @@ -384,7 +389,7 @@ export class Queue<

let cursor = 0;
do {
cursor = await Scripts.obliterate(this, {
cursor = await this.scripts.obliterate({
force: false,
count: 1000,
...opts,
Expand All @@ -404,8 +409,7 @@ export class Queue<
): Promise<void> {
let cursor = 0;
do {
cursor = await Scripts.retryJobs(
this,
cursor = await this.scripts.retryJobs(
opts.state,
opts.count,
opts.timestamp,
Expand Down
Loading

0 comments on commit 87a16f0

Please sign in to comment.