Skip to content

Commit

Permalink
feat(queue-events): add retries-exhausted event (#1010)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jan 23, 2022
1 parent 3677295 commit e476f35
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 16 deletions.
3 changes: 3 additions & 0 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,9 @@ export class Job<
this.opts.removeOnFail,
token,
fetchNext,
this.opts.attempts && this.attemptsMade >= this.opts.attempts
? this.attemptsMade
: 0,
);
(<any>multi).moveToFinished(args);
command = 'failed';
Expand Down
10 changes: 10 additions & 0 deletions src/classes/queue-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,16 @@ export interface QueueEventsListener {
*/
resumed: (args: {}, id: string) => void;

/**
* Listen to 'retries-exhausted' event.
*
* This event is triggered when a job has retried the maximum attempts.
*/
'retries-exhausted': (
args: { jobId: string; attemptsMade: string },
id: string,
) => void;

/**
* Listen to 'stalled' event.
*
Expand Down
13 changes: 8 additions & 5 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
RedisClient,
WorkerOptions,
} from '../interfaces';
import { JobState } from '../types';
import { JobState, FinishedTarget, FinishedPropValAttribute } from '../types';
import { ErrorCode } from '../enums';
import { array2obj, getParentKey } from '../utils';
import { Worker } from './worker';
Expand Down Expand Up @@ -228,9 +228,9 @@ export class Scripts {
queue: MinimalQueue,
job: Job<T, R, N>,
val: any,
propVal: string,
propVal: FinishedPropValAttribute,
shouldRemove: boolean | number,
target: string,
target: FinishedTarget,
token: string,
fetchNext = true,
) {
Expand Down Expand Up @@ -270,6 +270,8 @@ export class Scripts {
job.opts?.parent?.id,
job.opts?.parent?.queue,
job.parentKey,
job.opts.attempts,
job.attemptsMade,
];

return keys.concat(args);
Expand All @@ -283,9 +285,9 @@ export class Scripts {
queue: MinimalQueue,
job: Job<T, R, N>,
val: any,
propVal: string,
propVal: FinishedPropValAttribute,
shouldRemove: boolean | number,
target: string,
target: FinishedTarget,
token: string,
fetchNext: boolean,
): Promise<JobData | []> {
Expand Down Expand Up @@ -380,6 +382,7 @@ export class Scripts {
removeOnFailed: boolean | number,
token: string,
fetchNext = false,
retriesExhausted = 0,
) {
return this.moveToFinishedArgs(
queue,
Expand Down
8 changes: 8 additions & 0 deletions src/commands/moveToFinished-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
ARGV[12] parentId
ARGV[13] parentQueue
ARGV[14] parentKey
ARGV[15] max attempts
ARGV[16] attemptsMade
Output:
0 OK
Expand Down Expand Up @@ -131,6 +133,12 @@ if rcall("EXISTS",jobIdKey) == 1 then -- // Make sure job exists
rcall("XADD", KEYS[6], "*", "event", ARGV[5], "jobId", jobId, ARGV[3],
ARGV[4])

if ARGV[5] == "failed" then
if tonumber(ARGV[16]) >= tonumber(ARGV[15]) then
rcall("XADD", KEYS[6], "*", "event", "retries-exhausted", "jobId", jobId, "attemptsMade", ARGV[16])
end
end

-- Try to get next job to avoid an extra roundtrip if the queue is not closing,
-- and not rate limited.
if (ARGV[8] == "1") then
Expand Down
3 changes: 3 additions & 0 deletions src/types/finished-target.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export type FinishedTarget = 'completed' | 'failed';

export type FinishedPropValAttribute = 'returnvalue' | 'failedReason';
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './finished-target';
export * from './job-type';
5 changes: 3 additions & 2 deletions src/types/job-type.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { FinishedTarget } from './finished-target';

export type JobState =
| FinishedTarget
| 'active'
| 'completed'
| 'delayed'
| 'failed'
| 'waiting'
| 'waiting-children';

Expand Down
91 changes: 82 additions & 9 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1427,6 +1427,79 @@ describe('workers', function () {
});

describe('Retries and backoffs', () => {
describe('when attempts is 1 and job fails', () => {
it('should execute job only once and emits retries-exhausted event', async () => {
const worker = new Worker(
queueName,
async () => {
throw new Error('failed');
},
{ connection },
);

await worker.waitUntilReady();

const job = await queue.add(
'test',
{ foo: 'bar' },
{
attempts: 1,
},
);

await new Promise<void>(resolve => {
queueEvents.on(
'retries-exhausted',
async ({ jobId, attemptsMade }) => {
expect(jobId).to.eql(job.id);
expect(1).to.eql(Number(attemptsMade));
resolve();
},
);
});

await worker.close();
});
});

describe('when jobs do not fail and get the maximum attempts limit', () => {
it('should not emit retries-exhausted event', async () => {
const worker = new Worker(queueName, async () => {}, { connection });

await worker.waitUntilReady();

await queue.add(
'test',
{ foo: 'bar' },
{
attempts: 1,
},
);
await queue.add(
'test',
{ foo: 'baz' },
{
attempts: 1,
},
);

await new Promise<void>((resolve, reject) => {
queueEvents.on('retries-exhausted', async () => {
reject();
});

queueEvents.on(
'completed',
after(2, async function ({ jobId, returnvalue }) {
resolve();
}),
);
});

await worker.close();
});
});

it('should not retry a job if it has been marked as unrecoverable', async () => {
let tries = 0;

Expand Down Expand Up @@ -1507,7 +1580,7 @@ describe('workers', function () {

await worker.waitUntilReady();

await queue.add(
const job = await queue.add(
'test',
{ foo: 'bar' },
{
Expand All @@ -1519,10 +1592,10 @@ describe('workers', function () {
worker.on('completed', () => {
reject(new Error('Failed job was retried more than it should be!'));
});
worker.on('failed', () => {
if (tries === 3) {
resolve();
}
queueEvents.on('retries-exhausted', async ({ jobId, attemptsMade }) => {
expect(jobId).to.eql(job.id);
expect(3).to.eql(Number(attemptsMade));
resolve();
});
});

Expand Down Expand Up @@ -1892,7 +1965,7 @@ describe('workers', function () {

await worker.waitUntilReady();

const failing = new Promise<void>((resolve, reject) => {
const failing = new Promise<void>(resolve => {
worker.on('failed', async (job, err) => {
expect(job.data.foo).to.equal('bar');
expect(err).to.equal(failedError);
Expand All @@ -1902,7 +1975,7 @@ describe('workers', function () {
});
});

const completing = new Promise<void>((resolve, _reject) => {
const completing = new Promise<void>(resolve => {
worker.on('completed', async () => {
resolve();
});
Expand Down Expand Up @@ -1949,7 +2022,7 @@ describe('workers', function () {

await worker.waitUntilReady();

const failing = new Promise<void>((resolve, reject) => {
const failing = new Promise<void>(resolve => {
worker.on('failed', async (job, err) => {
expect(job.data.foo).to.equal('bar');
expect(err).to.equal(failedError);
Expand All @@ -1958,7 +2031,7 @@ describe('workers', function () {
});
});

const completing = new Promise<void>((resolve, _reject) => {
const completing = new Promise<void>(resolve => {
worker.on('completed', async () => {
resolve();
});
Expand Down

0 comments on commit e476f35

Please sign in to comment.