Fix quantile division / partition handling for dask-cudf sort on null dataframes #9259
Changes from 9 commits
@@ -7,7 +7,6 @@
 import cudf

 import dask_cudf
-from dask_cudf.sorting import quantile_divisions


 @pytest.mark.parametrize("by", ["a", "b", "c", "d", ["a", "b"], ["c", "d"]])

(galipremsagar marked this conversation as resolved.)
@@ -25,7 +24,7 @@ def test_sort_values(nelem, nparts, by):
     with dask.config.set(scheduler="single-threaded"):
         got = ddf.sort_values(by=by)
     expect = df.sort_values(by=by)
-    dd.assert_eq(got, expect, check_index=False)
+    dd.assert_eq(got.reset_index(), expect.reset_index(), check_index=False)

(charlesbluca marked this conversation as resolved.)


 @pytest.mark.parametrize("by", ["a", "b", ["a", "b"]])
@@ -53,23 +52,21 @@ def test_sort_repartition():


 @pytest.mark.parametrize("by", ["a", "b", ["a", "b"]])
-def test_sort_values_with_nulls(by):
-    df = cudf.DataFrame(
-        {
-            "a": list(range(50)) + [None] * 50 + list(range(50, 100)),
-            "b": [None] * 100 + list(range(100, 150)),
-        }
-    )
-    ddf = dd.from_pandas(df, npartitions=10)
-
-    # assert that quantile divisions of dataframe contains nulls
-    divisions = quantile_divisions(ddf, by, ddf.npartitions)
-    if isinstance(divisions, list):
-        assert None in divisions
-    else:
-        assert all([divisions[col].has_nulls for col in by])
Comment on lines -68 to -73 (the removed quantile-divisions check):

I'm trying to remember why this check was here... Do we still want to check that the divisions include null values, or was this check flawed?

Originally this check was here because I wanted to verify that dataframes with nulls are sorted properly even when … I ended up removing it because it isn't always necessarily true - we may end up adding a test case to this function that consists of a dataframe with nulls with a …

Okay, I see - this makes sense.
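To see why "divisions contain nulls" isn't always true, a rough pure-Python stand-in helps. The real `quantile_divisions` operates on cudf/dask collections; `naive_divisions` below is a hypothetical simplification that takes interior quantile points from a column sorted with nulls placed last:

```python
def naive_divisions(values, npartitions):
    """Hypothetical stand-in for quantile_divisions: pick interior
    quantile points from the column sorted with nulls placed last."""
    ordered = sorted(v for v in values if v is not None)
    ordered += [None] * sum(v is None for v in values)
    n = len(ordered)
    return [ordered[int(i / npartitions * (n - 1))] for i in range(1, npartitions)]

# Mostly-null column: some quantile points land on nulls ...
print(naive_divisions(list(range(5)) + [None] * 15, 4))  # [4, None, None]
# ... but a lightly-null column yields all-non-null divisions.
print(naive_divisions(list(range(19)) + [None], 4))      # [4, 9, 14]
```

Whether a null shows up among the divisions depends on the null fraction and the partition count, which is why asserting it unconditionally would break as new test cases are added.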
+@pytest.mark.parametrize(
+    "data",
+    [
+        {
+            "a": list(range(50)) + [None] * 50 + list(range(50, 100)),
+            "b": [None] * 100 + list(range(100, 150)),
+        },
+        {"a": list(range(15)) + [None] * 5, "b": list(reversed(range(20)))},
+    ],
+)
+def test_sort_values_with_nulls(data, by):
+    df = cudf.DataFrame(data)
+    ddf = dd.from_pandas(df, npartitions=5)

     got = ddf.sort_values(by=by)
     expect = df.sort_values(by=by)

-    dd.assert_eq(got, expect)
+    dd.assert_eq(got.reset_index(), expect.reset_index(), check_index=False)
Comment on the updated assertion:

Maybe sorting is non-deterministic for nulls here? In that case we may want to remove the …

Sounds good to me. Looks like the CI passed now too. Feel free to have this taken care of in this PR or in #9264; this should be good to go.

Yikes, my approval triggered a merge since there was already a merge comment. Could you take care of it in #9264?
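Why the comparison resets the index can be illustrated with plain pandas (a hedged sketch; the PR's test uses cudf and `dd.assert_eq`, but the index behavior is analogous): two frames can hold identical sorted values yet carry different index labels, so an index-sensitive comparison fails even though the sort itself is correct. `drop=True` is used here only to keep the pandas illustration self-contained.

```python
import pandas as pd

# Same sorted values; the surviving index labels differ (e.g. because ties
# among the sort keys were broken differently across partitions).
got = pd.DataFrame({"a": [1, 1, 2]}, index=[2, 0, 1])
expect = pd.DataFrame({"a": [1, 1, 2]}, index=[0, 2, 1])

# DataFrame.equals compares index labels too, so this is False ...
print(got.equals(expect))  # False
# ... while dropping the index compares values only, like the updated assertion.
print(got.reset_index(drop=True).equals(expect.reset_index(drop=True)))  # True
```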
Curious why this is necessary?

`drop_duplicates` pushes null values first in the series/dataframe, which causes trouble later on in `_set_partitions_pre`, since `searchsorted` expects null values to be placed last; this is meant to be equivalent to a similar sort we do for the single-column case (cudf/python/dask_cudf/dask_cudf/sorting.py, lines 189 to 192 in dda5210).

That being said, it looks like removing both of those sorts doesn't actually break any of dask-cudf's sorting tests now. It feels like something should be breaking here: without the sorts, we'll sometimes end up assigning the rows of our input dataframe to fewer unique partitions than the intended number of output partitions. Essentially, for this step (cudf/python/dask_cudf/dask_cudf/sorting.py, lines 239 to 246 in dda5210), we would sometimes have `npartitions == 3` but `df["_partitions"].nunique() == 2`, which in my head should cause an erroneous sort, but it doesn't in any of the test cases. I'm going to play around with this more and see if I can get a good test case for this situation.
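The `searchsorted` concern above can be sketched with NumPy, using NaN as a stand-in for nulls (this illustrates the ordering assumption only, not the dask-cudf code itself): `searchsorted` assumes the divisions are ascending, and NaN never compares less than a number, so nulls-first divisions skew the partition id a row receives.

```python
import numpy as np

rows = np.array([5.0, 15.0, 25.0])

# Nulls-first divisions violate searchsorted's ascending-order assumption:
nulls_first = np.array([np.nan, 10.0, 20.0])
print(np.searchsorted(nulls_first, rows))  # [0 2 3] -- bin 1 unused, ids shifted up

# Nulls-last divisions keep the non-null prefix ordered, so ids come out right:
nulls_last = np.array([10.0, 20.0, np.nan])
print(np.searchsorted(nulls_last, rows))   # [0 1 2]
```

With nulls first, the non-null rows skip a bin and land one partition too high, which matches the "fewer unique partition ids than intended" symptom described above.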
After digging into `rearrange_by_column`, I now understand that this shouldn't cause the sort to fail as long as all rows are assigned to properly ordered partitions (which they are with the additional check in `_set_partitions_pre`). However, we would still end up with a dataframe that has empty partitions - is that something we would want to avoid here by sorting `divisions`?
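The empty-partition outcome described above can be sketched in plain pandas (a hypothetical stand-in for the shuffle step, not the dask-cudf implementation): rows carry a `_partitions` column whose ids are ordered consistently with the sort keys but use fewer unique values than the intended partition count.

```python
import pandas as pd

df = pd.DataFrame({"key": [3, 1, 4, 2], "_partitions": [2, 0, 2, 0]})
npartitions = 3  # intended output partition count

# Materialize every intended partition; id 1 was never assigned any rows.
parts = [df[df["_partitions"] == i] for i in range(npartitions)]
print([len(p) for p in parts])  # [2, 0, 2] -- partition 1 is empty

# As long as partition ids are ordered consistently with the sort keys,
# concatenating the parts in id order still yields globally sorted keys.
out = pd.concat(p.sort_values("key") for p in parts)
print(out["key"].tolist())  # [1, 2, 3, 4]
```

This matches the comment's conclusion: the sort result is still correct, but the output collection carries an empty partition unless the divisions are cleaned up first.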