Skip to content

Commit

Permalink
feat(flow-producer): add addBulk method (python) (#2174)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Sep 7, 2023
1 parent 0f4af17 commit c67dfb4
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 2 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@
{
"releaseRules": [
{
"scope": "python*",
"message": "*python*",
"release": false
}
]
Expand Down
18 changes: 18 additions & 0 deletions python/bullmq/flow_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ async def addChildren(self, nodes, parent, queues_opts, pipe):
children.append(job)
return children

async def addNodes(self, nodes: list[dict], pipe):
trees = []
for node in nodes:
parent_opts = node.get("opts", {}).get("parent", None)
jobs_tree = await self.addNode(node, {"parentOpts": parent_opts},None, pipe)
trees.append(jobs_tree)

return trees

async def addNode(self, node: dict, parent: dict, queues_opts: dict, pipe):
prefix = node.get("prefix", self.prefix)
queue = self.queueFromNode(node, QueueKeys(prefix), prefix)
Expand Down Expand Up @@ -116,6 +125,15 @@ async def add(self, flow: dict, opts: dict = {}):

return result

async def addBulk(self, flows: list[dict]):
result = None
async with self.redisConnection.conn.pipeline(transaction=True) as pipe:
job_trees = await self.addNodes(flows, pipe)
await pipe.execute()
result = job_trees

return result

def close(self):
"""
Close the flow instance.
Expand Down
65 changes: 64 additions & 1 deletion python/tests/flow_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async def process2(job: Job, token: str):
children_worker = Worker(queue_name, process1)

flow = FlowProducer()
tree = await flow.add(
await flow.add(
{
"name": 'parent-job',
"queueName": parent_queue_name,
Expand All @@ -77,5 +77,68 @@ async def process2(job: Job, token: str):
await parent_queue.obliterate()
await parent_queue.close()

async def test_addBulk_should_process_children_before_parent(self):
child_job_name = 'child-job'
children_data = [
{"idx": 0, "bar": 'something'},
{"idx": 1, "baz": 'something'}
]
parent_queue_name = f"__test_parent_queue__{uuid4().hex}"

processing_children = Future()

processed_children = 0
async def process1(job: Job, token: str):
nonlocal processed_children
processed_children+=1
if processed_children == len(children_data):
processing_children.set_result(None)
return children_data[job.data.get("idx")]

processing_parents = Future()

processed_parents = 0
async def process2(job: Job, token: str):
nonlocal processed_parents
processed_parents+=1
if processed_parents == 2:
processing_parents.set_result(None)
return 1

parent_worker = Worker(parent_queue_name, process2)
children_worker = Worker(queue_name, process1)

flow = FlowProducer()
await flow.addBulk([
{
"name": 'parent-job-1',
"queueName": parent_queue_name,
"data": {},
"children": [
{"name": child_job_name, "data": {"idx": 0, "foo": 'bar'}, "queueName": queue_name}
]
},
{
"name": 'parent-job-2',
"queueName": parent_queue_name,
"data": {},
"children": [
{"name": child_job_name, "data": {"idx": 1, "foo": 'baz'}, "queueName": queue_name}
]
}
])

await processing_children
await processing_parents

await parent_worker.close()
await children_worker.close()
await flow.close()

parent_queue = Queue(parent_queue_name)
await parent_queue.pause()
await parent_queue.obliterate()
await parent_queue.close()

if __name__ == '__main__':
unittest.main()

0 comments on commit c67dfb4

Please sign in to comment.