diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py
index 510b5730169..0ac0af2842b 100644
--- a/python/dask_cudf/dask_cudf/io/parquet.py
+++ b/python/dask_cudf/dask_cudf/io/parquet.py
@@ -7,7 +7,7 @@
 from pyarrow import parquet as pq
 
 from dask import dataframe as dd
-from dask.dataframe.io.parquet.arrow import ArrowEngine
+from dask.dataframe.io.parquet.arrow import ArrowDatasetEngine
 
 try:
     from dask.dataframe.io.parquet import (
@@ -19,12 +19,20 @@
 import cudf
 from cudf.core.column import as_column, build_categorical_column
 from cudf.io import write_to_dataset
+from cudf.utils.dtypes import cudf_dtype_from_pa_type
 
 
-class CudfEngine(ArrowEngine):
+class CudfEngine(ArrowDatasetEngine):
     @staticmethod
     def read_metadata(*args, **kwargs):
-        meta, stats, parts, index = ArrowEngine.read_metadata(*args, **kwargs)
+        meta, stats, parts, index = ArrowDatasetEngine.read_metadata(
+            *args, **kwargs
+        )
+        if parts:
+            # Re-set "object" dtypes to align with pa schema
+            set_object_dtypes_from_pa_schema(
+                meta, parts[0].get("common_kwargs", {}).get("schema", None),
+            )
 
         # If `strings_to_categorical==True`, convert objects to int32
         strings_to_cats = kwargs.get("strings_to_categorical", False)
@@ -59,7 +67,6 @@ def read_partition(
             pieces = [pieces]
 
         strings_to_cats = kwargs.get("strings_to_categorical", False)
-
         if len(pieces) > 1:
 
             paths = []
@@ -72,6 +79,9 @@ def read_partition(
                     rgs.append(None)
                 else:
                     (path, row_group, partition_keys) = piece
+
+                    row_group = None if row_group == [None] else row_group
+
                     paths.append(path)
                     rgs.append(
                         [row_group]
@@ -96,6 +106,7 @@ def read_partition(
                 partition_keys = []
             else:
                 (path, row_group, partition_keys) = pieces[0]
+                row_group = None if row_group == [None] else row_group
 
             if cudf.utils.ioutils._is_local_filesystem(fs):
                 df = cudf.read_parquet(
@@ -117,6 +128,9 @@ def read_partition(
                     **kwargs.get("read", {}),
                 )
 
+        # Re-set "object" dtypes to align with pa schema
+        set_object_dtypes_from_pa_schema(df, kwargs.get("schema", None))
+
         if index and (index[0] in df.columns):
             df = df.set_index(index[0])
         elif index is False and set(df.index.names).issubset(columns):
@@ -127,17 +141,22 @@ def read_partition(
         if partition_keys:
            if partitions is None:
                 raise ValueError("Must pass partition sets")
+
             for i, (name, index2) in enumerate(partition_keys):
-                categories = [
-                    val.as_py() for val in partitions.levels[i].dictionary
-                ]
-                col = as_column(index2).as_frame().repeat(len(df))._data[None]
+                # Build the column from `codes` directly
+                # (since the category is often a larger dtype)
+                codes = (
+                    as_column(partitions[i].keys.index(index2))
+                    .as_frame()
+                    .repeat(len(df))
+                    ._data[None]
+                )
 
                 df[name] = build_categorical_column(
-                    categories=categories,
-                    codes=as_column(col.base_data, dtype=col.dtype),
-                    size=col.size,
-                    offset=col.offset,
+                    categories=partitions[i].keys,
+                    codes=codes,
+                    size=codes.size,
+                    offset=codes.offset,
                     ordered=False,
                 )
 
@@ -233,6 +252,18 @@ def aggregate_metadata(cls, meta_list, fs, out_path):
         return meta
 
 
+def set_object_dtypes_from_pa_schema(df, schema):
+    # Simple utility to modify cudf DataFrame
+    # "object" dtypes to agree with a specific
+    # pyarrow schema.
+    if schema:
+        for name in df.columns:
+            if name in schema.names and df[name].dtype == "O":
+                df[name] = df[name].astype(
+                    cudf_dtype_from_pa_type(schema.field(name).type)
+                )
+
+
 def read_parquet(
     path,
     columns=None,
@@ -243,9 +274,9 @@
     """ Read parquet files into a Dask DataFrame
 
     Calls ``dask.dataframe.read_parquet`` to cordinate the execution of
-    ``cudf.read_parquet``, and ultimately read multiple partitions into a
-    single Dask dataframe. The Dask version must supply an ``ArrowEngine``
-    class to support full functionality.
+    ``cudf.read_parquet``, and ultimately read multiple partitions into
+    a single Dask dataframe. The Dask version must supply an
+    ``ArrowDatasetEngine`` class to support full functionality.
     See ``cudf.read_parquet`` and Dask documentation for further details.
 
     Examples
diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py
index 740a2d48ce2..a5492bc5fc0 100644
--- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py
+++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -455,17 +455,24 @@ def test_create_metadata_file_inconsistent_schema(tmpdir):
     p1 = os.path.join(tmpdir, "part.1.parquet")
     df1.to_parquet(p1, engine="pyarrow")
 
-    with pytest.raises(RuntimeError):
-        # Pyarrow will fail to aggregate metadata
-        # if gather_statistics=True
-        dask_cudf.read_parquet(str(tmpdir), gather_statistics=True,).compute()
+    # New pyarrow-dataset base can handle an inconsistent
+    # schema (even without a _metadata file), but computing
+    # and dtype validation may fail
+    ddf1 = dask_cudf.read_parquet(str(tmpdir), gather_statistics=True)
 
     # Add global metadata file.
     # Dask-CuDF can do this without requiring schema
-    # consistency. Once the _metadata file is avaible,
-    # parsing metadata should no longer be a problem
+    # consistency.
    dask_cudf.io.parquet.create_metadata_file([p0, p1])
 
-    # Check that we can now read the ddf
+    # Check that we can still read the ddf
     # with the _metadata file present
-    dask_cudf.read_parquet(str(tmpdir), gather_statistics=True,).compute()
+    ddf2 = dask_cudf.read_parquet(str(tmpdir), gather_statistics=True)
+
+    # Check that the result is the same with and
+    # without the _metadata file. Note that we must
+    # call `compute` on `ddf1`, because the dtype of
+    # the inconsistent column ("a") may be "object"
+    # before computing, and "int" after
+    dd.assert_eq(ddf1.compute(), ddf2)
+    dd.assert_eq(ddf1.compute(), ddf2.compute())
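
For context, the sketch below (not part of the patch) illustrates the behavior the new ``set_object_dtypes_from_pa_schema`` helper is intended to provide: columns that come back from a read as "object" are re-cast to the dtype recorded in the pyarrow schema. The DataFrame and schema here are hypothetical, and the snippet assumes a GPU environment with cudf and this change installed.

    import pyarrow as pa
    import cudf

    from dask_cudf.io.parquet import set_object_dtypes_from_pa_schema

    # Hypothetical frame whose "a" column has decayed to "object"
    # (e.g. after reading a schema-inconsistent partition)
    df = cudf.DataFrame({"a": ["1", "2", "3"]})
    assert df["a"].dtype == "O"

    # The pyarrow schema records "a" as int64, so the helper casts it back
    schema = pa.schema([("a", pa.int64())])
    set_object_dtypes_from_pa_schema(df, schema)
    assert df["a"].dtype == "int64"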