
Add collect list to dask-cudf groupby aggregations #8045

Merged
10 commits merged into rapidsai:branch-21.08 on Jul 6, 2021

Conversation

@charlesbluca (Member) commented on Apr 23, 2021

Closes #7812

Adds support for cuDF's collect aggregation in dask-cuDF.
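As a quick illustrative sketch (not taken from this PR; the data and column names are invented), the intent is that a dask-cudf groupby accepts cuDF's "collect" aggregation, or the list / "list" aliases:

import cudf
import dask_cudf

# Hypothetical example data; column names are invented for illustration only.
gdf = cudf.DataFrame({"a": [1, 1, 2, 2], "b": [10, 20, 30, 40]})
gddf = dask_cudf.from_cudf(gdf, npartitions=2)

# Gather each group's values of "b" into a single list per group.
result = gddf.groupby("a").agg({"b": "collect"}).compute()
print(result)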

github-actions bot added the label Python (Affects Python cuDF API) on Apr 23, 2021
@charlesbluca (Member Author) commented:

To illustrate the problem:

>>> data = """a,b
    1595802,1611:0.92
    1595802,1610:0.07
    1524246,1807:0.92
    1524246,1608:0.07"""

>>> df = pd.read_csv(StringIO(data))
>>> ddf = dd.from_pandas(df, 2)

>>> gdf = cudf.from_pandas(df)
>>> gddf = dask_cudf.from_cudf(gdf, 2)

>>> print(ddf.groupby("a").agg({"b":list}).compute())
                              b
a                              
1595802  [1611:0.92, 1610:0.07]
1524246  [1807:0.92, 1608:0.07]
>>> print(gddf.groupby("a").agg({"b":list}).compute())
                                b
a                                
1524246  [[1807:0.92, 1608:0.07]]
1595802  [[1611:0.92, 1610:0.07]]
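
A plain-pandas sketch of where the extra nesting comes from, assuming a two-stage chunk/combine aggregation (this is an illustration, not the actual dask-cudf code path):

# Illustrative only: collecting within each partition and then collecting the
# partition-level results again produces a list of lists, unless the final
# step flattens one level.
import pandas as pd

part1 = pd.DataFrame({"a": [1595802, 1595802], "b": ["1611:0.92", "1610:0.07"]})
part2 = pd.DataFrame({"a": [1524246, 1524246], "b": ["1807:0.92", "1608:0.07"]})

# chunk step: collect values within each partition
chunks = [p.groupby("a")["b"].agg(list) for p in (part1, part2)]

# combine step: collecting the per-partition lists wraps them again
combined = pd.concat(chunks).groupby(level=0).agg(list)
print(combined.loc[1595802])   # [['1611:0.92', '1610:0.07']]

# finalize step: flatten one level to recover the expected result
flattened = combined.apply(lambda lists: [x for sub in lists for x in sub])
print(flattened.loc[1595802])  # ['1611:0.92', '1610:0.07']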

codecov bot commented on Apr 23, 2021

Codecov Report

❗ No coverage uploaded for pull request base (branch-21.08@167c2b7).
The diff coverage is n/a.


@@               Coverage Diff               @@
##             branch-21.08    #8045   +/-   ##
===============================================
  Coverage                ?   10.64%           
===============================================
  Files                   ?      109           
  Lines                   ?    18653           
  Branches                ?        0           
===============================================
  Hits                    ?     1985           
  Misses                  ?    16668           
  Partials                ?        0           


@kkraus14 added the labels bug (Something isn't working), dask (Dask issue), and non-breaking (Non-breaking change) on Apr 27, 2021
@charlesbluca changed the base branch from branch-21.06 to branch-21.08 on May 27, 2021
@charlesbluca added the label 0 - Blocked (Cannot progress due to external reasons) on Jun 16, 2021
@charlesbluca linked an issue on Jun 16, 2021 that may be closed by this pull request
@charlesbluca added the label 2 - In Progress (Currently a work in progress) and removed 0 - Blocked (Cannot progress due to external reasons) on Jul 2, 2021
@charlesbluca marked this pull request as ready for review on Jul 2, 2021
@charlesbluca requested a review from a team as a code owner on Jul 2, 2021
@charlesbluca added the label 3 - Ready for Review (Ready for review by team) and removed 2 - In Progress (Currently a work in progress) on Jul 2, 2021
@rjzamora (Member) left a comment

This looks great @charlesbluca - Thanks!

My only concern is that you are explicitly setting the index names to None in the test. Is the "collection" result somehow different from other aggregations?

Comment on lines +316 to +317
list: "collect",
"list": "collect",
Member

I like this approach.
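
To spell out the idea (the names below are hypothetical, not identifiers from this PR): a mapping like this lets user-supplied aggregation specs be normalized to cuDF's "collect" before dispatch.

# Hypothetical sketch of normalizing aggregation aliases; _AGG_ALIASES and
# normalize_agg_spec are invented names, not code from this PR.
_AGG_ALIASES = {
    list: "collect",
    "list": "collect",
}

def normalize_agg_spec(spec):
    """Translate aliases such as list / "list" into cuDF's "collect"."""
    return {col: _AGG_ALIASES.get(agg, agg) for col, agg in spec.items()}

print(normalize_agg_spec({"b": list}))    # {'b': 'collect'}
print(normalize_agg_spec({"b": "mean"}))  # {'b': 'mean'} (unchanged)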

@@ -478,6 +513,9 @@ def _finalize_gb_agg(
gb.drop(columns=[sum_name], inplace=True)
if "count" not in agg_list:
gb.drop(columns=[count_name], inplace=True)
if "collect" in agg_list:
collect_name = _make_name(col, "collect", sep=sep)
gb[collect_name] = gb[collect_name].list.concat()
Member

Worth the wait - Thanks for this concat method @shwina :)
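
For reference, a small standalone sketch (invented data) of what Series.list.concat() does in the finalize step above: it concatenates the lists within each row, flattening one level of nesting.

# Small illustration with made-up data: list.concat() joins the inner lists of
# a list-of-lists column row-wise, which is the flattening needed after the
# partition-wise collect results are combined.
import cudf

nested = cudf.Series([[[1, 2], [3]], [[4], [5, 6]]])
flat = nested.list.concat()
print(flat)
# expected: row 0 -> [1, 2, 3], row 1 -> [4, 5, 6]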

python/dask_cudf/dask_cudf/tests/test_groupby.py (outdated review comment, resolved)
Comment on lines +128 to +136
@pytest.mark.parametrize(
"func",
[
lambda df: df.groupby("x").agg({"y": "collect"}),
pytest.param(
lambda df: df.groupby("x").y.agg("collect"), marks=pytest.mark.skip
),
],
)
Member

Is there any reason to define func this way? Am I misunderstanding, or will the second of the two cases always be skipped?

Member Author

I lifted the param skip and the index nulling from another dask-cudf groupby test:

@pytest.mark.parametrize(
"func",
[
lambda df: df.groupby("x").agg({"y": "max"}),
pytest.param(
lambda df: df.groupby("x").y.agg(["sum", "max"]),
marks=pytest.mark.skip,
),
],
)
def test_groupby_agg(func):
pdf = pd.DataFrame(
{
"x": np.random.randint(0, 5, size=10000),
"y": np.random.normal(size=10000),
}
)
gdf = cudf.DataFrame.from_pandas(pdf)
ddf = dask_cudf.from_cudf(gdf, npartitions=5)
a = func(gdf).to_pandas()
b = func(ddf).compute().to_pandas()
a.index.name = None
a.name = None
b.index.name = None
b.name = None
dd.assert_eq(a, b)

I can check what happens when we don't set the index to None here, but when that second case isn't skipped we get an AssertionError on the types:

__________________________________________ test_groupby_collect[<lambda>1] ___________________________________________

func = <function <lambda> at 0x7f3c804a0d30>

    @pytest.mark.parametrize(
        "func",
        [
            lambda df: df.groupby("x").agg({"y": "collect"}),
            lambda df: df.groupby("x").y.agg("collect"),
        ],
    )
    def test_groupby_collect(func):
        pdf = pd.DataFrame(
            {
                "x": np.random.randint(0, 5, size=10000),
                "y": np.random.normal(size=10000),
            }
        )
    
        gdf = cudf.DataFrame.from_pandas(pdf)
    
        ddf = dask_cudf.from_cudf(gdf, npartitions=5)
    
        a = func(gdf).to_pandas()
        b = func(ddf).compute().to_pandas()
    
        a.index.name = None
        a.name = None
        b.index.name = None
        b.name = None
    
>       dd.assert_eq(a, b)

dask_cudf/tests/test_groupby.py:155: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../compose/etc/conda/cuda_11.2.72/envs/rapids/lib/python3.8/site-packages/dask/dataframe/utils.py:559: in assert_eq
    tm.assert_series_equal(a, b, check_names=check_names, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

left = 0    [-0.02279966962796973, -0.2268040371246616, 0....
1    [1.0547561143327269, 0.07632651478542447, -0.0...
2    [1.... [0.35305396010499146, 2.022936601816015, -0.02...
4    [0.7639835327097312, 0.9458744987601149, 0.370...
dtype: object
right =                                                    y
0  [-0.02279966962796973, -0.2268040371246616, 0....
1  [1.054756...796, 1.498...
3  [0.35305396010499146, 2.022936601816015, -0.02...
4  [0.7639835327097312, 0.9458744987601149, 0.370...
cls = <class 'pandas.core.series.Series'>

    def _check_isinstance(left, right, cls):
        """
        Helper method for our assert_* methods that ensures that
        the two objects being compared have the right type before
        proceeding with the comparison.
    
        Parameters
        ----------
        left : The first object being compared.
        right : The second object being compared.
        cls : The class type to check against.
    
        Raises
        ------
        AssertionError : Either `left` or `right` is not an instance of `cls`.
        """
        cls_name = cls.__name__
    
        if not isinstance(left, cls):
            raise AssertionError(
                f"{cls_name} Expected type {cls}, found {type(left)} instead"
            )
        if not isinstance(right, cls):
>           raise AssertionError(
                f"{cls_name} Expected type {cls}, found {type(right)} instead"
            )
E           AssertionError: Series Expected type <class 'pandas.core.series.Series'>, found <class 'pandas.core.frame.DataFrame'> instead

It looks like we are creating a dataframe here when we should be making a series.

Member

> It looks like we are creating a dataframe here when we should be making a series.

Ah - It seems like this was already a problem before this PR. In that case, it is probably okay to fix that in a follow-up PR.

Member Author

Sure, do you know if there is an open issue for this problem? If not, I can open one so we can keep track of the follow-up fix.

@rjzamora (Member) left a comment

Thanks again @charlesbluca - Note that I filed #8655 to make sure we address the Series/DataFrame inconsistency discussed here.

@charlesbluca (Member Author):

Thanks for opening the issue @rjzamora!

@charlesbluca (Member Author):

@gpucibot merge

rapids-bot (bot) merged commit c54346e into rapidsai:branch-21.08 on Jul 6, 2021
@charlesbluca deleted the dask-collect-list branch on Aug 3, 2021
Labels
3 - Ready for Review (Ready for review by team), bug (Something isn't working), dask (Dask issue), non-breaking (Non-breaking change), Python (Affects Python cuDF API)
Development

Successfully merging this pull request may close these issues.

[BUG] Groupby collect list fails with Dask