BullMQ provides rate limiting for the queues. It is possible to configure the workers so that they obey a given rate limiting option:
import { Worker, QueueScheduler } from 'bullmq';
const worker = new Worker('painter', async job => paintCar(job), {
limiter: {
max: 10,
duration: 1000,
},
});
const scheduler = new QueueScheduler('painter');
{% hint style="warning" %} Jobs that get rate limited will actually stay in waiting state. {% endhint %}
{% hint style="danger" %} From BullMQ 2.0 and onwards, the QueueScheduler is not needed anymore. {% endhint %}
{% hint style="info" %} The rate limiter is global, so if you have for example 10 workers for one queue with the above settings, still only 10 jobs will be processed by second. {% endhint %}
{% hint style="danger" %} From BullMQ 3.0 and onwards, group keys support is removed to improve global rate limit, so the information below is only valid for older versions. {% endhint %}
It is also possible to define a rate limiter based on group keys, for example you may want to have a rate limiter per customer instead of a global rate limiter for all customers:
import { Queue, Worker, QueueScheduler } from 'bullmq';
const queue = new Queue('painter', {
limiter: {
groupKey: 'customerId',
},
});
const worker = new Worker('painter', async job => paintCar(job), {
limiter: {
max: 10,
duration: 1000,
groupKey: 'customerId',
},
});
const scheduler = new QueueScheduler('painter');
// jobs will be rate limited by the value of customerId key:
await queue.add('rate limited paint', { customerId: 'my-customer-id' });
Sometimes is useful to rate-limit a queue manually instead of based on some static options. For example, if you have an API that returns 429 (Too many requests), and you want to rate-limit the queue based on that response.
For this purpose, you can use the worker method rateLimit like this:
import { Worker } from 'bullmq';
const worker = new Worker(
'myQueue',
async () => {
const [isRateLimited, duration] = await doExternalCall();
if (isRateLimited) {
await worker.rateLimit(duration);
// Do not forget to throw this special exception,
// since the job is no longer active after being rate limited.
throw Worker.RateLimitError();
}
},
{
connection,
},
);