Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor scripts class #1240

Merged
merged 8 commits into from
May 14, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 31 additions & 42 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ export class Job<

private discarded: boolean;

protected scripts: Scripts;

constructor(
protected queue: MinimalQueue,
/**
Expand Down Expand Up @@ -146,6 +148,7 @@ export class Job<
: undefined;

this.toKey = queue.toKey.bind(queue);
this.scripts = new Scripts(queue);
}

/**
Expand Down Expand Up @@ -297,12 +300,16 @@ export class Job<
const jobData = await client.hgetall(queue.toKey(jobId));
return isEmpty(jobData)
? undefined
: Job.fromJSON<T, R, N>(queue, (<unknown>jobData) as JobJsonRaw, jobId);
: this.fromJSON<T, R, N>(
queue,
(<unknown>jobData) as JobJsonRaw,
jobId,
);
}
}

toJSON() {
const { queue, ...withoutQueue } = this;
const { queue, scripts, ...withoutQueue } = this;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now it should be called: withoutQueueAndScripts. :).
In general, I think this pattern can lead to bugs because it is easy to forget to remove what you do not want and end the serialization of unneeded things. Even if more tedious, it is probably better to manually select the properties you want to serialize.

return withoutQueue;
}

Expand Down Expand Up @@ -348,11 +355,7 @@ export class Job<
update(data: DataType): Promise<void> {
this.data = data;

return Scripts.updateData<DataType, ReturnType, NameType>(
this.queue,
this,
data,
);
return this.scripts.updateData<DataType, ReturnType, NameType>(this, data);
}

/**
Expand All @@ -362,7 +365,7 @@ export class Job<
*/
updateProgress(progress: number | object): Promise<void> {
this.progress = progress;
return Scripts.updateProgress(this.queue, this, progress);
return this.scripts.updateProgress(this, progress);
}

/**
Expand All @@ -387,7 +390,7 @@ export class Job<
const queue = this.queue;
const job = this;

const removed = await Scripts.remove(queue, job.id);
const removed = await this.scripts.remove(job.id);
if (removed) {
queue.emit('removed', job);
} else {
Expand All @@ -402,7 +405,7 @@ export class Job<
* @param duration - lock duration in milliseconds
*/
extendLock(token: string, duration: number): Promise<number> {
return Scripts.extendLock(this.queue, this.id, token, duration);
return this.scripts.extendLock(this.id, token, duration);
}

/**
Expand Down Expand Up @@ -430,8 +433,7 @@ export class Job<
throw errorObject.value;
}

return Scripts.moveToCompleted(
this.queue,
return this.scripts.moveToCompleted(
this,
stringifiedReturnValue,
this.opts.removeOnComplete,
Expand Down Expand Up @@ -486,8 +488,7 @@ export class Job<
if (delay === -1) {
moveToFailed = true;
} else if (delay) {
const args = Scripts.moveToDelayedArgs(
queue,
const args = this.scripts.moveToDelayedArgs(
this.id,
Date.now() + delay,
token,
Expand All @@ -496,7 +497,7 @@ export class Job<
command = 'delayed';
} else {
// Retry immediately
(<any>multi).retryJob(Scripts.retryJobArgs(queue, this));
(<any>multi).retryJob(this.scripts.retryJobArgs(this));
command = 'retry';
}
} else {
Expand All @@ -505,8 +506,7 @@ export class Job<
}

if (moveToFailed) {
const args = Scripts.moveToFailedArgs(
queue,
const args = this.scripts.moveToFailedArgs(
this,
message,
this.opts.removeOnFail,
Expand All @@ -520,7 +520,7 @@ export class Job<
const results = await multi.exec();
const code = results[results.length - 1][1];
if (code < 0) {
throw Scripts.finishedErrors(code, this.id, command, 'active');
throw this.scripts.finishedErrors(code, this.id, command, 'active');
}
}

Expand Down Expand Up @@ -584,7 +584,7 @@ export class Job<
* 'completed', 'failed', 'delayed', 'active', 'waiting', 'waiting-children', 'unknown'.
*/
getState(): Promise<JobState | 'unknown'> {
return Scripts.getState(this.queue, this.id);
return this.scripts.getState(this.id);
}

/**
Expand All @@ -594,7 +594,7 @@ export class Job<
* @returns void
*/
changeDelay(delay: number): Promise<void> {
return Scripts.changeDelay(this.queue, this.id, delay);
return this.scripts.changeDelay(this.id, delay);
}

/**
Expand Down Expand Up @@ -824,11 +824,10 @@ export class Job<
// that has already happened. We block checking the job until the queue events object is actually listening to
// Redis so there's no chance that it will miss events.
await queueEvents.waitUntilReady();
const [status, result] = (await Scripts.isFinished(
this.queue,
jobId,
true,
)) as [number, string];
const [status, result] = (await this.scripts.isFinished(jobId, true)) as [
number,
string,
];
const finished = status != 0;
if (finished) {
if (status == -5 || status == 2) {
Expand All @@ -848,7 +847,7 @@ export class Job<
* @returns
*/
moveToDelayed(timestamp: number, token?: string): Promise<void> {
return Scripts.moveToDelayed(this.queue, this.id, timestamp, token);
return this.scripts.moveToDelayed(this.id, timestamp, token);
}

/**
Expand All @@ -862,19 +861,18 @@ export class Job<
token: string,
opts: MoveToChildrenOpts = {},
): Promise<boolean> {
return Scripts.moveToWaitingChildren(this.queue, this.id, token, opts);
return this.scripts.moveToWaitingChildren(this.id, token, opts);
}

/**
* Promotes a delayed job so that it starts to be processed as soon as possible.
*/
async promote(): Promise<void> {
const queue = this.queue;
const jobId = this.id;

const code = await Scripts.promote(queue, jobId);
const code = await this.scripts.promote(jobId);
if (code < 0) {
throw Scripts.finishedErrors(code, this.id, 'promote', 'delayed');
throw this.scripts.finishedErrors(code, this.id, 'promote', 'delayed');
}
}

Expand All @@ -892,7 +890,7 @@ export class Job<
this.processedOn = null;
this.returnvalue = null;

return Scripts.reprocessJob(this.queue, this, state);
return this.scripts.reprocessJob(this, state);
}

