[BUG] Groupby collect list fails with Dask #7812
I wonder if we might be able to support this by switching the Dask implementation, since cuDF forbids the iteration needed by Dask's current codepath.
I haven't looked through this deeply, but the quickest/easiest "fix" may be to add
Following @beckernick's approach, we would also need to change the Dask implementation. Is there any way we could change the lambda in
While working on adding this, I noticed `cudf/python/dask_cudf/dask_cudf/groupby.py`, lines 287 to 290 (at 348ad4d), which include:

```python
_global_set = _global_set.union(set(arg[col]))
```

Since `set(arg[col])` iterates over a cuDF column, this runs into the same iteration restriction.
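A minimal sketch of the alternative, using pandas as a stand-in for cuDF (the `combine_unique` helper and its shapes are hypothetical, not dask_cudf internals): instead of building a Python set with `set(arg[col])`, which requires iterating the column, partial results can be concatenated and deduplicated with DataFrame operations that cuDF also supports.

```python
import pandas as pd

def combine_unique(partials, col):
    # Hypothetical sketch: avoid Python-level iteration (forbidden on
    # cuDF columns) by concatenating the partial results and using
    # drop_duplicates, which works on both pandas and cuDF objects.
    return pd.concat([p[col] for p in partials]).drop_duplicates()

parts = [pd.DataFrame({"b": [1, 3, 3]}), pd.DataFrame({"b": [3, 10]})]
print(sorted(combine_unique(parts, "b").tolist()))  # [1, 3, 10]
```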
Good catch @charlesbluca! Do you have time to submit a fix to cudf? Note that you can also include it in the
Sure! I opened up a PR for this at #7959; I agree that it's probably best handled separately.
After chatting with @rjzamora, some other thoughts on the supported optimized aggs in Dask-cuDF:
```python
import pandas as pd
import cudf
import dask_cudf

df = pd.DataFrame(dict(a=[1, 1, 2, 3, 3, 1, 1, 2, 3, 3, 99, 10, 1],
                       b=[1, 3, 10, 3, 2, 1, 3, 10, 3, 3, 12, 0, 9]))
gdf = cudf.from_pandas(df)
gddf = dask_cudf.from_cudf(gdf, 3)

print(gddf.groupby("a").agg({"b": "max"}).compute())  # uses dask-cudf's codepath
print(gddf.groupby("a").agg({"b": max}).compute())    # uses dask's codepath
```
```python
import pandas as pd
import dask.dataframe as dd
from io import StringIO

data = """a,b
1595802,1611:0.92
1595802,1610:0.07
1524246,1807:0.92
1524246,1608:0.07"""

df = pd.read_csv(StringIO(data))
ddf = dd.from_pandas(df, 2)

print(ddf.groupby("a").agg({"b": list}).compute())
print(ddf.groupby("a").agg({"b": "list"}).compute())  # both have identical output
```

This falls out of the scope of this issue, so for now I'll make a PR that adds support for the callable
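One way to handle this, redirecting built-in callables to their named equivalents before dispatch so both spellings hit the optimized codepath, can be sketched as follows (the names here are illustrative, not the actual dask_cudf internals):

```python
# Hypothetical redirect table: map Python builtins to the string
# aggregation names that take the optimized dask-cudf codepath.
_BUILTIN_REDIRECTS = {list: "list", max: "max", min: "min", sum: "sum"}

def normalize_agg(agg):
    # Builtins are hashable, so a plain dict lookup works; anything
    # not in the table (strings, other callables) passes through.
    return _BUILTIN_REDIRECTS.get(agg, agg)

print(normalize_agg(max))     # "max"
print(normalize_agg(list))    # "list"
print(normalize_agg("mean"))  # "mean"
```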
Thanks @charlesbluca! I agree that the priority is to support the pandas/cudf behavior. As we discussed, it is somewhat of a coincidence that the
After adding this, I ran into an issue at `cudf/python/dask_cudf/dask_cudf/groupby.py`, line 258 (at 348ad4d), due to an invalid
Is this related to #7611?
I see - this definitely seems like something we should ultimately resolve in collect. However, since you are taking the dask_cudf route here, perhaps a simple fix is to fall back on `_meta_nonempty`:

```python
try:
    _meta = ddf._meta.groupby(gb_cols, as_index=as_index).agg(_aggs)
except NotImplementedError:
    _meta = ddf._meta_nonempty.groupby(gb_cols, as_index=as_index).agg(_aggs)
```
Something else I noticed that isn't necessarily a bug, but does conflict with Dask.dataframe (not sure about pandas); see `cudf/python/dask_cudf/dask_cudf/groupby.py`, lines 321 to 323 (at 8a504d1).
This conflicts with @beckernick's example, which, running through dask-cudf's codepath, would raise a
Right - It is certainly true that
Thanks for the clarification @rjzamora - added a check for callables after the
That makes sense - I imagine that might make more sense in a separate PR, but I can add it in this one.
Currently have a naive approach to the `collect` aggregation working:

```python
>>> print(gddf.groupby("a").agg({"b": list}).compute())
                                  b
a
1524246    [[1807:0.92, 1608:0.07]]
1595802  [[1611:0.92], [1610:0.07]]
```

From here, should we:
I would imagine the latter is preferred, but requires C++ code that I'm not really familiar with.
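The extra nesting level above comes from list-aggregating partial results during the tree reduction, so the operation needed is removing exactly one level of nesting. A pure-Python sketch of that operation (the `flatten_once` helper is hypothetical, not dask_cudf code):

```python
import itertools as it

def flatten_once(nested):
    # Remove exactly one level of nesting, e.g. the extra level the
    # tree reduction introduces when list-aggregating partial lists.
    return list(it.chain.from_iterable(nested))

print(flatten_once([["1611:0.92"], ["1610:0.07"]]))
# ['1611:0.92', '1610:0.07']
print(flatten_once([["1807:0.92", "1608:0.07"]]))
# ['1807:0.92', '1608:0.07']
```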
Is there a reason you need to use `_AggregationFactory.collect` explicitly, rather than just including it? Regarding list-column flattening: I think this is the key challenge for supporting list aggregation :). It may make sense to add an arg to `_tree_node_agg` and `_finalize_gb_agg` to specify that the input should be flattened (if a `"*_list"` column is present), and then you can add a single utility function to perform this flattening. (Note that I haven't thought through this, so my understanding may be naive.) EDIT: It looks like
Thanks for the tip! I'll try out
I actually am using it because of `cudf/python/cudf/cudf/_lib/aggregation.pyx`, lines 284 to 286 (at 025c56a).
I see - that makes sense!
Just tried out `explode`:

```python
>>> gb["b___list"]
a
1524246      [[1807:0.92, 1608:0.07]]
1595802    [[1611:0.92], [1610:0.07]]
Name: b___list, dtype: list
>>> gb["b___list"].explode()
a
1524246    [1807:0.92, 1608:0.07]
1595802               [1611:0.92]
1595802               [1610:0.07]
Name: b___list, dtype: list
```
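For comparison, pandas `Series.explode` behaves the same way: it removes exactly one level of nesting, turning list elements into rows while leaving inner lists intact. A small pandas sketch mirroring the output above (the index and column name are taken from the example for illustration):

```python
import pandas as pd

s = pd.Series(
    [[["1807:0.92", "1608:0.07"]], [["1611:0.92"], ["1610:0.07"]]],
    index=pd.Index([1524246, 1595802], name="a"),
    name="b___list",
)
# Each element of the outer list becomes its own row; inner lists
# survive as the new element values.
print(s.explode())
```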
Boo. Thats a shame :/ |
```python
>>> gb["b___list"]
a
1524246      [[1807:0.92, 1608:0.07]]
1595802    [[1611:0.92], [1610:0.07]]
Name: b___list, dtype: list
```

This should expose the `list.leaves` and the offsets. Is it possible to use these to zero-copy construct a fresh flattened ListColumn @galipremsagar ?
Yeah - Good thought Nick. It would be great if there were a simple way to remove the outermost offsets. If there turns out to be no convenient way to "flatten" the list column in the way we need, we can still use `explode`. Here is the (ugly) toy example :)

```python
import cudf
from io import StringIO

data = """a,b,c
1595802,1611:0.92,0
1595802,1610:0.07,1
1524246,1807:0.92,0
1524246,1608:0.07,1"""

# Read and split (proxy for dask_cudf)
gdf = cudf.read_csv(StringIO(data))
gdf1 = gdf[:3]
gdf2 = gdf[3:]

# Groupby on each partition
gb1_0 = gdf1.groupby("a").agg({"b": list, "c": [list, "sum"]}).reset_index()
gb2_0 = gdf2.groupby("a").agg({"b": list, "c": [list, "sum"]}).reset_index()

# Flatten column names
gb1_0.columns = [f"{col}_{agg}" if agg else col for col, agg in gb1_0.columns]
gb2_0.columns = [f"{col}_{agg}" if agg else col for col, agg in gb2_0.columns]

# Explode the output of each partition for list columns
gb1_0_b = gb1_0[["a", "b_list"]].explode("b_list")
gb1_0_c = gb1_0[["a", "c_list"]].explode("c_list")
gb1_0.drop(columns=["b_list", "c_list"], inplace=True)
gb2_0_b = gb2_0[["a", "b_list"]].explode("b_list")
gb2_0_c = gb2_0[["a", "c_list"]].explode("c_list")
gb2_0.drop(columns=["b_list", "c_list"], inplace=True)

# Concatenate partition-wise results and perform another list agg
result = cudf.concat([gb1_0, gb2_0], ignore_index=True).reset_index(drop=True)
result = result.groupby("a").agg({"c_sum": ["sum"]})
result.columns = ["c_sum"]
gdf_b = cudf.concat([gb1_0_b, gb2_0_b], ignore_index=True).reset_index(drop=True)
gdf_c = cudf.concat([gb1_0_c, gb2_0_c], ignore_index=True).reset_index(drop=True)
gdf_bc = cudf.concat([gdf_b, gdf_c["c_list"]], axis=1)
result_bc = gdf_bc.groupby("a").agg({"b_list": list, "c_list": list})

# "Final" result
cudf.concat([result, result_bc[["b_list", "c_list"]]], axis=1)
```
Oof, that's rough. I suspect there may be a way to do this, or if not it might be nice to have some kind of `ravel` method:

```python
import numpy as np
import pandas as pd

s = pd.Series([[[1, 2], [3, 4]], [[2, 3], [4, 5]]])
s.apply(np.ravel)
```

```
0    [1, 2, 3, 4]
1    [2, 3, 4, 5]
dtype: object
```
In case any questions come up, this change was me. You can see the diff on #7818 in case that helps you find something you were used to looking for elsewhere, and feel free to ping me if something there is confusing.
Would the use of `itertools` work here?

```python
import itertools as it

df["b___list"].applymap(lambda x: list(it.chain.from_iterable(x)))
```

This would still require a
@shwina is currently working on a `ravel` method for list columns. One open question is the behavior of

```python
In [42]: s = cudf.Series([[[1, 2], [3, 4], None], [[5, 6,], [7, 8]]])
In [43]: s.list.ravel()
```

versus

```python
In [44]: s = cudf.Series([[[1, 2], [3, 4], [None]], [[5, 6,], [7, 8]]])
In [45]: s.list.ravel()
```

Although I don't think the first example should ever come up in dask-cudf's collect implementation.
Redirects Python built-in functions to their named equivalent aggregation in Dask-cuDF; this ensures that the Dask-cuDF codepath is used for these aggregations regardless of how they're specified; see #7812 (comment).

Authors:
- Charles Blackmon-Luca (https://github.com/charlesbluca)

Approvers:
- Keith Kraus (https://github.com/kkraus14)

URL: #8048
This issue has been labeled
Still waiting on #8006 to unblock the flattening issue; also #8279 resolves this blocker so that we don't need a try/except block.
Closes #7812. Adds support for cuDF's `collect` aggregation in dask-cuDF.

Authors:
- Charles Blackmon-Luca (https://github.com/charlesbluca)

Approvers:
- Richard (Rick) Zamora (https://github.com/rjzamora)

URL: #8045
cuDF recently implemented groupby collect list. I'd like to be able to use this with Dask, like I can on the CPU. Currently, it looks like Dask's collect list goes down a codepath that requires iterating through the object, which we explicitly don't permit (#7481). We may want to explore this in Dask or special-case it in Dask-cuDF.
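For reference, this is the CPU-side behavior being requested: pandas (and hence Dask on pandas) accepts a callable `list` aggregation out of the box. A minimal pandas-only sketch:

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2, 3, 3], "b": [1, 3, 10, 3, 2]})

# Collect the values of "b" into a list per group of "a".
print(df.groupby("a")["b"].agg(list))
```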