Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Fix index handling in parquet reader and writer #6771

Merged
merged 27 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
210e34d
fix typo
galipremsagar Nov 11, 2020
f16ab6a
Fix writing index in parquet writer.
galipremsagar Nov 12, 2020
ecb15c4
Merge remote-tracking branch 'upstream/branch-0.17' into 6337
galipremsagar Nov 12, 2020
3dad784
Merge remote-tracking branch 'upstream/branch-0.17' into 6337
galipremsagar Nov 12, 2020
1a622a6
Merge remote-tracking branch 'upstream/branch-0.17' into 6337
galipremsagar Nov 16, 2020
614500a
Fix Parquet reader index handling
galipremsagar Nov 16, 2020
3bb0aec
Update CHANGELOG.md
galipremsagar Nov 16, 2020
01c6423
enable respecting columns and index
galipremsagar Nov 23, 2020
9833459
Merge remote-tracking branch 'upstream/branch-0.17' into 6821
galipremsagar Nov 23, 2020
a9d9200
fix columnAccessor constructor
galipremsagar Nov 23, 2020
b2c45b3
handle non-existent columns
galipremsagar Nov 24, 2020
865cc83
Merge remote-tracking branch 'upstream/branch-0.17' into 6337
galipremsagar Nov 24, 2020
3ce7a50
Merge remote-tracking branch 'upstream/branch-0.17' into 6821
galipremsagar Nov 24, 2020
c1cf47e
Handle more cases and add tests for the same.
galipremsagar Nov 24, 2020
30cc88d
Merge remote-tracking branch 'upstream/branch-0.17' into 6821
galipremsagar Nov 24, 2020
264c4cf
Fix dask issue and add tests
galipremsagar Nov 24, 2020
cfeef08
Update CHANGELOG.md
galipremsagar Nov 24, 2020
004f7c1
Add more changes
galipremsagar Nov 25, 2020
6f30990
Merge branch '6821' of https://github.com/galipremsagar/cudf into 6821
galipremsagar Nov 25, 2020
c6f50e2
Merge remote-tracking branch 'upstream/branch-0.17' into 6337
galipremsagar Dec 1, 2020
12849ba
handle index slicing when row groups is used
galipremsagar Dec 1, 2020
e6dfdea
Merge branch '6821' of https://github.com/galipremsagar/cudf into 6337
galipremsagar Dec 1, 2020
2b0b102
Merge remote-tracking branch 'upstream/branch-0.17' into 6337
galipremsagar Dec 1, 2020
fa8aa69
address review comments
galipremsagar Dec 1, 2020
820307c
remove unrelated commits
galipremsagar Dec 1, 2020
d509894
revert unrelated changes
galipremsagar Dec 1, 2020
2a9b65c
add back required change
galipremsagar Dec 1, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
- PR #6742 Fix concat bug in dask_cudf Series/Index creation
- PR #6632 Fix DataFrame initialization from list of dicts
- PR #6767 Fix sort order of parameters in `test_scalar_invalid_implicit_conversion` pytest
- PR #6771 Fix index handling in parquet reader and writer


# cuDF 0.16.0 (21 Oct 2020)
Expand Down
152 changes: 100 additions & 52 deletions python/cudf/cudf/_lib/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import cudf
import errno
import os
import pyarrow as pa
from collections import OrderedDict

try:
import ujson as json
Expand Down Expand Up @@ -119,37 +120,34 @@ cpdef generate_pandas_metadata(Table table, index):

# Indexes
if index is not False:
for name in table._index.names:
if name is not None:
if isinstance(table._index, cudf.core.multiindex.MultiIndex):
idx = table.index.get_level_values(name)
else:
idx = table.index

