Optionally compress on a frame-by-frame basis (dask#3586)
Previously frame_split_size converted a list of bytes-like objects into a flat list of smaller frames.
Now it consumes a single frame, and callers use map when dealing with lists.

* Handle compression on a frame-by-frame basis

* Set cuda compression to False rather than None

We've changed the convention so that None now means "proceed as usual"
rather than "don't do anything please"
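
To make the new convention concrete, here is a minimal sketch (not part of this commit) of how a custom serializer can opt its frames out of compression. RawPayload and the helper names are hypothetical; dask_serialize is the hook the new test in this commit uses, and dask_deserialize is assumed to be the matching deserialization hook.

# Minimal sketch, assuming the convention introduced by this commit:
# an explicit False in header["compression"] means "leave this frame alone",
# while None (or no "compression" key at all) means "compress as usual",
# i.e. protocol.core.dumps may split and compress that frame.
from distributed.protocol import dask_serialize, dask_deserialize


class RawPayload:
    """Hypothetical wrapper around bytes that should never be compressed."""

    def __init__(self, data: bytes):
        self.data = data


@dask_serialize.register(RawPayload)
def _serialize_raw(x):
    header = {"compression": [False]}  # explicit opt-out, one entry per frame
    return header, [x.data]


@dask_deserialize.register(RawPayload)
def _deserialize_raw(header, frames):
    return RawPayload(frames[0])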
mrocklin authored and fjetter committed Mar 19, 2020
1 parent 511427b commit 0e407da
Showing 8 changed files with 89 additions and 38 deletions.
28 changes: 19 additions & 9 deletions distributed/protocol/core.py
@@ -48,17 +48,27 @@ def dumps(msg, serializers=None, on_error="message", context=None):
for key, (head, frames) in data.items():
if "lengths" not in head:
head["lengths"] = tuple(map(nbytes, frames))
if "compression" not in head:
frames = frame_split_size(frames)
if frames:
compression, frames = zip(*map(maybe_compress, frames))
else:
compression = []
head["compression"] = compression
head["count"] = len(frames)

# Compress frames that are not yet compressed
out_compression = []
_out_frames = []
for frame, compression in zip(
frames, head.get("compression") or [None] * len(frames)
):
if compression is None: # default behavior
_frames = frame_split_size(frame)
_compression, _frames = zip(*map(maybe_compress, _frames))
out_compression.extend(_compression)
_out_frames.extend(_frames)
else: # already specified, so pass
out_compression.append(compression)
_out_frames.append(frame)

head["compression"] = out_compression
head["count"] = len(_out_frames)
header["headers"][key] = head
header["keys"].append(key)
out_frames.extend(frames)
out_frames.extend(_out_frames)

for key, (head, frames) in pre.items():
if "lengths" not in head:
2 changes: 1 addition & 1 deletion distributed/protocol/cuda.py
@@ -18,7 +18,7 @@ def cuda_dumps(x):
header, frames = dumps(x)
header["type-serialized"] = pickle.dumps(type(x))
header["serializer"] = "cuda"
header["compression"] = (None,) * len(frames) # no compression for gpu data
header["compression"] = (False,) * len(frames) # no compression for gpu data
return header, frames


2 changes: 1 addition & 1 deletion distributed/protocol/numpy.py
@@ -88,7 +88,7 @@ def serialize_numpy_ndarray(x):
header["broadcast_to"] = broadcast_to

if x.nbytes > 1e5:
frames = frame_split_size([data])
frames = frame_split_size(data)
else:
frames = [data]

6 changes: 5 additions & 1 deletion distributed/protocol/serialize.py
@@ -166,10 +166,12 @@ def serialize(x, serializers=None, on_error="message", context=None):

frames = []
lengths = []
compressions = []
for _header, _frames in headers_frames:
frames.extend(_frames)
length = len(_frames)
lengths.append(length)
compressions.extend(_header.get("compression") or [None] * len(_frames))

headers = [obj[0] for obj in headers_frames]
headers = {
@@ -178,6 +180,8 @@
"frame-lengths": lengths,
"type-serialized": type(x).__name__,
}
if any(compression is not None for compression in compressions):
headers["compression"] = compressions
return headers, frames

tb = ""
@@ -436,7 +440,7 @@ def replace_inner(x):

def serialize_bytelist(x, **kwargs):
header, frames = serialize(x, **kwargs)
frames = frame_split_size(frames)
frames = sum(map(frame_split_size, frames), [])
if frames:
compression, frames = zip(*map(maybe_compress, frames))
else:
14 changes: 14 additions & 0 deletions distributed/protocol/tests/test_serialize.py
@@ -374,3 +374,17 @@ async def test_profile_nested_sizeof():

msg = {"data": original}
frames = await to_frames(msg)


def test_compression_numpy_list():
class MyObj:
pass

@dask_serialize.register(MyObj)
def _(x):
header = {"compression": [False]}
frames = [b""]
return header, frames

header, frames = serialize([MyObj(), MyObj()])
assert header["compression"] == [False, False]
39 changes: 17 additions & 22 deletions distributed/protocol/utils.py
@@ -19,9 +19,9 @@
msgpack_opts["encoding"] = "utf-8"


def frame_split_size(frames, n=BIG_BYTES_SHARD_SIZE):
def frame_split_size(frame, n=BIG_BYTES_SHARD_SIZE) -> list:
"""
Split a list of frames into a list of frames of maximum size
Split a frame into a list of frames of maximum size
This helps us to avoid passing around very large bytestrings.
@@ -30,26 +30,21 @@ def frame_split_size(frames, n=BIG_BYTES_SHARD_SIZE):
>>> frame_split_size([b'12345', b'678'], n=3) # doctest: +SKIP
[b'123', b'45', b'678']
"""
if not frames:
return frames

if max(map(nbytes, frames)) <= n:
return frames

out = []
for frame in frames:
if nbytes(frame) > n:
if isinstance(frame, (bytes, bytearray)):
frame = memoryview(frame)
try:
itemsize = frame.itemsize
except AttributeError:
itemsize = 1
for i in range(0, nbytes(frame) // itemsize, n // itemsize):
out.append(frame[i : i + n // itemsize])
else:
out.append(frame)
return out
if nbytes(frame) <= n:
return [frame]

if nbytes(frame) > n:
if isinstance(frame, (bytes, bytearray)):
frame = memoryview(frame)
try:
itemsize = frame.itemsize
except AttributeError:
itemsize = 1

return [
frame[i : i + n // itemsize]
for i in range(0, nbytes(frame) // itemsize, n // itemsize)
]


def merge_frames(header, frames):
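Usage note (not part of the commit): frame_split_size now takes a single frame and returns a list of shards, so callers that still hold a list of frames map over it and flatten the result, mirroring serialize_bytelist above. A minimal sketch, assuming only the signatures shown in this diff:

from distributed.protocol.utils import frame_split_size

big = b"x" * 5000                        # a single bytes-like frame
shards = frame_split_size(big, n=1024)   # split into pieces of at most 1024 bytes
assert sum(len(s) for s in shards) == len(big)

# For a list of frames, follow the pattern serialize_bytelist now uses:
frames = [b"small", big]
flat = sum(map(frame_split_size, frames), [])
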
2 changes: 1 addition & 1 deletion distributed/stealing.py
@@ -132,7 +132,7 @@ def steal_time_ratio(self, ts):
nbytes = sum(dep.get_nbytes() for dep in ts.dependencies)

transfer_time = nbytes / self.scheduler.bandwidth + LATENCY
split = ts.prefix
split = ts.prefix.name
if split in fast_tasks:
return None, None
ws = ts.processing_on
34 changes: 31 additions & 3 deletions distributed/tests/test_steal.py
@@ -6,6 +6,7 @@
import weakref

import pytest
from unittest import mock
from tlz import sliding_window, concat
from tornado import gen

@@ -145,23 +146,50 @@ def test_steal_related_tasks(e, s, a, b, c):


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10, timeout=1000)
def test_dont_steal_fast_tasks(c, s, *workers):
async def test_dont_steal_fast_tasks_compute_time(c, s, *workers):
np = pytest.importorskip("numpy")
x = c.submit(np.random.random, 10000000, workers=workers[0].address)

def do_nothing(x, y=None):
pass

yield wait(c.submit(do_nothing, 1))
# execute and measure runtime once
await wait(c.submit(do_nothing, 1))

futures = c.map(do_nothing, range(1000), y=x)

yield wait(futures)
await wait(futures)

assert len(s.who_has[x.key]) == 1
assert len(s.has_what[workers[0].address]) == 1001


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10, timeout=1000)
async def test_dont_steal_fast_tasks_blacklist(c, s, *workers):
import time
# create a dependency
x = c.submit(slowinc, 1)

def fast_blacklisted(x, y=None):
time.sleep(0.01)
pass

mmock = mock.MagicMock()
s.extensions["stealing"].move_task_request = mmock
import distributed.stealing

with mock.patch.object(
distributed.stealing, "fast_tasks", {fast_blacklisted.__name__}
):

w = workers[0]
futures = c.map(fast_blacklisted, range(100), y=x)

await wait(futures)

mmock.assert_not_called()


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)], timeout=20)
def test_new_worker_steals(c, s, a):
yield wait(c.submit(slowinc, 1, delay=0.01))
