diff --git a/CHANGELOG.md b/CHANGELOG.md index 88667eaf55b..a685c93349c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ - PR #6139 Add column conversion to big endian byte list. - PR #6220 Add `list_topics()` to supply list of underlying Kafka connection topics - PR #6254 Add `cudf::make_dictionary_from_scalar` factory function +- PR #6262 Add nth_element series aggregation with null handling +- PR #6277 Add support for LEAD/LAG window functions for fixed-width types - PR #6318 Add support for reading Struct and map types from Parquet files - PR #6315 Native code for string-map lookups, for cudf-java - PR #6302 Add custom dataframe accessors diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index 2f331b0116f..d9a9102f189 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -71,6 +71,8 @@ class aggregation { NTH_ELEMENT, ///< get the nth element ROW_NUMBER, ///< get row-number of element COLLECT, ///< collect values into a list + LEAD, ///< window function, accesses row at specified offset following current row + LAG, ///< window function, accesses row at specified offset preceding current row PTX, ///< PTX UDF based reduction CUDA ///< CUDA UDf based reduction }; @@ -197,6 +199,12 @@ std::unique_ptr make_row_number_aggregation(); /// Factory to create a COLLECT_NUMBER aggregation std::unique_ptr make_collect_aggregation(); +/// Factory to create a LAG aggregation +std::unique_ptr make_lag_aggregation(size_type offset); + +/// Factory to create a LEAD aggregation +std::unique_ptr make_lead_aggregation(size_type offset); + /** * @brief Factory to create an aggregation base on UDF for PTX or CUDA * diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index ad202b545b9..572d4addca8 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -92,6 +92,23 @@ 
struct quantile_aggregation final : derived_aggregation { } }; +struct lead_lag_aggregation final : derived_aggregation { + lead_lag_aggregation(Kind kind, size_type offset) + : derived_aggregation{offset < 0 ? (kind == LAG ? LEAD : LAG) : kind}, + row_offset{std::abs(offset)} + { + } + + size_type row_offset; + + protected: + friend class derived_aggregation; + + bool operator==(lead_lag_aggregation const& rhs) const { return row_offset == rhs.row_offset; } + + size_t hash_impl() const { return std::hash()(row_offset); } +}; + /** * @brief Derived class for specifying a standard deviation/variance aggregation */ @@ -361,6 +378,18 @@ struct target_type_impl { using type = cudf::list_view; }; +// Always use Source for LEAD +template +struct target_type_impl { + using type = Source; +}; + +// Always use Source for LAG +template +struct target_type_impl { + using type = Source; +}; + /** * @brief Helper alias to get the accumulator type for performing aggregation * `k` on elements of type `Source` @@ -448,6 +477,10 @@ CUDA_HOST_DEVICE_CALLABLE decltype(auto) aggregation_dispatcher(aggregation::Kin return f.template operator()(std::forward(args)...); case aggregation::COLLECT: return f.template operator()(std::forward(args)...); + case aggregation::LEAD: + return f.template operator()(std::forward(args)...); + case aggregation::LAG: + return f.template operator()(std::forward(args)...); default: { #ifndef __CUDA_ARCH__ CUDF_FAIL("Unsupported aggregation."); diff --git a/cpp/include/cudf/detail/utilities/device_operators.cuh b/cpp/include/cudf/detail/utilities/device_operators.cuh index c65b5b6437e..d7326f2993b 100644 --- a/cpp/include/cudf/detail/utilities/device_operators.cuh +++ b/cpp/include/cudf/detail/utilities/device_operators.cuh @@ -222,6 +222,15 @@ struct DeviceXor { } }; +/** + * @brief Operator for calculating Lead/Lag window function. 
+ */ +struct DeviceLeadLag { + const size_type row_offset; + + explicit CUDA_HOST_DEVICE_CALLABLE DeviceLeadLag(size_type offset_) : row_offset(offset_) {} +}; + } // namespace cudf #endif diff --git a/cpp/include/cudf/rolling.hpp b/cpp/include/cudf/rolling.hpp index 6b0afc01bc2..d6f41e7d3a7 100644 --- a/cpp/include/cudf/rolling.hpp +++ b/cpp/include/cudf/rolling.hpp @@ -61,6 +61,28 @@ std::unique_ptr rolling_window( std::unique_ptr const& agg, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +/** + * @copydoc std::unique_ptr rolling_window( + * column_view const& input, + * size_type preceding_window, + * size_type following_window, + * size_type min_periods, + * std::unique_ptr const& agg, + * rmm::mr::device_memory_resource* mr) + * + * @param default_outputs A column of per-row default values to be returned instead + * of nulls. Used for LEAD()/LAG(), if the row offset crosses + * the boundaries of the column. + */ +std::unique_ptr rolling_window( + column_view const& input, + column_view const& default_outputs, + size_type preceding_window, + size_type following_window, + size_type min_periods, + std::unique_ptr const& agg, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + /** * @brief Applies a grouping-aware, fixed-size rolling window function to the values in a column. * @@ -140,6 +162,30 @@ std::unique_ptr grouped_rolling_window( std::unique_ptr const& aggr, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +/** + * @copydoc std::unique_ptr grouped_rolling_window( + * table_view const& group_keys, + * column_view const& input, + * size_type preceding_window, + * size_type following_window, + * size_type min_periods, + * std::unique_ptr const& aggr, + * rmm::mr::device_memory_resource* mr) + * + * @param default_outputs A column of per-row default values to be returned instead + * of nulls. 
Used for LEAD()/LAG(), if the row offset crosses + * the boundaries of the column or group. + */ +std::unique_ptr grouped_rolling_window( + table_view const& group_keys, + column_view const& input, + column_view const& default_outputs, + size_type preceding_window, + size_type following_window, + size_type min_periods, + std::unique_ptr const& aggr, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + /** * @brief Applies a grouping-aware, timestamp-based rolling window function to the values in a *column. diff --git a/cpp/src/aggregation/aggregation.cpp b/cpp/src/aggregation/aggregation.cpp index 4d16c7f24f3..a1937779819 100644 --- a/cpp/src/aggregation/aggregation.cpp +++ b/cpp/src/aggregation/aggregation.cpp @@ -121,6 +121,16 @@ std::unique_ptr make_collect_aggregation() { return std::make_unique(aggregation::COLLECT); } +/// Factory to create a LAG aggregation +std::unique_ptr make_lag_aggregation(size_type offset) +{ + return std::make_unique(aggregation::LAG, offset); +} +/// Factory to create a LEAD aggregation +std::unique_ptr make_lead_aggregation(size_type offset) +{ + return std::make_unique(aggregation::LEAD, offset); +} /// Factory to create a UDF aggregation std::unique_ptr make_udf_aggregation(udf_type type, std::string const& user_defined_aggregator, diff --git a/cpp/src/rolling/rolling.cu b/cpp/src/rolling/rolling.cu index b739ae57b13..362f572f94d 100644 --- a/cpp/src/rolling/rolling.cu +++ b/cpp/src/rolling/rolling.cu @@ -43,6 +43,10 @@ #include #include +#include +#include +#include +#include #include namespace cudf { @@ -60,6 +64,7 @@ template std::enable_if_t __device__ process_rolling_window(column_device_view input, + column_device_view ignored_default_outputs, mutable_column_device_view output, size_type start_index, size_type end_index, @@ -92,6 +97,7 @@ template std::enable_if_t __device__ process_rolling_window(column_device_view input, + column_device_view ignored_default_outputs, mutable_column_device_view 
output, size_type start_index, size_type end_index, @@ -104,6 +110,105 @@ process_rolling_window(column_device_view input, return output_is_valid; } +/** + * @brief LEAD(N): Returns the row from the input column, at the specified offset past the + * current row. + * If the offset crosses the grouping boundary or column boundary for + * a given row, a "default" value is returned. The "default" value is null, by default. + * + * E.g. Consider an input column with the following values and grouping: + * [10, 11, 12, 13, 20, 21, 22, 23] + * <------G1-----> <------G2------> + * + * LEAD(input_col, 1) yields: + * [11, 12, 13, null, 21, 22, 23, null] + * + * LEAD(input_col, 1, 99) (where 99 indicates the default) yields: + * [11, 12, 13, 99, 21, 22, 23, 99] + */ +template +std::enable_if_t<(op == aggregation::LEAD) && (cudf::is_fixed_width()), bool> __device__ +process_rolling_window(column_device_view input, + column_device_view default_outputs, + mutable_column_device_view output, + size_type start_index, + size_type end_index, + size_type current_index, + size_type min_periods, + agg_op device_agg_op) +{ + // Offsets have already been normalized. + auto row_offset = device_agg_op.row_offset; + + // Check if row is invalid. + if (row_offset > (end_index - current_index - 1)) { + // Invalid row marked. Use default value, if available. + if (default_outputs.size() == 0 || default_outputs.is_null(current_index)) { return false; } + + output.element(current_index) = default_outputs.element(current_index); + return true; + } + + // Not an invalid row. + auto index = current_index + row_offset; + auto is_null = input.is_null(index); + if (!is_null) { output.element(current_index) = input.element(index); } + return !is_null; +} + +/** + * @brief LAG(N): returns the row from the input column at the specified offset preceding + * the current row. + * If the offset crosses the grouping boundary or column boundary for + * a given row, a "default" value is returned. 
The "default" value is null, by default. + * + * E.g. Consider an input column with the following values and grouping: + * [10, 11, 12, 13, 20, 21, 22, 23] + * <------G1-----> <------G2------> + * + * LAG(input_col, 2) yields: + * [null, null, 10, 11, null, null, 20, 21] + * LAG(input_col, 2, 99) yields: + * [99, 99, 10, 11, 99, 99, 20, 21] + */ +template +std::enable_if_t<(op == aggregation::LAG) && (cudf::is_fixed_width()), bool> __device__ +process_rolling_window(column_device_view input, + column_device_view default_outputs, + mutable_column_device_view output, + size_type start_index, + size_type end_index, + size_type current_index, + size_type min_periods, + agg_op device_agg_op) +{ + // Offsets have already been normalized. + auto row_offset = device_agg_op.row_offset; + + // Check if row is invalid. + if (row_offset > (current_index - start_index)) { + // Invalid row marked. Use default value, if available. + if (default_outputs.size() == 0 || default_outputs.is_null(current_index)) { return false; } + + output.element(current_index) = default_outputs.element(current_index); + return true; + } + + // Not an invalid row. + auto index = current_index - row_offset; + auto is_null = input.is_null(index); + if (!is_null) { output.element(current_index) = input.element(index); } + return !is_null; +} + /** * @brief Only used for `string_view` type to get ARGMIN and ARGMAX, which * will be used to gather MIN and MAX. 
And returns true if the @@ -118,6 +223,7 @@ std::enable_if_t<(op == aggregation::ARGMIN or op == aggregation::ARGMAX) and std::is_same::value, bool> __device__ process_rolling_window(column_device_view input, + column_device_view ignored_default_outputs, mutable_column_device_view output, size_type start_index, size_type end_index, @@ -160,9 +266,11 @@ template std::enable_if_t::value and !(op == aggregation::COUNT_VALID || op == aggregation::COUNT_ALL || - op == aggregation::ROW_NUMBER), + op == aggregation::ROW_NUMBER || op == aggregation::LEAD || + op == aggregation::LAG), bool> __device__ process_rolling_window(column_device_view input, + column_device_view ignored_default_outputs, mutable_column_device_view output, size_type start_index, size_type end_index, @@ -222,6 +330,7 @@ template __launch_bounds__(block_size) __global__ void gpu_rolling(column_device_view input, + column_device_view default_outputs, mutable_column_device_view output, size_type* __restrict__ output_valid_count, PrecedingWindowIterator preceding_window_begin, @@ -251,7 +360,71 @@ __launch_bounds__(block_size) __global__ volatile bool output_is_valid = false; output_is_valid = process_rolling_window( - input, output, start_index, end_index, i, min_periods); + input, default_outputs, output, start_index, end_index, i, min_periods); + + // set the mask + cudf::bitmask_type result_mask{__ballot_sync(active_threads, output_is_valid)}; + + // only one thread writes the mask + if (0 == threadIdx.x % cudf::detail::warp_size) { + output.set_mask_word(cudf::word_index(i), result_mask); + warp_valid_count += __popc(result_mask); + } + + // process next element + i += stride; + active_threads = __ballot_sync(active_threads, i < input.size()); + } + + // sum the valid counts across the whole block + size_type block_valid_count = + cudf::detail::single_lane_block_sum_reduce(warp_valid_count); + + if (threadIdx.x == 0) { atomicAdd(output_valid_count, block_valid_count); } +} + +template 
+__launch_bounds__(block_size) __global__ + void gpu_rolling(column_device_view input, + column_device_view default_outputs, + mutable_column_device_view output, + size_type* __restrict__ output_valid_count, + PrecedingWindowIterator preceding_window_begin, + FollowingWindowIterator following_window_begin, + size_type min_periods, + agg_op device_agg_op) +{ + size_type i = blockIdx.x * block_size + threadIdx.x; + size_type stride = block_size * gridDim.x; + + size_type warp_valid_count{0}; + + auto active_threads = __ballot_sync(0xffffffff, i < input.size()); + while (i < input.size()) { + size_type preceding_window = preceding_window_begin[i]; + size_type following_window = following_window_begin[i]; + + // compute bounds + size_type start = min(input.size(), max(0, i - preceding_window + 1)); + size_type end = min(input.size(), max(0, i + following_window + 1)); + size_type start_index = min(start, end); + size_type end_index = max(start, end); + + // aggregate + // TODO: We should explore using shared memory to avoid redundant loads. + // This might require separating the kernel into a special version + // for dynamic and static sizes. 
+ + volatile bool output_is_valid = false; + output_is_valid = process_rolling_window( + input, default_outputs, output, start_index, end_index, i, min_periods, device_agg_op); // set the mask cudf::bitmask_type result_mask{__ballot_sync(active_threads, output_is_valid)}; @@ -282,6 +455,7 @@ struct rolling_window_launcher { typename PrecedingWindowIterator, typename FollowingWindowIterator> size_type kernel_launcher(column_view const& input, + column_view const& default_outputs, mutable_column_view& output, PrecedingWindowIterator preceding_window_begin, FollowingWindowIterator following_window_begin, @@ -292,14 +466,16 @@ struct rolling_window_launcher { constexpr cudf::size_type block_size = 256; cudf::detail::grid_1d grid(input.size(), block_size); - auto input_device_view = column_device_view::create(input, stream); - auto output_device_view = mutable_column_device_view::create(output, stream); + auto input_device_view = column_device_view::create(input, stream); + auto output_device_view = mutable_column_device_view::create(output, stream); + auto default_outputs_device_view = column_device_view::create(default_outputs, stream); rmm::device_scalar device_valid_count{0, stream}; if (input.has_nulls()) { gpu_rolling, agg_op, op, block_size, true> <<>>(*input_device_view, + *default_outputs_device_view, *output_device_view, device_valid_count.data(), preceding_window_begin, @@ -308,6 +484,7 @@ struct rolling_window_launcher { } else { gpu_rolling, agg_op, op, block_size, false> <<>>(*input_device_view, + *default_outputs_device_view, *output_device_view, device_valid_count.data(), preceding_window_begin, @@ -323,6 +500,60 @@ struct rolling_window_launcher { return valid_count; } + template + size_type kernel_launcher(column_view const& input, + column_view const& default_outputs, + mutable_column_view& output, + PrecedingWindowIterator preceding_window_begin, + FollowingWindowIterator following_window_begin, + size_type min_periods, + std::unique_ptr const& agg, 
+ agg_op const& device_agg_op, + cudaStream_t stream) + { + constexpr cudf::size_type block_size = 256; + cudf::detail::grid_1d grid(input.size(), block_size); + + auto input_device_view = column_device_view::create(input, stream); + auto output_device_view = mutable_column_device_view::create(output, stream); + auto default_outputs_device_view = column_device_view::create(default_outputs, stream); + + rmm::device_scalar device_valid_count{0, stream}; + + if (input.has_nulls()) { + gpu_rolling, agg_op, op, block_size, true> + <<>>(*input_device_view, + *default_outputs_device_view, + *output_device_view, + device_valid_count.data(), + preceding_window_begin, + following_window_begin, + min_periods, + device_agg_op); + } else { + gpu_rolling, agg_op, op, block_size, false> + <<>>(*input_device_view, + *default_outputs_device_view, + *output_device_view, + device_valid_count.data(), + preceding_window_begin, + following_window_begin, + min_periods, + device_agg_op); + } + + size_type valid_count = device_valid_count.value(stream); + + // check the stream for debugging + CHECK_CUDA(stream); + + return valid_count; + } + // This launch is only for fixed width columns with valid aggregation option // numeric: All // timestamp: MIN, MAX, COUNT_VALID, COUNT_ALL, ROW_NUMBER @@ -336,6 +567,7 @@ struct rolling_window_launcher { !cudf::detail::is_rolling_string_specialization(), std::unique_ptr> launch(column_view const& input, + column_view const& default_outputs, PrecedingWindowIterator preceding_window_begin, FollowingWindowIterator following_window_begin, size_type min_periods, @@ -352,6 +584,7 @@ struct rolling_window_launcher { auto valid_count = kernel_launcher( input, + default_outputs, output_view, preceding_window_begin, following_window_begin, @@ -374,6 +607,7 @@ struct rolling_window_launcher { std::enable_if_t(), std::unique_ptr> launch(column_view const& input, + column_view const& default_outputs, PrecedingWindowIterator preceding_window_begin, 
FollowingWindowIterator following_window_begin, size_type min_periods, @@ -399,6 +633,7 @@ struct rolling_window_launcher { aggregation::ARGMIN, PrecedingWindowIterator, FollowingWindowIterator>(input, + default_outputs, output_view, preceding_window_begin, following_window_begin, @@ -411,6 +646,7 @@ struct rolling_window_launcher { aggregation::ARGMAX, PrecedingWindowIterator, FollowingWindowIterator>(input, + default_outputs, output_view, preceding_window_begin, following_window_begin, @@ -442,6 +678,7 @@ struct rolling_window_launcher { !cudf::detail::is_rolling_string_specialization(), std::unique_ptr> launch(column_view const& input, + column_view const& default_outputs, PrecedingWindowIterator preceding_window_begin, FollowingWindowIterator following_window_begin, size_type min_periods, @@ -452,24 +689,109 @@ struct rolling_window_launcher { CUDF_FAIL("Aggregation operator and/or input type combination is invalid"); } + template + std::enable_if_t() and + (op == aggregation::LEAD || op == aggregation::LAG), + std::unique_ptr> + launch(column_view const& input, + column_view const& default_outputs, + PrecedingWindowIterator preceding_window_begin, + FollowingWindowIterator following_window_begin, + size_type min_periods, + std::unique_ptr const& agg, + agg_op const& device_agg_op, + rmm::mr::device_memory_resource* mr, + cudaStream_t stream) + { + if (input.is_empty()) return empty_like(input); + + CUDF_EXPECTS(default_outputs.type().id() == input.type().id(), + "Defaults column type must match input column."); // Because LEAD/LAG. + + // For LEAD(0)/LAG(0), no computation need be performed. + // Return copy of input. 
+ if (0 == static_cast(agg.get())->row_offset) { + return std::make_unique(input, stream, mr); + } + + auto output = make_fixed_width_column( + target_type(input.type(), op), input.size(), mask_state::UNINITIALIZED, stream, mr); + + cudf::mutable_column_view output_view = output->mutable_view(); + auto valid_count = + kernel_launcher( + input, + default_outputs, + output_view, + preceding_window_begin, + following_window_begin, + min_periods, + agg, + device_agg_op, + stream); + + output->set_null_count(output->size() - valid_count); + + return output; + } + + // Deals with invalid column and/or aggregation options + template + std::enable_if_t(), + std::unique_ptr> + launch(column_view const& input, + column_view const& default_outputs, + PrecedingWindowIterator preceding_window_begin, + FollowingWindowIterator following_window_begin, + size_type min_periods, + std::unique_ptr const& agg, + agg_op device_agg_op, + rmm::mr::device_memory_resource* mr, + cudaStream_t stream) + { + CUDF_FAIL( + "Aggregation operator and/or input type combination is invalid: " + "LEAD/LAG supported only on fixed-width types"); + } + template - std::enable_if_t> operator()( - column_view const& input, - PrecedingWindowIterator preceding_window_begin, - FollowingWindowIterator following_window_begin, - size_type min_periods, - std::unique_ptr const& agg, - rmm::mr::device_memory_resource* mr, - cudaStream_t stream) + std::enable_if_t> + operator()(column_view const& input, + column_view const& default_outputs, + PrecedingWindowIterator preceding_window_begin, + FollowingWindowIterator following_window_begin, + size_type min_periods, + std::unique_ptr const& agg, + rmm::mr::device_memory_resource* mr, + cudaStream_t stream) { + CUDF_EXPECTS(default_outputs.is_empty(), + "Only LEAD/LAG window functions support default values."); + return launch::type, op, PrecedingWindowIterator, - FollowingWindowIterator>( - input, preceding_window_begin, following_window_begin, min_periods, agg, mr, 
stream); + FollowingWindowIterator>(input, + default_outputs, + preceding_window_begin, + following_window_begin, + min_periods, + agg, + mr, + stream); } // This variant is just to handle mean @@ -478,6 +800,7 @@ struct rolling_window_launcher { typename FollowingWindowIterator> std::enable_if_t<(op == aggregation::MEAN), std::unique_ptr> operator()( column_view const& input, + column_view const& default_outputs, PrecedingWindowIterator preceding_window_begin, FollowingWindowIterator following_window_begin, size_type min_periods, @@ -486,13 +809,50 @@ struct rolling_window_launcher { cudaStream_t stream) { return launch( - input, preceding_window_begin, following_window_begin, min_periods, agg, mr, stream); + input, + default_outputs, + preceding_window_begin, + following_window_begin, + min_periods, + agg, + mr, + stream); + } + + template + std::enable_if_t<(op == aggregation::LEAD || op == aggregation::LAG), std::unique_ptr> + operator()(column_view const& input, + column_view const& default_outputs, + PrecedingWindowIterator preceding_window_begin, + FollowingWindowIterator following_window_begin, + size_type min_periods, + std::unique_ptr const& agg, + rmm::mr::device_memory_resource* mr, + cudaStream_t stream) + { + return launch( + input, + default_outputs, + preceding_window_begin, + following_window_begin, + min_periods, + agg, + cudf::DeviceLeadLag{static_cast(agg.get())->row_offset}, + mr, + stream); } }; struct dispatch_rolling { template std::unique_ptr operator()(column_view const& input, + column_view const& default_outputs, PrecedingWindowIterator preceding_window_begin, FollowingWindowIterator following_window_begin, size_type min_periods, @@ -503,6 +863,7 @@ struct dispatch_rolling { return aggregation_dispatcher(agg->kind, rolling_window_launcher{}, input, + default_outputs, preceding_window_begin, following_window_begin, min_periods, @@ -617,6 +978,7 @@ std::unique_ptr rolling_window_udf(column_view const& input, */ template std::unique_ptr 
rolling_window(column_view const& input, + column_view const& default_outputs, PrecedingWindowIterator preceding_window_begin, FollowingWindowIterator following_window_begin, size_type min_periods, @@ -632,6 +994,7 @@ std::unique_ptr rolling_window(column_view const& input, return cudf::type_dispatcher(input.type(), dispatch_rolling{}, input, + default_outputs, preceding_window_begin, following_window_begin, min_periods, @@ -649,12 +1012,28 @@ std::unique_ptr rolling_window(column_view const& input, size_type min_periods, std::unique_ptr const& agg, rmm::mr::device_memory_resource* mr) +{ + return rolling_window( + input, empty_like(input)->view(), preceding_window, following_window, min_periods, agg, mr); +} + +// Applies a fixed-size rolling window function to the values in a column. +std::unique_ptr rolling_window(column_view const& input, + column_view const& default_outputs, + size_type preceding_window, + size_type following_window, + size_type min_periods, + std::unique_ptr const& agg, + rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); if (input.size() == 0) return empty_like(input); CUDF_EXPECTS((min_periods >= 0), "min_periods must be non-negative"); + CUDF_EXPECTS((default_outputs.is_empty() || default_outputs.size() == input.size()), + "Defaults column must be either empty or have as many rows as the input column."); + if (agg->kind == aggregation::CUDA || agg->kind == aggregation::PTX) { return cudf::detail::rolling_window_udf(input, preceding_window, @@ -669,8 +1048,14 @@ std::unique_ptr rolling_window(column_view const& input, auto preceding_window_begin = thrust::make_constant_iterator(preceding_window); auto following_window_begin = thrust::make_constant_iterator(following_window); - return cudf::detail::rolling_window( - input, preceding_window_begin, following_window_begin, min_periods, agg, mr, 0); + return cudf::detail::rolling_window(input, + default_outputs, + preceding_window_begin, + following_window_begin, + min_periods, + agg, + 
mr, + 0); } } @@ -706,6 +1091,7 @@ std::unique_ptr rolling_window(column_view const& input, 0); } else { return cudf::detail::rolling_window(input, + empty_like(input)->view(), preceding_window.begin(), following_window.begin(), min_periods, @@ -722,6 +1108,25 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, size_type min_periods, std::unique_ptr const& aggr, rmm::mr::device_memory_resource* mr) +{ + return grouped_rolling_window(group_keys, + input, + empty_like(input)->view(), + preceding_window, + following_window, + min_periods, + aggr, + mr); +} + +std::unique_ptr grouped_rolling_window(table_view const& group_keys, + column_view const& input, + column_view const& default_outputs, + size_type preceding_window, + size_type following_window, + size_type min_periods, + std::unique_ptr const& aggr, + rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); @@ -732,9 +1137,13 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, CUDF_EXPECTS((min_periods > 0), "min_periods must be positive"); + CUDF_EXPECTS((default_outputs.is_empty() || default_outputs.size() == input.size()), + "Defaults column must be either empty or have as many rows as the input column."); + if (group_keys.num_columns() == 0) { // No Groupby columns specified. Treat as one big group. 
- return rolling_window(input, preceding_window, following_window, min_periods, aggr, mr); + return rolling_window( + input, default_outputs, preceding_window, following_window, min_periods, aggr, mr); } using sort_groupby_helper = cudf::groupby::detail::sort::sort_groupby_helper; @@ -799,6 +1208,7 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, } else { return cudf::detail::rolling_window( input, + default_outputs, thrust::make_transform_iterator(thrust::make_counting_iterator(0), preceding_calculator), thrust::make_transform_iterator(thrust::make_counting_iterator(0), @@ -880,6 +1290,7 @@ std::unique_ptr time_range_window_ASC(column_view const& input, return cudf::detail::rolling_window( input, + empty_like(input)->view(), thrust::make_transform_iterator(thrust::make_counting_iterator(0), preceding_calculator), thrust::make_transform_iterator(thrust::make_counting_iterator(0), @@ -937,6 +1348,7 @@ std::unique_ptr time_range_window_ASC( return cudf::detail::rolling_window( input, + empty_like(input)->view(), thrust::make_transform_iterator(thrust::make_counting_iterator(0), preceding_calculator), thrust::make_transform_iterator(thrust::make_counting_iterator(0), @@ -991,6 +1403,7 @@ std::unique_ptr time_range_window_DESC(column_view const& input, return cudf::detail::rolling_window( input, + empty_like(input)->view(), thrust::make_transform_iterator(thrust::make_counting_iterator(0), preceding_calculator), thrust::make_transform_iterator(thrust::make_counting_iterator(0), @@ -1054,6 +1467,7 @@ std::unique_ptr time_range_window_DESC( } else { return cudf::detail::rolling_window( input, + empty_like(input)->view(), thrust::make_transform_iterator(thrust::make_counting_iterator(0), preceding_calculator), thrust::make_transform_iterator(thrust::make_counting_iterator(0), diff --git a/cpp/src/rolling/rolling_detail.hpp b/cpp/src/rolling/rolling_detail.hpp index 00c3babef94..a3d57a315f9 100644 --- a/cpp/src/rolling/rolling_detail.hpp +++ 
b/cpp/src/rolling/rolling_detail.hpp @@ -40,7 +40,8 @@ static constexpr bool is_rolling_supported() constexpr bool is_operation_supported = (op == aggregation::SUM) or (op == aggregation::MIN) or (op == aggregation::MAX) or (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or - (op == aggregation::MEAN) or (op == aggregation::ROW_NUMBER); + (op == aggregation::MEAN) or (op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or + (op == aggregation::LAG); constexpr bool is_valid_numeric_agg = (cudf::is_numeric() or cudf::is_duration() or @@ -52,7 +53,7 @@ static constexpr bool is_rolling_supported() } else if (cudf::is_timestamp()) { return (op == aggregation::MIN) or (op == aggregation::MAX) or (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or - (op == aggregation::ROW_NUMBER); + (op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or (op == aggregation::LAG); } else if (std::is_same()) { return (op == aggregation::MIN) or (op == aggregation::MAX) or diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index d4dc1765520..7213eacef88 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -460,6 +460,14 @@ set(GROUPED_ROLLING_TEST_SRC ConfigureTest(GROUPED_ROLLING_TEST "${GROUPED_ROLLING_TEST_SRC}") +################################################################################################### +# - lead/lag rolling tests --------------------------------------------------------------------------------- + +set(LEAD_LAG_TEST_SRC + "${CMAKE_CURRENT_SOURCE_DIR}/lead_lag/lead_lag_test.cpp") + +ConfigureTest(LEAD_LAG_TEST "${LEAD_LAG_TEST_SRC}") + ################################################################################################### # - filling test ---------------------------------------------------------------------------------- diff --git a/cpp/tests/lead_lag/lead_lag_test.cpp b/cpp/tests/lead_lag/lead_lag_test.cpp new file mode 100644 index 00000000000..a23a5357df4 --- 
/dev/null +++ b/cpp/tests/lead_lag/lead_lag_test.cpp @@ -0,0 +1,539 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using cudf::size_type; +using namespace cudf::test; + +struct LeadLagWindowTest : public cudf::test::BaseFixture { +}; + +template +struct TypedLeadLagWindowTest : public cudf::test::BaseFixture { +}; + +using TypesForTest = cudf::test::Concat; + +TYPED_TEST_CASE(TypedLeadLagWindowTest, TypesForTest); + +TYPED_TEST(TypedLeadLagWindowTest, LeadLagBasics) +{ + using T = TypeParam; + + auto const input_col = + fixed_width_column_wrapper{0, 1, 2, 3, 4, 5, 0, 10, 20, 30, 40, 50}.release(); + auto const input_size = input_col->size(); + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + auto lead_3_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lead_aggregation(3)); + + expect_columns_equivalent( + *lead_3_output_col, + fixed_width_column_wrapper{{3, 4, 5, -1, -1, -1, 30, 40, 50, -1, -1, -1}, + {1, 1,
1, 0, 0, 0, 1, 1, 1, 0, 0, 0}} + .release() + ->view()); + + auto lag_2_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lag_aggregation(2)); + + expect_columns_equivalent( + *lag_2_output_col, + fixed_width_column_wrapper{{-1, -1, 0, 1, 2, 3, -1, -1, 0, 10, 20, 30}, + {0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1}} + .release() + ->view()); +} + +TYPED_TEST(TypedLeadLagWindowTest, LeadLagWithNulls) +{ + using T = TypeParam; + + auto const input_col = fixed_width_column_wrapper{{0, 1, 2, 3, 4, 5, 0, 10, 20, 30, 40, 50}, + {1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1}} + .release(); + auto const input_size = input_col->size(); + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + auto lead_3_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lead_aggregation(3)); + + expect_columns_equivalent( + *lead_3_output_col, + fixed_width_column_wrapper{{3, 4, 5, -1, -1, -1, 30, 40, 50, -1, -1, -1}, + {1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, 0}} + .release() + ->view()); + + auto const lag_2_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lag_aggregation(2)); + + expect_columns_equivalent( + *lag_2_output_col, + fixed_width_column_wrapper{{-1, -1, 0, 1, -1, 3, -1, -1, 0, 10, -1, 30}, + {0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 1}} + .release() + ->view()); +} + +TYPED_TEST(TypedLeadLagWindowTest, TestLeadLagWithDefaults) +{ + using T = TypeParam; + + auto const input_col = fixed_width_column_wrapper{{0, 1, 2, 3, 4, 5, 0, 10, 20, 30, 40, 50}, + {1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1}} + .release(); + auto const input_size = input_col->size(); + auto const grouping_key = 
fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + auto const default_value = + cudf::make_fixed_width_scalar(detail::fixed_width_type_converter{}(99)); + auto const default_outputs = cudf::make_column_from_scalar(*default_value, input_col->size()); + + auto lead_3_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + *default_outputs, + preceding, + following, + min_periods, + cudf::make_lead_aggregation(3)); + expect_columns_equivalent( + *lead_3_output_col, + fixed_width_column_wrapper{{3, 4, 5, 99, 99, 99, 30, 40, 50, 99, 99, 99}, + {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}} + .release() + ->view()); + + auto const lag_2_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + *default_outputs, + preceding, + following, + min_periods, + cudf::make_lag_aggregation(2)); + + expect_columns_equivalent( + *lag_2_output_col, + fixed_width_column_wrapper{{99, 99, 0, 1, -1, 3, 99, 99, 0, 10, -1, 30}, + {1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1}} + .release() + ->view()); +} + +TYPED_TEST(TypedLeadLagWindowTest, TestLeadLagWithDefaultsContainingNulls) +{ + using T = TypeParam; + + auto const input_col = fixed_width_column_wrapper{{0, 1, 2, 3, 4, 5, 0, 10, 20, 30, 40, 50}, + {1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1}} + .release(); + auto const input_size = input_col->size(); + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; + + auto const default_outputs = + fixed_width_column_wrapper{{-1, 99, -1, 99, 99, -1, 99, 99, -1, 99, 99, -1}, + {0, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 0}} + .release(); + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + auto lead_3_output_col = cudf::grouped_rolling_window(grouping_keys, + 
input_col->view(), + *default_outputs, + preceding, + following, + min_periods, + cudf::make_lead_aggregation(3)); + expect_columns_equivalent( + *lead_3_output_col, + fixed_width_column_wrapper{{3, 4, 5, 99, 99, -1, 30, 40, 50, 99, 99, -1}, + {1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0}} + .release() + ->view()); + + auto const lag_2_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + *default_outputs, + preceding, + following, + min_periods, + cudf::make_lag_aggregation(2)); + + expect_columns_equivalent( + *lag_2_output_col, + fixed_width_column_wrapper{{-1, 99, 0, 1, -1, 3, 99, 99, 0, 10, -1, 30}, + {0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1}} + .release() + ->view()); +} + +TYPED_TEST(TypedLeadLagWindowTest, TestLeadLagWithOutOfRangeOffsets) +{ + using T = TypeParam; + + auto const input_col = fixed_width_column_wrapper{{0, 1, 2, 3, 4, 5, 0, 10, 20, 30, 40, 50}, + {1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1}} + .release(); + auto const input_size = input_col->size(); + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; + + auto const default_value = + cudf::make_fixed_width_scalar(detail::fixed_width_type_converter{}(99)); + auto const default_outputs = cudf::make_column_from_scalar(*default_value, input_col->size()); + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + auto lead_30_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lead_aggregation(30)); + + expect_columns_equivalent( + *lead_30_output_col, + fixed_width_column_wrapper{{-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, + {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}} + .release() + ->view()); + + auto const lag_20_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + *default_outputs, + preceding, + following, + min_periods, + 
cudf::make_lag_aggregation(20)); + + expect_columns_equivalent( + *lag_20_output_col, + fixed_width_column_wrapper{{99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99}, + {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}} + .release() + ->view()); +} + +TYPED_TEST(TypedLeadLagWindowTest, TestLeadLagWithZeroOffsets) +{ + using T = TypeParam; + + auto const input_col = fixed_width_column_wrapper{{0, 1, 2, 3, 4, 5, 0, 10, 20, 30, 40, 50}, + {1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1}} + .release(); + auto const input_size = input_col->size(); + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + auto lead_0_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lead_aggregation(0)); + + expect_columns_equivalent(*lead_0_output_col, *input_col); + + auto const lag_0_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lag_aggregation(0)); + ; + + expect_columns_equivalent(*lag_0_output_col, *input_col); +} + +TYPED_TEST(TypedLeadLagWindowTest, TestLeadLagWithNegativeOffsets) +{ + using T = TypeParam; + + auto const input_col = fixed_width_column_wrapper{{0, 1, 2, 3, 4, 5, 0, 10, 20, 30, 40, 50}, + {1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1}} + .release(); + auto const input_size = input_col->size(); + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; + + auto const default_value = + cudf::make_fixed_width_scalar(detail::fixed_width_type_converter{}(99)); + auto const default_outputs = cudf::make_column_from_scalar(*default_value, input_col->size()); + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 
1; + + auto lag_minus_3_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + *default_outputs, + preceding, + following, + min_periods, + cudf::make_lag_aggregation(-3)); + ; + + expect_columns_equivalent( + *lag_minus_3_output_col, + fixed_width_column_wrapper{{3, 4, 5, 99, 99, 99, 30, 40, 50, 99, 99, 99}, + {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}} + .release() + ->view()); + + auto const lead_minus_2_output_col = + cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + *default_outputs, + preceding, + following, + min_periods, + cudf::make_lead_aggregation(-2)); + + expect_columns_equivalent( + *lead_minus_2_output_col, + fixed_width_column_wrapper{{99, 99, 0, 1, -1, 3, 99, 99, 0, 10, -1, 30}, + {1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1}} + .release() + ->view()); +} + +TYPED_TEST(TypedLeadLagWindowTest, TestLeadLagWithNoGrouping) +{ + using T = TypeParam; + + auto const input_col = + fixed_width_column_wrapper{{0, 1, 2, 3, 4, 5}, {1, 1, 0, 1, 1, 1}}.release(); + auto const input_size = input_col->size(); + auto const grouping_keys = cudf::table_view{std::vector{}}; + + auto const default_value = + cudf::make_fixed_width_scalar(detail::fixed_width_type_converter{}(99)); + auto const default_outputs = cudf::make_column_from_scalar(*default_value, input_col->size()); + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + auto lead_3_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + *default_outputs, + preceding, + following, + min_periods, + cudf::make_lead_aggregation(3)); + ; + + expect_columns_equivalent( + *lead_3_output_col, + fixed_width_column_wrapper{{3, 4, 5, 99, 99, 99}, {1, 1, 1, 1, 1, 1}}.release()->view()); + + auto const lag_2_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + *default_outputs, + preceding, + following, + min_periods, + cudf::make_lag_aggregation(2)); + + expect_columns_equivalent( + *lag_2_output_col, + 
fixed_width_column_wrapper{{99, 99, 0, 1, -1, 3}, {1, 1, 1, 1, 0, 1}}.release()->view()); +} + +TYPED_TEST(TypedLeadLagWindowTest, TestLeadLagWithAllNullInput) +{ + using T = TypeParam; + + auto const input_col = fixed_width_column_wrapper{ + {0, 1, 2, 3, 4, 5, 0, 10, 20, 30, 40, 50}, make_counting_transform_iterator(0, [](auto i) { + return false; + })}.release(); + auto const input_size = input_col->size(); + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; + + auto const default_value = + cudf::make_fixed_width_scalar(detail::fixed_width_type_converter{}(99)); + auto const default_outputs = cudf::make_column_from_scalar(*default_value, input_col->size()); + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + auto lead_3_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + *default_outputs, + preceding, + following, + min_periods, + cudf::make_lead_aggregation(3)); + expect_columns_equivalent( + *lead_3_output_col, + fixed_width_column_wrapper{{-1, -1, -1, 99, 99, 99, -1, -1, -1, 99, 99, 99}, + {0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1}} + .release() + ->view()); + + auto const lag_2_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + *default_outputs, + preceding, + following, + min_periods, + cudf::make_lag_aggregation(2)); + + expect_columns_equivalent( + *lag_2_output_col, + fixed_width_column_wrapper{{99, 99, -1, -1, -1, -1, 99, 99, -1, -1, -1, -1}, + {1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0}} + .release() + ->view()); +} + +TYPED_TEST(TypedLeadLagWindowTest, DefaultValuesWithoutLeadLag) +{ + // Test that passing default values for window-functions + // other than lead/lag lead to cudf::logic_error. 
+ + using T = TypeParam; + + auto const input_col = fixed_width_column_wrapper{ + {0, 1, 2, 3, 4, 5}, make_counting_transform_iterator(0, [](auto i) { + return true; + })}.release(); + auto const input_size = input_col->size(); + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; + + auto const default_value = + cudf::make_fixed_width_scalar(detail::fixed_width_type_converter{}(99)); + auto const default_outputs = cudf::make_column_from_scalar(*default_value, input_col->size()); + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + auto const assert_aggregation_fails = [&](auto&& aggr) { + EXPECT_THROW(cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + default_outputs->view(), + preceding, + following, + min_periods, + cudf::make_count_aggregation()), + cudf::logic_error); + }; + + auto aggs = {cudf::make_count_aggregation(), cudf::make_min_aggregation()}; + std::for_each( + aggs.begin(), aggs.end(), [&](auto& agg) { assert_aggregation_fails(std::move(agg)); }); +} + +TEST_F(LeadLagWindowTest, LeadLagWithoutFixedWidthInput) +{ + // Check that Lead/Lag aren't supported for non-fixed-width types. 
+ + auto const input_col = strings_column_wrapper{ + {"0", "1", "2", "3", "4", "5"}, make_counting_transform_iterator(0, [](auto i) { + return false; + })}.release(); + auto const input_size = input_col->size(); + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; + + auto const default_value = cudf::make_string_scalar("99"); + auto const default_outputs = cudf::make_column_from_scalar(*default_value, input_col->size()); + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + EXPECT_THROW(cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + default_outputs->view(), + preceding, + following, + min_periods, + cudf::make_lead_aggregation(4)), + cudf::logic_error); +} + +CUDF_TEST_PROGRAM_MAIN()