Migrate dask-cudf CudfEngine to leverage ArrowDatasetEngine #8871

Merged Aug 13, 2021 (22 commits)

Commits
52a643c
save possible changes to enable multi-file parquet
rjzamora Apr 21, 2021
3b3dadb
Merge remote-tracking branch 'upstream/branch-21.06' into multi-file-…
rjzamora May 27, 2021
54644e1
align with latest upstream PR
rjzamora May 27, 2021
c5e660f
trigger format check
rjzamora May 27, 2021
e8cbc26
save some possible work aimed at using ParquetDatasetEngine
rjzamora Jul 6, 2021
2ab6838
Merge remote-tracking branch 'upstream/branch-21.08' into multi-file-…
rjzamora Jul 15, 2021
3079fff
Merge remote-tracking branch 'origin/multi-file-parquet' into migrate…
rjzamora Jul 16, 2021
d67f711
add test coverage
rjzamora Jul 16, 2021
a175daf
trigger format
rjzamora Jul 16, 2021
14038e9
Merge branch 'multi-file-parquet' into migrate-parquet-backend
rjzamora Jul 16, 2021
2407b89
Merge remote-tracking branch 'upstream/branch-21.08' into migrate-par…
rjzamora Jul 19, 2021
648c3ce
remove commented code
rjzamora Jul 27, 2021
494d585
remove commented code
rjzamora Jul 27, 2021
2ccc046
Merge branch 'branch-21.10' into migrate-parquet-backend
rjzamora Jul 27, 2021
07ed3c9
trigger formatting
rjzamora Jul 27, 2021
5a918d8
try reformatting
rjzamora Jul 29, 2021
f6a4ad6
Merge remote-tracking branch 'upstream/branch-21.10' into migrate-par…
rjzamora Jul 29, 2021
1473f36
Merge remote-tracking branch 'upstream/branch-21.10' into migrate-par…
rjzamora Jul 30, 2021
3602394
test tweak
rjzamora Jul 30, 2021
1db936a
Update python/dask_cudf/dask_cudf/io/tests/test_parquet.py
rjzamora Aug 10, 2021
675c314
build cat column in a more efficient way
rjzamora Aug 10, 2021
d672ab5
make cat column creation more efficient and fix schema-mismatch test
rjzamora Aug 10, 2021
61 changes: 46 additions & 15 deletions python/dask_cudf/dask_cudf/io/parquet.py
@@ -7,7 +7,7 @@
from pyarrow import parquet as pq

from dask import dataframe as dd
from dask.dataframe.io.parquet.arrow import ArrowEngine
from dask.dataframe.io.parquet.arrow import ArrowDatasetEngine

try:
from dask.dataframe.io.parquet import (
@@ -19,12 +19,20 @@
import cudf
from cudf.core.column import as_column, build_categorical_column
from cudf.io import write_to_dataset
from cudf.utils.dtypes import cudf_dtype_from_pa_type


class CudfEngine(ArrowEngine):
class CudfEngine(ArrowDatasetEngine):
@staticmethod
def read_metadata(*args, **kwargs):
meta, stats, parts, index = ArrowEngine.read_metadata(*args, **kwargs)
meta, stats, parts, index = ArrowDatasetEngine.read_metadata(
*args, **kwargs
)
if parts:
# Re-set "object" dtypes to align with the pa schema
set_object_dtypes_from_pa_schema(
meta, parts[0].get("common_kwargs", {}).get("schema", None),
)

# If `strings_to_categorical==True`, convert objects to int32
strings_to_cats = kwargs.get("strings_to_categorical", False)
@@ -59,7 +67,6 @@ def read_partition(
pieces = [pieces]

strings_to_cats = kwargs.get("strings_to_categorical", False)

if len(pieces) > 1:

paths = []
@@ -72,6 +79,9 @@
rgs.append(None)
else:
(path, row_group, partition_keys) = piece

row_group = None if row_group == [None] else row_group

paths.append(path)
rgs.append(
[row_group]
@@ -96,6 +106,7 @@
partition_keys = []
else:
(path, row_group, partition_keys) = pieces[0]
row_group = None if row_group == [None] else row_group

if cudf.utils.ioutils._is_local_filesystem(fs):
df = cudf.read_parquet(
@@ -117,6 +128,9 @@
**kwargs.get("read", {}),
)

# Re-set "object" dtypes to align with the pa schema
set_object_dtypes_from_pa_schema(df, kwargs.get("schema", None))

if index and (index[0] in df.columns):
df = df.set_index(index[0])
elif index is False and set(df.index.names).issubset(columns):
@@ -127,17 +141,22 @@
if partition_keys:
if partitions is None:
raise ValueError("Must pass partition sets")

for i, (name, index2) in enumerate(partition_keys):
categories = [
val.as_py() for val in partitions.levels[i].dictionary
]

col = as_column(index2).as_frame().repeat(len(df))._data[None]
# Build the column from `codes` directly
# (since the category is often a larger dtype)
codes = (
as_column(partitions[i].keys.index(index2))
.as_frame()
.repeat(len(df))
._data[None]
)
df[name] = build_categorical_column(
categories=categories,
codes=as_column(col.base_data, dtype=col.dtype),
size=col.size,
offset=col.offset,
categories=partitions[i].keys,
codes=codes,
size=codes.size,
offset=codes.offset,
ordered=False,
)

@@ -233,6 +252,18 @@ def aggregate_metadata(cls, meta_list, fs, out_path):
return meta


def set_object_dtypes_from_pa_schema(df, schema):
# Simple utility to modify cudf DataFrame
# "object" dtypes to agree with a specific
# pyarrow schema.
if schema:
for name in df.columns:
if name in schema.names and df[name].dtype == "O":
df[name] = df[name].astype(
cudf_dtype_from_pa_type(schema.field(name).type)
)


def read_parquet(
path,
columns=None,
@@ -243,9 +274,9 @@
""" Read parquet files into a Dask DataFrame

Calls ``dask.dataframe.read_parquet`` to coordinate the execution of
``cudf.read_parquet``, and ultimately read multiple partitions into a
single Dask dataframe. The Dask version must supply an ``ArrowEngine``
class to support full functionality.
``cudf.read_parquet``, and ultimately read multiple partitions into
a single Dask dataframe. The Dask version must supply an
``ArrowDatasetEngine`` class to support full functionality.
See ``cudf.read_parquet`` and Dask documentation for further details.

Examples
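For readers skimming the diff above: the heart of the new `set_object_dtypes_from_pa_schema` helper is a cast of every "object" column to the dtype implied by the pyarrow schema that the dataset engine reports. The snippet below is a minimal standalone sketch of that idea, not code from the PR — the frame, column names, and schema are hypothetical, and it assumes a cudf version that exposes `cudf_dtype_from_pa_type` (the same import the PR adds from `cudf.utils.dtypes`).

```python
import cudf
import pyarrow as pa
from cudf.utils.dtypes import cudf_dtype_from_pa_type

# Hypothetical frame whose "a" column came back as "object" (e.g. from an
# all-null or string-typed parquet file), plus the schema the dataset
# engine would report for the full dataset.
df = cudf.DataFrame({"a": ["1", "2", "3"], "b": [0.1, 0.2, 0.3]})
schema = pa.schema([("a", pa.int64()), ("b", pa.float64())])

# Mirror of the helper's loop: only "object" columns that appear in the
# schema are cast; everything else is left untouched.
for name in df.columns:
    if name in schema.names and df[name].dtype == "O":
        df[name] = df[name].astype(cudf_dtype_from_pa_type(schema.field(name).type))

print(df.dtypes)  # "a" is now int64, "b" stays float64
```

The same casting is applied twice in the diff: once to the empty `meta` frame in `read_metadata` and once to each partition in `read_partition`, so the collection's dtypes and the computed dtypes agree.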
23 changes: 15 additions & 8 deletions python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -455,17 +455,24 @@ def test_create_metadata_file_inconsistent_schema(tmpdir):
p1 = os.path.join(tmpdir, "part.1.parquet")
df1.to_parquet(p1, engine="pyarrow")

with pytest.raises(RuntimeError):
# Pyarrow will fail to aggregate metadata
# if gather_statistics=True
dask_cudf.read_parquet(str(tmpdir), gather_statistics=True,).compute()
# New pyarrow-dataset base can handle an inconsistent
# schema (even without a _metadata file), but computing
# and dtype validation may fail
ddf1 = dask_cudf.read_parquet(str(tmpdir), gather_statistics=True)

# Add global metadata file.
# Dask-CuDF can do this without requiring schema
# consistency. Once the _metadata file is available,
# parsing metadata should no longer be a problem
# consistency.
dask_cudf.io.parquet.create_metadata_file([p0, p1])

# Check that we can now read the ddf
# Check that we can still read the ddf
# with the _metadata file present
dask_cudf.read_parquet(str(tmpdir), gather_statistics=True,).compute()
ddf2 = dask_cudf.read_parquet(str(tmpdir), gather_statistics=True)

# Check that the result is the same with and
# without the _metadata file. Note that we must
# call `compute` on `ddf1`, because the dtype of
# the inconsistent column ("a") may be "object"
# before computing, and "int" after
dd.assert_eq(ddf1.compute(), ddf2)
dd.assert_eq(ddf1.compute(), ddf2.compute())
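The updated test relies on the pyarrow-dataset engine tolerating an inconsistent schema until compute time. Below is a rough standalone sketch of that pattern — it is not the test itself: the frames, file names, and the `run_demo` helper are hypothetical, and the exact "object"-to-integer dtype behaviour depends on the cudf and dask versions in use.

```python
import os

import pandas as pd
import dask.dataframe as dd
import dask_cudf


def run_demo(tmpdir):
    """tmpdir: path to an existing, empty directory (string)."""
    # Two parquet files whose schemas disagree on column "a":
    # all nulls in one file, integers in the other.
    p0 = os.path.join(tmpdir, "part.0.parquet")
    p1 = os.path.join(tmpdir, "part.1.parquet")
    pd.DataFrame({"a": [None, None], "b": [1, 2]}).to_parquet(p0, engine="pyarrow")
    pd.DataFrame({"a": [3, 4], "b": [5, 6]}).to_parquet(p1, engine="pyarrow")

    # The dataset-based engine can build the collection without a
    # _metadata file, even though the file schemas differ.
    ddf1 = dask_cudf.read_parquet(tmpdir, gather_statistics=True)

    # Writing a global _metadata file makes the unified schema explicit.
    dask_cudf.io.parquet.create_metadata_file([p0, p1])
    ddf2 = dask_cudf.read_parquet(tmpdir, gather_statistics=True)

    # Compute both sides before comparing: ddf1's "a" dtype may only be
    # resolved from "object" to an integer dtype once data is read.
    dd.assert_eq(ddf1.compute(), ddf2.compute())
```

This mirrors the structure of the test above: read without `_metadata`, create `_metadata` with `dask_cudf.io.parquet.create_metadata_file`, read again, and compare the computed results.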