From 6d0e224cd985069055786f447b0ba7c394a76b8a Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Thu, 6 Jul 2023 13:14:40 -0600 Subject: [PATCH] feat(python): add moveToWaitingChildren job method (#2049) --- python/bullmq/__init__.py | 1 + python/bullmq/backoffs.py | 2 +- python/bullmq/custom_errors/__init__.py | 1 + .../custom_errors/waiting_children_error.py | 3 + python/bullmq/job.py | 21 +++-- python/bullmq/scripts.py | 62 +++++++++---- python/bullmq/utils.py | 6 +- python/bullmq/worker.py | 4 + python/tests/worker_tests.py | 87 ++++++++++++++++++- 9 files changed, 161 insertions(+), 26 deletions(-) create mode 100644 python/bullmq/custom_errors/__init__.py create mode 100644 python/bullmq/custom_errors/waiting_children_error.py diff --git a/python/bullmq/__init__.py b/python/bullmq/__init__.py index f15eeb79bb..1282fe0d23 100644 --- a/python/bullmq/__init__.py +++ b/python/bullmq/__init__.py @@ -10,3 +10,4 @@ from bullmq.queue import Queue from bullmq.job import Job from bullmq.worker import Worker +from bullmq.custom_errors import WaitingChildrenError diff --git a/python/bullmq/backoffs.py b/python/bullmq/backoffs.py index 3016f72d1f..59ea080eb0 100644 --- a/python/bullmq/backoffs.py +++ b/python/bullmq/backoffs.py @@ -14,7 +14,7 @@ class Backoffs: def normalize(backoff: int | BackoffOptions): if type(backoff) == int and math.isfinite(backoff): return { - "type": 'fixed', + "type": "fixed", "delay": backoff } elif backoff: diff --git a/python/bullmq/custom_errors/__init__.py b/python/bullmq/custom_errors/__init__.py new file mode 100644 index 0000000000..52cd89da6b --- /dev/null +++ b/python/bullmq/custom_errors/__init__.py @@ -0,0 +1 @@ +from bullmq.custom_errors.waiting_children_error import WaitingChildrenError diff --git a/python/bullmq/custom_errors/waiting_children_error.py b/python/bullmq/custom_errors/waiting_children_error.py new file mode 100644 index 0000000000..7134bbc9bb --- /dev/null +++ b/python/bullmq/custom_errors/waiting_children_error.py @@ -0,0 +1,3 @@ +class WaitingChildrenError(Exception): + "Raised when job is moved to waiting-children" + pass \ No newline at end of file diff --git a/python/bullmq/job.py b/python/bullmq/job.py index 6c0709b795..59aadae9f0 100644 --- a/python/bullmq/job.py +++ b/python/bullmq/job.py @@ -5,6 +5,7 @@ if TYPE_CHECKING: from bullmq.queue import Queue from bullmq.types import JobOptions +from bullmq.utils import get_parent_key import json import time @@ -34,6 +35,7 @@ def __init__(self, queue: Queue, name: str, data: Any, opts: JobOptions = {}): self.timestamp = opts.get("timestamp", round(time.time() * 1000)) final_opts = {"attempts": 0, "delay": 0} final_opts.update(opts or {}) + final_opts.update({"backoff": Backoffs.normalize(opts.get('backoff'))}) self.discarded = False self.opts = final_opts self.queue = queue @@ -48,6 +50,9 @@ def __init__(self, queue: Queue, name: str, data: Any, opts: JobOptions = {}): self.returnvalue = None self.failedReason = None self.repeatJobKey = None + parent = opts.get("parent") + self.parentKey = get_parent_key(parent) + self.parent = {"id": parent.get("id"), "queueKey": parent.get("queue")} if parent else None self.stacktrace: List[str] = [] self.scripts = Scripts(queue.prefix, queue.name, queue.redisConnection) @@ -143,6 +148,12 @@ async def saveStacktrace(self, pipe, err:str): await self.scripts.commands["saveStacktrace"](keys=keys, args=args, client=pipe) + def moveToWaitingChildren(self, token, opts:dict): + return self.scripts.moveToWaitingChildren(self.id, token, opts) + + @property + def queueQualifiedName(self): + return f"{self.queue.prefix}:{self.queue.name}" @staticmethod def fromJSON(queue: Queue, rawData: dict, jobId: str | None = None): @@ -181,13 +192,11 @@ def fromJSON(queue: Queue, rawData: dict, jobId: str | None = None): job.stacktrace = json.loads(rawData.get("stacktrace", "[]")) - # if (json.parentKey) { - # job.parentKey = json.parentKey; - # } + if rawData.get("parentKey"): + job.parentKey = rawData.get("parentKey") - # if (json.parent) { - # job.parent = JSON.parse(json.parent); - # } + if rawData.get("parent"): + job.parent = json.loads(rawData.get("parent")) return job diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index 053c81fc4e..dd3d210e2c 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -6,7 +6,7 @@ from __future__ import annotations from redis import Redis from bullmq.error_code import ErrorCode -from bullmq.utils import isRedisVersionLowerThan +from bullmq.utils import isRedisVersionLowerThan, get_parent_key from typing import Any, TYPE_CHECKING if TYPE_CHECKING: from bullmq.job import Job @@ -41,6 +41,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection "moveToActive": self.redisClient.register_script(self.getScript("moveToActive-10.lua")), "moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")), "moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-13.lua")), + "moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-4.lua")), "obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")), "pause": self.redisClient.register_script(self.getScript("pause-5.lua")), "removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")), @@ -53,7 +54,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection } # loop all the names and add them to the keys object - names = ["", "active", "wait", "paused", "completed", "failed", "delayed", + names = ["", "active", "wait", "waiting-children", "paused", "completed", "failed", "delayed", "stalled", "limiter", "prioritized", "id", "stalled-check", "meta", "pc", "events", "waiting-children"] for name in names: self.keys[name] = self.toKey(name) @@ -75,22 +76,9 @@ def mapKey(key): return self.keys[key] return list(map(mapKey, keys)) - def addJob(self, job: Job): - """ - Add an item to the queue - """ - packedArgs = msgpack.packb( - [self.keys[""], job.id or "", job.name, job.timestamp], use_bin_type=True) + def addJobArgs(self, job: Job, waiting_children_key): # We are still lacking some arguments here: # ARGV[1] msgpacked arguments array - # [1] key prefix, - # [2] custom id (will not generate one automatically) - # [3] name - # [4] timestamp - # [5] parentKey? - # [6] waitChildrenKey key. - # [7] parent dependencies key. - # [8] parent? {id, queueKey} # [9] repeat job key jsonData = json.dumps(job.data, separators=(',', ':')) @@ -98,8 +86,48 @@ def addJob(self, job: Job): keys = self.getKeys(['wait', 'paused', 'meta', 'id', 'delayed', 'prioritized', 'completed', 'events', 'pc']) + parent = job.parent + parentKey = job.parentKey - return self.commands["addJob"](keys=keys, args=[packedArgs, jsonData, packedOpts]) + packedArgs = msgpack.packb( + [self.keys[""], job.id or "", job.name, job.timestamp, job.parentKey, + waiting_children_key, + f"{parentKey}:dependencies" if parentKey else None, parent],use_bin_type=True) + + args = [packedArgs, jsonData, packedOpts] + + return (keys,args) + + def addJob(self, job: Job): + """ + Add an item to the queue + """ + keys, args = self.addJobArgs(job, None) + + return self.commands["addJob"](keys=keys, args=args) + + def moveToWaitingChildrenArgs(self, job_id, token, opts): + keys = [self.toKey(job_id) + ":lock", + self.keys['active'], + self.keys['waiting-children'], + self.toKey(job_id)] + child_key = opts.get("child") if opts else None + args = [token, get_parent_key(child_key) or "", round(time.time() * 1000), job_id] + + return (keys, args) + + async def moveToWaitingChildren(self, job_id, token, opts): + keys, args = self.moveToWaitingChildrenArgs(job_id, token, opts) + result = await self.commands["moveToWaitingChildren"](keys=keys, args=args) + + if result is not None: + if result == 1: + return False + elif result == 0: + return True + elif result < 0: + raise self.finishedErrors(result, job_id, 'moveToWaitingChildren', 'active') + return None def getRangesArgs(self, types, start: int = 0, end: int = 1, asc: bool = False): transformed_types = [] diff --git a/python/bullmq/utils.py b/python/bullmq/utils.py index 7c1e3328bb..11f8bc95e2 100644 --- a/python/bullmq/utils.py +++ b/python/bullmq/utils.py @@ -12,4 +12,8 @@ def extract_result(job_task): # lets use a simple-but-effective error handling: # print error message and ignore the job print("ERROR:", e) - traceback.print_exc() \ No newline at end of file + traceback.print_exc() + +def get_parent_key(opts:dict): + if opts: + return f"{opts.get('queue')}:{opts.get('id')}" diff --git a/python/bullmq/worker.py b/python/bullmq/worker.py index d3e7166e41..9d70eb1627 100644 --- a/python/bullmq/worker.py +++ b/python/bullmq/worker.py @@ -1,5 +1,6 @@ from typing import Callable from uuid import uuid4 +from bullmq.custom_errors import WaitingChildrenError from bullmq.scripts import Scripts from bullmq.redis_connection import RedisConnection from bullmq.event_emitter import EventEmitter @@ -124,7 +125,10 @@ async def processJob(self, job: Job, token: str): result = await self.processor(job, token) if not self.forceClosing: await self.scripts.moveToCompleted(job, result, job.opts.get("removeOnComplete", False), token, self.opts, fetchNext=not self.closing) + job.returnvalue = result self.emit("completed", job, result) + except WaitingChildrenError: + return except Exception as err: try: print("Error processing job", err) diff --git a/python/tests/worker_tests.py b/python/tests/worker_tests.py index 2999bfca3e..18763332fa 100644 --- a/python/tests/worker_tests.py +++ b/python/tests/worker_tests.py @@ -5,8 +5,9 @@ """ from asyncio import Future -from bullmq import Queue, Worker, Job +from bullmq import Queue, Worker, Job, WaitingChildrenError from uuid import uuid4 +from enum import Enum import asyncio import unittest @@ -187,6 +188,90 @@ def completing(job: Job, result): await queue.close() await worker.close() + async def test_create_children_at_runtime(self): + parent_queue_name = f"__parent_queue__{uuid4().hex}" + parent_queue = Queue(parent_queue_name) + queue = Queue(queueName) + + class Step(int, Enum): + Initial = 1 + Second = 2 + Third = 3 + Finish = 4 + + waiting_children_step_executions = 0 + + async def parent_process(job: Job, token: str): + step = job.data.get("step") + while step != Step.Finish: + if step == Step.Initial: + await queue.add('child-1', {"foo": "bar" },{ + "parent": { + "id": job.id, + "queue": job.queueQualifiedName + } + }) + await job.updateData({ + "step": Step.Second + }) + step = Step.Second + elif step == Step.Second: + await queue.add('child-2', {"foo": "bar" },{ + "parent": { + "id": job.id, + "queue": job.queueQualifiedName + } + }) + await job.updateData({ + "step": Step.Third + }) + step = Step.Third + elif step == Step.Third: + nonlocal waiting_children_step_executions + waiting_children_step_executions += 1 + should_wait = await job.moveToWaitingChildren(token, {}) + if not should_wait: + await job.updateData({ + "step": Step.Finish + }) + step = Step.Finish + return Step.Finish + else: + raise WaitingChildrenError + else: + raise Exception("invalid step") + + async def children_process(job: Job, token: str): + await asyncio.sleep(0.2) + return None + + worker = Worker(parent_queue_name, parent_process, {}) + children_worker = Worker(queueName, children_process, {}) + + await parent_queue.add( "test", {"step": Step.Initial}, + { + "attempts": 3, + "backoff": 1000 + } + ) + + completed_events = Future() + + def completing(job: Job, result): + self.assertEqual(job.returnvalue, Step.Finish) + completed_events.set_result(None) + + worker.on("completed", completing) + + await completed_events + + self.assertEqual(waiting_children_step_executions, 2) + + await worker.close() + await children_worker.close() + await parent_queue.close() + await queue.close() + if __name__ == '__main__': unittest.main()