/**
Expand All @@ -910,7 +908,7 @@ export class Job<
}

private async isInList(list: string): Promise<boolean> {
return Scripts.isJobInList(this.queue, this.queue.toKey(list), this.id);
return this.scripts.isJobInList(this.queue.toKey(list), this.id);
}

/**
Expand All @@ -921,8 +919,6 @@ export class Job<
* @returns
*/
addJob(client: RedisClient, parentOpts?: ParentOpts): Promise<string> {
const queue = this.queue;

const jobData = this.asJSON();

const exceedLimit =
Expand All @@ -939,14 +935,7 @@ export class Job<
throw new Error(`Delay and repeat options could not be used together`);
}

return Scripts.addJob(
client,
queue,
jobData,
this.opts,
this.id,
parentOpts,
);
return this.scripts.addJob(client, jobData, this.opts, this.id, parentOpts);
}

private saveStacktrace(multi: Pipeline, err: Error) {
Expand Down
3 changes: 3 additions & 0 deletions src/classes/queue-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import { QueueBaseOptions, RedisClient } from '../interfaces';
import { delay, DELAY_TIME_5, isNotConnectionError } from '../utils';
import { RedisConnection } from './redis-connection';
import { KeysMap, QueueKeys } from './queue-keys';
import { Scripts } from './scripts';

export class QueueBase extends EventEmitter {
toKey: (type: string) => string;
keys: KeysMap;
closing: Promise<void>;

protected scripts: Scripts;
protected connection: RedisConnection;

constructor(
Expand Down Expand Up @@ -49,6 +51,7 @@ export class QueueBase extends EventEmitter {
const queueKeys = new QueueKeys(opts.prefix);
this.keys = queueKeys.getKeys(name);
this.toKey = (type: string) => queueKeys.toKey(name, type);
this.scripts = new Scripts(this);
}

get client(): Promise<RedisClient> {
Expand Down
5 changes: 2 additions & 3 deletions src/classes/queue-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
QUEUE_SCHEDULER_SUFFIX,
} from '../utils';
import { QueueBase } from './queue-base';
import { Scripts } from './scripts';
import { RedisConnection } from './redis-connection';

export interface QueueSchedulerListener {
Expand Down Expand Up @@ -251,7 +250,7 @@ export class QueueScheduler extends QueueBase {
private async updateDelaySet(timestamp: number): Promise<[number, string]> {
if (!this.closing) {
const result = await this.checkConnectionError(() =>
Scripts.updateDelaySet(this, timestamp),
this.scripts.updateDelaySet(timestamp),
);

if (!result) {
Expand All @@ -265,7 +264,7 @@ export class QueueScheduler extends QueueBase {

private async moveStalledJobsToWait() {
if (!this.closing) {
const [failed, stalled] = await Scripts.moveStalledJobsToWait(this);
const [failed, stalled] = await this.scripts.moveStalledJobsToWait();

failed.forEach((jobId: string) =>
this.emit(
Expand Down
26 changes: 15 additions & 11 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { isRedisInstance, jobIdForGroup } from '../utils';
import { BulkJobOptions, Job } from './job';
import { QueueGetters } from './queue-getters';
import { Repeat } from './repeat';
import { Scripts } from './scripts';
import { RedisConnection } from './redis-connection';
import { FinishedStatus } from '../types';

Expand Down Expand Up @@ -201,7 +200,7 @@ export class Queue<
} else {
const jobId = jobIdForGroup(opts, data, { limiter: this.limiter });

const job = await Job.create<DataType, ResultType, NameType>(
const job = await this.Job.create<DataType, ResultType, NameType>(
this,
name,
data,
Expand All @@ -216,6 +215,13 @@ export class Queue<
}
}

/**
* Helper to easily extend Job class calls.
*/
protected get Job(): typeof Job {
return Job;
}

/**
* Adds an array of jobs to the queue.
*
Expand Down Expand Up @@ -251,7 +257,7 @@ export class Queue<
* and in that case it will add it there instead of the wait list.
*/
async pause(): Promise<void> {
await Scripts.pause(this, true);
await this.scripts.pause(true);
this.emit('paused');
}

Expand All @@ -270,7 +276,7 @@ export class Queue<
* queue.
*/
async resume(): Promise<void> {
await Scripts.pause(this, false);
await this.scripts.pause(false);
this.emit('resumed');
}

Expand Down Expand Up @@ -322,7 +328,7 @@ export class Queue<
* any of its dependencies was locked.
*/
remove(jobId: string): Promise<number> {
return Scripts.remove(this, jobId);
return this.scripts.remove(jobId);
}

/**
Expand All @@ -333,7 +339,7 @@ export class Queue<
* delayed jobs.
*/
drain(delayed = false): Promise<void> {
return Scripts.drain(this, delayed);
return this.scripts.drain(delayed);
}

/**
Expand All @@ -357,8 +363,7 @@ export class Queue<
| 'delayed'
| 'failed' = 'completed',
): Promise<string[]> {
const jobs = await Scripts.cleanJobsInSet(
this,
const jobs = await this.scripts.cleanJobsInSet(
type,
Date.now() - grace,
limit,
Expand All @@ -384,7 +389,7 @@ export class Queue<

let cursor = 0;
do {
cursor = await Scripts.obliterate(this, {
cursor = await this.scripts.obliterate({
force: false,
count: 1000,
...opts,
Expand All @@ -404,8 +409,7 @@ export class Queue<
): Promise<void> {
let cursor = 0;
do {
cursor = await Scripts.retryJobs(
this,
cursor = await this.scripts.retryJobs(
opts.state,
opts.count,
opts.timestamp,
Expand Down
Loading