From 8d5d6c14442b7b967c42cb6ec3907a4d1a5bd575 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Fri, 23 Jun 2023 14:15:36 -0500 Subject: [PATCH] feat(python): add getJobs method in queue class (#2011) --- python/bullmq/queue.py | 24 ++++++++++++++++ python/bullmq/scripts.py | 55 ++++++++++++++++++++++++++++++++---- python/bullmq/utils.py | 11 ++++++++ python/bullmq/worker.py | 13 +-------- python/tests/queue_tests.py | 10 +++++++ src/classes/queue-getters.ts | 4 +-- 6 files changed, 98 insertions(+), 19 deletions(-) diff --git a/python/bullmq/queue.py b/python/bullmq/queue.py index b20e0dfffc..1fc57e785e 100644 --- a/python/bullmq/queue.py +++ b/python/bullmq/queue.py @@ -1,5 +1,7 @@ +import asyncio from bullmq.redis_connection import RedisConnection from bullmq.types import QueueOptions, RetryJobsOptions, JobOptions +from bullmq.utils import extract_result from bullmq.scripts import Scripts from bullmq.job import Job @@ -124,6 +126,28 @@ async def getJobCounts(self, *types): counts[current_types[index]] = val or 0 return counts + async def getJobs(self, types, start=0, end=-1, asc:bool=False): + current_types = self.sanitizeJobTypes(types) + job_ids = await self.scripts.getRanges(current_types, start, end, asc) + tasks = [asyncio.create_task(Job.fromId(self, i)) for i in job_ids] + job_set, _ = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED) + jobs = [extract_result(job_task) for job_task in job_set] + jobs_len = len(jobs) + + # we filter `None` out to remove: + jobs = list(filter(lambda j: j is not None, jobs)) + + for index, job_id in enumerate(job_ids): + pivot_job = jobs[index] + + for i in range(index,jobs_len): + current_job = jobs[i] + if current_job and current_job.id == job_id: + jobs[index] = current_job + jobs[i] = pivot_job + + return jobs + def sanitizeJobTypes(self, types): current_types = list(types) diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index 6936d91264..053c81fc4e 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -34,6 +34,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection "changePriority": self.redisClient.register_script(self.getScript("changePriority-5.lua")), "extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")), "getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")), + "getRanges": self.redisClient.register_script(self.getScript("getRanges-1.lua")), "getState": self.redisClient.register_script(self.getScript("getState-8.lua")), "getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")), "moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")), @@ -100,6 +101,51 @@ def addJob(self, job: Job): return self.commands["addJob"](keys=keys, args=[packedArgs, jsonData, packedOpts]) + def getRangesArgs(self, types, start: int = 0, end: int = 1, asc: bool = False): + transformed_types = [] + for type in types: + transformed_types.append("wait" if type == "waiting" else type) + + keys = self.getKeys(['']) + args = [start, end, "1" if asc else "0"] + transformed_types + + return (keys, args) + + async def getRanges(self, types, start: int = 0, end: int = 1, asc: bool = False): + commands = [] + + switcher = { + "completed": "zrange", + "delayed": "zrange", + "failed": "zrange", + "priority": "zrange", + "repeat": "zrange", + "waiting-children": "zrange", + "active": "lrange", + "paused": "lrange", + "wait": "lrange" + } + transformed_types = [] + for type in types: + transformed_type = "wait" if type == "waiting" else type + transformed_types.append(transformed_type) + commands.append(switcher.get(transformed_type)) + + keys, args = self.getRangesArgs(transformed_types, start, end, asc) + + responses = await self.commands["getRanges"](keys=keys, args=args) + + results = [] + for i, response in enumerate(responses): + result = response or [] + + if asc and commands[i] == "lrange": + results+=result.reverse() + else: + results+=result + + return results + def saveStacktraceArgs(self, job_id: str, stacktrace: str, failedReason: str): keys = [self.toKey(job_id)] args = [stacktrace, failedReason] @@ -190,7 +236,7 @@ async def changePriority(self, job_id: str, priority:int = 0, lifo:bool = False) if result is not None: if result < 0: - raise self.finishedErrors(result, job_id, 'updateData') + raise self.finishedErrors(result, job_id, 'changePriority', None) return None async def updateData(self, job_id: str, data): @@ -202,7 +248,7 @@ async def updateData(self, job_id: str, data): if result is not None: if result < 0: - raise self.finishedErrors(result, job_id, 'updateData') + raise self.finishedErrors(result, job_id, 'updateData', None) return None async def reprocessJob(self, job: Job, state: str): @@ -291,7 +337,7 @@ async def updateProgress(self, job_id: str, progress): if result is not None: if result < 0: - raise self.finishedErrors(result, job_id, 'updateProgress') + raise self.finishedErrors(result, job_id, 'updateProgress', None) return None def moveToFinishedArgs(self, job: Job, val: Any, propVal: str, shouldRemove, target, token: str, opts: dict, fetchNext=True) -> list[Any] | None: @@ -373,8 +419,7 @@ def moveStalledJobsToWait(self, maxStalledCount: int, stalledInterval: int): time.time() * 1000), stalledInterval] return self.commands["moveStalledJobsToWait"](keys, args) - - def finishedErrors(code: int, jobId: str, command: str, state: str) -> TypeError: + def finishedErrors(self, code: int, jobId: str, command: str, state: str) -> TypeError: if code == ErrorCode.JobNotExist.value: return TypeError(f"Missing key for job {jobId}.{command}") elif code == ErrorCode.JobLockNotExist.value: diff --git a/python/bullmq/utils.py b/python/bullmq/utils.py index 0e89477ecf..7c1e3328bb 100644 --- a/python/bullmq/utils.py +++ b/python/bullmq/utils.py @@ -1,4 +1,15 @@ import semver +import traceback def isRedisVersionLowerThan(current_version, minimum_version): return semver.compare(current_version, minimum_version) == -1 + +def extract_result(job_task): + try: + return job_task.result() + except Exception as e: + if not str(e).startswith('Connection closed by server'): + # lets use a simple-but-effective error handling: + # print error message and ignore the job + print("ERROR:", e) + traceback.print_exc() \ No newline at end of file diff --git a/python/bullmq/worker.py b/python/bullmq/worker.py index 66bf6225a1..d3e7166e41 100644 --- a/python/bullmq/worker.py +++ b/python/bullmq/worker.py @@ -6,7 +6,7 @@ from bullmq.job import Job from bullmq.timer import Timer from bullmq.types import WorkerOptions -from bullmq.utils import isRedisVersionLowerThan +from bullmq.utils import isRedisVersionLowerThan, extract_result import asyncio import traceback @@ -195,14 +195,3 @@ async def getCompleted(task_set: set) -> tuple[list[Job], list]: # b) a failed extract_result jobs = list(filter(lambda j: j is not None, jobs)) return jobs, pending - - -def extract_result(job_task): - try: - return job_task.result() - except Exception as e: - if not str(e).startswith('Connection closed by server'): - # lets use a simple-but-effective error handling: - # print error message and ignore the job - print("ERROR:", e) - traceback.print_exc() diff --git a/python/tests/queue_tests.py b/python/tests/queue_tests.py index 5b3587987d..93909d89df 100644 --- a/python/tests/queue_tests.py +++ b/python/tests/queue_tests.py @@ -32,6 +32,16 @@ async def test_add_job(self): self.assertEqual(job.id, "1") await queue.close() + async def test_get_jobs(self): + queue = Queue(queueName) + job1 = await queue.add("test-job", {"foo": "bar"}, {}) + job2 = await queue.add("test-job", {"foo": "bar"}, {}) + jobs = await queue.getJobs(["wait"]) + + self.assertEqual(job2.id, jobs[0].id) + self.assertEqual(job1.id, jobs[1].id) + await queue.close() + async def test_add_job_with_options(self): queue = Queue(queueName) data = {"foo": "bar"} diff --git a/src/classes/queue-getters.ts b/src/classes/queue-getters.ts index 992fd44844..98548bcc2b 100644 --- a/src/classes/queue-getters.ts +++ b/src/classes/queue-getters.ts @@ -330,9 +330,9 @@ export class QueueGetters< end = -1, asc = false, ): Promise[]> { - types = this.sanitizeJobTypes(types); + const currentTypes = this.sanitizeJobTypes(types); - const jobIds = await this.getRanges(types, start, end, asc); + const jobIds = await this.getRanges(currentTypes, start, end, asc); return Promise.all( jobIds.map(