Dask-cuDF cumulative groupby ops #10889

Merged
Changes from 8 commits
9 changes: 8 additions & 1 deletion python/dask_cudf/dask_cudf/groupby.py
@@ -21,7 +21,12 @@
 import cudf
 from cudf.utils.utils import _dask_cudf_nvtx_annotate

-SUPPORTED_AGGS = (
+CUMULATIVE_AGGS = (
+    'cumsum',
+    'cumcount',
+)
+
+AGGS = (
     "count",
     "mean",
     "std",
@@ -34,6 +39,8 @@
     "last",
 )

+SUPPORTED_AGGS = (*AGGS, *CUMULATIVE_AGGS)
+

 def _check_groupby_supported(func):
     """
28 changes: 22 additions & 6 deletions python/dask_cudf/dask_cudf/tests/test_groupby.py
@@ -11,12 +11,10 @@
 from cudf.core._compat import PANDAS_GE_120

 import dask_cudf
-from dask_cudf.groupby import SUPPORTED_AGGS, _aggs_supported
+from dask_cudf.groupby import AGGS, _aggs_supported, CUMULATIVE_AGGS


-@pytest.mark.parametrize("aggregation", SUPPORTED_AGGS)
-@pytest.mark.parametrize("series", [False, True])
-def test_groupby_basic(series, aggregation):
+@pytest.fixture
+def pdf():
     np.random.seed(0)

     # note that column name "x" is a substring of the groupby key;
@@ -28,6 +26,12 @@ def test_groupby_basic(series, aggregation):
             "y": np.random.normal(size=10000),
         }
     )
+    return pdf
+
+
+@pytest.mark.parametrize("aggregation", AGGS)
+@pytest.mark.parametrize("series", [False, True])
+def test_groupby_basic(series, aggregation, pdf):

     gdf = cudf.DataFrame.from_pandas(pdf)
     gdf_grouped = gdf.groupby("xx")
@@ -53,6 +57,18 @@ def test_groupby_basic(series, aggregation):
     else:
         dd.assert_eq(a, b)

+@pytest.mark.parametrize("aggregation", CUMULATIVE_AGGS)
+def test_groupby_cumulative(aggregation, pdf):
Member


Should we also test Series groupbys here?

Contributor Author


I added Series tests here and hit what I think might be a bug in upstream Dask. Here's a reproducer with no cuDF, modeled on these tests:

import pandas as pd
import numpy as np
import dask.dataframe as dd

np.random.seed(0)
size = 10
npartitions = 2


pdf = pd.DataFrame(
    {
        "xx": np.random.randint(0, 5, size=size),
        "x": np.random.normal(size=size),
        "y": np.random.normal(size=size),
    }
)

ddf = dd.from_pandas(pdf, npartitions=npartitions)

pdf_grouped = pdf.groupby('xx').xx
ddf_grouped = ddf.groupby('xx').xx

# Per-group cumulative sum of the selected column: pandas vs. Dask
print(pdf_grouped.cumsum())
print(ddf_grouped.cumsum().compute())

It's a little hard for me to reason about what the result "should" be here (we're selecting one column of a DataFrame groupby and taking...the cumulative sum of that?), but the last two lines above give me different results. What do you think is the best thing to do here? I could file an issue and fix it before merging this, xfail the test, etc.
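
For reference, here is a minimal pure-Python sketch of what a grouped cumulative sum is generally expected to return: a per-group running total, reported in the original row order, which is what the pandas line in the reproducer computes. The helper `grouped_cumsum` and the sample data are hypothetical and only illustrate the semantics; this is not how pandas or Dask implement it, and it makes no claim about where the upstream discrepancy comes from. The second half shows that a partitioned computation can match the single-pass result if each group's running total is carried across the partition boundary.

```python
from collections import defaultdict


def grouped_cumsum(keys, values, totals=None):
    """Running per-group sum of `values`, returned in the original row order.

    `totals` optionally carries each group's running total in from earlier
    rows (for example, from a previous partition).
    """
    totals = defaultdict(int, totals or {})
    out = []
    for key, value in zip(keys, values):
        totals[key] += value
        out.append(totals[key])
    return out, dict(totals)


# Hypothetical data: the grouping key is also the column being summed,
# mirroring `groupby('xx').xx.cumsum()` in the reproducer.
xx = [1, 2, 1, 2, 1]

# Whole column in a single pass.
expected, _ = grouped_cumsum(xx, xx)

# Same rows split into two partitions, carrying totals across the boundary.
first, carry = grouped_cumsum(xx[:3], xx[:3])
second, _ = grouped_cumsum(xx[3:], xx[3:], carry)

print(expected)        # [1, 2, 2, 4, 3]
print(first + second)  # [1, 2, 2, 4, 3]
```

Under this reading, the pandas output is the natural reference, and the partitioned result should not depend on `npartitions`.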

Member


Thanks for catching this! IMO I would add the tests and xfail them; this is what I've done for other tests that would otherwise fail here due to upstream Dask issues. For example:

pytest.param(
    False,
    ["a", "b"],
    marks=pytest.mark.xfail(
        reason="https://github.com/dask/dask/issues/8817"
    ),
),
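
Applied to the Series case discussed above, the same pattern might look roughly like the sketch below. The names `SERIES_PARAMS` and `test_series_flag`, the `reason` text, and the placeholder test body are all hypothetical; the real test would compare cuDF against dask-cudf, and the reason would normally point at an upstream issue once one is filed.

```python
import pytest

# Hypothetical parametrization: the Series case is marked as an expected
# failure until the upstream Dask behavior is resolved.
SERIES_PARAMS = [
    False,
    pytest.param(
        True,
        marks=pytest.mark.xfail(
            reason="Series groupby cumsum differs from pandas in upstream Dask"
        ),
    ),
]


@pytest.mark.parametrize("series", SERIES_PARAMS)
def test_series_flag(series):
    # Placeholder body standing in for the real cuDF vs. dask-cudf comparison;
    # it fails for series=True so the xfail mark has something to catch.
    assert series is False
```

Run under pytest, the `False` case passes and the `True` case is reported as xfailed rather than failing the suite.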


+    gdf = cudf.DataFrame.from_pandas(pdf)
+    gdf_grouped = gdf.groupby("xx")
+    ddf_grouped = dask_cudf.from_cudf(gdf, npartitions=5).groupby("xx")
+
+    a = getattr(gdf_grouped, aggregation)()
+    b = getattr(ddf_grouped, aggregation)().compute()
+
+    dd.assert_eq(a, b)
+
+

 @pytest.mark.parametrize(
     "func",
@@ -679,7 +695,7 @@ def test_groupby_agg_redirect(aggregations):
     ],
 )
 def test_is_supported(arg, supported):
-    assert _aggs_supported(arg, SUPPORTED_AGGS) is supported
+    assert _aggs_supported(arg, AGGS) is supported
Member


Nitpicky, but we might want to keep this as SUPPORTED_AGGS to make sure we don't eventually mess something up with support for new aggregations down the line:

Suggested change
-    assert _aggs_supported(arg, AGGS) is supported
+    assert _aggs_supported(arg, SUPPORTED_AGGS) is supported

Member

charlesbluca, Jul 21, 2022


Actually, disregard this; I was forgetting that _aggs_supported really only needs to be tested against different groupby agg structures 😅 I think a reasonable way to check that all aggregations are actually "supported" (i.e. use dask-cudf's groupby codepath) is to add the layer check I proposed in #10853.



 def test_groupby_unique_lists():