Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(python): respect concurrency in worker #2062

Merged
merged 5 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions python/bullmq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,19 @@ async def run(self):
token_postfix = 0

while not self.closed:
if len(jobs) == 0 and len(self.processing) < self.opts.get("concurrency") and not self.closing:
while len(self.processing) < self.opts.get("concurrency") and not self.closing:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this is a proper solution, as it could result in multiple calls to BRPOPLPUSH instead of only one, as we do in the TypeScript version.

Copy link
Collaborator Author

@roggervalf roggervalf Jul 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@manast manast Jul 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the difference is the asyncQueue, which avoids the excessive number of calls. It waits until BRPOPLPUSH has timed out or returned a job ID. The code is a bit tricky to understand, but that's the idea.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the end, the magic is handled by the `waiting` attribute in the Worker class.

token_postfix+=1
token = f'{self.id}:{token_postfix}'
waiting_job = asyncio.ensure_future(self.getNextJob(token))
self.processing.add(waiting_job)

if len(jobs) > 0:
jobs_to_process = [self.processJob(job, job.token) for job in jobs]
processing_jobs = [asyncio.ensure_future(
j) for j in jobs_to_process]
self.processing.update(processing_jobs)

try:
jobs, pending = await getCompleted(self.processing)

jobs_to_process = [self.processJob(job, job.token) for job in jobs]
processing_jobs = [asyncio.ensure_future(
j) for j in jobs_to_process]
pending.update(processing_jobs)
self.processing = pending

if (len(jobs) == 0 or len(self.processing) == 0) and self.closing:
Expand Down Expand Up @@ -181,12 +179,11 @@ async def close(self, force: bool = False):
"""
Close the worker
"""
self.closing = True
if force:
self.forceClosing = True
self.cancelProcessing()

self.closing = True

await self.blockingRedisConnection.close()
await self.redisConnection.close()

Expand All @@ -196,7 +193,7 @@ def cancelProcessing(self):
job.cancel()


async def getCompleted(task_set: set) -> tuple[list[Job], list]:
async def getCompleted(task_set: set) -> tuple[list[Job], set]:
job_set, pending = await asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED)
jobs = [extract_result(job_task) for job_task in job_set]
# we filter `None` out to remove:
Expand Down
40 changes: 40 additions & 0 deletions python/tests/worker_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,46 @@ def completing(job: Job, result):
await parent_queue.close()
await queue.close()

async def test_process_job_respecting_the_concurrency_set(self):
    # Verifies that a worker configured with a concurrency limit never runs
    # more than that many processor invocations at the same time, and that it
    # keeps the slots full while enough jobs remain.
    TOTAL_JOBS = 8
    CONCURRENCY = 4

    in_flight = 0          # processor invocations currently running
    remaining = TOTAL_JOBS  # jobs that have not finished processing yet
    delay = 0.01           # grows per job so completions are staggered
    completed_count = 0    # "completed" events observed so far
    queue = Queue(queueName)

    async def process(job: Job, token: str):
        nonlocal in_flight, delay, remaining
        in_flight += 1
        # Entering the processor must never push us past the limit.
        self.assertLess(in_flight, CONCURRENCY + 1)
        delay += 0.1
        await asyncio.sleep(delay)
        # After sleeping, the slots should be as full as the remaining
        # work allows.
        self.assertEqual(in_flight, min(remaining, CONCURRENCY))
        remaining -= 1
        in_flight -= 1

    # Enqueue all jobs before the worker starts.
    for _ in range(TOTAL_JOBS):
        await queue.add("test", data={})

    worker = Worker(queueName, process, {"concurrency": CONCURRENCY})

    all_done = Future()

    def completing(job: Job, result):
        nonlocal completed_count
        # Resolve the future on the last expected completion event.
        if completed_count == TOTAL_JOBS - 1:
            all_done.set_result(None)
        completed_count += 1

    worker.on("completed", completing)

    await all_done

    await queue.close()
    await worker.close()

if __name__ == '__main__':
unittest.main()
Loading