forked from taskforcesh/bullmq
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
GitBook: [taskforcesh#104] No subject
- Loading branch information
1 parent
b60ac86
commit f718340
Showing
7 changed files
with
79 additions
and
12 deletions.
There are no files selected for viewing
Binary file added
BIN
+18.3 KB
docs/gitbook/.gitbook/assets/image (1) (1) (1) (1) (1) (1) (1) (1).png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
# Observables | ||
|
||
Instead of returning regular promises, your workers can also return an Observable, this allows for some more advanced uses cases: | ||
|
||
* It makes it possible to cleanly cancel a running job. | ||
* You can define a "Time to live" (TTL) so that jobs that take too long time will be automatically canceled. | ||
* Since the last value returned by the observable is persisted, you could retry a job and continue where you left of, for example, if the job implements a state machine or similar. | ||
|
||
If you are new to Observables you may want to read this [introduction](https://www.learnrxjs.io/learn-rxjs/concepts/rxjs-primer). The two biggest advantages that Observables have over Promises are that they can emit more than 1 value and that they are cancelable. | ||
|
||
Let's see a silly example of a worker making use of Observables: | ||
|
||
```typescript | ||
import { WorkerPro } from "@taskforcesh/bullmq-pro" | ||
import { Observable } from "rxjs" | ||
|
||
const processor = async () => { | ||
return new Observable<number>(subscriber => { | ||
subscriber.next(1); | ||
subscriber.next(2); | ||
subscriber.next(3); | ||
const intervalId = setTimeout(() => { | ||
subscriber.next(4); | ||
subscriber.complete(); | ||
}, 500); | ||
|
||
// Provide a way of canceling and disposing the interval resource | ||
return function unsubscribe() { | ||
clearInterval(intervalId); | ||
}; | ||
}); | ||
}; | ||
|
||
const worker = new WorkerPro(queueName, processor, { connection }); | ||
|
||
``` | ||
|
||
In the example above, the observable will emit 4 values, the first 3 directly and then a 4th after 500 ms. Also note that the "subscriber" returns a "unsubscribe" function. This is the function that will be called if the Observable is cancelled, so this is where you would do the necessary clean up. | ||
|
||
You may be asking whats the use of returning several values for a worker. One case that comes to mind is if you have a larger processor and you want to make sure that if the process crashes you can continue from the latest value. You could do this with a simple switch-case on the return value, something like this: | ||
|
||
```typescript | ||
import { WorkerPro } from "@taskforcesh/bullmq-pro" | ||
import { Observable } from "rxjs" | ||
|
||
const processor = async (job) => { | ||
return new Observable<number>(subscriber => { | ||
switch(job.returnvalue){ | ||
default: | ||
subscriber.next(1); | ||
case 1: | ||
subscriber.next(2); | ||
case 2: | ||
subscriber.next(3); | ||
case 3: | ||
subscriber.complete(); | ||
} | ||
}); | ||
}; | ||
|
||
const worker = new WorkerPro(queueName, processor, { connection }); | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
# Cancelation | ||
|
||
As mentioned, Observables allows for clean cancelation. Currently we support a TTL value that defines the maximum processing time before the job is finally cancelled: | ||
|
||
```typescript | ||
import { WorkerPro } from "@taskforcesh/bullmq-pro" | ||
|
||
const worker = new WorkerPro(queueName, processor, { | ||
ttl: 100, | ||
connection, | ||
}); | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters