diff --git a/package.json b/package.json index e1d00280bd..c6e945cbd0 100644 --- a/package.json +++ b/package.json @@ -174,7 +174,7 @@ { "releaseRules": [ { - "scope": "python*", + "message": "*python*", "release": false } ] diff --git a/python/bullmq/flow_producer.py b/python/bullmq/flow_producer.py index e6b562cb68..fbdcfa2782 100644 --- a/python/bullmq/flow_producer.py +++ b/python/bullmq/flow_producer.py @@ -50,6 +50,15 @@ async def addChildren(self, nodes, parent, queues_opts, pipe): children.append(job) return children + async def addNodes(self, nodes: list[dict], pipe): + trees = [] + for node in nodes: + parent_opts = node.get("opts", {}).get("parent", None) + jobs_tree = await self.addNode(node, {"parentOpts": parent_opts},None, pipe) + trees.append(jobs_tree) + + return trees + async def addNode(self, node: dict, parent: dict, queues_opts: dict, pipe): prefix = node.get("prefix", self.prefix) queue = self.queueFromNode(node, QueueKeys(prefix), prefix) @@ -116,6 +125,15 @@ async def add(self, flow: dict, opts: dict = {}): return result + async def addBulk(self, flows: list[dict]): + result = None + async with self.redisConnection.conn.pipeline(transaction=True) as pipe: + job_trees = await self.addNodes(flows, pipe) + await pipe.execute() + result = job_trees + + return result + def close(self): """ Close the flow instance. diff --git a/python/tests/flow_tests.py b/python/tests/flow_tests.py index 1f12fcc726..a835453509 100644 --- a/python/tests/flow_tests.py +++ b/python/tests/flow_tests.py @@ -52,7 +52,7 @@ async def process2(job: Job, token: str): children_worker = Worker(queue_name, process1) flow = FlowProducer() - tree = await flow.add( + await flow.add( { "name": 'parent-job', "queueName": parent_queue_name, @@ -77,5 +77,68 @@ async def process2(job: Job, token: str): await parent_queue.obliterate() await parent_queue.close() + async def test_addBulk_should_process_children_before_parent(self): + child_job_name = 'child-job' + children_data = [ + {"idx": 0, "bar": 'something'}, + {"idx": 1, "baz": 'something'} + ] + parent_queue_name = f"__test_parent_queue__{uuid4().hex}" + + processing_children = Future() + + processed_children = 0 + async def process1(job: Job, token: str): + nonlocal processed_children + processed_children+=1 + if processed_children == len(children_data): + processing_children.set_result(None) + return children_data[job.data.get("idx")] + + processing_parents = Future() + + processed_parents = 0 + async def process2(job: Job, token: str): + nonlocal processed_parents + processed_parents+=1 + if processed_parents == 2: + processing_parents.set_result(None) + return 1 + + parent_worker = Worker(parent_queue_name, process2) + children_worker = Worker(queue_name, process1) + + flow = FlowProducer() + await flow.addBulk([ + { + "name": 'parent-job-1', + "queueName": parent_queue_name, + "data": {}, + "children": [ + {"name": child_job_name, "data": {"idx": 0, "foo": 'bar'}, "queueName": queue_name} + ] + }, + { + "name": 'parent-job-2', + "queueName": parent_queue_name, + "data": {}, + "children": [ + {"name": child_job_name, "data": {"idx": 1, "foo": 'baz'}, "queueName": queue_name} + ] + } + ]) + + await processing_children + await processing_parents + + await parent_worker.close() + await children_worker.close() + await flow.close() + + parent_queue = Queue(parent_queue_name) + await parent_queue.pause() + await parent_queue.obliterate() + await parent_queue.close() + if __name__ == '__main__': unittest.main()