Migrate dask-cudf CudfEngine to leverage ArrowDatasetEngine (#8871)
Closes #8656 

Addresses the impending deprecation of the `ArrowLegacyEngine` (which dask-cudf currently depends on) by migrating the `CudfEngine` backend to the newer `ArrowDatasetEngine`.
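
In short, the backend keeps its cudf-specific overrides but now inherits from the pyarrow-dataset-based engine. A minimal sketch of the pattern (the cudf-specific post-processing is elided here; the full implementation is in the diff below):

```python
# Sketch only: the real overrides live in python/dask_cudf/dask_cudf/io/parquet.py
from dask.dataframe.io.parquet.arrow import ArrowDatasetEngine


class CudfEngine(ArrowDatasetEngine):
    @staticmethod
    def read_metadata(*args, **kwargs):
        # Defer to the upstream dataset engine, then apply cudf-specific
        # fixups (e.g. re-aligning "object" dtypes with the pyarrow schema)
        # before returning.
        meta, stats, parts, index = ArrowDatasetEngine.read_metadata(
            *args, **kwargs
        )
        # ... cudf-specific post-processing elided (see diff) ...
        return meta, stats, parts, index
```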

TODO:

- [x] ~Benchmark/check for any (significant) performance regressions~ (**EDIT**: [pyarrow deprecations in pyarrow 5](dask/dask#7961) make this migration necessary IMO)

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Benjamin Zaitlen (https://github.com/quasiben)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #8871
rjzamora authored Aug 13, 2021
1 parent 2e980b8 commit 3be1c4c
Showing 2 changed files with 61 additions and 23 deletions.
61 changes: 46 additions & 15 deletions python/dask_cudf/dask_cudf/io/parquet.py
@@ -7,7 +7,7 @@
from pyarrow import parquet as pq

from dask import dataframe as dd
from dask.dataframe.io.parquet.arrow import ArrowEngine
from dask.dataframe.io.parquet.arrow import ArrowDatasetEngine

try:
from dask.dataframe.io.parquet import (
@@ -19,12 +19,20 @@
import cudf
from cudf.core.column import as_column, build_categorical_column
from cudf.io import write_to_dataset
from cudf.utils.dtypes import cudf_dtype_from_pa_type


class CudfEngine(ArrowEngine):
class CudfEngine(ArrowDatasetEngine):
@staticmethod
def read_metadata(*args, **kwargs):
meta, stats, parts, index = ArrowEngine.read_metadata(*args, **kwargs)
meta, stats, parts, index = ArrowDatasetEngine.read_metadata(
*args, **kwargs
)
if parts:
# Re-set "object" dtypes align with pa schema
set_object_dtypes_from_pa_schema(
meta, parts[0].get("common_kwargs", {}).get("schema", None),
)

# If `strings_to_categorical==True`, convert objects to int32
strings_to_cats = kwargs.get("strings_to_categorical", False)
@@ -59,7 +67,6 @@ def read_partition(
pieces = [pieces]

strings_to_cats = kwargs.get("strings_to_categorical", False)

if len(pieces) > 1:

paths = []
@@ -72,6 +79,9 @@ def read_partition(
rgs.append(None)
else:
(path, row_group, partition_keys) = piece

row_group = None if row_group == [None] else row_group

paths.append(path)
rgs.append(
[row_group]
@@ -96,6 +106,7 @@ def read_partition(
partition_keys = []
else:
(path, row_group, partition_keys) = pieces[0]
row_group = None if row_group == [None] else row_group

if cudf.utils.ioutils._is_local_filesystem(fs):
df = cudf.read_parquet(
@@ -117,6 +128,9 @@ def read_partition(
**kwargs.get("read", {}),
)

# Re-set "object" dtypes align with pa schema
set_object_dtypes_from_pa_schema(df, kwargs.get("schema", None))

if index and (index[0] in df.columns):
df = df.set_index(index[0])
elif index is False and set(df.index.names).issubset(columns):
@@ -127,17 +141,22 @@ def read_partition(
if partition_keys:
if partitions is None:
raise ValueError("Must pass partition sets")

for i, (name, index2) in enumerate(partition_keys):
categories = [
val.as_py() for val in partitions.levels[i].dictionary
]

col = as_column(index2).as_frame().repeat(len(df))._data[None]
# Build the column from `codes` directly
# (since the category is often a larger dtype)
codes = (
as_column(partitions[i].keys.index(index2))
.as_frame()
.repeat(len(df))
._data[None]
)
df[name] = build_categorical_column(
categories=categories,
codes=as_column(col.base_data, dtype=col.dtype),
size=col.size,
offset=col.offset,
categories=partitions[i].keys,
codes=codes,
size=codes.size,
offset=codes.offset,
ordered=False,
)

@@ -233,6 +252,18 @@ def aggregate_metadata(cls, meta_list, fs, out_path):
return meta


def set_object_dtypes_from_pa_schema(df, schema):
# Simple utility to modify cudf DataFrame
# "object" dtypes to agree with a specific
# pyarrow schema.
if schema:
for name in df.columns:
if name in schema.names and df[name].dtype == "O":
df[name] = df[name].astype(
cudf_dtype_from_pa_type(schema.field(name).type)
)


def read_parquet(
path,
columns=None,
@@ -243,9 +274,9 @@ def read_parquet(
""" Read parquet files into a Dask DataFrame
Calls ``dask.dataframe.read_parquet`` to coordinate the execution of
``cudf.read_parquet``, and ultimately read multiple partitions into a
single Dask dataframe. The Dask version must supply an ``ArrowEngine``
class to support full functionality.
``cudf.read_parquet``, and ultimately read multiple partitions into
a single Dask dataframe. The Dask version must supply an
``ArrowDatasetEngine`` class to support full functionality.
See ``cudf.read_parquet`` and Dask documentation for further details.
Examples
23 changes: 15 additions & 8 deletions python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -455,17 +455,24 @@ def test_create_metadata_file_inconsistent_schema(tmpdir):
p1 = os.path.join(tmpdir, "part.1.parquet")
df1.to_parquet(p1, engine="pyarrow")

with pytest.raises(RuntimeError):
# Pyarrow will fail to aggregate metadata
# if gather_statistics=True
dask_cudf.read_parquet(str(tmpdir), gather_statistics=True,).compute()
# New pyarrow-dataset base can handle an inconsistent
# schema (even without a _metadata file), but computing
# and dtype validation may fail
ddf1 = dask_cudf.read_parquet(str(tmpdir), gather_statistics=True)

# Add global metadata file.
# Dask-CuDF can do this without requiring schema
# consistency. Once the _metadata file is available,
# parsing metadata should no longer be a problem
# consistency.
dask_cudf.io.parquet.create_metadata_file([p0, p1])

# Check that we can now read the ddf
# Check that we can still read the ddf
# with the _metadata file present
dask_cudf.read_parquet(str(tmpdir), gather_statistics=True,).compute()
ddf2 = dask_cudf.read_parquet(str(tmpdir), gather_statistics=True)

# Check that the result is the same with and
# without the _metadata file. Note that we must
# call `compute` on `ddf1`, because the dtype of
# the inconsistent column ("a") may be "object"
# before computing, and "int" after
dd.assert_eq(ddf1.compute(), ddf2)
dd.assert_eq(ddf1.compute(), ddf2.compute())
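
For reference, the new `set_object_dtypes_from_pa_schema` helper added in the first file above can also be exercised on its own. A small usage sketch, assuming only that cudf and pyarrow are installed (the column names and values here are hypothetical):

```python
import cudf
import pyarrow as pa

from dask_cudf.io.parquet import set_object_dtypes_from_pa_schema

# A cudf DataFrame whose "a" column currently has "object" dtype,
# alongside a pyarrow schema that declares "a" as int64.
df = cudf.DataFrame({"a": ["1", "2", "3"], "b": [0.1, 0.2, 0.3]})
schema = pa.schema([("a", pa.int64()), ("b", pa.float64())])

# Cast any "object" columns in place so they agree with the schema;
# here "a" becomes int64, while non-object "b" is left untouched.
set_object_dtypes_from_pa_schema(df, schema)
print(df.dtypes)
```

In the engine itself, this fixup is applied both to the collection `meta` in `read_metadata` and to each partition in `read_partition`, so the dtypes seen before and after `compute` agree.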
