Migrate dask-cudf CudfEngine to leverage ArrowDatasetEngine #8871

Merged: 22 commits, Aug 13, 2021
Changes from 19 commits
Commits
22 commits
52a643c
save possible changes to enable multi-file parquet
rjzamora Apr 21, 2021
3b3dadb
Merge remote-tracking branch 'upstream/branch-21.06' into multi-file-…
rjzamora May 27, 2021
54644e1
align with latest upstream PR
rjzamora May 27, 2021
c5e660f
trigger format check
rjzamora May 27, 2021
e8cbc26
save some possible work aimed at using ParquetDatasetEngine
rjzamora Jul 6, 2021
2ab6838
Merge remote-tracking branch 'upstream/branch-21.08' into multi-file-…
rjzamora Jul 15, 2021
3079fff
Merge remote-tracking branch 'origin/multi-file-parquet' into migrate…
rjzamora Jul 16, 2021
d67f711
add test coverage
rjzamora Jul 16, 2021
a175daf
trigger format
rjzamora Jul 16, 2021
14038e9
Merge branch 'multi-file-parquet' into migrate-parquet-backend
rjzamora Jul 16, 2021
2407b89
Merge remote-tracking branch 'upstream/branch-21.08' into migrate-par…
rjzamora Jul 19, 2021
648c3ce
remove commented code
rjzamora Jul 27, 2021
494d585
remove commented code
rjzamora Jul 27, 2021
2ccc046
Merge branch 'branch-21.10' into migrate-parquet-backend
rjzamora Jul 27, 2021
07ed3c9
trigger formatting
rjzamora Jul 27, 2021
5a918d8
try reformatting
rjzamora Jul 29, 2021
f6a4ad6
Merge remote-tracking branch 'upstream/branch-21.10' into migrate-par…
rjzamora Jul 29, 2021
1473f36
Merge remote-tracking branch 'upstream/branch-21.10' into migrate-par…
rjzamora Jul 30, 2021
3602394
test tweak
rjzamora Jul 30, 2021
1db936a
Update python/dask_cudf/dask_cudf/io/tests/test_parquet.py
rjzamora Aug 10, 2021
675c314
build cat column in a more efficient way
rjzamora Aug 10, 2021
d672ab5
make cat column creation more efficient and fix schema-mismatch test
rjzamora Aug 10, 2021
38 changes: 21 additions & 17 deletions python/dask_cudf/dask_cudf/io/parquet.py
@@ -7,7 +7,7 @@
from pyarrow import parquet as pq

from dask import dataframe as dd
from dask.dataframe.io.parquet.arrow import ArrowEngine
from dask.dataframe.io.parquet.arrow import ArrowDatasetEngine

try:
from dask.dataframe.io.parquet import (
@@ -17,14 +17,16 @@
create_metadata_file_dd = None

import cudf
from cudf.core.column import as_column, build_categorical_column
from cudf.core.column import as_column
from cudf.io import write_to_dataset


class CudfEngine(ArrowEngine):
class CudfEngine(ArrowDatasetEngine):
@staticmethod
def read_metadata(*args, **kwargs):
meta, stats, parts, index = ArrowEngine.read_metadata(*args, **kwargs)
meta, stats, parts, index = ArrowDatasetEngine.read_metadata(
*args, **kwargs
)

# If `strings_to_categorical==True`, convert objects to int32
strings_to_cats = kwargs.get("strings_to_categorical", False)
@@ -59,7 +61,6 @@ def read_partition(
pieces = [pieces]

strings_to_cats = kwargs.get("strings_to_categorical", False)

if len(pieces) > 1:

paths = []
@@ -72,6 +73,9 @@
rgs.append(None)
else:
(path, row_group, partition_keys) = piece

row_group = None if row_group == [None] else row_group

paths.append(path)
rgs.append(
[row_group]
@@ -96,6 +100,7 @@
partition_keys = []
else:
(path, row_group, partition_keys) = pieces[0]
row_group = None if row_group == [None] else row_group

if cudf.utils.ioutils._is_local_filesystem(fs):
df = cudf.read_parquet(
@@ -127,18 +132,17 @@
if partition_keys:
if partitions is None:
raise ValueError("Must pass partition sets")

for i, (name, index2) in enumerate(partition_keys):
categories = [
val.as_py() for val in partitions.levels[i].dictionary
]

categories = partitions[i].keys

col = as_column(index2).as_frame().repeat(len(df))._data[None]
df[name] = build_categorical_column(
categories=categories,
codes=as_column(col.base_data, dtype=col.dtype),
size=col.size,
offset=col.offset,
ordered=False,

Inline review comment (Member):

A lot nicer with supported categorical column creation!

Inline review comment (Member Author):

I just realized that the new change here was a bit inefficient. We were creating a column with the partition-based category repeated in every element, and then converting it to a categorical column. It makes more sense to repeat the index of the partition-based category in every element and build the categorical column directly.

df[name] = col.as_categorical_column(
cudf.CategoricalDtype(
categories=categories, ordered=False,
)
)

return df
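The later commits in this PR ("build cat column in a more efficient way") appear to follow up on that comment. As a rough illustration of the trade-off it describes — a minimal sketch using pandas analogues, not the cudf internals (`as_column`/`as_categorical_column`) that the hunk above actually touches; the category values, code, and sizes are made up:

```python
import pandas as pd

n = 1_000_000
categories = ["part=a", "part=b"]  # hypothetical partition values
code = 1                           # index of the partition this piece belongs to

# Pattern the comment calls inefficient: repeat the category *value* in every
# row, then re-encode the whole column as a categorical.
repeated_values = pd.Series([categories[code]] * n).astype(
    pd.CategoricalDtype(categories=categories, ordered=False)
)

# Pattern the comment prefers: repeat the small integer *code* and build the
# categorical column from codes directly, skipping the re-encoding pass.
repeated_codes = pd.Series(
    pd.Categorical.from_codes([code] * n, categories=categories, ordered=False)
)

# Both constructions yield the same categorical column.
pd.testing.assert_series_equal(repeated_values, repeated_codes)
```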
@@ -243,9 +247,9 @@ def read_parquet(
""" Read parquet files into a Dask DataFrame

Calls ``dask.dataframe.read_parquet`` to coordinate the execution of
``cudf.read_parquet``, and ultimately read multiple partitions into a
single Dask dataframe. The Dask version must supply an ``ArrowEngine``
class to support full functionality.
``cudf.read_parquet``, and ultimately read multiple partitions into
a single Dask dataframe. The Dask version must supply an
``ArrowDatasetEngine`` class to support full functionality.
See ``cudf.read_parquet`` and Dask documentation for further details.

Examples
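The docstring's Examples section is truncated in this view; a minimal usage sketch of the wrapper follows (the dataset path is hypothetical, not from the PR):

```python
import dask_cudf

# Read a directory of parquet files into a dask_cudf DataFrame. Each partition
# is loaded on the GPU via cudf.read_parquet; dask.dataframe.read_parquet
# coordinates the work using the engine defined above.
ddf = dask_cudf.read_parquet(
    "dataset/",              # hypothetical path to a parquet dataset
    gather_statistics=True,  # collect row-group statistics, as in the test below
)

print(ddf.npartitions)
df = ddf.compute()           # materialize the result as a single cudf DataFrame
```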
19 changes: 11 additions & 8 deletions python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -455,17 +455,20 @@ def test_create_metadata_file_inconsistent_schema(tmpdir):
p1 = os.path.join(tmpdir, "part.1.parquet")
df1.to_parquet(p1, engine="pyarrow")

with pytest.raises(RuntimeError):
# Pyarrow will fail to aggregate metadata
# if gather_statistics=True
dask_cudf.read_parquet(str(tmpdir), gather_statistics=True,).compute()
# New pyarrow-dataset base can handle an inconsistent
# schema even without a _metadata file
ddf1 = dask_cudf.read_parquet(str(tmpdir), gather_statistics=True)

# Add global metadata file.
# Dask-CuDF can do this without requiring schema
# consistency. Once the _metadata file is available,
# parsing metadata should no longer be a problem
# consistency.
dask_cudf.io.parquet.create_metadata_file([p0, p1])

# Check that we can now read the ddf
# Check that we can still read the ddf
# with the _metadata file present
dask_cudf.read_parquet(str(tmpdir), gather_statistics=True,).compute()
ddf2 = dask_cudf.read_parquet(str(tmpdir), gather_statistics=True)

# Check that the result is the same with
# and without the _metadata file
dd.assert_eq(ddf1, ddf2, check_dtypes=False)
dd.assert_eq(ddf1.compute(), ddf2.compute())
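For context, a minimal sketch of the scenario this test exercises, outside of pytest (the paths and frames below are illustrative assumptions, not the test's actual fixtures):

```python
import os

import pandas as pd
import dask.dataframe as dd
import dask_cudf

os.makedirs("data", exist_ok=True)

# Two parquet files whose schemas disagree: column "a" is all-null in one
# file and integer-valued in the other (hypothetical data).
pd.DataFrame({"a": [None, None], "b": [1, 2]}).to_parquet(
    "data/part.0.parquet", engine="pyarrow"
)
pd.DataFrame({"a": [3, 4], "b": [5, 6]}).to_parquet(
    "data/part.1.parquet", engine="pyarrow"
)

# The pyarrow-dataset-based engine can read this even without a _metadata file.
ddf1 = dask_cudf.read_parquet("data/", gather_statistics=True)

# Aggregate the per-file footers into a global data/_metadata file.
dask_cudf.io.parquet.create_metadata_file(
    ["data/part.0.parquet", "data/part.1.parquet"]
)

# Reading again with the _metadata file present should give the same result.
ddf2 = dask_cudf.read_parquet("data/", gather_statistics=True)
dd.assert_eq(ddf1, ddf2, check_dtypes=False)
```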