diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index 6a2b71ae1d9..e5bee4771df 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -101,7 +101,7 @@ ConfigureBench(STREAM_COMPACTION_BENCH stream_compaction/drop_duplicates_benchma ################################################################################################### # - join benchmark -------------------------------------------------------------------------------- -ConfigureBench(JOIN_BENCH join/join_benchmark.cu) +ConfigureBench(JOIN_BENCH join/join_benchmark.cu join/conditional_join_benchmark.cu) ################################################################################################### # - iterator benchmark ---------------------------------------------------------------------------- diff --git a/cpp/benchmarks/ast/transform_benchmark.cpp b/cpp/benchmarks/ast/transform_benchmark.cpp index e1d52d7f0e6..6f131cf0d6a 100644 --- a/cpp/benchmarks/ast/transform_benchmark.cpp +++ b/cpp/benchmarks/ast/transform_benchmark.cpp @@ -30,9 +30,9 @@ #include #include -#include #include #include +#include #include enum class TreeType { @@ -40,11 +40,11 @@ enum class TreeType { // child column reference }; -template +template class AST : public cudf::benchmark { }; -template +template static void BM_ast_transform(benchmark::State& state) { const cudf::size_type table_size{(cudf::size_type)state.range(0)}; @@ -56,10 +56,24 @@ static void BM_ast_transform(benchmark::State& state) auto columns = std::vector(n_cols); auto data_iterator = thrust::make_counting_iterator(0); - std::generate_n(column_wrappers.begin(), n_cols, [=]() { - return cudf::test::fixed_width_column_wrapper(data_iterator, - data_iterator + table_size); - }); + + if constexpr (Nullable) { + auto validities = std::vector(table_size); + std::random_device rd; + std::mt19937 gen(rd()); + + std::generate( + validities.begin(), validities.end(), [&]() { return gen() > (0.5 * gen.max()); }); + std::generate_n(column_wrappers.begin(), n_cols, [=]() { + return cudf::test::fixed_width_column_wrapper( + data_iterator, data_iterator + table_size, validities.begin()); + }); + } else { + std::generate_n(column_wrappers.begin(), n_cols, [=]() { + return cudf::test::fixed_width_column_wrapper(data_iterator, + data_iterator + table_size); + }); + } std::transform( column_wrappers.begin(), column_wrappers.end(), columns.begin(), [](auto const& col) { return static_cast(col); @@ -113,22 +127,23 @@ static void BM_ast_transform(benchmark::State& state) (tree_levels + 1) * sizeof(key_type)); } -#define AST_TRANSFORM_BENCHMARK_DEFINE(name, key_type, tree_type, reuse_columns) \ - BENCHMARK_TEMPLATE_DEFINE_F(AST, name, key_type, tree_type, reuse_columns) \ - (::benchmark::State & st) { BM_ast_transform(st); } - -AST_TRANSFORM_BENCHMARK_DEFINE(ast_int32_imbalanced_unique, - int32_t, - TreeType::IMBALANCED_LEFT, - false); -AST_TRANSFORM_BENCHMARK_DEFINE(ast_int32_imbalanced_reuse, - int32_t, - TreeType::IMBALANCED_LEFT, - true); -AST_TRANSFORM_BENCHMARK_DEFINE(ast_double_imbalanced_unique, - double, - TreeType::IMBALANCED_LEFT, - false); +#define AST_TRANSFORM_BENCHMARK_DEFINE(name, key_type, tree_type, reuse_columns, nullable) \ + BENCHMARK_TEMPLATE_DEFINE_F(AST, name, key_type, tree_type, reuse_columns, nullable) \ + (::benchmark::State & st) { BM_ast_transform(st); } + +AST_TRANSFORM_BENCHMARK_DEFINE( + ast_int32_imbalanced_unique, int32_t, TreeType::IMBALANCED_LEFT, false, false); +AST_TRANSFORM_BENCHMARK_DEFINE( + 
ast_int32_imbalanced_reuse, int32_t, TreeType::IMBALANCED_LEFT, true, false); +AST_TRANSFORM_BENCHMARK_DEFINE( + ast_double_imbalanced_unique, double, TreeType::IMBALANCED_LEFT, false, false); + +AST_TRANSFORM_BENCHMARK_DEFINE( + ast_int32_imbalanced_unique_nulls, int32_t, TreeType::IMBALANCED_LEFT, false, true); +AST_TRANSFORM_BENCHMARK_DEFINE( + ast_int32_imbalanced_reuse_nulls, int32_t, TreeType::IMBALANCED_LEFT, true, true); +AST_TRANSFORM_BENCHMARK_DEFINE( + ast_double_imbalanced_unique_nulls, double, TreeType::IMBALANCED_LEFT, false, true); static void CustomRanges(benchmark::internal::Benchmark* b) { @@ -155,3 +170,18 @@ BENCHMARK_REGISTER_F(AST, ast_double_imbalanced_unique) ->Apply(CustomRanges) ->Unit(benchmark::kMillisecond) ->UseManualTime(); + +BENCHMARK_REGISTER_F(AST, ast_int32_imbalanced_unique_nulls) + ->Apply(CustomRanges) + ->Unit(benchmark::kMillisecond) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(AST, ast_int32_imbalanced_reuse_nulls) + ->Apply(CustomRanges) + ->Unit(benchmark::kMillisecond) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(AST, ast_double_imbalanced_unique_nulls) + ->Apply(CustomRanges) + ->Unit(benchmark::kMillisecond) + ->UseManualTime(); diff --git a/cpp/benchmarks/join/conditional_join_benchmark.cu b/cpp/benchmarks/join/conditional_join_benchmark.cu new file mode 100644 index 00000000000..4a655e29f74 --- /dev/null +++ b/cpp/benchmarks/join/conditional_join_benchmark.cu @@ -0,0 +1,379 @@ +/* + * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include "generate_input_tables.cuh" + +template +class ConditionalJoin : public cudf::benchmark { +}; + +template +static void BM_join(benchmark::State& state, Join JoinFunc) +{ + const cudf::size_type build_table_size{(cudf::size_type)state.range(0)}; + const cudf::size_type probe_table_size{(cudf::size_type)state.range(1)}; + const cudf::size_type rand_max_val{build_table_size * 2}; + const double selectivity = 0.3; + const bool is_build_table_key_unique = true; + + // Generate build and probe tables + cudf::test::UniformRandomGenerator rand_gen(0, build_table_size); + auto build_random_null_mask = [&rand_gen](int size) { + if (Nullable) { + // roughly 25% nulls + auto validity = thrust::make_transform_iterator( + thrust::make_counting_iterator(0), + [&rand_gen](auto i) { return (rand_gen.generate() & 3) == 0; }); + return cudf::test::detail::make_null_mask(validity, validity + size); + } else { + return cudf::create_null_mask(size, cudf::mask_state::UNINITIALIZED); + } + }; + + std::unique_ptr build_key_column = [&]() { + return Nullable ? 
cudf::make_numeric_column(cudf::data_type(cudf::type_to_id()), + build_table_size, + build_random_null_mask(build_table_size)) + : cudf::make_numeric_column(cudf::data_type(cudf::type_to_id()), + build_table_size); + }(); + std::unique_ptr probe_key_column = [&]() { + return Nullable ? cudf::make_numeric_column(cudf::data_type(cudf::type_to_id()), + probe_table_size, + build_random_null_mask(probe_table_size)) + : cudf::make_numeric_column(cudf::data_type(cudf::type_to_id()), + probe_table_size); + }(); + + generate_input_tables( + build_key_column->mutable_view().data(), + build_table_size, + probe_key_column->mutable_view().data(), + probe_table_size, + selectivity, + rand_max_val, + is_build_table_key_unique); + + auto payload_data_it = thrust::make_counting_iterator(0); + cudf::test::fixed_width_column_wrapper build_payload_column( + payload_data_it, payload_data_it + build_table_size); + + cudf::test::fixed_width_column_wrapper probe_payload_column( + payload_data_it, payload_data_it + probe_table_size); + + CHECK_CUDA(0); + + cudf::table_view build_table({build_key_column->view(), build_payload_column}); + cudf::table_view probe_table({probe_key_column->view(), probe_payload_column}); + + // Benchmark the inner join operation + + for (auto _ : state) { + cuda_event_timer raii(state, true, rmm::cuda_stream_default); + + // Common column references. + const auto col_ref_left_0 = cudf::ast::column_reference(0); + const auto col_ref_right_0 = cudf::ast::column_reference(0, cudf::ast::table_reference::RIGHT); + auto left_zero_eq_right_zero = + cudf::ast::expression(cudf::ast::ast_operator::EQUAL, col_ref_left_0, col_ref_right_0); + + auto result = + JoinFunc(probe_table, build_table, left_zero_eq_right_zero, cudf::null_equality::UNEQUAL); + } +} + +#define CONDITIONAL_INNER_JOIN_BENCHMARK_DEFINE(name, key_type, payload_type, nullable) \ + BENCHMARK_TEMPLATE_DEFINE_F(ConditionalJoin, name, key_type, payload_type) \ + (::benchmark::State & st) \ + { \ + auto join = [](cudf::table_view const& left, \ + cudf::table_view const& right, \ + cudf::ast::expression binary_pred, \ + cudf::null_equality compare_nulls) { \ + return cudf::conditional_inner_join(left, right, binary_pred, compare_nulls); \ + }; \ + BM_join(st, join); \ + } + +CONDITIONAL_INNER_JOIN_BENCHMARK_DEFINE(conditional_inner_join_32bit, int32_t, int32_t, false); +CONDITIONAL_INNER_JOIN_BENCHMARK_DEFINE(conditional_inner_join_64bit, int64_t, int64_t, false); +CONDITIONAL_INNER_JOIN_BENCHMARK_DEFINE(conditional_inner_join_32bit_nulls, int32_t, int32_t, true); +CONDITIONAL_INNER_JOIN_BENCHMARK_DEFINE(conditional_inner_join_64bit_nulls, int64_t, int64_t, true); + +#define CONDITIONAL_LEFT_JOIN_BENCHMARK_DEFINE(name, key_type, payload_type, nullable) \ + BENCHMARK_TEMPLATE_DEFINE_F(ConditionalJoin, name, key_type, payload_type) \ + (::benchmark::State & st) \ + { \ + auto join = [](cudf::table_view const& left, \ + cudf::table_view const& right, \ + cudf::ast::expression binary_pred, \ + cudf::null_equality compare_nulls) { \ + return cudf::conditional_left_join(left, right, binary_pred, compare_nulls); \ + }; \ + BM_join(st, join); \ + } + +CONDITIONAL_LEFT_JOIN_BENCHMARK_DEFINE(conditional_left_join_32bit, int32_t, int32_t, false); +CONDITIONAL_LEFT_JOIN_BENCHMARK_DEFINE(conditional_left_join_64bit, int64_t, int64_t, false); +CONDITIONAL_LEFT_JOIN_BENCHMARK_DEFINE(conditional_left_join_32bit_nulls, int32_t, int32_t, true); +CONDITIONAL_LEFT_JOIN_BENCHMARK_DEFINE(conditional_left_join_64bit_nulls, int64_t, int64_t, true); + +#define 
CONDITIONAL_FULL_JOIN_BENCHMARK_DEFINE(name, key_type, payload_type, nullable)           \
+  BENCHMARK_TEMPLATE_DEFINE_F(ConditionalJoin, name, key_type, payload_type)             \
+  (::benchmark::State & st)                                                               \
+  {                                                                                       \
+    auto join = [](cudf::table_view const& left,                                          \
+                   cudf::table_view const& right,                                         \
+                   cudf::ast::expression binary_pred,                                     \
+                   cudf::null_equality compare_nulls) {                                   \
+      return cudf::conditional_full_join(left, right, binary_pred, compare_nulls);        \
+    };                                                                                    \
+    BM_join<key_type, payload_type, nullable>(st, join);                                  \
+  }
+
+CONDITIONAL_FULL_JOIN_BENCHMARK_DEFINE(conditional_full_join_32bit, int32_t, int32_t, false);
+CONDITIONAL_FULL_JOIN_BENCHMARK_DEFINE(conditional_full_join_64bit, int64_t, int64_t, false);
+CONDITIONAL_FULL_JOIN_BENCHMARK_DEFINE(conditional_full_join_32bit_nulls, int32_t, int32_t, true);
+CONDITIONAL_FULL_JOIN_BENCHMARK_DEFINE(conditional_full_join_64bit_nulls, int64_t, int64_t, true);
+
+#define CONDITIONAL_LEFT_ANTI_JOIN_BENCHMARK_DEFINE(name, key_type, payload_type, nullable) \
+  BENCHMARK_TEMPLATE_DEFINE_F(ConditionalJoin, name, key_type, payload_type)                \
+  (::benchmark::State & st)                                                                 \
+  {                                                                                         \
+    auto join = [](cudf::table_view const& left,                                            \
+                   cudf::table_view const& right,                                           \
+                   cudf::ast::expression binary_pred,                                       \
+                   cudf::null_equality compare_nulls) {                                     \
+      return cudf::conditional_left_anti_join(left, right, binary_pred, compare_nulls);     \
+    };                                                                                      \
+    BM_join<key_type, payload_type, nullable>(st, join);                                    \
+  }
+
+CONDITIONAL_LEFT_ANTI_JOIN_BENCHMARK_DEFINE(conditional_left_anti_join_32bit,
+                                            int32_t,
+                                            int32_t,
+                                            false);
+CONDITIONAL_LEFT_ANTI_JOIN_BENCHMARK_DEFINE(conditional_left_anti_join_64bit,
+                                            int64_t,
+                                            int64_t,
+                                            false);
+CONDITIONAL_LEFT_ANTI_JOIN_BENCHMARK_DEFINE(conditional_left_anti_join_32bit_nulls,
+                                            int32_t,
+                                            int32_t,
+                                            true);
+CONDITIONAL_LEFT_ANTI_JOIN_BENCHMARK_DEFINE(conditional_left_anti_join_64bit_nulls,
+                                            int64_t,
+                                            int64_t,
+                                            true);
+
+#define CONDITIONAL_LEFT_SEMI_JOIN_BENCHMARK_DEFINE(name, key_type, payload_type, nullable) \
+  BENCHMARK_TEMPLATE_DEFINE_F(ConditionalJoin, name, key_type, payload_type)                \
+  (::benchmark::State & st)                                                                 \
+  {                                                                                         \
+    auto join = [](cudf::table_view const& left,                                            \
+                   cudf::table_view const& right,                                           \
+                   cudf::ast::expression binary_pred,                                       \
+                   cudf::null_equality compare_nulls) {                                     \
+      return cudf::conditional_left_semi_join(left, right, binary_pred, compare_nulls);     \
+    };                                                                                      \
+    BM_join<key_type, payload_type, nullable>(st, join);                                    \
+  }
+
+CONDITIONAL_LEFT_SEMI_JOIN_BENCHMARK_DEFINE(conditional_left_semi_join_32bit,
+                                            int32_t,
+                                            int32_t,
+                                            false);
+CONDITIONAL_LEFT_SEMI_JOIN_BENCHMARK_DEFINE(conditional_left_semi_join_64bit,
+                                            int64_t,
+                                            int64_t,
+                                            false);
+CONDITIONAL_LEFT_SEMI_JOIN_BENCHMARK_DEFINE(conditional_left_semi_join_32bit_nulls,
+                                            int32_t,
+                                            int32_t,
+                                            true);
+CONDITIONAL_LEFT_SEMI_JOIN_BENCHMARK_DEFINE(conditional_left_semi_join_64bit_nulls,
+                                            int64_t,
+                                            int64_t,
+                                            true);
+
+// inner join -----------------------------------------------------------------------
+BENCHMARK_REGISTER_F(ConditionalJoin, conditional_inner_join_32bit)
+  ->Unit(benchmark::kMillisecond)
+  ->Args({100'000, 100'000})
+  ->Args({100'000, 400'000})
+  ->Args({100'000, 1'000'000})
+  // TODO: The below benchmark is slow, but can be useful to validate that the
+  // code works for large data sets. This benchmark was used to compare to the
+  // otherwise equivalent nullable benchmark below, which has memory errors for
+  // sufficiently large data sets.
+ //->Args({1'000'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_inner_join_64bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_inner_join_32bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_inner_join_64bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +// left join ----------------------------------------------------------------------- +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_join_32bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_join_64bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_join_32bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_join_64bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +// full join ----------------------------------------------------------------------- +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_full_join_32bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_full_join_64bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_full_join_32bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_full_join_64bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +// left anti-join ------------------------------------------------------------- +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_anti_join_32bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_anti_join_64bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_anti_join_32bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_anti_join_64bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + 
->Args({100'000, 1'000'000}) + ->UseManualTime(); + +// left semi-join ------------------------------------------------------------- +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_semi_join_32bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_semi_join_64bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_semi_join_32bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_semi_join_64bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); diff --git a/cpp/include/cudf/ast/detail/linearizer.hpp b/cpp/include/cudf/ast/detail/linearizer.hpp index 166a0408703..67474e08877 100644 --- a/cpp/include/cudf/ast/detail/linearizer.hpp +++ b/cpp/include/cudf/ast/detail/linearizer.hpp @@ -103,10 +103,24 @@ class linearizer { /** * @brief Construct a new linearizer object * + * @param expr The expression to create an evaluable linearizer for. + * @param left The left table used for evaluating the abstract syntax tree. + * @param right The right table used for evaluating the abstract syntax tree. + */ + linearizer(detail::node const& expr, cudf::table_view left, cudf::table_view right) + : _left(left), _right(right), _node_count(0), _intermediate_counter() + { + expr.accept(*this); + } + + /** + * @brief Construct a new linearizer object + * + * @param expr The expression to create an evaluable linearizer for. * @param table The table used for evaluating the abstract syntax tree. */ linearizer(detail::node const& expr, cudf::table_view table) - : _table(table), _node_count(0), _intermediate_counter() + : _left(table), _right(table), _node_count(0), _intermediate_counter() { expr.accept(*this); } @@ -217,7 +231,8 @@ class linearizer { cudf::size_type add_data_reference(detail::device_data_reference data_ref); // State information about the "linearized" GPU execution plan - cudf::table_view _table; + cudf::table_view const& _left; + cudf::table_view const& _right; cudf::size_type _node_count; intermediate_counter _intermediate_counter; std::vector _data_references; diff --git a/cpp/include/cudf/ast/detail/transform.cuh b/cpp/include/cudf/ast/detail/transform.cuh index f69927a3601..e56b4fb2281 100644 --- a/cpp/include/cudf/ast/detail/transform.cuh +++ b/cpp/include/cudf/ast/detail/transform.cuh @@ -31,6 +31,8 @@ #include +#include + #include #include @@ -40,132 +42,375 @@ namespace ast { namespace detail { -// Forward declaration -struct row_evaluator; +// Type trait for wrapping nullable types in a thrust::optional. Non-nullable +// types are returned as is. +template +struct possibly_null_value; -struct row_output { - public: - __device__ row_output(row_evaluator const& evaluator) : evaluator(evaluator) {} +template +struct possibly_null_value { + using type = thrust::optional; +}; + +template +struct possibly_null_value { + using type = T; +}; + +template +using possibly_null_value_t = typename possibly_null_value::type; + +// Type used for intermediate storage in expression evaluation. 
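+// Each intermediate occupies a 64-bit slot (wrapped in a thrust::optional when
+// has_nulls is true), which is why the linearizer only accepts intermediates that
+// fit within sizeof(std::int64_t).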
+template +using IntermediateDataType = possibly_null_value_t; +/** + * @brief A container for capturing the output of an evaluated expression. + * + * This class is designed to be passed by reference as the first argument to + * expression_evaluator::evaluate. The API is designed such that template + * specializations for specific output types will be able to customize setting + * behavior if necessary. The class leverages CRTP to define a suitable interface + * for the `expression_evaluator` at compile-time and enforce this API on its + * subclasses to get around the lack of device-side polymorphism. + * + * @tparam Subclass The subclass to dispatch methods to. + * @tparam T The underlying data type. + * @tparam has_nulls Whether or not the result data is nullable. + */ +template +struct expression_result { /** - * @brief Resolves an output data reference and assigns result value. - * - * Only output columns (COLUMN) and intermediates (INTERMEDIATE) are supported as output reference - * types. Intermediates must be of fixed width less than or equal to sizeof(std::int64_t). This - * requirement on intermediates is enforced by the linearizer. - * - * @tparam Element Type of result element. - * @param device_data_reference Data reference to resolve. - * @param row_index Row index of data column. - * @param result Value to assign to output. + * Helper function to get the subclass type to dispatch methods to. */ - template ())> - __device__ void resolve_output(detail::device_data_reference device_data_reference, - cudf::size_type row_index, - Element result) const; - // Definition below after row_evaluator is a complete type - - template ())> - __device__ void resolve_output(detail::device_data_reference device_data_reference, - cudf::size_type row_index, - Element result) const + Subclass& subclass() { return static_cast(*this); } + Subclass const& subclass() const { return static_cast(*this); } + + // TODO: The index is ignored by the value subclass, but is included in this + // signature because it is required by the implementation in the template + // specialization for column views. It would be nice to clean this up, see + // the related TODO below. Note that storing the index in the class on + // construction (which would result in a cleaner delineation of the API for + // the derived types) results in a significant performance penalty because + // the index is pushed down the memory hierarchy by the time it needs to be + // used, whereas passing it as a parameter keeps it in registers for fast + // access at the point where indexing occurs. + template + __device__ void set_value(cudf::size_type index, possibly_null_value_t result) { - cudf_assert(false && "Invalid type in resolve_output."); + subclass()->set_value(); } - private: - row_evaluator const& evaluator; + __device__ bool is_valid() const { subclass()->is_valid(); } + + __device__ T value() const { subclass()->value(); } }; -template -struct unary_row_output : public row_output { - __device__ unary_row_output(row_evaluator const& evaluator) : row_output(evaluator) {} +/** + * @brief A container for capturing the output of an evaluated expression in a scalar. + * + * This subclass of `expression_result` functions as an owning container of a + * (possibly nullable) scalar type that can be written to by the + * expression_evaluator. The data (and its validity) can then be accessed. + * + * @tparam T The underlying data type. + * @tparam has_nulls Whether or not the result data is nullable. 
+ */ +template +struct value_expression_result + : public expression_result, T, has_nulls> { + __device__ value_expression_result() {} - template < - ast_operator op, - std::enable_if_t, Input>>* = nullptr> - __device__ void operator()(cudf::size_type row_index, - Input input, - detail::device_data_reference output) const + template + __device__ void set_value(cudf::size_type index, possibly_null_value_t result) { - using OperatorFunctor = detail::operator_functor; - using Out = cuda::std::invoke_result_t; - resolve_output(output, row_index, OperatorFunctor{}(input)); + if constexpr (std::is_same_v) { + _obj = result; + } else { + cudf_assert(false && "Output type does not match container type."); + } } - template < - ast_operator op, - std::enable_if_t, Input>>* = nullptr> - __device__ void operator()(cudf::size_type row_index, - Input input, - detail::device_data_reference output) const + /** + * @brief Returns true if the underlying data is valid and false otherwise. + */ + __device__ bool is_valid() const { - cudf_assert(false && "Invalid unary dispatch operator for the provided input."); + if constexpr (has_nulls) { return _obj.has_value(); } + return true; } + + /** + * @brief Returns the underlying data. + * + * @throws thrust::bad_optional_access if the underlying data is not valid. + */ + __device__ T value() const + { + // Using two separate constexprs silences compiler warnings, whereas an + // if/else does not. An unconditional return is not ignored by the compiler + // when has_nulls is true and therefore raises a compiler error. + if constexpr (has_nulls) { return _obj.value(); } + if constexpr (!has_nulls) { return _obj; } + } + + possibly_null_value_t + _obj; ///< The underlying data value, or a nullable version of it. }; -template -struct binary_row_output : public row_output { - __device__ binary_row_output(row_evaluator const& evaluator) : row_output(evaluator) {} - - template < - ast_operator op, - std::enable_if_t, LHS, RHS>>* = nullptr> - __device__ void operator()(cudf::size_type row_index, - LHS lhs, - RHS rhs, - detail::device_data_reference output) const +// TODO: The below implementation significantly differs from the default +// implementation above due to the non-owning nature of the container and the +// usage of the index. It would be ideal to unify these further if possible. + +/** + * @brief A container for capturing the output of an evaluated expression in a column. + * + * This subclass of `expression_result` functions as a non-owning container + * that transparently passes calls through to an underlying mutable view to a + * column. Not all methods are implemented + * + * @tparam has_nulls Whether or not the result data is nullable. + */ +template +struct mutable_column_expression_result + : public expression_result, + mutable_column_device_view, + has_nulls> { + __device__ mutable_column_expression_result(mutable_column_device_view& obj) : _obj(obj) {} + + template + __device__ void set_value(cudf::size_type index, possibly_null_value_t result) + { + if constexpr (has_nulls) { + if (result.has_value()) { + _obj.template element(index) = *result; + _obj.set_valid(index); + } else { + _obj.set_null(index); + } + } else { + _obj.template element(index) = result; + } + } + + /** + * @brief Not implemented for this specialization. 
+ */ + __device__ bool is_valid() const { - using OperatorFunctor = detail::operator_functor; - using Out = cuda::std::invoke_result_t; - resolve_output(output, row_index, OperatorFunctor{}(lhs, rhs)); + // Not implemented since it would require modifying the API in the parent class to accept an + // index. + cudf_assert(false && "This method is not implemented."); } - template , LHS, RHS>>* = - nullptr> - __device__ void operator()(cudf::size_type row_index, - LHS lhs, - RHS rhs, - detail::device_data_reference output) const + /** + * @brief Not implemented for this specialization. + */ + __device__ mutable_column_device_view value() const { - cudf_assert(false && "Invalid binary dispatch operator for the provided input."); + // Not implemented since it would require modifying the API in the parent class to accept an + // index. + cudf_assert(false && "This method is not implemented."); } + + mutable_column_device_view& _obj; ///< The column to which the data is written. }; /** - * @brief An expression evaluator owned by a single thread operating on rows of a table. + * @brief A container of all device data required to evaluate an expression on tables. + * + * This struct should never be instantiated directly. It is created by the + * `ast_plan` on construction, and the resulting member is publicly accessible + * for passing to kernels for constructing an `expression_evaluator`. * - * This class is designed for n-ary transform evaluation. Currently this class assumes that there's - * only one relevant "row index" in its methods, which corresponds to a row in a single input table - * and the same row index in an output column. */ -struct row_evaluator { - friend struct row_output; - template - friend struct unary_row_output; - template - friend struct binary_row_output; +struct device_ast_plan { + device_span data_references; + device_span literals; + device_span operators; + device_span operator_source_indices; + cudf::size_type num_intermediates; + int shmem_per_thread; +}; + +/** + * @brief Preprocessor for an expression acting on tables to generate data suitable for AST + * expression evaluation on the GPU. + * + * On construction, an AST plan creates a single "packed" host buffer of all + * data arrays that will be necessary to evaluate an expression on a pair of + * tables. This data is copied to a single contiguous device buffer, and + * pointers are generated to the individual components. Because the plan tends + * to be small, this is the most efficient approach for low latency. All the + * data required on the GPU can be accessed via the convenient `dev_plan` + * member struct, which can be used to construct an `expression_evaluator` on + * the device. + * + * Note that the resulting device data cannot be used once this class goes out of scope. + */ +struct ast_plan { + /** + * @brief Construct an AST plan for an expression operating on two tables. + * + * @param expr The expression for which to construct a plan. + * @param left The left table on which the expression acts. + * @param right The right table on which the expression acts. + * @param has_nulls Boolean indicator of whether or not the data contains nulls. + * @param stream Stream view on which to allocate resources and queue execution. + * @param mr Device memory resource used to allocate the returned column's device. 
+ */ + ast_plan(detail::node const& expr, + cudf::table_view left, + cudf::table_view right, + bool has_nulls, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) + : _linearizer(expr, left, right) + { + std::vector sizes; + std::vector data_pointers; + + extract_size_and_pointer(_linearizer.data_references(), sizes, data_pointers); + extract_size_and_pointer(_linearizer.literals(), sizes, data_pointers); + extract_size_and_pointer(_linearizer.operators(), sizes, data_pointers); + extract_size_and_pointer(_linearizer.operator_source_indices(), sizes, data_pointers); + + // Create device buffer + auto const buffer_size = std::accumulate(sizes.cbegin(), sizes.cend(), 0); + auto buffer_offsets = std::vector(sizes.size()); + thrust::exclusive_scan(sizes.cbegin(), sizes.cend(), buffer_offsets.begin(), 0); + + auto h_data_buffer = std::make_unique(buffer_size); + for (unsigned int i = 0; i < data_pointers.size(); ++i) { + std::memcpy(h_data_buffer.get() + buffer_offsets[i], data_pointers[i], sizes[i]); + } + + _device_data_buffer = rmm::device_buffer(h_data_buffer.get(), buffer_size, stream, mr); + stream.synchronize(); + + // Create device pointers to components of plan + auto device_data_buffer_ptr = static_cast(_device_data_buffer.data()); + dev_plan.data_references = device_span( + reinterpret_cast(device_data_buffer_ptr + + buffer_offsets[0]), + _linearizer.data_references().size()); + dev_plan.literals = device_span( + reinterpret_cast( + device_data_buffer_ptr + buffer_offsets[1]), + _linearizer.literals().size()); + dev_plan.operators = device_span( + reinterpret_cast(device_data_buffer_ptr + buffer_offsets[2]), + _linearizer.operators().size()); + dev_plan.operator_source_indices = device_span( + reinterpret_cast(device_data_buffer_ptr + buffer_offsets[3]), + _linearizer.operator_source_indices().size()); + dev_plan.num_intermediates = _linearizer.intermediate_count(); + dev_plan.shmem_per_thread = static_cast( + (has_nulls ? sizeof(IntermediateDataType) : sizeof(IntermediateDataType)) * + dev_plan.num_intermediates); + } + + /** + * @brief Construct an AST plan for an expression operating on one table. + * + * @param expr The expression for which to construct a plan. + * @param table The table on which the expression acts. + * @param has_nulls Boolean indicator of whether or not the data contains nulls. + * @param stream Stream view on which to allocate resources and queue execution. + * @param mr Device memory resource used to allocate the returned column's device. + */ + ast_plan(detail::node const& expr, + cudf::table_view table, + bool has_nulls, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) + : ast_plan(expr, table, table, has_nulls, stream, mr) + { + } + + cudf::data_type output_type() const { return _linearizer.root_data_type(); } + + device_ast_plan + dev_plan; ///< The collection of data required to evaluate the expression on the device. + + private: + /** + * @brief Helper function for adding components (operators, literals, etc) to AST plan + * + * @tparam T The underlying type of the input `std::vector` + * @param[in] v The `std::vector` containing components (operators, literals, etc). + * @param[in,out] sizes The `std::vector` containing the size of each data buffer. + * @param[in,out] data_pointers The `std::vector` containing pointers to each data buffer. 
+ */ + template + void extract_size_and_pointer(std::vector const& v, + std::vector& sizes, + std::vector& data_pointers) + { + auto const data_size = sizeof(T) * v.size(); + sizes.push_back(data_size); + data_pointers.push_back(v.data()); + } + + rmm::device_buffer + _device_data_buffer; ///< The device-side data buffer containing the plan information, which is + ///< owned by this class and persists until it is destroyed. + linearizer const _linearizer; ///< The linearizer created from the provided expression that is + ///< used to construct device-side operators and references. +}; + +/** + * @brief The principal object for evaluating AST expressions on device. + * + * This class is designed for n-ary transform evaluation. It operates on two + * tables. + */ +template +struct expression_evaluator { public: /** - * @brief Construct a row evaluator. + * @brief Construct an expression evaluator acting on two tables. + * + * @param left View of the left table view used for evaluation. + * @param right View of the right table view used for evaluation. + * @param plan The collection of device references representing the expression to evaluate. + * @param thread_intermediate_storage Pointer to this thread's portion of shared memory for + * storing intermediates. + * @param compare_nulls Whether the equality operator returns true or false for two nulls. + + */ + __device__ expression_evaluator(table_device_view const& left, + table_device_view const& right, + device_ast_plan const& plan, + IntermediateDataType* thread_intermediate_storage, + null_equality compare_nulls = null_equality::EQUAL) + : left(left), + right(right), + plan(plan), + thread_intermediate_storage(thread_intermediate_storage), + compare_nulls(compare_nulls) + { + } + + /** + * @brief Construct an expression evaluator acting on one table. * - * @param table The table device view used for evaluation. - * @param literals Array of literal values used for evaluation. + * @param table View of the table view used for evaluation. + * @param plan The collection of device references representing the expression to evaluate. * @param thread_intermediate_storage Pointer to this thread's portion of shared memory for * storing intermediates. - * @param output_column The output column where results are stored. + * @param compare_nulls Whether the equality operator returns true or false for two nulls. */ - __device__ row_evaluator( - table_device_view const& table, - device_span literals, - std::int64_t* thread_intermediate_storage, - mutable_column_device_view* output_column) - : table(table), - literals(literals), + __device__ expression_evaluator(table_device_view const& table, + device_ast_plan const& plan, + IntermediateDataType* thread_intermediate_storage, + null_equality compare_nulls = null_equality::EQUAL) + : left(table), + right(table), + plan(plan), thread_intermediate_storage(thread_intermediate_storage), - output_column(output_column) + compare_nulls(compare_nulls) { } @@ -177,241 +422,437 @@ struct row_evaluator { * sizeof(std::int64_t). This requirement on intermediates is enforced by the linearizer. * * @tparam Element Type of element to return. + * @tparam has_nulls Whether or not the result data is nullable. * @param device_data_reference Data reference to resolve. * @param row_index Row index of data column. - * @return Element + * @return Element The type- and null-resolved data. 
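+   * When `has_nulls` is true, the returned value is a thrust::optional that is empty
+   * if the referenced element is null.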
*/ template ())> - __device__ Element resolve_input(detail::device_data_reference device_data_reference, - cudf::size_type row_index) const + __device__ possibly_null_value_t resolve_input( + detail::device_data_reference device_data_reference, cudf::size_type row_index) const { auto const data_index = device_data_reference.data_index; auto const ref_type = device_data_reference.reference_type; + // TODO: Everywhere in the code assumes that the table reference is either + // left or right. Should we error-check somewhere to prevent + // table_reference::OUTPUT from being specified? + auto const& table = device_data_reference.table_source == table_reference::LEFT ? left : right; + using ReturnType = possibly_null_value_t; if (ref_type == detail::device_data_reference_type::COLUMN) { - return table.column(data_index).element(row_index); + // If we have nullable data, return an empty nullable type with no value if the data is null. + if constexpr (has_nulls) { + return table.column(data_index).is_valid(row_index) + ? ReturnType(table.column(data_index).element(row_index)) + : ReturnType(); + + } else { + return ReturnType(table.column(data_index).element(row_index)); + } } else if (ref_type == detail::device_data_reference_type::LITERAL) { - return literals[data_index].value(); + return ReturnType(plan.literals[data_index].value()); } else { // Assumes ref_type == detail::device_data_reference_type::INTERMEDIATE // Using memcpy instead of reinterpret_cast for safe type aliasing // Using a temporary variable ensures that the compiler knows the result is aligned - std::int64_t intermediate = thread_intermediate_storage[data_index]; - Element tmp; - memcpy(&tmp, &intermediate, sizeof(Element)); + IntermediateDataType intermediate = thread_intermediate_storage[data_index]; + ReturnType tmp; + memcpy(&tmp, &intermediate, sizeof(ReturnType)); return tmp; } + // Unreachable return used to silence compiler warnings. + return {}; } template ())> - __device__ Element resolve_input(detail::device_data_reference device_data_reference, - cudf::size_type row_index) const + __device__ possibly_null_value_t resolve_input( + detail::device_data_reference device_data_reference, cudf::size_type row_index) const { cudf_assert(false && "Unsupported type in resolve_input."); + // Unreachable return used to silence compiler warnings. return {}; } /** * @brief Callable to perform a unary operation. * - * @tparam OperatorFunctor Functor that performs desired operation when `operator()` is called. * @tparam Input Type of input value. - * @param row_index Row index of data column(s). + * @tparam OutputType The container type that data will be inserted into. + * + * @param output_object The container that data will be inserted into. + * @param input_row_index The row to pull the data from the input table. * @param input Input data reference. * @param output Output data reference. + * @param output_row_index The row in the output to insert the result. + * @param op The operator to act with. 
*/ - template - __device__ void operator()(cudf::size_type row_index, - detail::device_data_reference input, - detail::device_data_reference output, - ast_operator op) const + template + __device__ void operator()(OutputType& output_object, + const cudf::size_type input_row_index, + const detail::device_data_reference input, + const detail::device_data_reference output, + const cudf::size_type output_row_index, + const ast_operator op) const { - auto const typed_input = resolve_input(input, row_index); - ast_operator_dispatcher(op, unary_row_output(*this), row_index, typed_input, output); + auto const typed_input = resolve_input(input, input_row_index); + ast_operator_dispatcher(op, + unary_expression_output_handler(*this), + output_object, + output_row_index, + typed_input, + output); } /** - * @brief Callable to perform a binary operation. + * @brief Callable to perform a unary operation. + * + * @tparam LHS Type of the left input value. + * @tparam RHS Type of the right input value. + * @tparam OutputType The container type that data will be inserted into. * - * @tparam OperatorFunctor Functor that performs desired operation when `operator()` is called. - * @tparam LHS Type of left input value. - * @tparam RHS Type of right input value. - * @param row_index Row index of data column(s). + * @param output_object The container that data will be inserted into. + * @param left_row_index The row to pull the data from the left table. + * @param right_row_index The row to pull the data from the right table. * @param lhs Left input data reference. * @param rhs Right input data reference. * @param output Output data reference. + * @param output_row_index The row in the output to insert the result. + * @param op The operator to act with. */ - template - __device__ void operator()(cudf::size_type row_index, - detail::device_data_reference lhs, - detail::device_data_reference rhs, - detail::device_data_reference output, - ast_operator op) const + template + __device__ void operator()(OutputType& output_object, + const cudf::size_type left_row_index, + const cudf::size_type right_row_index, + const detail::device_data_reference lhs, + const detail::device_data_reference rhs, + const detail::device_data_reference output, + const cudf::size_type output_row_index, + const ast_operator op) const { - auto const typed_lhs = resolve_input(lhs, row_index); - auto const typed_rhs = resolve_input(rhs, row_index); - ast_operator_dispatcher( - op, binary_row_output(*this), row_index, typed_lhs, typed_rhs, output); + auto const typed_lhs = resolve_input(lhs, left_row_index); + auto const typed_rhs = resolve_input(rhs, right_row_index); + ast_operator_dispatcher(op, + binary_expression_output_handler(*this), + output_object, + output_row_index, + typed_lhs, + typed_rhs, + output); } template >* = nullptr> - __device__ void operator()(cudf::size_type row_index, - detail::device_data_reference lhs, - detail::device_data_reference rhs, - detail::device_data_reference output) const + __device__ void operator()(OutputType& output_object, + cudf::size_type left_row_index, + cudf::size_type right_row_index, + const detail::device_data_reference lhs, + const detail::device_data_reference rhs, + const detail::device_data_reference output, + cudf::size_type output_row_index, + const ast_operator op) const { cudf_assert(false && "Invalid binary dispatch operator for the provided input."); } - private: - table_device_view const& table; - device_span literals; - std::int64_t* thread_intermediate_storage; - 
mutable_column_device_view* output_column; -}; - -template ()>*> -__device__ void row_output::resolve_output(detail::device_data_reference device_data_reference, - cudf::size_type row_index, - Element result) const -{ - auto const ref_type = device_data_reference.reference_type; - if (ref_type == detail::device_data_reference_type::COLUMN) { - evaluator.output_column->element(row_index) = result; - } else { // Assumes ref_type == detail::device_data_reference_type::INTERMEDIATE - // Using memcpy instead of reinterpret_cast for safe type aliasing. - // Using a temporary variable ensures that the compiler knows the result is aligned. - std::int64_t tmp; - memcpy(&tmp, &result, sizeof(Element)); - evaluator.thread_intermediate_storage[device_data_reference.data_index] = tmp; + /** + * @brief Evaluate an expression applied to a row. + * + * This function performs an n-ary transform for one row on one thread. + * + * @tparam OutputType The container type that data will be inserted into. + * + * @param output_object The container that data will be inserted into. + * @param row_index Row index of all input and output data column(s). + */ + template + __device__ void evaluate(OutputType& output_object, cudf::size_type const row_index) + { + evaluate(output_object, row_index, row_index, row_index); } -} -/** - * @brief Evaluate an expression applied to a row. - * - * This function performs an n-ary transform for one row on one thread. - * - * @param evaluator The row evaluator used for evaluation. - * @param data_references Array of data references. - * @param operators Array of operators to perform. - * @param operator_source_indices Array of source indices for the operators. - * @param num_operators Number of operators. - * @param row_index Row index of data column(s). - */ -__device__ void evaluate_row_expression( - detail::row_evaluator const& evaluator, - device_span data_references, - device_span operators, - device_span operator_source_indices, - cudf::size_type row_index) -{ - auto operator_source_index = static_cast(0); - for (cudf::size_type operator_index = 0; operator_index < operators.size(); operator_index++) { - // Execute operator - auto const op = operators[operator_index]; - auto const arity = ast_operator_arity(op); - if (arity == 1) { - // Unary operator - auto const input = data_references[operator_source_indices[operator_source_index]]; - auto const output = data_references[operator_source_indices[operator_source_index + 1]]; - operator_source_index += arity + 1; - type_dispatcher(input.data_type, evaluator, row_index, input, output, op); - } else if (arity == 2) { - // Binary operator - auto const lhs = data_references[operator_source_indices[operator_source_index]]; - auto const rhs = data_references[operator_source_indices[operator_source_index + 1]]; - auto const output = data_references[operator_source_indices[operator_source_index + 2]]; - operator_source_index += arity + 1; - type_dispatcher(lhs.data_type, - detail::single_dispatch_binary_operator{}, - evaluator, - row_index, - lhs, - rhs, - output, - op); - } else { - cudf_assert(false && "Invalid operator arity."); + /** + * @brief Evaluate an expression applied to a row. + * + * This function performs an n-ary transform for one row on one thread. + * + * @tparam OutputType The container type that data will be inserted into. + * + * @param output_object The container that data will be inserted into. + * @param left_row_index The row to pull the data from the left table. 
+ * @param right_row_index The row to pull the data from the right table. + * @param output_row_index The row in the output to insert the result. + */ + template + __device__ void evaluate(OutputType& output_object, + cudf::size_type const left_row_index, + cudf::size_type const right_row_index, + cudf::size_type const output_row_index) + { + auto operator_source_index = static_cast(0); + for (cudf::size_type operator_index = 0; operator_index < plan.operators.size(); + operator_index++) { + // Execute operator + auto const op = plan.operators[operator_index]; + auto const arity = ast_operator_arity(op); + if (arity == 1) { + // Unary operator + auto const input = + plan.data_references[plan.operator_source_indices[operator_source_index]]; + auto const output = + plan.data_references[plan.operator_source_indices[operator_source_index + 1]]; + operator_source_index += arity + 1; + auto input_row_index = + input.table_source == table_reference::LEFT ? left_row_index : right_row_index; + type_dispatcher(input.data_type, + *this, + output_object, + input_row_index, + input, + output, + output_row_index, + op); + } else if (arity == 2) { + // Binary operator + auto const lhs = plan.data_references[plan.operator_source_indices[operator_source_index]]; + auto const rhs = + plan.data_references[plan.operator_source_indices[operator_source_index + 1]]; + auto const output = + plan.data_references[plan.operator_source_indices[operator_source_index + 2]]; + operator_source_index += arity + 1; + type_dispatcher(lhs.data_type, + detail::single_dispatch_binary_operator{}, + *this, + output_object, + left_row_index, + right_row_index, + lhs, + rhs, + output, + output_row_index, + op); + } else { + cudf_assert(false && "Invalid operator arity."); + } } } -} -/** - * @brief The AST plan creates a device buffer of data needed to execute an AST. - * - * On construction, an AST plan creates a single "packed" host buffer of all necessary data arrays, - * and copies that to the device with a single host-device memory copy. Because the plan tends to be - * small, this is the most efficient approach for low latency. - * - */ -struct ast_plan { - ast_plan(linearizer const& expr_linearizer, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - : _sizes{}, _data_pointers{} - { - add_to_plan(expr_linearizer.data_references()); - add_to_plan(expr_linearizer.literals()); - add_to_plan(expr_linearizer.operators()); - add_to_plan(expr_linearizer.operator_source_indices()); + private: + /** + * @brief Helper struct for type dispatch on the result of an expression. + * + * Evaluating an expression requires multiple levels of type dispatch to + * determine the input types, the operation type, and the output type. This + * helper class is a functor that handles the operator dispatch, invokes the + * operator, and dispatches output writing based on the resulting data type. + */ + struct expression_output_handler { + public: + __device__ expression_output_handler(expression_evaluator const& evaluator) + : evaluator(evaluator) + { + } - // Create device buffer - auto const buffer_size = std::accumulate(_sizes.cbegin(), _sizes.cend(), 0); - auto buffer_offsets = std::vector(_sizes.size()); - thrust::exclusive_scan(_sizes.cbegin(), _sizes.cend(), buffer_offsets.begin(), 0); + /** + * @brief Resolves an output data reference and assigns result value. + * + * Only output columns (COLUMN) and intermediates (INTERMEDIATE) are supported as output + * reference types. 
Intermediates must be of fixed width less than or equal to
+     * sizeof(std::int64_t). This requirement on intermediates is enforced by the linearizer.
+     *
+     * @tparam Element Type of result element.
+     * @tparam OutputType The container type that data will be inserted into.
+     *
+     * @param output_object The container that data will be inserted into.
+     * @param device_data_reference Data reference to resolve.
+     * @param row_index Row index of data column.
+     * @param result Value to assign to output.
+     */
+    template <typename Element,
+              typename OutputType,
+              CUDF_ENABLE_IF(is_rep_layout_compatible<Element>())>
+    __device__ void resolve_output(OutputType& output_object,
+                                   const detail::device_data_reference device_data_reference,
+                                   const cudf::size_type row_index,
+                                   const possibly_null_value_t<Element, has_nulls> result) const
+    {
+      auto const ref_type = device_data_reference.reference_type;
+      if (ref_type == detail::device_data_reference_type::COLUMN) {
+        output_object.template set_value<Element>(row_index, result);
+      } else {  // Assumes ref_type == detail::device_data_reference_type::INTERMEDIATE
+        // Using memcpy instead of reinterpret_cast for safe type aliasing.
+        // Using a temporary variable ensures that the compiler knows the result is aligned.
+        IntermediateDataType<has_nulls> tmp;
+        memcpy(&tmp, &result, sizeof(possibly_null_value_t<Element, has_nulls>));
+        evaluator.thread_intermediate_storage[device_data_reference.data_index] = tmp;
+      }
+    }
+
+    template <typename Element,
+              typename OutputType,
+              CUDF_ENABLE_IF(not is_rep_layout_compatible<Element>())>
+    __device__ void resolve_output(OutputType& output_object,
+                                   const detail::device_data_reference device_data_reference,
+                                   const cudf::size_type row_index,
+                                   const possibly_null_value_t<Element, has_nulls> result) const
+    {
+      cudf_assert(false && "Invalid type in resolve_output.");
+    }
+
+   protected:
+    expression_evaluator const& evaluator;
+  };
+
+  /**
+   * @brief Subclass of the expression output handler for unary operations.
+   *
+   * This functor's call operator is specialized to handle unary operations,
+   * which only require a single operand.
+   */
+  template <typename Input>
+  struct unary_expression_output_handler : public expression_output_handler {
+    __device__ unary_expression_output_handler(expression_evaluator const& evaluator)
+      : expression_output_handler(evaluator)
+    {
+    }
+
+    /**
+     * @brief Callable to perform a unary operation.
+     *
+     * @tparam op The operation to perform.
+     * @tparam OutputType The container type that data will be inserted into.
+     *
+     * @param output_object The container that data will be inserted into.
+     * @param output_row_index The row in the output object to insert the data.
+     * @param input Input to the operation.
+     * @param output Output data reference.
+ */ + template < + ast_operator op, + typename OutputType, + std::enable_if_t, Input>>* = nullptr> + __device__ void operator()(OutputType& output_object, + const cudf::size_type output_row_index, + const possibly_null_value_t input, + const detail::device_data_reference output) const + { + using OperatorFunctor = detail::operator_functor; + using Out = cuda::std::invoke_result_t; + if constexpr (has_nulls) { + auto const result = input.has_value() + ? possibly_null_value_t(OperatorFunctor{}(*input)) + : possibly_null_value_t(); + this->template resolve_output(output_object, output, output_row_index, result); + } else { + this->template resolve_output( + output_object, output, output_row_index, OperatorFunctor{}(input)); + } + } + + template < + ast_operator op, + typename OutputType, + std::enable_if_t, Input>>* = nullptr> + __device__ void operator()(OutputType& output_object, + const cudf::size_type output_row_index, + const possibly_null_value_t input, + const detail::device_data_reference output) const + { + cudf_assert(false && "Invalid unary dispatch operator for the provided input."); + } + }; /** - * @brief Helper function for adding components (operators, literals, etc) to AST plan + * @brief Subclass of the expression output handler for binary operations. * - * @tparam T The underlying type of the input `std::vector` - * @param v The `std::vector` containing components (operators, literals, etc) + * This functor's call operator is specialized to handle binary operations, + * which require two operands. */ - template - void add_to_plan(std::vector const& v) - { - auto const data_size = sizeof(T) * v.size(); - _sizes.push_back(data_size); - _data_pointers.push_back(v.data()); - } + template + struct binary_expression_output_handler : public expression_output_handler { + __device__ binary_expression_output_handler(expression_evaluator const& evaluator) + : expression_output_handler(evaluator) + { + } - std::vector _sizes; - std::vector _data_pointers; + /** + * @brief Callable to perform a binary operation. + * + * @tparam op The operation to perform. + * @tparam OutputType The container type that data will be inserted into. + * + * @param output_object The container that data will be inserted into. + * @param output_row_index The row in the output to insert the result. + * @param lhs Left input to the operation. + * @param rhs Right input to the operation. + * @param output Output data reference. + */ + template , LHS, RHS>>* = nullptr> + __device__ void operator()(OutputType& output_object, + const cudf::size_type output_row_index, + const possibly_null_value_t lhs, + const possibly_null_value_t rhs, + const detail::device_data_reference output) const + { + using OperatorFunctor = detail::operator_functor; + using Out = cuda::std::invoke_result_t; + if constexpr (has_nulls) { + if constexpr (op == ast_operator::EQUAL) { + // Special handling of the equality operator based on what kind + // of null handling was requested. + possibly_null_value_t result; + if (!lhs.has_value() && !rhs.has_value()) { + // Case 1: Both null, so the output is based on compare_nulls. + result = possibly_null_value_t(this->evaluator.compare_nulls == + null_equality::EQUAL); + } else if (lhs.has_value() && rhs.has_value()) { + // Case 2: Neither is null, so the output is given by the operation. + result = possibly_null_value_t(OperatorFunctor{}(*lhs, *rhs)); + } else { + // Case 3: One value is null, while the other is not, so we simply propagate nulls. 
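+            // (For example, `1 == null` evaluates to a null result here, not false.)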
+ result = possibly_null_value_t(); + } + this->template resolve_output(output_object, output, output_row_index, result); + } else { + // Default behavior for all other operators is to propagate nulls. + auto result = (lhs.has_value() && rhs.has_value()) + ? possibly_null_value_t(OperatorFunctor{}(*lhs, *rhs)) + : possibly_null_value_t(); + this->template resolve_output(output_object, output, output_row_index, result); + } + } else { + this->template resolve_output( + output_object, output, output_row_index, OperatorFunctor{}(lhs, rhs)); + } + } - rmm::device_buffer _device_data_buffer; - device_span _device_data_references; - device_span _device_literals; - device_span _device_operators; - device_span _device_operator_source_indices; + template , LHS, RHS>>* = nullptr> + __device__ void operator()(OutputType& output_object, + const cudf::size_type output_row_index, + const possibly_null_value_t lhs, + const possibly_null_value_t rhs, + const detail::device_data_reference output) const + { + cudf_assert(false && "Invalid binary dispatch operator for the provided input."); + } + }; + + table_device_view const& left; ///< The left table to operate on. + table_device_view const& right; ///< The right table to operate on. + device_ast_plan const& + plan; ///< The container of device data representing the expression to evaluate. + IntermediateDataType* + thread_intermediate_storage; ///< The shared memory store of intermediates produced during + ///< evaluation. + null_equality + compare_nulls; ///< Whether the equality operator returns true or false for two nulls. }; /** diff --git a/cpp/include/cudf/join.hpp b/cpp/include/cudf/join.hpp index 1f9ed71ce8c..725c0fc3699 100644 --- a/cpp/include/cudf/join.hpp +++ b/cpp/include/cudf/join.hpp @@ -16,6 +16,7 @@ #pragma once +#include #include #include @@ -23,6 +24,7 @@ #include #include +#include #include namespace cudf { @@ -647,5 +649,206 @@ class hash_join { const std::unique_ptr impl; }; +/** + * @brief Returns a pair of row index vectors corresponding to all pairs + * of rows between the specified tables where the predicate evaluates to true. + * + * The first returned vector contains the row indices from the left + * table that have a match in the right table (in unspecified order). + * The corresponding values in the second returned vector are + * the matched row indices from the right table. + * + * @code{.pseudo} + * Left: {{0, 1, 2}} + * Right: {{1, 2, 3}} + * Expression: Left.Column_0 == Right.Column_0 + * Result: {{1, 2}, {0, 1}} + * + * Left: {{0, 1, 2}, {3, 4, 5}} + * Right: {{1, 2, 3}, {4, 6, 7}} + * Expression: (Left.Column_0 == Right.Column_0) AND (Left.Column_1 == Right.Column_1) + * Result: {{1}, {0}} + * @endcode + * + * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` + * mismatch. + * + * @param left The left table + * @param right The right table + * @param binary_predicate The condition on which to join. + * @param compare_nulls Whether the equality operator returns true or false for two nulls. + * @param mr Device memory resource used to allocate the returned table and columns' device memory + * + * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct + * the result of performing a conditional inner join between two tables `left` and `right` . 
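+ *
+ * For example (a usage sketch; `left` and `right` are assumed here to be
+ * single-column tables):
+ *
+ * @code{.cpp}
+ * auto left_col  = cudf::ast::column_reference(0, cudf::ast::table_reference::LEFT);
+ * auto right_col = cudf::ast::column_reference(0, cudf::ast::table_reference::RIGHT);
+ * auto predicate = cudf::ast::expression(cudf::ast::ast_operator::EQUAL, left_col, right_col);
+ * auto [left_indices, right_indices] = cudf::conditional_inner_join(left, right, predicate);
+ * @endcode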
+ */ +std::pair>, + std::unique_ptr>> +conditional_inner_join( + table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls = null_equality::EQUAL, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +/** + * @brief Returns a pair of row index vectors corresponding to all pairs + * of rows between the specified tables where the predicate evaluates to true, + * or null matches for rows in left that have no match in right. + * + * The first returned vector contains all the row indices from the left + * table (in unspecified order). The corresponding value in the + * second returned vector is either (1) the row index of the matched row + * from the right table, if there is a match or (2) an unspecified + * out-of-bounds value. + * + * @code{.pseudo} + * Left: {{0, 1, 2}} + * Right: {{1, 2, 3}} + * Expression: Left.Column_0 == Right.Column_0 + * Result: {{0, 1, 2}, {None, 0, 1}} + * + * Left: {{0, 1, 2}, {3, 4, 5}} + * Right: {{1, 2, 3}, {4, 6, 7}} + * Expression: (Left.Column_0 == Right.Column_0) AND (Left.Column_1 == Right.Column_1) + * Result: {{0, 1, 2}, {None, 0, None}} + * @endcode + * + * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` + * mismatch. + * + * @param left The left table + * @param right The right table + * @param binary_predicate The condition on which to join. + * @param compare_nulls Whether the equality operator returns true or false for two nulls. + * @param mr Device memory resource used to allocate the returned table and columns' device memory + * + * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct + * the result of performing a conditional left join between two tables `left` and `right` . + */ +std::pair>, + std::unique_ptr>> +conditional_left_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls = null_equality::EQUAL, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +/** + * @brief Returns a pair of row index vectors corresponding to all pairs + * of rows between the specified tables where the predicate evaluates to true, + * or null matches for rows in either table that have no match in the other. + * + * Taken pairwise, the values from the returned vectors are one of: + * (1) row indices corresponding to matching rows from the left and + * right tables, (2) a row index and an unspecified out-of-bounds value, + * representing a row from one table without a match in the other. + * + * @code{.pseudo} + * Left: {{0, 1, 2}} + * Right: {{1, 2, 3}} + * Expression: Left.Column_0 == Right.Column_0 + * Result: {{0, 1, 2, None}, {None, 0, 1, 2}} + * + * Left: {{0, 1, 2}, {3, 4, 5}} + * Right: {{1, 2, 3}, {4, 6, 7}} + * Expression: (Left.Column_0 == Right.Column_0) AND (Left.Column_1 == Right.Column_1) + * Result: {{0, 1, 2, None, None}, {None, 0, None, 1, 2}} + * @endcode + * + * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` + * mismatch. + * + * @param left The left table + * @param right The right table + * @param binary_predicate The condition on which to join. + * @param compare_nulls Whether the equality operator returns true or false for two nulls. 
+ * @param mr Device memory resource used to allocate the returned table and columns' device memory + * + * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct + * the result of performing a conditional full join between two tables `left` and `right` . + */ +std::pair>, + std::unique_ptr>> +conditional_full_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls = null_equality::EQUAL, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +/** + * @brief Returns an index vector corresponding to all rows in the left table + * for which there exists some row in the right table where the predicate + * evaluates to true. + * + * @code{.pseudo} + * Left: {{0, 1, 2}} + * Right: {{1, 2, 3}} + * Expression: Left.Column_0 == Right.Column_0 + * Result: {1, 2} + * + * Left: {{0, 1, 2}, {3, 4, 5}} + * Right: {{1, 2, 3}, {4, 6, 7}} + * Expression: (Left.Column_0 == Right.Column_0) AND (Left.Column_1 == Right.Column_1) + * Result: {1} + * @endcode + * + * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` + * mismatch. + * + * @param left The left table + * @param right The right table + * @param binary_predicate The condition on which to join. + * @param compare_nulls Whether the equality operator returns true or false for two nulls. + * @param mr Device memory resource used to allocate the returned table and columns' device memory + * + * @return A vector `left_indices` that can be used to construct the result of + * performing a conditional left semi join between two tables `left` and + * `right` . + */ +std::unique_ptr> conditional_left_semi_join( + table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls = null_equality::EQUAL, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +/** + * @brief Returns an index vector corresponding to all rows in the left table + * for which there does not exist any row in the right table where the + * predicate evaluates to true. + * + * @code{.pseudo} + * Left: {{0, 1, 2}} + * Right: {{1, 2, 3}} + * Expression: Left.Column_0 == Right.Column_0 + * Result: {0} + * + * Left: {{0, 1, 2}, {3, 4, 5}} + * Right: {{1, 2, 3}, {4, 6, 7}} + * Expression: (Left.Column_0 == Right.Column_0) AND (Left.Column_1 == Right.Column_1) + * Result: {0, 2} + * @endcode + * + * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` + * mismatch. + * + * @param left The left table + * @param right The right table + * @param binary_predicate The condition on which to join. + * @param compare_nulls Whether the equality operator returns true or false for two nulls. + * @param mr Device memory resource used to allocate the returned table and columns' device memory + * + * @return A vector `left_indices` that can be used to construct the result of + * performing a conditional left anti join between two tables `left` and + * `right` . 
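+ *
+ * For example (a usage sketch reusing the predicate shown for
+ * conditional_inner_join above):
+ *
+ * @code{.cpp}
+ * auto unmatched_left_rows = cudf::conditional_left_anti_join(left, right, predicate);
+ * @endcode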
+ */ +std::unique_ptr> conditional_left_anti_join( + table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls = null_equality::EQUAL, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + /** @} */ // end of group } // namespace cudf diff --git a/cpp/src/ast/linearizer.cpp b/cpp/src/ast/linearizer.cpp index 66a32ead35e..3e442305552 100644 --- a/cpp/src/ast/linearizer.cpp +++ b/cpp/src/ast/linearizer.cpp @@ -111,7 +111,9 @@ cudf::size_type linearizer::visit(column_reference const& expr) // Increment the node index _node_count++; // Resolve node type - auto const data_type = expr.get_data_type(_table); + auto const data_type = expr.get_table_source() == table_reference::LEFT + ? expr.get_data_type(_left) + : expr.get_data_type(_right); // Push data reference auto const source = detail::device_data_reference(detail::device_data_reference_type::COLUMN, data_type, diff --git a/cpp/src/ast/transform.cu b/cpp/src/ast/transform.cu index 43d3bde97c2..7aa89635c54 100644 --- a/cpp/src/ast/transform.cu +++ b/cpp/src/ast/transform.cu @@ -49,37 +49,37 @@ namespace detail { * This evaluates an expression over a table to produce a new column. Also called an n-ary * transform. * - * @tparam block_size + * @tparam max_block_size The size of the thread block, used to set launch + * bounds and minimize register usage. + * @tparam has_nulls whether or not the output column may contain nulls. + * * @param table The table device view used for evaluation. - * @param literals Array of literal values used for evaluation. - * @param output_column The output column where results are stored. - * @param data_references Array of data references. - * @param operators Array of operators to perform. - * @param operator_source_indices Array of source indices for the operators. - * @param num_operators Number of operators. - * @param num_intermediates Number of intermediates, used to allocate a portion of shared memory to - * each thread. + * @param plan Container of device data required to evaluate the desired expression. + * @param output_column The destination for the results of evaluating the expression. */ -template -__launch_bounds__(max_block_size) __global__ void compute_column_kernel( - table_device_view const table, - device_span literals, - mutable_column_device_view output_column, - device_span data_references, - device_span operators, - device_span operator_source_indices, - cudf::size_type num_intermediates) +template +__launch_bounds__(max_block_size) __global__ + void compute_column_kernel(table_device_view const table, + device_ast_plan plan, + mutable_column_device_view output_column) { - extern __shared__ std::int64_t intermediate_storage[]; - auto thread_intermediate_storage = &intermediate_storage[threadIdx.x * num_intermediates]; + // The (required) extern storage of the shared memory array leads to + // conflicting declarations between different templates. The easiest + // workaround is to declare an arbitrary (here char) array type then cast it + // after the fact to the appropriate type. 
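+  // Each thread is assigned plan.num_intermediates entries of this storage;
+  // the host launch sizes the dynamic shared memory allocation accordingly
+  // (shmem_per_thread * threads per block).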
+ extern __shared__ char raw_intermediate_storage[]; + IntermediateDataType* intermediate_storage = + reinterpret_cast*>(raw_intermediate_storage); + + auto thread_intermediate_storage = &intermediate_storage[threadIdx.x * plan.num_intermediates]; auto const start_idx = static_cast(threadIdx.x + blockIdx.x * blockDim.x); auto const stride = static_cast(blockDim.x * gridDim.x); - auto const evaluator = - cudf::ast::detail::row_evaluator(table, literals, thread_intermediate_storage, &output_column); + auto evaluator = + cudf::ast::detail::expression_evaluator(table, plan, thread_intermediate_storage); for (cudf::size_type row_index = start_idx; row_index < table.num_rows(); row_index += stride) { - evaluate_row_expression( - evaluator, data_references, operators, operator_source_indices, row_index); + auto output_dest = mutable_column_expression_result(output_column); + evaluator.evaluate(output_dest, row_index); } } @@ -88,22 +88,30 @@ std::unique_ptr compute_column(table_view const table, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - auto const expr_linearizer = linearizer(expr, table); // Linearize the AST - auto const plan = ast_plan{expr_linearizer, stream, mr}; // Create ast_plan + // Prepare output column. Whether or not the output column is nullable is + // determined by whether any of the columns in the input table are nullable. + // If none of the input columns actually contain nulls, we can still use the + // non-nullable version of the expression evaluation code path for + // performance, so we capture that information as well. + auto const nullable = + std::any_of(table.begin(), table.end(), [](column_view c) { return c.nullable(); }); + auto const has_nulls = nullable && std::any_of(table.begin(), table.end(), [](column_view c) { + return c.nullable() && c.has_nulls(); + }); - // Create table device view - auto table_device = table_device_view::create(table, stream); - auto const table_num_rows = table.num_rows(); + auto const plan = ast_plan{expr, table, has_nulls, stream, mr}; + + auto const output_column_mask_state = + nullable ? (has_nulls ? mask_state::UNINITIALIZED : mask_state::ALL_VALID) + : mask_state::UNALLOCATED; - // Prepare output column auto output_column = cudf::make_fixed_width_column( - expr_linearizer.root_data_type(), table_num_rows, mask_state::UNALLOCATED, stream, mr); + plan.output_type(), table.num_rows(), output_column_mask_state, stream, mr); auto mutable_output_device = cudf::mutable_column_device_view::create(output_column->mutable_view(), stream); // Configure kernel parameters - auto const num_intermediates = expr_linearizer.intermediate_count(); - auto const shmem_size_per_thread = static_cast(sizeof(std::int64_t) * num_intermediates); + auto const& dev_plan = plan.dev_plan; int device_id; CUDA_TRY(cudaGetDevice(&device_id)); int shmem_limit_per_block; @@ -111,22 +119,23 @@ std::unique_ptr compute_column(table_view const table, cudaDeviceGetAttribute(&shmem_limit_per_block, cudaDevAttrMaxSharedMemoryPerBlock, device_id)); auto constexpr MAX_BLOCK_SIZE = 128; auto const block_size = - shmem_size_per_thread != 0 - ? std::min(MAX_BLOCK_SIZE, shmem_limit_per_block / shmem_size_per_thread) + dev_plan.shmem_per_thread != 0 + ? 
std::min(MAX_BLOCK_SIZE, shmem_limit_per_block / dev_plan.shmem_per_thread) : MAX_BLOCK_SIZE; - auto const config = cudf::detail::grid_1d{table_num_rows, block_size}; - auto const shmem_size_per_block = shmem_size_per_thread * config.num_threads_per_block; + auto const config = cudf::detail::grid_1d{table.num_rows(), block_size}; + auto const shmem_per_block = dev_plan.shmem_per_thread * config.num_threads_per_block; // Execute the kernel - cudf::ast::detail::compute_column_kernel - <<>>( - *table_device, - plan._device_literals, - *mutable_output_device, - plan._device_data_references, - plan._device_operators, - plan._device_operator_source_indices, - num_intermediates); + auto table_device = table_device_view::create(table, stream); + if (has_nulls) { + cudf::ast::detail::compute_column_kernel + <<>>( + *table_device, dev_plan, *mutable_output_device); + } else { + cudf::ast::detail::compute_column_kernel + <<>>( + *table_device, dev_plan, *mutable_output_device); + } CHECK_CUDA(stream.value()); return output_column; } diff --git a/cpp/src/join/hash_join.cu b/cpp/src/join/hash_join.cu index 1133477669d..e6110edfaa8 100644 --- a/cpp/src/join/hash_join.cu +++ b/cpp/src/join/hash_join.cu @@ -90,12 +90,11 @@ struct valid_range { */ std::pair>, std::unique_ptr>> -get_left_join_indices_complement( - std::unique_ptr>& right_indices, - size_type left_table_row_count, - size_type right_table_row_count, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) +get_left_join_indices_complement(std::unique_ptr>& right_indices, + size_type left_table_row_count, + size_type right_table_row_count, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { // Get array of indices that do not appear in right_indices diff --git a/cpp/src/join/hash_join.cuh b/cpp/src/join/hash_join.cuh index f9ccbd68c74..1b4cbf4ba1d 100644 --- a/cpp/src/join/hash_join.cuh +++ b/cpp/src/join/hash_join.cuh @@ -153,6 +153,17 @@ std::pair, std::unique_ptr> get_empty_joined_table std::unique_ptr combine_table_pair(std::unique_ptr&& left, std::unique_ptr&& right); +VectorPair concatenate_vector_pairs(VectorPair& a, VectorPair& b, rmm::cuda_stream_view stream); + +std::pair>, + std::unique_ptr>> +get_left_join_indices_complement( + std::unique_ptr>& right_indices, + size_type left_table_row_count, + size_type right_table_row_count, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + } // namespace detail struct hash_join::hash_join_impl { diff --git a/cpp/src/join/join.cu b/cpp/src/join/join.cu index 6cb04cadcac..cf711524f0b 100644 --- a/cpp/src/join/join.cu +++ b/cpp/src/join/join.cu @@ -15,6 +15,7 @@ */ #include #include +#include #include #include @@ -219,6 +220,21 @@ std::unique_ptr
full_join(table_view const& left_input, return combine_table_pair(std::move(left_result), std::move(right_result)); } +std::pair>, + std::unique_ptr>> +conditional_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + join_kind JoinKind, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_FUNC_RANGE(); + return get_conditional_join_indices( + left, right, JoinKind, binary_predicate, compare_nulls, stream, mr); +} + } // namespace detail hash_join::~hash_join() = default; @@ -356,4 +372,88 @@ std::unique_ptr
full_join(table_view const& left, left, right, left_on, right_on, compare_nulls, rmm::cuda_stream_default, mr); } +std::pair>, + std::unique_ptr>> +conditional_inner_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::INNER_JOIN, + rmm::cuda_stream_default, + mr); +} + +std::pair>, + std::unique_ptr>> +conditional_left_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::LEFT_JOIN, + rmm::cuda_stream_default, + mr); +} + +std::pair>, + std::unique_ptr>> +conditional_full_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::FULL_JOIN, + rmm::cuda_stream_default, + mr); +} + +std::unique_ptr> conditional_left_semi_join( + table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return std::move(detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::LEFT_SEMI_JOIN, + rmm::cuda_stream_default, + mr) + .first); +} + +std::unique_ptr> conditional_left_anti_join( + table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return std::move(detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::LEFT_ANTI_JOIN, + rmm::cuda_stream_default, + mr) + .first); +} } // namespace cudf diff --git a/cpp/src/join/join_kernels.cuh b/cpp/src/join/join_kernels.cuh index 4298706987c..6d0810ea800 100644 --- a/cpp/src/join/join_kernels.cuh +++ b/cpp/src/join/join_kernels.cuh @@ -18,12 +18,18 @@ #include #include +#include +#include +#include #include #include #include +#include #include "join_common_utils.hpp" +#include + namespace cudf { namespace detail { /** @@ -203,39 +209,63 @@ __global__ void compute_join_output_size(multimap_type multi_map, * @brief Computes the output size of joining the left table to the right table. * * This method uses a nested loop to iterate over the left and right tables and count the number of - * matches. + * matches according to a boolean expression. * * @tparam block_size The number of threads per block for this kernel + * @tparam has_nulls Whether or not the inputs may contain nulls. * * @param[in] left_table The left table * @param[in] right_table The right table * @param[in] JoinKind The type of join to be performed - * @param[in] check_row_equality The row equality comparator + * @param[in] compare_nulls Controls whether null join-key values should match or not. + * @param[in] plan Container of device data required to evaluate the desired expression. 
* @param[out] output_size The resulting output size */ -template -__global__ void compute_nested_loop_join_output_size(table_device_view left_table, +template +__global__ void compute_conditional_join_output_size(table_device_view left_table, table_device_view right_table, join_kind JoinKind, - row_equality check_row_equality, + null_equality compare_nulls, + ast::detail::device_ast_plan plan, cudf::size_type* output_size) { + // The (required) extern storage of the shared memory array leads to + // conflicting declarations between different templates. The easiest + // workaround is to declare an arbitrary (here char) array type then cast it + // after the fact to the appropriate type. + extern __shared__ char raw_intermediate_storage[]; + cudf::ast::detail::IntermediateDataType* intermediate_storage = + reinterpret_cast*>(raw_intermediate_storage); + auto thread_intermediate_storage = &intermediate_storage[threadIdx.x * plan.num_intermediates]; + cudf::size_type thread_counter(0); const cudf::size_type left_start_idx = threadIdx.x + blockIdx.x * blockDim.x; const cudf::size_type left_stride = blockDim.x * gridDim.x; const cudf::size_type left_num_rows = left_table.num_rows(); const cudf::size_type right_num_rows = right_table.num_rows(); + auto evaluator = cudf::ast::detail::expression_evaluator( + left_table, right_table, plan, thread_intermediate_storage, compare_nulls); + for (cudf::size_type left_row_index = left_start_idx; left_row_index < left_num_rows; left_row_index += left_stride) { bool found_match = false; for (cudf::size_type right_row_index = 0; right_row_index < right_num_rows; right_row_index++) { - if (check_row_equality(left_row_index, right_row_index)) { - ++thread_counter; + auto output_dest = cudf::ast::detail::value_expression_result(); + evaluator.evaluate(output_dest, left_row_index, right_row_index, 0); + if (output_dest.is_valid() && output_dest.value()) { + if ((JoinKind != join_kind::LEFT_ANTI_JOIN) && + !(JoinKind == join_kind::LEFT_SEMI_JOIN && found_match)) { + ++thread_counter; + } found_match = true; } } - if ((JoinKind == join_kind::LEFT_JOIN) && (!found_match)) { ++thread_counter; } + if ((JoinKind == join_kind::LEFT_JOIN || JoinKind == join_kind::LEFT_ANTI_JOIN || + JoinKind == join_kind::FULL_JOIN) && + (!found_match)) { + ++thread_counter; + } } using BlockReduce = cub::BlockReduce; @@ -428,32 +458,35 @@ __global__ void probe_hash_table(multimap_type multi_map, } /** - * @brief Performs a nested loop join to find all matching rows between the - * left and right tables and generate the output for the desired Join - * operation. + * @brief Performs a join conditioned on a predicate to find all matching rows + * between the left and right tables and generate the output for the desired + * Join operation. * * @tparam block_size The number of threads per block for this kernel * @tparam output_cache_size The side of the shared memory buffer to cache join * output results - + * @tparam has_nulls Whether or not the inputs may contain nulls. + * * @param[in] left_table The left table * @param[in] right_table The right table * @param[in] JoinKind The type of join to be performed - * @param[in] check_row_equality The row equality comparator + * @param compare_nulls Controls whether null join-key values should match or not. 
* @param[out] join_output_l The left result of the join operation * @param[out] join_output_r The right result of the join operation * @param[in,out] current_idx A global counter used by threads to coordinate * writes to the global output + * @param plan Container of device data required to evaluate the desired expression. * @param[in] max_size The maximum size of the output */ -template -__global__ void nested_loop_join(table_device_view left_table, +template +__global__ void conditional_join(table_device_view left_table, table_device_view right_table, join_kind JoinKind, - row_equality check_row_equality, + null_equality compare_nulls, cudf::size_type* join_output_l, cudf::size_type* join_output_r, cudf::size_type* current_idx, + cudf::ast::detail::device_ast_plan plan, const cudf::size_type max_size) { constexpr int num_warps = block_size / detail::warp_size; @@ -461,6 +494,15 @@ __global__ void nested_loop_join(table_device_view left_table, __shared__ cudf::size_type join_shared_l[num_warps][output_cache_size]; __shared__ cudf::size_type join_shared_r[num_warps][output_cache_size]; + // Normally the casting of a shared memory array is used to create multiple + // arrays of different types from the shared memory buffer, but here it is + // used to circumvent conflicts between arrays of different types between + // different template instantiations due to the extern specifier. + extern __shared__ char raw_intermediate_storage[]; + cudf::ast::detail::IntermediateDataType* intermediate_storage = + reinterpret_cast*>(raw_intermediate_storage); + auto thread_intermediate_storage = &intermediate_storage[threadIdx.x * plan.num_intermediates]; + const int warp_id = threadIdx.x / detail::warp_size; const int lane_id = threadIdx.x % detail::warp_size; const cudf::size_type left_num_rows = left_table.num_rows(); @@ -473,18 +515,34 @@ __global__ void nested_loop_join(table_device_view left_table, cudf::size_type left_row_index = threadIdx.x + blockIdx.x * blockDim.x; const unsigned int activemask = __ballot_sync(0xffffffff, left_row_index < left_num_rows); + + auto evaluator = cudf::ast::detail::expression_evaluator( + left_table, right_table, plan, thread_intermediate_storage, compare_nulls); + if (left_row_index < left_num_rows) { bool found_match = false; - for (size_type right_row_index(0); right_row_index < right_num_rows; right_row_index++) { - if (check_row_equality(left_row_index, right_row_index)) { + for (size_type right_row_index(0); right_row_index < right_num_rows; ++right_row_index) { + auto output_dest = cudf::ast::detail::value_expression_result(); + evaluator.evaluate(output_dest, left_row_index, right_row_index, 0); + + if (output_dest.is_valid() && output_dest.value()) { // If the rows are equal, then we have found a true match + // In the case of left anti joins we only add indices from left after + // the loop if we have found _no_ matches from the right. + // In the case of left semi joins we only add the first match (note + // that the current logic relies on the fact that we process all right + // table rows for a single left table row on a single thread so that no + // synchronization of found_match is required). 
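+        // For example, a left row that satisfies the predicate for right rows
+        // 2 and 5 emits the single pair (left_row_index, 2) for a left semi
+        // join and emits nothing here for a left anti join (its unmatched left
+        // rows are added after the loop below).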
+ if ((JoinKind != join_kind::LEFT_ANTI_JOIN) && + !(JoinKind == join_kind::LEFT_SEMI_JOIN && found_match)) { + add_pair_to_cache(left_row_index, + right_row_index, + current_idx_shared, + warp_id, + join_shared_l[warp_id], + join_shared_r[warp_id]); + } found_match = true; - add_pair_to_cache(left_row_index, - right_row_index, - current_idx_shared, - warp_id, - join_shared_l[warp_id], - join_shared_r[warp_id]); } __syncwarp(activemask); @@ -506,8 +564,11 @@ __global__ void nested_loop_join(table_device_view left_table, } } - // If performing a LEFT join and no match was found, insert a Null into the output - if ((JoinKind == join_kind::LEFT_JOIN) && (!found_match)) { + // Left, left anti, and full joins all require saving left columns that + // aren't present in the right. + if ((JoinKind == join_kind::LEFT_JOIN || JoinKind == join_kind::LEFT_ANTI_JOIN || + JoinKind == join_kind::FULL_JOIN) && + (!found_match)) { add_pair_to_cache(left_row_index, static_cast(JoinNoneValue), current_idx_shared, diff --git a/cpp/src/join/nested_loop_join.cuh b/cpp/src/join/nested_loop_join.cuh index 5054305a41a..9848477a894 100644 --- a/cpp/src/join/nested_loop_join.cuh +++ b/cpp/src/join/nested_loop_join.cuh @@ -19,7 +19,8 @@ #include "join_common_utils.hpp" #include "join_kernels.cuh" -#include +#include +#include #include #include #include @@ -28,167 +29,153 @@ #include #include +#include -#include +#include + +#include namespace cudf { namespace detail { + /** - * @brief Gives an estimate of the size of the join output produced when - * joining two tables together. - * - * @throw cudf::logic_error if JoinKind is not INNER_JOIN or LEFT_JOIN + * @brief Computes the join operation between two tables and returns the + * output indices of left and right table as a combined table * - * @param left The left hand table - * @param right The right hand table + * @param left Table of left columns to join + * @param right Table of right columns to join + * tables have been flipped, meaning the output indices should also be flipped * @param JoinKind The type of join to be performed * @param compare_nulls Controls whether null join-key values should match or not. * @param stream CUDA stream used for device memory operations and kernel launches * - * @return An estimate of the size of the output of the join operation + * @return Join output indices vector pair */ -size_type estimate_nested_loop_join_output_size(table_device_view left, - table_device_view right, - join_kind JoinKind, - null_equality compare_nulls, - rmm::cuda_stream_view stream) +std::pair>, + std::unique_ptr>> +get_conditional_join_indices(table_view const& left, + table_view const& right, + join_kind JoinKind, + ast::expression binary_pred, + null_equality compare_nulls, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { - const size_type left_num_rows{left.num_rows()}; - const size_type right_num_rows{right.num_rows()}; - - if (right_num_rows == 0) { - // If the right table is empty, we know exactly how large the output - // will be for the different types of joins and can return immediately + // We can immediately filter out cases where the right table is empty. In + // some cases, we return all the rows of the left table with a corresponding + // null index for the right table; in others, we return an empty output. 
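+  // For example, a left anti join against an empty right table short-circuits
+  // here to all of the left row indices, while an inner join returns empty
+  // index vectors.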
+  if (right.num_rows() == 0) {
     switch (JoinKind) {
-      // Inner join with an empty table will have no output
-      case join_kind::INNER_JOIN: return 0;
-
-      // Left join with an empty table will have an output of NULL rows
-      // equal to the number of rows in the left table
-      case join_kind::LEFT_JOIN: return left_num_rows;
-
-      default: CUDF_FAIL("Unsupported join type");
+      // Left, left anti, and full joins (the latter behaves like a left join
+      // here because the right table is empty) all return all of the row
+      // indices from left, each paired with a NULL from the right.
+      case join_kind::LEFT_JOIN:
+      case join_kind::LEFT_ANTI_JOIN:
+      case join_kind::FULL_JOIN: return get_trivial_left_join_indices(left, stream);
+      // Inner and left semi joins return empty output because no matches can exist.
+      case join_kind::INNER_JOIN:
+      case join_kind::LEFT_SEMI_JOIN:
+        return std::make_pair(std::make_unique>(0, stream, mr),
+                              std::make_unique>(0, stream, mr));
     }
   }

-  // Allocate storage for the counter used to get the size of the join output
-  size_type h_size_estimate{0};
-  rmm::device_scalar size_estimate(0, stream);
+  // Determine which expression evaluation code path to use. The nullable
+  // evaluator is only needed if at least one input column is nullable and
+  // actually contains nulls; otherwise the faster non-nullable code path can
+  // be used, so we capture that information here.
+  auto const nullable =
+    std::any_of(left.begin(), left.end(), [](column_view c) { return c.nullable(); }) ||
+    std::any_of(right.begin(), right.end(), [](column_view c) { return c.nullable(); });
+  auto const has_nulls =
+    nullable &&
+    (std::any_of(
+       left.begin(), left.end(), [](column_view c) { return c.nullable() && c.has_nulls(); }) ||
+     std::any_of(
+       right.begin(), right.end(), [](column_view c) { return c.nullable() && c.has_nulls(); }));
+
+  auto const plan = ast::detail::ast_plan{binary_pred, left, right, has_nulls, stream, mr};
+  CUDF_EXPECTS(plan.output_type().id() == type_id::BOOL8,
+               "The expression must produce a boolean output.");

-  CHECK_CUDA(stream.value());
+  auto left_table  = table_device_view::create(left, stream);
+  auto right_table = table_device_view::create(right, stream);

+  // Allocate storage for the counter used to get the size of the join output
+  rmm::device_scalar size(0, stream, mr);
+  CHECK_CUDA(stream.value());

   constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE};
-  int numBlocks{-1};
+  detail::grid_1d config(left_table->num_rows(), block_size);
+  auto const shmem_size_per_block = plan.dev_plan.shmem_per_thread * config.num_threads_per_block;

-  CUDA_TRY(cudaOccupancyMaxActiveBlocksPerMultiprocessor(
-    &numBlocks, compute_nested_loop_join_output_size, block_size, 0));
-
-  int dev_id{-1};
-  CUDA_TRY(cudaGetDevice(&dev_id));
-
-  int num_sms{-1};
-  CUDA_TRY(cudaDeviceGetAttribute(&num_sms, cudaDevAttrMultiProcessorCount, dev_id));
-
-  size_estimate.set_value_zero(stream);
-
-  row_equality equality{left, right, compare_nulls == null_equality::EQUAL};
   // Determine number of output rows without actually building the output to simply
   // find what the size of the output will be.
-  compute_nested_loop_join_output_size
-    <<>>(
-      left, right, JoinKind, equality, size_estimate.data());
+  join_kind KernelJoinKind = JoinKind == join_kind::FULL_JOIN ? 
join_kind::LEFT_JOIN : JoinKind; + if (has_nulls) { + compute_conditional_join_output_size + <<>>( + *left_table, *right_table, KernelJoinKind, compare_nulls, plan.dev_plan, size.data()); + } else { + compute_conditional_join_output_size + <<>>( + *left_table, *right_table, KernelJoinKind, compare_nulls, plan.dev_plan, size.data()); + } CHECK_CUDA(stream.value()); - h_size_estimate = size_estimate.value(stream); + size_type const join_size = size.value(stream); - return h_size_estimate; -} - -/** - * @brief Computes the join operation between two tables and returns the - * output indices of left and right table as a combined table - * - * @param left Table of left columns to join - * @param right Table of right columns to join - * @param flip_join_indices Flag that indicates whether the left and right - * tables have been flipped, meaning the output indices should also be flipped - * @param JoinKind The type of join to be performed - * @param compare_nulls Controls whether null join-key values should match or not. - * @param stream CUDA stream used for device memory operations and kernel launches - * - * @return Join output indices vector pair - */ -std::pair, rmm::device_uvector> -get_base_nested_loop_join_indices(table_view const& left, - table_view const& right, - bool flip_join_indices, - join_kind JoinKind, - null_equality compare_nulls, - rmm::cuda_stream_view stream) -{ - // The `right` table is always used for the inner loop. We want to use the smaller table - // for the inner loop. Thus, if `left` is smaller than `right`, swap `left/right`. - if ((JoinKind == join_kind::INNER_JOIN) && (right.num_rows() > left.num_rows())) { - return get_base_nested_loop_join_indices(right, left, true, JoinKind, compare_nulls, stream); + // If the output size will be zero, we can return immediately. + if (join_size == 0) { + return std::make_pair(std::make_unique>(0, stream, mr), + std::make_unique>(0, stream, mr)); } - // Trivial left join case - exit early - if ((JoinKind == join_kind::LEFT_JOIN) && (right.num_rows() == 0)) { - return get_trivial_left_join_indices(left, stream); + + rmm::device_scalar write_index(0, stream); + + auto left_indices = std::make_unique>(join_size, stream, mr); + auto right_indices = std::make_unique>(join_size, stream, mr); + + const auto& join_output_l = left_indices->data(); + const auto& join_output_r = right_indices->data(); + if (has_nulls) { + conditional_join + <<>>( + *left_table, + *right_table, + KernelJoinKind, + compare_nulls, + join_output_l, + join_output_r, + write_index.data(), + plan.dev_plan, + join_size); + } else { + conditional_join + <<>>( + *left_table, + *right_table, + KernelJoinKind, + compare_nulls, + join_output_l, + join_output_r, + write_index.data(), + plan.dev_plan, + join_size); } - auto left_table = table_device_view::create(left, stream); - auto right_table = table_device_view::create(right, stream); + CHECK_CUDA(stream.value()); - size_type estimated_size = estimate_nested_loop_join_output_size( - *left_table, *right_table, JoinKind, compare_nulls, stream); + auto join_indices = std::make_pair(std::move(left_indices), std::move(right_indices)); - // If the estimated output size is zero, return immediately - if (estimated_size == 0) { - return std::make_pair(rmm::device_uvector{0, stream}, - rmm::device_uvector{0, stream}); + // For full joins, get the indices in the right table that were not joined to + // by any row in the left table. 
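+  // The complement rows are appended as additional pairs with an unmatched
+  // (JoinNoneValue) left index, which yields the full join semantics
+  // documented in join.hpp.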
+ if (JoinKind == join_kind::FULL_JOIN) { + auto complement_indices = detail::get_left_join_indices_complement( + join_indices.second, left.num_rows(), right.num_rows(), stream, mr); + join_indices = detail::concatenate_vector_pairs(join_indices, complement_indices, stream); } - - // Because we are approximating the number of joined elements, our approximation - // might be incorrect and we might have underestimated the number of joined elements. - // As such we will need to de-allocate memory and re-allocate memory to ensure - // that the final output is correct. - rmm::device_scalar write_index(0, stream); - size_type join_size{0}; - - rmm::device_uvector left_indices{0, stream}; - rmm::device_uvector right_indices{0, stream}; - auto current_estimated_size = estimated_size; - do { - left_indices.resize(estimated_size, stream); - right_indices.resize(estimated_size, stream); - - constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE}; - detail::grid_1d config(left_table->num_rows(), block_size); - write_index.set_value_zero(stream); - - row_equality equality{*left_table, *right_table, compare_nulls == null_equality::EQUAL}; - const auto& join_output_l = flip_join_indices ? right_indices.data() : left_indices.data(); - const auto& join_output_r = flip_join_indices ? left_indices.data() : right_indices.data(); - nested_loop_join - <<>>(*left_table, - *right_table, - JoinKind, - equality, - join_output_l, - join_output_r, - write_index.data(), - estimated_size); - - CHECK_CUDA(stream.value()); - - join_size = write_index.value(stream); - current_estimated_size = estimated_size; - estimated_size *= 2; - } while ((current_estimated_size < join_size)); - - left_indices.resize(join_size, stream); - right_indices.resize(join_size, stream); - return std::make_pair(std::move(left_indices), std::move(right_indices)); + return join_indices; } } // namespace detail diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 34fceb9015e..ddb5d88f2d0 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -88,6 +88,7 @@ ConfigureTest(GROUPBY_TEST # - join tests ------------------------------------------------------------------------------------ ConfigureTest(JOIN_TEST join/join_tests.cpp + join/conditional_join_tests.cu join/cross_join_tests.cpp join/semi_anti_join_tests.cpp) diff --git a/cpp/tests/ast/transform_tests.cpp b/cpp/tests/ast/transform_tests.cpp index 74937d4deea..48e19a2f587 100644 --- a/cpp/tests/ast/transform_tests.cpp +++ b/cpp/tests/ast/transform_tests.cpp @@ -31,10 +31,13 @@ #include #include -#include +#include +#include #include +#include #include +#include template using column_wrapper = cudf::test::fixed_width_column_wrapper; @@ -409,4 +412,46 @@ TEST_F(TransformTest, PyMod) cudf::test::expect_columns_equal(expected, result->view(), true); } +TEST_F(TransformTest, BasicAdditionNulls) +{ + auto c_0 = column_wrapper{{3, 20, 1, 50}, {0, 0, 1, 1}}; + auto c_1 = column_wrapper{{10, 7, 20, 0}, {0, 1, 0, 1}}; + auto table = cudf::table_view{{c_0, c_1}}; + + auto col_ref_0 = cudf::ast::column_reference(0); + auto col_ref_1 = cudf::ast::column_reference(1); + auto expression = cudf::ast::expression(cudf::ast::ast_operator::ADD, col_ref_0, col_ref_1); + + auto expected = column_wrapper{{0, 0, 0, 50}, {0, 0, 0, 1}}; + auto result = cudf::ast::compute_column(table, expression); + + cudf::test::expect_columns_equal(expected, result->view(), true); +} + +TEST_F(TransformTest, BasicAdditionLargeNulls) +{ + auto N = 2000; + auto a = thrust::make_counting_iterator(0); + + auto 
validities = std::vector(N);
+  std::fill(validities.begin(), validities.begin() + N / 2, 0);
+  std::fill(validities.begin() + (N / 2), validities.end(), 1);
+
+  std::random_device rd;
+  std::mt19937 gen(rd());
+  std::shuffle(validities.begin(), validities.end(), gen);
+
+  auto col   = column_wrapper(a, a + N, validities.begin());
+  auto table = cudf::table_view{{col}};
+
+  auto col_ref    = cudf::ast::column_reference(0);
+  auto expression = cudf::ast::expression(cudf::ast::ast_operator::ADD, col_ref, col_ref);
+
+  auto b        = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i * 2; });
+  auto expected = column_wrapper(b, b + N, validities.begin());
+  auto result   = cudf::ast::compute_column(table, expression);
+
+  cudf::test::expect_columns_equal(expected, result->view(), true);
+}
+
 CUDF_TEST_PROGRAM_MAIN()
diff --git a/cpp/tests/join/conditional_join_tests.cu b/cpp/tests/join/conditional_join_tests.cu
new file mode 100644
index 00000000000..57abdf17aa6
--- /dev/null
+++ b/cpp/tests/join/conditional_join_tests.cu
@@ -0,0 +1,709 @@
+/*
+ * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+// Defining expressions for AST evaluation is currently a bit tedious, so we
+// define some standard nodes here that can be easily reused elsewhere.
+namespace {
+constexpr cudf::size_type JoinNoneValue =
+  std::numeric_limits::min();  // TODO: how to test if this isn't public?
+
+// Common column references.
+const auto col_ref_left_0  = cudf::ast::column_reference(0, cudf::ast::table_reference::LEFT);
+const auto col_ref_right_0 = cudf::ast::column_reference(0, cudf::ast::table_reference::RIGHT);
+const auto col_ref_left_1  = cudf::ast::column_reference(1, cudf::ast::table_reference::LEFT);
+const auto col_ref_right_1 = cudf::ast::column_reference(1, cudf::ast::table_reference::RIGHT);
+
+// Common expressions.
+auto left_zero_eq_right_zero =
+  cudf::ast::expression(cudf::ast::ast_operator::EQUAL, col_ref_left_0, col_ref_right_0);
+}  // namespace
+
+/**
+ * The principal fixture for all conditional joins.
+ */
+template
+struct ConditionalJoinTest : public cudf::test::BaseFixture {
+  /**
+   * Convenience utility for parsing initializer lists of input data into
+   * suitable inputs for tables.
+   */
+  std::tuple>,
+             std::vector>,
+             std::vector,
+             std::vector,
+             cudf::table_view,
+             cudf::table_view>
+  parse_input(std::vector> left_data, std::vector> right_data)
+  {
+    // Note that we need to maintain the column wrappers otherwise the
+    // resulting column views will be referencing potentially invalid memory.
+ std::vector> left_wrappers; + std::vector> right_wrappers; + + std::vector left_columns; + std::vector right_columns; + + for (auto v : left_data) { + left_wrappers.push_back(cudf::test::fixed_width_column_wrapper(v.begin(), v.end())); + left_columns.push_back(left_wrappers.back()); + } + + for (auto v : right_data) { + right_wrappers.push_back(cudf::test::fixed_width_column_wrapper(v.begin(), v.end())); + right_columns.push_back(right_wrappers.back()); + } + + return std::make_tuple(std::move(left_wrappers), + std::move(right_wrappers), + std::move(left_columns), + std::move(right_columns), + cudf::table_view(left_columns), + cudf::table_view(right_columns)); + } + + std::tuple>, + std::vector>, + std::vector, + std::vector, + cudf::table_view, + cudf::table_view> + parse_input(std::vector, std::vector>> left_data, + std::vector, std::vector>> right_data) + { + // Note that we need to maintain the column wrappers otherwise the + // resulting column views will be referencing potentially invalid memory. + std::vector> left_wrappers; + std::vector> right_wrappers; + + std::vector left_columns; + std::vector right_columns; + + for (auto v : left_data) { + left_wrappers.push_back(cudf::test::fixed_width_column_wrapper( + v.first.begin(), v.first.end(), v.second.begin())); + left_columns.push_back(left_wrappers.back()); + } + + for (auto v : right_data) { + right_wrappers.push_back(cudf::test::fixed_width_column_wrapper( + v.first.begin(), v.first.end(), v.second.begin())); + right_columns.push_back(right_wrappers.back()); + } + + return std::make_tuple(std::move(left_wrappers), + std::move(right_wrappers), + std::move(left_columns), + std::move(right_columns), + cudf::table_view(left_columns), + cudf::table_view(right_columns)); + } +}; + +/** + * Fixture for join types that return both left and right indices (inner, left, + * and full joins). + */ +template +struct ConditionalJoinPairReturnTest : public ConditionalJoinTest { + /* + * Perform a join of tables constructed from two input data sets according to + * the provided predicate and verify that the outputs match the expected + * outputs (up to order). + */ + void test(std::vector> left_data, + std::vector> right_data, + cudf::ast::expression predicate, + std::vector> expected_outputs) + { + // Note that we need to maintain the column wrappers otherwise the + // resulting column views will be referencing potentially invalid memory. + auto [left_wrappers, right_wrappers, left_columns, right_columns, left, right] = + this->parse_input(left_data, right_data); + auto result = this->join(left, right, predicate); + + std::vector> result_pairs; + for (size_t i = 0; i < result.first->size(); ++i) { + // Note: Not trying to be terribly efficient here since these tests are + // small, otherwise a batch copy to host before constructing the tuples + // would be important. + result_pairs.push_back({result.first->element(i, rmm::cuda_stream_default), + result.second->element(i, rmm::cuda_stream_default)}); + } + std::sort(result_pairs.begin(), result_pairs.end()); + std::sort(expected_outputs.begin(), expected_outputs.end()); + + EXPECT_TRUE(std::equal(result_pairs.begin(), result_pairs.end(), expected_outputs.begin())); + } + + void test_nulls(std::vector, std::vector>> left_data, + std::vector, std::vector>> right_data, + cudf::ast::expression predicate, + std::vector> expected_outputs) + { + // Note that we need to maintain the column wrappers otherwise the + // resulting column views will be referencing potentially invalid memory. 
+ auto [left_wrappers, right_wrappers, left_columns, right_columns, left, right] = + this->parse_input(left_data, right_data); + auto result = this->join(left, right, predicate); + + std::vector> result_pairs; + for (size_t i = 0; i < result.first->size(); ++i) { + // Note: Not trying to be terribly efficient here since these tests are + // small, otherwise a batch copy to host before constructing the tuples + // would be important. + result_pairs.push_back({result.first->element(i, rmm::cuda_stream_default), + result.second->element(i, rmm::cuda_stream_default)}); + } + std::sort(result_pairs.begin(), result_pairs.end()); + std::sort(expected_outputs.begin(), expected_outputs.end()); + + EXPECT_TRUE(std::equal(result_pairs.begin(), result_pairs.end(), expected_outputs.begin())); + } + + /* + * Perform a join of tables constructed from two input data sets according to + * an equality predicate on all corresponding columns and verify that the outputs match the + * expected outputs (up to order). + */ + void compare_to_hash_join(std::vector> left_data, + std::vector> right_data) + { + // Note that we need to maintain the column wrappers otherwise the + // resulting column views will be referencing potentially invalid memory. + auto [left_wrappers, right_wrappers, left_columns, right_columns, left, right] = + this->parse_input(left_data, right_data); + // TODO: Generalize this to support multiple columns by automatically + // constructing the appropriate expression. + auto result = this->join(left, right, left_zero_eq_right_zero); + auto reference = this->reference_join(left, right); + + thrust::device_vector> result_pairs( + result.first->size()); + thrust::device_vector> reference_pairs( + reference.first->size()); + + thrust::transform(thrust::device, + result.first->begin(), + result.first->end(), + result.second->begin(), + result_pairs.begin(), + [] __device__(cudf::size_type first, cudf::size_type second) { + return thrust::make_pair(first, second); + }); + thrust::transform(thrust::device, + reference.first->begin(), + reference.first->end(), + reference.second->begin(), + reference_pairs.begin(), + [] __device__(cudf::size_type first, cudf::size_type second) { + return thrust::make_pair(first, second); + }); + + thrust::sort(thrust::device, result_pairs.begin(), result_pairs.end()); + thrust::sort(thrust::device, reference_pairs.begin(), reference_pairs.end()); + + EXPECT_TRUE(thrust::equal( + thrust::device, result_pairs.begin(), result_pairs.end(), reference_pairs.begin())); + } + + /** + * This method must be implemented by subclasses for specific types of joins. + * It should be a simply forwarding of arguments to the appropriate cudf + * conditional join API. + */ + virtual std::pair>, + std::unique_ptr>> + join(cudf::table_view left, cudf::table_view right, cudf::ast::expression predicate) = 0; + + /** + * This method must be implemented by subclasses for specific types of joins. + * It should be a simply forwarding of arguments to the appropriate cudf + * hash join API for comparison with conditional joins. + */ + virtual std::pair>, + std::unique_ptr>> + reference_join(cudf::table_view left, cudf::table_view right) = 0; +}; + +/** + * Tests of inner joins. 
+ */ +template +struct ConditionalInnerJoinTest : public ConditionalJoinPairReturnTest { + std::pair>, + std::unique_ptr>> + join(cudf::table_view left, cudf::table_view right, cudf::ast::expression predicate) override + { + return cudf::conditional_inner_join(left, right, predicate); + } + + std::pair>, + std::unique_ptr>> + reference_join(cudf::table_view left, cudf::table_view right) override + { + return cudf::inner_join(left, right); + } +}; + +TYPED_TEST_CASE(ConditionalInnerJoinTest, cudf::test::IntegralTypesNotBool); + +TYPED_TEST(ConditionalInnerJoinTest, TestOneColumnOneRowAllEqual) +{ + this->test({{0}}, {{0}}, left_zero_eq_right_zero, {{0, 0}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestOneColumnTwoRowAllEqual) +{ + this->test({{0, 1}}, {{0, 0}}, left_zero_eq_right_zero, {{0, 0}, {0, 1}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestTwoColumnOneRowAllEqual) +{ + this->test({{0}, {0}}, {{0}, {0}}, left_zero_eq_right_zero, {{0, 0}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestTwoColumnThreeRowAllEqual) +{ + this->test({{0, 1, 2}, {10, 20, 30}}, + {{0, 1, 2}, {30, 40, 50}}, + left_zero_eq_right_zero, + {{0, 0}, {1, 1}, {2, 2}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestTwoColumnThreeRowSomeEqual) +{ + this->test({{0, 1, 2}, {10, 20, 30}}, + {{0, 1, 3}, {30, 40, 50}}, + left_zero_eq_right_zero, + {{0, 0}, {1, 1}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestNotComparison) +{ + auto col_ref_0 = cudf::ast::column_reference(0); + auto expression = cudf::ast::expression(cudf::ast::ast_operator::NOT, col_ref_0); + + this->test({{0, 1, 2}}, {{3, 4, 5}}, expression, {{0, 0}, {0, 1}, {0, 2}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestGreaterComparison) +{ + auto col_ref_0 = cudf::ast::column_reference(0); + auto col_ref_1 = cudf::ast::column_reference(0, cudf::ast::table_reference::RIGHT); + auto expression = cudf::ast::expression(cudf::ast::ast_operator::GREATER, col_ref_0, col_ref_1); + + this->test({{0, 1, 2}}, {{1, 0, 0}}, expression, {{1, 1}, {1, 2}, {2, 0}, {2, 1}, {2, 2}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestGreaterTwoColumnComparison) +{ + auto col_ref_0 = cudf::ast::column_reference(0); + auto col_ref_1 = cudf::ast::column_reference(1, cudf::ast::table_reference::RIGHT); + auto expression = cudf::ast::expression(cudf::ast::ast_operator::GREATER, col_ref_0, col_ref_1); + + this->test({{0, 1, 2}, {0, 0, 0}}, + {{0, 0, 0}, {1, 0, 0}}, + expression, + {{1, 1}, {1, 2}, {2, 0}, {2, 1}, {2, 2}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestGreaterDifferentNumberColumnComparison) +{ + auto col_ref_0 = cudf::ast::column_reference(0); + auto col_ref_1 = cudf::ast::column_reference(1, cudf::ast::table_reference::RIGHT); + auto expression = cudf::ast::expression(cudf::ast::ast_operator::GREATER, col_ref_0, col_ref_1); + + this->test( + {{0, 1, 2}}, {{0, 0, 0}, {1, 0, 0}}, expression, {{1, 1}, {1, 2}, {2, 0}, {2, 1}, {2, 2}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestGreaterDifferentNumberColumnDifferentSizeComparison) +{ + auto col_ref_0 = cudf::ast::column_reference(0); + auto col_ref_1 = cudf::ast::column_reference(1, cudf::ast::table_reference::RIGHT); + auto expression = cudf::ast::expression(cudf::ast::ast_operator::GREATER, col_ref_0, col_ref_1); + + this->test({{0, 1}}, {{0, 0, 0}, {1, 0, 0}}, expression, {{1, 1}, {1, 2}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestComplexConditionMultipleColumns) +{ + // LEFT is implicit, but specifying explicitly to validate that it works. 
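+  // The combined predicate built below is (left.col0 == 1) && (right.col1 < left.col0).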
+ auto col_ref_0 = cudf::ast::column_reference(0, cudf::ast::table_reference::LEFT); + auto scalar_1 = cudf::numeric_scalar(1); + auto literal_1 = cudf::ast::literal(scalar_1); + auto left_0_equal_1 = cudf::ast::expression(cudf::ast::ast_operator::EQUAL, col_ref_0, literal_1); + + auto col_ref_1 = cudf::ast::column_reference(1, cudf::ast::table_reference::RIGHT); + auto comparison_filter = + cudf::ast::expression(cudf::ast::ast_operator::LESS, col_ref_1, col_ref_0); + + auto expression = + cudf::ast::expression(cudf::ast::ast_operator::LOGICAL_AND, left_0_equal_1, comparison_filter); + + this->test({{0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2}, {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}}, + {{0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2}, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + expression, + {{4, 0}, {5, 0}, {6, 0}, {7, 0}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestCompareRandomToHash) +{ + // Generate columns of 10 repeats of the integer range [0, 10), then merge + // a shuffled version and compare to hash join. + unsigned int N = 10000; + unsigned int num_repeats = 10; + unsigned int num_unique = N / num_repeats; + + std::vector left(N); + std::vector right(N); + + for (unsigned int i = 0; i < num_repeats; ++i) { + std::iota( + std::next(left.begin(), num_unique * i), std::next(left.begin(), num_unique * (i + 1)), 0); + std::iota( + std::next(right.begin(), num_unique * i), std::next(right.begin(), num_unique * (i + 1)), 0); + } + + std::random_device rd; + std::mt19937 gen(rd()); + std::shuffle(left.begin(), left.end(), gen); + std::shuffle(right.begin(), right.end(), gen); + + this->compare_to_hash_join({left}, {right}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestOneColumnTwoNullsRowAllEqual) +{ + this->test_nulls( + {{{0, 1}, {1, 0}}}, {{{0, 0}, {1, 1}}}, left_zero_eq_right_zero, {{0, 0}, {0, 1}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestOneColumnTwoNullsNoOutputRowAllEqual) +{ + this->test_nulls({{{0, 1}, {0, 1}}}, {{{0, 0}, {1, 1}}}, left_zero_eq_right_zero, {{}, {}}); +}; + +/** + * Tests of left joins. + */ +template +struct ConditionalLeftJoinTest : public ConditionalJoinPairReturnTest { + std::pair>, + std::unique_ptr>> + join(cudf::table_view left, cudf::table_view right, cudf::ast::expression predicate) override + { + return cudf::conditional_left_join(left, right, predicate); + } + + std::pair>, + std::unique_ptr>> + reference_join(cudf::table_view left, cudf::table_view right) override + { + return cudf::left_join(left, right); + } +}; + +TYPED_TEST_CASE(ConditionalLeftJoinTest, cudf::test::IntegralTypesNotBool); + +TYPED_TEST(ConditionalLeftJoinTest, TestTwoColumnThreeRowSomeEqual) +{ + this->test({{0, 1, 2}, {10, 20, 30}}, + {{0, 1, 3}, {30, 40, 50}}, + left_zero_eq_right_zero, + {{0, 0}, {1, 1}, {2, JoinNoneValue}}); +}; + +TYPED_TEST(ConditionalLeftJoinTest, TestCompareRandomToHash) +{ + // Generate columns of 10 repeats of the integer range [0, 10), then merge + // a shuffled version and compare to hash join. 
+  unsigned int N           = 10000;
+  unsigned int num_repeats = 10;
+  unsigned int num_unique  = N / num_repeats;
+
+  std::vector<TypeParam> left(N);
+  std::vector<TypeParam> right(N);
+
+  for (unsigned int i = 0; i < num_repeats; ++i) {
+    std::iota(
+      std::next(left.begin(), num_unique * i), std::next(left.begin(), num_unique * (i + 1)), 0);
+    std::iota(
+      std::next(right.begin(), num_unique * i), std::next(right.begin(), num_unique * (i + 1)), 0);
+  }
+
+  std::random_device rd;
+  std::mt19937 gen(rd());
+  std::shuffle(left.begin(), left.end(), gen);
+  std::shuffle(right.begin(), right.end(), gen);
+
+  this->compare_to_hash_join({left}, {right});
+};
+
+/**
+ * Tests of full joins.
+ */
+template <typename T>
+struct ConditionalFullJoinTest : public ConditionalJoinPairReturnTest<T> {
+  std::pair<std::unique_ptr<rmm::device_uvector<cudf::size_type>>,
+            std::unique_ptr<rmm::device_uvector<cudf::size_type>>>
+  join(cudf::table_view left, cudf::table_view right, cudf::ast::expression predicate) override
+  {
+    return cudf::conditional_full_join(left, right, predicate);
+  }
+
+  std::pair<std::unique_ptr<rmm::device_uvector<cudf::size_type>>,
+            std::unique_ptr<rmm::device_uvector<cudf::size_type>>>
+  reference_join(cudf::table_view left, cudf::table_view right) override
+  {
+    return cudf::full_join(left, right);
+  }
+};
+
+TYPED_TEST_CASE(ConditionalFullJoinTest, cudf::test::IntegralTypesNotBool);
+
+TYPED_TEST(ConditionalFullJoinTest, TestTwoColumnThreeRowSomeEqual)
+{
+  this->test({{0, 1, 2}, {10, 20, 30}},
+             {{0, 1, 3}, {30, 40, 50}},
+             left_zero_eq_right_zero,
+             {{0, 0}, {1, 1}, {2, JoinNoneValue}, {JoinNoneValue, 2}});
+};
+
+TYPED_TEST(ConditionalFullJoinTest, TestCompareRandomToHash)
+{
+  // Generate columns containing num_repeats repeats of the integer range
+  // [0, num_unique), shuffle them, and compare against the equivalent hash join.
+  unsigned int N           = 10000;
+  unsigned int num_repeats = 10;
+  unsigned int num_unique  = N / num_repeats;
+
+  std::vector<TypeParam> left(N);
+  std::vector<TypeParam> right(N);
+
+  for (unsigned int i = 0; i < num_repeats; ++i) {
+    std::iota(
+      std::next(left.begin(), num_unique * i), std::next(left.begin(), num_unique * (i + 1)), 0);
+    std::iota(
+      std::next(right.begin(), num_unique * i), std::next(right.begin(), num_unique * (i + 1)), 0);
+  }
+
+  std::random_device rd;
+  std::mt19937 gen(rd());
+  std::shuffle(left.begin(), left.end(), gen);
+  std::shuffle(right.begin(), right.end(), gen);
+
+  this->compare_to_hash_join({left}, {right});
+};
+
+/**
+ * Fixture for join types that return only left indices (left semi and
+ * left anti joins).
+ */
+template <typename T>
+struct ConditionalJoinSingleReturnTest : public ConditionalJoinTest<T> {
+  /*
+   * Perform a join of tables constructed from two input data sets according to
+   * the provided predicate and verify that the outputs match the expected
+   * outputs (up to order).
+   */
+  void test(std::vector<std::vector<T>> left_data,
+            std::vector<std::vector<T>> right_data,
+            cudf::ast::expression predicate,
+            std::vector<cudf::size_type> expected_outputs)
+  {
+    auto [left_wrappers, right_wrappers, left_columns, right_columns, left, right] =
+      this->parse_input(left_data, right_data);
+    auto result = this->join(left, right, predicate);
+
+    std::vector<cudf::size_type> resulting_indices;
+    for (size_t i = 0; i < result->size(); ++i) {
+      // Note: We are not trying to be terribly efficient here since these
+      // tests are small; otherwise a batch copy to host before the comparison
+      // would be important.
+      resulting_indices.push_back(result->element(i, rmm::cuda_stream_default));
+    }
+    std::sort(resulting_indices.begin(), resulting_indices.end());
+    std::sort(expected_outputs.begin(), expected_outputs.end());
+    EXPECT_TRUE(
+      std::equal(resulting_indices.begin(), resulting_indices.end(), expected_outputs.begin()));
+  }
+
+  /*
+   * Perform a join of tables constructed from two input data sets according to
+   * an equality predicate on all corresponding columns and verify that the
+   * outputs match those of the equivalent hash join (up to order).
+   */
+  void compare_to_hash_join(std::vector<std::vector<T>> left_data,
+                            std::vector<std::vector<T>> right_data)
+  {
+    // Note that we need to maintain the column wrappers, otherwise the
+    // resulting column views will be referencing potentially invalid memory.
+    auto [left_wrappers, right_wrappers, left_columns, right_columns, left, right] =
+      this->parse_input(left_data, right_data);
+    // TODO: Generalize this to support multiple columns by automatically
+    // constructing the appropriate expression.
+    auto result    = this->join(left, right, left_zero_eq_right_zero);
+    auto reference = this->reference_join(left, right);
+
+    thrust::sort(thrust::device, result->begin(), result->end());
+    thrust::sort(thrust::device, reference->begin(), reference->end());
+
+    EXPECT_TRUE(thrust::equal(thrust::device, result->begin(), result->end(), reference->begin()));
+  }
+
+  /**
+   * This method must be implemented by subclasses for specific types of joins.
+   * It should simply forward its arguments to the appropriate cudf conditional
+   * join API.
+   */
+  virtual std::unique_ptr<rmm::device_uvector<cudf::size_type>> join(
+    cudf::table_view left, cudf::table_view right, cudf::ast::expression predicate) = 0;
+
+  /**
+   * This method must be implemented by subclasses for specific types of joins.
+   * It should simply forward its arguments to the corresponding cudf hash join
+   * API for comparison with conditional joins.
+   */
+  virtual std::unique_ptr<rmm::device_uvector<cudf::size_type>> reference_join(
+    cudf::table_view left, cudf::table_view right) = 0;
+};
+
+/**
+ * Tests of left semi joins.
+ */
+template <typename T>
+struct ConditionalLeftSemiJoinTest : public ConditionalJoinSingleReturnTest<T> {
+  std::unique_ptr<rmm::device_uvector<cudf::size_type>> join(
+    cudf::table_view left, cudf::table_view right, cudf::ast::expression predicate) override
+  {
+    return cudf::conditional_left_semi_join(left, right, predicate);
+  }
+
+  std::unique_ptr<rmm::device_uvector<cudf::size_type>> reference_join(
+    cudf::table_view left, cudf::table_view right) override
+  {
+    return cudf::left_semi_join(left, right);
+  }
+};
+
+TYPED_TEST_CASE(ConditionalLeftSemiJoinTest, cudf::test::IntegralTypesNotBool);
+
+TYPED_TEST(ConditionalLeftSemiJoinTest, TestTwoColumnThreeRowSomeEqual)
+{
+  this->test({{0, 1, 2}, {10, 20, 30}}, {{0, 1, 3}, {30, 40, 50}}, left_zero_eq_right_zero, {0, 1});
+};
+
+TYPED_TEST(ConditionalLeftSemiJoinTest, TestCompareRandomToHash)
+{
+  // Generate columns containing num_repeats repeats of the integer range
+  // [0, num_unique), shuffle them, and compare against the equivalent hash join.
+  unsigned int N           = 10000;
+  unsigned int num_repeats = 10;
+  unsigned int num_unique  = N / num_repeats;
+
+  std::vector<TypeParam> left(N);
+  std::vector<TypeParam> right(N);
+
+  for (unsigned int i = 0; i < num_repeats; ++i) {
+    std::iota(
+      std::next(left.begin(), num_unique * i), std::next(left.begin(), num_unique * (i + 1)), 0);
+    std::iota(
+      std::next(right.begin(), num_unique * i), std::next(right.begin(), num_unique * (i + 1)), 0);
+  }
+
+  std::random_device rd;
+  std::mt19937 gen(rd());
+  std::shuffle(left.begin(), left.end(), gen);
+  std::shuffle(right.begin(), right.end(), gen);
+
+  this->compare_to_hash_join({left}, {right});
+};
+
+/**
+ * Tests of left anti joins.
+ */
+template <typename T>
+struct ConditionalLeftAntiJoinTest : public ConditionalJoinSingleReturnTest<T> {
+  std::unique_ptr<rmm::device_uvector<cudf::size_type>> join(
+    cudf::table_view left, cudf::table_view right, cudf::ast::expression predicate) override
+  {
+    return cudf::conditional_left_anti_join(left, right, predicate);
+  }
+
+  std::unique_ptr<rmm::device_uvector<cudf::size_type>> reference_join(
+    cudf::table_view left, cudf::table_view right) override
+  {
+    return cudf::left_anti_join(left, right);
+  }
+};
+
+TYPED_TEST_CASE(ConditionalLeftAntiJoinTest, cudf::test::IntegralTypesNotBool);
+
+TYPED_TEST(ConditionalLeftAntiJoinTest, TestTwoColumnThreeRowSomeEqual)
+{
+  this->test({{0, 1, 2}, {10, 20, 30}}, {{0, 1, 3}, {30, 40, 50}}, left_zero_eq_right_zero, {2});
+};
+
+TYPED_TEST(ConditionalLeftAntiJoinTest, TestCompareRandomToHash)
+{
+  // Generate columns containing num_repeats repeats of the integer range
+  // [0, num_unique), shuffle them, and compare against the equivalent hash join.
+  unsigned int N           = 10000;
+  unsigned int num_repeats = 10;
+  unsigned int num_unique  = N / num_repeats;
+
+  std::vector<TypeParam> left(N);
+  std::vector<TypeParam> right(N);
+
+  for (unsigned int i = 0; i < num_repeats; ++i) {
+    std::iota(
+      std::next(left.begin(), num_unique * i), std::next(left.begin(), num_unique * (i + 1)), 0);
+    std::iota(
+      std::next(right.begin(), num_unique * i), std::next(right.begin(), num_unique * (i + 1)), 0);
+  }
+
+  std::random_device rd;
+  std::mt19937 gen(rd());
+  std::shuffle(left.begin(), left.end(), gen);
+  std::shuffle(right.begin(), right.end(), gen);
+
+  this->compare_to_hash_join({left}, {right});
+};
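For readers skimming the patch, the call pattern these fixtures wrap is small enough to show on its own. The sketch below is illustrative only and is not part of the change: it builds the same left.col(0) == right.col(0) predicate that the tests name left_zero_eq_right_zero and hands it to cudf::conditional_inner_join, which returns a pair of device vectors of matching (left, right) row indices. The wrapping function name and the exact include paths are assumptions (the AST header in particular has moved between cudf releases); the cudf calls themselves are the ones exercised by the tests above.

// Usage sketch only -- not part of this patch. Header paths are assumed and may
// differ by cudf version (the AST declarations have lived in different headers
// across releases).
#include <cudf/ast/expressions.hpp>
#include <cudf/join.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf_test/column_wrapper.hpp>

// Hypothetical helper, not taken from the patch.
void conditional_inner_join_example()
{
  // Two single-column tables, mirroring the small fixed-width test cases above.
  cudf::test::fixed_width_column_wrapper<int32_t> left_col{0, 1, 2};
  cudf::test::fixed_width_column_wrapper<int32_t> right_col{1, 0, 0};
  cudf::table_view left({static_cast<cudf::column_view>(left_col)});
  cudf::table_view right({static_cast<cudf::column_view>(right_col)});

  // Predicate: left.col(0) == right.col(0), i.e. the tests' left_zero_eq_right_zero.
  auto left_ref  = cudf::ast::column_reference(0, cudf::ast::table_reference::LEFT);
  auto right_ref = cudf::ast::column_reference(0, cudf::ast::table_reference::RIGHT);
  auto predicate = cudf::ast::expression(cudf::ast::ast_operator::EQUAL, left_ref, right_ref);

  // The conditional join returns device vectors of matching row indices; each
  // (left_indices[i], right_indices[i]) pair is one row of the joined result,
  // which is exactly what the pair-return fixtures compare against the hash join.
  auto [left_indices, right_indices] = cudf::conditional_inner_join(left, right, predicate);
}

The left, full, semi, and anti variants added by this file follow the same pattern, differing only in which index vectors they return.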