Skip to content

Commit

Permalink
feat(python): add moveToWaitingChildren job method (#2049)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jul 6, 2023
1 parent 08a8a7f commit 6d0e224
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 26 deletions.
1 change: 1 addition & 0 deletions python/bullmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
from bullmq.queue import Queue
from bullmq.job import Job
from bullmq.worker import Worker
from bullmq.custom_errors import WaitingChildrenError
2 changes: 1 addition & 1 deletion python/bullmq/backoffs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class Backoffs:
def normalize(backoff: int | BackoffOptions):
if type(backoff) == int and math.isfinite(backoff):
return {
"type": 'fixed',
"type": "fixed",
"delay": backoff
}
elif backoff:
Expand Down
1 change: 1 addition & 0 deletions python/bullmq/custom_errors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from bullmq.custom_errors.waiting_children_error import WaitingChildrenError
3 changes: 3 additions & 0 deletions python/bullmq/custom_errors/waiting_children_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class WaitingChildrenError(Exception):
"Raised when job is moved to waiting-children"
pass
21 changes: 15 additions & 6 deletions python/bullmq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
if TYPE_CHECKING:
from bullmq.queue import Queue
from bullmq.types import JobOptions
from bullmq.utils import get_parent_key

import json
import time
Expand Down Expand Up @@ -34,6 +35,7 @@ def __init__(self, queue: Queue, name: str, data: Any, opts: JobOptions = {}):
self.timestamp = opts.get("timestamp", round(time.time() * 1000))
final_opts = {"attempts": 0, "delay": 0}
final_opts.update(opts or {})
final_opts.update({"backoff": Backoffs.normalize(opts.get('backoff'))})
self.discarded = False
self.opts = final_opts
self.queue = queue
Expand All @@ -48,6 +50,9 @@ def __init__(self, queue: Queue, name: str, data: Any, opts: JobOptions = {}):
self.returnvalue = None
self.failedReason = None
self.repeatJobKey = None
parent = opts.get("parent")
self.parentKey = get_parent_key(parent)
self.parent = {"id": parent.get("id"), "queueKey": parent.get("queue")} if parent else None
self.stacktrace: List[str] = []
self.scripts = Scripts(queue.prefix, queue.name, queue.redisConnection)

Expand Down Expand Up @@ -143,6 +148,12 @@ async def saveStacktrace(self, pipe, err:str):

await self.scripts.commands["saveStacktrace"](keys=keys, args=args, client=pipe)

def moveToWaitingChildren(self, token, opts:dict):
return self.scripts.moveToWaitingChildren(self.id, token, opts)

@property
def queueQualifiedName(self):
return f"{self.queue.prefix}:{self.queue.name}"

@staticmethod
def fromJSON(queue: Queue, rawData: dict, jobId: str | None = None):
Expand Down Expand Up @@ -181,13 +192,11 @@ def fromJSON(queue: Queue, rawData: dict, jobId: str | None = None):

job.stacktrace = json.loads(rawData.get("stacktrace", "[]"))

# if (json.parentKey) {
# job.parentKey = json.parentKey;
# }
if rawData.get("parentKey"):
job.parentKey = rawData.get("parentKey")

# if (json.parent) {
# job.parent = JSON.parse(json.parent);
# }
if rawData.get("parent"):
job.parent = json.loads(rawData.get("parent"))

return job

Expand Down
62 changes: 45 additions & 17 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from __future__ import annotations
from redis import Redis
from bullmq.error_code import ErrorCode
from bullmq.utils import isRedisVersionLowerThan
from bullmq.utils import isRedisVersionLowerThan, get_parent_key
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from bullmq.job import Job
Expand Down Expand Up @@ -41,6 +41,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-10.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-13.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-4.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-5.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")),
Expand All @@ -53,7 +54,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
}