if isinstance(idx, cudf.core.index.RangeIndex):
descr = {
"kind": "range",
"name": table.index.name,
"start": table.index._start,
"stop": table.index._stop,
"step": 1,
}
else:
descr = name
col_names.append(name)
if is_categorical_dtype(idx):
raise ValueError(
"'category' column dtypes are currently not "
+ "supported by the gpu accelerated parquet writer"
)
elif is_list_dtype(col):
types.append(col.dtype.to_arrow())
else:
types.append(np_to_pa_dtype(idx.dtype))
index_levels.append(idx)
index_descriptors.append(descr)
for level, name in enumerate(table._index.names):
if isinstance(table._index, cudf.core.multiindex.MultiIndex):
idx = table.index.get_level_values(level)
else:
col_names.append(name)
idx = table.index

if isinstance(idx, cudf.core.index.RangeIndex):
descr = {
"kind": "range",
"name": table.index.name,
"start": table.index.start,
"stop": table.index.stop,
"step": table.index.step,
}
else:
descr = _index_level_name(idx.name, level, col_names)
if is_categorical_dtype(idx):
raise ValueError(
"'category' column dtypes are currently not "
+ "supported by the gpu accelerated parquet writer"
)
elif is_list_dtype(idx):
types.append(col.dtype.to_arrow())
else:
types.append(np_to_pa_dtype(idx.dtype))
index_levels.append(idx)
col_names.append(name)
index_descriptors.append(descr)

