Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relax NumPy requirement in UCX #3731

Merged
merged 15 commits into from
Apr 21, 2020
Merged
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 48 additions & 32 deletions distributed/comm/ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
.. _UCX: https://github.com/openucx/ucx
"""
import logging
import struct
import weakref

import dask
import numpy as np

from .addressing import parse_host_port, unparse_host_port
from .core import Comm, Connector, Listener, CommClosedError
Expand All @@ -33,7 +33,8 @@
# required to ensure Dask configuration gets propagated to UCX, which needs
# variables to be set before being imported.
ucp = None
cuda_array = None
host_array = None
device_array = None


def synchronize_stream(stream=0):
Expand All @@ -46,7 +47,7 @@ def synchronize_stream(stream=0):


def init_once():
global ucp, cuda_array
global ucp, host_array, device_array
if ucp is not None:
return

Expand All @@ -59,34 +60,42 @@ def init_once():

ucp.init(options=ucx_config, env_takes_precedence=True)

# Find the function, `host_array()`, to use when allocating new host arrays
try:
quasiben marked this conversation as resolved.
Show resolved Hide resolved
import numpy

host_array = lambda n: numpy.empty((n,), dtype="u1")
except ImportError:
host_array = lambda n: bytearray(n)

# Find the function, `cuda_array()`, to use when allocating new CUDA arrays
try:
import rmm

if hasattr(rmm, "DeviceBuffer"):
cuda_array = lambda n: rmm.DeviceBuffer(size=n)
device_array = lambda n: rmm.DeviceBuffer(size=n)
else: # pre-0.11.0
import numba.cuda

def rmm_cuda_array(n):
a = rmm.device_array(n, dtype=np.uint8)
def rmm_device_array(n):
a = rmm.device_array(n, dtype="u1")
weakref.finalize(a, numba.cuda.current_context)
return a

cuda_array = rmm_cuda_array
device_array = rmm_device_array
except ImportError:
try:
import numba.cuda

def numba_cuda_array(n):
a = numba.cuda.device_array((n,), dtype=np.uint8)
def numba_device_array(n):
a = numba.cuda.device_array((n,), dtype="u1")
weakref.finalize(a, numba.cuda.current_context)
return a

cuda_array = numba_cuda_array
device_array = numba_device_array
except ImportError:

def cuda_array(n):
def device_array(n):
raise RuntimeError(
"In order to send/recv CUDA arrays, Numba or RMM is required"
)
Expand Down Expand Up @@ -169,20 +178,21 @@ async def write(
frames = await to_frames(
msg, serializers=serializers, on_error=on_error
)
nframes = len(frames)
cuda_frames = tuple(
hasattr(f, "__cuda_array_interface__") for f in frames
)
sizes = tuple(nbytes(f) for f in frames)
send_frames = [
each_frame for each_frame in frames if len(each_frame) > 0
each_frame
for each_frame, each_size in zip(frames, sizes)
if each_size
]

# Send meta data
cuda_frames = np.array(
[hasattr(f, "__cuda_array_interface__") for f in frames],
dtype=np.bool,
)
await self.ep.send(np.array([len(frames)], dtype=np.uint64))
await self.ep.send(cuda_frames)
await self.ep.send(
np.array([nbytes(f) for f in frames], dtype=np.uint64)
)
await self.ep.send(struct.pack("Q", nframes))
await self.ep.send(struct.pack(nframes * "?", *cuda_frames))
await self.ep.send(struct.pack(nframes * "Q", *sizes))

# Send frames

Expand All @@ -191,12 +201,12 @@ async def write(
# syncing the default stream will wait for other non-blocking CUDA streams.
# Note this is only sufficient if the memory being sent is not currently in use on
# non-blocking CUDA streams.
if cuda_frames.any():
if any(cuda_frames):
synchronize_stream(0)

for each_frame in send_frames:
await self.ep.send(each_frame)
return sum(map(nbytes, send_frames))
return sum(sizes)
except (ucp.exceptions.UCXBaseException):
self.abort()
raise CommClosedError("While writing, the connection was closed")
Expand All @@ -211,30 +221,36 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")):

try:
# Recv meta data
nframes = np.empty(1, dtype=np.uint64)
nframes_fmt = "Q"
nframes = host_array(struct.calcsize(nframes_fmt))
await self.ep.recv(nframes)
is_cudas = np.empty(nframes[0], dtype=np.bool)
await self.ep.recv(is_cudas)
sizes = np.empty(nframes[0], dtype=np.uint64)
(nframes,) = struct.unpack(nframes_fmt, nframes)

cuda_frames_fmt = nframes * "?"
cuda_frames = host_array(struct.calcsize(cuda_frames_fmt))
await self.ep.recv(cuda_frames)
cuda_frames = struct.unpack(cuda_frames_fmt, cuda_frames)

sizes_fmt = nframes * "Q"
sizes = host_array(struct.calcsize(sizes_fmt))
await self.ep.recv(sizes)
sizes = struct.unpack(sizes_fmt, sizes)
except (ucp.exceptions.UCXBaseException, CancelledError):
self.abort()
raise CommClosedError("While reading, the connection was closed")
else:
# Recv frames
frames = [
cuda_array(each_size)
if is_cuda
else np.empty(each_size, dtype=np.uint8)
for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist())
device_array(each_size) if is_cuda else host_array(each_size)
for is_cuda, each_size in zip(cuda_frames, sizes)
]
recv_frames = [
each_frame for each_frame in frames if len(each_frame) > 0
]

# It is necessary to first populate `frames` with CUDA arrays and synchronize
# the default stream before starting receiving to ensure buffers have been allocated
if is_cudas.any():
if any(cuda_frames):
synchronize_stream(0)

for each_frame in recv_frames:
Expand Down