Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend range window queries to non-timestamp order-by columns #7866

Merged
merged 15 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conda/recipes/libcudf/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ test:
- test -f $PREFIX/include/cudf/replace.hpp
- test -f $PREFIX/include/cudf/reshape.hpp
- test -f $PREFIX/include/cudf/rolling.hpp
- test -f $PREFIX/include/cudf/rolling/range_window_bounds.hpp
- test -f $PREFIX/include/cudf/round.hpp
- test -f $PREFIX/include/cudf/scalar/scalar_factories.hpp
- test -f $PREFIX/include/cudf/scalar/scalar.hpp
Expand Down
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ add_library(cudf
src/reshape/tile.cu
src/rolling/grouped_rolling.cu
src/rolling/rolling.cu
src/rolling/range_window_bounds.cpp
src/round/round.cu
src/scalar/scalar.cpp
src/scalar/scalar_factories.cpp
Expand Down
163 changes: 152 additions & 11 deletions cpp/include/cudf/rolling.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <cudf/rolling/range_window_bounds.hpp>
#include <cudf/types.hpp>

#include <memory>
Expand Down Expand Up @@ -119,6 +120,7 @@ struct window_bounds {
{
}
};

/**
* @brief Applies a grouping-aware, fixed-size rolling window function to the values in a column.
*
Expand Down Expand Up @@ -264,7 +266,7 @@ std::unique_ptr<column> grouped_rolling_window(

/**
* @brief Applies a grouping-aware, timestamp-based rolling window function to the values in a
*column.
* column.
*
* Like `rolling_window()`, this function aggregates values in a window around each
* element of a specified `input` column. It differs from `rolling_window()` in two respects:
Expand Down Expand Up @@ -357,16 +359,36 @@ std::unique_ptr<column> grouped_time_range_rolling_window(
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @copydoc std::unique_ptr<column> grouped_time_range_rolling_window(
* table_view const& group_keys,
* column_view const& timestamp_column,
* cudf::order const& timestamp_order,
* column_view const& input,
* size_type preceding_window_in_days,
* size_type following_window_in_days,
* size_type min_periods,
* std::unique_ptr<aggregation> const& aggr,
* rmm::mr::device_memory_resource* mr)
* @brief Applies a grouping-aware, timestamp-based rolling window function to the values in a
* column,.
*
* @copydetails std::unique_ptr<column> grouped_time_range_rolling_window(
* table_view const& group_keys,
* column_view const& timestamp_column,
* cudf::order const& timestamp_order,
* column_view const& input,
* size_type preceding_window_in_days,
* size_type following_window_in_days,
* size_type min_periods,
* std::unique_ptr<aggregation> const& aggr,
* rmm::mr::device_memory_resource* mr)
*
* The `preceding_window_in_days` and `following_window_in_days` supports "unbounded" windows,
* if set to `window_bounds::unbounded()`.
*
* @param[in] group_keys The (pre-sorted) grouping columns
* @param[in] timestamp_column The (pre-sorted) timestamps for each row
* @param[in] timestamp_order The order (ASCENDING/DESCENDING) in which the timestamps are sorted
* @param[in] input The input column (to be aggregated)
* @param[in] preceding_window_in_days Possibly unbounded time-interval in the backward direction,
* specified as a `window_bounds`
* @param[in] following_window_in_days Possibly unbounded time-interval in the forward direction,
* specified as a `window_bounds`
* @param[in] min_periods Minimum number of observations in window required to have a value,
* otherwise element `i` is null.
* @param[in] aggr The rolling window aggregation type (SUM, MAX, MIN, etc.)
*
* @returns A nullable output column containing the rolling window results
*/
std::unique_ptr<column> grouped_time_range_rolling_window(
table_view const& group_keys,
Expand All @@ -379,6 +401,125 @@ std::unique_ptr<column> grouped_time_range_rolling_window(
std::unique_ptr<aggregation> const& aggr,
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Applies a grouping-aware, value range-based rolling window function to the values in a
* column.
*
* This function aggregates rows in a window around each element of a specified `input` column.
* The window is determined based on the values of an ordered `orderby` column, and on the values
* of a `preceding` and `following` scalar representing an inclusive range of orderby column values.
*
* 1. The elements of the `input` column are grouped into distinct groups (e.g. the result of a
* groupby), determined by the corresponding values of the columns under `group_keys`. The
* window-aggregation cannot cross the group boundaries.
* 2. Within a group, with all rows sorted by the `orderby` column, the aggregation window
* for a row at index `i` is determined as follows:
* a) If `orderby` is ASCENDING, aggregation window for row `i` includes all `input` rows at
* index `j` such that:
* @code{.pseudo}
* (orderby[i] - preceding) <= orderby[j] <= orderby[i] + following
* @endcode
* b) If `orderby` is DESCENDING, aggregation window for row `i` includes all `input` rows at
* index `j` such that:
* @code{.pseudo}
* (orderby[i] + preceding) >= orderby[j] >= orderby[i] - following
* @endcode
*
* Note: This method requires that the rows are presorted by the group keys and orderby column
* values.
*
* The window intervals are specified as scalar values appropriate for the orderby column.
* Currently, only the following combinations of `orderby` column type and range types
* are supported:
* 1. If `orderby` column is a TIMESTAMP, the `preceding`/`following` windows are specified
* in terms of lower resolution `DURATION` scalars.
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
* E.g. For `orderby` column of type `TIMESTAMP_SECONDS`, the intervals may be
* `DURATION_SECONDS` or `DURATION_DAYS`. Higher resolution durations (e.g.
* `DURATION_NANOSECONDS`) cannot be used with lower resolution timestamps.
* 2. If the `orderby` column is an integral type (e.g. `INT32`), the `preceding`/`following`
* should be the exact same type (`INT32`).
*
* @code{.pseudo}
* Example: Consider an motor-racing statistics dataset, containing the following columns:
* 1. driver_name: (STRING) Name of the car driver
* 2. num_overtakes: (INT32) Number of times the driver overtook another car in a lap
* 3. lap_number: (INT32) The number of the lap
*
* The `group_range_rolling_window()` function allows one to calculate the total number of overtakes
* each driver made within any 3 lap window of each entry:
* 1. Group/partition the dataset by `driver_id` (This is the group_keys argument.)
* 2. Sort each group by the `lap_number` (i.e. This is the orderby_column.)
* 3. Calculate the SUM(num_overtakes) over a window (preceding=1, following=1)
*
* For the following input:
*
* [ // driver_name, num_overtakes, lap_number
* { "bottas", 1, 1 },
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
* { "lewis", 2, 1 },
* { "bottas", 2, 2 },
* { "bottas", 1, 3 },
* { "lewis", 3, 1 },
* { "lewis", 8, 2 },
* { "bottas", 5, 7 },
* { "bottas", 6, 8 },
* { "lewis", 4, 4 }
* ]
*
* Partitioning (grouping) by `driver_name`, and ordering by `lap_number` yields the following
* `num_overtakes` vector (with 2 groups, one for each distinct `driver_name`):
*
* lap_number: [ 1, 2, 3, 7, 8, 1, 1, 2, 4 ]
* num_overtakes: [ 1, 2, 1, 5, 6, 2, 3, 8, 4 ]
* <-----bottas------>|<----lewis------>
*
* The SUM aggregation is applied, with 1 preceding, and 1 following, with a minimum of 1
* period. The aggregation window is thus 3 (laps) wide, yielding the following output column:
*
* Results: [ 3, 4, 3, 11, 11, 13, 13, 13, 4 ]
*
* @endcode
*
* Note: The number of rows participating in each window might vary, based on the index within the
* group, datestamp, and `min_periods`. Apropos:
* 1. results[0] considers 2 values, because it is at the beginning of its group, and has no
* preceding values.
* 2. results[5] considers 3 values, despite being at the beginning of its group. It must include 2
* following values, based on its orderby_column value.
*
* Each aggregation operation cannot cross group boundaries.
*
* The type of the returned column depends on the input column type `T`, and the aggregation:
* 1. COUNT returns `INT32` columns
* 2. MIN/MAX returns `T` columns
* 3. SUM returns the promoted type for T. Sum on `INT32` yields `INT64`.
* 4. MEAN returns FLOAT64 columns
* 5. COLLECT returns columns of type `LIST<T>`.
*
* LEAD/LAG/ROW_NUMBER are undefined for range queries.
*
* @param[in] group_keys The (pre-sorted) grouping columns
* @param[in] orderby_column The (pre-sorted) order-by column, for range comparisons
* @param[in] order The order (ASCENDING/DESCENDING) in which the order-by column is sorted
* @param[in] input The input column (to be aggregated)
* @param[in] preceding The interval value in the backward direction
* @param[in] following The interval value in the forward direction.
* @param[in] min_periods Minimum number of observations in window required to have a value,
* otherwise element `i` is null.
* @param[in] aggr The rolling window aggregation type (SUM, MAX, MIN, etc.)
*
* @returns A nullable output column containing the rolling window results
*/
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
std::unique_ptr<column> grouped_range_rolling_window(
table_view const& group_keys,
column_view const& orderby_column,
cudf::order const& order,
column_view const& input,
range_window_bounds const& preceding,
range_window_bounds const& following,
size_type min_periods,
std::unique_ptr<aggregation> const& aggr,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Applies a variable-size rolling window function to the values in a column.
*
Expand Down
77 changes: 77 additions & 0 deletions cpp/include/cudf/rolling/range_window_bounds.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/scalar/scalar.hpp>

namespace cudf {

/**
* @brief Abstraction for window boundary sizes, to be used with
* `grouped_range_rolling_window()`.
*
* Similar to `window_bounds` in `grouped_rolling_window()`, `range_window_bounds`
* represents window boundaries for use with `grouped_range_rolling_window()`.
* A window may be specified as either of the following:
* 1. A fixed-width numeric scalar value. E.g.
* a) A `DURATION_DAYS` scalar, for use with a `TIMESTAMP_DAYS` orderby column
* b) An `INT32` scalar, for use with an `INT32` orderby column
* 2. "unbounded", indicating that the bounds stretch to the first/last
* row in the group.
*/
struct range_window_bounds {
public:
/**
* @brief Factory method to construct a bounded window boundary.
*
* @param value Finite window boundary
*
*/
static range_window_bounds get(scalar const&);

/**
* @brief Factory method to construct an unbounded window boundary.
*
* @param @type The datatype of the window boundary
*/
static range_window_bounds unbounded(data_type type);

/**
* @brief Whether or not the window is unbounded
*
* @return true If window is unbounded
* @return false If window is of finite bounds
*/
bool is_unbounded() const { return _is_unbounded; }
mythrocks marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Returns the underlying scalar value for the bounds
*/
scalar const& range_scalar() const { return *_range_scalar; }

range_window_bounds(range_window_bounds const&) =
default; // Required to return (by copy) from functions.
range_window_bounds() = default; // Required for use as return types from dispatch functors.

private:
const bool _is_unbounded{true};
std::shared_ptr<scalar> _range_scalar{nullptr};
vuule marked this conversation as resolved.
Show resolved Hide resolved

range_window_bounds(bool is_unbounded_, scalar* range_scalar_);
};

} // namespace cudf
Loading