Skip to content

Commit

Permalink
Use PeriodicCallback class from tornado (#3725)
Browse files Browse the repository at this point in the history
  • Loading branch information
jrbourbeau authored Apr 21, 2020
1 parent 8376f22 commit 35dc940
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 119 deletions.
9 changes: 3 additions & 6 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
except ImportError:
single_key = first
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.ioloop import IOLoop, PeriodicCallback

from .batched import BatchedSend
from .utils_comm import (
Expand Down Expand Up @@ -72,7 +72,6 @@
key_split,
thread_state,
no_default,
PeriodicCallback,
LoopRunner,
parse_timedelta,
shutting_down,
Expand Down Expand Up @@ -682,12 +681,10 @@ def __init__(

self._periodic_callbacks = dict()
self._periodic_callbacks["scheduler-info"] = PeriodicCallback(
self._update_scheduler_info,
scheduler_info_interval * 1000,
io_loop=self.loop,
self._update_scheduler_info, scheduler_info_interval * 1000,
)
self._periodic_callbacks["heartbeat"] = PeriodicCallback(
self._heartbeat, heartbeat_interval * 1000, io_loop=self.loop
self._heartbeat, heartbeat_interval * 1000
)

self._start_arg = address
Expand Down
15 changes: 5 additions & 10 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import tblib
from tlz import merge
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.ioloop import IOLoop, PeriodicCallback

from .comm import (
connect,
Expand All @@ -31,7 +31,6 @@
truncate_exception,
ignoring,
shutting_down,
PeriodicCallback,
parse_timedelta,
has_keyword,
CancelledError,
Expand Down Expand Up @@ -176,18 +175,14 @@ def stop():

self.periodic_callbacks = dict()

pc = PeriodicCallback(self.monitor.update, 500, io_loop=self.io_loop)
pc = PeriodicCallback(self.monitor.update, 500)
self.periodic_callbacks["monitor"] = pc

self._last_tick = time()
pc = PeriodicCallback(
self._measure_tick,
parse_timedelta(
dask.config.get("distributed.admin.tick.interval"), default="ms"
)
* 1000,
io_loop=self.io_loop,
measure_tick_interval = parse_timedelta(
dask.config.get("distributed.admin.tick.interval"), default="ms"
)
pc = PeriodicCallback(self._measure_tick, measure_tick_interval * 1000)
self.periodic_callbacks["tick"] = pc

self.thread_id = 0
Expand Down
4 changes: 1 addition & 3 deletions distributed/counter.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from collections import defaultdict

from tornado.ioloop import IOLoop

from .utils import PeriodicCallback
from tornado.ioloop import IOLoop, PeriodicCallback


try:
Expand Down
4 changes: 2 additions & 2 deletions distributed/deploy/adaptive_core.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import collections
import math

from tornado.ioloop import IOLoop
from tornado.ioloop import IOLoop, PeriodicCallback
import tlz as toolz

from ..metrics import time
from ..utils import parse_timedelta, PeriodicCallback
from ..utils import parse_timedelta


class AdaptiveCore:
Expand Down
4 changes: 2 additions & 2 deletions distributed/deploy/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
import logging
import threading
import warnings
from tornado.ioloop import PeriodicCallback

import dask.config
from dask.utils import format_bytes

from .adaptive import Adaptive

from ..utils import (
PeriodicCallback,
log_errors,
ignoring,
sync,
Expand Down Expand Up @@ -324,7 +324,7 @@ def update():
cluster_repr_interval = parse_timedelta(
dask.config.get("distributed.deploy.cluster-repr-interval", default="ms")
)
pc = PeriodicCallback(update, cluster_repr_interval * 1000, io_loop=self.loop)
pc = PeriodicCallback(update, cluster_repr_interval * 1000)
self.periodic_callbacks["cluster-repr"] = pc
pc.start()

Expand Down
5 changes: 2 additions & 3 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import dask
from dask.system import CPU_COUNT
from tornado.ioloop import IOLoop
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado import gen

from .comm import get_address_host, unparse_host_port
Expand All @@ -28,7 +28,6 @@
mp_context,
silence_logging,
json_load_robust,
PeriodicCallback,
parse_timedelta,
ignoring,
TimeoutError,
Expand Down Expand Up @@ -202,7 +201,7 @@ def __init__(
self.scheduler = self.rpc(self.scheduler_addr)

if self.memory_limit:
pc = PeriodicCallback(self.memory_monitor, 100, io_loop=self.loop)
pc = PeriodicCallback(self.memory_monitor, 100)
self.periodic_callbacks["memory"] = pc

if (
Expand Down
7 changes: 3 additions & 4 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
groupby,
concat,
)
from tornado.ioloop import IOLoop
from tornado.ioloop import IOLoop, PeriodicCallback

import dask

Expand Down Expand Up @@ -64,7 +64,6 @@
no_default,
parse_timedelta,
parse_bytes,
PeriodicCallback,
shutting_down,
key_split_group,
empty_context,
Expand Down Expand Up @@ -1357,11 +1356,11 @@ def __init__(
)

if self.worker_ttl:
pc = PeriodicCallback(self.check_worker_ttl, self.worker_ttl, io_loop=loop)
pc = PeriodicCallback(self.check_worker_ttl, self.worker_ttl)
self.periodic_callbacks["worker-ttl"] = pc

if self.idle_timeout:
pc = PeriodicCallback(self.check_idle, self.idle_timeout / 4, io_loop=loop)
pc = PeriodicCallback(self.check_idle, self.idle_timeout / 4)
self.periodic_callbacks["idle-timeout"] = pc

if extensions is None:
Expand Down
13 changes: 5 additions & 8 deletions distributed/semaphore.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import asyncio
import dask
from asyncio import TimeoutError
from .utils import PeriodicCallback, log_errors, parse_timedelta
from tornado.ioloop import PeriodicCallback
from .utils import log_errors, parse_timedelta
from .worker import get_client
from .metrics import time
import warnings
Expand Down Expand Up @@ -66,14 +67,12 @@ def __init__(self, scheduler):

self.scheduler.extensions["semaphores"] = self

validation_callback_time = 1000 * parse_timedelta(
validation_callback_time = parse_timedelta(
dask.config.get("distributed.scheduler.locks.lease-validation-interval"),
default="s",
)
self._pc_lease_timeout = PeriodicCallback(
self._check_lease_timeout,
validation_callback_time,
io_loop=self.scheduler.loop,
self._check_lease_timeout, validation_callback_time * 1000,
)
self._pc_lease_timeout.start()
self.lease_timeout = parse_timedelta(
Expand Down Expand Up @@ -344,9 +343,7 @@ def __init__(self, max_leases=1, name=None, client=None):
)
self._refreshing_leases = False
pc = PeriodicCallback(
self._refresh_leases,
callback_time=1000 * refresh_leases_interval,
io_loop=self.client.io_loop,
self._refresh_leases, callback_time=refresh_leases_interval * 1000
)
self.refresh_callback = pc
# Registering the pc to the client here is important for proper cleanup
Expand Down
14 changes: 6 additions & 8 deletions distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from math import log
from time import time

from tornado.ioloop import PeriodicCallback

import dask
from .comm.addressing import get_address_host
from .core import CommClosedError
from .diagnostics.plugin import SchedulerPlugin
from .utils import log_errors, parse_timedelta, PeriodicCallback
from .utils import log_errors, parse_timedelta

from tlz import topk

Expand Down Expand Up @@ -36,16 +38,12 @@ def __init__(self, scheduler):
for worker in scheduler.workers:
self.add_worker(worker=worker)

# `callback_time` is in milliseconds
callback_time = 1000 * parse_timedelta(
callback_time = parse_timedelta(
dask.config.get("distributed.scheduler.work-stealing-interval"),
default="ms",
)
pc = PeriodicCallback(
callback=self.balance,
callback_time=callback_time,
io_loop=self.scheduler.loop,
)
# `callback_time` is in milliseconds
pc = PeriodicCallback(callback=self.balance, callback_time=callback_time * 1000)
self._pc = pc
self.scheduler.periodic_callbacks["stealing"] = pc
self.scheduler.plugins.append(self)
Expand Down
88 changes: 37 additions & 51 deletions distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
)

import tlz as toolz
import tornado
from tornado import gen
from tornado.ioloop import IOLoop

Expand Down Expand Up @@ -1118,17 +1117,6 @@ def nbytes(frame, _bytes_like=(bytes, bytearray)):
return len(frame)


def PeriodicCallback(callback, callback_time, io_loop=None):
"""
Wrapper around tornado.IOLoop.PeriodicCallback, for compatibility
with removal of the `io_loop` parameter in Tornado 5.0.
"""
if tornado.version_info >= (5,):
return tornado.ioloop.PeriodicCallback(callback, callback_time)
else:
return tornado.ioloop.PeriodicCallback(callback, callback_time, io_loop)


@contextmanager
def time_warn(duration, text):
start = time()
Expand Down Expand Up @@ -1191,49 +1179,47 @@ def reset_logger_locks():
handler.createLock()


if tornado.version_info[0] >= 5:

is_server_extension = False
is_server_extension = False

if "notebook" in sys.modules:
import traitlets
from notebook.notebookapp import NotebookApp

is_server_extension = traitlets.config.Application.initialized() and isinstance(
traitlets.config.Application.instance(), NotebookApp
)
if "notebook" in sys.modules:
import traitlets
from notebook.notebookapp import NotebookApp

if not is_server_extension:
is_kernel_and_no_running_loop = False
is_server_extension = traitlets.config.Application.initialized() and isinstance(
traitlets.config.Application.instance(), NotebookApp
)

if is_kernel():
try:
get_running_loop()
except RuntimeError:
is_kernel_and_no_running_loop = True

if not is_kernel_and_no_running_loop:

# TODO: Use tornado's AnyThreadEventLoopPolicy, instead of class below,
# once tornado > 6.0.3 is available.
if WINDOWS and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
# https://github.com/tornadoweb/tornado/issues/2608
BaseEventLoopPolicy = asyncio.WindowsSelectorEventLoopPolicy
else:
BaseEventLoopPolicy = asyncio.DefaultEventLoopPolicy
if not is_server_extension:
is_kernel_and_no_running_loop = False

class AnyThreadEventLoopPolicy(BaseEventLoopPolicy):
def get_event_loop(self):
try:
return super().get_event_loop()
except (RuntimeError, AssertionError):
loop = self.new_event_loop()
self.set_event_loop(loop)
return loop

asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
if is_kernel():
try:
get_running_loop()
except RuntimeError:
is_kernel_and_no_running_loop = True

if not is_kernel_and_no_running_loop:

# TODO: Use tornado's AnyThreadEventLoopPolicy, instead of class below,
# once tornado > 6.0.3 is available.
if WINDOWS and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
# https://github.com/tornadoweb/tornado/issues/2608
BaseEventLoopPolicy = asyncio.WindowsSelectorEventLoopPolicy
else:
BaseEventLoopPolicy = asyncio.DefaultEventLoopPolicy

class AnyThreadEventLoopPolicy(BaseEventLoopPolicy):
def get_event_loop(self):
try:
return super().get_event_loop()
except (RuntimeError, AssertionError):
loop = self.new_event_loop()
self.set_event_loop(loop)
return loop

asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())


@functools.lru_cache(1000)
Expand Down
Loading

0 comments on commit 35dc940

Please sign in to comment.