docs(worker): specify error parameter in failed event #1043

Merged 2 commits on Feb 1, 2022

2 changes: 1 addition & 1 deletion docs/gitbook/guide/workers.md
@@ -36,7 +36,7 @@ worker.on('progress', (job: Job, progress: number | object) => {
Finally, when the process fails with an exception it is possible to listen for the "failed" event too:

```typescript
- worker.on('failed', (job: Job, failedReason: string) => {
+ worker.on('failed', (job: Job, error: Error) => {
// Do something with the return value.
});
```
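For context, a minimal sketch of how the documented signature is typically used (editor's addition, not part of this diff; the queue name, connection options, and logging are illustrative assumptions):

```typescript
import { Worker, Job } from 'bullmq';

// Queue name and connection details are placeholders for this sketch.
const worker = new Worker(
  'my-queue',
  async (job: Job) => {
    // ... process job.data here ...
  },
  { connection: { host: 'localhost', port: 6379 } },
);

worker.on('failed', (job: Job, error: Error) => {
  // The listener receives the Error thrown by the processor, not a plain string.
  console.error(`Job ${job.id} failed: ${error.message}`, error.stack);
});
```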
4 changes: 2 additions & 2 deletions docs/gitbook/guide/workers/README.md
@@ -42,7 +42,7 @@ worker.on('progress', (job: Job, progress: number | object) => {
Finally, when the process fails with an exception it is possible to listen for the "failed" event too:

```typescript
- worker.on('failed', (job: Job, failedReason: string) => {
+ worker.on('failed', (job: Job, error: Error) => {
// Do something with the return value.
});
```
@@ -77,7 +77,7 @@ worker.on('error', err => {
```

{% hint style="danger" %}
- If the error handler is missing, your worker may stop processing jobs when an error is emitted!. More info [here](https://nodejs.org/api/events.html#events\_error\_events).
+ If the error handler is missing, your worker may stop processing jobs when an error is emitted!. More info [here](https://nodejs.org/api/events.html#events_error_events).
{% endhint %}
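A bare-bones sketch of keeping that handler attached (editor's addition, assuming the `worker` instance from the snippets above). Note that 'error' reports worker-level problems such as lost Redis connections, while 'failed' reports individual job failures:

```typescript
// Keep an 'error' listener attached at all times so an emitted 'error'
// event cannot crash the process or stall the worker.
worker.on('error', (err: Error) => {
  console.error('Worker error:', err);
});
```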

## Typescript typings
6 changes: 3 additions & 3 deletions docs/gitbook/patterns/failing-fast-when-redis-is-down.md
@@ -2,7 +2,7 @@

By design, BullMQ will reconnect automatically and if you add new jobs to a queue while the queue instance is disconnected from Redis, the add command will not fail, instead the call will keep waiting for a reconnection to occur until it can complete. 

- This behavior is not always desirable, for example, if you have implemented a REST api that results in a call to "add", you do not want to keep the HTTP call busy while the "add" is waiting for the queue to reconnect to Redis. In this case you can just pass the option "enableOfflineQueue: false", so that "ioredis" do not queue the commands and instead throws an exception:
+ This behavior is not always desirable; for example, if you have implemented a REST API that results in a call to "add", you do not want to keep the HTTP call busy while the "add" is waiting for the queue to reconnect to Redis. In this case, you can just pass the option "enableOfflineQueue: false", so that "ioredis" do not queue the commands and instead throws an exception:

```typescript
const myQueue = new Queue("transcoding", {
@@ -22,8 +22,8 @@ app.post("/jobs", async (req, res) => {

```

- In this way the caller can catch this temporal error and act upon it, maybe doing some retries or giving up depending on its requirements.
+ In this way, the caller can catch this temporal error and act upon it, maybe doing some retries or giving up depending on its requirements.

{% hint style="danger" %}
- Currently there is a limitation in that the Redis instance must at least be online while the queue is being instantiated.
+ Currently, there is a limitation in that the Redis instance must at least be online while the queue is being instantiated.
{% endhint %}
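To illustrate the caller-side handling described above, a rough sketch (editor's addition, not part of this diff; the route, payload, status code, and connection details are assumptions):

```typescript
import express from 'express';
import { Queue } from 'bullmq';

const app = express();
const myQueue = new Queue('transcoding', {
  // enableOfflineQueue: false makes ioredis throw instead of buffering commands.
  connection: { host: 'localhost', port: 6379, enableOfflineQueue: false },
});

app.post('/jobs', express.json(), async (req, res) => {
  try {
    const job = await myQueue.add('transcode', req.body);
    res.status(201).json({ jobId: job.id });
  } catch (err) {
    // Redis is unreachable: fail fast instead of keeping the HTTP request hanging.
    res.status(503).json({ error: 'Queue temporarily unavailable, please retry' });
  }
});

app.listen(3000);
```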
4 changes: 2 additions & 2 deletions docs/gitbook/patterns/flows.md
@@ -4,8 +4,8 @@
The following pattern, although still useful, has been mostly super-seeded by the new [Flows](../guide/flows/) functionality
{% endhint %}

- In some situations you need to execute a flow of actions that each and one of them could fail, it could be database updates, calls to external services, or any other kind of asynchronous call.
+ In some situations, you need to execute a flow of actions that each and one of them could fail, it could be database updates, calls to external services, or any other kind of asynchronous call.

Sometimes it may not be possible to create an [idempotent job](idempotent-jobs.md) that can execute all these actions again in the case one of them failed for any reason, instead we want to be able to only re-execute the action that failed and continue executing the rest of the actions that have not yet been executed.

- The pattern to solve this issue consists on dividing the flow of actions into one queue for every action. When the first action completes it places the next action as a job in its correspondent queue.
+ The pattern to solve this issue consists on dividing the flow of actions into one queue for every action. When the first action completes, it places the next action as a job in its correspondent queue.
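As a concrete sketch of that chaining (editor's addition; the queue names, payload, and helper functions below are hypothetical):

```typescript
import { Queue, Worker, Job } from 'bullmq';

const connection = { host: 'localhost', port: 6379 };

// One queue per action in the flow.
const chargeQueue = new Queue('charge-card', { connection });
const emailQueue = new Queue('send-receipt', { connection });

// Hypothetical action implementations.
async function chargeCard(data: any) {
  /* call the payment provider */
  return { receiptId: 'r-123' };
}
async function sendReceipt(data: any) {
  /* call the email provider */
}

// First action: only if it succeeds do we enqueue the second action.
new Worker(
  'charge-card',
  async (job: Job) => {
    const receipt = await chargeCard(job.data);
    await emailQueue.add('send-receipt', { ...job.data, ...receipt });
  },
  { connection },
);

// Second action: retried on its own if it fails, without re-charging the card.
new Worker('send-receipt', async (job: Job) => sendReceipt(job.data), {
  connection,
});
```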
1 change: 0 additions & 1 deletion docs/gitbook/patterns/idempotent-jobs.md
@@ -9,4 +9,3 @@ To achieve this behaviour, your jobs should be as atomic and simple as possible.
Simpler jobs also means simpler debugging, identifying bottlenecks, etc.

If necessary, split complex jobs [as described in the flow pattern](flows.md).

7 changes: 3 additions & 4 deletions docs/gitbook/patterns/manually-fetching-jobs.md
@@ -29,13 +29,13 @@ There is an important consideration regarding job "locks" when processing manual
the lock duration setting is called "visibility window" in other queue systems.
{% endhint %}

- Normally a job gets locked as soon as it is fetched from the queue with a max duration of "lockDuration" worker option. The default is 30 seconds but can be changed to any value easily, for example to change it to 60 seconds:
+ Normally a job gets locked as soon as it is fetched from the queue with a max duration of "lockDuration" worker option. The default is 30 seconds but can be changed to any value easily, for example, to change it to 60 seconds:

```typescript
const worker = new Worker('my-queue', null, { lockDuration: 60000 });
```

- When using standard worker processors the lock is renewed automatically after half lock duration time has passed, however this mechanism does not exist when processing jobs manually, so you need to make sure to process the job faster than the lockDuration to avoid the "QueueScheduler" to move the job back to the waiting list of the queue or you can extend the lock for the job manually:
+ When using standard worker processors the lock is renewed automatically after half lock duration time has passed, however, this mechanism does not exist when processing jobs manually, so you need to make sure to process the job faster than the lockDuration to avoid the "QueueScheduler" to move the job back to the waiting list of the queue or you can extend the lock for the job manually:

```typescript
const job = (await worker.getNextJob(token)) as Job;
@@ -46,7 +46,7 @@ await job.extendLock(token, 30000);

## Looping through jobs

- In many cases you will have an "infinite" loop that processes jobs one by one like this example:
+ In many cases, you will have an "infinite" loop that processes jobs one by one like this example:

```typescript
const worker = new Worker('my-queue');
@@ -81,4 +81,3 @@ while (1) {
}
}
```
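Since part of the loop above is collapsed in this view, here is one common shape such a loop takes (editor's sketch; the token, completion calls, and processing function are assumptions, not the hidden lines of the original example):

```typescript
import { Worker, Job } from 'bullmq';

// A worker with a null processor fetches jobs only when asked to.
const worker = new Worker('my-queue', null, {
  connection: { host: 'localhost', port: 6379 },
});
const token = 'my-consumer-token'; // any unique string identifying this consumer

async function handleJob(data: any) {
  /* do the actual work */
  return 'done';
}

async function run() {
  while (true) {
    const job = (await worker.getNextJob(token)) as Job | undefined;
    if (!job) {
      continue; // nothing ready yet
    }
    try {
      const result = await handleJob(job.data);
      await job.moveToCompleted(result, token, false);
    } catch (err) {
      await job.moveToFailed(err as Error, token, false);
    }
  }
}

run().catch(console.error);
```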

4 changes: 2 additions & 2 deletions docs/gitbook/patterns/throttle-jobs.md
@@ -21,8 +21,8 @@ worker.on('completed', (job: Job, returnvalue: any) => {
console.log('worker done painting', new Date());
});

- worker.on('failed', (job: Job, failedReason: string) => {
-   console.error('worker fail painting', job, failedReason, new Date());
+ worker.on('failed', (job: Job, error: Error) => {
+   console.error('worker fail painting', job, error, new Date());
});

// Add only one job that will be delayed at least 1 second.
34 changes: 15 additions & 19 deletions tests/test_job.ts
@@ -543,12 +543,12 @@ describe('Job', function () {
const queueScheduler = new QueueScheduler(queueName, { connection });
await queueScheduler.waitUntilReady();

- const worker = new Worker(queueName, async job => {}, { connection });
+ const worker = new Worker(queueName, async () => {}, { connection });
await worker.waitUntilReady();

const startTime = new Date().getTime();

- const completing = new Promise<void>((resolve, reject) => {
+ const completing = new Promise<void>(resolve => {
worker.on('completed', async () => {
const timeDiff = new Date().getTime() - startTime;
expect(timeDiff).to.be.gte(4000);
@@ -613,7 +613,7 @@ describe('Job', function () {
this.timeout(10000);
const worker = new Worker(
queueName,
- job => {
+ () => {
return delay(100);
},
{ connection },
@@ -831,7 +831,7 @@ describe('Job', function () {
});

it('should resolve when the job has been completed', async function () {
- const worker = new Worker(queueName, async job => 'qux', { connection });
+ const worker = new Worker(queueName, async () => 'qux', { connection });

const job = await queue.add('test', { foo: 'bar' });

@@ -846,7 +846,7 @@
it('rejects with missing key for job message', async function () {
const worker = new Worker(
queueName,
- async job => {
+ async () => {
await delay(100);
return 'qux';
},
@@ -887,11 +887,9 @@
});

it('should resolve when the job has been completed and return object', async function () {
- const worker = new Worker(
-   queueName,
-   async job => ({ resultFoo: 'bar' }),
-   { connection },
- );
+ const worker = new Worker(queueName, async () => ({ resultFoo: 'bar' }), {
+   connection,
+ });

const job = await queue.add('test', { foo: 'bar' });

@@ -906,7 +904,7 @@
it('should resolve when the job has been delayed and completed and return object', async function () {
const worker = new Worker(
queueName,
- async job => {
+ async () => {
await delay(300);
return { resultFoo: 'bar' };
},
@@ -924,7 +922,7 @@ });
});

it('should resolve when the job has been completed and return string', async function () {
- const worker = new Worker(queueName, async job => 'a string', {
+ const worker = new Worker(queueName, async () => 'a string', {
connection,
});

@@ -941,7 +939,7 @@
it('should reject when the job has been failed', async function () {
const worker = new Worker(
queueName,
- async job => {
+ async () => {
await delay(500);
throw new Error('test error');
},
@@ -958,11 +956,9 @@ });
});

it('should resolve directly if already processed', async function () {
- const worker = new Worker(
-   queueName,
-   async job => ({ resultFoo: 'bar' }),
-   { connection },
- );
+ const worker = new Worker(queueName, async () => ({ resultFoo: 'bar' }), {
+   connection,
+ });

const job = await queue.add('test', { foo: 'bar' });

@@ -978,7 +974,7 @@
it('should reject directly if already processed', async function () {
const worker = new Worker(
queueName,
- async job => {
+ async () => {
throw new Error('test error');
},
{ connection },
4 changes: 2 additions & 2 deletions tests/test_obliterate.ts
@@ -124,11 +124,11 @@ describe('Obliterate', function () {
);
await worker.waitUntilReady();

- const completing = new Promise((resolve, reject) => {
+ const completing = new Promise(resolve => {
worker.on('completed', after(2, resolve));
});

- const failing = new Promise((resolve, reject) => {
+ const failing = new Promise(resolve => {
worker.on('failed', resolve);
});
