diff --git a/docs/dask_cudf/source/best_practices.rst b/docs/dask_cudf/source/best_practices.rst index 142124163af..83039f86fed 100644 --- a/docs/dask_cudf/source/best_practices.rst +++ b/docs/dask_cudf/source/best_practices.rst @@ -252,6 +252,15 @@ result in a simple 1-to-1 mapping between files and output partitions. correspond to a reasonable partition size, use ``blocksize=None`` to avoid unnecessary metadata collection. +.. note:: + When reading from remote storage (e.g. S3 and GCS), performance will + likely improve with ``filesystem="arrow"``. When this option is set, + PyArrow will be used to perform IO on multiple CPU threads. Please be + aware that this feature is experimental, and behavior may change in + the future (without deprecation). Do not pass in ``blocksize`` or + ``aggregate_files`` when this feature is used. Instead, set the + ``"dataframe.parquet.minimum-partition-size"`` config to control + file aggregation. Use :func:`from_map` ~~~~~~~~~~~~~~~~~~~~ diff --git a/docs/dask_cudf/source/index.rst b/docs/dask_cudf/source/index.rst index 23ca7e49753..6eb755d7854 100644 --- a/docs/dask_cudf/source/index.rst +++ b/docs/dask_cudf/source/index.rst @@ -40,9 +40,10 @@ Using Dask cuDF The Dask DataFrame API (Recommended) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Simply use the `Dask configuration `__ system to -set the ``"dataframe.backend"`` option to ``"cudf"``. From Python, -this can be achieved like so:: +Simply use the `Dask configuration +`__ +system to set the ``"dataframe.backend"`` option to ``"cudf"``. 
+From Python, this can be achieved like so:: import dask diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 9347ebba5de..bead964a0ef 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -8,6 +8,7 @@ import numpy as np import pandas as pd import pyarrow as pa +from packaging.version import Version from pandas.api.types import is_scalar import dask.dataframe as dd @@ -52,6 +53,10 @@ get_parallel_type.register(cudf.BaseIndex, lambda _: Index) +# Required for Arrow filesystem support in read_parquet +PYARROW_GE_15 = Version(pa.__version__) >= Version("15.0.0") + + @meta_nonempty.register(cudf.BaseIndex) @_dask_cudf_performance_tracking def _nonempty_index(idx): @@ -695,15 +700,140 @@ def from_dict( ) @staticmethod - def read_parquet(*args, engine=None, **kwargs): + def read_parquet(path, *args, filesystem="fsspec", engine=None, **kwargs): import dask_expr as dx + import fsspec - from dask_cudf.io.parquet import CudfEngine + if ( + isinstance(filesystem, fsspec.AbstractFileSystem) + or isinstance(filesystem, str) + and filesystem.lower() == "fsspec" + ): + # Default "fsspec" filesystem + from dask_cudf.io.parquet import CudfEngine - _raise_unsupported_parquet_kwargs(**kwargs) - return _default_backend( - dx.read_parquet, *args, engine=CudfEngine, **kwargs - ) + _raise_unsupported_parquet_kwargs(**kwargs) + return _default_backend( + dx.read_parquet, + path, + *args, + filesystem=filesystem, + engine=CudfEngine, + **kwargs, + ) + + else: + # EXPERIMENTAL filesystem="arrow" support. + # This code path uses PyArrow for IO, which is only + # beneficial for remote storage (e.g. 
S3) + + from fsspec.utils import stringify_path + from pyarrow import fs as pa_fs + + # CudfReadParquetPyarrowFS requires import of distributed beforehand + # (See: https://github.com/dask/dask/issues/11352) + import distributed # noqa: F401 + from dask.core import flatten + from dask.dataframe.utils import pyarrow_strings_enabled + + from dask_cudf.expr._expr import CudfReadParquetPyarrowFS + + if args: + raise ValueError(f"Unexpected positional arguments: {args}") + + if not ( + isinstance(filesystem, pa_fs.FileSystem) + or isinstance(filesystem, str) + and filesystem.lower() in ("arrow", "pyarrow") + ): + raise ValueError(f"Unexpected filesystem value: {filesystem}.") + + if not PYARROW_GE_15: + raise NotImplementedError( + "Experimental Arrow filesystem support requires pyarrow>=15" + ) + + if not isinstance(path, str): + path = stringify_path(path) + + # Extract kwargs + columns = kwargs.pop("columns", None) + filters = kwargs.pop("filters", None) + categories = kwargs.pop("categories", None) + index = kwargs.pop("index", None) + storage_options = kwargs.pop("storage_options", None) + dtype_backend = kwargs.pop("dtype_backend", None) + calculate_divisions = kwargs.pop("calculate_divisions", False) + ignore_metadata_file = kwargs.pop("ignore_metadata_file", False) + metadata_task_size = kwargs.pop("metadata_task_size", None) + split_row_groups = kwargs.pop("split_row_groups", "infer") + blocksize = kwargs.pop("blocksize", "default") + aggregate_files = kwargs.pop("aggregate_files", None) + parquet_file_extension = kwargs.pop( + "parquet_file_extension", (".parq", ".parquet", ".pq") + ) + arrow_to_pandas = kwargs.pop("arrow_to_pandas", None) + open_file_options = kwargs.pop("open_file_options", None) + + # Validate and normalize kwargs + kwargs["dtype_backend"] = dtype_backend + if arrow_to_pandas is not None: + raise ValueError( + "arrow_to_pandas not supported for the 'cudf' backend." 
+ ) + if open_file_options is not None: + raise ValueError( + "The open_file_options argument is no longer supported " + "by the 'cudf' backend." + ) + if filters is not None: + for filter in flatten(filters, container=list): + _, op, val = filter + if op == "in" and not isinstance(val, (set, list, tuple)): + raise TypeError( + "Value of 'in' filter must be a list, set or tuple." + ) + if metadata_task_size is not None: + raise NotImplementedError( + "metadata_task_size is not supported when using the pyarrow filesystem." + ) + if split_row_groups != "infer": + raise NotImplementedError( + "split_row_groups is not supported when using the pyarrow filesystem." + ) + if parquet_file_extension != (".parq", ".parquet", ".pq"): + raise NotImplementedError( + "parquet_file_extension is not supported when using the pyarrow filesystem." + ) + if blocksize is not None and blocksize != "default": + warnings.warn( + "blocksize is not supported when using the pyarrow filesystem. " + "blocksize argument will be ignored." + ) + if aggregate_files is not None: + warnings.warn( + "aggregate_files is not supported when using the pyarrow filesystem. " + "Please use the 'dataframe.parquet.minimum-partition-size' config. " + "aggregate_files argument will be ignored." 
+ ) + + return dx.new_collection( + CudfReadParquetPyarrowFS( + path, + columns=dx._util._convert_to_list(columns), + filters=filters, + categories=categories, + index=index, + calculate_divisions=calculate_divisions, + storage_options=storage_options, + filesystem=filesystem, + ignore_metadata_file=ignore_metadata_file, + arrow_to_pandas=arrow_to_pandas, + pyarrow_strings_enabled=pyarrow_strings_enabled(), + kwargs=kwargs, + _series=isinstance(columns, str), + ) + ) @staticmethod def read_csv( diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index b284ab3774d..af83a01da98 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -2,10 +2,13 @@ import functools import dask_expr._shuffle as _shuffle_module +import pandas as pd from dask_expr import new_collection from dask_expr._cumulative import CumulativeBlockwise from dask_expr._expr import Elemwise, Expr, RenameAxis, VarColumns from dask_expr._reductions import Reduction, Var +from dask_expr.io.io import FusedParquetIO +from dask_expr.io.parquet import ReadParquetPyarrowFS from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty from dask.dataframe.dispatch import is_categorical_dtype @@ -18,6 +21,92 @@ ## +class CudfFusedParquetIO(FusedParquetIO): + @staticmethod + def _load_multiple_files( + frag_filters, + columns, + schema, + *to_pandas_args, + ): + import pyarrow as pa + + from dask.base import apply, tokenize + from dask.threaded import get + + token = tokenize(frag_filters, columns, schema) + name = f"pq-file-{token}" + dsk = { + (name, i): ( + CudfReadParquetPyarrowFS._fragment_to_table, + frag, + filter, + columns, + schema, + ) + for i, (frag, filter) in enumerate(frag_filters) + } + dsk[name] = ( + apply, + pa.concat_tables, + [list(dsk.keys())], + {"promote_options": "permissive"}, + ) + return CudfReadParquetPyarrowFS._table_to_pandas( + get(dsk, name), + *to_pandas_args, + ) + + +class 
CudfReadParquetPyarrowFS(ReadParquetPyarrowFS): + @functools.cached_property + def _dataset_info(self): + from dask_cudf.io.parquet import set_object_dtypes_from_pa_schema + + dataset_info = super()._dataset_info + meta_pd = dataset_info["base_meta"] + if isinstance(meta_pd, cudf.DataFrame): + return dataset_info + + # Convert to cudf + # (drop unsupported timezone information) + for k, v in meta_pd.dtypes.items(): + if isinstance(v, pd.DatetimeTZDtype) and v.tz is not None: + meta_pd[k] = meta_pd[k].dt.tz_localize(None) + meta_cudf = cudf.from_pandas(meta_pd) + + # Re-set "object" dtypes to align with pa schema + kwargs = dataset_info.get("kwargs", {}) + set_object_dtypes_from_pa_schema( + meta_cudf, + kwargs.get("schema", None), + ) + + dataset_info["base_meta"] = meta_cudf + self.operands[type(self)._parameters.index("_dataset_info_cache")] = ( + dataset_info + ) + return dataset_info + + @staticmethod + def _table_to_pandas( + table, + index_name, + *args, + ): + df = cudf.DataFrame.from_arrow(table) + if index_name is not None: + df = df.set_index(index_name) + return df + + def _tune_up(self, parent): + if self._fusion_compression_factor >= 1: + return + if isinstance(parent, CudfFusedParquetIO): + return + return parent.substitute(self, CudfFusedParquetIO(self)) + + class RenameAxisCudf(RenameAxis): # TODO: Remove this after rename_axis is supported in cudf # (See: https://github.com/rapidsai/cudf/issues/16895) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py index a14ffbc37dc..cf8af82e112 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py @@ -12,6 +12,7 @@ from dask.dataframe import assert_eq import dask_cudf +from dask_cudf.tests.utils import QUERY_PLANNING_ON moto = pytest.importorskip("moto", minversion="3.1.6") boto3 = pytest.importorskip("boto3") @@ -127,7 +128,20 @@ def test_read_parquet_open_file_options_raises(): ) -def 
test_read_parquet_filesystem(s3_base, s3so, pdf): +@pytest.mark.parametrize( + "filesystem", + [ + pytest.param( + "arrow", + marks=pytest.mark.skipif( + not QUERY_PLANNING_ON or not dask_cudf.backends.PYARROW_GE_15, + reason="Not supported", + ), + ), + "fsspec", + ], +) +def test_read_parquet_filesystem(s3_base, s3so, pdf, filesystem): fname = "test_parquet_filesystem.parquet" bucket = "parquet" buffer = BytesIO() @@ -135,21 +149,24 @@ def test_read_parquet_filesystem(s3_base, s3so, pdf): buffer.seek(0) with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): path = f"s3://{bucket}/{fname}" + if filesystem == "arrow": + # This feature requires arrow >= 15 + pytest.importorskip("pyarrow", minversion="15.0.0") - # Cannot pass filesystem="arrow" - with pytest.raises(ValueError): - dask_cudf.read_parquet( + import pyarrow.fs as pa_fs + + df = dask_cudf.read_parquet( + path, + filesystem=pa_fs.S3FileSystem( + endpoint_override=s3so["client_kwargs"]["endpoint_url"], + ), + ) + else: + df = dask_cudf.read_parquet( path, storage_options=s3so, - filesystem="arrow", + filesystem=filesystem, ) - - # Can pass filesystem="fsspec" - df = dask_cudf.read_parquet( - path, - storage_options=s3so, - filesystem="fsspec", - ) assert df.b.sum().compute() == 9