diff --git a/python/dask_cudf/dask_cudf/sorting.py b/python/dask_cudf/dask_cudf/sorting.py index f9058ed9aed..5e17a3aa29f 100644 --- a/python/dask_cudf/dask_cudf/sorting.py +++ b/python/dask_cudf/dask_cudf/sorting.py @@ -29,9 +29,10 @@ def _set_partitions_pre(s, divisions, ascending=True): partitions = ( len(divisions) - divisions.searchsorted(s, side="right") - 1 ) - partitions[ - divisions.tail(1).searchsorted(s, side="right").astype("bool") - ] = ((len(divisions) - 2) if ascending else 0) + partitions[(partitions < 0) | (partitions >= len(divisions) - 1)] = ( + 0 if ascending else (len(divisions) - 2) + ) + partitions[s._columns[0].isna().values] = len(divisions) - 2 return partitions @@ -206,7 +207,7 @@ def quantile_divisions(df, by, npartitions): divisions[col].iloc[-1] = chr( ord(divisions[col].iloc[-1][0]) + 1 ) - divisions = divisions.drop_duplicates() + divisions = divisions.drop_duplicates().sort_index() return divisions diff --git a/python/dask_cudf/dask_cudf/tests/test_sort.py b/python/dask_cudf/dask_cudf/tests/test_sort.py index a93a3c783fa..036b2a9f50c 100644 --- a/python/dask_cudf/dask_cudf/tests/test_sort.py +++ b/python/dask_cudf/dask_cudf/tests/test_sort.py @@ -1,3 +1,4 @@ +import cupy as cp import numpy as np import pytest @@ -7,7 +8,6 @@ import cudf import dask_cudf -from dask_cudf.sorting import quantile_divisions @pytest.mark.parametrize("ascending", [True, False]) @@ -26,7 +26,9 @@ def test_sort_values(nelem, nparts, by, ascending): with dask.config.set(scheduler="single-threaded"): got = ddf.sort_values(by=by, ascending=ascending) expect = df.sort_values(by=by, ascending=ascending) - dd.assert_eq(got, expect, check_index=False) + + # check that sorted indices are identical + dd.assert_eq(got.reset_index(), expect.reset_index(), check_index=False) @pytest.mark.parametrize("ascending", [True, False]) @@ -56,23 +58,25 @@ def test_sort_repartition(): @pytest.mark.parametrize("ascending", [True, False]) @pytest.mark.parametrize("by", ["a", "b", ["a", "b"]]) -def test_sort_values_with_nulls(by, ascending): - df = cudf.DataFrame( +@pytest.mark.parametrize( + "data", + [ { "a": list(range(50)) + [None] * 50 + list(range(50, 100)), "b": [None] * 100 + list(range(100, 150)), - } - ) - ddf = dd.from_pandas(df, npartitions=10) - - # assert that quantile divisions of dataframe contains nulls - divisions = quantile_divisions(ddf, by, ddf.npartitions) - if isinstance(divisions, list): - assert None in divisions - else: - assert all([divisions[col].has_nulls for col in by]) + }, + {"a": list(range(15)) + [None] * 5, "b": list(reversed(range(20)))}, + ], +) +def test_sort_values_with_nulls(data, by, ascending): + np.random.seed(0) + cp.random.seed(0) + df = cudf.DataFrame(data) + ddf = dd.from_pandas(df, npartitions=5) with dask.config.set(scheduler="single-threaded"): got = ddf.sort_values(by=by, ascending=ascending) - expect = df.sort_values(by=by, ascending=ascending) - dd.assert_eq(got, expect) + expect = df.sort_values(by=by, ascending=ascending) + + # check that sorted indices are identical + dd.assert_eq(got.reset_index(), expect.reset_index(), check_index=False)