[BUG] Serialization fails when tuple of large CuPy arrays returned from remote Dask function #421

Closed · cjnolet opened this issue Feb 24, 2020 · 23 comments

@cjnolet (Member) commented Feb 24, 2020:

I have been getting the following exception in the _fit() task for cuML's distributed Naive Bayes when the number of features grows over about 5.5M:

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/core.py", line 124, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 257, in deserialize
    deserializers=deserializers,
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 270, in deserialize
    return loads(header, frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
    return loads(header, frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 70, in cuda_deserialize_cupy_ndarray
    frame = PatchedCudaArrayInterface(frame)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 26, in __init__
    self.__cuda_array_interface__ = ary.__cuda_array_interface__
AttributeError: 'bytes' object has no attribute '__cuda_array_interface__'
distributed.utils - ERROR - 'bytes' object has no attribute '__cuda_array_interface__'
Traceback (most recent call last):
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 665, in log_errors
    yield
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/ucx.py", line 192, in read
    frames, deserialize=self.deserialize, deserializers=deserializers
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/utils.py", line 73, in from_frames
    res = await offload(_from_frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 1458, in offload
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 1458, in <lambda>
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/core.py", line 124, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 257, in deserialize
    deserializers=deserializers,
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 270, in deserialize
    return loads(header, frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
    return loads(header, frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 70, in cuda_deserialize_cupy_ndarray
    frame = PatchedCudaArrayInterface(frame)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 26, in __init__
    self.__cuda_array_interface__ = ary.__cuda_array_interface__
AttributeError: 'bytes' object has no attribute '__cuda_array_interface__'
Traceback (most recent call last):
  File "dev-bb-query-28.py", line 396, in <module>
    result_df, acc, prec, cmat = main(client)
  File "../../tools/utils.py", line 229, in profiled
    result = func(*args, **kwargs)
  File "dev-bb-query-28.py", line 314, in main
    model.fit(X_train, y_train)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/cuml/dask/naive_bayes/naive_bayes.py", line 197, in fit
    counts1 = self.client_.compute(counts, sync=True)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 2800, in compute
    result = self.gather(futures)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 1893, in gather
    asynchronous=asynchronous,
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 780, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 348, in sync
    raise exc.with_traceback(tb)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 332, in f
    result[0] = yield future
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 1781, in _gather
    response = await future
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/client.py", line 1832, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils_comm.py", line 391, in retry_operation
    operation=operation,
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils_comm.py", line 379, in retry
    return await coro()
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/core.py", line 757, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/core.py", line 540, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/ucx.py", line 192, in read
    frames, deserialize=self.deserialize, deserializers=deserializers
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/utils.py", line 73, in from_frames
    res = await offload(_from_frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 1458, in offload
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 1458, in <lambda>
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/core.py", line 124, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 257, in deserialize
    deserializers=deserializers,
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 270, in deserialize
    return loads(header, frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
    return loads(header, frames)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 70, in cuda_deserialize_cupy_ndarray
    frame = PatchedCudaArrayInterface(frame)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 26, in __init__
    self.__cuda_array_interface__ = ary.__cuda_array_interface__
AttributeError: 'bytes' object has no attribute '__cuda_array_interface__'
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 200, in ignoring
    yield
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/deploy/spec.py", line 607, in close_clusters
    cluster.close(timeout=10)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 81, in close
    return self.sync(self._close, callback_timeout=timeout)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 160, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 345, in sync
    e.wait(10)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/threading.py", line 300, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/nfs/cnolet/miniconda3/envs/dev/lib/python3.7/site-packages/distributed/utils.py", line 201, in ignoring
    except exceptions as e:
TypeError: catching classes that do not inherit from BaseException is not allowed

For some reason, the CuPy deserializer is receiving a frame of type bytes rather than an object exposing __cuda_array_interface__. I modified the deserializer noted in the stack trace to print out the header of the offending message.

Here's the modified deserializer function:

@cuda_deserialize.register(cupy.ndarray)
def cuda_deserialize_cupy_ndarray(header, frames):
    (frame,) = frames
    print("Deserializing Frame: %s" % str(type(frame)))
    if not isinstance(frame, cupy.ndarray):
        if isinstance(frame, bytes):
            print("Header from bytes from: %s" % str(header))
        frame = PatchedCudaArrayInterface(frame)
    arr = cupy.ndarray(
        shape=header["shape"],
        dtype=header["typestr"],
        memptr=cupy.asarray(frame).data,
        strides=header["strides"],
    )
    return arr

And here's the message header being passed to that function when the CuPy array (i.e., the number of Naive Bayes features) grows too large:

{'shape': (3, 8388608), 'typestr': '<f4', 'descr': (('', '<f4'),), 'version': 2, 'strides': (4, 12), 'data': (139841484357632, False), 'type-serialized': b'\x80\x04\x95\x1e\x00\x00\x00\x00\x00\x00\x00\x8c\x0ecupy.core.core\x94\x8c\x07ndarray\x94\x93\x94.', 'serializer': 'cuda', 'compression': (None,)}
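
For scale, that header describes roughly a 96 MiB frame (a quick back-of-the-envelope check):

import numpy as np

# Size of the frame described by the header above: 3 * 8388608 float32 values
nbytes = int(np.prod((3, 8388608))) * np.dtype("<f4").itemsize
print(nbytes)  # 100663296 bytes, i.e. 96 MiB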

This only happens when using protocol="ucx" in Dask and is not reproducible with protocol="tcp". What's also very strange is that it only happens when I compute the resulting futures of _fit() directly, which ends up passing a tuple containing two CuPy arrays.

As a short-term workaround, I've modified the Naive Bayes code to fetch each array individually, and this seems to work without error. This is not ideal, however, as the Naive Bayes code becomes more complex and two extra tasks need to be scheduled to fetch the results of an upstream task whose output should be returnable directly.
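
Roughly, the workaround looks like this (a sketch of the idea; counts here is the list of _fit futures from above, self.client_ is the model's Dask client handle, and the two helper methods mirror the ones in the cuML Dask Naive Bayes code):

class_counts = self.client_.compute(
    [self.client_.submit(MultinomialNB._get_class_counts, c) for c in counts],
    sync=True)
feature_counts = self.client_.compute(
    [self.client_.submit(MultinomialNB._get_feature_counts, c) for c in counts],
    sync=True)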

I've filed this issue against ucx-py, rather than Dask, because it only seems to occur when UCX is used.

@quasiben (Member) commented:

I suspect this is related to recent CuPy serialization work in dask/distributed. I don't think there is anything in ucx-py responsible for serialization. Any chance you can provide a reproducible example?

@quasiben (Member) commented:

cc @jakirkham

@jakirkham (Member) commented:

Yeah we’ve had a hard time narrowing this down unfortunately.

@cjnolet (Member, Author) commented Feb 25, 2020:

I was able to reproduce with the following little script.

import cupy as cp
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import HashingVectorizer
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask
import dask.array
from cuml.dask.common import to_sp_dask_array
from cuml.dask.naive_bayes import MultinomialNB
cluster = LocalCUDACluster(protocol="ucx")
client = Client(cluster)
twenty_train = fetch_20newsgroups(subset='train',
                                  shuffle=True, random_state=42)
hv = HashingVectorizer(n_features=8000000)
xformed = hv.fit_transform(twenty_train.data)
X = to_sp_dask_array(xformed, client)
y = dask.array.from_array(twenty_train.target, asarray=False,
                          fancy=False).astype(cp.int32)
model = MultinomialNB()
model.fit(X, y)

Specifically, the issue happened when I increased the number of features of the input (hv = HashingVectorizer(n_features=8000000)). So far I've found that I start to see the issue above roughly 5.5M features.

It's important to point out that the current Naive Bayes implementation in cuML (branch-0.13 at the time of writing) implements a workaround, so reproducing this issue requires a very small change to the Dask Naive Bayes code. I modified the script in place in my conda environment: ~/miniconda3/envs/cuml_dev/lib/python3.7/site-packages/cuml/dask/naive_bayes/naive_bayes.py

I added the last line (the counts1 = ... call) right after this existing block:

        counts = [self.client_.submit(
            MultinomialNB._fit,
            p,
            classes,
            self.kwargs,
            workers=[w]
        ) for w, p in worker_parts.items()]
        counts1 = self.client_.compute(counts, sync=True)

For now, this is the most accurate reproduction of the behavior I'm seeing end to end. I'm looking forward to isolating and fixing the root cause so that I can remove the workaround from cuML.

@jakirkham (Member) commented:

Thank you for doing this @cjnolet! 😄 Will take a closer look.

@pentschev (Member) commented:

I think this may be related to dask/distributed#2948; we had to add a specialization to handle collections of objects supporting __cuda_array_interface__. Note that we decided to use msgpack only for serialization of collections of length greater than 5, due to performance concerns. We have not tested whether that was an optimal number, but maybe it's worth taking a look at it.
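
One way to poke at this locally might be to call Distributed's serialize directly on a small tuple of CuPy arrays and inspect what comes out (a sketch; the exact header layout depends on the Distributed version):

from distributed.protocol import serialize
import cupy

x = (cupy.ones(10, dtype="f4"), cupy.ones(10, dtype="f4"))
header, frames = serialize(x, serializers=("cuda", "dask", "pickle"))
print(header)                     # shows which serializer(s) handled the tuple
print([type(f) for f in frames])  # device buffers vs. host bytes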

I'm a bit short on time lately, so if you have the bandwidth to work on this, @jakirkham, that's much appreciated. If there's any information I can provide to help, please ask.

@jakirkham (Member) commented:

Sorry I've had no time to look deeper unfortunately. Have been totally snowed 😞

This one is most likely slipping to a later release. It sounds like there is a viable near-term workaround, but please let us know ASAP if that is not the case, @cjnolet.

@jakirkham (Member) commented:

FWIW I ran the reproducer above with Dask + Distributed 2.12.0. Am not seeing the serialization issue, but did run into OOM.

---------------------------------------------------------------------------
OutOfMemoryError                          Traceback (most recent call last)
<ipython-input-10-6d10fe8973eb> in <module>
----> 1 model.fit(X, y)

/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/cuml/dask/naive_bayes/naive_bayes.py in fit(self, X, y, classes)
    184         class_counts = self.client_.compute(
    185             [self.client_.submit(MultinomialNB._get_class_counts, c)
--> 186              for c in counts], sync=True)
    187         feature_counts = self.client_.compute(
    188             [self.client_.submit(MultinomialNB._get_feature_counts, c)

/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/distributed/client.py in compute(self, collections, sync, optimize_graph, workers, allow_other_workers, resources, retries, priority, fifo_timeout, actors, traverse, **kwargs)
   2798 
   2799         if sync:
-> 2800             result = self.gather(futures)
   2801         else:
   2802             result = futures

/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1891                 direct=direct,
   1892                 local_worker=local_worker,
-> 1893                 asynchronous=asynchronous,
   1894             )
   1895 

/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    778         else:
    779             return sync(
--> 780                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    781             )
    782 

/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    346     if error[0]:
    347         typ, exc, tb = error[0]
--> 348         raise exc.with_traceback(tb)
    349     else:
    350         return result[0]

/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/distributed/utils.py in f()
    330             if callback_timeout is not None:
    331                 future = asyncio.wait_for(future, callback_timeout)
--> 332             result[0] = yield future
    333         except Exception as exc:
    334             error[0] = sys.exc_info()

/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1750                             exc = CancelledError(key)
   1751                         else:
-> 1752                             raise exception.with_traceback(traceback)
   1753                         raise exc
   1754                     if errors == "skip":

/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/cuml/dask/naive_bayes/naive_bayes.py in _fit()
    117 
    118         for x, y in Xy:
--> 119             model.partial_fit(x, y, classes=classes)
    120 
    121         return model.class_count_, model.feature_count_

/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/cuml/naive_bayes/naive_bayes.py in partial_fit()
    331         """
    332         return self._partial_fit(X, y, sample_weight=sample_weight,
--> 333                                  _classes=classes)
    334 
    335     @cp.prof.TimeRangeDecorator(message="predict()", color_id=1)

/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/cupy/prof/time_range.py in inner()
    107         def inner(*args, **kwargs):
    108             with self._recreate_cm(func.__name__):
--> 109                 return func(*args, **kwargs)
    110         return inner

/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/cuml/naive_bayes/naive_bayes.py in _partial_fit()
    278         self._count(X, Y)
    279 
--> 280         self._update_feature_log_prob(self.alpha)
    281         self._update_class_log_prior(class_prior=self.class_prior)
    282 

/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/cuml/naive_bayes/naive_bayes.py in _update_feature_log_prob()
    569         smoothed_fc = self.feature_count_ + alpha
    570         smoothed_cc = smoothed_fc.sum(axis=1).reshape(-1, 1)
--> 571         self.feature_log_prob_ = (cp.log(smoothed_fc) -
    572                                   cp.log(smoothed_cc.reshape(-1, 1)))
    573 

cupy/core/_kernel.pyx in cupy.core._kernel.ufunc.__call__()

cupy/core/_kernel.pyx in cupy.core._kernel._get_out_args()

cupy/core/core.pyx in cupy.core.core.ndarray.__init__()

cupy/cuda/memory.pyx in cupy.cuda.memory.alloc()

cupy/cuda/memory.pyx in cupy.cuda.memory.MemoryPool.malloc()

cupy/cuda/memory.pyx in cupy.cuda.memory.MemoryPool.malloc()

cupy/cuda/memory.pyx in cupy.cuda.memory.SingleDeviceMemoryPool.malloc()

cupy/cuda/memory.pyx in cupy.cuda.memory.SingleDeviceMemoryPool._malloc()

cupy/cuda/memory.pyx in cupy.cuda.memory._try_malloc()

OutOfMemoryError: Out of memory allocating 1,280,000,000 bytes (allocated so far: 2,588,727,808 bytes).

Looks like we need to enable RMM-backed allocations with CuPy.

import cupy as cp
import rmm

cp.cuda.set_allocator(rmm.rmm_cupy_allocator)
client.run(cp.cuda.set_allocator, rmm.rmm_cupy_allocator)
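
A quick way to confirm the hook took effect on every worker might be something like this (a sketch; assumes the client from the reproducer above and a CuPy version that exposes cupy.cuda.get_allocator):

def using_rmm_allocator():
    import cupy
    import rmm
    # True when CuPy's current allocator is the RMM-backed one set above
    return cupy.cuda.get_allocator() is rmm.rmm_cupy_allocator

print(client.run(using_rmm_allocator))  # e.g. {worker_address: True, ...}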

@jakirkham (Member) commented:

I should add that when I fix that, I get some kind of crash. I may have some out-of-date packages though (ucx-py had an update today, for example). It's also worth noting we are not using NVLink or InfiniBand.

model.fit(X, y)                                                        
mlx5: dgx15: got completion with error:
00000000 00000000 00000000 00000000
00000000 00000000 00000000 00000000
00000006 00000000 00000000 00000000
00000000 8c006801 08002b4f 001493d2
[dgx15:58268:0:58268] rc_verbs_iface.c:69   send completion with error: local length error qpn 0x2b4f wrid 0x6 vendor_err 0x68
==== backtrace ====
    0  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/../../../../libucs.so.0(ucs_fatal_error_message+0x51) [0x7fd102c90171]
    1  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/../../../../libucs.so.0(+0x22f37) [0x7fd102c93f37]
    2  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/../../../../libucs.so.0(ucs_log_dispatch+0xd9) [0x7fd102c94069]
    3  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/../../../../ucx/libuct_ib.so.0(+0x1b42e) [0x7fd102bab42e]
    4  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/../../../../ucx/libuct_ib.so.0(+0x1ba6e) [0x7fd102baba6e]
    5  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/../../../../libucp.so.0(ucp_worker_progress+0x6a) [0x7fd102cdb7ba]
    6  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/core.cpython-36m-x86_64-linux-gnu.so(+0x15252) [0x7fd102d47252]
    7  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/core.cpython-36m-x86_64-linux-gnu.so(+0x2a7f2) [0x7fd102d5c7f2]
    8  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/core.cpython-36m-x86_64-linux-gnu.so(+0x44114) [0x7fd102d76114]
    9  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/core.cpython-36m-x86_64-linux-gnu.so(+0x17294) [0x7fd102d49294]
   10  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/core.cpython-36m-x86_64-linux-gnu.so(+0x38602) [0x7fd102d6a602]
   11  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyCFunction_FastCallDict+0x105) [0x56184257fd65]
   12  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyObject_FastCallDict+0x2bf) [0x5618425801bf]
   13  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyObject_CallMethodIdObjArgs+0x100) [0x5618425a9730]
   14  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/lib-dynload/_asyncio.cpython-36m-x86_64-linux-gnu.so(+0x9bd0) [0x7fd1f6e2cbd0]
   15  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(PyObject_Call+0x3e) [0x56184257fbbe]
   16  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x1990) [0x561842632990]
   17  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fa2b) [0x5618425dca2b]
   18  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x56184260e5b5]
   19  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x30a) [0x56184263130a]
   20  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fa2b) [0x5618425dca2b]
   21  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x56184260e5b5]
   22  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x30a) [0x56184263130a]
   23  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fa2b) [0x5618425dca2b]
   24  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x56184260e5b5]
   25  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x30a) [0x56184263130a]
   26  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fa2b) [0x5618425dca2b]
   27  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x56184260e5b5]
   28  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x30a) [0x56184263130a]
   29  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16f0d3) [0x5618425dc0d3]
   30  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fc61) [0x5618425dcc61]
   31  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x56184260e5b5]
   32  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x30a) [0x56184263130a]
   33  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16f0d3) [0x5618425dc0d3]
   34  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyFunction_FastCallDict+0x3d8) [0x5618425dd358]
   35  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyObject_FastCallDict+0x26f) [0x56184258016f]
   36  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyObject_Call_Prepend+0x63) [0x561842584d33]
   37  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(PyObject_Call+0x3e) [0x56184257fbbe]
   38  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x1990) [0x561842632990]
   39  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyFunction_FastCallDict+0x11b) [0x5618425dd09b]
   40  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyObject_FastCallDict+0x26f) [0x56184258016f]
   41  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyObject_Call_Prepend+0x63) [0x561842584d33]
   42  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(PyObject_Call+0x3e) [0x56184257fbbe]
   43  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x1990) [0x561842632990]
   44  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fa2b) [0x5618425dca2b]
   45  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x56184260e5b5]
   46  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x30a) [0x56184263130a]
   47  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fa2b) [0x5618425dca2b]
   48  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x56184260e5b5]
   49  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x30a) [0x56184263130a]
   50  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fa2b) [0x5618425dca2b]
   51  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x56184260e5b5]
   52  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x30a) [0x56184263130a]
   53  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16eda4) [0x5618425dbda4]
   54  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fc61) [0x5618425dcc61]
   55  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x56184260e5b5]
   56  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x10c8) [0x5618426320c8]
   57  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(PyEval_EvalCodeEx+0x329) [0x5618425e1019]
   58  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(PyEval_EvalCode+0x1c) [0x5618425e1dac]
   59  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x216514) [0x561842683514]
   60  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(PyRun_StringFlags+0x7d) [0x5618426835ad]
   61  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(PyRun_SimpleStringFlags+0x3f) [0x56184268360f]
===================
distributed.nanny - WARNING - Restarting worker
[dgx15:57440:0:58158]   tag_match.c:82   Bug: expected request not found
==== backtrace ====
    0  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/../../../../libucs.so.0(ucs_fatal_error_message+0x51) [0x7f6b2c156171]
    1  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/../../../../libucs.so.0(ucs_fatal_error_format+0xf5) [0x7f6b2c156325]
    2  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/../../../../libucp.so.0(ucp_tag_exp_search_all+0) [0x7f6b2c1bc920]
    3  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/../../../../libucp.so.0(ucp_request_cancel+0x6b) [0x7f6b2c19b22b]
    4  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/core.cpython-36m-x86_64-linux-gnu.so(+0x46409) [0x7f6b2c23e409]
    5  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/core.cpython-36m-x86_64-linux-gnu.so(+0x1a64f) [0x7f6b2c21264f]
    6  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/core.cpython-36m-x86_64-linux-gnu.so(+0x1a84e) [0x7f6b2c21284e]
    7  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/core.cpython-36m-x86_64-linux-gnu.so(+0x3e1bd) [0x7f6b2c2361bd]
    8  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/core.cpython-36m-x86_64-linux-gnu.so(+0x17294) [0x7f6b2c20f294]
    9  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/site-packages/ucp/_libs/core.cpython-36m-x86_64-linux-gnu.so(+0x38849) [0x7f6b2c230849]
   10  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x4fed) [0x5575ff477fed]
   11  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyGen_Send+0x134) [0x5575ff4536c4]
   12  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x1440) [0x5575ff474440]
   13  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyGen_Send+0x134) [0x5575ff4536c4]
   14  /datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/lib-dynload/_asyncio.cpython-36m-x86_64-linux-gnu.so(+0x95c6) [0x7f6c9fd2a5c6]
   15  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(PyObject_Call+0x3e) [0x5575ff3c1bbe]
   16  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x1990) [0x5575ff474990]
   17  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fa2b) [0x5575ff41ea2b]
   18  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x5575ff4505b5]
   19  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x30a) [0x5575ff47330a]
   20  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fa2b) [0x5575ff41ea2b]
   21  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x5575ff4505b5]
   22  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x30a) [0x5575ff47330a]
   23  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fa2b) [0x5575ff41ea2b]
   24  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x5575ff4505b5]
   25  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x30a) [0x5575ff47330a]
   26  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fa2b) [0x5575ff41ea2b]
   27  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x5575ff4505b5]
   28  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x30a) [0x5575ff47330a]
   29  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(PyEval_EvalCodeEx+0x966) [0x5575ff423656]
   30  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x174f36) [0x5575ff423f36]
   31  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(PyObject_Call+0x3e) [0x5575ff3c1bbe]
   32  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x1990) [0x5575ff474990]
   33  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fa2b) [0x5575ff41ea2b]
   34  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x5575ff4505b5]
   35  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x30a) [0x5575ff47330a]
   36  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x16fa2b) [0x5575ff41ea2b]
   37  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1a15b5) [0x5575ff4505b5]
   38  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyEval_EvalFrameDefault+0x30a) [0x5575ff47330a]
   39  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyFunction_FastCallDict+0x11b) [0x5575ff41f09b]
   40  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyObject_FastCallDict+0x26f) [0x5575ff3c216f]
   41  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(_PyObject_Call_Prepend+0x63) [0x5575ff3c6d33]
   42  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(PyObject_Call+0x3e) [0x5575ff3c1bbe]
   43  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x210766) [0x5575ff4bf766]
   44  /datasets/jkirkham/miniconda/envs/rapids13dev/bin/python(+0x1cb1b8) [0x5575ff47a1b8]
   45  /lib/x86_64-linux-gnu/libpthread.so.0(+0x76db) [0x7f6ca21556db]
   46  /lib/x86_64-linux-gnu/libc.so.6(clone+0x3f) [0x7f6ca1e7e88f]
===================
/datasets/jkirkham/miniconda/envs/rapids13dev/lib/python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 48 leaked semaphores to clean up at shutdown
  len(cache))
Aborted (core dumped)

@jakirkham (Member) commented:

Ok same issue with NVLink and InfiniBand enabled on a DGX-1.

@jakirkham (Member) commented:

All this to say I'm not seeing this specific issue, but am seeing other ones 😟

@cjnolet (Member, Author) commented Mar 11, 2020:

@jakirkham, can you see if these issues still occur when using rapidsai/cuml#1805? I'm actually not able to reproduce the deserialization error anymore either, so I'm thinking it must have been fixed at some point since I opened this issue.

I'm also not encountering an OOM with PR #1805, at least not on my workstation. If you are just running my reproducible example, I'm not sure InfiniBand is going to have much effect, since it's only a single partition that would be getting copied to the client. I guess whether NVLink has an effect would depend on which GPUs the worker and the client are using, right?

@cjnolet (Member, Author) commented Mar 11, 2020:

Looks like we need to enable RMM-backed allocations with CuPy.

@dantegd We were talking about this last week when we realized the RMM allocator settings from @with_cupy_rmm weren't being propagated to the Dask workers. Did we ever reach a solution / conclusion? I recall us discussing whether we should be running the following code globally in cuml in the meantime, but I can't quite remember if we decided to check this in for 0.13:

import cupy as cp
import rmm

cp.cuda.set_allocator(rmm.rmm_cupy_allocator)
client.run(cp.cuda.set_allocator, rmm.rmm_cupy_allocator)

@jakirkham, does setting the RMM pool allocator on the LocalCUDACluster also set this automatically? This seems to me like something we'd ultimately want to have on the dask-cuda side (or at least support generally across RAPIDS).
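
For context, the two knobs appear to be separate. Something like the following (a sketch, assuming a dask-cuda version that accepts rmm_pool_size) creates an RMM pool on each worker, but the CuPy allocator hook still has to be installed explicitly:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import cupy as cp
import rmm

cluster = LocalCUDACluster(protocol="ucx", rmm_pool_size="24GB")  # per-worker RMM pool
client = Client(cluster)

# Hook CuPy into RMM locally and on every worker
cp.cuda.set_allocator(rmm.rmm_cupy_allocator)
client.run(cp.cuda.set_allocator, rmm.rmm_cupy_allocator)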

@jakirkham (Member) commented:

Well cuDF will hook the RMM allocator into CuPy on import cudf IIUC. So maybe that solves the issue for cuML?

cc @kkraus14

xref: rapidsai/cudf#4351

@cjnolet (Member, Author) commented Mar 11, 2020:

Ack! I spoke too soon. Indeed, I am still getting this error when I use UCX, unfortunately.

cuml/test/dask/test_naive_bayes.py bokeh.server.util - WARNING - Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/protocol/core.py", line 124, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 255, in deserialize
    deserializers=deserializers,
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 268, in deserialize
    return loads(header, frames)
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
    return loads(header, frames)
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 63, in cuda_deserialize_cupy_ndarray
    frame = PatchedCudaArrayInterface(frame)
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 26, in __init__
    self.__cuda_array_interface__ = ary.__cuda_array_interface__
AttributeError: 'bytes' object has no attribute '__cuda_array_interface__'
distributed.utils - ERROR - 'bytes' object has no attribute '__cuda_array_interface__'
Traceback (most recent call last):
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/utils.py", line 665, in log_errors
    yield
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/comm/ucx.py", line 207, in read
    frames, deserialize=self.deserialize, deserializers=deserializers
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/comm/utils.py", line 73, in from_frames
    res = await offload(_from_frames)
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/utils.py", line 1458, in offload
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/utils.py", line 1458, in <lambda>
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/protocol/core.py", line 124, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 255, in deserialize
    deserializers=deserializers,
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 268, in deserialize
    return loads(header, frames)
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/protocol/cuda.py", line 28, in cuda_loads
    return loads(header, frames)
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 63, in cuda_deserialize_cupy_ndarray
    frame = PatchedCudaArrayInterface(frame)
  File "/share/software/miniconda3/envs/cuml_dev_3/lib/python3.7/site-packages/distributed/protocol/cupy.py", line 26, in __init__
    self.__cuda_array_interface__ = ary.__cuda_array_interface__
AttributeError: 'bytes' object has no attribute '__cuda_array_interface__'
Fbokeh.server.util - WARNING - Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
.

This runs successfully w/ protocol='tcp'.

Also, here's the ideal Naive Bayes fit() function (without the workaround):

    @with_cupy_rmm
    def fit(self, X, y, classes=None):

        """
        Fit distributed Naive Bayes classifier model

        Parameters
        ----------

        X : dask.Array with blocks containing dense or sparse cupy arrays
        y : dask.Array with blocks containing cupy.ndarray
        classes : array-like containing unique class labels

        Returns
        -------

        cuml.dask.naive_bayes.MultinomialNB current model instance
        """

        # Only Dask.Array supported for now
        if not isinstance(X, dask.array.core.Array):
            raise ValueError("Only dask.Array is supported for X")

        if not isinstance(y, dask.array.core.Array):
            raise ValueError("Only dask.Array is supported for y")

        if len(X.chunks[1]) != 1:
            raise ValueError("X must be chunked by row only. "
                             "Multi-dimensional chunking is not supported")

        worker_parts = self.client_.sync(extract_arr_partitions,
                                         [X, y])

        worker_parts = workers_to_parts(worker_parts)

        n_features = X.shape[1]

        classes = MultinomialNB._unique(y.map_blocks(
            MultinomialNB._unique).compute()) if classes is None else classes

        n_classes = len(classes)

        counts = [self.client_.submit(
            MultinomialNB._fit,
            p,
            classes,
            self.kwargs,
            workers=[w]
        ) for w, p in worker_parts.items()]

        local_counts = self.client_.compute(counts, sync=True)

        self.local_model = MNB(**self.kwargs)
        self.local_model.classes_ = classes
        self.local_model.n_classes = n_classes
        self.local_model.n_features = X.shape[1]

        self.local_model.class_count_ = cp.zeros(n_classes,
                                                 order="F",
                                                 dtype=cp.float32)
        self.local_model.feature_count_ = cp.zeros((n_classes, n_features),
                                                   order="F",
                                                   dtype=cp.float32)

        for class_count_, feature_count_ in local_counts:
            self.local_model.class_count_ += class_count_
            self.local_model.feature_count_ += feature_count_

        self.local_model.update_log_probs()

@cjnolet (Member, Author) commented Mar 11, 2020:

RE: the cudf import. I recall having a conversation very recently about doing exactly what cuDF appears to be doing (setting the RMM allocator globally), but was informed there was push-back against doing this. cc @JohnZed

@jakirkham (Member) commented:

Hmm...yeah not sure. I'm guessing that fit would be the same as in the MRE, right?

In any event, when using protocol="ucx" most users also add enable_nvlink=True, which ends up being pretty important for performance. I'm curious whether not enabling NVLink and this issue are intertwined. For instance, do you see this issue with that flag passed to LocalCUDACluster?

@cjnolet (Member, Author) commented Mar 11, 2020:

Here's the offending header:

Offending header: {'shape': (4, 8000000), 'typestr': '<f4', 'descr': (('', '<f4'),), 'version': 2, 'strides': (4, 16), 'data': (140229675581440, False), 'type-serialized': b'\x80\x04\x95\x1e\x00\x00\x00\x00\x00\x00\x00\x8c\x0ecupy.core.core\x94\x8c\x07ndarray\x94\x93\x94.', 'serializer': 'cuda', 'compression': (None,)}

I can provide the frame as well, but it's a bytes object and it's pretty large (8000000 * n_classes floats in the reproducible example above). Not sure how useful that will be, since it's just an opaque sequence of bytes.
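
If it's useful, a hypothetical sanity check one could drop into the patched deserializer (using the header and frame names from the snippet above) is to compare the byte count against the header:

import numpy as np

# Hypothetical check: does the host frame hold exactly shape * itemsize bytes?
expected = int(np.prod(header["shape"])) * np.dtype(header["typestr"]).itemsize
print(len(frame), expected)  # expected is 4 * 8000000 * 4 = 128,000,000 bytes here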

Looking at the corresponding CuPy deserializer, it looks like anything that's not a cupy.ndarray is assumed to expose __cuda_array_interface__, but in this case the frame is plain bytes.

@cuda_deserialize.register(cupy.ndarray)
def cuda_deserialize_cupy_ndarray(header, frames):
    (frame,) = frames
    if not isinstance(frame, cupy.ndarray):
        frame = PatchedCudaArrayInterface(frame)
    arr = cupy.ndarray(
        shape=header["shape"],
        dtype=header["typestr"],
        memptr=cupy.asarray(frame).data,
        strides=header["strides"],
    )
    return arr

FWIW, @dantegd recently fixed a similar issue in CumlArray with this conditional:

        # Base class (Buffer) constructor call
        if isinstance(data, bytearray) or isinstance(data, bytes):
            data = memoryview(data)
        size, shape = _get_size_from_shape(shape, dtype)
        super(CumlArray, self).__init__(data=data, owner=owner, size=size)

It looks like after the memoryview is created in cuml, it's passed through this conditional in the cudf Buffer: https://github.com/rapidsai/cudf/blob/46ef3ddab5f0fed581478bfc27498fd49cbc6106/python/cudf/cudf/core/buffer.py#L87

@cjnolet (Member, Author) commented Mar 11, 2020:

This seems to have fixed the issue:

@cuda_deserialize.register(cupy.ndarray)
def cuda_deserialize_cupy_ndarray(header, frames):
    (frame,) = frames
    if isinstance(frame, (bytes, bytearray)):
        frame = cupy.array(memoryview(frame))
    if not isinstance(frame, cupy.ndarray):
        frame = PatchedCudaArrayInterface(frame)
    arr = cupy.ndarray(
        shape=header["shape"],
        dtype=header["typestr"],
        memptr=cupy.asarray(frame).data,
        strides=header["strides"],
    )
    return arr

@jakirkham (Member) commented:

Thanks for posting this Corey. Let's dig into this a bit. 🙂

The first thing to note is the 'serializer': 'cuda' part. It means that when the CuPy array was serialized, it went through cuda_serialize_cupy_ndarray, which produces a frame containing a flattened uint8 CuPy ndarray as part of this write step. We flag any CUDA objects for the receiver, which are then read into RMM-allocated memory.

If we knew a D2H/H2D copy was needed as part of the transmission protocol, we would handle that as part of CuPy ndarray deserialization. However, that doesn't happen here because the header has 'serializer': 'cuda' and not 'serializer': 'dask'. So while we could make things work for this case, the performance would suffer and we would be obfuscating a problem. In other words, the error here is good: it tells us something is not working correctly.

The fact that we got bytes means one of these steps didn't happen as expected, or that something else happened in between. It also means the data was moved to host at some stage and the header was not updated to reflect that.
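
To make that contract concrete, here's an illustrative sketch (not Distributed's actual code) of the invariant being violated:

def loads_sketch(header, frames):
    # The header records which serializer family produced the frames; the matching
    # loads path assumes frames of the corresponding kind.
    if header["serializer"] == "cuda":
        # The CUDA path expects device buffers exposing __cuda_array_interface__.
        # Getting plain bytes here means the frames were moved to host somewhere
        # along the way without the header being rewritten.
        assert all(hasattr(f, "__cuda_array_interface__") for f in frames)
    elif header["serializer"] == "dask":
        # The host path works on bytes-like frames and handles any H2D copy itself.
        assert all(isinstance(f, (bytes, bytearray, memoryview)) for f in frames)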

@cjnolet (Member, Author) commented Mar 11, 2020:

So while we could make things work for this case, the performance would suffer and we would be obfuscating a problem. In other words, the error here is good: it tells us something is not working correctly.

Definitely agree it would be a good idea to fix this properly. Please let me know if you are also able to reproduce by making the change to naive_bayes.py and if I can provide any additional info in this thread.

@cjnolet (Member, Author) commented Mar 17, 2020:

@quasiben, just a heads-up since you mentioned working on reproducing this: I would recommend using a cuML nightly from prior to yesterday.

@jakirkham (Member) commented:

It appears this is a Distributed bug that has since been fixed (dask/distributed#3580). Closing...
