CuPy (De)serialization error #3580
I think I found the culprit here. I raised this value [1], and I also verified this is where it's being called [3]. I don't see the header being updated to reflect the split. I'd think we would still want a CuPy array to be returned from the deserializer, right? This would imply that we want the CuPy deserializer to understand how to deserialize the split frames.

[1] https://github.com/dask/distributed/blob/master/distributed/protocol/utils.py#L6
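For context, here is a minimal sketch of the frame-splitting step being discussed. It is illustrative only: the real `frame_split_size` in `distributed/protocol/utils.py` operates on memoryviews and uses a much larger default shard size; the tiny `SHARD_SIZE` below exists purely so the example is easy to follow.

```python
# Illustrative sketch of frame splitting: any frame larger than the
# shard-size threshold is cut into chunks of at most that many bytes.
# SHARD_SIZE here is a toy value; the real default is tens of megabytes.
SHARD_SIZE = 4

def frame_split_size(frame, n=SHARD_SIZE):
    """Split a bytes-like frame into shards of at most n bytes."""
    if len(frame) <= n:
        return [frame]
    return [frame[i:i + n] for i in range(0, len(frame), n)]

shards = frame_split_size(b"abcdefghij")
print(shards)  # three shards of 4 + 4 + 2 bytes
```

The header for the message is built before this split happens, which is why a receiver that trusts the header can be surprised by the extra frames.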
@cjnolet, I'm confused about how the byte array is getting to frame_split_size to begin with.
@quasiben, it's a CuPy ndarray all the way through. We are using the Dask serializer and also the CUDA serializer in cuML. This error seems to only happen when the UCX protocol is used in Dask, however.
I would be surprised if a CuPy array arrived at msgpack; I would certainly expect this to fail.
Running the MRE posted in rapidsai/ucx-py#421; I'll get back with an update shortly.
@mrocklin, my mistake. The part I find weird is that everything works when my arrays are smaller than that threshold.
I ran with the latest ucx-py/distributed/cuml and was not able to reproduce the error.
Sorry, not the latest cuml:
@quasiben Did you make the changes outlined in the GitHub issue?
@quasiben, actually, here's a simpler reproducible example:

```python
import dask
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import cupy as cp

cluster = LocalCUDACluster(protocol="ucx")
client = Client(cluster)

def make_large_data(x):
    return cp.random.random((5, 8000000)), cp.random.random((5))

fut = [client.submit(make_large_data, i) for i in range(5)]
client.compute(fut, sync=True)
```

This is the exception I get:
Thanks for that simple reproducer, Corey! 😄
Lol, it took me a while to figure out the behavior causing it so that I could reproduce it.
Still digging through this, but I think it's possible we are inadvertently calling the code at distributed/protocol/utils.py line 88 (commit f2f82c6).
FWIW, CuPy doesn't support the Python buffer protocol (cupy/cupy#1532). So if it got there, we would see an error. Not sure how Distributed would handle that (if at all).

```python
In [1]: import cupy

In [2]: a = cupy.arange(5)

In [3]: memoryview(a)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-3-dd900d27ccee> in <module>
----> 1 memoryview(a)

TypeError: memoryview: a bytes-like object is required, not 'cupy.core.core.ndarray'
```
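The same failure mode can be mimicked without a GPU: any object that advertises `__cuda_array_interface__` but does not implement the CPython buffer protocol raises the same `TypeError` from `memoryview`. The class below is a hypothetical stand-in, not part of CuPy:

```python
class DeviceArrayStub:
    # Hypothetical stand-in for a CuPy ndarray: it carries a (fake)
    # __cuda_array_interface__ but does not implement the buffer protocol,
    # so memoryview() cannot consume it.
    __cuda_array_interface__ = {
        "shape": (5,), "typestr": "<i8", "data": (0, False), "version": 2,
    }

try:
    memoryview(DeviceArrayStub())
except TypeError as err:
    print("memoryview failed:", err)
```

This is why any code path that funnels frames through `memoryview` or `bytes` must first check for device frames rather than assuming everything is host memory.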
I think I found the issue. With large and irregular sizes, within merge_frames there is logic which calls the code at distributed/protocol/utils.py lines 94 to 95 (commit f2f82c6). We can avoid this routine by adding a check at the end of merge_frames:

```python
else:
    if any([hasattr(f, "__cuda_array_interface__") for f in L]):
        out.extend(L)
    else:
        out.append(b"".join(map(ensure_bytes, L)))
```
That's a nice find, Ben! 😄 IIUC this function is used to combine frames into their original sizes. So how do we wind up in that code path?
Thank you for digging into this, Ben! I ran the Naive Bayes example again after making the change and I'm getting a little strange behavior. I modified the reproducer script slightly so that I could try to see the values in the resulting arrays, but I'm getting an exception. Maybe it's possible there's still something being done on the UCX side that is causing this one.

```python
import dask
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import cupy as cp

if __name__ == "__main__":
    cluster = LocalCUDACluster(protocol="ucx")
    client = Client(cluster)

    def make_large_data(x):
        return cp.ones((5, 8000000)), cp.ones((5))

    fut = [client.submit(make_large_data, i) for i in range(5)]
    local = [cp.sum(f[0], axis=1) for f in client.compute(fut, sync=True)]
    print(str(local))
```

Here's the exception:

Can you guys see if this error occurs for you?
Alternatively, maybe with UCX we shouldn't be splitting and merging frames at all?
@cjnolet, yup, I am seeing that as well. @mrocklin @jakirkham, seems like you two both want the same thing :)
I just verified that not splitting frames works, using the following check:

```python
if nbytes(frame) > n and not hasattr(frame, "__cuda_array_interface__"):
```

https://github.com/dask/distributed/blob/master/distributed/protocol/utils.py#L41
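A sketch of that guard in isolation: oversized host frames are still sharded, while frames advertising `__cuda_array_interface__` pass through whole. The `nbytes` helper and `FakeDeviceArray` class are simplified stand-ins for this example, not the real Distributed code:

```python
def nbytes(frame):
    # Simplified byte-count helper for this sketch: prefer an .nbytes
    # attribute (array-like objects), fall back to len() for bytes.
    return getattr(frame, "nbytes", None) or len(frame)

class FakeDeviceArray:
    # Hypothetical device frame that must not be split into host shards.
    __cuda_array_interface__ = {}
    nbytes = 100

def split_frames(frames, n=4):
    # Only host frames larger than n are split; device frames
    # (those with __cuda_array_interface__) are appended untouched.
    out = []
    for frame in frames:
        if nbytes(frame) > n and not hasattr(frame, "__cuda_array_interface__"):
            out.extend(frame[i:i + n] for i in range(0, nbytes(frame), n))
        else:
            out.append(frame)
    return out

dev = FakeDeviceArray()
result = split_frames([b"abcdefgh", dev])
```

With the guard in place, the device frame never reaches the slicing path, so the later merge step has nothing to reassemble for it.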
I think we want to check whether we are using UCX, not whether the frame has `__cuda_array_interface__`.
Sounds good to me! I just wanted to verify that it works when the frame has not been split.
I'm encountering the following exception when trying to perform a custom tree-reduce on a set of large CuPy objects. I don't get this exception with smaller objects, so I have not been able to reproduce it with the normal pytests. The `n_features` in the `HashingVectorizer` that is used as input to the Naive Bayes pytest, however, can be modified to 8M in order to reproduce this exception.

I believe this is the same as rapidsai/ucx-py#421, and while it only occurs when `protocol=ucx`, the stack trace is giving me all `dask.distributed` errors, so I've opted to start a fresh thread here.

cc @jakirkham