Fix quantile division / partition handling for dask-cudf sort on null dataframes #9259

Merged Oct 14, 2021

20 commits:
8f5de21
Fix null sort test, add more failures
charlesbluca Sep 20, 2021
ea21e28
Sort quantile divisions for multi column case
charlesbluca Sep 20, 2021
8d187ef
Make sure values less than first quantile get mapped to the first res…
charlesbluca Sep 21, 2021
c9f430d
Check output index of sort_values
charlesbluca Sep 21, 2021
5e58ca8
Simplify last partition assignment
charlesbluca Oct 1, 2021
4bed8a3
Merge remote-tracking branch 'upstream/branch-21.12' into fix-9255
charlesbluca Oct 4, 2021
194cac0
Revert "Simplify last partition assignment"
charlesbluca Oct 4, 2021
b32cb5f
Merge remote-tracking branch 'upstream/branch-21.12' into fix-9255
charlesbluca Oct 7, 2021
b956c7c
Reimplement last partition assignment
charlesbluca Oct 8, 2021
55150d6
Add clarification for reset_index calls
charlesbluca Oct 12, 2021
0eaa4e7
Use np.random.seed(0) in null sort tests
charlesbluca Oct 12, 2021
7862a85
Update python/dask_cudf/dask_cudf/tests/test_sort.py
galipremsagar Oct 13, 2021
2afaa4d
Update python/dask_cudf/dask_cudf/tests/test_sort.py
galipremsagar Oct 13, 2021
b799e0a
style
galipremsagar Oct 13, 2021
0058cfa
Merge remote-tracking branch 'upstream/branch-21.12' into fix-9255
galipremsagar Oct 13, 2021
81cb66a
test single-threaded
galipremsagar Oct 13, 2021
7239a27
Merge remote-tracking branch 'upstream/branch-21.12' into fix-9255
charlesbluca Oct 13, 2021
bf3f7a4
Fix test failures from upstream merge
charlesbluca Oct 13, 2021
d5f807d
Remove commented out partitions setting
charlesbluca Oct 13, 2021
0952280
Update python/dask_cudf/dask_cudf/tests/test_sort.py
galipremsagar Oct 14, 2021
7 changes: 3 additions & 4 deletions python/dask_cudf/dask_cudf/sorting.py
@@ -24,9 +24,8 @@ def set_index_post(df, index_name, drop, column_dtype):

 def _set_partitions_pre(s, divisions):
     partitions = divisions.searchsorted(s, side="right") - 1
-    partitions[
-        divisions.tail(1).searchsorted(s, side="right").astype("bool")
-    ] = (len(divisions) - 2)
+    partitions[partitions < 0] = 0
+    partitions[partitions >= len(divisions) - 1] = len(divisions) - 2
     return partitions
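The new clamping logic can be sketched in isolation. The snippet below is a minimal stand-in using plain pandas objects instead of cudf/dask-cudf ones, with made-up division values:

```python
import pandas as pd

def set_partitions_pre(s, divisions):
    # Rightmost insertion point minus one gives the partition index
    partitions = divisions.searchsorted(s, side="right") - 1
    # Values below the first division would get -1; clamp them to partition 0
    partitions[partitions < 0] = 0
    # Values at or above the last division would overflow; clamp to the last partition
    partitions[partitions >= len(divisions) - 1] = len(divisions) - 2
    return partitions

divisions = pd.Series([0, 10, 20, 30])  # boundaries for 3 output partitions
values = pd.Series([-5, 0, 9, 10, 25, 30, 99])
print(list(set_partitions_pre(values, divisions)))  # [0, 0, 0, 1, 2, 2, 2]
```

Out-of-range values on both ends are folded into the first and last partitions, which is what the replaced `divisions.tail(1)`-based masking was only handling on the upper end.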


@@ -201,7 +200,7 @@ def quantile_divisions(df, by, npartitions):
                     divisions[col].iloc[-1] = chr(
                         ord(divisions[col].iloc[-1][0]) + 1
                     )
-            divisions = divisions.drop_duplicates()
+            divisions = divisions.drop_duplicates().sort_index()
Comment on lines -209 to +210

Member:
Curious why this is necessary?

Member Author:
drop_duplicates pushes null values first in the series/dataframe, which causes trouble later on in _set_partitions_pre, since searchsorted expects null values to be placed last; this is meant to be equivalent to a similar sorting we do for the single-column case:

divisions = sorted(
divisions.drop_duplicates().astype(dtype).to_arrow().tolist(),
key=lambda x: (x is None, x),
)

That being said, it looks like removing both of those sorts doesn't actually break any of dask-cudf's sorting tests right now. It feels like something should be breaking here: without the sorts, we will sometimes end up assigning the rows of our input dataframe to fewer unique partitions than the intended number of output partitions. Essentially, for this step:

df3 = rearrange_by_column(
df2,
"_partitions",
max_branch=max_branch,
npartitions=len(divisions) - 1,
shuffle="tasks",
ignore_index=ignore_index,
).drop(columns=["_partitions"])

We would sometimes have npartitions == 3 but df["_partitions"].nunique() == 2, which in my head should cause an erroneous sort but doesn't in any of the test cases.

I'm going to play around with this more and see if I can get a good test case for this situation.
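The ordering issue described above can be reproduced in miniature. This is a toy sketch with NumPy floats standing in for cudf nulls (NaN instead of None, so the sort key is adapted accordingly) and made-up division values:

```python
import numpy as np

# Null-first divisions, in the order drop_duplicates can produce them
divs = [np.nan, 0.0, 10.0, 20.0]

# The single-column sort key pushes nulls (here NaN) to the end
fixed = sorted(divs, key=lambda x: (np.isnan(x), x))
print(fixed)  # [0.0, 10.0, 20.0, nan]

s = np.array([5.0, 15.0])
# searchsorted assumes sorted input; null-first divisions shift every value
# up by one partition, and partition 0 is never assigned
print(np.searchsorted(divs, s, side="right") - 1)   # [1 2]
print(np.searchsorted(fixed, s, side="right") - 1)  # [0 1]
```

With the unsorted divisions, the binary search lands one slot too high for every in-range value, which is exactly the fewer-unique-partitions symptom discussed above.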

Member Author:
After digging into rearrange_by_column, I now understand that this shouldn't cause the sort to fail as long as all rows are assigned to properly ordered partitions (which they are with the additional check in _set_partitions_pre). However, we would still end up with a dataframe that has empty partitions - is that something we would want to avoid here by sorting divisions?
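The empty-partition outcome can be illustrated with a rough pandas sketch (hypothetical data; a simplified emulation of the shuffle, not dask's actual rearrange_by_column): all rows land in correctly ordered partitions, but one intended partition id is never assigned.

```python
import pandas as pd

# Suppose the shuffle was asked for 3 partitions, but only ids 0 and 2 were assigned
df = pd.DataFrame({"x": [1, 2, 30, 40], "_partitions": [0, 0, 2, 2]})
npartitions = 3

# Emulate the shuffle: one output partition per id in range(npartitions)
parts = [
    df[df["_partitions"] == i].drop(columns="_partitions")
    for i in range(npartitions)
]
print([len(p) for p in parts])  # [2, 0, 2] -- partition 1 is empty

# Concatenating the partitions still yields globally sorted data
print(pd.concat(parts)["x"].is_monotonic_increasing)  # True
```

So correctness survives, as the comment above concludes; the cost is purely the empty partition in the output.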

return divisions


27 changes: 12 additions & 15 deletions python/dask_cudf/dask_cudf/tests/test_sort.py
@@ -7,7 +7,6 @@
import cudf

import dask_cudf
-from dask_cudf.sorting import quantile_divisions


@pytest.mark.parametrize("by", ["a", "b", "c", "d", ["a", "b"], ["c", "d"]])
@@ -25,7 +24,7 @@ def test_sort_values(nelem, nparts, by):
     with dask.config.set(scheduler="single-threaded"):
         got = ddf.sort_values(by=by)
     expect = df.sort_values(by=by)
-    dd.assert_eq(got, expect, check_index=False)
+    dd.assert_eq(got.reset_index(), expect.reset_index(), check_index=False)


@pytest.mark.parametrize("by", ["a", "b", ["a", "b"]])
@@ -53,23 +52,21 @@ def test_sort_repartition():


 @pytest.mark.parametrize("by", ["a", "b", ["a", "b"]])
-def test_sort_values_with_nulls(by):
-    df = cudf.DataFrame(
+@pytest.mark.parametrize(
+    "data",
+    [
         {
             "a": list(range(50)) + [None] * 50 + list(range(50, 100)),
             "b": [None] * 100 + list(range(100, 150)),
-        }
-    )
-    ddf = dd.from_pandas(df, npartitions=10)
-
-    # assert that quantile divisions of dataframe contains nulls
-    divisions = quantile_divisions(ddf, by, ddf.npartitions)
-    if isinstance(divisions, list):
-        assert None in divisions
-    else:
-        assert all([divisions[col].has_nulls for col in by])
Comment on lines -68 to -73

Member:
I'm trying to remember why this check was here... Do we still want to check that the divisions includes null values, or was this check flawed?

Member Author:
Originally this check was here because I wanted to verify that dataframes with nulls are sorted properly even when divisions contains nulls.

I ended up removing this because it isn't always necessarily true - we may end up adding a test case to this function that consists of a dataframe with nulls with a sort_values operation that doesn't lead to nulls in divisions.

Member:
> I ended up removing this because it isn't always necessarily true

Okay, I see - This makes sense

+        },
+        {"a": list(range(15)) + [None] * 5, "b": list(reversed(range(20)))},
+    ],
+)
+def test_sort_values_with_nulls(data, by):
+    df = cudf.DataFrame(data)
+    ddf = dd.from_pandas(df, npartitions=5)

     got = ddf.sort_values(by=by)
     expect = df.sort_values(by=by)

-    dd.assert_eq(got, expect)
+    dd.assert_eq(got.reset_index(), expect.reset_index(), check_index=False)
Member Author:
Maybe sorting is non-deterministic for nulls here? In that case we may want to remove the reset_index and just test for equality on the values of the dataframe. Currently testing this out in #9264.

Contributor:
Sounds good to me. Looks like the CI passed now too. Feel free to have this taken care of in this PR or in #9264, this should be good to go.

Contributor:
Yikes, my approval triggered a merge since there was already a merge comment. Can you take care of it in #9264?
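The reset_index pattern used in the test can be sketched with plain pandas (made-up frames; reset_index(drop=True) is used here as a simplification of the PR's reset_index() plus check_index=False): rows with equal sort keys, such as nulls, may come back under a different index, so the test compares values rather than index alignment.

```python
import pandas as pd

# Two "sorted" results that agree on values but not on the original row index
expect = pd.DataFrame({"a": [1.0, None, None]}, index=[0, 1, 3])
got = pd.DataFrame({"a": [1.0, None, None]}, index=[0, 3, 1])

# Direct comparison flags a mismatch because the indexes differ
print(got.equals(expect))  # False

# After resetting (and dropping) the index, only the values are compared
print(got.reset_index(drop=True).equals(expect.reset_index(drop=True)))  # True
```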