[BUG] Serialization fails when tuple of large CuPy arrays returned from remote Dask function #421
Comments
I suspect this is related to the recent CuPy serialization work in dask/distributed; I don't think there is anything in ucx-py responsible for serialization. Any chance you can provide a reproducible example?
cc @jakirkham

Yeah, we've had a hard time narrowing this down, unfortunately.
I was able to reproduce with the following little script.
Specifically, the issue happened when I increased the number of features of the input. It's important to point out that the current Naive Bayes implementation in cuML (branch-0.13 at the time of writing) implements a workaround, so reproducing this issue takes a very small change to the Dask Naive Bayes. I modified the script in place in conda: I added the following line below this one:
For now, this is the most accurate reproduction of the behavior I'm seeing end to end. I'm looking forward to isolating and fixing the root cause so that I can remove the workaround from cuML.
Thank you for doing this, @cjnolet! 😄 Will take a closer look.
I think this may be related to dask/distributed#2948, where we had to add a specialization to handle collections of objects supporting `__cuda_array_interface__`. I'm a bit short on time lately, so if you have the bandwidth to work on this, @jakirkham, that's much appreciated. If there's any information I can provide to help, please ask.
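To make that concrete, here is a minimal sketch (the function name is illustrative, not cuML's actual code) of the pattern in question: a task returns a plain tuple whose elements are device arrays supporting `__cuda_array_interface__`, so the serialization layer has to special-case the collection, since the tuple itself exposes no device buffer.

```python
import cupy as cp

def fit_like_task():
    # stand-ins for the (class_count_, feature_count_) pair returned by _fit()
    class_count = cp.zeros(2, dtype="float64")
    feature_count = cp.zeros((2, 1000), dtype="float64")
    return class_count, feature_count  # a tuple of CUDA objects

result = fit_like_task()
# each element supports the CUDA array interface; the tuple itself does not
assert all(hasattr(a, "__cuda_array_interface__") for a in result)
assert not hasattr(result, "__cuda_array_interface__")
```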
Sorry, I've had no time to look deeper, unfortunately. Have been totally snowed 😞 This one is most likely slipping. It sounds like there is a viable workaround near term, but please let us know ASAP if that is not the case, @cjnolet.
FWIW, I ran the reproducer above with Dask + Distributed 2.12.0. I'm not seeing the serialization issue, but did run into an OOM:

```
---------------------------------------------------------------------------
OutOfMemoryError Traceback (most recent call last)
<ipython-input-10-6d10fe8973eb> in <module>
----> 1 model.fit(X, y)
/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/cuml/dask/naive_bayes/naive_bayes.py in fit(self, X, y, classes)
184 class_counts = self.client_.compute(
185 [self.client_.submit(MultinomialNB._get_class_counts, c)
--> 186 for c in counts], sync=True)
187 feature_counts = self.client_.compute(
188 [self.client_.submit(MultinomialNB._get_feature_counts, c)
/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/distributed/client.py in compute(self, collections, sync, optimize_graph, workers, allow_other_workers, resources, retries, priority, fifo_timeout, actors, traverse, **kwargs)
2798
2799 if sync:
-> 2800 result = self.gather(futures)
2801 else:
2802 result = futures
/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1891 direct=direct,
1892 local_worker=local_worker,
-> 1893 asynchronous=asynchronous,
1894 )
1895
/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
778 else:
779 return sync(
--> 780 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
781 )
782
/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
346 if error[0]:
347 typ, exc, tb = error[0]
--> 348 raise exc.with_traceback(tb)
349 else:
350 return result[0]
/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/distributed/utils.py in f()
330 if callback_timeout is not None:
331 future = asyncio.wait_for(future, callback_timeout)
--> 332 result[0] = yield future
333 except Exception as exc:
334 error[0] = sys.exc_info()
/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1750 exc = CancelledError(key)
1751 else:
-> 1752 raise exception.with_traceback(traceback)
1753 raise exc
1754 if errors == "skip":
/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/cuml/dask/naive_bayes/naive_bayes.py in _fit()
117
118 for x, y in Xy:
--> 119 model.partial_fit(x, y, classes=classes)
120
121 return model.class_count_, model.feature_count_
/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/cuml/naive_bayes/naive_bayes.py in partial_fit()
331 """
332 return self._partial_fit(X, y, sample_weight=sample_weight,
--> 333 _classes=classes)
334
335 @cp.prof.TimeRangeDecorator(message="predict()", color_id=1)
/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/cupy/prof/time_range.py in inner()
107 def inner(*args, **kwargs):
108 with self._recreate_cm(func.__name__):
--> 109 return func(*args, **kwargs)
110 return inner
/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/cuml/naive_bayes/naive_bayes.py in _partial_fit()
278 self._count(X, Y)
279
--> 280 self._update_feature_log_prob(self.alpha)
281 self._update_class_log_prior(class_prior=self.class_prior)
282
/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/cuml/naive_bayes/naive_bayes.py in _update_feature_log_prob()
569 smoothed_fc = self.feature_count_ + alpha
570 smoothed_cc = smoothed_fc.sum(axis=1).reshape(-1, 1)
--> 571 self.feature_log_prob_ = (cp.log(smoothed_fc) -
572 cp.log(smoothed_cc.reshape(-1, 1)))
573
cupy/core/_kernel.pyx in cupy.core._kernel.ufunc.__call__()
cupy/core/_kernel.pyx in cupy.core._kernel._get_out_args()
cupy/core/core.pyx in cupy.core.core.ndarray.__init__()
cupy/cuda/memory.pyx in cupy.cuda.memory.alloc()
cupy/cuda/memory.pyx in cupy.cuda.memory.MemoryPool.malloc()
cupy/cuda/memory.pyx in cupy.cuda.memory.MemoryPool.malloc()
cupy/cuda/memory.pyx in cupy.cuda.memory.SingleDeviceMemoryPool.malloc()
cupy/cuda/memory.pyx in cupy.cuda.memory.SingleDeviceMemoryPool._malloc()
cupy/cuda/memory.pyx in cupy.cuda.memory._try_malloc()
OutOfMemoryError: Out of memory allocating 1,280,000,000 bytes (allocated so far: 2,588,727,808 bytes)
```

Looks like we need to enable RMM-backed allocations with CuPy:

```python
import cupy as cp
import rmm
cp.cuda.set_allocator(rmm.rmm_cupy_allocator)
client.run(cp.cuda.set_allocator, rmm.rmm_cupy_allocator)
```
Should add that when I fix that, I get some kind of crash. May have some out-of-date things though...
OK, same issue with NVLink and InfiniBand enabled on a DGX-1.
All this to say: I'm not seeing this specific issue, but am seeing other ones 😟
@jakirkham, can you see if these issues still occur when using rapidsai/cuml#1805? I'm actually not able to reproduce the deserialization error anymore either, so I'm thinking it must have been fixed at some point since I opened this issue. I'm also not encountering an OOM with PR #1805, at least not on my workstation. If you are just running my reproducible example, I'm not sure InfiniBand is going to have much effect, since it's only a single partition that would be getting copied to the client. I guess whether NVLink has an effect would depend on which GPUs the worker and client are using, right?
@dantegd, we were talking about this last week when we realized the RMM allocator settings from...

@jakirkham, does setting the RMM pool allocator on the...
Well, cuDF will hook the RMM allocator into CuPy on import. cc @kkraus14, xref: rapidsai/cudf#4351
Ack! I spoke too soon. Indeed, I am still getting this error when I use UCX, unfortunately.
This runs successfully with TCP. Also, here's the ideal Naive Bayes `fit()` function (without the workaround):
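A rough sketch of that shape (hypothetical, not cuML's actual `fit()`): each `_fit()` future returns its `(class_count_, feature_count_)` tuple, and the results are gathered directly, with no extra fetch tasks scheduled:

```python
def ideal_fit(client, fit_futures):
    # one _fit() future per model partition, each returning a
    # (class_count_, feature_count_) tuple of CuPy arrays
    counts = client.gather(fit_futures)  # fails under UCX for large arrays

    # combine the per-partition counts directly on the client
    class_count_ = sum(c for c, _ in counts)
    feature_count_ = sum(f for _, f in counts)
    return class_count_, feature_count_
```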
RE: the cudf import. I recall having a conversation very recently about doing exactly what cuDF appears to be doing (setting the RMM allocator globally), but was informed there was push-back against doing this. cc @JohnZed
Hmm... yeah, not sure. I'm guessing that... In any event, when using...
Here's the offending header:
I can provide the frame as well, but it's a `bytes` object. Looking at the corresponding CuPy deserializer, it looks like anything that's not a `cupy.ndarray` is expected to support `__cuda_array_interface__`.
FWIW, @dantegd recently fixed this in the...

It looks like after the...
This seems to have fixed the issue:

```python
@cuda_deserialize.register(cupy.ndarray)
def cuda_deserialize_cupy_ndarray(header, frames):
    (frame,) = frames

    # a frame that arrived as host bytes must be copied back to the device
    if isinstance(frame, (bytes, bytearray)):
        frame = cupy.array(memoryview(frame))

    # anything else that isn't already a cupy.ndarray is treated as an
    # object exposing __cuda_array_interface__
    if not isinstance(frame, cupy.ndarray):
        frame = PatchedCudaArrayInterface(frame)

    # reassemble the original array from the header metadata
    arr = cupy.ndarray(
        shape=header["shape"],
        dtype=header["typestr"],
        memptr=cupy.asarray(frame).data,
        strides=header["strides"],
    )
    return arr
```
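As a quick sanity check (a hypothetical snippet, assuming `cupy` and the `cuda_deserialize_cupy_ndarray` above are in scope), one can simulate the failing case by handing the deserializer a host-side `bytes` frame and verifying it round-trips back to the original device array:

```python
import cupy

# build a device array and fake the header Distributed would produce for it
x = cupy.arange(12, dtype="float32").reshape(3, 4)
header = {"shape": x.shape, "typestr": x.dtype.str, "strides": None}

# simulate the bug: the frame shows up as host bytes instead of a device buffer
frame = x.tobytes()

y = cuda_deserialize_cupy_ndarray(header, [frame])
assert isinstance(y, cupy.ndarray)
assert (x == y).all()
```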
Thanks for posting this, Corey. Let's dig into this a bit. 🙂 The first thing that should be noted: if we knew a D2H/H2D copy was needed as part of the transmission protocol, we would handle that as part of CuPy serialization itself. The fact that we got `bytes` here suggests the copy is happening somewhere else in the pipeline.
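To illustrate that point (a minimal sketch under stated assumptions; the function names are made up and this is not Distributed's actual implementation), if a device-to-host copy is ever required for transport, the serializer is the natural place for it, so the deserializer always knows what form its frames take:

```python
import cupy

def serialize_cupy_via_host(x: cupy.ndarray):
    # do the D2H copy explicitly as part of serialization...
    header = {"shape": x.shape, "typestr": x.dtype.str, "strides": None}
    frames = [cupy.asnumpy(cupy.ascontiguousarray(x)).tobytes()]
    return header, frames

def deserialize_cupy_via_host(header, frames):
    # ...so the deserializer can rely on always receiving host bytes
    (frame,) = frames
    return cupy.ndarray(
        shape=header["shape"],
        dtype=header["typestr"],
        memptr=cupy.array(memoryview(frame)).data,
        strides=header["strides"],
    )
```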
Definitely agree it would be a good idea to fix this properly. Please let me know if you are also able to reproduce by making the change to the Dask Naive Bayes described above.
@quasiben, just a heads-up since you mentioned working on reproducing this: I would recommend using a cuML nightly from prior to yesterday.
It appears this is a Distributed bug that has since been fixed (dask/distributed#3580). Closing...
I have been getting the following exception in the `_fit()` task for cuML's distributed Naive Bayes when the number of features grows over about 5.5M:
For some reason, it appears the CuPy deserializer is receiving a frame of type `bytes`, rather than an instance of `__cuda_array_interface__`. I modified the deserializer noted in the stack trace to print out the header of the offending message. Here's the modified deserializer function:
And here's the message header being passed to that function when the CuPy array (the number of Naive Bayes features) grows too large:
This only happens when using `protocol=UCX` in Dask, and it is not reproducible with `protocol=TCP`. What's also very strange is that it only happens when I try to compute the resulting futures of `_fit()` directly, which ends up passing a tuple containing two CuPy arrays (see the sketch below).

As a short-term workaround, I've modified the Naive Bayes code to fetch each array individually, and this seems to work without error. This is not ideal, however, as the Naive Bayes code becomes more complex, and two extra tasks need to be scheduled just to fetch the results of an upstream task that should be able to return them directly.
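Here is a minimal sketch of the failing pattern (the cluster setup, function name, and sizes are illustrative assumptions, not my exact script): a single task returns a tuple of two large CuPy arrays, and gathering that future fails under UCX:

```python
import cupy as cp
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

def fit_like_task(n_features):
    # stand-ins for the (class_count_, feature_count_) tuple _fit() returns
    return (cp.zeros(2, dtype="float64"),
            cp.zeros((2, n_features), dtype="float64"))

if __name__ == "__main__":
    cluster = LocalCUDACluster(protocol="ucx")  # works with protocol="tcp"
    client = Client(cluster)

    fut = client.submit(fit_like_task, 6_000_000)
    class_count, feature_count = client.gather(fut)  # deserialization fails here
```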
I've filed this issue in ucx-py, rather than Dask, because it only seems to occur when UCX is used.