diff --git a/CHANGELOG.md b/CHANGELOG.md index cf4ebd8dfef..db2142805c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -163,6 +163,7 @@ - PR #6742 Fix concat bug in dask_cudf Series/Index creation - PR #6632 Fix DataFrame initialization from list of dicts - PR #6767 Fix sort order of parameters in `test_scalar_invalid_implicit_conversion` pytest +- PR #6771 Fix index handling in parquet reader and writer - PR #6787 Update java reduction APIs to reflect C++ changes - PR #6790 Fix result representation in groupby.apply - PR #6794 Fix AVRO reader issues with empty input diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 743f1b41caf..19da062f7c2 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -6,6 +6,7 @@ import cudf import errno import os import pyarrow as pa +from collections import OrderedDict try: import ujson as json @@ -119,37 +120,34 @@ cpdef generate_pandas_metadata(Table table, index): # Indexes if index is not False: - for name in table._index.names: - if name is not None: - if isinstance(table._index, cudf.core.multiindex.MultiIndex): - idx = table.index.get_level_values(name) - else: - idx = table.index - - if isinstance(idx, cudf.core.index.RangeIndex): - descr = { - "kind": "range", - "name": table.index.name, - "start": table.index._start, - "stop": table.index._stop, - "step": 1, - } - else: - descr = name - col_names.append(name) - if is_categorical_dtype(idx): - raise ValueError( - "'category' column dtypes are currently not " - + "supported by the gpu accelerated parquet writer" - ) - elif is_list_dtype(col): - types.append(col.dtype.to_arrow()) - else: - types.append(np_to_pa_dtype(idx.dtype)) - index_levels.append(idx) - index_descriptors.append(descr) + for level, name in enumerate(table._index.names): + if isinstance(table._index, cudf.core.multiindex.MultiIndex): + idx = table.index.get_level_values(level) else: - col_names.append(name) + idx = table.index + + if 
isinstance(idx, cudf.core.index.RangeIndex): + descr = { + "kind": "range", + "name": table.index.name, + "start": table.index.start, + "stop": table.index.stop, + "step": table.index.step, + } + else: + descr = _index_level_name(idx.name, level, col_names) + if is_categorical_dtype(idx): + raise ValueError( + "'category' column dtypes are currently not " + + "supported by the gpu accelerated parquet writer" + ) + elif is_list_dtype(idx): + types.append(idx.dtype.to_arrow()) + else: + types.append(np_to_pa_dtype(idx.dtype)) + index_levels.append(idx) + col_names.append(name) + index_descriptors.append(descr) metadata = pa.pandas_compat.construct_metadata( table, @@ -225,15 +223,24 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, column_names = [x.decode() for x in c_out_table.metadata.column_names] # Access the Parquet user_data json to find the index - index_col = '' + index_col = None cdef map[string, string] user_data = c_out_table.metadata.user_data json_str = user_data[b'pandas'].decode('utf-8') meta = None if json_str != "": meta = json.loads(json_str) if 'index_columns' in meta and len(meta['index_columns']) > 0: - index_col = meta['index_columns'][0] - + index_col = meta['index_columns'] + if isinstance(index_col[0], dict) and \ + index_col[0]['kind'] == 'range': + is_range_index = True + else: + is_range_index = False + index_col_names = OrderedDict() + for idx_col in index_col: + for c in meta['columns']: + if c['field_name'] == idx_col: + index_col_names[idx_col] = c['name'] df = cudf.DataFrame._from_table( Table.from_unique_ptr( move(c_out_table.tbl), @@ -250,7 +257,7 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, if not column_names: column_names = [o['name'] for o in meta['columns']] - if index_col in cols_dtype_map: + if not is_range_index and index_col in cols_dtype_map: column_names.remove(index_col) for col in column_names: @@ -261,16 +268,66 @@ cpdef read_parquet(filepaths_or_buffers, 
columns=None, row_groups=None, ) # Set the index column - if index_col is not '' and isinstance(index_col, str): - if index_col in column_names: - df = df.set_index(index_col) - new_index_name = pa.pandas_compat._backwards_compatible_index_name( - df.index.name, df.index.name - ) - df.index.name = new_index_name + if index_col is not None and len(index_col) > 0: + if is_range_index: + range_index_meta = index_col[0] + if row_groups is not None: + per_file_metadata = [ + pa.parquet.read_metadata(s) for s in filepaths_or_buffers + ] + + filtered_idx = [] + for i, file_meta in enumerate(per_file_metadata): + row_groups_i = [] + start = 0 + for row_group in range(file_meta.num_row_groups): + stop = start + file_meta.row_group(row_group).num_rows + row_groups_i.append((start, stop)) + start = stop + + for rg in row_groups[i]: + filtered_idx.append( + cudf.RangeIndex( + start=row_groups_i[rg][0], + stop=row_groups_i[rg][1], + step=range_index_meta['step'] + ) + ) + + if len(filtered_idx) > 0: + idx = cudf.concat(filtered_idx) + else: + idx = cudf.Index(cudf.core.column.column_empty(0)) + else: + idx = cudf.RangeIndex( + start=range_index_meta['start'], + stop=range_index_meta['stop'], + step=range_index_meta['step'], + name=range_index_meta['name'] + ) + if skiprows is not None: + idx = idx[skiprows:] + if num_rows is not None: + idx = idx[:num_rows] + df.index = idx + elif set(index_col).issubset(column_names): + index_data = df[index_col] + actual_index_names = list(index_col_names.values()) + if len(index_data._data) == 1: + idx = cudf.Index( + index_data._data.columns[0], + name=actual_index_names[0] + ) + else: + idx = cudf.MultiIndex.from_frame( + index_data, + names=actual_index_names + ) + df.drop(columns=index_col, inplace=True) + df.index = idx else: if use_pandas_metadata: - df.index.name = index_col + df.index.names = index_col return df @@ -296,21 +353,20 @@ cpdef write_parquet( cdef vector[string] column_names cdef map[string, string] user_data - cdef 
table_view tv = table.data_view() + cdef table_view tv cdef unique_ptr[cudf_io_types.data_sink] _data_sink cdef cudf_io_types.sink_info sink = make_sink_info(path, _data_sink) - if index is not False: + if index is not False and not isinstance(table._index, cudf.RangeIndex): tv = table.view() - if isinstance(table._index, cudf.core.multiindex.MultiIndex): - for idx_name in table._index.names: - column_names.push_back(str.encode(idx_name)) - else: - if table._index.name is not None: - column_names.push_back(str.encode(table._index.name)) - else: - # No named index exists so just write out columns - tv = table.data_view() + for level, idx_name in enumerate(table._index.names): + column_names.push_back( + str.encode( + _index_level_name(idx_name, level, table._column_names) + ) + ) + else: + tv = table.data_view() for col_name in table._column_names: column_names.push_back(str.encode(col_name)) @@ -544,3 +600,23 @@ cdef Column _update_column_struct_field_names( ) col.set_base_children(tuple(children)) return col + + +def _index_level_name(index_name, level, column_names): + """ + Return the name of an index level or a default name + if `index_name` is None or is already a column name. 
+ + Parameters + ---------- + index_name : name of an Index object + level : level of the Index object + + Returns + ------- + name : str + """ + if index_name is not None and index_name not in column_names: + return index_name + else: + return f"__index_level_{level}__" diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index a730e3488eb..107d2d20e38 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -4940,10 +4940,7 @@ def from_pandas(cls, dataframe, nan_as_null=None): df.columns = dataframe.columns # Set index - if isinstance(dataframe.index, pd.MultiIndex): - index = cudf.from_pandas(dataframe.index, nan_as_null=nan_as_null) - else: - index = dataframe.index + index = cudf.from_pandas(dataframe.index, nan_as_null=nan_as_null) result = df.set_index(index) return result @@ -7137,10 +7134,8 @@ def from_pandas(obj, nan_as_null=None): elif isinstance(obj, pd.MultiIndex): return cudf.MultiIndex.from_pandas(obj, nan_as_null=nan_as_null) elif isinstance(obj, pd.RangeIndex): - if obj._step and obj._step != 1: - raise ValueError("cudf RangeIndex requires step == 1") return cudf.core.index.RangeIndex( - obj._start, stop=obj._stop, name=obj.name + start=obj.start, stop=obj.stop, step=obj.step, name=obj.name ) elif isinstance(obj, pd.Index): return cudf.Index.from_pandas(obj, nan_as_null=nan_as_null) diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index 7485b99b0ce..56348e4a1a4 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -1532,6 +1532,13 @@ def stop(self): """ return self._stop + @property + def step(self): + """ + The value of the step parameter. 
+ """ + return self._step + @property def _num_columns(self): return 1 diff --git a/python/cudf/cudf/tests/test_pandas_interop.py b/python/cudf/cudf/tests/test_pandas_interop.py index 064b73f1052..15b1acdfc08 100644 --- a/python/cudf/cudf/tests/test_pandas_interop.py +++ b/python/cudf/cudf/tests/test_pandas_interop.py @@ -1,8 +1,7 @@ -# Copyright (c) 2018, NVIDIA CORPORATION. +# Copyright (c) 2018-2020, NVIDIA CORPORATION. import numpy as np import pandas as pd -import pytest import cudf from cudf.core import DataFrame @@ -85,6 +84,7 @@ def test_from_pandas_rangeindex(): def test_from_pandas_rangeindex_step(): - idx1 = pd.RangeIndex(start=0, stop=8, step=2, name="myindex") - with pytest.raises(ValueError): - cudf.from_pandas(idx1) + expected = pd.RangeIndex(start=0, stop=8, step=2, name="myindex") + actual = cudf.from_pandas(expected) + + assert_eq(expected, actual) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 1e4b6a2b6f6..fb8c293017a 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1,9 +1,10 @@ # Copyright (c) 2019-2020, NVIDIA CORPORATION. 
+ +import datetime +import math import os import pathlib import random -import datetime -import math from glob import glob from io import BytesIO from string import ascii_letters @@ -459,7 +460,10 @@ def test_parquet_read_filtered_multiple_files(tmpdir): [fname_0, fname_1, fname_2], filters=[("x", "==", 2)] ) assert_eq( - filtered_df, cudf.DataFrame({"x": [2, 3, 2, 3], "y": list("bbcc")}) + filtered_df, + cudf.DataFrame( + {"x": [2, 3, 2, 3], "y": list("bbcc")}, index=[2, 3, 2, 3] + ), ) @@ -1010,15 +1014,7 @@ def test_parquet_reader_list_skiprows(skip, tmpdir): src.to_parquet(fname) assert os.path.exists(fname) - expect = pd.DataFrame( - { - "a": list_gen(int_gen, skip, num_rows - skip, 80, 50), - "b": list_gen(string_gen, skip, num_rows - skip, 80, 50), - "c": list_gen( - int_gen, skip, num_rows - skip, 80, 50, include_validity=True - ), - } - ) + expect = src.iloc[skip:] got = cudf.read_parquet(fname, skiprows=skip) assert_eq(expect, got, check_dtype=False) @@ -1041,18 +1037,7 @@ def test_parquet_reader_list_num_rows(skip, tmpdir): assert os.path.exists(fname) rows_to_read = min(3, num_rows - skip) - expect = pd.DataFrame( - { - "a": list_gen(int_gen, skip, rows_to_read, 80, 50), - "b": list_gen(string_gen, skip, rows_to_read, 80, 50), - "c": list_gen( - int_gen, skip, rows_to_read, 80, 50, include_validity=True - ), - "d": list_gen( - string_gen, skip, rows_to_read, 80, 50, include_validity=True - ), - } - ) + expect = src.iloc[skip:].head(rows_to_read) got = cudf.read_parquet(fname, skiprows=skip, num_rows=rows_to_read) assert_eq(expect, got, check_dtype=False) @@ -1577,7 +1562,7 @@ def test_parquet_writer_sliced(tmpdir): df_select = df.iloc[1:3] df_select.to_parquet(cudf_path) - assert_eq(cudf.read_parquet(cudf_path), df_select.reset_index(drop=True)) + assert_eq(cudf.read_parquet(cudf_path), df_select) def test_parquet_writer_list_basic(tmpdir): @@ -1644,6 +1629,63 @@ def test_parquet_nullable_boolean(tmpdir, engine): assert_eq(actual_gdf, expected_gdf) 
+@pytest.mark.parametrize( + "pdf", + [ + pd.DataFrame(index=[1, 2, 3]), + pytest.param( + pd.DataFrame(index=pd.RangeIndex(0, 10, 1)), + marks=pytest.mark.xfail( + reason="https://issues.apache.org/jira/browse/ARROW-10643" + ), + ), + pd.DataFrame({"a": [1, 2, 3]}, index=[0.43534, 345, 0.34534]), + pd.DataFrame( + {"b": [11, 22, 33], "c": ["a", "b", "c"]}, + index=pd.Index(["a", "b", "c"], name="custom name"), + ), + pd.DataFrame( + {"a": [10, 11, 12], "b": [99, 88, 77]}, + index=pd.RangeIndex(12, 17, 2), + ), + pd.DataFrame( + {"b": [99, 88, 77]}, + index=pd.RangeIndex(22, 27, 2, name="hello index"), + ), + pd.DataFrame(index=pd.Index(["a", "b", "c"], name="custom name")), + pd.DataFrame( + {"a": ["a", "bb", "cc"], "b": [10, 21, 32]}, + index=pd.MultiIndex.from_tuples([[1, 2], [10, 11], [15, 16]]), + ), + pd.DataFrame( + {"a": ["a", "bb", "cc"], "b": [10, 21, 32]}, + index=pd.MultiIndex.from_tuples( + [[1, 2], [10, 11], [15, 16]], names=["first", "second"] + ), + ), + ], +) +@pytest.mark.parametrize("index", [None, True, False]) +def test_parquet_index(tmpdir, pdf, index): + pandas_path = tmpdir.join("pandas_index.parquet") + cudf_path = tmpdir.join("cudf_index.parquet") + + gdf = cudf.from_pandas(pdf) + + pdf.to_parquet(pandas_path, index=index) + gdf.to_parquet(cudf_path, index=index) + + expected = pd.read_parquet(cudf_path) + actual = cudf.read_parquet(cudf_path) + + assert_eq(expected, actual) + + expected = pd.read_parquet(pandas_path) + actual = cudf.read_parquet(pandas_path) + + assert_eq(expected, actual) + + @pytest.mark.parametrize("engine", ["cudf", "pyarrow"]) def test_parquet_allnull_str(tmpdir, engine): pandas_path = tmpdir.join("pandas_allnulls.parquet")