Skip to content

Commit

Permalink
Fix rolling-window count for null input (#6344)
Browse files Browse the repository at this point in the history
Closes #6343. Fixes COUNT_ALL, COUNT_VALID for window functions. In rolling_window() operations, COUNT_VALID/COUNT_ALL should only return null rows if the min_periods requirement is not satisfied. For all other cases, the count produced must be valid, even if the input row is null.
  • Loading branch information
mythrocks authored Oct 13, 2020
1 parent 4117d37 commit 4d5db1f
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@
- PR #6304 Fix span_tests.cu includes
- PR #6331 Avoids materializing `RangeIndex` during frame concatnation (when not needed)
- PR #6278 Add filter tests for struct columns
- PR #6344 Fix rolling-window count for null input
- PR #6353 Rename `skip_rows` parameter to `skiprows` in `read_parquet`, `read_avro` and `read_orc`
- PR #6361 Detect overflow in hash join
- PR #6397 Fix `build.sh` when `PARALLEL_LEVEL` environment variable isn't set
Expand Down
124 changes: 79 additions & 45 deletions cpp/src/rolling/rolling.cu
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
#include <thrust/binary_search.h>
#include <rmm/device_scalar.hpp>

#include <thrust/detail/execution_policy.h>
#include <thrust/iterator/counting_iterator.h>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/utilities/device_operators.cuh>
#include <cudf/utilities/error.hpp>
Expand All @@ -53,32 +55,66 @@ namespace cudf {
namespace detail {
namespace { // anonymous
/**
* @brief Only count operation is executed and count is updated
* @brief Only COUNT_VALID operation is executed and count is updated
* depending on `min_periods` and returns true if it was
* valid, else false.
*/
template <typename InputType,
typename OutputType,
typename agg_op,
aggregation::Kind op,
bool has_nulls>
std::enable_if_t<op == aggregation::COUNT_VALID || op == aggregation::COUNT_ALL, bool> __device__
process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
bool has_nulls,
std::enable_if_t<op == aggregation::COUNT_VALID>* = nullptr>
bool __device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
{
// declare this as volatile to avoid some compiler optimizations that lead to incorrect results
// for CUDA 10.0 and below (fixed in CUDA 10.1)
volatile cudf::size_type count = 0;

for (size_type j = start_index; j < end_index; j++) {
if (op == aggregation::COUNT_ALL || !has_nulls || input.is_valid(j)) { count++; }
bool output_is_valid = ((end_index - start_index) >= min_periods);

if (output_is_valid) {
if (!has_nulls) {
count = end_index - start_index;
} else {
count = thrust::count_if(thrust::seq,
thrust::make_counting_iterator(start_index),
thrust::make_counting_iterator(end_index),
[&input](auto i) { return input.is_valid_nocheck(i); });
}
output.element<OutputType>(current_index) = count;
}

return output_is_valid;
}

/**
* @brief Only COUNT_ALL operation is executed and count is updated
* depending on `min_periods` and returns true if it was
* valid, else false.
*/
template <typename InputType,
typename OutputType,
typename agg_op,
aggregation::Kind op,
bool has_nulls,
std::enable_if_t<op == aggregation::COUNT_ALL>* = nullptr>
bool __device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
{
cudf::size_type count = end_index - start_index;

bool output_is_valid = (count >= min_periods);
output.element<OutputType>(current_index) = count;

Expand All @@ -94,15 +130,15 @@ template <typename InputType,
typename OutputType,
typename agg_op,
aggregation::Kind op,
bool has_nulls>
std::enable_if_t<op == aggregation::ROW_NUMBER, bool> __device__
process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
bool has_nulls,
std::enable_if_t<op == aggregation::ROW_NUMBER>* = nullptr>
bool __device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
{
bool output_is_valid = ((end_index - start_index) >= min_periods);
output.element<OutputType>(current_index) = ((current_index - start_index) + 1);
Expand Down Expand Up @@ -218,17 +254,16 @@ template <typename InputType,
typename OutputType,
typename agg_op,
aggregation::Kind op,
bool has_nulls>
std::enable_if_t<(op == aggregation::ARGMIN or op == aggregation::ARGMAX) and
std::is_same<InputType, cudf::string_view>::value,
bool>
__device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
bool has_nulls,
std::enable_if_t<(op == aggregation::ARGMIN or op == aggregation::ARGMAX) and
std::is_same<InputType, cudf::string_view>::value>* = nullptr>
bool __device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
{
// declare this as volatile to avoid some compiler optimizations that lead to incorrect results
// for CUDA 10.0 and below (fixed in CUDA 10.1)
Expand Down Expand Up @@ -263,19 +298,18 @@ template <typename InputType,
typename OutputType,
typename agg_op,
aggregation::Kind op,
bool has_nulls>
std::enable_if_t<!std::is_same<InputType, cudf::string_view>::value and
!(op == aggregation::COUNT_VALID || op == aggregation::COUNT_ALL ||
op == aggregation::ROW_NUMBER || op == aggregation::LEAD ||
op == aggregation::LAG),
bool>
__device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
bool has_nulls,
std::enable_if_t<!std::is_same<InputType, cudf::string_view>::value and
!(op == aggregation::COUNT_VALID || op == aggregation::COUNT_ALL ||
op == aggregation::ROW_NUMBER || op == aggregation::LEAD ||
op == aggregation::LAG)>* = nullptr>
bool __device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
{
// declare this as volatile to avoid some compiler optimizations that lead to incorrect results
// for CUDA 10.0 and below (fixed in CUDA 10.1)
Expand Down
4 changes: 2 additions & 2 deletions cpp/tests/grouped_rolling/grouped_rolling_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class GroupedRollingTest : public cudf::test::BaseFixture {
if (include_nulls || !input.nullable() || cudf::bit_is_set(valid_mask, j)) count++;
}

ref_valid[i] = (count >= min_periods);
ref_valid[i] = ((end_index - start_index) >= min_periods);
if (ref_valid[i]) ref_data[i] = count;
}

Expand Down Expand Up @@ -861,7 +861,7 @@ class GroupedTimeRangeRollingTest : public cudf::test::BaseFixture {
if (include_nulls || !input.nullable() || cudf::bit_is_set(valid_mask, j)) count++;
}

ref_valid[i] = (count >= min_periods);
ref_valid[i] = ((end_index - start_index) >= min_periods);
if (ref_valid[i]) ref_data[i] = count;
}

Expand Down
6 changes: 3 additions & 3 deletions cpp/tests/rolling/rolling_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ TEST_F(RollingStringTest, MinPeriods)
cudf::test::strings_column_wrapper expected_max(
{"This", "test", "test", "test", "test", "string", "string", "string", "string"},
{0, 0, 0, 0, 1, 1, 1, 0, 0});
fixed_width_column_wrapper<size_type> expected_count_val({0, 2, 2, 2, 3, 3, 3, 3, 2},
{0, 0, 0, 0, 1, 1, 1, 0, 0});
fixed_width_column_wrapper<size_type> expected_count_val({1, 2, 1, 2, 3, 3, 3, 2, 2},
{1, 1, 1, 1, 1, 1, 1, 1, 0});
fixed_width_column_wrapper<size_type> expected_count_all({3, 4, 4, 4, 4, 4, 4, 3, 2},
{0, 1, 1, 1, 1, 1, 1, 0, 0});

Expand Down Expand Up @@ -248,7 +248,7 @@ class RollingTest : public cudf::test::BaseFixture {
if (include_nulls || !input.nullable() || cudf::bit_is_set(valid_mask, j)) count++;
}

ref_valid[i] = (count >= min_periods);
ref_valid[i] = ((end_index - start_index) >= min_periods);
if (ref_valid[i]) ref_data[i] = count;
}

Expand Down
4 changes: 2 additions & 2 deletions python/cudf/cudf/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,6 @@ def _apply_agg_dataframe(self, df, agg_name):
return result_df

def _apply_agg(self, agg_name):
if agg_name == "count" and not self._time_window:
self.min_periods = 0
if isinstance(self.obj, cudf.Series):
return self._apply_agg_series(self.obj, agg_name)
else:
Expand Down Expand Up @@ -388,6 +386,8 @@ def _window_to_window_sizes(self, window):
)

def _apply_agg(self, agg_name):
if agg_name == "count" and not self._time_window:
self.min_periods = 0
index = cudf.MultiIndex.from_frame(
cudf.DataFrame(
{
Expand Down
73 changes: 55 additions & 18 deletions python/cudf/cudf/tests/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
@pytest.mark.parametrize("agg", ["sum", "min", "max", "mean", "count"])
@pytest.mark.parametrize("nulls", ["none", "one", "some", "all"])
@pytest.mark.parametrize("center", [True, False])
def test_rollling_series_basic(data, index, agg, nulls, center):
def test_rolling_series_basic(data, index, agg, nulls, center):
if PANDAS_GE_110:
kwargs = {"check_freq": False}
else:
Expand All @@ -47,15 +47,7 @@ def test_rollling_series_basic(data, index, agg, nulls, center):
got = getattr(
gsr.rolling(window_size, min_periods, center), agg
)().fillna(-1)
try:
assert_eq(expect, got, check_dtype=False, **kwargs)
except AssertionError as e:
if agg == "count" and data != []:
pytest.xfail(
reason="Differ from Pandas behavior for count"
)
else:
raise e
assert_eq(expect, got, check_dtype=False, **kwargs)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -97,17 +89,24 @@ def test_rolling_dataframe_basic(data, agg, nulls, center):
got = getattr(
gdf.rolling(window_size, min_periods, center), agg
)().fillna(-1)
try:
assert_eq(expect, got, check_dtype=False)
except AssertionError as e:
if agg == "count" and len(pdf) > 0:
pytest.xfail(reason="Differ from pandas behavior here")
else:
raise e
assert_eq(expect, got, check_dtype=False)


@pytest.mark.parametrize(
"agg", ["sum", pytest.param("min"), pytest.param("max"), "mean", "count"]
"agg",
[
pytest.param("sum"),
pytest.param("min"),
pytest.param("max"),
pytest.param("mean"),
pytest.param(
"count", # Does not follow similar conventions as
# with non-offset columns
marks=pytest.mark.xfail(
reason="Differs from pandas behaviour here"
),
),
],
)
def test_rolling_with_offset(agg):
psr = pd.Series(
Expand All @@ -129,6 +128,44 @@ def test_rolling_with_offset(agg):
)


def test_rolling_count_with_offset():
"""
This test covers the xfail case from test_rolling_with_offset["count"].
It is expected that count should return a non-Nan value, even if
the counted value is a Nan, unless the min-periods condition
is not met.
This behaviour is consistent with counts for rolling-windows,
in the non-offset window case.
"""
psr = pd.Series(
[1, 2, 4, 4, np.nan, 9],
index=[
pd.Timestamp("20190101 09:00:00"),
pd.Timestamp("20190101 09:00:01"),
pd.Timestamp("20190101 09:00:02"),
pd.Timestamp("20190101 09:00:04"),
pd.Timestamp("20190101 09:00:07"),
pd.Timestamp("20190101 09:00:08"),
],
)
gsr = cudf.from_pandas(psr)
assert_eq(
getattr(gsr.rolling("2s"), "count")().fillna(-1),
pd.Series(
[1, 2, 2, 1, 0, 1],
index=[
pd.Timestamp("20190101 09:00:00"),
pd.Timestamp("20190101 09:00:01"),
pd.Timestamp("20190101 09:00:02"),
pd.Timestamp("20190101 09:00:04"),
pd.Timestamp("20190101 09:00:07"),
pd.Timestamp("20190101 09:00:08"),
],
),
check_dtype=False,
)


def test_rolling_getattr():
pdf = pd.DataFrame({"a": [1, 2, 3, 4], "b": [1, 2, 3, 4]})
gdf = cudf.from_pandas(pdf)
Expand Down

0 comments on commit 4d5db1f

Please sign in to comment.