
[BUG] Groupby collect list fails with Dask #7812

Closed
beckernick opened this issue Apr 1, 2021 · 27 comments · Fixed by #8045
Labels: bug (Something isn't working) · dask (Dask issue) · Python (Affects Python cuDF API)

Comments

@beckernick
Member

beckernick commented Apr 1, 2021

cuDF recently implemented groupby collect list. I'd like to be able to use this with Dask, like I can on the CPU. Currently, it looks like Dask's collect list goes down a code path that requires iterating through the object, which we explicitly don't permit (#7481). We may want to explore this in Dask or special-case it in Dask-cuDF.

import cudf
import dask_cudf
import pandas as pd
import dask.dataframe as dd

from io import StringIO

data = """a,b
1595802,1611:0.92
1595802,1610:0.07
1524246,1807:0.92
1524246,1608:0.07"""

df = pd.read_csv(StringIO(data))
ddf = dd.from_pandas(df, 2)

gdf = cudf.from_pandas(df)
gddf = dask_cudf.from_cudf(gdf, 2)

print(ddf.groupby("a").agg({"b":list}).compute()) # works as expected
print(gddf.groupby("a").agg({"b":list}).compute())
                              b
a                              
1595802  [1611:0.92, 1610:0.07]
1524246  [1807:0.92, 1608:0.07]
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    179     try:
--> 180         yield
    181     except Exception as e:

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
   5507     with raise_on_meta_error(funcname(func), udf=kwargs.pop("udf", False)):
-> 5508         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   5509 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in _groupby_apply_funcs(df, *index, **kwargs)
    920     for result_column, func, func_kwargs in funcs:
--> 921         r = func(grouped, **func_kwargs)
    922 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in _apply_func_to_column(df_like, column, func)
    966 
--> 967     return func(df_like[column])
    968 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in <lambda>(s)
    843                 _apply_func_to_column,
--> 844                 dict(column=input_column, func=lambda s: s.apply(list)),
    845             )

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/groupby/groupby.py in apply(self, function)
    422         ]
--> 423         chunk_results = [function(chk) for chk in chunks]
    424 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/groupby/groupby.py in <listcomp>(.0)
    422         ]
--> 423         chunk_results = [function(chk) for chk in chunks]
    424 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/series.py in __iter__(self)
   1189     def __iter__(self):
-> 1190         cudf.utils.utils.raise_iteration_error(obj=self)
   1191 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/utils/utils.py in raise_iteration_error(obj)
    358     raise TypeError(
--> 359         f"{obj.__class__.__name__} object is not iterable. "
    360         f"Consider using `.to_arrow()`, `.to_pandas()` or `.values_host` "

TypeError: Series object is not iterable. Consider using `.to_arrow()`, `.to_pandas()` or `.values_host` if you wish to iterate over the values.

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
<ipython-input-16-08ab17754440> in <module>
     22 
     23 print(ddf.groupby("a").agg({"b":list}).compute()) # works as expected
---> 24 print(gddf.groupby("a").agg({"b":list}).compute())

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in agg(self, arg, split_every, split_out)
   1846     @derived_from(pd.core.groupby.DataFrameGroupBy)
   1847     def agg(self, arg, split_every=None, split_out=1):
-> 1848         return self.aggregate(arg, split_every=split_every, split_out=split_out)
   1849 
   1850 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask_cudf/groupby.py in aggregate(self, arg, split_every, split_out)
     81 
     82         return super().aggregate(
---> 83             arg, split_every=split_every, split_out=split_out
     84         )
     85 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
   1842             return self.size()
   1843 
-> 1844         return super().aggregate(arg, split_every=split_every, split_out=split_out)
   1845 
   1846     @derived_from(pd.core.groupby.DataFrameGroupBy)

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
   1622             split_out=split_out,
   1623             split_out_setup=split_out_on_index,
-> 1624             sort=self.sort,
   1625         )
   1626 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/core.py in apply_concat_apply(args, chunk, aggregate, combine, meta, token, chunk_kwargs, aggregate_kwargs, combine_kwargs, split_every, split_out, split_out_setup, split_out_setup_kwargs, sort, ignore_index, **kwargs)
   5459 
   5460     if meta is no_default:
-> 5461         meta_chunk = _emulate(chunk, *args, udf=True, **chunk_kwargs)
   5462         meta = _emulate(
   5463             aggregate, _concat([meta_chunk], ignore_index), udf=True, **aggregate_kwargs

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
   5506     """
   5507     with raise_on_meta_error(funcname(func), udf=kwargs.pop("udf", False)):
-> 5508         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   5509 
   5510 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/contextlib.py in __exit__(self, type, value, traceback)
    128                 value = type()
    129             try:
--> 130                 self.gen.throw(type, value, traceback)
    131             except StopIteration as exc:
    132                 # Suppress StopIteration *unless* it's the same exception that

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    199         )
    200         msg = msg.format(" in `{0}`".format(funcname) if funcname else "", repr(e), tb)
--> 201         raise ValueError(msg) from e
    202 
    203 

ValueError: Metadata inference failed in `_groupby_apply_funcs`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
TypeError('Series object is not iterable. Consider using `.to_arrow()`, `.to_pandas()` or `.values_host` if you wish to iterate over the values.')

Traceback:
---------
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/utils.py", line 180, in raise_on_meta_error
    yield
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/core.py", line 5508, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py", line 921, in _groupby_apply_funcs
    r = func(grouped, **func_kwargs)
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py", line 967, in _apply_func_to_column
    return func(df_like[column])
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py", line 844, in <lambda>
    dict(column=input_column, func=lambda s: s.apply(list)),
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/groupby/groupby.py", line 423, in apply
    chunk_results = [function(chk) for chk in chunks]
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/groupby/groupby.py", line 423, in <listcomp>
    chunk_results = [function(chk) for chk in chunks]
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/series.py", line 1190, in __iter__
    cudf.utils.utils.raise_iteration_error(obj=self)
  File "/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/utils/utils.py", line 359, in raise_iteration_error
    f"{obj.__class__.__name__} object is not iterable. "

!conda list | grep "rapids\|dask\|numpy\|cupy\|arrow\|pandas"
# packages in environment at /raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331:
arrow-cpp                 1.0.1           py37h2318771_14_cuda    conda-forge
arrow-cpp-proc            3.0.0                      cuda    conda-forge
cudf                      0.19.0a210331   cuda_10.2_py37_gc99fcef41b_313    rapidsai-nightly
cuml                      0.19.0a210331   cuda10.2_py37_g83168076c_138    rapidsai-nightly
cupy                      8.6.0            py37h7fc54ca_0    conda-forge
dask                      2021.3.1           pyhd8ed1ab_0    conda-forge
dask-core                 2021.3.1           pyhd8ed1ab_0    conda-forge
dask-cuda                 0.19.0a210331           py37_45    rapidsai-nightly
dask-cudf                 0.19.0a210331   py37_gc99fcef41b_313    rapidsai-nightly
libcudf                   0.19.0a210331   cuda10.2_gbe2f0c000f_314    rapidsai-nightly
libcuml                   0.19.0a210331   cuda10.2_g83168076c_138    rapidsai-nightly
libcumlprims              0.19.0a210316   cuda10.2_ge7e82a0_12    rapidsai-nightly
librmm                    0.19.0a210331   cuda10.2_g9d1ba02_50    rapidsai-nightly
numpy                     1.19.5           py37haa41c4c_1    conda-forge
pandas                    1.2.3            py37hdc94413_0    conda-forge
pyarrow                   1.0.1           py37hbeecfa9_14_cuda    conda-forge
rmm                       0.19.0a210331   cuda_10.2_py37_g9d1ba02_50    rapidsai-nightly
ucx                       1.9.0+gcd9efd3       cuda10.2_0    rapidsai-nightly
ucx-proc                  1.0.0                       gpu    rapidsai-nightly
ucx-py                    0.19.0a210331   py37_gcd9efd3_46    rapidsai-nightly
beckernick added the bug (Something isn't working), Python (Affects Python cuDF API), and dask (Dask issue) labels Apr 1, 2021
@beckernick
Member Author

beckernick commented Apr 5, 2021

I wonder if we might be able to support this by switching the Dask implementation to use lambda s: s.agg(list) rather than lambda s: s.apply(list) here: https://github.com/dask/dask/blob/bc785e1a88f0e662afa0aba749a83953b0eb66da/dask/dataframe/groupby.py#L843

cuDF forbids the iteration needed for apply(list) on the groupby, but both cudf and pandas support agg(list) in the same way, though perhaps the downstream iteration in _build_agg_args_list would still be a blocker.
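
For reference, a minimal pandas illustration of the two spellings (cuDF supports the agg(list) form but raises on the iteration that apply(list) relies on):

import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2], "b": [10, 20, 30]})

# Both spellings produce one list per group in pandas; cuDF only supports
# the first, since apply(list) iterates over each per-group Series.
print(df.groupby("a")["b"].agg(list))    # 1: [10, 20]; 2: [30]
print(df.groupby("a")["b"].apply(list))  # identical output in pandas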

@rjzamora
Member

rjzamora commented Apr 9, 2021

I haven't looked through this deeply, but the quickest/easiest "fix" may be to add list to the optimized groupby-aggregation code path in dask_cudf. It may not require much work to get that code path working (would need to add support for list in _tree_node_agg, but I'm not seeing any real blockers).

@charlesbluca
Member

Following @beckernick's approach, we would also need to change the Dask implementation's s0.apply(...) to s0.agg(...), as it causes the same iteration error. However, doing that makes the aggregation function try to iterate over a cudf._lib.aggregation._AggregationFactory, which leads to a similar error.

Is there any way we could change the lambda in s0.agg(...) to avoid this iteration? If not, I am also trying out @rjzamora's approach as an alternative.

@charlesbluca
Member

charlesbluca commented Apr 14, 2021

While working on adding list to the optimized code path, I noticed what I think is a bug in _is_supported():

if isinstance(arg[col], list):
    _global_set.union(set(arg[col]))
else:
    _global_set.add(arg[col])

If _global_set is meant to contain all the aggregations across all columns, I think that the _global_set.union(...) call should be

_global_set = _global_set.union(set(arg[col]))

since set.union() doesn't modify the set in place. Right now, passing something like {"a": ["unsupported_agg"]} into _is_supported() always returns True.
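
A standalone illustration of why the original call is a no-op (plain Python set semantics, nothing cuDF-specific):

supported = {"sum", "mean"}

supported.union({"collect"})   # returns a NEW set; the result is discarded here
print(supported)               # {'sum', 'mean'} -- unchanged

supported = supported.union({"collect"})  # rebinding fixes it,
supported.update({"max"})                 # as does the in-place update()
print(supported)                          # now contains all four aggregations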

@rjzamora
Member

rjzamora commented Apr 14, 2021

Good catch @charlesbluca ! Do you have time to submit a fix to cudf?

Note that you can also include it in the list fix, but it may make sense to address the union bug separately.

@charlesbluca
Member

Sure! I opened up a PR for this at #7959; I agree that it's probably best handled separately.

@charlesbluca
Member

After chatting with @rjzamora, some other thoughts on the supported optimized aggs in Dask-cuDF:

  • Currently, dask-cudf only supports string function names when checking if an aggregation is supported; this means that providing the callable equivalent will result in the groupby operation being done through Dask:
import pandas as pd
import cudf
import dask_cudf

df = pd.DataFrame(dict(a=[1, 1, 2, 3, 3, 1, 1, 2, 3, 3, 99, 10, 1],
                       b=[1, 3, 10, 3, 2, 1, 3, 10, 3, 3, 12, 0, 9]))

gdf = cudf.from_pandas(df)
gddf = dask_cudf.from_cudf(gdf, 3)

print(gddf.groupby("a").agg({"b": "max"}).compute())  # uses dask-cudf's codepath
print(gddf.groupby("a").agg({"b": max}).compute())    # uses dask's codepath
  • Unlike pandas and cuDF, Dask dataframes will do a groupby collect list if you provide the callable list or the string "list"; I'm not sure if dask-cudf should aim to mirror this behavior, or mirror the behavior of cuDF by only accepting the callable:
import pandas as pd
import dask.dataframe as dd
from io import StringIO

data = """a,b
1595802,1611:0.92
1595802,1610:0.07
1524246,1807:0.92
1524246,1608:0.07"""

df = pd.read_csv(StringIO(data))
ddf = dd.from_pandas(df, 2)

print(ddf.groupby("a").agg({"b": list}).compute())
print(ddf.groupby("a").agg({"b": "list"}).compute())  # both have identical output

This falls outside the scope of this issue, so for now I'll make a PR that adds support for the callable list, and later on a separate PR that expands _is_supported() to handle callables and optionally emulate dask.dataframe's "list" behavior.
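
A minimal sketch of what that callable handling could look like (the mapping and helper names here are hypothetical, not the actual dask_cudf internals; #8048 later implemented the real redirect):

# Hypothetical table of Python built-ins and their named equivalents.
_BUILTIN_AGGS = {list: "collect", max: "max", min: "min", sum: "sum"}

def _normalize_agg(op):
    # Translate a callable aggregation spec to its string name, if known,
    # so the downstream supported-aggregation check only sees strings.
    if callable(op) and op in _BUILTIN_AGGS:
        return _BUILTIN_AGGS[op]
    return op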

@rjzamora
Member

This falls outside the scope of this issue, so for now I'll make a PR that adds support for the callable list, and later on a separate PR that expands _is_supported() to handle callables and optionally emulate dask.dataframe's "list" behavior.

Thanks @charlesbluca ! I agree that the priority is to support the pandas/cudf list-aggregation syntax.

As we discussed, it is somewhat of a coincidence that the "list" label is supported by dask-dataframe (since it is not supported by pandas or cudf), but it shouldn't hurt to support it in dask_cudf as well.

@charlesbluca
Member

After adding list to the supported aggs, we seem to be getting blocked on

_meta = ddf._meta.groupby(gb_cols, as_index=as_index).agg(_aggs)

due to an invalid make_empty_column call:

Traceback (most recent call last):
  File "cudf/_lib/groupby.pyx", line 158, in cudf._lib.groupby.GroupBy.aggregate
RuntimeError: cuDF failure at: /raid/charlesb/github/cudf/cpp/src/column/column_factories.cpp:68: make_empty_column is invalid to call on nested types

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "test_list_collect.py", line 20, in <module>
    print(gddf.groupby("a").agg({"b": list}).compute())
  File "/raid/charlesb/github/dask/dask/dataframe/groupby.py", line 1847, in agg
    return self.aggregate(arg, split_every=split_every, split_out=split_out)
  File "/raid/charlesb/github/cudf/python/dask_cudf/dask_cudf/groupby.py", line 70, in aggregate
    return groupby_agg(
  File "/raid/charlesb/github/cudf/python/dask_cudf/dask_cudf/groupby.py", line 258, in groupby_agg
    _meta = ddf._meta.groupby(gb_cols, as_index=as_index).agg(_aggs)
  File "/raid/charlesb/miniconda3/envs/cudf_dev/lib/python3.8/contextlib.py", line 75, in inner
    return func(*args, **kwds)
  File "/raid/charlesb/miniconda3/envs/cudf_dev/lib/python3.8/site-packages/cudf-0.20.0a0+138.g18964ffccd-py3.8-linux-x86_64.egg/cudf/core/groupby/groupby.py", line 165, in agg
    result = self._groupby.aggregate(self.obj, normalized_aggs)
  File "cudf/_lib/groupby.pyx", line 167, in cudf._lib.groupby.GroupBy.aggregate
NotImplementedError: Aggregation not supported for empty columns

Is this related to #7611?

@rjzamora
Member

rjzamora commented Apr 15, 2021

I see - this definitely seems like something we should ultimately resolve in collect. However, since you are taking the dask_cudf route here, perhaps a simple fix is to fall back on _meta_nonempty when things fail?

try:
    _meta = ddf._meta.groupby(gb_cols, as_index=as_index).agg(_aggs)
except NotImplementedError:
    _meta = ddf._meta_nonempty.groupby(gb_cols, as_index=as_index).agg(_aggs)

@charlesbluca
Member

charlesbluca commented Apr 15, 2021

Something else I noticed that isn't necessarily a bug, but does conflict with Dask.dataframe (not sure about Pandas); it looks like _groupby_partition_agg is expecting an aggs dictionary with lists of the aggregations per-column:

for col, agg_list in aggs.items():
    _agg_dict[col] = set()
    for agg in agg_list:

This conflicts with @beckernick's example, which, when run through dask-cudf's codepath, would raise a TypeError because list is not iterable. I'm not sure if the best solution would be to explicitly document examples of input for the aggs dict, or to add a check in the code that makes sure all aggregations are wrapped in lists.
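
A hedged sketch of the second option, wrapping bare per-column aggregation specs before they reach _groupby_partition_agg (the helper name is illustrative, not actual dask_cudf code):

def _ensure_list_values(aggs):
    # Wrap any bare aggregation spec in a list so that {"b": list} and
    # {"b": [list]} are handled identically downstream.
    return {
        col: agg if isinstance(agg, list) else [agg]
        for col, agg in aggs.items()
    }

# e.g. _ensure_list_values({"b": list}) -> {"b": [list]}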

@rjzamora
Member

This conflicts with @beckernick's example, which, when run through dask-cudf's codepath, would raise a TypeError because list is not iterable. I'm not sure if the best solution would be to explicitly document examples of input for the aggs dict, or to add a check in the code that makes sure all aggregations are wrapped in lists.

Right - It is certainly true that _groupby_partition_agg is expecting a dict with list values. This is by design, and requires the parent groupby_agg function to ensure that the aggregation plan is rewritten to meet this specification. Regarding the motivating example for this issue: It looks like you just need to expand upon this str check to handle callable objects as well. It also seems that we will need to add a check for general aggregations, like df.groupby("a").agg(list).

@charlesbluca
Member

Thanks for the clarification @rjzamora - added a check for callables after the str check.

It also seems that we will need to add a check for general aggregations, like df.groupby("a").agg(list).

That makes sense - I imagine that might make more sense in a separate PR, but I can add it in this one.

@charlesbluca
Member

I currently have a naive approach to _tree_node_agg, which uses _AggregationFactory.collect to combine the partitions. This results in a nested list structure, which is expected:

>>> print(gddf.groupby("a").agg({"b": list}).compute())
                                  b
a                                  
1524246    [[1807:0.92, 1608:0.07]]
1595802  [[1611:0.92], [1610:0.07]]

From here, should we:

  • Add a step to _finalize_gb_agg to flatten this nested list structure
  • Add a new method to _AggregationFactory like collect_tree to be used in the tree reduction

I would imagine the latter is preferred, but requires C++ code that I'm not really familiar with.

@rjzamora
Member

rjzamora commented Apr 16, 2021

I currently have a naive approach to _tree_node_agg, which uses _AggregationFactory.collect to combine the partitions.

Is there a reason you need to use _AggregationFactory.collect explicitly, rather than just including list in the aggregation again? I know you will get a nested list, but it seems that _AggregationFactory.collect gives you this as well?

Regarding list-column flattening: I think this is the key challenge for supporting list aggregation :). It may make sense to add an arg to _tree_node_agg and _finalize_gb_agg to specify that the input should be flattened (if a "*_list" column is present), and then you can add a single utility function to perform this flattening. (Note that I haven't thought through this, so my understanding may be naive).

EDIT: It looks like df['col_list'].explode() may flatten the column for you.

@charlesbluca
Member

Thanks for the tip! I'll try out explode().

Is there a reason you need to use _AggregationFactory.collect explicitly, rather than just including list in the aggregation again?

I actually am using list in the aggregation again, which redirects to Aggregation.collect() (looks like the internals have been updated recently so _AggregationFactory is now Aggregation)

elif callable(op):
    if op is list:
        agg = Aggregation.collect()

@rjzamora
Member

I actually am using list in the aggregation again, which redirects to Aggregation.collect() (looks like the internals have been updated recently so _AggregationFactory is now Aggregation)

I see - that makes sense!

@charlesbluca
Member

Just tried out explode(); it looks like that doesn't work in all cases:

>>> gb["b___list"]
a
1524246      [[1807:0.92, 1608:0.07]]
1595802    [[1611:0.92], [1610:0.07]]
Name: b___list, dtype: list
>>> gb["b___list"].explode()
a
1524246    [1807:0.92, 1608:0.07]
1595802               [1611:0.92]
1595802               [1610:0.07]
Name: b___list, dtype: list

@rjzamora
Member

looks like that doesn't work in all cases

Boo. That's a shame :/

@beckernick
Member Author

beckernick commented Apr 16, 2021

>>> gb["b___list"]
a
1524246      [[1807:0.92, 1608:0.07]]
1595802    [[1611:0.92], [1610:0.07]]
Name: b___list, dtype: list

This should expose the list.leaves and the offsets. Is it possible to use these to zero-copy construct a fresh flattened ListColumn, @galipremsagar?
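
For intuition, here is a pyarrow sketch of that idea (pyarrow's list layout mirrors cuDF's; this is an illustration, not the cuDF API). Flattening one nesting level amounts to composing the two offset arrays, leaving the leaf values untouched:

import pyarrow as pa

arr = pa.array([[[1, 2], [3, 4]], [[5, 6]]])   # list<list<int64>>
inner = arr.values                             # the inner list<int64> array

# Reading the inner offsets at the outer offsets drops one nesting level
# without copying any leaf values.
new_offsets = inner.offsets.take(arr.offsets)
flat = pa.ListArray.from_arrays(new_offsets, inner.values)
print(flat.to_pylist())  # [[1, 2, 3, 4], [5, 6]]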

@rjzamora
Member

rjzamora commented Apr 16, 2021

Yeah - Good thought Nick. It would be great if there was a simple way to remove the outer-most offsets.

If there turns out to be no convenient way to "flatten" the list column in the way we need, we can still use explode. However, since the row count is not preserved, we would need to propagate separate dataframes through the reduction tree for each of the specified list aggregations. I have confirmed in some toy code that this will work, but the logic will need to be pretty messy to handle a general use case. So, I'd much prefer a simple cudf utility/primitive :)

Here is the (ugly) toy example :)
import cudf
from io import StringIO

data = """a,b,c
1595802,1611:0.92,0
1595802,1610:0.07,1
1524246,1807:0.92,0
1524246,1608:0.07,1"""

# Read and split (proxy for dask_cudf)
gdf = cudf.read_csv(StringIO(data))
gdf1 = gdf[:3]
gdf2 = gdf[3:]

# Groupby on each partition
gb1_0 = gdf1.groupby("a").agg({"b":list, "c": [list, "sum"]}).reset_index()
gb2_0 = gdf2.groupby("a").agg({"b":list, "c": [list, "sum"]}).reset_index()

# Flatten Column Names
gb1_0.columns = [f"{col}_{agg}" if agg else col for col, agg in gb1_0.columns]
gb2_0.columns = [f"{col}_{agg}" if agg else col for col, agg in gb2_0.columns]

# Explode the output of each partition for list columns
gb1_0_b = gb1_0[["a", "b_list"]].explode("b_list")
gb1_0_c = gb1_0[["a", "c_list"]].explode("c_list")
gb1_0.drop(columns=["b_list", "c_list"], inplace=True)

gb2_0_b = gb2_0[["a", "b_list"]].explode("b_list")
gb2_0_c = gb2_0[["a", "c_list"]].explode("c_list")
gb2_0.drop(columns=["b_list", "c_list"], inplace=True)

# Concatenate partition-wise results and perform another list agg
result = cudf.concat([gb1_0, gb2_0], ignore_index=True).reset_index(drop=True)
result = result.groupby("a").agg({"c_sum": ["sum"]})
result.columns = ["c_sum"]

gdf_b = cudf.concat([gb1_0_b, gb2_0_b], ignore_index=True).reset_index(drop=True)
gdf_c = cudf.concat([gb1_0_c, gb2_0_c], ignore_index=True).reset_index(drop=True)
gdf_bc = cudf.concat([gdf_b, gdf_c["c_list"]], axis=1)
result_bc = gdf_bc.groupby("a").agg({"b_list":list, "c_list": list})

# "Final" Result
cudf.concat([result, result_bc[["b_list", "c_list"]]], axis=1)

@beckernick
Member Author

beckernick commented Apr 16, 2021

Oof, that's rough. I suspect there may be a way to do this; if not, it might be nice to have some kind of list.flatten or ravel API like ndarray libraries have. This example is brittle but perhaps useful:

import numpy as np
import pandas as pd
s = pd.Series([[[1, 2], [3, 4]], [[2, 3], [4, 5]]])
s.apply(np.ravel)
0    [1, 2, 3, 4]
1    [2, 3, 4, 5]
dtype: object

@vyasr
Contributor

vyasr commented Apr 16, 2021

I actually am using list in the aggregation again, which redirects to Aggregation.collect() (looks like the internals have been updated recently so _AggregationFactory is now Aggregation)

elif callable(op):
    if op is list:
        agg = Aggregation.collect()

In case any questions come up, this change was mine. You can see the diff on #7818 in case that helps you find something you were used to looking for elsewhere, and feel free to ping me if something there is confusing.

@charlesbluca
Member

Would the use of applymap() be viable here? Something like

import itertools as it

df["b___list"].applymap(lambda x: list(it.chain.from_iterable(x)))

This would still require a unary_operator to be defined for the ListColumn.

@charlesbluca
Member

@shwina is currently working on a ravel() method for series that should unblock this (#8006); still some discussion to be had around the handling of None there, i.e. what should happen for

In [42]: s = cudf.Series([[[1, 2], [3, 4], None], [[5, 6,], [7, 8]]])
In [43]: s.list.ravel()

versus

In [44]: s = cudf.Series([[[1, 2], [3, 4], [None]], [[5, 6,], [7, 8]]])
In [45]: s.list.ravel()

Although I don't think the first example should ever come up in dask-cudf's collect implementation.

rapids-bot bot pushed a commit that referenced this issue May 3, 2021
…#8048)

Redirects Python built-in functions to their named equivalent aggregations in Dask-cuDF; this ensures that the Dask-cuDF codepath is used for these aggregations regardless of how they're specified; see #7812 (comment).

Authors:
  - Charles Blackmon-Luca (https://github.com/charlesbluca)

Approvers:
  - Keith Kraus (https://github.com/kkraus14)

URL: #8048
@github-actions

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.

@charlesbluca
Member

Still waiting on #8006 to unblock the flattening issue; also #8279 resolves this blocker so that we don't need a try/except block.

charlesbluca linked a pull request Jun 16, 2021 that will close this issue
rapids-bot bot closed this as completed in #8045 Jul 6, 2021
rapids-bot bot pushed a commit that referenced this issue Jul 6, 2021
Closes #7812

Adds support for cuDF's `collect` aggregation in dask-cuDF.

Authors:
  - Charles Blackmon-Luca (https://github.com/charlesbluca)

Approvers:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

URL: #8045