Skip to content

Commit

Permalink
feat(python): add getJobs method in queue class (#2011)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jun 23, 2023
1 parent 65184e3 commit 8d5d6c1
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 19 deletions.
24 changes: 24 additions & 0 deletions python/bullmq/queue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
from bullmq.redis_connection import RedisConnection
from bullmq.types import QueueOptions, RetryJobsOptions, JobOptions
from bullmq.utils import extract_result
from bullmq.scripts import Scripts
from bullmq.job import Job

Expand Down Expand Up @@ -124,6 +126,28 @@ async def getJobCounts(self, *types):
counts[current_types[index]] = val or 0
return counts

async def getJobs(self, types, start=0, end=-1, asc:bool=False):
current_types = self.sanitizeJobTypes(types)
job_ids = await self.scripts.getRanges(current_types, start, end, asc)
tasks = [asyncio.create_task(Job.fromId(self, i)) for i in job_ids]
job_set, _ = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
jobs = [extract_result(job_task) for job_task in job_set]
jobs_len = len(jobs)

# we filter `None` out to remove:
jobs = list(filter(lambda j: j is not None, jobs))

for index, job_id in enumerate(job_ids):
pivot_job = jobs[index]

for i in range(index,jobs_len):
current_job = jobs[i]
if current_job and current_job.id == job_id:
jobs[index] = current_job
jobs[i] = pivot_job

return jobs

def sanitizeJobTypes(self, types):
current_types = list(types)

Expand Down
55 changes: 50 additions & 5 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"changePriority": self.redisClient.register_script(self.getScript("changePriority-5.lua")),
"extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
"getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")),
"getRanges": self.redisClient.register_script(self.getScript("getRanges-1.lua")),
"getState": self.redisClient.register_script(self.getScript("getState-8.lua")),
"getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")),
Expand Down Expand Up @@ -100,6 +101,51 @@ def addJob(self, job: Job):

return self.commands["addJob"](keys=keys, args=[packedArgs, jsonData, packedOpts])

def getRangesArgs(self, types, start: int = 0, end: int = 1, asc: bool = False):
transformed_types = []
for type in types:
transformed_types.append("wait" if type == "waiting" else type)

keys = self.getKeys([''])
args = [start, end, "1" if asc else "0"] + transformed_types

return (keys, args)

async def getRanges(self, types, start: int = 0, end: int = 1, asc: bool = False):
commands = []

switcher = {
"completed": "zrange",
"delayed": "zrange",
"failed": "zrange",
"priority": "zrange",
"repeat": "zrange",
"waiting-children": "zrange",
"active": "lrange",
"paused": "lrange",
"wait": "lrange"
}
transformed_types = []
for type in types:
transformed_type = "wait" if type == "waiting" else type
transformed_types.append(transformed_type)
commands.append(switcher.get(transformed_type))

keys, args = self.getRangesArgs(transformed_types, start, end, asc)

responses = await self.commands["getRanges"](keys=keys, args=args)

results = []
for i, response in enumerate(responses):
result = response or []

if asc and commands[i] == "lrange":
results+=result.reverse()
else:
results+=result

return results

def saveStacktraceArgs(self, job_id: str, stacktrace: str, failedReason: str):
keys = [self.toKey(job_id)]
args = [stacktrace, failedReason]
Expand Down Expand Up @@ -190,7 +236,7 @@ async def changePriority(self, job_id: str, priority:int = 0, lifo:bool = False)

if result is not None:
if result < 0:
raise self.finishedErrors(result, job_id, 'updateData')
raise self.finishedErrors(result, job_id, 'changePriority', None)
return None

async def updateData(self, job_id: str, data):
Expand All @@ -202,7 +248,7 @@ async def updateData(self, job_id: str, data):

if result is not None:
if result < 0:
raise self.finishedErrors(result, job_id, 'updateData')
raise self.finishedErrors(result, job_id, 'updateData', None)
return None

async def reprocessJob(self, job: Job, state: str):
Expand Down Expand Up @@ -291,7 +337,7 @@ async def updateProgress(self, job_id: str, progress):

if result is not None:
if result < 0:
raise self.finishedErrors(result, job_id, 'updateProgress')
raise self.finishedErrors(result, job_id, 'updateProgress', None)
return None

def moveToFinishedArgs(self, job: Job, val: Any, propVal: str, shouldRemove, target, token: str, opts: dict, fetchNext=True) -> list[Any] | None:
Expand Down Expand Up @@ -373,8 +419,7 @@ def moveStalledJobsToWait(self, maxStalledCount: int, stalledInterval: int):
time.time() * 1000), stalledInterval]
return self.commands["moveStalledJobsToWait"](keys, args)


def finishedErrors(code: int, jobId: str, command: str, state: str) -> TypeError:
def finishedErrors(self, code: int, jobId: str, command: str, state: str) -> TypeError:
if code == ErrorCode.JobNotExist.value:
return TypeError(f"Missing key for job {jobId}.{command}")
elif code == ErrorCode.JobLockNotExist.value:
Expand Down
11 changes: 11 additions & 0 deletions python/bullmq/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
import semver
import traceback

def isRedisVersionLowerThan(current_version, minimum_version):
return semver.compare(current_version, minimum_version) == -1

def extract_result(job_task):
try:
return job_task.result()
except Exception as e:
if not str(e).startswith('Connection closed by server'):
# lets use a simple-but-effective error handling:
# print error message and ignore the job
print("ERROR:", e)
traceback.print_exc()
13 changes: 1 addition & 12 deletions python/bullmq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from bullmq.job import Job
from bullmq.timer import Timer
from bullmq.types import WorkerOptions
from bullmq.utils import isRedisVersionLowerThan
from bullmq.utils import isRedisVersionLowerThan, extract_result

import asyncio
import traceback
Expand Down Expand Up @@ -195,14 +195,3 @@ async def getCompleted(task_set: set) -> tuple[list[Job], list]:
# b) a failed extract_result
jobs = list(filter(lambda j: j is not None, jobs))
return jobs, pending


def extract_result(job_task):
try:
return job_task.result()
except Exception as e:
if not str(e).startswith('Connection closed by server'):
# lets use a simple-but-effective error handling:
# print error message and ignore the job
print("ERROR:", e)
traceback.print_exc()
10 changes: 10 additions & 0 deletions python/tests/queue_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ async def test_add_job(self):
self.assertEqual(job.id, "1")
await queue.close()

async def test_get_jobs(self):
queue = Queue(queueName)
job1 = await queue.add("test-job", {"foo": "bar"}, {})
job2 = await queue.add("test-job", {"foo": "bar"}, {})
jobs = await queue.getJobs(["wait"])

self.assertEqual(job2.id, jobs[0].id)
self.assertEqual(job1.id, jobs[1].id)
await queue.close()

async def test_add_job_with_options(self):
queue = Queue(queueName)
data = {"foo": "bar"}
Expand Down
4 changes: 2 additions & 2 deletions src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,9 @@ export class QueueGetters<
end = -1,
asc = false,
): Promise<Job<DataType, ResultType, NameType>[]> {
types = this.sanitizeJobTypes(types);
const currentTypes = this.sanitizeJobTypes(types);

const jobIds = await this.getRanges(types, start, end, asc);
const jobIds = await this.getRanges(currentTypes, start, end, asc);

return Promise.all(
jobIds.map(
Expand Down

0 comments on commit 8d5d6c1

Please sign in to comment.