Skip to content

Commit

Permalink
Relax NumPy requirement in UCX (#3731)
Browse files Browse the repository at this point in the history
* Make `device_array`'s shape a `tuple`

While it works to have this be a single `int` (as it will be coerced to
a `tuple`), go ahead and make it a `tuple` for clarity and to match more
closely to the Numba case.

* Use `"u1"` to specify `uint8` typed arrays

This is equivalent to using NumPy's `uint8`, but has the added benefit
of not requiring NumPy be imported to work.

* Rename `is_cudas` to `cuda_frames`

Matches the variable name in the `send` case to make things easier to
follow.

* Use `pack`/`unpack` for UCX frame metadata

As `struct.pack` and `struct.unpack` are able to build `bytes` objects
containing the frame metadata needed by UCX easily, just use these
functions instead of creating NumPy arrays each time. Helps soften the
NumPy requirement a bit.

* Rename `cuda_array` to `device_array`

Matches more closely to the name used by RMM and Numba.

* Create function to allocate arrays on host

To relax the NumPy requirement completely, add a function to allocate
arrays on host. If NumPy is not present, this falls back to just
allocating `bytearray` objects, which work just as well.

* Fix formatting with black

* Define `cuda_frames` with other frame definitions

* Store `nframes` for simplicity

Avoids multiple calls to `len(frames)`, is a bit easier to read, and
matches the receive code path more closely.

* Collect sizes along with other frame info

* Use `sizes` to pick out non-trivial frames to send

* Simply call `sum` on `sizes` for bytes sent

* Use `host_array` to make buffers to receive into

* Pack per frame metadata into one message

To send fewer and larger messages, pack both which frames are on device
and how large each frame is into one message.

* Note what `struct` lines are packing/unpacking
  • Loading branch information
jakirkham authored Apr 21, 2020
1 parent 9cfd066 commit 6db09f3
Showing 1 changed file with 52 additions and 31 deletions.
83 changes: 52 additions & 31 deletions distributed/comm/ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
.. _UCX: https://github.com/openucx/ucx
"""
import logging
import struct
import weakref

import dask
import numpy as np

from .addressing import parse_host_port, unparse_host_port
from .core import Comm, Connector, Listener, CommClosedError
Expand All @@ -33,7 +33,8 @@
# required to ensure Dask configuration gets propagated to UCX, which needs
# variables to be set before being imported.
ucp = None
cuda_array = None
host_array = None
device_array = None


def synchronize_stream(stream=0):
Expand All @@ -46,7 +47,7 @@ def synchronize_stream(stream=0):


def init_once():
global ucp, cuda_array
global ucp, host_array, device_array
if ucp is not None:
return

Expand All @@ -59,34 +60,42 @@ def init_once():

ucp.init(options=ucx_config, env_takes_precedence=True)

# Find the function, `host_array()`, to use when allocating new host arrays
try:
import numpy

host_array = lambda n: numpy.empty((n,), dtype="u1")
except ImportError:
host_array = lambda n: bytearray(n)

# Find the function, `cuda_array()`, to use when allocating new CUDA arrays
try:
import rmm

if hasattr(rmm, "DeviceBuffer"):
cuda_array = lambda n: rmm.DeviceBuffer(size=n)
device_array = lambda n: rmm.DeviceBuffer(size=n)
else: # pre-0.11.0
import numba.cuda

def rmm_cuda_array(n):
a = rmm.device_array(n, dtype=np.uint8)
def rmm_device_array(n):
a = rmm.device_array(n, dtype="u1")
weakref.finalize(a, numba.cuda.current_context)
return a

cuda_array = rmm_cuda_array
device_array = rmm_device_array
except ImportError:
try:
import numba.cuda

def numba_cuda_array(n):
a = numba.cuda.device_array((n,), dtype=np.uint8)
def numba_device_array(n):
a = numba.cuda.device_array((n,), dtype="u1")
weakref.finalize(a, numba.cuda.current_context)
return a

cuda_array = numba_cuda_array
device_array = numba_device_array
except ImportError:

def cuda_array(n):
def device_array(n):
raise RuntimeError(
"In order to send/recv CUDA arrays, Numba or RMM is required"
)
Expand Down Expand Up @@ -169,19 +178,25 @@ async def write(
frames = await to_frames(
msg, serializers=serializers, on_error=on_error
)
nframes = len(frames)
cuda_frames = tuple(
hasattr(f, "__cuda_array_interface__") for f in frames
)
sizes = tuple(nbytes(f) for f in frames)
send_frames = [
each_frame for each_frame in frames if len(each_frame) > 0
each_frame
for each_frame, each_size in zip(frames, sizes)
if each_size
]

# Send meta data
cuda_frames = np.array(
[hasattr(f, "__cuda_array_interface__") for f in frames],
dtype=np.bool,
)
await self.ep.send(np.array([len(frames)], dtype=np.uint64))
await self.ep.send(cuda_frames)

# Send # of frames (uint64)
await self.ep.send(struct.pack("Q", nframes))
# Send which frames are CUDA (bool) and
# how large each frame is (uint64)
await self.ep.send(
np.array([nbytes(f) for f in frames], dtype=np.uint64)
struct.pack(nframes * "?" + nframes * "Q", *cuda_frames, *sizes)
)

# Send frames
Expand All @@ -191,12 +206,12 @@ async def write(
# syncing the default stream will wait for other non-blocking CUDA streams.
# Note this is only sufficient if the memory being sent is not currently in use on
# non-blocking CUDA streams.
if cuda_frames.any():
if any(cuda_frames):
synchronize_stream(0)

for each_frame in send_frames:
await self.ep.send(each_frame)
return sum(map(nbytes, send_frames))
return sum(sizes)
except (ucp.exceptions.UCXBaseException):
self.abort()
raise CommClosedError("While writing, the connection was closed")
Expand All @@ -211,30 +226,36 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")):

try:
# Recv meta data
nframes = np.empty(1, dtype=np.uint64)

# Recv # of frames (uint64)
nframes_fmt = "Q"
nframes = host_array(struct.calcsize(nframes_fmt))
await self.ep.recv(nframes)
is_cudas = np.empty(nframes[0], dtype=np.bool)
await self.ep.recv(is_cudas)
sizes = np.empty(nframes[0], dtype=np.uint64)
await self.ep.recv(sizes)
(nframes,) = struct.unpack(nframes_fmt, nframes)

# Recv which frames are CUDA (bool) and
# how large each frame is (uint64)
header_fmt = nframes * "?" + nframes * "Q"
header = host_array(struct.calcsize(header_fmt))
await self.ep.recv(header)
header = struct.unpack(header_fmt, header)
cuda_frames, sizes = header[:nframes], header[nframes:]
except (ucp.exceptions.UCXBaseException, CancelledError):
self.abort()
raise CommClosedError("While reading, the connection was closed")
else:
# Recv frames
frames = [
cuda_array(each_size)
if is_cuda
else np.empty(each_size, dtype=np.uint8)
for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist())
device_array(each_size) if is_cuda else host_array(each_size)
for is_cuda, each_size in zip(cuda_frames, sizes)
]
recv_frames = [
each_frame for each_frame in frames if len(each_frame) > 0
]

# It is necessary to first populate `frames` with CUDA arrays and synchronize
# the default stream before starting receiving to ensure buffers have been allocated
if is_cudas.any():
if any(cuda_frames):
synchronize_stream(0)

for each_frame in recv_frames:
Expand Down

0 comments on commit 6db09f3

Please sign in to comment.