[QST]problems with dask_cudf custom aggregation #7481

Closed
yuanqingz opened this issue Mar 2, 2021 · 3 comments
Labels: dask (Dask issue) · Python (Affects Python cuDF API.) · question (Further information is requested)

Comments

@yuanqingz
What is your question?
Hi there,

I'm trying to do a string-join aggregation on a dask_cudf groupby dataframe. The input dataframe looks like this:
documents_categories.compute()

document_id kv
1595802 1611:0.92
1595802 1610:0.07
1524246 1807:0.92
1524246 1608:0.07

documents_categories.dtypes

document_id int64
kv object
dtype: object

The expected string-joined result should be:

document_id kv
1595802 1611:0.92;1610:0.07
1524246 1807:0.92;1608:0.07

I have tried the following code and several other methods, but still can't get this aggregation running successfully. I'm not an expert in dask_cudf; any suggestions? Thanks!

custom_join = dask.dataframe.Aggregation("custom_join", lambda x: x.str.join(";"), lambda y: y.str.join(";"))
documents_categories.groupby('document_id').agg({"kv": custom_join})
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    179     try:
--> 180         yield
    181     except Exception as e:

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
   5315     with raise_on_meta_error(funcname(func), udf=kwargs.pop("udf", False)):
-> 5316         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   5317 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/groupby.py in _groupby_apply_funcs(df, *index, **kwargs)
    920     for result_column, func, func_kwargs in funcs:
--> 921         r = func(grouped, **func_kwargs)
    922 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/groupby.py in _apply_func_to_column(df_like, column, func)
    966 
--> 967     return func(df_like[column])
    968 

<ipython-input-45-5dd27ef25785> in <lambda>(x)
----> 1 custom_join = dask.dataframe.Aggregation("custom_join", lambda x: x.str.join(";"), lambda y: y.str.join(";"))

/opt/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/groupby/groupby.py in __getattribute__(self, key)
     62         try:
---> 63             return super().__getattribute__(key)
     64         except AttributeError:

AttributeError: 'SeriesGroupBy' object has no attribute 'str'

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
<ipython-input-46-31b5ac92e045> in <module>
----> 1 documents_categories.groupby('document_id').agg({"kv": custom_join})

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/groupby.py in agg(self, arg, split_every, split_out)
   1846     @derived_from(pd.core.groupby.DataFrameGroupBy)
   1847     def agg(self, arg, split_every=None, split_out=1):
-> 1848         return self.aggregate(arg, split_every=split_every, split_out=split_out)
   1849 
   1850 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf/groupby.py in aggregate(self, arg, split_every, split_out)
     81 
     82         return super().aggregate(
---> 83             arg, split_every=split_every, split_out=split_out
     84         )
     85 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
   1842             return self.size()
   1843 
-> 1844         return super().aggregate(arg, split_every=split_every, split_out=split_out)
   1845 
   1846     @derived_from(pd.core.groupby.DataFrameGroupBy)

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
   1622             split_out=split_out,
   1623             split_out_setup=split_out_on_index,
-> 1624             sort=self.sort,
   1625         )
   1626 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in apply_concat_apply(args, chunk, aggregate, combine, meta, token, chunk_kwargs, aggregate_kwargs, combine_kwargs, split_every, split_out, split_out_setup, split_out_setup_kwargs, sort, ignore_index, **kwargs)
   5267 
   5268     if meta is no_default:
-> 5269         meta_chunk = _emulate(chunk, *args, udf=True, **chunk_kwargs)
   5270         meta = _emulate(
   5271             aggregate, _concat([meta_chunk], ignore_index), udf=True, **aggregate_kwargs

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
   5314     """
   5315     with raise_on_meta_error(funcname(func), udf=kwargs.pop("udf", False)):
-> 5316         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   5317 
   5318 

/opt/conda/envs/rapids/lib/python3.7/contextlib.py in __exit__(self, type, value, traceback)
    128                 value = type()
    129             try:
--> 130                 self.gen.throw(type, value, traceback)
    131             except StopIteration as exc:
    132                 # Suppress StopIteration *unless* it's the same exception that

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    199         )
    200         msg = msg.format(" in `{0}`".format(funcname) if funcname else "", repr(e), tb)
--> 201         raise ValueError(msg) from e
    202 
    203 

ValueError: Metadata inference failed in `_groupby_apply_funcs`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
AttributeError("'SeriesGroupBy' object has no attribute 'str'")

Traceback:
---------
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/utils.py", line 180, in raise_on_meta_error
    yield
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py", line 5316, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/groupby.py", line 921, in _groupby_apply_funcs
    r = func(grouped, **func_kwargs)
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/groupby.py", line 967, in _apply_func_to_column
    return func(df_like[column])
  File "<ipython-input-45-5dd27ef25785>", line 1, in <lambda>
    custom_join = dask.dataframe.Aggregation("custom_join", lambda x: x.str.join(";"), lambda y: y.str.join(";"))
  File "/opt/conda/envs/rapids/lib/python3.7/site-packages/cudf/core/groupby/groupby.py", line 63, in __getattribute__
    return super().__getattribute__(key)
@yuanqingz yuanqingz added Needs Triage Need team to review and classify question Further information is requested labels Mar 2, 2021
@kkraus14 kkraus14 added Python Affects Python cuDF API. dask Dask issue and removed Needs Triage Need team to review and classify labels Mar 2, 2021
github-actions bot commented Apr 1, 2021

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.

beckernick (Member) commented Apr 1, 2021

@cocorosiekz we've recently implemented collect list. It looks like it's not cleanly working with Dask (I'll file an issue), but perhaps the following would work for you?

import cudf
import dask_cudf
from io import StringIO

data = """document_id   kv
1595802 1611:0.92
1595802 1610:0.07
1524246 1807:0.92
1524246 1608:0.07"""
df = cudf.read_csv(StringIO(data), sep="\t")
ddf = dask_cudf.from_cudf(df, 2)

def collect_list_agg(df):
    return df.groupby("document_id").agg({"kv": list})

# ensure every row of a given key is in the same partition
partitioned = ddf.shuffle(on=["document_id"])

# run a within-partition cudf groupby collect list
print(partitioned.map_partitions(collect_list_agg).compute())
                                 kv
document_id                        
1595802      [1611:0.92, 1610:0.07]
1524246      [1807:0.92, 1608:0.07]
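To get the ";"-joined strings the original question asks for, the collected lists can be joined in one final step. A minimal pandas sketch of that step, assuming the collect-list result has been moved to host memory (e.g. with `.to_pandas()`); the literal values below mirror the output above:

```python
import pandas as pd

# the collect-list result as a pandas frame (e.g. after .to_pandas())
collected = pd.DataFrame(
    {"kv": [["1611:0.92", "1610:0.07"], ["1807:0.92", "1608:0.07"]]},
    index=pd.Index([1595802, 1524246], name="document_id"),
)

# pandas Series.str.join concatenates each list of strings with the separator
collected["kv"] = collected["kv"].str.join(";")
print(collected)
```

`Series.str.join` works element-wise on lists of strings, so each row becomes a single ";"-delimited string, matching the expected output in the question.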

yuanqingz (Author) commented
Thanks @beckernick! The shuffle-then-map-partitions approach works for me, but it would be great to solve this with a groupby collect-list directly. I think we can close this issue.
