
Add experimental filesystem="arrow" support in dask_cudf.read_parquet #16684

Merged

merged 49 commits into branch-24.10 from dask-cudf-arrow-filesystem on Sep 25, 2024

Changes from all commits (49 commits)
469bc5e
allow pyarrow-based read with cudf backend
rjzamora Aug 27, 2024
f20cc25
re-org
rjzamora Aug 27, 2024
8f0f598
temporary change for debugging
rjzamora Aug 28, 2024
64fd701
adjust for upstream bug
rjzamora Aug 28, 2024
8e0c902
remove stale comment
rjzamora Aug 28, 2024
18e1c08
add file aggregation
rjzamora Aug 28, 2024
5215a05
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Aug 29, 2024
c51a7bb
test coverage
rjzamora Aug 29, 2024
b7a90c1
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Aug 29, 2024
43274e2
allow aggregate_files=True
rjzamora Aug 30, 2024
63c3f04
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Aug 30, 2024
a1bd43c
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Aug 30, 2024
e3ca47f
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 3, 2024
12c09a5
fix test
rjzamora Sep 3, 2024
daee7ec
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 4, 2024
d068103
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 4, 2024
257eb26
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 5, 2024
ec38b1e
Make isinstance check pass for proxy ndarrays (#16601)
Matt711 Sep 5, 2024
853c76b
Performance improvement for strings::slice for wide strings (#16574)
davidwendt Sep 5, 2024
bdd2bab
skip for pyarrow<15
rjzamora Sep 6, 2024
d943d8d
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 6, 2024
eb9eee0
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 10, 2024
b9c5147
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 10, 2024
ec04e78
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 18, 2024
e391789
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 19, 2024
e154d01
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 19, 2024
3246d67
Intentionally leak thread_local CUDA resources to avoid crash (part 1…
kingcrimsontianyu Sep 19, 2024
2f424f2
Access Frame attributes instead of ColumnAccessor attributes when ava…
mroeschke Sep 19, 2024
362195d
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 24, 2024
4ce83d4
isolate expermental code path
rjzamora Sep 24, 2024
4d87013
remove unncessary logic
rjzamora Sep 24, 2024
e5b272a
remove unncessary logic - forgot to save
rjzamora Sep 24, 2024
8d87c54
add warning
rjzamora Sep 24, 2024
8cfe71e
remove blocksize and aggregate_files handling
rjzamora Sep 24, 2024
badf359
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 24, 2024
4c1c5ae
warn rather than raise for blocksize
rjzamora Sep 24, 2024
3f1d925
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
galipremsagar Sep 24, 2024
8c267c7
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 25, 2024
91d2d77
address code review from mads
rjzamora Sep 25, 2024
239639f
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 25, 2024
c944a52
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
galipremsagar Sep 25, 2024
791a4fd
more cleanup
rjzamora Sep 25, 2024
4c5ee6d
remove warning and add not to best-practices
rjzamora Sep 25, 2024
4d28db7
Build `cudf-polars` with `build.sh` (#16898)
brandon-b-miller Sep 25, 2024
9aa5aca
Fix DataFrame.drop(columns=cudf.Series/Index, axis=1) (#16712)
mroeschke Sep 25, 2024
42a15ee
[DOC] Update Pylibcudf doc strings (#16810)
Matt711 Sep 25, 2024
2c5bb57
Optimization of tdigest merge aggregation. (#16780)
nvdbaranec Sep 25, 2024
ed19b2e
Display deltas for `cudf.pandas` test summary (#16864)
galipremsagar Sep 25, 2024
aa492f5
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 25, 2024
9 changes: 9 additions & 0 deletions docs/dask_cudf/source/best_practices.rst
@@ -252,6 +252,15 @@ result in a simple 1-to-1 mapping between files and output partitions.
correspond to a reasonable partition size, use ``blocksize=None``
to avoid unnecessary metadata collection.

.. note::
When reading from remote storage (e.g. S3 and GCS), performance will
likely improve with ``filesystem="arrow"``. When this option is set,
PyArrow will be used to perform IO on multiple CPU threads. Please be
aware that this feature is experimental, and behavior may change in
the future (without deprecation). Do not pass in ``blocksize`` or
``aggregate_files`` when this feature is used. Instead, set the
``"dataframe.parquet.minimum-partition-size"`` config to control
file aggregation.
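
   A minimal sketch of the workflow this note describes (the bucket path and
   the partition-size value below are illustrative placeholders, not part of
   the documented API)::

       import dask
       import dask_cudf

       # Control output-partition aggregation through the Dask config rather
       # than blocksize/aggregate_files when filesystem="arrow" is used.
       dask.config.set({"dataframe.parquet.minimum-partition-size": 128_000_000})  # bytes (illustrative)

       df = dask_cudf.read_parquet(
           "s3://my-bucket/dataset/",  # placeholder remote path
           filesystem="arrow",         # experimental: PyArrow performs the IO
       )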

Use :func:`from_map`
~~~~~~~~~~~~~~~~~~~~
7 changes: 4 additions & 3 deletions docs/dask_cudf/source/index.rst
@@ -40,9 +40,10 @@ Using Dask cuDF
The Dask DataFrame API (Recommended)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Simply use the `Dask configuration <dask:configuration>`__ system to
set the ``"dataframe.backend"`` option to ``"cudf"``. From Python,
this can be achieved like so::
Simply use the `Dask configuration
<https://docs.dask.org/en/stable/how-to/selecting-the-collection-backend.html>`__
system to set the ``"dataframe.backend"`` option to ``"cudf"``.
From Python, this can be achieved like so::

import dask

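A slightly fuller version of that snippet, with a placeholder Parquet path, is
sketched below (only the ``"dataframe.backend"`` option is taken from the docs
text above)::

    import dask
    import dask.dataframe as dd

    # Route Dask DataFrame creation functions to the cudf backend.
    dask.config.set({"dataframe.backend": "cudf"})

    df = dd.read_parquet("/path/to/data.parquet")  # placeholder path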
142 changes: 136 additions & 6 deletions python/dask_cudf/dask_cudf/backends.py
@@ -8,6 +8,7 @@
import numpy as np
import pandas as pd
import pyarrow as pa
from packaging.version import Version
from pandas.api.types import is_scalar

import dask.dataframe as dd
@@ -52,6 +53,10 @@
get_parallel_type.register(cudf.BaseIndex, lambda _: Index)


# Required for Arrow filesystem support in read_parquet
PYARROW_GE_15 = Version(pa.__version__) >= Version("15.0.0")


@meta_nonempty.register(cudf.BaseIndex)
@_dask_cudf_performance_tracking
def _nonempty_index(idx):
@@ -695,15 +700,140 @@ def from_dict(
)

@staticmethod
def read_parquet(*args, engine=None, **kwargs):
def read_parquet(path, *args, filesystem="fsspec", engine=None, **kwargs):
import dask_expr as dx
import fsspec

from dask_cudf.io.parquet import CudfEngine
if (
isinstance(filesystem, fsspec.AbstractFileSystem)
or isinstance(filesystem, str)
and filesystem.lower() == "fsspec"
):
# Default "fsspec" filesystem
from dask_cudf.io.parquet import CudfEngine

_raise_unsupported_parquet_kwargs(**kwargs)
return _default_backend(
dx.read_parquet, *args, engine=CudfEngine, **kwargs
)
_raise_unsupported_parquet_kwargs(**kwargs)
return _default_backend(
dx.read_parquet,
path,
*args,
filesystem=filesystem,
engine=CudfEngine,
**kwargs,
)

else:
# EXPERIMENTAL filesystem="arrow" support.
# This code path uses PyArrow for IO, which is only
# beneficial for remote storage (e.g. S3)

from fsspec.utils import stringify_path
from pyarrow import fs as pa_fs

# CudfReadParquetPyarrowFS requires import of distributed beforehand
# (See: https://github.com/dask/dask/issues/11352)
import distributed # noqa: F401
from dask.core import flatten
from dask.dataframe.utils import pyarrow_strings_enabled

from dask_cudf.expr._expr import CudfReadParquetPyarrowFS

if args:
raise ValueError(f"Unexpected positional arguments: {args}")

if not (
isinstance(filesystem, pa_fs.FileSystem)
or isinstance(filesystem, str)
and filesystem.lower() in ("arrow", "pyarrow")
):
raise ValueError(f"Unexpected filesystem value: {filesystem}.")

if not PYARROW_GE_15:
raise NotImplementedError(
"Experimental Arrow filesystem support requires pyarrow>=15"
)

if not isinstance(path, str):
path = stringify_path(path)

# Extract kwargs
columns = kwargs.pop("columns", None)
filters = kwargs.pop("filters", None)
categories = kwargs.pop("categories", None)
index = kwargs.pop("index", None)
storage_options = kwargs.pop("storage_options", None)
dtype_backend = kwargs.pop("dtype_backend", None)
calculate_divisions = kwargs.pop("calculate_divisions", False)
ignore_metadata_file = kwargs.pop("ignore_metadata_file", False)
metadata_task_size = kwargs.pop("metadata_task_size", None)
split_row_groups = kwargs.pop("split_row_groups", "infer")
blocksize = kwargs.pop("blocksize", "default")
aggregate_files = kwargs.pop("aggregate_files", None)
parquet_file_extension = kwargs.pop(
"parquet_file_extension", (".parq", ".parquet", ".pq")
)
arrow_to_pandas = kwargs.pop("arrow_to_pandas", None)
open_file_options = kwargs.pop("open_file_options", None)

# Validate and normalize kwargs
kwargs["dtype_backend"] = dtype_backend
if arrow_to_pandas is not None:
raise ValueError(
"arrow_to_pandas not supported for the 'cudf' backend."
)
if open_file_options is not None:
raise ValueError(
"The open_file_options argument is no longer supported "
"by the 'cudf' backend."
)
if filters is not None:
for filter in flatten(filters, container=list):
_, op, val = filter
if op == "in" and not isinstance(val, (set, list, tuple)):
raise TypeError(
"Value of 'in' filter must be a list, set or tuple."
)
if metadata_task_size is not None:
raise NotImplementedError(
"metadata_task_size is not supported when using the pyarrow filesystem."
)
if split_row_groups != "infer":
raise NotImplementedError(
"split_row_groups is not supported when using the pyarrow filesystem."
)
if parquet_file_extension != (".parq", ".parquet", ".pq"):
raise NotImplementedError(
"parquet_file_extension is not supported when using the pyarrow filesystem."
)
if blocksize is not None and blocksize != "default":
warnings.warn(
"blocksize is not supported when using the pyarrow filesystem."
"blocksize argument will be ignored."
)
if aggregate_files is not None:
warnings.warn(
"aggregate_files is not supported when using the pyarrow filesystem. "
"Please use the 'dataframe.parquet.minimum-partition-size' config."
"aggregate_files argument will be ignored."
)

return dx.new_collection(
CudfReadParquetPyarrowFS(
path,
columns=dx._util._convert_to_list(columns),
filters=filters,
categories=categories,
index=index,
calculate_divisions=calculate_divisions,
storage_options=storage_options,
filesystem=filesystem,
ignore_metadata_file=ignore_metadata_file,
arrow_to_pandas=arrow_to_pandas,
pyarrow_strings_enabled=pyarrow_strings_enabled(),
kwargs=kwargs,
_series=isinstance(columns, str),
)
)

@staticmethod
def read_csv(
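A usage-level sketch of the dispatch above (bucket name and region are
placeholders; the Arrow path additionally requires pyarrow>=15)::

    import pyarrow.fs as pa_fs
    import dask_cudf

    # Default path: fsspec handles the IO and CudfEngine parses the files.
    df1 = dask_cudf.read_parquet("s3://my-bucket/data/", filesystem="fsspec")

    # Experimental path: an explicit PyArrow filesystem (or the string
    # "arrow"/"pyarrow") routes through CudfReadParquetPyarrowFS instead.
    df2 = dask_cudf.read_parquet(
        "s3://my-bucket/data/",
        filesystem=pa_fs.S3FileSystem(region="us-east-1"),
    )

On the Arrow path, keyword arguments such as ``blocksize`` and
``aggregate_files`` are accepted but ignored with a warning, as the validation
block above shows.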
89 changes: 89 additions & 0 deletions python/dask_cudf/dask_cudf/expr/_expr.py
@@ -2,10 +2,13 @@
import functools

import dask_expr._shuffle as _shuffle_module
import pandas as pd
from dask_expr import new_collection
from dask_expr._cumulative import CumulativeBlockwise
from dask_expr._expr import Elemwise, Expr, RenameAxis, VarColumns
from dask_expr._reductions import Reduction, Var
from dask_expr.io.io import FusedParquetIO
from dask_expr.io.parquet import ReadParquetPyarrowFS

from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty
from dask.dataframe.dispatch import is_categorical_dtype
@@ -18,6 +21,92 @@
##


class CudfFusedParquetIO(FusedParquetIO):
@staticmethod
def _load_multiple_files(
frag_filters,
columns,
schema,
*to_pandas_args,
):
import pyarrow as pa

from dask.base import apply, tokenize
from dask.threaded import get

token = tokenize(frag_filters, columns, schema)
name = f"pq-file-{token}"
dsk = {
(name, i): (
CudfReadParquetPyarrowFS._fragment_to_table,
frag,
filter,
columns,
schema,
)
for i, (frag, filter) in enumerate(frag_filters)
}
dsk[name] = (
apply,
pa.concat_tables,
[list(dsk.keys())],
{"promote_options": "permissive"},
)
return CudfReadParquetPyarrowFS._table_to_pandas(
get(dsk, name),
*to_pandas_args,
)


class CudfReadParquetPyarrowFS(ReadParquetPyarrowFS):
@functools.cached_property
def _dataset_info(self):
from dask_cudf.io.parquet import set_object_dtypes_from_pa_schema

dataset_info = super()._dataset_info
meta_pd = dataset_info["base_meta"]
if isinstance(meta_pd, cudf.DataFrame):
return dataset_info

# Convert to cudf
# (drop unsupported timezone information)
for k, v in meta_pd.dtypes.items():
if isinstance(v, pd.DatetimeTZDtype) and v.tz is not None:
meta_pd[k] = meta_pd[k].dt.tz_localize(None)
meta_cudf = cudf.from_pandas(meta_pd)

# Re-set "object" dtypes to align with pa schema
kwargs = dataset_info.get("kwargs", {})
set_object_dtypes_from_pa_schema(
meta_cudf,
kwargs.get("schema", None),
)

dataset_info["base_meta"] = meta_cudf
self.operands[type(self)._parameters.index("_dataset_info_cache")] = (
dataset_info
)
return dataset_info

@staticmethod
def _table_to_pandas(
table,
index_name,
*args,
):
df = cudf.DataFrame.from_arrow(table)
if index_name is not None:
df = df.set_index(index_name)
return df

def _tune_up(self, parent):
if self._fusion_compression_factor >= 1:
return
if isinstance(parent, CudfFusedParquetIO):
return
return parent.substitute(self, CudfFusedParquetIO(self))


class RenameAxisCudf(RenameAxis):
# TODO: Remove this after rename_axis is supported in cudf
# (See: https://github.com/rapidsai/cudf/issues/16895)
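The cudf-specific pieces of these helper classes reduce to two operations; a
standalone sketch of both, with made-up column names, is::

    import pyarrow as pa
    import cudf

    t1 = pa.table({"idx": [0, 1], "x": [1.0, 2.0]})
    t2 = pa.table({"idx": [2, 3], "x": [3.0, 4.0]})

    # CudfFusedParquetIO concatenates per-fragment tables before conversion.
    table = pa.concat_tables([t1, t2], promote_options="permissive")

    # CudfReadParquetPyarrowFS._table_to_pandas converts straight to cudf
    # (despite its name) and restores the index column when one is known.
    df = cudf.DataFrame.from_arrow(table).set_index("idx")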
41 changes: 29 additions & 12 deletions python/dask_cudf/dask_cudf/io/tests/test_s3.py
@@ -12,6 +12,7 @@
from dask.dataframe import assert_eq

import dask_cudf
from dask_cudf.tests.utils import QUERY_PLANNING_ON

moto = pytest.importorskip("moto", minversion="3.1.6")
boto3 = pytest.importorskip("boto3")
@@ -127,29 +128,45 @@ def test_read_parquet_open_file_options_raises():
)


def test_read_parquet_filesystem(s3_base, s3so, pdf):
@pytest.mark.parametrize(
"filesystem",
[
pytest.param(
"arrow",
marks=pytest.mark.skipif(
not QUERY_PLANNING_ON or not dask_cudf.backends.PYARROW_GE_15,
reason="Not supported",
),
),
"fsspec",
],
)
def test_read_parquet_filesystem(s3_base, s3so, pdf, filesystem):
fname = "test_parquet_filesystem.parquet"
bucket = "parquet"
buffer = BytesIO()
pdf.to_parquet(path=buffer)
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
path = f"s3://{bucket}/{fname}"
if filesystem == "arrow":
# This feature requires arrow >= 15
pytest.importorskip("pyarrow", minversion="15.0.0")

# Cannot pass filesystem="arrow"
with pytest.raises(ValueError):
dask_cudf.read_parquet(
import pyarrow.fs as pa_fs

df = dask_cudf.read_parquet(
path,
filesystem=pa_fs.S3FileSystem(
endpoint_override=s3so["client_kwargs"]["endpoint_url"],
),
)
else:
df = dask_cudf.read_parquet(
path,
storage_options=s3so,
filesystem="arrow",
filesystem=filesystem,
)

# Can pass filesystem="fsspec"
df = dask_cudf.read_parquet(
path,
storage_options=s3so,
filesystem="fsspec",
)
assert df.b.sum().compute() == 9
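
The parametrized test also illustrates how connection details differ between
the two paths: the fsspec path takes them via ``storage_options``, while the
Arrow path expects them on the filesystem object itself. A standalone sketch
against a hypothetical local endpoint::

    import pyarrow.fs as pa_fs
    import dask_cudf

    endpoint = "http://127.0.0.1:5555"  # e.g. a moto or MinIO test server

    # fsspec path: endpoint/credentials via storage_options
    df_fsspec = dask_cudf.read_parquet(
        "s3://parquet/test_parquet_filesystem.parquet",
        filesystem="fsspec",
        storage_options={"client_kwargs": {"endpoint_url": endpoint}},
    )

    # arrow path: endpoint passed on the PyArrow filesystem itself
    df_arrow = dask_cudf.read_parquet(
        "s3://parquet/test_parquet_filesystem.parquet",
        filesystem=pa_fs.S3FileSystem(endpoint_override=endpoint),
    )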

