Skip to content

Commit

Permalink
Add support for LEAD/LAG window functions for fixed-width types (#6277)
Browse files Browse the repository at this point in the history
* [lead/lag] [redux] Implement LEAD/LAG window functions

Initial implementation for LEAD/LAG window functions,
for fixed-width types.
  • Loading branch information
mythrocks authored Oct 7, 2020
1 parent 8950c1a commit f293588
Show file tree
Hide file tree
Showing 10 changed files with 1,090 additions and 20 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
- PR #6139 Add column conversion to big endian byte list.
- PR #6220 Add `list_topics()` to supply list of underlying Kafka connection topics
- PR #6254 Add `cudf::make_dictionary_from_scalar` factory function
- PR #6262 Add nth_element series aggregation with null handling
- PR #6277 Add support for LEAD/LAG window functions for fixed-width types
- PR #6318 Add support for reading Struct and map types from Parquet files
- PR #6315 Native code for string-map lookups, for cudf-java
- PR #6302 Add custom dataframe accessors
Expand Down
8 changes: 8 additions & 0 deletions cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class aggregation {
NTH_ELEMENT, ///< get the nth element
ROW_NUMBER, ///< get row-number of element
COLLECT, ///< collect values into a list
LEAD, ///< window function, accesses row at specified offset following current row
LAG, ///< window function, accesses row at specified offset preceding current row
PTX, ///< PTX UDF based reduction
CUDA ///< CUDA UDf based reduction
};
Expand Down Expand Up @@ -197,6 +199,12 @@ std::unique_ptr<aggregation> make_row_number_aggregation();
/// Factory to create a COLLECT_NUMBER aggregation
std::unique_ptr<aggregation> make_collect_aggregation();

/// Factory to create a LAG aggregation
std::unique_ptr<aggregation> make_lag_aggregation(size_type offset);

/// Factory to create a LEAD aggregation
std::unique_ptr<aggregation> make_lead_aggregation(size_type offset);

/**
* @brief Factory to create an aggregation base on UDF for PTX or CUDA
*
Expand Down
33 changes: 33 additions & 0 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,23 @@ struct quantile_aggregation final : derived_aggregation<quantile_aggregation> {
}
};

struct lead_lag_aggregation final : derived_aggregation<lead_lag_aggregation> {
lead_lag_aggregation(Kind kind, size_type offset)
: derived_aggregation{offset < 0 ? (kind == LAG ? LEAD : LAG) : kind},
row_offset{std::abs(offset)}
{
}

size_type row_offset;

protected:
friend class derived_aggregation<lead_lag_aggregation>;

bool operator==(lead_lag_aggregation const& rhs) const { return row_offset == rhs.row_offset; }

size_t hash_impl() const { return std::hash<size_type>()(row_offset); }
};

/**
* @brief Derived class for specifying a standard deviation/variance aggregation
*/
Expand Down Expand Up @@ -361,6 +378,18 @@ struct target_type_impl<Source, aggregation::COLLECT> {
using type = cudf::list_view;
};

// Always use Source for LEAD
template <typename Source>
struct target_type_impl<Source, aggregation::LEAD> {
using type = Source;
};

// Always use Source for LAG
template <typename Source>
struct target_type_impl<Source, aggregation::LAG> {
using type = Source;
};

/**
* @brief Helper alias to get the accumulator type for performing aggregation
* `k` on elements of type `Source`
Expand Down Expand Up @@ -448,6 +477,10 @@ CUDA_HOST_DEVICE_CALLABLE decltype(auto) aggregation_dispatcher(aggregation::Kin
return f.template operator()<aggregation::ROW_NUMBER>(std::forward<Ts>(args)...);
case aggregation::COLLECT:
return f.template operator()<aggregation::COLLECT>(std::forward<Ts>(args)...);
case aggregation::LEAD:
return f.template operator()<aggregation::LEAD>(std::forward<Ts>(args)...);
case aggregation::LAG:
return f.template operator()<aggregation::LAG>(std::forward<Ts>(args)...);
default: {
#ifndef __CUDA_ARCH__
CUDF_FAIL("Unsupported aggregation.");
Expand Down
9 changes: 9 additions & 0 deletions cpp/include/cudf/detail/utilities/device_operators.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,15 @@ struct DeviceXor {
}
};

/**
* @brief Operator for calculating Lead/Lag window function.
*/
struct DeviceLeadLag {
const size_type row_offset;

explicit CUDA_HOST_DEVICE_CALLABLE DeviceLeadLag(size_type offset_) : row_offset(offset_) {}
};

} // namespace cudf

#endif
46 changes: 46 additions & 0 deletions cpp/include/cudf/rolling.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,28 @@ std::unique_ptr<column> rolling_window(
std::unique_ptr<aggregation> const& agg,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @copydoc std::unique_ptr<column> rolling_window(
* column_view const& input,
* size_type preceding_window,
* size_type following_window,
* size_type min_periods,
* std::unique_ptr<aggregation> const& agg,
* rmm::mr::device_memory_resource* mr)
*
* @param default_outputs A column of per-row default values to be returned instead
* of nulls. Used for LEAD()/LAG(), if the row offset crosses
* the boundaries of the column.
*/
std::unique_ptr<column> rolling_window(
column_view const& input,
column_view const& default_outputs,
size_type preceding_window,
size_type following_window,
size_type min_periods,
std::unique_ptr<aggregation> const& agg,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Applies a grouping-aware, fixed-size rolling window function to the values in a column.
*
Expand Down Expand Up @@ -140,6 +162,30 @@ std::unique_ptr<column> grouped_rolling_window(
std::unique_ptr<aggregation> const& aggr,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @copydoc std::unique_ptr<column> grouped_rolling_window(
* table_view const& group_keys,
* column_view const& input,
* size_type preceding_window,
* size_type following_window,
* size_type min_periods,
* std::unique_ptr<aggregation> const& aggr,
* rmm::mr::device_memory_resource* mr)
*
* @param default_outputs A column of per-row default values to be returned instead
* of nulls. Used for LEAD()/LAG(), if the row offset crosses
* the boundaries of the column or group.
*/
std::unique_ptr<column> grouped_rolling_window(
table_view const& group_keys,
column_view const& input,
column_view const& default_outputs,
size_type preceding_window,
size_type following_window,
size_type min_periods,
std::unique_ptr<aggregation> const& aggr,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Applies a grouping-aware, timestamp-based rolling window function to the values in a
*column.
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ std::unique_ptr<aggregation> make_collect_aggregation()
{
return std::make_unique<aggregation>(aggregation::COLLECT);
}
/// Factory to create a LAG aggregation
std::unique_ptr<aggregation> make_lag_aggregation(size_type offset)
{
return std::make_unique<cudf::detail::lead_lag_aggregation>(aggregation::LAG, offset);
}
/// Factory to create a LEAD aggregation
std::unique_ptr<aggregation> make_lead_aggregation(size_type offset)
{
return std::make_unique<cudf::detail::lead_lag_aggregation>(aggregation::LEAD, offset);
}
/// Factory to create a UDF aggregation
std::unique_ptr<aggregation> make_udf_aggregation(udf_type type,
std::string const& user_defined_aggregator,
Expand Down
Loading

0 comments on commit f293588

Please sign in to comment.