# loop all the names and add them to the keys object
names = ["", "active", "wait", "paused", "completed", "failed", "delayed",
names = ["", "active", "wait", "waiting-children", "paused", "completed", "failed", "delayed",
"stalled", "limiter", "prioritized", "id", "stalled-check", "meta", "pc", "events", "waiting-children"]
for name in names:
self.keys[name] = self.toKey(name)
Expand All @@ -75,31 +76,58 @@ def mapKey(key):
return self.keys[key]
return list(map(mapKey, keys))

def addJob(self, job: Job):
"""
Add an item to the queue
"""
packedArgs = msgpack.packb(
[self.keys[""], job.id or "", job.name, job.timestamp], use_bin_type=True)
def addJobArgs(self, job: Job, waiting_children_key):
# We are still lacking some arguments here:
# ARGV[1] msgpacked arguments array
# [1] key prefix,
# [2] custom id (will not generate one automatically)
# [3] name
# [4] timestamp
# [5] parentKey?
# [6] waitChildrenKey key.
# [7] parent dependencies key.
# [8] parent? {id, queueKey}
# [9] repeat job key

jsonData = json.dumps(job.data, separators=(',', ':'))
packedOpts = msgpack.packb(job.opts)

keys = self.getKeys(['wait', 'paused', 'meta', 'id',
'delayed', 'prioritized', 'completed', 'events', 'pc'])
parent = job.parent
parentKey = job.parentKey

return self.commands["addJob"](keys=keys, args=[packedArgs, jsonData, packedOpts])
packedArgs = msgpack.packb(
[self.keys[""], job.id or "", job.name, job.timestamp, job.parentKey,
waiting_children_key,
f"{parentKey}:dependencies" if parentKey else None, parent],use_bin_type=True)

args = [packedArgs, jsonData, packedOpts]

return (keys,args)

def addJob(self, job: Job):
"""
Add an item to the queue
"""
keys, args = self.addJobArgs(job, None)

return self.commands["addJob"](keys=keys, args=args)

def moveToWaitingChildrenArgs(self, job_id, token, opts):
keys = [self.toKey(job_id) + ":lock",
self.keys['active'],
self.keys['waiting-children'],
self.toKey(job_id)]
child_key = opts.get("child") if opts else None
args = [token, get_parent_key(child_key) or "", round(time.time() * 1000), job_id]

return (keys, args)

async def moveToWaitingChildren(self, job_id, token, opts):
keys, args = self.moveToWaitingChildrenArgs(job_id, token, opts)
result = await self.commands["moveToWaitingChildren"](keys=keys, args=args)

if result is not None:
if result == 1:
return False
elif result == 0:
return True
elif result < 0:
raise self.finishedErrors(result, job_id, 'moveToWaitingChildren', 'active')
return None

def getRangesArgs(self, types, start: int = 0, end: int = 1, asc: bool = False):
transformed_types = []
Expand Down
6 changes: 5 additions & 1 deletion python/bullmq/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ def extract_result(job_task):
# lets use a simple-but-effective error handling:
# print error message and ignore the job
print("ERROR:", e)
traceback.print_exc()
traceback.print_exc()

def get_parent_key(opts:dict):
if opts:
return f"{opts.get('queue')}:{opts.get('id')}"
4 changes: 4 additions & 0 deletions python/bullmq/worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Callable
from uuid import uuid4
from bullmq.custom_errors import WaitingChildrenError
from bullmq.scripts import Scripts
from bullmq.redis_connection import RedisConnection
from bullmq.event_emitter import EventEmitter
Expand Down Expand Up @@ -124,7 +125,10 @@ async def processJob(self, job: Job, token: str):
result = await self.processor(job, token)
if not self.forceClosing:
await self.scripts.moveToCompleted(job, result, job.opts.get("removeOnComplete", False), token, self.opts, fetchNext=not self.closing)
job.returnvalue = result
self.emit("completed", job, result)
except WaitingChildrenError:
return
except Exception as err:
try:
print("Error processing job", err)
Expand Down
87 changes: 86 additions & 1 deletion python/tests/worker_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
"""

from asyncio import Future
from bullmq import Queue, Worker, Job
from bullmq import Queue, Worker, Job, WaitingChildrenError
from uuid import uuid4
from enum import Enum

import asyncio
import unittest
Expand Down Expand Up @@ -187,6 +188,90 @@ def completing(job: Job, result):
await queue.close()
await worker.close()

async def test_create_children_at_runtime(self):
parent_queue_name = f"__parent_queue__{uuid4().hex}"
parent_queue = Queue(parent_queue_name)
queue = Queue(queueName)

class Step(int, Enum):
Initial = 1
Second = 2
Third = 3
Finish = 4

waiting_children_step_executions = 0

async def parent_process(job: Job, token: str):
step = job.data.get("step")
while step != Step.Finish:
if step == Step.Initial:
await queue.add('child-1', {"foo": "bar" },{
"parent": {
"id": job.id,
"queue": job.queueQualifiedName
}
})
await job.updateData({
"step": Step.Second
})
step = Step.Second
elif step == Step.Second:
await queue.add('child-2', {"foo": "bar" },{
"parent": {
"id": job.id,
"queue": job.queueQualifiedName
}
})
await job.updateData({
"step": Step.Third
})
step = Step.Third
elif step == Step.Third:
nonlocal waiting_children_step_executions
waiting_children_step_executions += 1
should_wait = await job.moveToWaitingChildren(token, {})
if not should_wait:
await job.updateData({
"step": Step.Finish
})
step = Step.Finish
return Step.Finish
else:
raise WaitingChildrenError
else:
raise Exception("invalid step")

async def children_process(job: Job, token: str):
await asyncio.sleep(0.2)
return None

worker = Worker(parent_queue_name, parent_process, {})
children_worker = Worker(queueName, children_process, {})

await parent_queue.add( "test", {"step": Step.Initial},
{
"attempts": 3,
"backoff": 1000
}
)

completed_events = Future()

def completing(job: Job, result):
self.assertEqual(job.returnvalue, Step.Finish)
completed_events.set_result(None)

worker.on("completed", completing)

await completed_events

self.assertEqual(waiting_children_step_executions, 2)

await worker.close()
await children_worker.close()
await parent_queue.close()
await queue.close()


if __name__ == '__main__':
unittest.main()

0 comments on commit 6d0e224

Please sign in to comment.