Extend range window queries to non-timestamp order-by columns #7675

Closed · wants to merge 10 commits
1 change: 1 addition & 0 deletions conda/recipes/libcudf/meta.yaml
@@ -149,6 +149,7 @@ test:
- test -f $PREFIX/include/cudf/replace.hpp
- test -f $PREFIX/include/cudf/reshape.hpp
- test -f $PREFIX/include/cudf/rolling.hpp
- test -f $PREFIX/include/cudf/rolling/range_window_bounds.hpp
- test -f $PREFIX/include/cudf/round.hpp
- test -f $PREFIX/include/cudf/scalar/scalar_factories.hpp
- test -f $PREFIX/include/cudf/scalar/scalar.hpp
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -299,6 +299,7 @@ add_library(cudf
src/rolling/jit/code/kernel.cpp
src/rolling/jit/code/operation.cpp
src/rolling/rolling.cu
src/rolling/range_window_bounds.cpp
src/round/round.cu
src/scalar/scalar.cpp
src/scalar/scalar_factories.cpp
115 changes: 115 additions & 0 deletions cpp/include/cudf/rolling.hpp
@@ -16,6 +16,7 @@

#pragma once

#include <cudf/rolling/range_window_bounds.hpp>
#include <cudf/types.hpp>

#include <memory>
@@ -119,6 +120,7 @@ struct window_bounds {
{
}
};

/**
* @brief Applies a grouping-aware, fixed-size rolling window function to the values in a column.
*
@@ -379,6 +381,119 @@ std::unique_ptr<column> grouped_time_range_rolling_window(
std::unique_ptr<aggregation> const& aggr,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Applies a grouping-aware, value range-based rolling window function to the values in a
* column.
*
* This is a generalization of `grouped_time_range_rolling_window()` to work with non-timestamp
* columns. Like `grouped_time_range_rolling_window()`, this function aggregates values in a window
* around each element of a specified `input` column. It differs from
* `grouped_time_range_rolling_window()` in that the order-by column may be of an integral type,
* not just a timestamp.
*
* 1. The elements of the `input` column are grouped into distinct groups (e.g. the result of a
* groupby), determined by the corresponding values of the columns under `group_keys`. The
* window-aggregation cannot cross the group boundaries.
* 2. Within a group, the aggregation window is calculated based on an interval (e.g. number
* of days preceding/following the current row). The value intervals are applied on the
* `orderby_column` argument.
*
* Note: This method requires that the rows are presorted by the group keys and orderby column
* values.
*
* The window intervals are specified as scalar values appropriate for the orderby column:
* 1. If the `orderby` column is a timestamp, the `preceding`/`following` windows are specified
* in terms of `duration` scalars of equal or lower resolution.
* E.g. for an `orderby` column of type `TIMESTAMP_SECONDS`, the intervals may be
* `DURATION_SECONDS` or `DURATION_DAYS`. Higher resolution durations (e.g. `DURATION_NANOSECONDS`)
* cannot be used with lower resolution timestamps.
* 2. If the `orderby` column is an integral type (e.g. `INT32`), the `preceding`/`following`
* windows must be of exactly the same type (`INT32`).
Contributor:
This seems inconsistent. Seems like the timestamp interval should also be the same type. There's no loss of information in converting a lower resolution timestamp value to a higher resolution, yes? E.g., if the orderby column is seconds, and the interval is [-1, +1] days, then that's no different than [-86400, +86400] seconds.

Contributor Author:
I haven't grasped your point, I think.

The order-by column is the timestamp at a certain resolution. The window bounds are durations (with matching or lower resolution). The bounds can't be timestamps, because they are relative intervals. (E.g. 7 days preceding, 86400 seconds following.)

The old API took size_type and interpreted it in days (i.e. the lowest resolution).

Contributor:
Right, I meant the resolution should be the same, not the exact type. Otherwise you need a double dispatch.

Contributor Author:
> Otherwise you need a double dispatch.

You're right; I'm doing the double dispatch in this version of things. (By the way, is host-side dispatch expensive? My understanding is that the dispatch itself isn't more expensive than an if check. The real cost is in the casting.)

The reason this is permitted is to support how Spark/SQL specifies intervals at different resolutions. Would you recommend having the caller cast appropriately instead?

(The same will likely apply for decimals.)

Contributor:
> (By the way, is host-side dispatch expensive? My understanding is that the dispatch itself isn't more expensive than an if check. The real cost is in the casting.)

It's not really expensive at runtime, but it can be quite expensive in compile time/binary size, depending on the amount of code in the leaves of the double-dispatched code paths.

> The reason this is permitted is to support how Spark/SQL specifies intervals at different resolutions. Would you recommend having the caller cast appropriately instead?

Yes. The way I see it, it doesn't seem any different from how Pandas/Spark allow joining on columns of different types but libcudf doesn't support this; the caller is expected to perform any casting beforehand.

Contributor Author:
Hmm... That'd certainly simplify certain other aspects of this feature.

Let me try pulling out the scaling part.

Contributor Author:
Hey, Jake. If the scaling feature is removed, I'd still need to check that the window duration and the orderby timestamps are of the same resolution. I don't know how I'd do that without a double dispatch, short of a long if-else chain.

Would you be averse to the double dispatch in this case?

Contributor Author (@mythrocks, Mar 24, 2021):
Ok, it took me a while to realize what should have been obvious to me: the scaling functionality is pre-existing. :/

The grouped_time_range...() function already permitted timestamp columns at any resolution, accepted the window widths in days, and scaled them to the timestamp's resolution. All that scale_to() does is move the scaling logic into range_window_bounds.

Removing range_window_bounds::scale_to() would break existing functionality in grouped_time_range...(), and callers thereof (including Spark) who depend on the window widths being in DAYS.

For what it's worth, there isn't a way to avoid the double dispatch anyway, even without scale_to(), because we'd still have to validate that the window range type (integral or duration) is compatible with the order-by column (integral or timestamp). This is currently checked as part of the scale_to call stack.
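A minimal standalone sketch of the double dispatch discussed in this thread, using std::visit over two variants in place of libcudf's type_dispatcher (the tag types and compatibility rules below are illustrative assumptions, not the PR's actual code). The visitor body is instantiated once per (orderby, range) combination, which is the compile-time/binary-size cost referred to above:

#include <iostream>
#include <type_traits>
#include <variant>

// Stand-in tags for a handful of cudf types (hypothetical).
struct int32_tag {};
struct timestamp_s_tag {};
struct duration_s_tag {};
struct duration_d_tag {};

using orderby_type = std::variant<int32_tag, timestamp_s_tag>;
using range_type   = std::variant<int32_tag, duration_s_tag, duration_d_tag>;

// With N orderby alternatives and M range alternatives, N*M leaves compile.
bool compatible(orderby_type const& orderby, range_type const& range)
{
  return std::visit(
    [](auto ob, auto rg) {
      using O = decltype(ob);
      using R = decltype(rg);
      if constexpr (std::is_same_v<O, int32_tag>) {
        // Integral orderby: the range must be the identical integral type.
        return std::is_same_v<R, int32_tag>;
      } else {
        // Timestamp orderby: accept durations of equal or lower resolution.
        return std::is_same_v<R, duration_s_tag> || std::is_same_v<R, duration_d_tag>;
      }
    },
    orderby, range);
}

int main()
{
  std::cout << std::boolalpha
            << compatible(int32_tag{}, int32_tag{}) << '\n'             // true
            << compatible(timestamp_s_tag{}, duration_d_tag{}) << '\n'  // true
            << compatible(int32_tag{}, duration_s_tag{}) << '\n';       // false
}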

*
* @code{.pseudo}
* Example: Consider a user-sales dataset, where the rows look as follows:
* { "user_id", sales_amt, date }
*
* This method enables windowing queries such as grouping a dataset by `user_id`, sorting by
* increasing `date`, and summing up the `sales_amt` column over a window of 3 days (1 preceding
* day, the current day, and 1 following day).
*
* In this example,
* 1. `group_keys == [ user_id ]` of type `STRING`
* 2. `orderby_column == date` of type `timestamp_S`
* 3. `input == sales_amt` of type int
* The data are grouped by `user_id`, and ordered by `date`. The aggregation
* (SUM) is then calculated for a window of 3 days around (and including) each row.
*
* For the following input:
*
* [ // user, sales_amt, YYYYMMDDhhmmss (date)
* { "user1", 10, 20200101000000 },
* { "user2", 20, 20200101000000 },
* { "user1", 20, 20200102000000 },
* { "user1", 10, 20200103000000 },
* { "user2", 30, 20200101000000 },
* { "user2", 80, 20200102000000 },
* { "user1", 50, 20200107000000 },
* { "user1", 60, 20200107000000 },
* { "user2", 40, 20200104000000 }
* ]
*
* Partitioning (grouping) by `user_id`, and ordering by `date` yields the following `sales_amt`
* vector (with 2 groups, one for each distinct `user_id`):
*
* Date :(202001-) [ 01, 02, 03, 07, 07, 01, 01, 02, 04 ]
* Input: [ 10, 20, 10, 50, 60, 20, 30, 80, 40 ]
* <-------user1-------->|<---------user2--------->
*
* The SUM aggregation is applied, with 1 day preceding, and 1 day following, with a minimum of 1
* period. The aggregation window is thus 3 *days* wide, yielding the following output column:
*
* Results: [ 30, 40, 30, 110, 110, 130, 130, 130, 40 ]
*
* @endcode
*
* Note: The number of rows participating in each window might vary, based on the index within the
* group, datestamp, and `min_periods`. For example:
* 1. results[0] considers 2 values, because it is at the beginning of its group, and has no
* preceding values.
* 2. results[5] considers 3 values, despite being at the beginning of its group. It must include 2
* following values, based on its datestamp.
*
* Each aggregation operation cannot cross group boundaries.
*
* The type of the returned column depends on the input column type `T`, and the aggregation:
* 1. COUNT returns `INT32` columns
* 2. MIN/MAX returns `T` columns
* 3. SUM returns the promoted type for T. Sum on `INT32` yields `INT64`.
* 4. MEAN returns FLOAT64 columns
* 5. COLLECT returns columns of type `LIST<T>`.
*
* LEAD/LAG/ROW_NUMBER are undefined for range queries.
*
* @param[in] group_keys The (pre-sorted) grouping columns
* @param[in] orderby_column The (pre-sorted) order-by column, for range comparisons
* @param[in] order The order (ASCENDING/DESCENDING) in which the order-by column is sorted
* @param[in] input The input column (to be aggregated)
* @param[in] preceding The interval value in the backward direction
* @param[in] following The interval value in the forward direction
* @param[in] min_periods Minimum number of observations in window required to have a value,
* otherwise element `i` is null.
* @param[in] aggr The rolling window aggregation type (SUM, MAX, MIN, etc.)
*
* @returns A nullable output column containing the rolling window results
*/
std::unique_ptr<column> grouped_range_rolling_window(
table_view const& group_keys,
column_view const& orderby_column,
cudf::order const& order,
column_view const& input,
range_window_bounds&& preceding,
range_window_bounds&& following,
Comment on lines +497 to +498:

Contributor:
Why are these r-value refs? What if someone wants to reuse the same range_window_bounds for multiple calls to grouped_range_rolling_window?

Contributor Author:
This had to do with the ownership of the range scalar in range_window_bounds. I'll see if this can be removed if scale_to is removed.

size_type min_periods,
std::unique_ptr<aggregation> const& aggr,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
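
For orientation, a hedged usage sketch against the signature declared above. The `keys`, `dates`, and `sales` names are hypothetical, assumed pre-sorted by group keys and orderby values, and the snippet leans on cudf's duration_scalar API; treat it as a sketch, not tested code:

#include <cudf/aggregation.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/rolling.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/wrappers/durations.hpp>

#include <memory>
#include <utility>

// Sum `sales` over a window of 1 day preceding and 1 day following each row,
// grouped by `keys` and ordered by `dates`.
std::unique_ptr<cudf::column> daily_window_sum(cudf::table_view const& keys,
                                               cudf::column_view const& dates,
                                               cudf::column_view const& sales)
{
  auto preceding = cudf::range_window_bounds::get(
    std::make_unique<cudf::duration_scalar<cudf::duration_D>>(1, true));
  auto following = cudf::range_window_bounds::get(
    std::make_unique<cudf::duration_scalar<cudf::duration_D>>(1, true));

  return cudf::grouped_range_rolling_window(keys,
                                            dates,
                                            cudf::order::ASCENDING,
                                            sales,
                                            std::move(preceding),
                                            std::move(following),
                                            1,  // min_periods
                                            cudf::make_sum_aggregation());
}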

/**
* @brief Applies a variable-size rolling window function to the values in a column.
*
108 changes: 108 additions & 0 deletions cpp/include/cudf/rolling/range_window_bounds.hpp
@@ -0,0 +1,108 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/scalar/scalar.hpp>

namespace cudf {

/**
* @brief Abstraction for window boundary sizes, to be used with
* `grouped_range_rolling_window()`.
*
* Similar to `window_bounds` in `grouped_rolling_window()`, `range_window_bounds`
* represents window boundaries for use with `grouped_range_rolling_window()`.
* A window may be specified as either of the following:
* 1. A fixed-width numeric scalar value. E.g.
* a) A `duration_D` scalar, for use with a `TIMESTAMP_DAYS` orderby column
* b) An `INT32` scalar, for use with an `INT32` orderby column
* 2. "unbounded", indicating that the bounds stretch to the first/last
* row in the group.
*/
struct range_window_bounds {
public:
/**
* @brief Factory method to construct a bounded window boundary.
*
* @param scalar_ Scalar value of the finite window boundary (ownership is transferred)
*
*/
static range_window_bounds get(std::unique_ptr<scalar>&& scalar_)
{
return range_window_bounds(false, std::move(scalar_));
}

/**
* @brief Factory method to construct an unbounded window boundary.
*
* @param type The datatype of the window boundary
*/
static range_window_bounds unbounded(data_type type);

/**
* @brief Whether or not the window is unbounded
*
* @return true If window is unbounded
* @return false If window is of finite bounds
*/
bool is_unbounded() const { return _is_unbounded; }

/**
* @brief Returns the underlying scalar value for the bounds
*/
scalar const& range_scalar() const { return *_range_scalar; }

/**
* @brief Rescale underlying scalar to the specified target type.
*
* A range_window_bounds is used in conjunction with the orderby column
* in `grouped_range_rolling_window()`. Its scalar value is compared against
* the rows in the orderby column to determine the width of the window.
*
* For instance, if the orderby column is integral (INT32), the range_window_bounds
* must also be integral (INT32). No scaling is required for comparison.
*
* However, if the orderby column is in TIMESTAMP_SECONDS, the range_window_bounds
* value must be specified as a comparable duration (between timestamp rows).
* The duration may be of similar precision (DURATION_SECONDS) or lower (DURATION_DAYS).
*
* `scale_to()` scales the bounds scalar from its original granularity (e.g. DURATION_DAYS)
* to the orderby column's granularity (DURATION_SECONDS), before comparisons are made.
Comment on lines +83 to +84:

Contributor:
I think this should be the user's responsibility. Otherwise it's very inconsistent both with itself (you can't do the same thing with non-time types?) and with other libcudf APIs that require types to already be matched.

Contributor Author:
> ... Otherwise it's very inconsistent both with itself (you can't do the same thing with non-time types?)

My thought was that scaling would also apply to decimal types, when we add support for it.

Contributor:
We don't rescale decimal types in other APIs either. Decimal types with different scales are treated like different types.

*
* @param target_type The type to which the range_window_bounds scalar must be scaled
* @param stream The CUDA stream to use for device memory operations
* @param mr Device memory resource used to allocate the scalar
*/
void scale_to(data_type target_type,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

private:
const bool _is_unbounded{false};
std::unique_ptr<scalar> _range_scalar{nullptr}; // Required: Reseated in `scale_to()`.
// Allocates new scalar.

range_window_bounds(bool is_unbounded_, std::unique_ptr<scalar>&& range_scalar_)
: _is_unbounded{is_unbounded_}, _range_scalar{std::move(range_scalar_)}
{
assert_invariants();
}

void assert_invariants() const;
};

} // namespace cudf
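
A short hedged sketch of constructing bounds with the factory methods above (values illustrative; headers as in the usage sketch earlier):

// A bounded window of 2 days.
auto bounds = cudf::range_window_bounds::get(
  std::make_unique<cudf::duration_scalar<cudf::duration_D>>(2, true));

// Rescale for comparison against a TIMESTAMP_SECONDS orderby column:
// the 2-day bound becomes 172,800 (= 2 * 86,400) DURATION_SECONDS.
bounds.scale_to(cudf::data_type{cudf::type_id::DURATION_SECONDS});

// An open-ended window, stretching to the group boundary.
auto open_ended =
  cudf::range_window_bounds::unbounded(cudf::data_type{cudf::type_id::DURATION_SECONDS});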