Skip to content

Commit

Permalink
Materialize window offsets as separate columns.
Browse files Browse the repository at this point in the history
  • Loading branch information
mythrocks committed Dec 9, 2020
1 parent 44ab142 commit 6b0a3b0
Showing 1 changed file with 43 additions and 44 deletions.
87 changes: 43 additions & 44 deletions cpp/src/rolling/grouped_rolling.cu
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,25 @@ std::tuple<size_type, size_type> get_null_bounds_for_timestamp_column(

// Timestamp representations are normalized to int64_t; TimeT names that common type.
using TimeT = int64_t;

/// Materializes the values produced by `calc` into a newly allocated INT32 column.
///
/// `calc` is applied to each row index in `[0, num_rows)` via a transform iterator
/// over a counting iterator, and the results are copied into the column's data buffer.
///
/// @param calc     Callable invoked with a `size_type` row index; its result is stored
///                 as the `int32_t` value for that row.
/// @param num_rows Number of rows to generate.
/// @param mr       Memory resource handed to the column factory.
/// @return         A column of type INT32 with `num_rows` rows and no null mask allocated.
template <typename Calculator>
std::unique_ptr<column> expand_to_column(Calculator const& calc,
                                         size_type const& num_rows,
                                         rmm::mr::device_memory_resource* mr)
{
  // Lazily evaluated sequence: calc(0), calc(1), ..., one value per row index.
  auto const row_indices   = thrust::make_counting_iterator<size_type>(0);
  auto const offsets_begin = thrust::make_transform_iterator(row_indices, calc);

  // Destination column; the mask is left unallocated.
  auto result = cudf::make_fixed_width_column(cudf::data_type{type_id::INT32},
                                              num_rows,
                                              cudf::mask_state::UNALLOCATED,
                                              rmm::cuda_stream_default,
                                              mr);

  // Evaluate the lazy sequence straight into the column's data buffer.
  auto* const out = result->mutable_view().data<int32_t>();
  thrust::copy_n(rmm::exec_policy()->on(0), offsets_begin, num_rows, out);

  return result;
}

/// Time-range window computation, with
/// 1. no grouping keys specified
/// 2. timestamps in ASCENDING order.
Expand Down Expand Up @@ -279,6 +298,8 @@ std::unique_ptr<column> time_range_window_ASC(column_view const& input,
1; // Add 1, for `preceding` to account for current row.
};

auto preceding_column = expand_to_column(preceding_calculator, input.size(), mr);

auto following_calculator =
[nulls_begin_idx,
nulls_end_idx,
Expand Down Expand Up @@ -310,17 +331,10 @@ std::unique_ptr<column> time_range_window_ASC(column_view const& input,
1;
};

return cudf::detail::rolling_window(
input,
empty_like(input)->view(),
thrust::make_transform_iterator(thrust::make_counting_iterator<size_type>(0),
preceding_calculator),
thrust::make_transform_iterator(thrust::make_counting_iterator<size_type>(0),
following_calculator),
min_periods,
aggr,
rmm::cuda_stream_default,
mr);
auto following_column = expand_to_column(following_calculator, input.size(), mr);

return cudf::rolling_window(
input, preceding_column->view(), following_column->view(), min_periods, aggr, mr);
}

/// Given a timestamp column grouped as specified in group_offsets,
Expand Down Expand Up @@ -449,6 +463,8 @@ std::unique_ptr<column> time_range_window_ASC(
1; // Add 1, for `preceding` to account for current row.
};

auto preceding_column = expand_to_column(preceding_calculator, input.size(), mr);

auto following_calculator =
[d_group_offsets = group_offsets.data().get(),
d_group_labels = group_labels.data().get(),
Expand Down Expand Up @@ -491,17 +507,10 @@ std::unique_ptr<column> time_range_window_ASC(
1;
};

return cudf::detail::rolling_window(
input,
empty_like(input)->view(),
thrust::make_transform_iterator(thrust::make_counting_iterator<size_type>(0),
preceding_calculator),
thrust::make_transform_iterator(thrust::make_counting_iterator<size_type>(0),
following_calculator),
min_periods,
aggr,
rmm::cuda_stream_default,
mr);
auto following_column = expand_to_column(following_calculator, input.size(), mr);

return cudf::rolling_window(
input, preceding_column->view(), following_column->view(), min_periods, aggr, mr);
}

/// Time-range window computation, with
Expand Down Expand Up @@ -555,6 +564,8 @@ std::unique_ptr<column> time_range_window_DESC(column_view const& input,
1; // Add 1, for `preceding` to account for current row.
};

auto preceding_column = expand_to_column(preceding_calculator, input.size(), mr);

auto following_calculator =
[nulls_begin_idx,
nulls_end_idx,
Expand Down Expand Up @@ -587,17 +598,10 @@ std::unique_ptr<column> time_range_window_DESC(column_view const& input,
1;
};

return cudf::detail::rolling_window(
input,
empty_like(input)->view(),
thrust::make_transform_iterator(thrust::make_counting_iterator<size_type>(0),
preceding_calculator),
thrust::make_transform_iterator(thrust::make_counting_iterator<size_type>(0),
following_calculator),
min_periods,
aggr,
rmm::cuda_stream_default,
mr);
auto following_column = expand_to_column(following_calculator, input.size(), mr);

return cudf::rolling_window(
input, preceding_column->view(), following_column->view(), min_periods, aggr, mr);
}

// Time-range window computation, for timestamps in DESCENDING order.
Expand Down Expand Up @@ -658,6 +662,8 @@ std::unique_ptr<column> time_range_window_DESC(
1; // Add 1, for `preceding` to account for current row.
};

auto preceding_column = expand_to_column(preceding_calculator, input.size(), mr);

auto following_calculator =
[d_group_offsets = group_offsets.data().get(),
d_group_labels = group_labels.data().get(),
Expand Down Expand Up @@ -699,20 +705,13 @@ std::unique_ptr<column> time_range_window_DESC(
1;
};

auto following_column = expand_to_column(following_calculator, input.size(), mr);

if (aggr->kind == aggregation::CUDA || aggr->kind == aggregation::PTX) {
CUDF_FAIL("Time ranged rolling window does NOT (yet) support UDF.");
} else {
return cudf::detail::rolling_window(
input,
empty_like(input)->view(),
thrust::make_transform_iterator(thrust::make_counting_iterator<size_type>(0),
preceding_calculator),
thrust::make_transform_iterator(thrust::make_counting_iterator<size_type>(0),
following_calculator),
min_periods,
aggr,
rmm::cuda_stream_default,
mr);
return cudf::rolling_window(
input, preceding_column->view(), following_column->view(), min_periods, aggr, mr);
}
}

Expand Down

0 comments on commit 6b0a3b0

Please sign in to comment.