Skip to content

Commit

Permalink
Optionally compress on a frame-by-frame basis (dask#3586)
Browse files Browse the repository at this point in the history
Previously this converted a list of bytes-like objects into a list.
Now we consume a single one and use map when dealing with lists.

* Handle compression on a frame-by-frame basis

* Set cuda serialization to False rather than None

We've changed the convention so that None now means "proceed as usual"
rather than "don't do anything please"
  • Loading branch information
mrocklin authored and fjetter committed Mar 19, 2020
1 parent 2acffc3 commit 33f988f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
2 changes: 1 addition & 1 deletion distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def steal_time_ratio(self, ts):
nbytes = sum(dep.get_nbytes() for dep in ts.dependencies)

transfer_time = nbytes / self.scheduler.bandwidth + LATENCY
split = ts.prefix
split = ts.prefix.name
if split in fast_tasks:
return None, None
ws = ts.processing_on
Expand Down
34 changes: 31 additions & 3 deletions distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import weakref

import pytest
from unittest import mock
from tlz import sliding_window, concat
from tornado import gen

Expand Down Expand Up @@ -145,23 +146,50 @@ def test_steal_related_tasks(e, s, a, b, c):


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10, timeout=1000)
def test_dont_steal_fast_tasks(c, s, *workers):
async def test_dont_steal_fast_tasks_compute_time(c, s, *workers):
np = pytest.importorskip("numpy")
x = c.submit(np.random.random, 10000000, workers=workers[0].address)

def do_nothing(x, y=None):
pass

yield wait(c.submit(do_nothing, 1))
# execute and meassure runtime once
await wait(c.submit(do_nothing, 1))

futures = c.map(do_nothing, range(1000), y=x)

yield wait(futures)
await wait(futures)

assert len(s.who_has[x.key]) == 1
assert len(s.has_what[workers[0].address]) == 1001


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10, timeout=1000)
async def test_dont_steal_fast_tasks_blacklist(c, s, *workers):
import time
# create a dependency
x = c.submit(slowinc, 1)

def fast_blacklisted(x, y=None):
time.sleep(0.01)
pass

mmock = mock.MagicMock()
s.extensions["stealing"].move_task_request = mmock
import distributed.stealing

with mock.patch.object(
distributed.stealing, "fast_tasks", {fast_blacklisted.__name__}
):

w = workers[0]
futures = c.map(fast_blacklisted, range(100), y=x)

await wait(futures)

mmock.assert_not_called()


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)], timeout=20)
def test_new_worker_steals(c, s, a):
yield wait(c.submit(slowinc, 1, delay=0.01))
Expand Down

0 comments on commit 33f988f

Please sign in to comment.