Don't Split Frames for UCX #3584
Conversation
```python
@pytest.mark.asyncio
async def test_serialize_no_splitting():
    cp = pytest.importorskip("cupy")
```
I am not super happy that this requires cupy, but I was unable to trigger the issue with numpy. Open to suggestions if folks have any.
Yeah, there isn't one really, as this is done correctly when using the "dask" serializer. We have to use something that uses the "cuda" serializer.
@mrocklin @jakirkham changed the PR around based on comments to not split frames when using UCX
LGTM. Thanks Ben! 😄
LGTM, verified that Naive Bayes is working as expected with these changes. Thanks again, Ben!
Also, thank you to @jakirkham for spending the time to help me narrow this down.
distributed/protocol/core.py
Outdated
```diff
-frames = frame_split_size(frames)
+# splitting frames is not the default behavior for UCX
+if split_frames:
+    frames = frame_split_size(frames)
```
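A runnable sketch of the conditional above; `split_frames` follows the diff, while `maybe_split` and the small shard size `n` are made up here for illustration (dask's real splitter shards at a much larger size):

```python
def maybe_split(frames, split_frames=True, n=4):
    """Shard each frame into chunks of at most n bytes, unless splitting
    is disabled (as the UCX comm does, so device buffers stay whole)."""
    if not split_frames:
        return list(frames)
    out = []
    for frame in frames:
        view = memoryview(frame)
        out.extend(bytes(view[i : i + n]) for i in range(0, len(view), n))
    return out
```

With `split_frames=False` the frames pass through untouched, which is exactly the behavior UCX needs so that deserialization sees whole device buffers.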
It looks like we only consider splitting frames due to compression. I wonder if we can make this a bit simpler and avoid the keyword argument if we always include "compression": False in the header?
That would probably work but there may be a GPU compression story in the future. I suppose we could handle that then if/when it becomes available
Give me a couple minutes to look things over
I still suggest that we go with setting compression=False for now rather than introduce the new keyword argument.
Tying frame splitting to compression is not entirely clean, but it does happen to be the only reason to split frames today. In the future if that continues to be the case then we might have compressors register a maximum frame size. If that doesn't continue to be the case (perhaps some comms also have a maximum size limit) then we will have to rejigger this code regardless.
However, looking at this a bit more, I notice that cuda_serialize already specifies a compression value. My guess is that this is coming from serializing tuples/lists/dicts of objects, and that we don't move compression values through in that case. I'll take a look at this later tonight.
Yeah ok, so coupling compression with serialization gets complicated, at least with our current logic.
distributed/distributed/protocol/core.py
Lines 51 to 57 in 511427b
```python
if "compression" not in head:
    frames = frame_split_size(frames)
    if frames:
        compression, frames = zip(*map(maybe_compress, frames))
    else:
        compression = []
    head["compression"] = compression
```
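A self-contained sketch of that gate, with a toy `maybe_compress` (zlib is used purely for illustration; dask's real helper picks lz4/snappy and also skips incompressible data):

```python
import zlib


def maybe_compress(frame, min_size=16):
    # Toy stand-in: compress only frames large enough to be worth it.
    if len(frame) >= min_size:
        return "zlib", zlib.compress(frame)
    return None, frame


def finalize(head, frames):
    # Mirrors the snippet above: only compress (and thus consider
    # splitting) when a serializer hasn't already claimed "compression".
    if "compression" not in head:
        if frames:
            compression, frames = zip(*map(maybe_compress, frames))
            frames = list(frames)
        else:
            compression = []
        head["compression"] = list(compression)
    return head, frames
```

This is why a serializer that pre-sets `compression` (as the CUDA serializers do) bypasses both compression and frame splitting entirely.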
Currently a serializer can choose to pre-compress data. If so, it adds the compression used to the header. Great, we can pass on compression (and thus frame splitting). The CUDA serializers avoid this by setting compression=None, claiming "I've already handled compression, don't bother" to downstream. Great. If we send a single cupy array or something along, then we'll be fine.
However, if we happen to include a few things in a small dict/tuple/list then we lose this information. The headers for the individual bits get thrown into a subheader, and so we lose direct access to the compression.
distributed/distributed/protocol/serialize.py
Lines 144 to 181 in 511427b
```python
if (
    type(x) in (list, set, tuple)
    and len(x) <= 5
    or type(x) is dict
    and len(x) <= 5
    and dict_safe
):
    if isinstance(x, dict):
        headers_frames = []
        for k, v in x.items():
            _header, _frames = serialize(
                v, serializers=serializers, on_error=on_error, context=context
            )
            _header["key"] = k
            headers_frames.append((_header, _frames))
    else:
        headers_frames = [
            serialize(
                obj, serializers=serializers, on_error=on_error, context=context
            )
            for obj in x
        ]

    frames = []
    lengths = []
    for _header, _frames in headers_frames:
        frames.extend(_frames)
        length = len(_frames)
        lengths.append(length)

    headers = [obj[0] for obj in headers_frames]
    headers = {
        "sub-headers": headers,
        "is-collection": True,
        "frame-lengths": lengths,
        "type-serialized": type(x).__name__,
    }
    return headers, frames
```
Currently our logic though is "If you specify any compression anywhere, don't do anything". But really we want to handle this on a frame-by-frame basis. Otherwise the first time someone sends a combined result of [numpy_array, cupy_array], things will get messy.
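A toy illustration of the problem (the header shapes follow the collection code above; the serializer and compression values are made up for a hypothetical [numpy_array, cupy_array] pair): the mixed collection's top-level header carries no "compression" key at all, so the framing code can't make a per-frame decision.

```python
# Hypothetical sub-headers for [numpy_array, cupy_array]:
sub_headers = [
    {"serializer": "dask", "compression": ["lz4"]},  # host frame: compressible
    {"serializer": "cuda", "compression": [None]},   # device frame: leave alone
]

# The collection header buries them one level down, as serialize() does:
top_header = {
    "sub-headers": sub_headers,
    "is-collection": True,
    "frame-lengths": [1, 1],
    "type-serialized": "list",
}

# Per-frame compression info exists, but not where the framing code looks.
assert "compression" not in top_header
```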
I think I'm shaving a yak here, but it could be that the right thing to do here is to rework how we handle compression on a frame by frame basis. Submitting PR now.
Well, the other thing I was thinking about was checking head for "cuda" not in "serializer", which would also be easy and avoid this.
Sorry, I missed the discussion as I was sorting through the same issues of collections. Looking at #3586 now.
Closing in favor of #3586.
PR fixes #3580.

Previously, during merge_frames, dask would call ensure_bytes on a list of objects. This would trigger a conversion to host memory and would invalidate deserialization logic downstream. This PR fixes the issue by checking whether the list is composed of cuda objects; if it is, we simply extend the output list.

Update: PR adds functionality which allows frames to not be split during serialization. Additionally, communication with UCX will no longer split frames.
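A minimal sketch of the merge_frames behavior described above. The `__cuda_array_interface__` attribute check is an assumption standing in for the PR's actual device-object test, and the host path is simplified to a single join:

```python
def merge_subframes(subframes):
    """Sketch: pass device frames through untouched; coerce host frames.

    Hypothetical simplification of the fix described in the PR text:
    calling bytes() on device objects would copy them to host memory and
    break downstream deserialization, so when every frame is a cuda
    object we just extend the output list instead.
    """
    if subframes and all(
        hasattr(f, "__cuda_array_interface__") for f in subframes
    ):
        out = []
        out.extend(subframes)  # device objects stay as-is
        return out
    # Host path: merge into a single bytes blob, as ensure_bytes would.
    return [b"".join(bytes(f) for f in subframes)]
```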