metadata = pa.pandas_compat.construct_metadata(
table,
Expand Down Expand Up @@ -225,15 +223,24 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
column_names = [x.decode() for x in c_out_table.metadata.column_names]

# Access the Parquet user_data json to find the index
index_col = ''
index_col = None
cdef map[string, string] user_data = c_out_table.metadata.user_data
json_str = user_data[b'pandas'].decode('utf-8')
meta = None
if json_str != "":
meta = json.loads(json_str)
if 'index_columns' in meta and len(meta['index_columns']) > 0:
index_col = meta['index_columns'][0]

index_col = meta['index_columns']
if isinstance(index_col[0], dict) and \
index_col[0]['kind'] == 'range':
is_range_index = True
else:
is_range_index = False
index_col_names = OrderedDict()
for idx_col in index_col:
for c in meta['columns']:
if c['field_name'] == idx_col:
index_col_names[idx_col] = c['name']
kkraus14 marked this conversation as resolved.
Show resolved Hide resolved
df = cudf.DataFrame._from_table(
Table.from_unique_ptr(
move(c_out_table.tbl),
Expand All @@ -250,7 +257,7 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,

if not column_names:
column_names = [o['name'] for o in meta['columns']]
if index_col in cols_dtype_map:
if not is_range_index and index_col in cols_dtype_map:
column_names.remove(index_col)

for col in column_names:
Expand All @@ -261,16 +268,38 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
)

# Set the index column
if index_col is not '' and isinstance(index_col, str):
if index_col in column_names:
df = df.set_index(index_col)
new_index_name = pa.pandas_compat._backwards_compatible_index_name(
df.index.name, df.index.name
if index_col is not None and len(index_col) > 0:
if is_range_index:
range_index_meta = index_col[0]
idx = cudf.RangeIndex(
start=range_index_meta['start'],
stop=range_index_meta['stop'],
step=range_index_meta['step'],
name=range_index_meta['name']
)
df.index.name = new_index_name
if skiprows is not None:
idx = idx[skiprows:]
if num_rows is not None:
idx = idx[:num_rows]
df.index = idx
elif set(index_col).issubset(column_names):
index_data = df[index_col]
actual_index_names = list(index_col_names.values())
if len(index_data._data) == 1:
idx = cudf.Index(
index_data._data.columns[0],
name=actual_index_names[0]
)
else:
idx = cudf.MultiIndex.from_frame(
index_data,
names=actual_index_names
)
df.drop(columns=index_col, inplace=True)
df.index = idx
else:
if use_pandas_metadata:
df.index.name = index_col
df.index.names = index_col

return df

Expand All @@ -295,21 +324,20 @@ cpdef write_parquet(

cdef vector[string] column_names
cdef map[string, string] user_data
cdef table_view tv = table.data_view()
cdef table_view tv
cdef unique_ptr[cudf_io_types.data_sink] _data_sink
cdef cudf_io_types.sink_info sink = make_sink_info(path, _data_sink)

if index is not False:
if index is not False and not isinstance(table._index, cudf.RangeIndex):
tv = table.view()
if isinstance(table._index, cudf.core.multiindex.MultiIndex):
for idx_name in table._index.names:
column_names.push_back(str.encode(idx_name))
else:
if table._index.name is not None:
column_names.push_back(str.encode(table._index.name))
else:
# No named index exists so just write out columns
tv = table.data_view()
for level, idx_name in enumerate(table._index.names):
column_names.push_back(
str.encode(
_index_level_name(idx_name, level, table._column_names)
)
)
else:
tv = table.data_view()

for col_name in table._column_names:
column_names.push_back(str.encode(col_name))
Expand Down Expand Up @@ -541,3 +569,23 @@ cdef Column _update_column_struct_field_names(
)
col.set_base_children(tuple(children))
return col


def _index_level_name(index_name, level, column_names):
"""
Return the name of an index level or a default name
if `index_name` is None or is already a column name.

Parameters
----------
index_name : name of an Index object
level : level of the Index object

Returns
-------
name : str
"""
if index_name is not None and index_name not in column_names:
return index_name
else:
return f"__index_level_{level}__"
9 changes: 2 additions & 7 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4940,10 +4940,7 @@ def from_pandas(cls, dataframe, nan_as_null=None):
df.columns = dataframe.columns

# Set index
if isinstance(dataframe.index, pd.MultiIndex):
index = cudf.from_pandas(dataframe.index, nan_as_null=nan_as_null)
else:
index = dataframe.index
index = cudf.from_pandas(dataframe.index, nan_as_null=nan_as_null)
result = df.set_index(index)

return result
Expand Down Expand Up @@ -7137,10 +7134,8 @@ def from_pandas(obj, nan_as_null=None):
elif isinstance(obj, pd.MultiIndex):
return cudf.MultiIndex.from_pandas(obj, nan_as_null=nan_as_null)
elif isinstance(obj, pd.RangeIndex):
if obj._step and obj._step != 1:
raise ValueError("cudf RangeIndex requires step == 1")
return cudf.core.index.RangeIndex(
obj._start, stop=obj._stop, name=obj.name
start=obj.start, stop=obj.stop, step=obj.step, name=obj.name
)
elif isinstance(obj, pd.Index):
return cudf.Index.from_pandas(obj, nan_as_null=nan_as_null)
Expand Down
7 changes: 7 additions & 0 deletions python/cudf/cudf/core/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,13 @@ def stop(self):
"""
return self._stop

@property
def step(self):
    """
    The value of the step parameter (the increment between
    successive values of this index).
    """
    return self._step

Comment on lines +1535 to +1541
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching the missing step property!

@property
def _num_columns(self):
return 1
Expand Down
10 changes: 5 additions & 5 deletions python/cudf/cudf/tests/test_pandas_interop.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
# Copyright (c) 2018, NVIDIA CORPORATION.
# Copyright (c) 2018-2020, NVIDIA CORPORATION.

import numpy as np
import pandas as pd
import pytest

import cudf
from cudf.core import DataFrame
Expand Down Expand Up @@ -85,6 +84,7 @@ def test_from_pandas_rangeindex():


def test_from_pandas_rangeindex_step():
idx1 = pd.RangeIndex(start=0, stop=8, step=2, name="myindex")
with pytest.raises(ValueError):
cudf.from_pandas(idx1)
expected = pd.RangeIndex(start=0, stop=8, step=2, name="myindex")
actual = cudf.from_pandas(expected)

assert_eq(expected, actual)
84 changes: 62 additions & 22 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (c) 2019-2020, NVIDIA CORPORATION.

import os
import pathlib
import random
Expand Down Expand Up @@ -967,15 +968,7 @@ def test_parquet_reader_list_skiprows(skip, tmpdir):
src.to_parquet(fname)
assert os.path.exists(fname)

expect = pd.DataFrame(
{
"a": list_gen(int_gen, skip, num_rows - skip, 80, 50),
"b": list_gen(string_gen, skip, num_rows - skip, 80, 50),
"c": list_gen(
int_gen, skip, num_rows - skip, 80, 50, include_validity=True
),
}
)
expect = src.iloc[skip:]
got = cudf.read_parquet(fname, skiprows=skip)
assert_eq(expect, got, check_dtype=False)

Expand All @@ -998,18 +991,7 @@ def test_parquet_reader_list_num_rows(skip, tmpdir):
assert os.path.exists(fname)

rows_to_read = min(3, num_rows - skip)
expect = pd.DataFrame(
{
"a": list_gen(int_gen, skip, rows_to_read, 80, 50),
"b": list_gen(string_gen, skip, rows_to_read, 80, 50),
"c": list_gen(
int_gen, skip, rows_to_read, 80, 50, include_validity=True
),
"d": list_gen(
string_gen, skip, rows_to_read, 80, 50, include_validity=True
),
}
)
expect = src.iloc[skip:].head(rows_to_read)
got = cudf.read_parquet(fname, skiprows=skip, num_rows=rows_to_read)
assert_eq(expect, got, check_dtype=False)

Expand Down Expand Up @@ -1514,7 +1496,7 @@ def test_parquet_writer_sliced(tmpdir):
df_select = df.iloc[1:3]

df_select.to_parquet(cudf_path)
assert_eq(cudf.read_parquet(cudf_path), df_select.reset_index(drop=True))
assert_eq(cudf.read_parquet(cudf_path), df_select)


def test_parquet_writer_list_basic(tmpdir):
Expand Down Expand Up @@ -1579,3 +1561,61 @@ def test_parquet_nullable_boolean(tmpdir, engine):
actual_gdf = cudf.read_parquet(pandas_path, engine=engine)

assert_eq(actual_gdf, expected_gdf)


@pytest.mark.parametrize(
"pdf",
[
pd.DataFrame(index=[1, 2, 3]),
pytest.param(
pd.DataFrame(index=pd.RangeIndex(0, 10, 1)),
marks=pytest.mark.xfail(
reason="https://github.com/pandas-dev/pandas/issues/37897"
"https://github.com/pandas-dev/pandas/issues/37896"
kkraus14 marked this conversation as resolved.
Show resolved Hide resolved
),
),
pd.DataFrame({"a": [1, 2, 3]}, index=[0.43534, 345, 0.34534]),
pd.DataFrame(
{"b": [11, 22, 33], "c": ["a", "b", "c"]},
index=pd.Index(["a", "b", "c"], name="custom name"),
),
pd.DataFrame(
{"a": [10, 11, 12], "b": [99, 88, 77]},
index=pd.RangeIndex(12, 17, 2),
),
pd.DataFrame(
{"b": [99, 88, 77]},
index=pd.RangeIndex(22, 27, 2, name="hello index"),
),
pd.DataFrame(index=pd.Index(["a", "b", "c"], name="custom name")),
pd.DataFrame(
{"a": ["a", "bb", "cc"], "b": [10, 21, 32]},
index=pd.MultiIndex.from_tuples([[1, 2], [10, 11], [15, 16]]),
),
pd.DataFrame(
{"a": ["a", "bb", "cc"], "b": [10, 21, 32]},
index=pd.MultiIndex.from_tuples(
[[1, 2], [10, 11], [15, 16]], names=["first", "second"]
),
),
],
)
@pytest.mark.parametrize("index", [None, True, False])
def test_parquet_index(tmpdir, pdf, index):
pandas_path = tmpdir.join("pandas_index.parquet")
cudf_path = tmpdir.join("pandas_index.parquet")

gdf = cudf.from_pandas(pdf)

pdf.to_parquet(pandas_path, index=index)
gdf.to_parquet(cudf_path, index=index)

expected = pd.read_parquet(cudf_path)
actual = cudf.read_parquet(cudf_path)

assert_eq(expected, actual)

expected = pd.read_parquet(pandas_path)
actual = cudf.read_parquet(pandas_path)

assert_eq(expected, actual)