Replace cuDF (de)serializer with cuDF spill-aware (de)serializer #1369

Merged
16 changes: 16 additions & 0 deletions dask_cuda/__init__.py
@@ -9,6 +9,8 @@
import dask.dataframe.shuffle
import dask.dataframe.multi
import dask.bag.core
from distributed.protocol.cuda import cuda_deserialize, cuda_serialize
from distributed.protocol.serialize import dask_deserialize, dask_serialize

from ._version import __git_commit__, __version__
from .cuda_worker import CUDAWorker
@@ -48,3 +50,17 @@
    dask.dataframe.shuffle.shuffle_group
)
dask.dataframe.core._concat = unproxify_decorator(dask.dataframe.core._concat)


def _register_cudf_spill_aware():
    import cudf

    # Only enable Dask/cuDF spilling if cuDF spilling is disabled, see #1363.
    if not cudf.get_option("spill"):
        from cudf.comm import serialize  # noqa: F401

Member:
It would be good to have a comment here saying why this is needed, including a link to #1363.

Member Author:
Done via 29751de

for registry in [cuda_serialize, cuda_deserialize, dask_serialize, dask_deserialize]:
    for lib in ["cudf", "dask_cudf"]:
        if lib in registry._lazy:
            registry._lazy[lib] = _register_cudf_spill_aware
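
For context: cuda_serialize, cuda_deserialize, dask_serialize, and dask_deserialize are dask.utils.Dispatch objects, which keep pending lazy registrations in a private _lazy dict keyed by top-level module name and run them the first time a type from that module is dispatched. Below is a minimal sketch of the same overwrite pattern, using pandas purely as a stand-in; the names are illustrative and not part of this PR.

import pandas as pd

from dask.utils import Dispatch

example = Dispatch("example")


def _default_registration():
    example.register(pd.DataFrame, lambda df: ("default", len(df)))


def _spill_aware_registration():
    example.register(pd.DataFrame, lambda df: ("spill-aware", df.shape))


# register_lazy stores the callback under the module's top-level name;
# it only runs the first time a pandas type is dispatched.
example.register_lazy("pandas", _default_registration)

# Overwriting the pending entry swaps which handlers get registered,
# the same trick used above for the "cudf" and "dask_cudf" entries.
if "pandas" in example._lazy:
    example._lazy["pandas"] = _spill_aware_registration

print(example(pd.DataFrame({"a": [1, 2]})))  # -> ("spill-aware", (2, 1))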
8 changes: 8 additions & 0 deletions dask_cuda/cuda_worker.py
@@ -195,6 +195,14 @@ def del_pid_file():
            },
        )

        cudf_spill_warning = dask.config.get("cudf-spill-warning", default=True)
        if enable_cudf_spill and cudf_spill_warning:
            warnings.warn(
                "cuDF spilling is enabled, please ensure the client and scheduler "
                "processes set `CUDF_SPILL=on` as well. To disable this warning "
                "set `DASK_CUDF_SPILL_WARNING=False`."
            )

        self.nannies = [
            Nanny(
                scheduler,
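
As a usage note on the warning above: both variable names come from the warning message itself. CUDF_SPILL is read by cuDF when it is imported, so the safe ordering in client and scheduler processes is to set it first; a minimal sketch:

import os

# Enable cuDF spilling in this (client or scheduler) process as well;
# set it before cudf is first imported.
os.environ["CUDF_SPILL"] = "on"

# Or, to opt out of the warning instead:
# os.environ["DASK_CUDF_SPILL_WARNING"] = "False"

import cudf  # noqa: E402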
7 changes: 7 additions & 0 deletions dask_cuda/local_cuda_cluster.py
@@ -244,6 +244,13 @@ def __init__(
        # initialization happens before we can set CUDA_VISIBLE_DEVICES
        os.environ["RAPIDS_NO_INITIALIZE"] = "True"

        if enable_cudf_spill:
            import cudf

            # cuDF spilling must be enabled in the client/scheduler process too.
            cudf.set_option("spill", enable_cudf_spill)
            cudf.set_option("spill_stats", cudf_spill_stats)

        if threads_per_worker < 1:
            raise ValueError("threads_per_worker must be higher than 0.")

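
For reference, a minimal usage sketch of the keyword arguments involved here; it assumes a CUDA-capable host, and that cudf_spill_stats takes the statistics level passed through to cuDF's "spill_stats" option, as in the diff above.

from distributed import Client

from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    cluster = LocalCUDACluster(
        enable_cudf_spill=True,  # cuDF-native spilling, workers and client
        cudf_spill_stats=1,  # also collect spill statistics
    )
    client = Client(cluster)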
27 changes: 27 additions & 0 deletions dask_cuda/tests/test_dask_cuda_worker.py
@@ -567,3 +567,30 @@ def test_worker_timeout():
assert "reason: nanny-close" in ret.stderr.lower()

assert ret.returncode == 0


@pytest.mark.parametrize("enable_cudf_spill_warning", [False, True])
def test_worker_cudf_spill_warning(enable_cudf_spill_warning): # noqa: F811
pytest.importorskip("rmm")

environ = {"CUDA_VISIBLE_DEVICES": "0"}
if not enable_cudf_spill_warning:
environ["DASK_CUDF_SPILL_WARNING"] = "False"

with patch.dict(os.environ, environ):
ret = subprocess.run(
[
"dask",
"cuda",
"worker",
"127.0.0.1:9369",
"--enable-cudf-spill",
"--death-timeout",
"1",
],
capture_output=True,
)
if enable_cudf_spill_warning:
assert b"UserWarning: cuDF spilling is enabled" in ret.stderr
else:
assert b"UserWarning: cuDF spilling is enabled" not in ret.stderr
132 changes: 116 additions & 16 deletions dask_cuda/tests/test_spill.py
@@ -11,13 +11,66 @@
from distributed.sizeof import sizeof
from distributed.utils_test import gen_cluster, gen_test, loop # noqa: F401

import dask_cudf

from dask_cuda import LocalCUDACluster, utils
from dask_cuda.utils_test import IncreasedCloseTimeoutNanny

if utils.get_device_total_memory() < 1e10:
    pytest.skip("Not enough GPU memory", allow_module_level=True)


def _set_cudf_device_limit():
    """Ensure spilling for objects of all sizes"""
    import cudf

    cudf.set_option("spill_device_limit", 0)


def _assert_cudf_spill_stats(enable_cudf_spill, dask_worker=None):
    """Ensure cuDF has spilled data with its internal mechanism"""
    import cudf

    global_manager = cudf.core.buffer.spill_manager.get_global_manager()

    if enable_cudf_spill:
        stats = global_manager.statistics
        buffers = global_manager.buffers()
        assert stats.spill_totals[("gpu", "cpu")][0] > 1000
        assert stats.spill_totals[("cpu", "gpu")][0] > 1000
        assert len(buffers) > 0
    else:
        assert global_manager is None
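
A single-process sketch of what this helper checks, mirroring the options used by the helpers above and the fixture below; it requires a GPU, and the shape of spill_totals follows the assertions above.

import cudf

cudf.set_option("spill", True)
cudf.set_option("spill_stats", 1)
cudf.set_option("spill_device_limit", 0)  # spill buffers of any size

df = cudf.DataFrame({"a": range(10_000)})
_ = df.sum()  # computing unspills inputs; buffers may then spill back

manager = cudf.core.buffer.spill_manager.get_global_manager()
# spill_totals is keyed by (src, dst); index 0 is total bytes spilled,
# per the asserts above
print(manager.statistics.spill_totals)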


@pytest.fixture(params=[False, True])
def cudf_spill(request):
    """Fixture to enable and clear cuDF spill manager in client process"""
    cudf = pytest.importorskip("cudf")

    enable_cudf_spill = request.param

    if enable_cudf_spill:
        # If the global spill manager was previously set, fail.
        assert cudf.core.buffer.spill_manager._global_manager is None

        cudf.set_option("spill", True)
        cudf.set_option("spill_stats", True)

        # Prevent changing the RMM resource stack in cuDF in this client
        # process; workers do not need this because they are spawned as
        # new processes for every test that runs.
        cudf.set_option("spill_on_demand", False)

        _set_cudf_device_limit()

    yield enable_cudf_spill

    cudf.set_option("spill", False)
    cudf.core.buffer.spill_manager._global_manager_uninitialized = True
    cudf.core.buffer.spill_manager._global_manager = None


def device_host_file_size_matches(
    dhf, total_bytes, device_chunk_overhead=0, serialized_chunk_overhead=1024
):
@@ -244,9 +297,11 @@ async def test_cupy_cluster_device_spill(params):
    ],
)
@gen_test(timeout=30)
async def test_cudf_cluster_device_spill(params, cudf_spill):
    cudf = pytest.importorskip("cudf")

    enable_cudf_spill = cudf_spill

    with dask.config.set(
        {
            "distributed.comm.compression": False,
@@ -266,6 +321,7 @@ async def test_cudf_cluster_device_spill(params):
            device_memory_limit=params["device_memory_limit"],
            memory_limit=params["memory_limit"],
            worker_class=IncreasedCloseTimeoutNanny,
            enable_cudf_spill=enable_cudf_spill,
        ) as cluster:
            async with Client(cluster, asynchronous=True) as client:

@@ -294,21 +350,28 @@ async def test_cudf_cluster_device_spill(params):
                del cdf
                gc.collect()

                if enable_cudf_spill:
                    # With cuDF-native spilling enabled, Dask-CUDA's own
                    # device->host->disk spilling should remain unused.
                    await client.run(
                        worker_assert,
                        0,
                        0,
                        0,
                    )
                else:
                    await client.run(
                        assert_host_chunks,
                        params["spills_to_disk"],
                    )
                    await client.run(
                        assert_disk_chunks,
                        params["spills_to_disk"],
                    )
                    await client.run(
                        worker_assert,
                        nbytes,
                        32,
                        2048,
                    )

                del cdf2

@@ -324,3 +387,40 @@ async def test_cudf_cluster_device_spill(params):
                    gc.collect()
                else:
                    break


@gen_test(timeout=30)
async def test_cudf_spill_cluster(cudf_spill):
    cudf = pytest.importorskip("cudf")
    enable_cudf_spill = cudf_spill

    async with LocalCUDACluster(
        n_workers=1,
        scheduler_port=0,
        silence_logs=False,
        dashboard_address=None,
        asynchronous=True,
        device_memory_limit=None,
        memory_limit=None,
        worker_class=IncreasedCloseTimeoutNanny,
        enable_cudf_spill=enable_cudf_spill,
        cudf_spill_stats=enable_cudf_spill,
    ) as cluster:
        async with Client(cluster, asynchronous=True) as client:

            await client.wait_for_workers(1)
            await client.run(_set_cudf_device_limit)

            cdf = cudf.DataFrame(
                {
                    "a": list(range(200)),
                    "b": list(reversed(range(200))),
                    "c": list(range(200)),
                }
            )

            ddf = dask_cudf.from_cudf(cdf, npartitions=2).sum().persist()
            await wait(ddf)

            await client.run(_assert_cudf_spill_stats, enable_cudf_spill)
            _assert_cudf_spill_stats(enable_cudf_spill)