Skip to content

Commit

Permalink
bindings: Improve bind_gen.TaskQueue with using Python3 features
Browse files Browse the repository at this point in the history
Change-Id: I2287bf6125efadd7b2d9c9805b6c8447085dbea6
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/4664981
Reviewed-by: Kentaro Hara <haraken@chromium.org>
Commit-Queue: Yuki Shiino <yukishiino@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1165773}
  • Loading branch information
yuki3 authored and Chromium LUCI CQ committed Jul 5, 2023
1 parent 2cbc252 commit eefb03b
Showing 1 changed file with 24 additions and 20 deletions.
44 changes: 24 additions & 20 deletions third_party/blink/renderer/bindings/scripts/bind_gen/task_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import dataclasses
import functools
import multiprocessing
import sys
import typing

from .package_initializer import package_initializer

Expand All @@ -15,6 +17,13 @@ class TaskQueue(object):
tasks will be executed in parallel.
"""

@dataclasses.dataclass
class _Task(object):
    """A single queued work item: a callable plus its arguments.

    Instances are created by post_task_with_workload and sorted by
    `workload` (largest first) before being dispatched either in
    sequence or to the multiprocessing pool.
    """
    # Relative cost estimate; tasks are sorted by this, descending,
    # so heavy tasks start first on multiprocessor systems.
    workload: int
    # The task body. Annotated with typing.Callable rather than the
    # builtin `callable`, which is a predicate function, not a type.
    func: typing.Callable
    # Captured from *args, so this is a tuple, not a list.
    args: typing.Tuple[typing.Any, ...]
    # Captured from **kwargs.
    kwargs: typing.Dict[str, typing.Any]

def __init__(self, single_process=False):
"""
Args:
Expand All @@ -36,8 +45,7 @@ def __init__(self, single_process=False):
self._pool_size = min(self._pool_size, 56)
self._pool = multiprocessing.Pool(self._pool_size,
package_initializer().init)
self._requested_tasks = [] # List of (workload, func, args, kwargs)
self._worker_tasks = [] # List of multiprocessing.pool.AsyncResult
self._requested_tasks = [] # List of _Task
self._did_run = False

def post_task(self, func, *args, **kwargs):
Expand All @@ -60,7 +68,7 @@ def post_task_with_workload(self, workload, func, *args, **kwargs):
multiprocessor systems.
"""
assert not self._did_run
self._requested_tasks.append((workload, func, args, kwargs))
self._requested_tasks.append(self._Task(workload, func, args, kwargs))

def run(self, report_progress=None):
"""
Expand All @@ -72,46 +80,42 @@ def run(self, report_progress=None):
"""
assert report_progress is None or callable(report_progress)
assert not self._did_run
assert not self._worker_tasks
self._did_run = True

self._requested_tasks = sorted(self._requested_tasks,
key=lambda task: task.workload,
reverse=True)

if self._single_process:
self._run_in_sequence(report_progress)
else:
self._run_in_parallel(report_progress)

def _tasks_by_workload(self):
return sorted(
self._requested_tasks,
key=lambda task: task[0], # workload
reverse=True)

def _run_in_sequence(self, report_progress):
for index, task in enumerate(self._tasks_by_workload()):
_, func, args, kwargs = task
for index, task in enumerate(self._requested_tasks):
report_progress(len(self._requested_tasks), index)
func(*args, **kwargs)
task.func(*task.args, **task.kwargs)
report_progress(len(self._requested_tasks), len(self._requested_tasks))

def _run_in_parallel(self, report_progress):
for task in self._tasks_by_workload():
_, func, args, kwargs = task
self._worker_tasks.append(
self._pool.apply_async(func, args, kwargs))
worker_tasks = [] # List of multiprocessing.pool.AsyncResult
for task in self._requested_tasks:
worker_tasks.append(
self._pool.apply_async(task.func, task.args, task.kwargs))
self._pool.close()

def report_worker_task_progress():
if not report_progress:
return
done_count = functools.reduce(
lambda count, worker_task: count + bool(worker_task.ready()),
self._worker_tasks, 0)
report_progress(len(self._worker_tasks), done_count)
worker_tasks, 0)
report_progress(len(worker_tasks), done_count)

timeout_in_sec = 1
while True:
report_worker_task_progress()
for worker_task in self._worker_tasks:
for worker_task in worker_tasks:
if not worker_task.ready():
worker_task.wait(timeout_in_sec)
break
Expand Down

0 comments on commit eefb03b

Please sign in to comment.