Skip to content

Commit

Permalink
Merge pull request #2994 from rjzamora/new-hash-dispatch
Browse files Browse the repository at this point in the history
[REVIEW] Use hash_object_dispatch and group_split_dispatch for split_out support
  • Loading branch information
rjzamora authored Oct 24, 2019
2 parents 3253ec2 + ffe85e0 commit 4613ba8
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
- PR #3149 Rename column_wrapper.cuh to column_wrapper.hpp
- PR #3168 Fix mutable_column_device_view head const_cast
- PR #3204 ORC writer: Fix ByteRLE encoding of NULLs
- PR #2994 Fix split_out support bug with hash_object_dispatch


# cuDF 0.10.0 (16 Oct 2019)
Expand Down
11 changes: 0 additions & 11 deletions python/cudf/cudf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,3 @@

__version__ = get_versions()["version"]
del get_versions


# Import dask_cudf dispatch functions
try:
from dask_cudf.backends import (
hash_df_cudf,
hash_df_cudf_index,
group_split_cudf,
)
except ImportError:
pass
1 change: 0 additions & 1 deletion python/cudf/cudf/core/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,6 @@ def __init__(self, values, **kwargs):
pd.Categorical(values, categories=values)
)
super(CategoricalIndex, self).__init__(values, **kwargs)
assert self._values.null_count == 0

@property
def names(self):
Expand Down
50 changes: 36 additions & 14 deletions python/dask_cudf/dask_cudf/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,42 @@ def concat_cudf(

try:

from dask.dataframe.methods import group_split, hash_df

@hash_df.register(cudf.DataFrame)
def hash_df_cudf(dfs):
    # Dask `hash_df` dispatch for cudf DataFrames: delegate to cudf's
    # built-in per-row column hashing.
    return dfs.hash_columns()

@hash_df.register(cudf.Index)
def hash_df_cudf_index(ind):
    # Dask `hash_df` dispatch for a cudf Index: materialize the index as a
    # single column and hash it with the numerical column hasher.
    from cudf.core.column import column, numerical

    cols = [column.as_column(ind)]
    return cudf.Series(numerical.column_hash_values(*cols))

@group_split.register(cudf.DataFrame)
from dask.dataframe.utils import group_split_dispatch, hash_object_dispatch
from cudf.core.column import column, CategoricalColumn, StringColumn
import rmm
import cudf._lib as libcudf
from cudf.core.buffer import Buffer

def _string_safe_hash(df):
    """Hash the columns of ``df`` into a row-wise hash Series.

    String columns are first replaced by int32 hash values computed on
    device (via the column's ``data.hash``), since the subsequent
    ``hash_columns`` call operates on the numeric representations.
    Operates on a shallow copy so the caller's frame is untouched.
    """
    shallow = df.copy(deep=False)
    for name in shallow.columns:
        underlying = shallow[name]._column
        if not isinstance(underlying, StringColumn):
            continue
        # Hash the string column in place on device into an int32 buffer,
        # then substitute that numeric column for the string column.
        hashed = rmm.device_array(len(shallow), dtype="int32")
        underlying.data.hash(devptr=libcudf.cudf.get_ctype_ptr(hashed))
        shallow[name] = cudf.Series(Buffer(hashed))
    return shallow.hash_columns()

@hash_object_dispatch.register(cudf.DataFrame)
def hash_object_cudf(frame, index=True):
    """Dask ``hash_object_dispatch`` implementation for cudf DataFrames.

    Parameters
    ----------
    frame : cudf.DataFrame
        Frame whose rows are hashed.
    index : bool, default True
        If True, include the index in the hash by resetting it into
        regular columns first.

    Returns
    -------
    Series of per-row hash values (via ``_string_safe_hash``).
    """
    if index:
        # BUG FIX: ``reset_index`` must be *called* — the original passed
        # the bound method object itself into _string_safe_hash, which
        # would fail as soon as the helper tried to copy/iterate it.
        return _string_safe_hash(frame.reset_index())
    return _string_safe_hash(frame)

@hash_object_dispatch.register(cudf.Index)
def hash_object_cudf_index(ind, index=None):
    """Dask ``hash_object_dispatch`` implementation for cudf Index objects.

    ``index`` is accepted only for signature compatibility with the
    dispatch interface and is ignored.
    """
    # A MultiIndex is hashed through its DataFrame representation so the
    # string-safe frame hasher can handle each level.
    if isinstance(ind, cudf.MultiIndex):
        return _string_safe_hash(ind.to_frame(index=False))

    as_col = column.as_column(ind)
    # Strings and categoricals cannot be hashed directly: map them to a
    # numeric column first, then hash the resulting Series.
    if isinstance(as_col, StringColumn):
        as_col = as_col.as_numerical_column("int32")
    elif isinstance(as_col, CategoricalColumn):
        as_col = as_col.as_numerical
    return cudf.Series(as_col).hash_values()

@group_split_dispatch.register(cudf.DataFrame)
def group_split_cudf(df, c, k):
    """Split ``df`` into ``k`` partitions keyed 0..k-1 using map column ``c``.

    ``scatter_by_map`` with ``map_size=k`` yields one frame per partition;
    the result maps each partition number to its frame.
    """
    parts = df.scatter_by_map(c, map_size=k)
    return {part_id: part for part_id, part in zip(range(k), parts)}

Expand Down
36 changes: 36 additions & 0 deletions python/dask_cudf/dask_cudf/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,39 @@ def test_reset_index_multiindex():
.reset_index()
.merge(ddf_lookup, on="id_1"),
)


@pytest.mark.parametrize("split_out", [1, 2, 3])
@pytest.mark.parametrize(
    "column", ["c", "d", "e", ["b", "c"], ["b", "d"], ["b", "e"]]
)
def test_groupby_split_out(split_out, column):
    """groupby().a.mean(split_out=...) matches between dask and dask_cudf.

    Exercises numeric ("c"), string ("d"), and categorical ("e") grouping
    keys, alone and combined with "b".
    """
    pdf = pd.DataFrame(
        {
            "a": np.arange(8),
            "b": [1, 0, 0, 2, 1, 1, 2, 0],
            "c": [0, 1] * 4,
            "d": ["dog", "cat", "cat", "dog", "dog", "dog", "cat", "bird"],
        }
    )
    pdf["e"] = pdf["d"].astype("category")
    gdf = cudf.from_pandas(pdf)

    ddf = dd.from_pandas(pdf, npartitions=3)
    gddf = dask_cudf.from_cudf(gdf, npartitions=3)

    # Pandas side only: drop NaN means (e.g. from unobserved categorical
    # groups) so both results cover the same set of groups — presumably
    # the cudf side never produces them; verify if this test changes.
    expect = (
        ddf.groupby(column)
        .a.mean(split_out=split_out)
        .compute()
        .sort_values()
        .dropna()
    )
    got = (
        gddf.groupby(column)
        .a.mean(split_out=split_out)
        .compute()
        .sort_values()
    )

    dd.assert_eq(got, expect, check_index=False)

0 comments on commit 4613ba8

Please sign in to comment.