From 35dc9409f8cf99f82c354b97302846705dfbcc4a Mon Sep 17 00:00:00 2001
From: James Bourbeau
Date: Mon, 20 Apr 2020 21:18:48 -0500
Subject: [PATCH] Use PeriodicCallback class from tornado (#3725)

---
 distributed/client.py               |  9 +--
 distributed/core.py                 | 15 ++---
 distributed/counter.py              |  4 +-
 distributed/deploy/adaptive_core.py |  4 +-
 distributed/deploy/cluster.py       |  4 +-
 distributed/nanny.py                |  5 +-
 distributed/scheduler.py            |  7 +--
 distributed/semaphore.py            | 13 ++---
 distributed/stealing.py             | 14 ++---
 distributed/utils.py                | 88 ++++++++++++-----------------
 distributed/worker.py               | 33 ++++-------
 11 files changed, 77 insertions(+), 119 deletions(-)

diff --git a/distributed/client.py b/distributed/client.py
index a48c236711..52c0e2b420 100644
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -37,7 +37,7 @@ except ImportError:
     single_key = first
 
 from tornado import gen
-from tornado.ioloop import IOLoop
+from tornado.ioloop import IOLoop, PeriodicCallback
 
 from .batched import BatchedSend
 from .utils_comm import (
@@ -72,7 +72,6 @@
     key_split,
     thread_state,
     no_default,
-    PeriodicCallback,
     LoopRunner,
     parse_timedelta,
     shutting_down,
@@ -682,12 +681,10 @@ def __init__(
 
         self._periodic_callbacks = dict()
         self._periodic_callbacks["scheduler-info"] = PeriodicCallback(
-            self._update_scheduler_info,
-            scheduler_info_interval * 1000,
-            io_loop=self.loop,
+            self._update_scheduler_info, scheduler_info_interval * 1000,
         )
         self._periodic_callbacks["heartbeat"] = PeriodicCallback(
-            self._heartbeat, heartbeat_interval * 1000, io_loop=self.loop
+            self._heartbeat, heartbeat_interval * 1000
         )
 
         self._start_arg = address
diff --git a/distributed/core.py b/distributed/core.py
index dd5e18d000..df0a55780e 100644
--- a/distributed/core.py
+++ b/distributed/core.py
@@ -12,7 +12,7 @@
 import tblib
 from tlz import merge
 from tornado import gen
-from tornado.ioloop import IOLoop
+from tornado.ioloop import IOLoop, PeriodicCallback
 
 from .comm import (
     connect,
@@ -31,7 +31,6 @@
     truncate_exception,
     ignoring,
     shutting_down,
-    PeriodicCallback,
     parse_timedelta,
     has_keyword,
     CancelledError,
@@ -176,18 +175,14 @@ def stop():
 
         self.periodic_callbacks = dict()
 
-        pc = PeriodicCallback(self.monitor.update, 500, io_loop=self.io_loop)
+        pc = PeriodicCallback(self.monitor.update, 500)
         self.periodic_callbacks["monitor"] = pc
 
         self._last_tick = time()
-        pc = PeriodicCallback(
-            self._measure_tick,
-            parse_timedelta(
-                dask.config.get("distributed.admin.tick.interval"), default="ms"
-            )
-            * 1000,
-            io_loop=self.io_loop,
+        measure_tick_interval = parse_timedelta(
+            dask.config.get("distributed.admin.tick.interval"), default="ms"
         )
+        pc = PeriodicCallback(self._measure_tick, measure_tick_interval * 1000)
         self.periodic_callbacks["tick"] = pc
 
         self.thread_id = 0
diff --git a/distributed/counter.py b/distributed/counter.py
index ebc8cda610..feffb69ce8 100644
--- a/distributed/counter.py
+++ b/distributed/counter.py
@@ -1,8 +1,6 @@
 from collections import defaultdict
 
-from tornado.ioloop import IOLoop
-
-from .utils import PeriodicCallback
+from tornado.ioloop import IOLoop, PeriodicCallback
 
 
 try:
diff --git a/distributed/deploy/adaptive_core.py b/distributed/deploy/adaptive_core.py
index 192e244bd0..7d15cb4c2c 100644
--- a/distributed/deploy/adaptive_core.py
+++ b/distributed/deploy/adaptive_core.py
@@ -1,11 +1,11 @@
 import collections
 import math
 
-from tornado.ioloop import IOLoop
+from tornado.ioloop import IOLoop, PeriodicCallback
 import tlz as toolz
 
 from ..metrics import time
-from ..utils import parse_timedelta, PeriodicCallback
+from ..utils import parse_timedelta
 
 
 class AdaptiveCore:
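Note on the API being adopted above: since tornado 5, PeriodicCallback no longer
accepts an io_loop= argument; it binds to whichever IOLoop is current when
start() is called, which is what makes dropping the explicit loop plumbing safe.
A minimal, self-contained sketch (not part of the patch) of the new-style usage:

    from tornado.ioloop import IOLoop, PeriodicCallback

    def tick():
        print("tick")

    pc = PeriodicCallback(tick, 500)  # callback_time is in milliseconds
    pc.start()                        # binds to IOLoop.current() at this moment

    loop = IOLoop.current()
    loop.call_later(2, loop.stop)     # run for ~2 seconds, then stop
    loop.start()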
diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py
index 592195443c..35e0b97c61 100644
--- a/distributed/deploy/cluster.py
+++ b/distributed/deploy/cluster.py
@@ -2,6 +2,7 @@
 import logging
 import threading
 import warnings
+from tornado.ioloop import PeriodicCallback
 
 import dask.config
 from dask.utils import format_bytes
@@ -9,7 +10,6 @@
 
 from .adaptive import Adaptive
 from ..utils import (
-    PeriodicCallback,
     log_errors,
     ignoring,
     sync,
@@ -324,7 +324,7 @@ def update():
         cluster_repr_interval = parse_timedelta(
             dask.config.get("distributed.deploy.cluster-repr-interval", default="ms")
         )
-        pc = PeriodicCallback(update, cluster_repr_interval * 1000, io_loop=self.loop)
+        pc = PeriodicCallback(update, cluster_repr_interval * 1000)
         self.periodic_callbacks["cluster-repr"] = pc
         pc.start()
 
diff --git a/distributed/nanny.py b/distributed/nanny.py
index 3f7c20f98f..f3a355dca8 100644
--- a/distributed/nanny.py
+++ b/distributed/nanny.py
@@ -11,7 +11,7 @@
 
 import dask
 from dask.system import CPU_COUNT
-from tornado.ioloop import IOLoop
+from tornado.ioloop import IOLoop, PeriodicCallback
 from tornado import gen
 
 from .comm import get_address_host, unparse_host_port
@@ -28,7 +28,6 @@
     mp_context,
     silence_logging,
     json_load_robust,
-    PeriodicCallback,
     parse_timedelta,
     ignoring,
     TimeoutError,
@@ -202,7 +201,7 @@ def __init__(
         self.scheduler = self.rpc(self.scheduler_addr)
 
         if self.memory_limit:
-            pc = PeriodicCallback(self.memory_monitor, 100, io_loop=self.loop)
+            pc = PeriodicCallback(self.memory_monitor, 100)
             self.periodic_callbacks["memory"] = pc
 
         if (
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 86dd6b9203..521415c7c2 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -32,7 +32,7 @@
     groupby,
     concat,
 )
-from tornado.ioloop import IOLoop
+from tornado.ioloop import IOLoop, PeriodicCallback
 
 import dask
 
@@ -64,7 +64,6 @@
     no_default,
     parse_timedelta,
     parse_bytes,
-    PeriodicCallback,
     shutting_down,
     key_split_group,
     empty_context,
@@ -1357,11 +1356,11 @@ def __init__(
         )
 
         if self.worker_ttl:
-            pc = PeriodicCallback(self.check_worker_ttl, self.worker_ttl, io_loop=loop)
+            pc = PeriodicCallback(self.check_worker_ttl, self.worker_ttl)
             self.periodic_callbacks["worker-ttl"] = pc
 
         if self.idle_timeout:
-            pc = PeriodicCallback(self.check_idle, self.idle_timeout / 4, io_loop=loop)
+            pc = PeriodicCallback(self.check_idle, self.idle_timeout / 4)
             self.periodic_callbacks["idle-timeout"] = pc
 
         if extensions is None:
diff --git a/distributed/semaphore.py b/distributed/semaphore.py
index 976f54704c..263619c907 100644
--- a/distributed/semaphore.py
+++ b/distributed/semaphore.py
@@ -3,7 +3,8 @@
 import asyncio
 import dask
 from asyncio import TimeoutError
-from .utils import PeriodicCallback, log_errors, parse_timedelta
+from tornado.ioloop import PeriodicCallback
+from .utils import log_errors, parse_timedelta
 from .worker import get_client
 from .metrics import time
 import warnings
@@ -66,14 +67,12 @@ def __init__(self, scheduler):
 
         self.scheduler.extensions["semaphores"] = self
 
-        validation_callback_time = 1000 * parse_timedelta(
+        validation_callback_time = parse_timedelta(
             dask.config.get("distributed.scheduler.locks.lease-validation-interval"),
             default="s",
         )
         self._pc_lease_timeout = PeriodicCallback(
-            self._check_lease_timeout,
-            validation_callback_time,
-            io_loop=self.scheduler.loop,
+            self._check_lease_timeout, validation_callback_time * 1000,
         )
         self._pc_lease_timeout.start()
         self.lease_timeout = parse_timedelta(
@@ -344,9 +343,7 @@ def __init__(self, max_leases=1, name=None, client=None):
         )
         self._refreshing_leases = False
         pc = PeriodicCallback(
-            self._refresh_leases,
-            callback_time=1000 * refresh_leases_interval,
-            io_loop=self.client.io_loop,
+            self._refresh_leases, callback_time=refresh_leases_interval * 1000
         )
         self.refresh_callback = pc
         # Registering the pc to the client here is important for proper cleanup
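The "* 1000" conversions moving to the call sites above all follow one
convention: parse_timedelta returns seconds, while tornado's PeriodicCallback
expects milliseconds. A small sketch of the convention, using parse_timedelta
exactly as it is imported in this patch:

    from distributed.utils import parse_timedelta

    interval = parse_timedelta("500ms", default="s")  # -> 0.5 (seconds)
    callback_time = interval * 1000                   # -> 500.0 (milliseconds)
    assert callback_time == 500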
diff --git a/distributed/stealing.py b/distributed/stealing.py
index 0d552d1689..874ca98ce7 100644
--- a/distributed/stealing.py
+++ b/distributed/stealing.py
@@ -3,11 +3,13 @@
 from math import log
 from time import time
 
+from tornado.ioloop import PeriodicCallback
+
 import dask
 from .comm.addressing import get_address_host
 from .core import CommClosedError
 from .diagnostics.plugin import SchedulerPlugin
-from .utils import log_errors, parse_timedelta, PeriodicCallback
+from .utils import log_errors, parse_timedelta
 
 from tlz import topk
 
@@ -36,16 +38,12 @@ def __init__(self, scheduler):
         for worker in scheduler.workers:
             self.add_worker(worker=worker)
 
-        # `callback_time` is in milliseconds
-        callback_time = 1000 * parse_timedelta(
+        callback_time = parse_timedelta(
             dask.config.get("distributed.scheduler.work-stealing-interval"),
             default="ms",
         )
-        pc = PeriodicCallback(
-            callback=self.balance,
-            callback_time=callback_time,
-            io_loop=self.scheduler.loop,
-        )
+        # `callback_time` is in milliseconds
+        pc = PeriodicCallback(callback=self.balance, callback_time=callback_time * 1000)
         self._pc = pc
         self.scheduler.periodic_callbacks["stealing"] = pc
         self.scheduler.plugins.append(self)
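The stealing extension follows the same register-then-start pattern as the
other servers touched by this patch: callbacks are parked in a
periodic_callbacks dict and started later from the event loop. A rough sketch
of that pattern with hypothetical names (MiniServer and balance stand in for
the real classes; this is not code from the repo):

    from tornado.ioloop import IOLoop, PeriodicCallback

    class MiniServer:
        def __init__(self):
            self.periodic_callbacks = {}
            pc = PeriodicCallback(callback=self.balance, callback_time=100)
            self.periodic_callbacks["stealing"] = pc  # built now, started later

        def balance(self):
            print("rebalancing work")

        def start(self):
            # start every registered callback on the current event loop
            loop = IOLoop.current()
            for pc in self.periodic_callbacks.values():
                loop.add_callback(pc.start)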
diff --git a/distributed/utils.py b/distributed/utils.py
index 30af57a25f..46bd4c245e 100644
--- a/distributed/utils.py
+++ b/distributed/utils.py
@@ -48,7 +48,6 @@
 )
 
 import tlz as toolz
-import tornado
 from tornado import gen
 from tornado.ioloop import IOLoop
 
@@ -1118,17 +1117,6 @@ def nbytes(frame, _bytes_like=(bytes, bytearray)):
     return len(frame)
 
 
-def PeriodicCallback(callback, callback_time, io_loop=None):
-    """
-    Wrapper around tornado.IOLoop.PeriodicCallback, for compatibility
-    with removal of the `io_loop` parameter in Tornado 5.0.
-    """
-    if tornado.version_info >= (5,):
-        return tornado.ioloop.PeriodicCallback(callback, callback_time)
-    else:
-        return tornado.ioloop.PeriodicCallback(callback, callback_time, io_loop)
-
-
 @contextmanager
 def time_warn(duration, text):
     start = time()
@@ -1191,49 +1179,47 @@ def reset_logger_locks():
             handler.createLock()
 
 
-if tornado.version_info[0] >= 5:
-
-    is_server_extension = False
+is_server_extension = False
 
-    if "notebook" in sys.modules:
-        import traitlets
-        from notebook.notebookapp import NotebookApp
-
-        is_server_extension = traitlets.config.Application.initialized() and isinstance(
-            traitlets.config.Application.instance(), NotebookApp
-        )
+if "notebook" in sys.modules:
+    import traitlets
+    from notebook.notebookapp import NotebookApp
 
-    if not is_server_extension:
-        is_kernel_and_no_running_loop = False
+    is_server_extension = traitlets.config.Application.initialized() and isinstance(
+        traitlets.config.Application.instance(), NotebookApp
+    )
 
-        if is_kernel():
-            try:
-                get_running_loop()
-            except RuntimeError:
-                is_kernel_and_no_running_loop = True
-
-        if not is_kernel_and_no_running_loop:
-
-            # TODO: Use tornado's AnyThreadEventLoopPolicy, instead of class below,
-            # once tornado > 6.0.3 is available.
-            if WINDOWS and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
-                # WindowsProactorEventLoopPolicy is not compatible with tornado 6
-                # fallback to the pre-3.8 default of Selector
-                # https://github.com/tornadoweb/tornado/issues/2608
-                BaseEventLoopPolicy = asyncio.WindowsSelectorEventLoopPolicy
-            else:
-                BaseEventLoopPolicy = asyncio.DefaultEventLoopPolicy
+if not is_server_extension:
+    is_kernel_and_no_running_loop = False
 
-            class AnyThreadEventLoopPolicy(BaseEventLoopPolicy):
-                def get_event_loop(self):
-                    try:
-                        return super().get_event_loop()
-                    except (RuntimeError, AssertionError):
-                        loop = self.new_event_loop()
-                        self.set_event_loop(loop)
-                        return loop
-
-            asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
+    if is_kernel():
+        try:
+            get_running_loop()
+        except RuntimeError:
+            is_kernel_and_no_running_loop = True
+
+    if not is_kernel_and_no_running_loop:
+
+        # TODO: Use tornado's AnyThreadEventLoopPolicy, instead of class below,
+        # once tornado > 6.0.3 is available.
+        if WINDOWS and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
+            # WindowsProactorEventLoopPolicy is not compatible with tornado 6
+            # fallback to the pre-3.8 default of Selector
+            # https://github.com/tornadoweb/tornado/issues/2608
+            BaseEventLoopPolicy = asyncio.WindowsSelectorEventLoopPolicy
+        else:
+            BaseEventLoopPolicy = asyncio.DefaultEventLoopPolicy
+
+        class AnyThreadEventLoopPolicy(BaseEventLoopPolicy):
+            def get_event_loop(self):
+                try:
+                    return super().get_event_loop()
+                except (RuntimeError, AssertionError):
+                    loop = self.new_event_loop()
+                    self.set_event_loop(loop)
+                    return loop
+
+        asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
 
 
 @functools.lru_cache(1000)
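On the TODO kept in the hunk above: tornado already ships an
AnyThreadEventLoopPolicy in tornado.platform.asyncio (since tornado 5.0); the
hand-rolled subclass is retained here only because of the Windows
selector-policy workaround. Once that is resolved upstream, the whole block
could presumably shrink to something like:

    import asyncio
    from tornado.platform.asyncio import AnyThreadEventLoopPolicy

    asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())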
diff --git a/distributed/worker.py b/distributed/worker.py
index c6734bbce9..ef95c1f4b7 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -24,7 +24,7 @@
 
 from tlz import pluck, merge, first, keymap
 from tornado import gen
-from tornado.ioloop import IOLoop
+from tornado.ioloop import IOLoop, PeriodicCallback
 
 from . import profile, comm, system
 from .batched import BatchedSend
@@ -55,7 +55,6 @@
     json_load_robust,
     key_split,
     offload,
-    PeriodicCallback,
     parse_bytes,
     parse_timedelta,
     iscoroutinefunction,
@@ -474,9 +473,6 @@ def __init__(
         self.available_resources = (resources or {}).copy()
         self.death_timeout = parse_timedelta(death_timeout)
 
-        self.memory_monitor_interval = parse_timedelta(
-            memory_monitor_interval, default="ms"
-        )
         self.extensions = dict()
         if silence_logs:
             silence_logging(level=silence_logs)
@@ -659,23 +655,22 @@ def __init__(
             "worker": self,
         }
 
-        pc = PeriodicCallback(self.heartbeat, 1000, io_loop=self.io_loop)
+        pc = PeriodicCallback(self.heartbeat, 1000)
         self.periodic_callbacks["heartbeat"] = pc
         pc = PeriodicCallback(
-            lambda: self.batched_stream.send({"op": "keep-alive"}),
-            60000,
-            io_loop=self.io_loop,
+            lambda: self.batched_stream.send({"op": "keep-alive"}), 60000,
         )
         self.periodic_callbacks["keep-alive"] = pc
 
         self._address = contact_address
 
+        self.memory_monitor_interval = parse_timedelta(
+            memory_monitor_interval, default="ms"
+        )
         if self.memory_limit:
             self._memory_monitoring = False
             pc = PeriodicCallback(
-                self.memory_monitor,
-                self.memory_monitor_interval * 1000,
-                io_loop=self.io_loop,
+                self.memory_monitor, self.memory_monitor_interval * 1000,
             )
             self.periodic_callbacks["memory"] = pc
 
@@ -688,19 +683,13 @@ def __init__(
 
         setproctitle("dask-worker [not started]")
 
-        pc = PeriodicCallback(
-            self.trigger_profile,
-            parse_timedelta(
-                dask.config.get("distributed.worker.profile.interval"), default="ms"
-            )
-            * 1000,
-            io_loop=self.io_loop,
+        profile_trigger_interval = parse_timedelta(
+            dask.config.get("distributed.worker.profile.interval"), default="ms"
         )
+        pc = PeriodicCallback(self.trigger_profile, profile_trigger_interval * 1000)
         self.periodic_callbacks["profile"] = pc
-        pc = PeriodicCallback(
-            self.cycle_profile, profile_cycle_interval * 1000, io_loop=self.io_loop
-        )
+        pc = PeriodicCallback(self.cycle_profile, profile_cycle_interval * 1000)
         self.periodic_callbacks["profile-cycle"] = pc
 
         self.plugins = {}
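Taken together, the patch retires the distributed.utils.PeriodicCallback
compatibility shim (only needed while tornado 4's io_loop= argument was still
supported) in favor of tornado's own class everywhere. A self-contained sketch
of the resulting behavior, runnable on tornado 5+ where the IOLoop is a thin
wrapper over the asyncio event loop:

    import asyncio
    from tornado.ioloop import PeriodicCallback

    async def main():
        beats = []
        pc = PeriodicCallback(lambda: beats.append("beat"), 100)  # every 100 ms
        pc.start()              # attaches to the loop running this coroutine
        await asyncio.sleep(0.35)
        pc.stop()
        assert len(beats) >= 2  # fired roughly every 100 ms

    asyncio.run(main())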