diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 69b0de5f..67bbd027 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -28,7 +28,7 @@ concurrency:
jobs:
conda-python-build:
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/conda-python-build.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/conda-python-build.yaml@branch-24.10
with:
build_type: ${{ inputs.build_type || 'branch' }}
branch: ${{ inputs.branch }}
@@ -38,7 +38,7 @@ jobs:
if: github.ref_type == 'branch'
needs: [conda-python-build]
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/custom-job.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/custom-job.yaml@branch-24.10
with:
arch: "amd64"
branch: ${{ inputs.branch }}
@@ -51,7 +51,7 @@ jobs:
upload-conda:
needs: [conda-python-build]
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/conda-upload-packages.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/conda-upload-packages.yaml@branch-24.10
with:
build_type: ${{ inputs.build_type || 'branch' }}
branch: ${{ inputs.branch }}
@@ -59,7 +59,7 @@ jobs:
sha: ${{ inputs.sha }}
wheel-build:
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/wheels-build.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/wheels-build.yaml@branch-24.10
with:
build_type: ${{ inputs.build_type || 'branch' }}
branch: ${{ inputs.branch }}
@@ -72,7 +72,7 @@ jobs:
wheel-publish:
needs: wheel-build
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/wheels-publish.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/wheels-publish.yaml@branch-24.10
with:
build_type: ${{ inputs.build_type || 'branch' }}
branch: ${{ inputs.branch }}
diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml
index 4e56d24d..76014652 100644
--- a/.github/workflows/pr.yaml
+++ b/.github/workflows/pr.yaml
@@ -18,26 +18,26 @@ jobs:
- docs-build
- wheel-build
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/pr-builder.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/pr-builder.yaml@branch-24.10
checks:
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/checks.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/checks.yaml@branch-24.10
conda-python-build:
needs: checks
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/conda-python-build.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/conda-python-build.yaml@branch-24.10
with:
build_type: pull-request
conda-python-tests:
needs: conda-python-build
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/conda-python-tests.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/conda-python-tests.yaml@branch-24.10
with:
build_type: pull-request
docs-build:
needs: conda-python-build
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/custom-job.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/custom-job.yaml@branch-24.10
with:
build_type: pull-request
node_type: "gpu-v100-latest-1"
@@ -46,7 +46,7 @@ jobs:
run_script: "ci/build_docs.sh"
wheel-build:
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/wheels-build.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/wheels-build.yaml@branch-24.10
with:
build_type: pull-request
# Package is pure Python and only ever requires one build.
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 7a884c5c..1a0e7d87 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -16,7 +16,7 @@ on:
jobs:
conda-python-tests:
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/conda-python-tests.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/conda-python-tests.yaml@branch-24.10
with:
build_type: nightly
branch: ${{ inputs.branch }}
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 33508081..4707492a 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -13,7 +13,7 @@ repos:
hooks:
- id: black
- repo: https://github.com/PyCQA/flake8
- rev: 3.8.3
+ rev: 7.1.1
hooks:
- id: flake8
- repo: https://github.com/codespell-project/codespell
@@ -33,7 +33,7 @@ repos:
args: ["--module=dask_cuda", "--ignore-missing-imports"]
pass_filenames: false
- repo: https://github.com/rapidsai/pre-commit-hooks
- rev: v0.3.0
+ rev: v0.4.0
hooks:
- id: verify-alpha-spec
- repo: https://github.com/rapidsai/dependency-file-generator
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 37c58851..f8c992fb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,32 @@
+# dask-cuda 24.10.00 (9 Oct 2024)
+
+## 🚨 Breaking Changes
+
+- Replace cuDF (de)serializer with cuDF spill-aware (de)serializer ([#1369](https://github.com/rapidsai/dask-cuda/pull/1369)) [@pentschev](https://github.com/pentschev)
+
+## 📖 Documentation
+
+- Fix typo in spilling documentation ([#1384](https://github.com/rapidsai/dask-cuda/pull/1384)) [@rjzamora](https://github.com/rjzamora)
+- Add notes on cudf spilling to docs ([#1383](https://github.com/rapidsai/dask-cuda/pull/1383)) [@rjzamora](https://github.com/rjzamora)
+
+## 🚀 New Features
+
+- [Benchmark] Add parquet read benchmark ([#1371](https://github.com/rapidsai/dask-cuda/pull/1371)) [@rjzamora](https://github.com/rjzamora)
+- Replace cuDF (de)serializer with cuDF spill-aware (de)serializer ([#1369](https://github.com/rapidsai/dask-cuda/pull/1369)) [@pentschev](https://github.com/pentschev)
+
+## 🛠️ Improvements
+
+- Update update-version.sh to use packaging lib ([#1387](https://github.com/rapidsai/dask-cuda/pull/1387)) [@AyodeAwe](https://github.com/AyodeAwe)
+- Use CI workflow branch 'branch-24.10' again ([#1386](https://github.com/rapidsai/dask-cuda/pull/1386)) [@jameslamb](https://github.com/jameslamb)
+- Update to flake8 7.1.1. ([#1385](https://github.com/rapidsai/dask-cuda/pull/1385)) [@bdice](https://github.com/bdice)
+- enable Python 3.12 tests on PRs ([#1382](https://github.com/rapidsai/dask-cuda/pull/1382)) [@jameslamb](https://github.com/jameslamb)
+- Add support for Python 3.12 ([#1380](https://github.com/rapidsai/dask-cuda/pull/1380)) [@jameslamb](https://github.com/jameslamb)
+- Update rapidsai/pre-commit-hooks ([#1379](https://github.com/rapidsai/dask-cuda/pull/1379)) [@KyleFromNVIDIA](https://github.com/KyleFromNVIDIA)
+- Drop Python 3.9 support ([#1377](https://github.com/rapidsai/dask-cuda/pull/1377)) [@jameslamb](https://github.com/jameslamb)
+- Remove NumPy <2 pin ([#1375](https://github.com/rapidsai/dask-cuda/pull/1375)) [@seberg](https://github.com/seberg)
+- Update pre-commit hooks ([#1373](https://github.com/rapidsai/dask-cuda/pull/1373)) [@KyleFromNVIDIA](https://github.com/KyleFromNVIDIA)
+- Merge branch-24.08 into branch-24.10 ([#1368](https://github.com/rapidsai/dask-cuda/pull/1368)) [@jameslamb](https://github.com/jameslamb)
+
# dask-cuda 24.08.00 (7 Aug 2024)
## 🐛 Bug Fixes
diff --git a/VERSION b/VERSION
index ec8489fd..7c7ba044 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-24.08.00
+24.10.00
diff --git a/ci/build_docs.sh b/ci/build_docs.sh
index c2a65a41..42103004 100755
--- a/ci/build_docs.sh
+++ b/ci/build_docs.sh
@@ -23,7 +23,7 @@ rapids-mamba-retry install \
--channel "${PYTHON_CHANNEL}" \
dask-cuda
-export RAPIDS_VERSION_NUMBER="24.08"
+export RAPIDS_VERSION_NUMBER="24.10"
export RAPIDS_DOCS_DIR="$(mktemp -d)"
rapids-logger "Build Python docs"
diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh
index a9fe1d02..2dbe504c 100755
--- a/ci/release/update-version.sh
+++ b/ci/release/update-version.sh
@@ -22,7 +22,7 @@ CURRENT_SHORT_TAG=${CURRENT_MAJOR}.${CURRENT_MINOR}
NEXT_MAJOR=$(echo $NEXT_FULL_TAG | awk '{split($0, a, "."); print a[1]}')
NEXT_MINOR=$(echo $NEXT_FULL_TAG | awk '{split($0, a, "."); print a[2]}')
NEXT_SHORT_TAG=${NEXT_MAJOR}.${NEXT_MINOR}
-NEXT_SHORT_TAG_PEP440=$(python -c "from setuptools.extern import packaging; print(packaging.version.Version('${NEXT_SHORT_TAG}'))")
+NEXT_SHORT_TAG_PEP440=$(python -c "from packaging.version import Version; print(Version('${NEXT_SHORT_TAG}'))")
NEXT_UCXPY_VERSION="$(curl -s https://version.gpuci.io/rapids/${NEXT_SHORT_TAG})"
echo "Preparing release $CURRENT_TAG => $NEXT_FULL_TAG"
diff --git a/conda/environments/all_cuda-114_arch-x86_64.yaml b/conda/environments/all_cuda-114_arch-x86_64.yaml
index c0fed8e5..3cfd9cb2 100644
--- a/conda/environments/all_cuda-114_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-114_arch-x86_64.yaml
@@ -10,28 +10,28 @@ dependencies:
- click >=8.1
- cuda-version=11.4
- cudatoolkit
-- cudf==24.8.*,>=0.0.0a0
-- dask-cudf==24.8.*,>=0.0.0a0
-- distributed-ucxx==0.39.*,>=0.0.0a0
-- kvikio==24.8.*,>=0.0.0a0
+- cudf==24.10.*,>=0.0.0a0
+- dask-cudf==24.10.*,>=0.0.0a0
+- distributed-ucxx==0.40.*,>=0.0.0a0
+- kvikio==24.10.*,>=0.0.0a0
- numactl-devel-cos7-x86_64
- numba>=0.57
-- numpy>=1.23,<2.0a0
+- numpy>=1.23,<3.0a0
- numpydoc>=1.1.0
- pandas>=1.3
- pre-commit
- pynvml>=11.0.0,<11.5
- pytest
- pytest-cov
-- python>=3.9,<3.12
+- python>=3.10,<3.13
- rapids-build-backend>=0.3.0,<0.4.0dev0
-- rapids-dask-dependency==24.8.*,>=0.0.0a0
+- rapids-dask-dependency==24.10.*,>=0.0.0a0
- setuptools>=64.0.0
- sphinx
- sphinx-click>=2.7.1
- sphinx-rtd-theme>=0.5.1
- ucx-proc=*=gpu
-- ucx-py==0.39.*,>=0.0.0a0
-- ucxx==0.39.*,>=0.0.0a0
+- ucx-py==0.40.*,>=0.0.0a0
+- ucxx==0.40.*,>=0.0.0a0
- zict>=2.0.0
name: all_cuda-114_arch-x86_64
diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml
index d1f6933c..b7b99751 100644
--- a/conda/environments/all_cuda-118_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-118_arch-x86_64.yaml
@@ -10,28 +10,28 @@ dependencies:
- click >=8.1
- cuda-version=11.8
- cudatoolkit
-- cudf==24.8.*,>=0.0.0a0
-- dask-cudf==24.8.*,>=0.0.0a0
-- distributed-ucxx==0.39.*,>=0.0.0a0
-- kvikio==24.8.*,>=0.0.0a0
+- cudf==24.10.*,>=0.0.0a0
+- dask-cudf==24.10.*,>=0.0.0a0
+- distributed-ucxx==0.40.*,>=0.0.0a0
+- kvikio==24.10.*,>=0.0.0a0
- numactl-devel-cos7-x86_64
- numba>=0.57
-- numpy>=1.23,<2.0a0
+- numpy>=1.23,<3.0a0
- numpydoc>=1.1.0
- pandas>=1.3
- pre-commit
- pynvml>=11.0.0,<11.5
- pytest
- pytest-cov
-- python>=3.9,<3.12
+- python>=3.10,<3.13
- rapids-build-backend>=0.3.0,<0.4.0dev0
-- rapids-dask-dependency==24.8.*,>=0.0.0a0
+- rapids-dask-dependency==24.10.*,>=0.0.0a0
- setuptools>=64.0.0
- sphinx
- sphinx-click>=2.7.1
- sphinx-rtd-theme>=0.5.1
- ucx-proc=*=gpu
-- ucx-py==0.39.*,>=0.0.0a0
-- ucxx==0.39.*,>=0.0.0a0
+- ucx-py==0.40.*,>=0.0.0a0
+- ucxx==0.40.*,>=0.0.0a0
- zict>=2.0.0
name: all_cuda-118_arch-x86_64
diff --git a/conda/environments/all_cuda-125_arch-x86_64.yaml b/conda/environments/all_cuda-125_arch-x86_64.yaml
index a27dea72..652a8f0c 100644
--- a/conda/environments/all_cuda-125_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-125_arch-x86_64.yaml
@@ -11,28 +11,28 @@ dependencies:
- cuda-nvcc-impl
- cuda-nvrtc
- cuda-version=12.5
-- cudf==24.8.*,>=0.0.0a0
-- dask-cudf==24.8.*,>=0.0.0a0
-- distributed-ucxx==0.39.*,>=0.0.0a0
-- kvikio==24.8.*,>=0.0.0a0
+- cudf==24.10.*,>=0.0.0a0
+- dask-cudf==24.10.*,>=0.0.0a0
+- distributed-ucxx==0.40.*,>=0.0.0a0
+- kvikio==24.10.*,>=0.0.0a0
- numactl-devel-cos7-x86_64
- numba>=0.57
-- numpy>=1.23,<2.0a0
+- numpy>=1.23,<3.0a0
- numpydoc>=1.1.0
- pandas>=1.3
- pre-commit
- pynvml>=11.0.0,<11.5
- pytest
- pytest-cov
-- python>=3.9,<3.12
+- python>=3.10,<3.13
- rapids-build-backend>=0.3.0,<0.4.0dev0
-- rapids-dask-dependency==24.8.*,>=0.0.0a0
+- rapids-dask-dependency==24.10.*,>=0.0.0a0
- setuptools>=64.0.0
- sphinx
- sphinx-click>=2.7.1
- sphinx-rtd-theme>=0.5.1
- ucx-proc=*=gpu
-- ucx-py==0.39.*,>=0.0.0a0
-- ucxx==0.39.*,>=0.0.0a0
+- ucx-py==0.40.*,>=0.0.0a0
+- ucxx==0.40.*,>=0.0.0a0
- zict>=2.0.0
name: all_cuda-125_arch-x86_64
diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py
index 516599da..5711ac08 100644
--- a/dask_cuda/__init__.py
+++ b/dask_cuda/__init__.py
@@ -9,6 +9,8 @@
import dask.dataframe.shuffle
import dask.dataframe.multi
import dask.bag.core
+from distributed.protocol.cuda import cuda_deserialize, cuda_serialize
+from distributed.protocol.serialize import dask_deserialize, dask_serialize
from ._version import __git_commit__, __version__
from .cuda_worker import CUDAWorker
@@ -48,3 +50,20 @@
dask.dataframe.shuffle.shuffle_group
)
dask.dataframe.core._concat = unproxify_decorator(dask.dataframe.core._concat)
+
+
+# Registration callback for cuDF objects that takes cuDF's own "spill" option
+# into account: `cudf.comm.serialize` is imported only when that option is off.
+# This function is what the module-level loop further down installs in the
+# `_lazy` tables of the distributed (de)serialization registries.
+def _register_cudf_spill_aware():
+ import cudf
+
+ # Only enable Dask/cuDF spilling if cuDF spilling is disabled, see
+ # https://github.com/rapidsai/dask-cuda/issues/1363
+ if not cudf.get_option("spill"):
+ # This reproduces the implementation of `_register_cudf`, see
+ # https://github.com/dask/distributed/blob/40fcd65e991382a956c3b879e438be1b100dff97/distributed/protocol/__init__.py#L106-L115
+ # The name `serialize` is not used afterwards; the import is presumably
+ # done for its registration side effect only -- TODO confirm.
+ from cudf.comm import serialize
+
+
+# Runs at import time: for each of the four distributed (de)serialization
+# registries, replace the callback stored under "cudf" and "dask_cudf" with
+# `_register_cudf_spill_aware`. Entries are only replaced, never added: a
+# library that is not already present in `registry._lazy` is left alone.
+# NOTE: `_lazy` is an underscore-prefixed (private by convention) attribute.
+for registry in [cuda_serialize, cuda_deserialize, dask_serialize, dask_deserialize]:
+ for lib in ["cudf", "dask_cudf"]:
+ if lib in registry._lazy:
+ registry._lazy[lib] = _register_cudf_spill_aware
diff --git a/dask_cuda/benchmarks/local_cudf_groupby.py b/dask_cuda/benchmarks/local_cudf_groupby.py
index 2f07e3df..f094ff18 100644
--- a/dask_cuda/benchmarks/local_cudf_groupby.py
+++ b/dask_cuda/benchmarks/local_cudf_groupby.py
@@ -7,7 +7,7 @@
import dask
import dask.dataframe as dd
from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes
from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.utils import (
@@ -260,13 +260,6 @@ def parse_args():
"type": str,
"help": "Do shuffle with GPU or CPU dataframes (default 'gpu')",
},
- {
- "name": "--ignore-size",
- "default": "1 MiB",
- "metavar": "nbytes",
- "type": parse_bytes,
- "help": "Ignore messages smaller than this (default '1 MB')",
- },
{
"name": "--runs",
"default": 3,
diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py
index 6a68ad78..e2b03520 100644
--- a/dask_cuda/benchmarks/local_cudf_merge.py
+++ b/dask_cuda/benchmarks/local_cudf_merge.py
@@ -9,7 +9,7 @@
import dask
import dask.dataframe as dd
from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes
from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.utils import (
@@ -335,13 +335,6 @@ def parse_args():
"action": "store_true",
"help": "Use shuffle join (takes precedence over '--broadcast-join').",
},
- {
- "name": "--ignore-size",
- "default": "1 MiB",
- "metavar": "nbytes",
- "type": parse_bytes,
- "help": "Ignore messages smaller than this (default '1 MB')",
- },
{
"name": "--frac-match",
"default": 0.3,
diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py
index a1129dd3..25f42e59 100644
--- a/dask_cuda/benchmarks/local_cudf_shuffle.py
+++ b/dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -228,13 +228,6 @@ def parse_args():
"type": str,
"help": "Do shuffle with GPU or CPU dataframes (default 'gpu')",
},
- {
- "name": "--ignore-size",
- "default": "1 MiB",
- "metavar": "nbytes",
- "type": parse_bytes,
- "help": "Ignore messages smaller than this (default '1 MB')",
- },
{
"name": "--runs",
"default": 3,
diff --git a/dask_cuda/benchmarks/local_cupy.py b/dask_cuda/benchmarks/local_cupy.py
index 22c51556..c9c8fe1c 100644
--- a/dask_cuda/benchmarks/local_cupy.py
+++ b/dask_cuda/benchmarks/local_cupy.py
@@ -8,7 +8,7 @@
from dask import array as da
from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes
from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.utils import (
@@ -297,13 +297,6 @@ def parse_args():
"type": int,
"help": "Chunk size (default 2500).",
},
- {
- "name": "--ignore-size",
- "default": "1 MiB",
- "metavar": "nbytes",
- "type": parse_bytes,
- "help": "Ignore messages smaller than this (default '1 MB').",
- },
{
"name": "--runs",
"default": 3,
diff --git a/dask_cuda/benchmarks/local_cupy_map_overlap.py b/dask_cuda/benchmarks/local_cupy_map_overlap.py
index 8250c9f9..8b975a24 100644
--- a/dask_cuda/benchmarks/local_cupy_map_overlap.py
+++ b/dask_cuda/benchmarks/local_cupy_map_overlap.py
@@ -10,7 +10,7 @@
from dask import array as da
from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes
from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.utils import (
@@ -168,13 +168,6 @@ def parse_args():
"type": int,
"help": "Kernel size, 2*k+1, in each dimension (default 1)",
},
- {
- "name": "--ignore-size",
- "default": "1 MiB",
- "metavar": "nbytes",
- "type": parse_bytes,
- "help": "Ignore messages smaller than this (default '1 MB')",
- },
{
"name": "--runs",
"default": 3,
diff --git a/dask_cuda/benchmarks/read_parquet.py b/dask_cuda/benchmarks/read_parquet.py
new file mode 100644
index 00000000..bce69673
--- /dev/null
+++ b/dask_cuda/benchmarks/read_parquet.py
@@ -0,0 +1,268 @@
+import contextlib
+from collections import ChainMap
+from time import perf_counter as clock
+
+import fsspec
+import pandas as pd
+
+import dask
+import dask.dataframe as dd
+from dask.base import tokenize
+from dask.distributed import performance_report
+from dask.utils import format_bytes, parse_bytes
+
+from dask_cuda.benchmarks.common import Config, execute_benchmark
+from dask_cuda.benchmarks.utils import (
+ parse_benchmark_args,
+ print_key_value,
+ print_separator,
+ print_throughput_bandwidth,
+)
+
+# Module-level caches filled by `bench_once` so repeated runs can reuse values.
+# Keyed by tokenize(paths); the value is sum(fsspec_fs.sizes(paths)).
+DISK_SIZE_CACHE = {}
+# Keyed by tokenize(args); the value is the (fsspec_fs, paths, kwargs) tuple
+# returned by `get_fs_paths_kwargs(args)`.
+OPTIONS_CACHE = {}
+
+
+def _noop(df):
+    """Identity function: return ``df`` exactly as it was received.
+
+    ``bench_once`` passes this to ``map_partitions`` as an opaque step.
+    """
+    return df
+
+
+def read_data(paths, columns, backend, **kwargs):
+    """Call ``dd.read_parquet`` with the given dataframe backend selected.
+
+    Parameters
+    ----------
+    paths
+        Forwarded to ``dd.read_parquet`` as its first positional argument.
+    columns
+        Forwarded as the ``columns`` keyword.
+    backend
+        Value set for the ``"dataframe.backend"`` Dask config key while
+        ``dd.read_parquet`` is being called.
+    **kwargs
+        Any further keyword arguments, forwarded unchanged.
+
+    Returns
+    -------
+    Whatever ``dd.read_parquet`` returns.
+    """
+    backend_setting = {"dataframe.backend": backend}
+    with dask.config.set(backend_setting):
+        collection = dd.read_parquet(paths, columns=columns, **kwargs)
+        return collection
+
+
+# Build the filesystem object, the list of parquet paths and the extra
+# `read_parquet` keyword arguments selected by the parsed command line `args`.
+# Reads args.key, args.secret, args.filesystem, args.path, args.type,
+# args.blocksize, args.aggregate_files and args.file_count.
+# Returns the tuple (fsspec_fs, paths, kwargs).
+def get_fs_paths_kwargs(args):
+ kwargs = {}
+
+ # Credentials are only recorded when they were actually supplied.
+ storage_options = {}
+ if args.key:
+ storage_options["key"] = args.key
+ if args.secret:
+ storage_options["secret"] = args.secret
+
+ if args.filesystem == "arrow":
+ import pyarrow.fs as pa_fs
+ from fsspec.implementations.arrow import ArrowFSWrapper
+
+ # Translate the fsspec-style credential names to the pyarrow ones.
+ _mapping = {
+ "key": "access_key",
+ "secret": "secret_key",
+ } # See: pyarrow.fs.S3FileSystem docs
+ s3_args = {}
+ for k, v in storage_options.items():
+ s3_args[_mapping[k]] = v
+
+ # Infer the filesystem from the path, then construct a new instance of
+ # the same type from the translated credentials, plus the region when
+ # the inferred filesystem has a `region` attribute.
+ fs = pa_fs.FileSystem.from_uri(args.path)[0]
+ try:
+ region = {"region": fs.region}
+ except AttributeError:
+ region = {}
+ kwargs["filesystem"] = type(fs)(**region, **s3_args)
+ fsspec_fs = ArrowFSWrapper(kwargs["filesystem"])
+
+ if args.type == "gpu":
+ kwargs["blocksize"] = args.blocksize
+ else:
+ fsspec_fs = fsspec.core.get_fs_token_paths(
+ args.path, mode="rb", storage_options=storage_options
+ )[0]
+ kwargs["filesystem"] = fsspec_fs
+ kwargs["blocksize"] = args.blocksize
+ kwargs["aggregate_files"] = args.aggregate_files
+
+ # Collect list of paths
+ # Only names matching `<path>/*.parquet` are collected; a single trailing
+ # "/" on the path is dropped first.
+ stripped_url_path = fsspec_fs._strip_protocol(args.path)
+ if stripped_url_path.endswith("/"):
+ stripped_url_path = stripped_url_path[:-1]
+ paths = fsspec_fs.glob(f"{stripped_url_path}/*.parquet")
+ # A falsy file count (None or 0) leaves the list untruncated.
+ if args.file_count:
+ paths = paths[: args.file_count]
+
+ return fsspec_fs, paths, kwargs
+
+
+# Run the benchmark once: build the parquet read via `read_data`, count its
+# rows with len(), and measure the elapsed time between the two clock() calls.
+# Returns the tuple (disk_size, num_rows, t2 - t1).
+# `client` is accepted but not referenced in the body.
+# NOTE(review): `write_profile` only decides whether a performance report is
+# written; the report filename comes from `args.profile`, not from
+# `write_profile` -- confirm callers always pass matching values.
+def bench_once(client, args, write_profile=None):
+ global OPTIONS_CACHE
+ global DISK_SIZE_CACHE
+
+ # Construct kwargs
+ # The result of `get_fs_paths_kwargs` is cached per tokenize(args).
+ token = tokenize(args)
+ try:
+ fsspec_fs, paths, kwargs = OPTIONS_CACHE[token]
+ except KeyError:
+ fsspec_fs, paths, kwargs = get_fs_paths_kwargs(args)
+ OPTIONS_CACHE[token] = (fsspec_fs, paths, kwargs)
+
+ if write_profile is None:
+ ctx = contextlib.nullcontext()
+ else:
+ ctx = performance_report(filename=args.profile)
+
+ with ctx:
+ t1 = clock()
+ df = read_data(
+ paths,
+ columns=args.columns,
+ backend="cudf" if args.type == "gpu" else "pandas",
+ **kwargs,
+ )
+ num_rows = len(
+ # Use opaque `map_partitions` call to "block"
+ # dask-expr from using pq metadata to get length
+ df.map_partitions(
+ _noop,
+ meta=df._meta,
+ enforce_metadata=False,
+ )
+ )
+ t2 = clock()
+
+ # Extract total size of files on disk
+ # `token` is reused here with a different key: tokenize(paths).
+ token = tokenize(paths)
+ try:
+ disk_size = DISK_SIZE_CACHE[token]
+ except KeyError:
+ disk_size = sum(fsspec_fs.sizes(paths))
+ DISK_SIZE_CACHE[token] = disk_size
+
+ return (disk_size, num_rows, t2 - t1)
+
+
+# Print a human-readable summary of the benchmark configuration and results.
+# `results` is unzipped into (data_processed, row_count, durations); only the
+# first row count and the first data size are printed in the summary.
+# `address_to_index` and `p2p_bw` are only forwarded to
+# `print_throughput_bandwidth`.
+# Side effect: sets `args.no_show_p2p_bandwidth = True`.
+def pretty_print_results(args, address_to_index, p2p_bw, results):
+ # When args.markdown is set, the summary is wrapped in a ``` fence.
+ if args.markdown:
+ print("```")
+ print("Parquet read benchmark")
+ data_processed, row_count, durations = zip(*results)
+ print_separator(separator="-")
+ backend = "cudf" if args.type == "gpu" else "pandas"
+ print_key_value(key="Path", value=args.path)
+ print_key_value(key="Columns", value=f"{args.columns}")
+ print_key_value(key="Backend", value=f"{backend}")
+ print_key_value(key="Filesystem", value=f"{args.filesystem}")
+ print_key_value(key="Blocksize", value=f"{format_bytes(args.blocksize)}")
+ print_key_value(key="Aggregate files", value=f"{args.aggregate_files}")
+ print_key_value(key="Row count", value=f"{row_count[0]}")
+ print_key_value(key="Size on disk", value=f"{format_bytes(data_processed[0])}")
+ if args.markdown:
+ print("\n```")
+ args.no_show_p2p_bandwidth = True
+ print_throughput_bandwidth(
+ args, durations, data_processed, p2p_bw, address_to_index
+ )
+ print_separator(separator="=")
+
+
+def create_tidy_results(args, p2p_bw, results):
+    """Collect the per-run measurements into a ``pandas.DataFrame``.
+
+    Parameters
+    ----------
+    args
+        Parsed benchmark arguments; ``path``, ``columns``, ``type``,
+        ``filesystem``, ``blocksize`` and ``aggregate_files`` are read.
+    p2p_bw
+        Returned unchanged as the second element of the result.
+    results
+        Iterable of ``(data_processed, num_rows, duration)`` tuples.
+
+    Returns
+    -------
+    tuple
+        ``(timing_data, p2p_bw)``; ``timing_data`` holds one row per entry
+        of ``results``, combining the configuration with that run's values.
+    """
+    configuration = {
+        "path": args.path,
+        "columns": args.columns,
+        "backend": "pandas" if args.type != "gpu" else "cudf",
+        "filesystem": args.filesystem,
+        "blocksize": args.blocksize,
+        "aggregate_files": args.aggregate_files,
+    }
+
+    rows = []
+    for data_processed, num_rows, duration in results:
+        measurement = {
+            "wallclock": duration,
+            "data_processed": data_processed,
+            "num_rows": num_rows,
+        }
+        # ChainMap is kept so lookup precedence and key order are unchanged.
+        rows.append(pd.Series(data=ChainMap(configuration, measurement)))
+
+    timing_data = pd.DataFrame(rows)
+    return timing_data, p2p_bw
+
+
+# Declare the options specific to the parquet read benchmark and parse the
+# command line through `parse_benchmark_args` (explicit-comms check disabled).
+# `path` is the only positional argument; everything else is optional.
+# The returned namespace always has `no_show_p2p_bandwidth` set to True.
+# NOTE(review): `--columns` is parsed as a single `str` and `bench_once`
+# forwards it unchanged as `columns=` -- confirm that selecting several
+# columns is not expected to work from the command line.
+def parse_args():
+ special_args = [
+ {
+ "name": "path",
+ "type": str,
+ "help": "Parquet directory to read from (must be a flat directory).",
+ },
+ {
+ "name": "--blocksize",
+ "default": "256MB",
+ "type": parse_bytes,
+ "help": "How to set the blocksize option",
+ },
+ {
+ "name": "--aggregate-files",
+ "default": False,
+ "action": "store_true",
+ "help": "How to set the aggregate_files option",
+ },
+ {
+ "name": "--file-count",
+ "type": int,
+ "help": "Maximum number of files to read.",
+ },
+ {
+ "name": "--columns",
+ "type": str,
+ "help": "Columns to read/select from data.",
+ },
+ {
+ "name": "--key",
+ "type": str,
+ "help": "Public S3 key.",
+ },
+ {
+ "name": "--secret",
+ "type": str,
+ "help": "Secret S3 key.",
+ },
+ {
+ "name": [
+ "-t",
+ "--type",
+ ],
+ "choices": ["cpu", "gpu"],
+ "default": "gpu",
+ "type": str,
+ "help": "Use GPU or CPU dataframes (default 'gpu')",
+ },
+ {
+ "name": "--filesystem",
+ "choices": ["arrow", "fsspec"],
+ "default": "fsspec",
+ "type": str,
+ "help": "Filesystem backend",
+ },
+ {
+ "name": "--runs",
+ "default": 3,
+ "type": int,
+ "help": "Number of runs",
+ },
+ ]
+
+ args = parse_benchmark_args(
+ description="Parquet read benchmark",
+ args_list=special_args,
+ check_explicit_comms=False,
+ )
+ # This benchmark never reports peer-to-peer bandwidth.
+ args.no_show_p2p_bandwidth = True
+ return args
+
+
+if __name__ == "__main__":
+    # Script entry point: bundle the parsed arguments with this module's
+    # callbacks and hand the bundle to the shared benchmark driver.
+    benchmark_config = Config(
+        args=parse_args(),
+        bench_once=bench_once,
+        create_tidy_results=create_tidy_results,
+        pretty_print_results=pretty_print_results,
+    )
+    execute_benchmark(benchmark_config)
diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py
index 48e4755f..de7e2ae1 100644
--- a/dask_cuda/benchmarks/utils.py
+++ b/dask_cuda/benchmarks/utils.py
@@ -337,6 +337,13 @@ def parse_benchmark_args(
"If the files already exist, new files are created with a uniquified "
"BASENAME.",
)
+ parser.add_argument(
+ "--ignore-size",
+ default="1 MiB",
+ metavar="nbytes",
+ type=parse_bytes,
+ help="Bandwidth statistics: ignore messages smaller than this (default '1 MB')",
+ )
for args in args_list:
name = args.pop("name")
@@ -765,7 +772,7 @@ def print_throughput_bandwidth(
)
print_key_value(
key="Wall clock",
- value=f"{format_time(durations.mean())} +/- {format_time(durations.std()) }",
+ value=f"{format_time(durations.mean())} +/- {format_time(durations.std())}",
)
if not args.no_show_p2p_bandwidth:
print_separator(separator="=")
diff --git a/dask_cuda/cli.py b/dask_cuda/cli.py
index 6a3518e0..a8c6d972 100644
--- a/dask_cuda/cli.py
+++ b/dask_cuda/cli.py
@@ -167,10 +167,11 @@ def cuda():
@click.option(
"--rmm-release-threshold",
default=None,
- help="""When ``rmm.async`` is ``True`` and the pool size grows beyond this value, unused
- memory held by the pool will be released at the next synchronization point. Can be
- an integer (bytes), float (fraction of total device memory), string (like ``"5GB"``
- or ``"5000M"``) or ``None``. By default, this feature is disabled.
+ help="""When ``rmm.async`` is ``True`` and the pool size grows beyond this
+ value, unused memory held by the pool will be released at the next
+ synchronization point. Can be an integer (bytes), float (fraction of total
+ device memory), string (like ``"5GB"`` or ``"5000M"``) or ``None``. By
+ default, this feature is disabled.
.. note::
This size is a per-worker configuration, and not cluster-wide.""",
diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py
index b88c9bc9..3e03ed29 100644
--- a/dask_cuda/cuda_worker.py
+++ b/dask_cuda/cuda_worker.py
@@ -195,6 +195,14 @@ def del_pid_file():
},
)
+ cudf_spill_warning = dask.config.get("cudf-spill-warning", default=True)
+ if enable_cudf_spill and cudf_spill_warning:
+ warnings.warn(
+ "cuDF spilling is enabled, please ensure the client and scheduler "
+ "processes set `CUDF_SPILL=on` as well. To disable this warning "
+ "set `DASK_CUDF_SPILL_WARNING=False`."
+ )
+
self.nannies = [
Nanny(
scheduler,
diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py
index 202373e9..c037223b 100644
--- a/dask_cuda/local_cuda_cluster.py
+++ b/dask_cuda/local_cuda_cluster.py
@@ -244,6 +244,13 @@ def __init__(
# initialization happens before we can set CUDA_VISIBLE_DEVICES
os.environ["RAPIDS_NO_INITIALIZE"] = "True"
+ if enable_cudf_spill:
+ import cudf
+
+ # cuDF spilling must be enabled in the client/scheduler process too.
+ cudf.set_option("spill", enable_cudf_spill)
+ cudf.set_option("spill_stats", cudf_spill_stats)
+
if threads_per_worker < 1:
raise ValueError("threads_per_worker must be higher than 0.")
diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py
index 505af12f..049fe85f 100644
--- a/dask_cuda/tests/test_dask_cuda_worker.py
+++ b/dask_cuda/tests/test_dask_cuda_worker.py
@@ -567,3 +567,30 @@ def test_worker_timeout():
assert "reason: nanny-close" in ret.stderr.lower()
assert ret.returncode == 0
+
+
+# Check that `dask cuda worker --enable-cudf-spill` writes the cuDF spilling
+# UserWarning to stderr, and that setting DASK_CUDF_SPILL_WARNING=False in the
+# environment suppresses it. Skipped when `rmm` cannot be imported.
+# The worker is pointed at 127.0.0.1:9369 with `--death-timeout 1`.
+# NOTE(review): presumably no scheduler listens on that port, so the worker
+# gives up after the timeout -- confirm.
+@pytest.mark.parametrize("enable_cudf_spill_warning", [False, True])
+def test_worker_cudf_spill_warning(enable_cudf_spill_warning): # noqa: F811
+ pytest.importorskip("rmm")
+
+ # Pin the subprocess to device 0; the opt-out variable is added only for
+ # the parametrization that expects no warning.
+ environ = {"CUDA_VISIBLE_DEVICES": "0"}
+ if not enable_cudf_spill_warning:
+ environ["DASK_CUDF_SPILL_WARNING"] = "False"
+
+ with patch.dict(os.environ, environ):
+ ret = subprocess.run(
+ [
+ "dask",
+ "cuda",
+ "worker",
+ "127.0.0.1:9369",
+ "--enable-cudf-spill",
+ "--death-timeout",
+ "1",
+ ],
+ capture_output=True,
+ )
+ # stderr is captured as bytes, hence the bytes literals below.
+ if enable_cudf_spill_warning:
+ assert b"UserWarning: cuDF spilling is enabled" in ret.stderr
+ else:
+ assert b"UserWarning: cuDF spilling is enabled" not in ret.stderr
diff --git a/dask_cuda/tests/test_gds.py b/dask_cuda/tests/test_gds.py
index c8667025..262369e6 100644
--- a/dask_cuda/tests/test_gds.py
+++ b/dask_cuda/tests/test_gds.py
@@ -38,7 +38,7 @@ def test_gds(gds_enabled, cuda_lib):
a = data_create()
header, frames = serialize(a, serializers=("disk",))
b = deserialize(header, frames)
- assert type(a) == type(b)
+ assert type(a) is type(b)
assert data_compare(a, b)
finally:
ProxifyHostFile.register_disk_spilling() # Reset disk spilling options
diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py
index 2683ea36..56fe7f8d 100644
--- a/dask_cuda/tests/test_proxify_host_file.py
+++ b/dask_cuda/tests/test_proxify_host_file.py
@@ -252,7 +252,7 @@ def task(x):
assert "ProxyObject" in str(type(x))
assert x._pxy_get().serializer == "dask"
else:
- assert type(x) == cudf.DataFrame
+ assert type(x) is cudf.DataFrame
assert len(x) == 10 # Trigger deserialization
return x
diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py
index 31a9e996..90b84e90 100644
--- a/dask_cuda/tests/test_proxy.py
+++ b/dask_cuda/tests/test_proxy.py
@@ -114,7 +114,7 @@ def test_proxy_object_of_array(serializers, backend):
pxy = proxy_object.asproxy(org.copy(), serializers=serializers)
expect = op(org)
got = op(pxy)
- assert type(expect) == type(got)
+ assert type(expect) is type(got)
assert expect == got
# Check unary operators
@@ -124,7 +124,7 @@ def test_proxy_object_of_array(serializers, backend):
pxy = proxy_object.asproxy(org.copy(), serializers=serializers)
expect = op(org)
got = op(pxy)
- assert type(expect) == type(got)
+ assert type(expect) is type(got)
assert all(expect == got)
# Check binary operators that takes a scalar as second argument
@@ -134,7 +134,7 @@ def test_proxy_object_of_array(serializers, backend):
pxy = proxy_object.asproxy(org.copy(), serializers=serializers)
expect = op(org, 2)
got = op(pxy, 2)
- assert type(expect) == type(got)
+ assert type(expect) is type(got)
assert all(expect == got)
# Check binary operators
@@ -192,7 +192,7 @@ def test_proxy_object_of_array(serializers, backend):
pxy = proxy_object.asproxy(org.copy(), serializers=serializers)
expect = op(org)
got = op(pxy)
- assert type(expect) == type(got)
+ assert type(expect) is type(got)
assert expect == got
# Check reflected methods
@@ -297,7 +297,7 @@ def task(x):
assert "ProxyObject" in str(type(x))
assert x._pxy_get().serializer == "dask"
else:
- assert type(x) == cudf.DataFrame
+ assert type(x) is cudf.DataFrame
assert len(x) == 10 # Trigger deserialization
return x
diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py
index f8df7e04..bdd012d5 100644
--- a/dask_cuda/tests/test_spill.py
+++ b/dask_cuda/tests/test_spill.py
@@ -11,6 +11,8 @@
from distributed.sizeof import sizeof
from distributed.utils_test import gen_cluster, gen_test, loop # noqa: F401
+import dask_cudf
+
from dask_cuda import LocalCUDACluster, utils
from dask_cuda.utils_test import IncreasedCloseTimeoutNanny
@@ -18,6 +20,57 @@
pytest.skip("Not enough GPU memory", allow_module_level=True)
+def _set_cudf_device_limit():
+    """Set cuDF's ``spill_device_limit`` option to 0.
+
+    The zero limit is meant to ensure spilling for objects of all sizes.
+    """
+    import cudf
+
+    cudf.set_option("spill_device_limit", 0)
+
+
+def _assert_cudf_spill_stats(enable_cudf_spill, dask_worker=None):
+    """Check cuDF's global spill manager against the expected spill mode.
+
+    When ``enable_cudf_spill`` is falsy, the global manager must be ``None``.
+    Otherwise the first entry of both the ``("gpu", "cpu")`` and the
+    ``("cpu", "gpu")`` spill totals must exceed 1000, and the manager must
+    hold at least one buffer.
+
+    ``dask_worker`` is accepted but not used by the body.
+    """
+    import cudf
+
+    manager = cudf.core.buffer.spill_manager.get_global_manager()
+
+    # Spilling disabled: no manager may exist, and nothing else is checked.
+    if not enable_cudf_spill:
+        assert manager is None
+        return
+
+    spill_stats = manager.statistics
+    tracked_buffers = manager.buffers()
+    assert spill_stats.spill_totals[("gpu", "cpu")][0] > 1000
+    assert spill_stats.spill_totals[("cpu", "gpu")][0] > 1000
+    assert len(tracked_buffers) > 0
+
+
+# Parametrized over False and True; the parameter is what the test receives.
+# Skips when `cudf` cannot be imported. After the test, cuDF's "spill" option
+# is switched off and the spill manager's module-level `_global_manager` and
+# `_global_manager_uninitialized` attributes are reset.
+# NOTE(review): "spill_stats", "spill_on_demand" and "spill_device_limit" are
+# set here but not restored after the yield -- confirm that is intended.
+@pytest.fixture(params=[False, True])
+def cudf_spill(request):
+ """Fixture to enable and clear cuDF spill manager in client process"""
+ cudf = pytest.importorskip("cudf")
+
+ enable_cudf_spill = request.param
+
+ if enable_cudf_spill:
+ # If the global spill manager was previously set, fail.
+ assert cudf.core.buffer.spill_manager._global_manager is None
+
+ cudf.set_option("spill", True)
+ cudf.set_option("spill_stats", True)
+
+ # This change is to prevent changing RMM resource stack in cuDF,
+ # workers do not need this because they are spawned as new
+ # processes for every new test that runs.
+ cudf.set_option("spill_on_demand", False)
+
+ _set_cudf_device_limit()
+
+ yield enable_cudf_spill
+
+ # Teardown: these private attributes are poked directly to drop any manager
+ # created during the test.
+ cudf.set_option("spill", False)
+ cudf.core.buffer.spill_manager._global_manager_uninitialized = True
+ cudf.core.buffer.spill_manager._global_manager = None
+
+
def device_host_file_size_matches(
dhf, total_bytes, device_chunk_overhead=0, serialized_chunk_overhead=1024
):
@@ -244,9 +297,11 @@ async def test_cupy_cluster_device_spill(params):
],
)
@gen_test(timeout=30)
-async def test_cudf_cluster_device_spill(params):
+async def test_cudf_cluster_device_spill(params, cudf_spill):
cudf = pytest.importorskip("cudf")
+ enable_cudf_spill = cudf_spill
+
with dask.config.set(
{
"distributed.comm.compression": False,
@@ -266,6 +321,7 @@ async def test_cudf_cluster_device_spill(params):
device_memory_limit=params["device_memory_limit"],
memory_limit=params["memory_limit"],
worker_class=IncreasedCloseTimeoutNanny,
+ enable_cudf_spill=enable_cudf_spill,
) as cluster:
async with Client(cluster, asynchronous=True) as client:
@@ -294,21 +350,28 @@ async def test_cudf_cluster_device_spill(params):
del cdf
gc.collect()
- await client.run(
- assert_host_chunks,
- params["spills_to_disk"],
- )
- await client.run(
- assert_disk_chunks,
- params["spills_to_disk"],
- )
-
- await client.run(
- worker_assert,
- nbytes,
- 32,
- 2048,
- )
+ if enable_cudf_spill:
+ await client.run(
+ worker_assert,
+ 0,
+ 0,
+ 0,
+ )
+ else:
+ await client.run(
+ assert_host_chunks,
+ params["spills_to_disk"],
+ )
+ await client.run(
+ assert_disk_chunks,
+ params["spills_to_disk"],
+ )
+ await client.run(
+ worker_assert,
+ nbytes,
+ 32,
+ 2048,
+ )
del cdf2
@@ -324,3 +387,40 @@ async def test_cudf_cluster_device_spill(params):
gc.collect()
else:
break
+
+
+@gen_test(timeout=30)
+async def test_cudf_spill_cluster(cudf_spill):
+ cudf = pytest.importorskip("cudf")
+ enable_cudf_spill = cudf_spill
+
+ async with LocalCUDACluster(
+ n_workers=1,
+ scheduler_port=0,
+ silence_logs=False,
+ dashboard_address=None,
+ asynchronous=True,
+ device_memory_limit=None,
+ memory_limit=None,
+ worker_class=IncreasedCloseTimeoutNanny,
+ enable_cudf_spill=enable_cudf_spill,
+ cudf_spill_stats=enable_cudf_spill,
+ ) as cluster:
+ async with Client(cluster, asynchronous=True) as client:
+
+ await client.wait_for_workers(1)
+ await client.run(_set_cudf_device_limit)
+
+ cdf = cudf.DataFrame(
+ {
+ "a": list(range(200)),
+ "b": list(reversed(range(200))),
+ "c": list(range(200)),
+ }
+ )
+
+ ddf = dask_cudf.from_cudf(cdf, npartitions=2).sum().persist()
+ await wait(ddf)
+
+ await client.run(_assert_cudf_spill_stats, enable_cudf_spill)
+ _assert_cudf_spill_stats(enable_cudf_spill)
diff --git a/dependencies.yaml b/dependencies.yaml
index c3b62965..9e6b3a10 100644
--- a/dependencies.yaml
+++ b/dependencies.yaml
@@ -134,10 +134,6 @@ dependencies:
specific:
- output_types: conda
matrices:
- - matrix:
- py: "3.9"
- packages:
- - python=3.9
- matrix:
py: "3.10"
packages:
@@ -146,19 +142,23 @@ dependencies:
py: "3.11"
packages:
- python=3.11
+ - matrix:
+ py: "3.12"
+ packages:
+ - python=3.12
- matrix:
packages:
- - python>=3.9,<3.12
+ - python>=3.10,<3.13
run_python:
common:
- output_types: [conda, requirements, pyproject]
packages:
- click >=8.1
- numba>=0.57
- - numpy>=1.23,<2.0a0
+ - numpy>=1.23,<3.0a0
- pandas>=1.3
- pynvml>=11.0.0,<11.5
- - rapids-dask-dependency==24.8.*,>=0.0.0a0
+ - rapids-dask-dependency==24.10.*,>=0.0.0a0
- zict>=2.0.0
test_python:
common:
@@ -168,13 +168,13 @@ dependencies:
- pytest-cov
- output_types: [conda]
packages:
- - &cudf_unsuffixed cudf==24.8.*,>=0.0.0a0
- - &dask_cudf_unsuffixed dask-cudf==24.8.*,>=0.0.0a0
- - distributed-ucxx==0.39.*,>=0.0.0a0
- - &kvikio_unsuffixed kvikio==24.8.*,>=0.0.0a0
- - &ucx_py_unsuffixed ucx-py==0.39.*,>=0.0.0a0
+ - &cudf_unsuffixed cudf==24.10.*,>=0.0.0a0
+ - &dask_cudf_unsuffixed dask-cudf==24.10.*,>=0.0.0a0
+ - distributed-ucxx==0.40.*,>=0.0.0a0
+ - &kvikio_unsuffixed kvikio==24.10.*,>=0.0.0a0
+ - &ucx_py_unsuffixed ucx-py==0.40.*,>=0.0.0a0
- ucx-proc=*=gpu
- - ucxx==0.39.*,>=0.0.0a0
+ - ucxx==0.40.*,>=0.0.0a0
specific:
- output_types: conda
matrices:
@@ -194,16 +194,16 @@ dependencies:
cuda: "12.*"
cuda_suffixed: "true"
packages:
- - cudf-cu12==24.8.*,>=0.0.0a0
- - dask-cudf-cu12==24.8.*,>=0.0.0a0
- - ucx-py-cu12==0.39.*,>=0.0.0a0
+ - cudf-cu12==24.10.*,>=0.0.0a0
+ - dask-cudf-cu12==24.10.*,>=0.0.0a0
+ - ucx-py-cu12==0.40.*,>=0.0.0a0
- matrix:
cuda: "11.*"
cuda_suffixed: "true"
packages:
- - cudf-cu11==24.8.*,>=0.0.0a0
- - dask-cudf-cu11==24.8.*,>=0.0.0a0
- - ucx-py-cu11==0.39.*,>=0.0.0a0
+ - cudf-cu11==24.10.*,>=0.0.0a0
+ - dask-cudf-cu11==24.10.*,>=0.0.0a0
+ - ucx-py-cu11==0.40.*,>=0.0.0a0
- matrix:
packages:
- *cudf_unsuffixed
diff --git a/docs/source/examples/best-practices.rst b/docs/source/examples/best-practices.rst
index 2de3809c..d0ddc510 100644
--- a/docs/source/examples/best-practices.rst
+++ b/docs/source/examples/best-practices.rst
@@ -44,6 +44,15 @@ We also recommend allocating most, though not all, of the GPU memory space. We d
Additionally, when using `Accelerated Networking`_ , we only need to register a single IPC handle for the whole pool (which is expensive, but only done once) since from the IPC point of viewer there's only a single allocation. As opposed to just using RMM without a pool where each new allocation must be registered with IPC.
+Spilling from Device
+~~~~~~~~~~~~~~~~~~~~
+
+Dask-CUDA offers several different ways to enable automatic spilling from device memory.
+The best method often depends on the specific workflow. For classic ETL workloads using
+`Dask cuDF <https://docs.rapids.ai/api/dask-cudf/stable/>`_, native cuDF spilling is usually
+the best place to start. See :ref:`Dask-CUDA's spilling documentation <spilling-from-device>`
+for more details.
+
Accelerated Networking
~~~~~~~~~~~~~~~~~~~~~~
diff --git a/docs/source/explicit_comms.rst b/docs/source/explicit_comms.rst
index 9fde8756..af317056 100644
--- a/docs/source/explicit_comms.rst
+++ b/docs/source/explicit_comms.rst
@@ -14,4 +14,4 @@ Usage
In order to use explicit-comms in Dask/Distributed automatically, simply define the environment variable ``DASK_EXPLICIT_COMMS=True`` or setting the ``"explicit-comms"``
key in the `Dask configuration `_.
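-It is also possible to use explicit-comms in tasks manually, see the `API <../api/#explicit-comms>`_ and our `implementation of shuffle <https://github.com/rapidsai/dask-cuda/blob/branch-24.08/dask_cuda/explicit_comms/dataframe/shuffle.py>`_ for guidance.
+It is also possible to use explicit-comms in tasks manually, see the `API <../api/#explicit-comms>`_ and our `implementation of shuffle <https://github.com/rapidsai/dask-cuda/blob/branch-24.10/dask_cuda/explicit_comms/dataframe/shuffle.py>`_ for guidance.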
diff --git a/docs/source/spilling.rst b/docs/source/spilling.rst
index a237adf7..c86b5ce4 100644
--- a/docs/source/spilling.rst
+++ b/docs/source/spilling.rst
@@ -1,3 +1,5 @@
+.. _spilling-from-device:
+
Spilling from device
====================
@@ -105,3 +107,80 @@ type checking doesn't:
Thus, if encountering problems remember that it is always possible to use ``unproxy()``
to access the proxied object directly, or set ``DASK_JIT_UNSPILL_COMPATIBILITY_MODE=True``
to enable compatibility mode, which automatically calls ``unproxy()`` on all function inputs.
+
+
+cuDF Spilling
+-------------
+
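+When executing an ETL workflow with `Dask cuDF <https://docs.rapids.ai/api/dask-cudf/stable/>`_
+(i.e. Dask DataFrame), it is usually best to leverage `native spilling support in cuDF
+<https://docs.rapids.ai/api/cudf/stable/developer_guide/library_design/#spilling-to-host-memory>`_.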
+
+Native cuDF spilling has an important advantage over the other methodologies mentioned
+above. When JIT-unspill or default spilling are used, the worker is only able to spill
+the input or output of a task. This means that any data that is created within the task
+is completely off limits until the task is done executing. When cuDF spilling is used,
+however, individual device buffers can be spilled/unspilled as needed while the task
+is executing.
+
+When deploying a ``LocalCUDACluster``, cuDF spilling can be enabled with the ``enable_cudf_spill`` argument:
+
+.. code-block::
+
+ >>> from distributed import Client
+ >>> from dask_cuda import LocalCUDACluster
+
+ >>> cluster = LocalCUDACluster(n_workers=10, enable_cudf_spill=True)
+ >>> client = Client(cluster)
+
+The same applies for ``dask cuda worker``:
+
+.. code-block::
+
+ $ dask scheduler
+ distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786
+
+ $ dask cuda worker --enable-cudf-spill
+
+
+Statistics
+~~~~~~~~~~
+
+When cuDF spilling is enabled, it is also possible to have cuDF collect basic
+spill statistics. Collecting this information can be a useful way to understand
+the performance of memory-intensive workflows using cuDF.
+
+When deploying a ``LocalCUDACluster``, cuDF spill statistics can be enabled with the
+``cudf_spill_stats`` argument:
+
+.. code-block::
+
+ >>> cluster = LocalCUDACluster(n_workers=10, enable_cudf_spill=True, cudf_spill_stats=1)
+
+The same applies for ``dask cuda worker``:
+
+.. code-block::
+
+ $ dask cuda worker --enable-cudf-spill --cudf-spill-stats 1
+
+To have each dask-cuda worker print spill statistics within the workflow, do something like:
+
+.. code-block::
+
+ def spill_info():
+ from cudf.core.buffer.spill_manager import get_global_manager
+ print(get_global_manager().statistics)
+ client.run(spill_info)
+
+See the `cuDF spilling documentation
+<https://docs.rapids.ai/api/cudf/stable/developer_guide/library_design/#statistics>`_
+for more information on the available spill-statistics options.
+
+Limitations
+~~~~~~~~~~~
+
+Although cuDF spilling is the best option for most ETL workflows using Dask cuDF,
+it will be much less effective if that workflow converts between ``cudf.DataFrame``
+and other data formats (e.g. ``cupy.ndarray``). Once the underlying device buffers
+are "exposed" to external memory references, they become "unspillable" by cuDF.
+In cases like this (e.g., Dask-CUDA + XGBoost), JIT-Unspill is usually a better choice.
diff --git a/pyproject.toml b/pyproject.toml
index b6c431d6..730225ad 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -14,14 +14,14 @@ authors = [
{ name = "NVIDIA Corporation" },
]
license = { text = "Apache 2.0" }
-requires-python = ">=3.9"
+requires-python = ">=3.10"
dependencies = [
"click >=8.1",
"numba>=0.57",
- "numpy>=1.23,<2.0a0",
+ "numpy>=1.23,<3.0a0",
"pandas>=1.3",
"pynvml>=11.0.0,<11.5",
- "rapids-dask-dependency==24.8.*,>=0.0.0a0",
+ "rapids-dask-dependency==24.10.*,>=0.0.0a0",
"zict>=2.0.0",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit dependencies.yaml and run `rapids-dependency-file-generator`.
classifiers = [
@@ -30,9 +30,9 @@ classifiers = [
"Topic :: Scientific/Engineering",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3",
- "Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
]
[project.scripts]
@@ -50,12 +50,12 @@ docs = [
"sphinx-rtd-theme>=0.5.1",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit dependencies.yaml and run `rapids-dependency-file-generator`.
test = [
- "cudf==24.8.*,>=0.0.0a0",
- "dask-cudf==24.8.*,>=0.0.0a0",
- "kvikio==24.8.*,>=0.0.0a0",
+ "cudf==24.10.*,>=0.0.0a0",
+ "dask-cudf==24.10.*,>=0.0.0a0",
+ "kvikio==24.10.*,>=0.0.0a0",
"pytest",
"pytest-cov",
- "ucx-py==0.39.*,>=0.0.0a0",
+ "ucx-py==0.40.*,>=0.0.0a0",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit dependencies.yaml and run `rapids-dependency-file-generator`.
[project.urls]