Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dask-cuDF cumulative groupby ops #10889

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion python/dask_cudf/dask_cudf/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
import cudf
from cudf.utils.utils import _dask_cudf_nvtx_annotate

SUPPORTED_AGGS = (
CUMULATIVE_AGGS = (
"cumsum",
"cumcount",
)

AGGS = (
"count",
"mean",
"std",
Expand All @@ -34,6 +39,8 @@
"last",
)

SUPPORTED_AGGS = (*AGGS, *CUMULATIVE_AGGS)


def _check_groupby_supported(func):
"""
Expand Down
38 changes: 33 additions & 5 deletions python/dask_cudf/dask_cudf/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@
from cudf.core._compat import PANDAS_GE_120

import dask_cudf
from dask_cudf.groupby import SUPPORTED_AGGS, _aggs_supported
from dask_cudf.groupby import AGGS, CUMULATIVE_AGGS, _aggs_supported


@pytest.mark.parametrize("aggregation", SUPPORTED_AGGS)
@pytest.mark.parametrize("series", [False, True])
def test_groupby_basic(series, aggregation):
@pytest.fixture
def pdf():
np.random.seed(0)

# note that column name "x" is a substring of the groupby key;
Expand All @@ -28,6 +27,12 @@ def test_groupby_basic(series, aggregation):
"y": np.random.normal(size=10000),
}
)
return pdf


@pytest.mark.parametrize("aggregation", AGGS)
@pytest.mark.parametrize("series", [False, True])
def test_groupby_basic(series, aggregation, pdf):

gdf = cudf.DataFrame.from_pandas(pdf)
gdf_grouped = gdf.groupby("xx")
Expand All @@ -54,6 +59,29 @@ def test_groupby_basic(series, aggregation):
dd.assert_eq(a, b)


@pytest.mark.parametrize("series", [True, False])
@pytest.mark.parametrize("aggregation", CUMULATIVE_AGGS)
def test_groupby_cumulative(aggregation, pdf, series):
gdf = cudf.DataFrame.from_pandas(pdf)
ddf = dask_cudf.from_cudf(gdf, npartitions=5)

gdf_grouped = gdf.groupby("xx")
ddf_grouped = ddf.groupby("xx")

if series:
gdf_grouped = gdf_grouped.xx
ddf_grouped = ddf_grouped.xx

a = getattr(gdf_grouped, aggregation)()
b = getattr(ddf_grouped, aggregation)().compute()

if aggregation == "cumsum" and series:
with pytest.xfail(reason="https://github.com/dask/dask/issues/9313"):
dd.assert_eq(a, b)
else:
dd.assert_eq(a, b)


@pytest.mark.parametrize(
"func",
[
Expand Down Expand Up @@ -679,7 +707,7 @@ def test_groupby_agg_redirect(aggregations):
],
)
def test_is_supported(arg, supported):
assert _aggs_supported(arg, SUPPORTED_AGGS) is supported
assert _aggs_supported(arg, AGGS) is supported
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpicky, but we might want to keep this as SUPPORTED_AGGS to make sure we don't eventually mess something up with support for new aggregations down the line:

Suggested change
assert _aggs_supported(arg, AGGS) is supported
assert _aggs_supported(arg, SUPPORTED_AGGS) is supported

Copy link
Member

@charlesbluca charlesbluca Jul 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, disregard this, I am forgetting that _aggs_supported really only needs to be tested for different groupby agg structures 😅 I think that a reasonable way to check that all aggregations are actually "supported" (i.e. use dask-cudf's groupby codepath) is to add the layer check I proposed in #10853



def test_groupby_unique_lists():
Expand Down