From f2f82c6c2e8d36731cb3fb82fb1f80ea0323358e Mon Sep 17 00:00:00 2001 From: jakirkham Date: Tue, 17 Mar 2020 08:51:52 -0700 Subject: [PATCH] Import tlz (#3579) Import from `tlz` for optional `cytoolz` support --- distributed/cfexecutor.py | 2 +- distributed/cli/dask_worker.py | 2 +- distributed/client.py | 6 ++---- distributed/core.py | 2 +- distributed/dashboard/components/__init__.py | 1 - distributed/dashboard/components/scheduler.py | 8 ++------ distributed/dashboard/components/shared.py | 2 +- distributed/dashboard/components/worker.py | 2 +- distributed/dashboard/scheduler.py | 5 +---- .../dashboard/tests/test_scheduler_bokeh.py | 4 ++-- .../dashboard/tests/test_worker_bokeh.py | 2 +- distributed/dashboard/utils.py | 8 ++------ distributed/dashboard/worker.py | 2 +- distributed/deploy/adaptive_core.py | 2 +- distributed/deploy/old_ssh.py | 2 +- distributed/deploy/tests/test_spec_cluster.py | 2 +- distributed/diagnostics/progress.py | 2 +- distributed/diagnostics/progress_stream.py | 2 +- distributed/diagnostics/progressbar.py | 2 +- .../diagnostics/tests/test_task_stream.py | 2 +- distributed/diagnostics/tests/test_widgets.py | 2 +- distributed/profile.py | 2 +- distributed/protocol/compression.py | 3 ++- distributed/protocol/core.py | 6 +----- distributed/protocol/serialize.py | 5 +---- distributed/protocol/tests/test_serialize.py | 2 +- distributed/scheduler.py | 17 ++++++++++++----- distributed/stealing.py | 5 +---- distributed/tests/test_batched.py | 2 +- distributed/tests/test_client.py | 3 ++- distributed/tests/test_client_executor.py | 2 +- distributed/tests/test_failed_workers.py | 2 +- distributed/tests/test_ipython.py | 2 +- distributed/tests/test_nanny.py | 2 +- distributed/tests/test_profile.py | 2 +- distributed/tests/test_pubsub.py | 2 +- distributed/tests/test_scheduler.py | 2 +- distributed/tests/test_steal.py | 2 +- distributed/tests/test_stress.py | 2 +- distributed/tests/test_worker.py | 2 +- distributed/utils.py | 2 +- distributed/utils_comm.py | 2 +- distributed/utils_test.py | 2 +- distributed/variable.py | 5 +---- distributed/worker.py | 6 ++---- docs/source/efficiency.rst | 2 +- requirements.txt | 2 +- 47 files changed, 64 insertions(+), 84 deletions(-) diff --git a/distributed/cfexecutor.py b/distributed/cfexecutor.py index 985a407bdb..545dbbced0 100644 --- a/distributed/cfexecutor.py +++ b/distributed/cfexecutor.py @@ -1,7 +1,7 @@ import concurrent.futures as cf import weakref -from toolz import merge +from tlz import merge from tornado import gen diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 5188333b75..29261b5245 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -22,7 +22,7 @@ ) from distributed.utils import deserialize_for_cli, import_term -from toolz import valmap +from tlz import valmap from tornado.ioloop import IOLoop, TimeoutError logger = logging.getLogger("distributed.dask_worker") diff --git a/distributed/client.py b/distributed/client.py index 679e625470..06c6d245c0 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -30,10 +30,8 @@ from dask.compatibility import apply from dask.utils import ensure_dict, format_bytes, funcname -try: - from cytoolz import first, groupby, merge, valmap, keymap -except ImportError: - from toolz import first, groupby, merge, valmap, keymap +from tlz import first, groupby, merge, valmap, keymap + try: from dask.delayed import single_key except ImportError: diff --git a/distributed/core.py b/distributed/core.py index 5768f0f4d8..ec1e6c5214 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -10,7 +10,7 @@ import dask import tblib -from toolz import merge +from tlz import merge from tornado import gen from tornado.ioloop import IOLoop diff --git a/distributed/dashboard/components/__init__.py b/distributed/dashboard/components/__init__.py index bb8269083e..f6159e83bc 100644 --- a/distributed/dashboard/components/__init__.py +++ b/distributed/dashboard/components/__init__.py @@ -26,7 +26,6 @@ from bokeh.plotting import figure import dask from tornado import gen -import toolz from distributed.dashboard.utils import without_property_validation, BOKEH_VERSION from distributed import profile diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index ee037a4aab..c371210c70 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -35,7 +35,8 @@ from bokeh.io import curdoc import dask from dask.utils import format_bytes, key_split -from toolz import pipe +from tlz import pipe +from tlz.curried import map, concat, groupby from tornado import escape try: @@ -63,11 +64,6 @@ from distributed.diagnostics.graph_layout import GraphLayout from distributed.diagnostics.task_stream import TaskStreamPlugin -try: - from cytoolz.curried import map, concat, groupby -except ImportError: - from toolz.curried import map, concat, groupby - if dask.config.get("distributed.dashboard.export-tool"): from distributed.dashboard.export_tool import ExportTool else: diff --git a/distributed/dashboard/components/shared.py b/distributed/dashboard/components/shared.py index 611d281dd5..24db46385e 100644 --- a/distributed/dashboard/components/shared.py +++ b/distributed/dashboard/components/shared.py @@ -15,7 +15,7 @@ from bokeh.plotting import figure import dask from tornado import gen -import toolz +import tlz as toolz from distributed.dashboard.components import DashboardComponent from distributed.dashboard.utils import ( diff --git a/distributed/dashboard/components/worker.py b/distributed/dashboard/components/worker.py index 440e7279e3..a11d304783 100644 --- a/distributed/dashboard/components/worker.py +++ b/distributed/dashboard/components/worker.py @@ -20,7 +20,7 @@ from bokeh.palettes import RdBu from bokeh.themes import Theme from dask.utils import format_bytes -from toolz import merge, partition_all +from tlz import merge, partition_all from distributed.dashboard.components import add_periodic_callback from distributed.dashboard.components.shared import ( diff --git a/distributed/dashboard/scheduler.py b/distributed/dashboard/scheduler.py index 836cefbbd6..acaab24cd1 100644 --- a/distributed/dashboard/scheduler.py +++ b/distributed/dashboard/scheduler.py @@ -8,10 +8,7 @@ import dask from dask.utils import format_bytes -try: - from cytoolz import merge, merge_with -except ImportError: - from toolz import merge, merge_with +from tlz import merge, merge_with from tornado import escape from tornado.websocket import WebSocketHandler diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index 4977ee8fa7..f36bfd897e 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -7,7 +7,7 @@ import pytest pytest.importorskip("bokeh") -from toolz import first +from tlz import first from tornado import gen from tornado.httpclient import AsyncHTTPClient, HTTPRequest @@ -624,7 +624,7 @@ def test_proxy_to_workers(c, s, a, b): }, ) async def test_lots_of_tasks(c, s, a, b): - import toolz + import tlz as toolz ts = TaskStream(s) ts.update() diff --git a/distributed/dashboard/tests/test_worker_bokeh.py b/distributed/dashboard/tests/test_worker_bokeh.py index b33fc3ba18..97729fce14 100644 --- a/distributed/dashboard/tests/test_worker_bokeh.py +++ b/distributed/dashboard/tests/test_worker_bokeh.py @@ -6,7 +6,7 @@ pytest.importorskip("bokeh") import sys -from toolz import first +from tlz import first from tornado import gen from tornado.httpclient import AsyncHTTPClient diff --git a/distributed/dashboard/utils.py b/distributed/dashboard/utils.py index b47cb75d6b..394e016a4d 100644 --- a/distributed/dashboard/utils.py +++ b/distributed/dashboard/utils.py @@ -5,7 +5,8 @@ import bokeh from bokeh.io import curdoc from tornado import web -from toolz import partition +from tlz import partition +from tlz.curried import first try: import numpy as np @@ -13,11 +14,6 @@ np = False -try: - from cytoolz.curried import first -except ImportError: - from toolz.curried import first - BOKEH_VERSION = LooseVersion(bokeh.__version__) dirname = os.path.dirname(__file__) diff --git a/distributed/dashboard/worker.py b/distributed/dashboard/worker.py index db29480666..54b3a0a4a5 100644 --- a/distributed/dashboard/worker.py +++ b/distributed/dashboard/worker.py @@ -3,7 +3,7 @@ import os from bokeh.themes import Theme -from toolz import merge +from tlz import merge from .components.worker import ( status_doc, diff --git a/distributed/deploy/adaptive_core.py b/distributed/deploy/adaptive_core.py index dfd82ea33b..192e244bd0 100644 --- a/distributed/deploy/adaptive_core.py +++ b/distributed/deploy/adaptive_core.py @@ -2,7 +2,7 @@ import math from tornado.ioloop import IOLoop -import toolz +import tlz as toolz from ..metrics import time from ..utils import parse_timedelta, PeriodicCallback diff --git a/distributed/deploy/old_ssh.py b/distributed/deploy/old_ssh.py index b524e2d7c4..33e69772f9 100644 --- a/distributed/deploy/old_ssh.py +++ b/distributed/deploy/old_ssh.py @@ -12,7 +12,7 @@ from threading import Thread -from toolz import merge +from tlz import merge from tornado import gen diff --git a/distributed/deploy/tests/test_spec_cluster.py b/distributed/deploy/tests/test_spec_cluster.py index 68642cda9d..90ce9923c6 100644 --- a/distributed/deploy/tests/test_spec_cluster.py +++ b/distributed/deploy/tests/test_spec_cluster.py @@ -8,7 +8,7 @@ from distributed.metrics import time from distributed.utils_test import loop, cleanup # noqa: F401 from distributed.utils import is_valid_xml -import toolz +import tlz as toolz import pytest diff --git a/distributed/diagnostics/progress.py b/distributed/diagnostics/progress.py index 1dcab0dc9e..2aeba98683 100644 --- a/distributed/diagnostics/progress.py +++ b/distributed/diagnostics/progress.py @@ -3,7 +3,7 @@ import logging from timeit import default_timer -from toolz import groupby, valmap +from tlz import groupby, valmap from .plugin import SchedulerPlugin from ..utils import key_split, key_split_group, log_errors, tokey diff --git a/distributed/diagnostics/progress_stream.py b/distributed/diagnostics/progress_stream.py index e417ee8e35..c5e74a30f3 100644 --- a/distributed/diagnostics/progress_stream.py +++ b/distributed/diagnostics/progress_stream.py @@ -1,6 +1,6 @@ import logging -from toolz import valmap, merge +from tlz import valmap, merge from .progress import AllProgress diff --git a/distributed/diagnostics/progressbar.py b/distributed/diagnostics/progressbar.py index ab7800c212..11da7a30d3 100644 --- a/distributed/diagnostics/progressbar.py +++ b/distributed/diagnostics/progressbar.py @@ -4,7 +4,7 @@ import sys import weakref -from toolz import valmap +from tlz import valmap from tornado.ioloop import IOLoop from .progress import format_time, Progress, MultiProgress diff --git a/distributed/diagnostics/tests/test_task_stream.py b/distributed/diagnostics/tests/test_task_stream.py index 58f1c4319f..4639c7a7a0 100644 --- a/distributed/diagnostics/tests/test_task_stream.py +++ b/distributed/diagnostics/tests/test_task_stream.py @@ -2,7 +2,7 @@ from time import sleep import pytest -from toolz import frequencies +from tlz import frequencies from distributed import get_task_stream from distributed.utils_test import gen_cluster, div, inc, slowinc diff --git a/distributed/diagnostics/tests/test_widgets.py b/distributed/diagnostics/tests/test_widgets.py index 03689c88c1..c217d17e29 100644 --- a/distributed/diagnostics/tests/test_widgets.py +++ b/distributed/diagnostics/tests/test_widgets.py @@ -74,7 +74,7 @@ def record_display(*args): from operator import add import re -from toolz import valmap +from tlz import valmap from distributed.client import wait from distributed.worker import dumps_task diff --git a/distributed/profile.py b/distributed/profile.py index 1bef645097..5bf071e20d 100644 --- a/distributed/profile.py +++ b/distributed/profile.py @@ -31,7 +31,7 @@ import threading from time import sleep -import toolz +import tlz as toolz from .metrics import time from .utils import format_time, color_of, parse_timedelta diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 5e81cdbaf1..adb3c888be 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -3,11 +3,12 @@ Includes utilities for determining whether or not to compress """ +from functools import partial import logging import random import dask -from toolz import identity, partial +from tlz import identity try: import blosc diff --git a/distributed/protocol/core.py b/distributed/protocol/core.py index 3937c9c2fc..3bb863f78c 100644 --- a/distributed/protocol/core.py +++ b/distributed/protocol/core.py @@ -1,13 +1,9 @@ +from functools import reduce import logging import operator import msgpack -try: - from cytoolz import reduce -except ImportError: - from toolz import reduce - from .compression import compressions, maybe_compress, decompress from .serialize import serialize, deserialize, Serialize, Serialized, extract_serialize from .utils import frame_split_size, merge_frames, msgpack_opts diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py index c462568cc4..c0fdb98449 100644 --- a/distributed/protocol/serialize.py +++ b/distributed/protocol/serialize.py @@ -4,10 +4,7 @@ import dask from dask.base import normalize_token -try: - from cytoolz import valmap, get_in -except ImportError: - from toolz import valmap, get_in +from tlz import valmap, get_in import msgpack diff --git a/distributed/protocol/tests/test_serialize.py b/distributed/protocol/tests/test_serialize.py index caf1bbe0ad..10e4c5e797 100644 --- a/distributed/protocol/tests/test_serialize.py +++ b/distributed/protocol/tests/test_serialize.py @@ -4,7 +4,7 @@ import msgpack import numpy as np import pytest -from toolz import identity +from tlz import identity from distributed import wait from distributed.protocol import ( diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ab026f61d0..d543808340 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -19,11 +19,18 @@ import psutil import sortedcontainers -try: - from cytoolz import frequencies, merge, pluck, merge_sorted, first, merge_with -except ImportError: - from toolz import frequencies, merge, pluck, merge_sorted, first, merge_with -from toolz import valmap, second, compose, groupby +from tlz import ( + frequencies, + merge, + pluck, + merge_sorted, + first, + merge_with, + valmap, + second, + compose, + groupby, +) from tornado.ioloop import IOLoop import dask diff --git a/distributed/stealing.py b/distributed/stealing.py index 4fbb753e13..fcceba4824 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -9,10 +9,7 @@ from .diagnostics.plugin import SchedulerPlugin from .utils import log_errors, parse_timedelta, PeriodicCallback -try: - from cytoolz import topk -except ImportError: - from toolz import topk +from tlz import topk LATENCY = 10e-3 log_2 = log(2) diff --git a/distributed/tests/test_batched.py b/distributed/tests/test_batched.py index 74efba810d..f2b0be99ab 100644 --- a/distributed/tests/test_batched.py +++ b/distributed/tests/test_batched.py @@ -2,7 +2,7 @@ import random import pytest -from toolz import assoc +from tlz import assoc from distributed.batched import BatchedSend from distributed.core import listen, connect, CommClosedError diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 392aec73be..5c85391655 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -1,5 +1,6 @@ import asyncio from collections import deque +from functools import partial import gc import logging from operator import add @@ -18,7 +19,7 @@ import zipfile import pytest -from toolz import identity, isdistinct, concat, pluck, valmap, partial, first, merge +from tlz import identity, isdistinct, concat, pluck, valmap, first, merge from tornado import gen import dask diff --git a/distributed/tests/test_client_executor.py b/distributed/tests/test_client_executor.py index 1024990216..e7e3fc24c7 100644 --- a/distributed/tests/test_client_executor.py +++ b/distributed/tests/test_client_executor.py @@ -11,7 +11,7 @@ ) import pytest -from toolz import take +from tlz import take from distributed import Client from distributed.utils import CancelledError diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py index cf0387c1cd..99b1b4a42a 100644 --- a/distributed/tests/test_failed_workers.py +++ b/distributed/tests/test_failed_workers.py @@ -3,7 +3,7 @@ from time import sleep import pytest -from toolz import partition_all, first +from tlz import partition_all, first from tornado import gen from dask import delayed diff --git a/distributed/tests/test_ipython.py b/distributed/tests/test_ipython.py index aa4a3e4092..a6d387589e 100644 --- a/distributed/tests/test_ipython.py +++ b/distributed/tests/test_ipython.py @@ -1,7 +1,7 @@ from unittest import mock import pytest -from toolz import first +from tlz import first import tornado from distributed import Client diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index c80974d997..2a19bdf874 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -9,7 +9,7 @@ import numpy as np import pytest -from toolz import valmap, first +from tlz import valmap, first from tornado import gen from tornado.ioloop import IOLoop diff --git a/distributed/tests/test_profile.py b/distributed/tests/test_profile.py index a022600d81..9f673e8caa 100644 --- a/distributed/tests/test_profile.py +++ b/distributed/tests/test_profile.py @@ -1,7 +1,7 @@ import pytest import sys import time -from toolz import first +from tlz import first import threading from distributed.compatibility import WINDOWS diff --git a/distributed/tests/test_pubsub.py b/distributed/tests/test_pubsub.py index 2e372dea88..639542df5c 100644 --- a/distributed/tests/test_pubsub.py +++ b/distributed/tests/test_pubsub.py @@ -3,7 +3,7 @@ import pytest from tornado import gen -import toolz +import tlz as toolz from distributed import Pub, Sub, wait, get_worker, TimeoutError from distributed.utils_test import gen_cluster diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index a5649dbfc8..5459716ca8 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -12,7 +12,7 @@ import dask from dask import delayed -from toolz import merge, concat, valmap, first, frequencies +from tlz import merge, concat, valmap, first, frequencies from tornado import gen import pytest diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 71f408749a..5b13d9157e 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -6,7 +6,7 @@ import weakref import pytest -from toolz import sliding_window, concat +from tlz import sliding_window, concat from tornado import gen import dask diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py index ab996e2b30..d5e1e62c57 100644 --- a/distributed/tests/test_stress.py +++ b/distributed/tests/test_stress.py @@ -6,7 +6,7 @@ from dask import delayed import pytest -from toolz import concat, sliding_window +from tlz import concat, sliding_window from distributed import Client, wait, Nanny from distributed.config import config diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index b6da294c74..0bda344fd9 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -16,7 +16,7 @@ from dask.utils import format_bytes from dask.system import CPU_COUNT import pytest -from toolz import pluck, sliding_window, first +from tlz import pluck, sliding_window, first import tornado from tornado import gen diff --git a/distributed/utils.py b/distributed/utils.py index 429a53cddd..eb622f7b83 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -47,7 +47,7 @@ parse_timedelta, ) -import toolz +import tlz as toolz import tornado from tornado import gen from tornado.ioloop import IOLoop diff --git a/distributed/utils_comm.py b/distributed/utils_comm.py index 3d10ba5103..4240475452 100644 --- a/distributed/utils_comm.py +++ b/distributed/utils_comm.py @@ -8,7 +8,7 @@ from dask.optimization import SubgraphCallable import dask.config from dask.utils import parse_timedelta -from toolz import merge, concat, groupby, drop +from tlz import merge, concat, groupby, drop from .core import rpc from .utils import All, tokey diff --git a/distributed/utils_test.py b/distributed/utils_test.py index e16983b187..741fc76a8d 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -32,7 +32,7 @@ import pytest import dask -from toolz import merge, memoize, assoc +from tlz import merge, memoize, assoc from tornado import gen, queues from tornado.ioloop import IOLoop diff --git a/distributed/variable.py b/distributed/variable.py index fc4cc396da..3c6cc93116 100644 --- a/distributed/variable.py +++ b/distributed/variable.py @@ -3,10 +3,7 @@ import logging import uuid -try: - from cytoolz import merge -except ImportError: - from toolz import merge +from tlz import merge from .client import Future, _get_global_client, Client from .utils import tokey, log_errors, TimeoutError, ignoring diff --git a/distributed/worker.py b/distributed/worker.py index aa71a16640..247ffc9951 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3,6 +3,7 @@ from collections import defaultdict, deque, namedtuple from collections.abc import MutableMapping from datetime import timedelta +from functools import partial import heapq from inspect import isawaitable import logging @@ -21,10 +22,7 @@ from dask.utils import format_bytes, funcname from dask.system import CPU_COUNT -try: - from cytoolz import pluck, partial, merge, first, keymap -except ImportError: - from toolz import pluck, partial, merge, first, keymap +from tlz import pluck, merge, first, keymap from tornado import gen from tornado.ioloop import IOLoop diff --git a/docs/source/efficiency.rst b/docs/source/efficiency.rst index 94a603ea9a..ed3ad2428d 100644 --- a/docs/source/efficiency.rst +++ b/docs/source/efficiency.rst @@ -67,7 +67,7 @@ A common solution is to batch your input into larger chunks. >>> def f_many(chunk): ... return [f(x) for x in chunk] - >>> from toolz import partition_all + >>> from tlz import partition_all >>> chunks = partition_all(1000000, seq) # Collect into groups of size 1000 >>> futures = client.map(f_many, chunks) diff --git a/requirements.txt b/requirements.txt index 3f827e250e..4cb3ba60ae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ msgpack >= 0.6.0 psutil >= 5.0 sortedcontainers !=2.0.0, !=2.0.1 tblib >= 1.6.0 -toolz >= 0.7.4 +toolz >= 0.8.2 tornado >= 5;python_version<'3.8' tornado >= 6.0.3;python_version>='3.8' zict >= 0.1.3