Skip to content

Commit

Permalink
refactor(retry-jobs): create getZSetItems include script (taskforcesh…
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Feb 5, 2022
1 parent 7685091 commit 5ab8525
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 18 deletions.
8 changes: 8 additions & 0 deletions src/commands/includes/getZSetItems.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

--[[
Function to get ZSet items.
]]

local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
end
5 changes: 1 addition & 4 deletions src/commands/includes/removeJobs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@ local function getListItems(keyName, max)
return rcall('LRANGE', keyName, 0, max - 1)
end

local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
end

--- @include "removeParentDependencyKey"
--- @include "getZSetItems"

local function removeJobs(keys, hard, baseKey, max)
for i, key in ipairs(keys) do
Expand Down
5 changes: 1 addition & 4 deletions src/commands/retryJobs-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ local rcall = redis.call;

-- Includes
--- @include "includes/batches"

local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
end
--- @include "includes/getZSetItems"

local jobs = getZSetItems(KEYS[3], maxCount)

Expand Down
20 changes: 10 additions & 10 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ describe('workers', function () {
});

it('should get all workers for this queue', async function () {
const worker = new Worker(queueName, async job => {}, { connection });
const worker = new Worker(queueName, async () => {}, { connection });
await worker.waitUntilReady();
await delay(10);

const workers = await queue.getWorkers();
expect(workers).to.have.length(1);

const worker2 = new Worker(queueName, async job => {}, { connection });
const worker2 = new Worker(queueName, async () => {}, { connection });
await worker2.waitUntilReady();
await delay(10);

Expand All @@ -59,8 +59,8 @@ describe('workers', function () {
it('should get only workers related only to one queue', async function () {
const queueName2 = `${queueName}2`;
const queue2 = new Queue(queueName2, { connection });
const worker = new Worker(queueName, async job => {}, { connection });
const worker2 = new Worker(queueName2, async job => {}, { connection });
const worker = new Worker(queueName, async () => {}, { connection });
const worker2 = new Worker(queueName2, async () => {}, { connection });
await worker.waitUntilReady();
await worker2.waitUntilReady();

Expand Down Expand Up @@ -135,7 +135,7 @@ describe('workers', function () {
worker.close();

await new Promise<void>(resolve => {
worker.once('completed', async (job, err) => {
worker.once('completed', async job => {
expect(job).to.be.ok;
expect(job.finishedOn).to.be.string;
expect(job.data.foo).to.be.eql('bar');
Expand Down Expand Up @@ -760,7 +760,7 @@ describe('workers', function () {
describe('when run method is called when worker is running', function () {
it('throws error', async () => {
const maxJobs = 10;
const worker = new Worker(queueName, async (job: Job) => {}, {
const worker = new Worker(queueName, async () => {}, {
autorun: false,
});
await worker.waitUntilReady();
Expand Down Expand Up @@ -853,7 +853,7 @@ describe('workers', function () {

await Promise.all(jobs);

const worker = new Worker(queueName, async job => {});
const worker = new Worker(queueName, async () => {});
await worker.waitUntilReady();

await new Promise(resolve => {
Expand Down Expand Up @@ -896,7 +896,7 @@ describe('workers', function () {
it('process a job that returns a string in the process handler', async () => {
const testString = 'a very dignified string';

const worker = new Worker(queueName, async job => {
const worker = new Worker(queueName, async () => {
return testString;
});
await worker.waitUntilReady();
Expand Down Expand Up @@ -1017,7 +1017,7 @@ describe('workers', function () {

const addedJob = await queue.add('test', { foo: 'bar' });

const anotherWorker = new Worker(queueName, async job => {
const anotherWorker = new Worker(queueName, async () => {
err = new Error(
'The second queue should not have received a job to process',
);
Expand Down Expand Up @@ -1311,7 +1311,7 @@ describe('workers', function () {

const worker = new Worker(
queueName,
async job => {
async () => {
if (first) {
first = false;
return delay(2000);
Expand Down

0 comments on commit 5ab8525

Please sign in to comment.