From 397bf0afb66efdf9025cc5425af422c3478f62fb Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 14 Jul 2021 10:08:50 -0700 Subject: [PATCH] Enable AST-based joining (#8214) This PR implements conditional joins using expressions that are decomposed into abstract syntax trees for evaluation. This PR builds on the AST evaluation framework established in #5494 and #7418, but significantly refactors the internals and generalizes them to enable 1) expressions on two tables and 2) operations on nullable columns. This PR uses the nested loop join code created in #5397 for inner joins, but also substantially generalizes that code to enable 1) all types of joins, 2) joins with arbitrary AST expressions rather than just equality, and 3) handling of null values (with user-specified `null_equality`). A significant chunk of the code is currently out of place, but since this changeset is rather large I've opted not to move things in ways that will make reviewing this PR significantly more challenging. I will make a follow-up to address those issues once this PR is merged. Authors: - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Jake Hemstad (https://github.com/jrhemstad) - Conor Hoekstra (https://github.com/codereport) URL: https://github.com/rapidsai/cudf/pull/8214 --- cpp/benchmarks/CMakeLists.txt | 2 +- cpp/benchmarks/ast/transform_benchmark.cpp | 76 +- .../join/conditional_join_benchmark.cu | 379 +++++++ cpp/include/cudf/ast/detail/linearizer.hpp | 19 +- cpp/include/cudf/ast/detail/transform.cuh | 959 +++++++++++++----- cpp/include/cudf/join.hpp | 203 ++++ cpp/src/ast/linearizer.cpp | 4 +- cpp/src/ast/transform.cu | 101 +- cpp/src/join/hash_join.cu | 11 +- cpp/src/join/hash_join.cuh | 11 + cpp/src/join/join.cu | 100 ++ cpp/src/join/join_kernels.cuh | 113 ++- cpp/src/join/nested_loop_join.cuh | 249 +++-- cpp/tests/CMakeLists.txt | 1 + cpp/tests/ast/transform_tests.cpp | 47 +- cpp/tests/join/conditional_join_tests.cu | 709 +++++++++++++ 16 files changed, 2488 insertions(+), 496 deletions(-) create mode 100644 cpp/benchmarks/join/conditional_join_benchmark.cu create mode 100644 cpp/tests/join/conditional_join_tests.cu diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index 6a2b71ae1d9..e5bee4771df 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -101,7 +101,7 @@ ConfigureBench(STREAM_COMPACTION_BENCH stream_compaction/drop_duplicates_benchma ################################################################################################### # - join benchmark -------------------------------------------------------------------------------- -ConfigureBench(JOIN_BENCH join/join_benchmark.cu) +ConfigureBench(JOIN_BENCH join/join_benchmark.cu join/conditional_join_benchmark.cu) ################################################################################################### # - iterator benchmark ---------------------------------------------------------------------------- diff --git a/cpp/benchmarks/ast/transform_benchmark.cpp b/cpp/benchmarks/ast/transform_benchmark.cpp index e1d52d7f0e6..6f131cf0d6a 100644 --- a/cpp/benchmarks/ast/transform_benchmark.cpp +++ b/cpp/benchmarks/ast/transform_benchmark.cpp @@ -30,9 +30,9 @@ #include #include -#include #include #include +#include #include enum class TreeType { @@ -40,11 +40,11 @@ enum class TreeType { // child column reference }; -template +template class AST : public cudf::benchmark { }; -template +template static void BM_ast_transform(benchmark::State& state) { const cudf::size_type table_size{(cudf::size_type)state.range(0)}; @@ -56,10 +56,24 @@ static void BM_ast_transform(benchmark::State& state) auto columns = std::vector(n_cols); auto data_iterator = thrust::make_counting_iterator(0); - std::generate_n(column_wrappers.begin(), n_cols, [=]() { - return cudf::test::fixed_width_column_wrapper(data_iterator, - data_iterator + table_size); - }); + + if constexpr (Nullable) { + auto validities = std::vector(table_size); + std::random_device rd; + std::mt19937 gen(rd()); + + std::generate( + validities.begin(), validities.end(), [&]() { return gen() > (0.5 * gen.max()); }); + std::generate_n(column_wrappers.begin(), n_cols, [=]() { + return cudf::test::fixed_width_column_wrapper( + data_iterator, data_iterator + table_size, validities.begin()); + }); + } else { + std::generate_n(column_wrappers.begin(), n_cols, [=]() { + return cudf::test::fixed_width_column_wrapper(data_iterator, + data_iterator + table_size); + }); + } std::transform( column_wrappers.begin(), column_wrappers.end(), columns.begin(), [](auto const& col) { return static_cast(col); @@ -113,22 +127,23 @@ static void BM_ast_transform(benchmark::State& state) (tree_levels + 1) * sizeof(key_type)); } -#define AST_TRANSFORM_BENCHMARK_DEFINE(name, key_type, tree_type, reuse_columns) \ - BENCHMARK_TEMPLATE_DEFINE_F(AST, name, key_type, tree_type, reuse_columns) \ - (::benchmark::State & st) { BM_ast_transform(st); } - -AST_TRANSFORM_BENCHMARK_DEFINE(ast_int32_imbalanced_unique, - int32_t, - TreeType::IMBALANCED_LEFT, - false); -AST_TRANSFORM_BENCHMARK_DEFINE(ast_int32_imbalanced_reuse, - int32_t, - TreeType::IMBALANCED_LEFT, - true); -AST_TRANSFORM_BENCHMARK_DEFINE(ast_double_imbalanced_unique, - double, - TreeType::IMBALANCED_LEFT, - false); +#define AST_TRANSFORM_BENCHMARK_DEFINE(name, key_type, tree_type, reuse_columns, nullable) \ + BENCHMARK_TEMPLATE_DEFINE_F(AST, name, key_type, tree_type, reuse_columns, nullable) \ + (::benchmark::State & st) { BM_ast_transform(st); } + +AST_TRANSFORM_BENCHMARK_DEFINE( + ast_int32_imbalanced_unique, int32_t, TreeType::IMBALANCED_LEFT, false, false); +AST_TRANSFORM_BENCHMARK_DEFINE( + ast_int32_imbalanced_reuse, int32_t, TreeType::IMBALANCED_LEFT, true, false); +AST_TRANSFORM_BENCHMARK_DEFINE( + ast_double_imbalanced_unique, double, TreeType::IMBALANCED_LEFT, false, false); + +AST_TRANSFORM_BENCHMARK_DEFINE( + ast_int32_imbalanced_unique_nulls, int32_t, TreeType::IMBALANCED_LEFT, false, true); +AST_TRANSFORM_BENCHMARK_DEFINE( + ast_int32_imbalanced_reuse_nulls, int32_t, TreeType::IMBALANCED_LEFT, true, true); +AST_TRANSFORM_BENCHMARK_DEFINE( + ast_double_imbalanced_unique_nulls, double, TreeType::IMBALANCED_LEFT, false, true); static void CustomRanges(benchmark::internal::Benchmark* b) { @@ -155,3 +170,18 @@ BENCHMARK_REGISTER_F(AST, ast_double_imbalanced_unique) ->Apply(CustomRanges) ->Unit(benchmark::kMillisecond) ->UseManualTime(); + +BENCHMARK_REGISTER_F(AST, ast_int32_imbalanced_unique_nulls) + ->Apply(CustomRanges) + ->Unit(benchmark::kMillisecond) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(AST, ast_int32_imbalanced_reuse_nulls) + ->Apply(CustomRanges) + ->Unit(benchmark::kMillisecond) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(AST, ast_double_imbalanced_unique_nulls) + ->Apply(CustomRanges) + ->Unit(benchmark::kMillisecond) + ->UseManualTime(); diff --git a/cpp/benchmarks/join/conditional_join_benchmark.cu b/cpp/benchmarks/join/conditional_join_benchmark.cu new file mode 100644 index 00000000000..4a655e29f74 --- /dev/null +++ b/cpp/benchmarks/join/conditional_join_benchmark.cu @@ -0,0 +1,379 @@ +/* + * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include "generate_input_tables.cuh" + +template +class ConditionalJoin : public cudf::benchmark { +}; + +template +static void BM_join(benchmark::State& state, Join JoinFunc) +{ + const cudf::size_type build_table_size{(cudf::size_type)state.range(0)}; + const cudf::size_type probe_table_size{(cudf::size_type)state.range(1)}; + const cudf::size_type rand_max_val{build_table_size * 2}; + const double selectivity = 0.3; + const bool is_build_table_key_unique = true; + + // Generate build and probe tables + cudf::test::UniformRandomGenerator rand_gen(0, build_table_size); + auto build_random_null_mask = [&rand_gen](int size) { + if (Nullable) { + // roughly 25% nulls + auto validity = thrust::make_transform_iterator( + thrust::make_counting_iterator(0), + [&rand_gen](auto i) { return (rand_gen.generate() & 3) == 0; }); + return cudf::test::detail::make_null_mask(validity, validity + size); + } else { + return cudf::create_null_mask(size, cudf::mask_state::UNINITIALIZED); + } + }; + + std::unique_ptr build_key_column = [&]() { + return Nullable ? cudf::make_numeric_column(cudf::data_type(cudf::type_to_id()), + build_table_size, + build_random_null_mask(build_table_size)) + : cudf::make_numeric_column(cudf::data_type(cudf::type_to_id()), + build_table_size); + }(); + std::unique_ptr probe_key_column = [&]() { + return Nullable ? cudf::make_numeric_column(cudf::data_type(cudf::type_to_id()), + probe_table_size, + build_random_null_mask(probe_table_size)) + : cudf::make_numeric_column(cudf::data_type(cudf::type_to_id()), + probe_table_size); + }(); + + generate_input_tables( + build_key_column->mutable_view().data(), + build_table_size, + probe_key_column->mutable_view().data(), + probe_table_size, + selectivity, + rand_max_val, + is_build_table_key_unique); + + auto payload_data_it = thrust::make_counting_iterator(0); + cudf::test::fixed_width_column_wrapper build_payload_column( + payload_data_it, payload_data_it + build_table_size); + + cudf::test::fixed_width_column_wrapper probe_payload_column( + payload_data_it, payload_data_it + probe_table_size); + + CHECK_CUDA(0); + + cudf::table_view build_table({build_key_column->view(), build_payload_column}); + cudf::table_view probe_table({probe_key_column->view(), probe_payload_column}); + + // Benchmark the inner join operation + + for (auto _ : state) { + cuda_event_timer raii(state, true, rmm::cuda_stream_default); + + // Common column references. + const auto col_ref_left_0 = cudf::ast::column_reference(0); + const auto col_ref_right_0 = cudf::ast::column_reference(0, cudf::ast::table_reference::RIGHT); + auto left_zero_eq_right_zero = + cudf::ast::expression(cudf::ast::ast_operator::EQUAL, col_ref_left_0, col_ref_right_0); + + auto result = + JoinFunc(probe_table, build_table, left_zero_eq_right_zero, cudf::null_equality::UNEQUAL); + } +} + +#define CONDITIONAL_INNER_JOIN_BENCHMARK_DEFINE(name, key_type, payload_type, nullable) \ + BENCHMARK_TEMPLATE_DEFINE_F(ConditionalJoin, name, key_type, payload_type) \ + (::benchmark::State & st) \ + { \ + auto join = [](cudf::table_view const& left, \ + cudf::table_view const& right, \ + cudf::ast::expression binary_pred, \ + cudf::null_equality compare_nulls) { \ + return cudf::conditional_inner_join(left, right, binary_pred, compare_nulls); \ + }; \ + BM_join(st, join); \ + } + +CONDITIONAL_INNER_JOIN_BENCHMARK_DEFINE(conditional_inner_join_32bit, int32_t, int32_t, false); +CONDITIONAL_INNER_JOIN_BENCHMARK_DEFINE(conditional_inner_join_64bit, int64_t, int64_t, false); +CONDITIONAL_INNER_JOIN_BENCHMARK_DEFINE(conditional_inner_join_32bit_nulls, int32_t, int32_t, true); +CONDITIONAL_INNER_JOIN_BENCHMARK_DEFINE(conditional_inner_join_64bit_nulls, int64_t, int64_t, true); + +#define CONDITIONAL_LEFT_JOIN_BENCHMARK_DEFINE(name, key_type, payload_type, nullable) \ + BENCHMARK_TEMPLATE_DEFINE_F(ConditionalJoin, name, key_type, payload_type) \ + (::benchmark::State & st) \ + { \ + auto join = [](cudf::table_view const& left, \ + cudf::table_view const& right, \ + cudf::ast::expression binary_pred, \ + cudf::null_equality compare_nulls) { \ + return cudf::conditional_left_join(left, right, binary_pred, compare_nulls); \ + }; \ + BM_join(st, join); \ + } + +CONDITIONAL_LEFT_JOIN_BENCHMARK_DEFINE(conditional_left_join_32bit, int32_t, int32_t, false); +CONDITIONAL_LEFT_JOIN_BENCHMARK_DEFINE(conditional_left_join_64bit, int64_t, int64_t, false); +CONDITIONAL_LEFT_JOIN_BENCHMARK_DEFINE(conditional_left_join_32bit_nulls, int32_t, int32_t, true); +CONDITIONAL_LEFT_JOIN_BENCHMARK_DEFINE(conditional_left_join_64bit_nulls, int64_t, int64_t, true); + +#define CONDITIONAL_FULL_JOIN_BENCHMARK_DEFINE(name, key_type, payload_type, nullable) \ + BENCHMARK_TEMPLATE_DEFINE_F(ConditionalJoin, name, key_type, payload_type) \ + (::benchmark::State & st) \ + { \ + auto join = [](cudf::table_view const& left, \ + cudf::table_view const& right, \ + cudf::ast::expression binary_pred, \ + cudf::null_equality compare_nulls) { \ + return cudf::conditional_inner_join(left, right, binary_pred, compare_nulls); \ + }; \ + BM_join(st, join); \ + } + +CONDITIONAL_FULL_JOIN_BENCHMARK_DEFINE(conditional_full_join_32bit, int32_t, int32_t, false); +CONDITIONAL_FULL_JOIN_BENCHMARK_DEFINE(conditional_full_join_64bit, int64_t, int64_t, false); +CONDITIONAL_FULL_JOIN_BENCHMARK_DEFINE(conditional_full_join_32bit_nulls, int32_t, int32_t, true); +CONDITIONAL_FULL_JOIN_BENCHMARK_DEFINE(conditional_full_join_64bit_nulls, int64_t, int64_t, true); + +#define CONDITIONAL_LEFT_ANTI_JOIN_BENCHMARK_DEFINE(name, key_type, payload_type, nullable) \ + BENCHMARK_TEMPLATE_DEFINE_F(ConditionalJoin, name, key_type, payload_type) \ + (::benchmark::State & st) \ + { \ + auto join = [](cudf::table_view const& left, \ + cudf::table_view const& right, \ + cudf::ast::expression binary_pred, \ + cudf::null_equality compare_nulls) { \ + return cudf::conditional_left_anti_join(left, right, binary_pred, compare_nulls); \ + }; \ + BM_join(st, join); \ + } + +CONDITIONAL_LEFT_ANTI_JOIN_BENCHMARK_DEFINE(conditional_left_anti_join_32bit, + int32_t, + int32_t, + false); +CONDITIONAL_LEFT_ANTI_JOIN_BENCHMARK_DEFINE(conditional_left_anti_join_64bit, + int64_t, + int64_t, + false); +CONDITIONAL_LEFT_ANTI_JOIN_BENCHMARK_DEFINE(conditional_left_anti_join_32bit_nulls, + int32_t, + int32_t, + true); +CONDITIONAL_LEFT_ANTI_JOIN_BENCHMARK_DEFINE(conditional_left_anti_join_64bit_nulls, + int64_t, + int64_t, + true); + +#define CONDITIONAL_LEFT_SEMI_JOIN_BENCHMARK_DEFINE(name, key_type, payload_type, nullable) \ + BENCHMARK_TEMPLATE_DEFINE_F(ConditionalJoin, name, key_type, payload_type) \ + (::benchmark::State & st) \ + { \ + auto join = [](cudf::table_view const& left, \ + cudf::table_view const& right, \ + cudf::ast::expression binary_pred, \ + cudf::null_equality compare_nulls) { \ + return cudf::conditional_left_semi_join(left, right, binary_pred, compare_nulls); \ + }; \ + BM_join(st, join); \ + } + +CONDITIONAL_LEFT_SEMI_JOIN_BENCHMARK_DEFINE(conditional_left_semi_join_32bit, + int32_t, + int32_t, + false); +CONDITIONAL_LEFT_SEMI_JOIN_BENCHMARK_DEFINE(conditional_left_semi_join_64bit, + int64_t, + int64_t, + false); +CONDITIONAL_LEFT_SEMI_JOIN_BENCHMARK_DEFINE(conditional_left_semi_join_32bit_nulls, + int32_t, + int32_t, + true); +CONDITIONAL_LEFT_SEMI_JOIN_BENCHMARK_DEFINE(conditional_left_semi_join_64bit_nulls, + int64_t, + int64_t, + true); + +// inner join ----------------------------------------------------------------------- +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_inner_join_32bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + // TODO: The below benchmark is slow, but can be useful to validate that the + // code works for large data sets. This benchmark was used to compare to the + // otherwise equivalent nullable benchmark below, which has memory errors for + // sufficiently large data sets. + //->Args({1'000'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_inner_join_64bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_inner_join_32bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_inner_join_64bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +// left join ----------------------------------------------------------------------- +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_join_32bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_join_64bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_join_32bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_join_64bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +// full join ----------------------------------------------------------------------- +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_full_join_32bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_full_join_64bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_full_join_32bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_full_join_64bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +// left anti-join ------------------------------------------------------------- +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_anti_join_32bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_anti_join_64bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_anti_join_32bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_anti_join_64bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +// left semi-join ------------------------------------------------------------- +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_semi_join_32bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_semi_join_64bit) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_semi_join_32bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); + +BENCHMARK_REGISTER_F(ConditionalJoin, conditional_left_semi_join_64bit_nulls) + ->Unit(benchmark::kMillisecond) + ->Args({100'000, 100'000}) + ->Args({100'000, 400'000}) + ->Args({100'000, 1'000'000}) + ->UseManualTime(); diff --git a/cpp/include/cudf/ast/detail/linearizer.hpp b/cpp/include/cudf/ast/detail/linearizer.hpp index 166a0408703..67474e08877 100644 --- a/cpp/include/cudf/ast/detail/linearizer.hpp +++ b/cpp/include/cudf/ast/detail/linearizer.hpp @@ -103,10 +103,24 @@ class linearizer { /** * @brief Construct a new linearizer object * + * @param expr The expression to create an evaluable linearizer for. + * @param left The left table used for evaluating the abstract syntax tree. + * @param right The right table used for evaluating the abstract syntax tree. + */ + linearizer(detail::node const& expr, cudf::table_view left, cudf::table_view right) + : _left(left), _right(right), _node_count(0), _intermediate_counter() + { + expr.accept(*this); + } + + /** + * @brief Construct a new linearizer object + * + * @param expr The expression to create an evaluable linearizer for. * @param table The table used for evaluating the abstract syntax tree. */ linearizer(detail::node const& expr, cudf::table_view table) - : _table(table), _node_count(0), _intermediate_counter() + : _left(table), _right(table), _node_count(0), _intermediate_counter() { expr.accept(*this); } @@ -217,7 +231,8 @@ class linearizer { cudf::size_type add_data_reference(detail::device_data_reference data_ref); // State information about the "linearized" GPU execution plan - cudf::table_view _table; + cudf::table_view const& _left; + cudf::table_view const& _right; cudf::size_type _node_count; intermediate_counter _intermediate_counter; std::vector _data_references; diff --git a/cpp/include/cudf/ast/detail/transform.cuh b/cpp/include/cudf/ast/detail/transform.cuh index f69927a3601..e56b4fb2281 100644 --- a/cpp/include/cudf/ast/detail/transform.cuh +++ b/cpp/include/cudf/ast/detail/transform.cuh @@ -31,6 +31,8 @@ #include +#include + #include #include @@ -40,132 +42,375 @@ namespace ast { namespace detail { -// Forward declaration -struct row_evaluator; +// Type trait for wrapping nullable types in a thrust::optional. Non-nullable +// types are returned as is. +template +struct possibly_null_value; -struct row_output { - public: - __device__ row_output(row_evaluator const& evaluator) : evaluator(evaluator) {} +template +struct possibly_null_value { + using type = thrust::optional; +}; + +template +struct possibly_null_value { + using type = T; +}; + +template +using possibly_null_value_t = typename possibly_null_value::type; + +// Type used for intermediate storage in expression evaluation. +template +using IntermediateDataType = possibly_null_value_t; +/** + * @brief A container for capturing the output of an evaluated expression. + * + * This class is designed to be passed by reference as the first argument to + * expression_evaluator::evaluate. The API is designed such that template + * specializations for specific output types will be able to customize setting + * behavior if necessary. The class leverages CRTP to define a suitable interface + * for the `expression_evaluator` at compile-time and enforce this API on its + * subclasses to get around the lack of device-side polymorphism. + * + * @tparam Subclass The subclass to dispatch methods to. + * @tparam T The underlying data type. + * @tparam has_nulls Whether or not the result data is nullable. + */ +template +struct expression_result { /** - * @brief Resolves an output data reference and assigns result value. - * - * Only output columns (COLUMN) and intermediates (INTERMEDIATE) are supported as output reference - * types. Intermediates must be of fixed width less than or equal to sizeof(std::int64_t). This - * requirement on intermediates is enforced by the linearizer. - * - * @tparam Element Type of result element. - * @param device_data_reference Data reference to resolve. - * @param row_index Row index of data column. - * @param result Value to assign to output. + * Helper function to get the subclass type to dispatch methods to. */ - template ())> - __device__ void resolve_output(detail::device_data_reference device_data_reference, - cudf::size_type row_index, - Element result) const; - // Definition below after row_evaluator is a complete type - - template ())> - __device__ void resolve_output(detail::device_data_reference device_data_reference, - cudf::size_type row_index, - Element result) const + Subclass& subclass() { return static_cast(*this); } + Subclass const& subclass() const { return static_cast(*this); } + + // TODO: The index is ignored by the value subclass, but is included in this + // signature because it is required by the implementation in the template + // specialization for column views. It would be nice to clean this up, see + // the related TODO below. Note that storing the index in the class on + // construction (which would result in a cleaner delineation of the API for + // the derived types) results in a significant performance penalty because + // the index is pushed down the memory hierarchy by the time it needs to be + // used, whereas passing it as a parameter keeps it in registers for fast + // access at the point where indexing occurs. + template + __device__ void set_value(cudf::size_type index, possibly_null_value_t result) { - cudf_assert(false && "Invalid type in resolve_output."); + subclass()->set_value(); } - private: - row_evaluator const& evaluator; + __device__ bool is_valid() const { subclass()->is_valid(); } + + __device__ T value() const { subclass()->value(); } }; -template -struct unary_row_output : public row_output { - __device__ unary_row_output(row_evaluator const& evaluator) : row_output(evaluator) {} +/** + * @brief A container for capturing the output of an evaluated expression in a scalar. + * + * This subclass of `expression_result` functions as an owning container of a + * (possibly nullable) scalar type that can be written to by the + * expression_evaluator. The data (and its validity) can then be accessed. + * + * @tparam T The underlying data type. + * @tparam has_nulls Whether or not the result data is nullable. + */ +template +struct value_expression_result + : public expression_result, T, has_nulls> { + __device__ value_expression_result() {} - template < - ast_operator op, - std::enable_if_t, Input>>* = nullptr> - __device__ void operator()(cudf::size_type row_index, - Input input, - detail::device_data_reference output) const + template + __device__ void set_value(cudf::size_type index, possibly_null_value_t result) { - using OperatorFunctor = detail::operator_functor; - using Out = cuda::std::invoke_result_t; - resolve_output(output, row_index, OperatorFunctor{}(input)); + if constexpr (std::is_same_v) { + _obj = result; + } else { + cudf_assert(false && "Output type does not match container type."); + } } - template < - ast_operator op, - std::enable_if_t, Input>>* = nullptr> - __device__ void operator()(cudf::size_type row_index, - Input input, - detail::device_data_reference output) const + /** + * @brief Returns true if the underlying data is valid and false otherwise. + */ + __device__ bool is_valid() const { - cudf_assert(false && "Invalid unary dispatch operator for the provided input."); + if constexpr (has_nulls) { return _obj.has_value(); } + return true; } + + /** + * @brief Returns the underlying data. + * + * @throws thrust::bad_optional_access if the underlying data is not valid. + */ + __device__ T value() const + { + // Using two separate constexprs silences compiler warnings, whereas an + // if/else does not. An unconditional return is not ignored by the compiler + // when has_nulls is true and therefore raises a compiler error. + if constexpr (has_nulls) { return _obj.value(); } + if constexpr (!has_nulls) { return _obj; } + } + + possibly_null_value_t + _obj; ///< The underlying data value, or a nullable version of it. }; -template -struct binary_row_output : public row_output { - __device__ binary_row_output(row_evaluator const& evaluator) : row_output(evaluator) {} - - template < - ast_operator op, - std::enable_if_t, LHS, RHS>>* = nullptr> - __device__ void operator()(cudf::size_type row_index, - LHS lhs, - RHS rhs, - detail::device_data_reference output) const +// TODO: The below implementation significantly differs from the default +// implementation above due to the non-owning nature of the container and the +// usage of the index. It would be ideal to unify these further if possible. + +/** + * @brief A container for capturing the output of an evaluated expression in a column. + * + * This subclass of `expression_result` functions as a non-owning container + * that transparently passes calls through to an underlying mutable view to a + * column. Not all methods are implemented + * + * @tparam has_nulls Whether or not the result data is nullable. + */ +template +struct mutable_column_expression_result + : public expression_result, + mutable_column_device_view, + has_nulls> { + __device__ mutable_column_expression_result(mutable_column_device_view& obj) : _obj(obj) {} + + template + __device__ void set_value(cudf::size_type index, possibly_null_value_t result) + { + if constexpr (has_nulls) { + if (result.has_value()) { + _obj.template element(index) = *result; + _obj.set_valid(index); + } else { + _obj.set_null(index); + } + } else { + _obj.template element(index) = result; + } + } + + /** + * @brief Not implemented for this specialization. + */ + __device__ bool is_valid() const { - using OperatorFunctor = detail::operator_functor; - using Out = cuda::std::invoke_result_t; - resolve_output(output, row_index, OperatorFunctor{}(lhs, rhs)); + // Not implemented since it would require modifying the API in the parent class to accept an + // index. + cudf_assert(false && "This method is not implemented."); } - template , LHS, RHS>>* = - nullptr> - __device__ void operator()(cudf::size_type row_index, - LHS lhs, - RHS rhs, - detail::device_data_reference output) const + /** + * @brief Not implemented for this specialization. + */ + __device__ mutable_column_device_view value() const { - cudf_assert(false && "Invalid binary dispatch operator for the provided input."); + // Not implemented since it would require modifying the API in the parent class to accept an + // index. + cudf_assert(false && "This method is not implemented."); } + + mutable_column_device_view& _obj; ///< The column to which the data is written. }; /** - * @brief An expression evaluator owned by a single thread operating on rows of a table. + * @brief A container of all device data required to evaluate an expression on tables. + * + * This struct should never be instantiated directly. It is created by the + * `ast_plan` on construction, and the resulting member is publicly accessible + * for passing to kernels for constructing an `expression_evaluator`. * - * This class is designed for n-ary transform evaluation. Currently this class assumes that there's - * only one relevant "row index" in its methods, which corresponds to a row in a single input table - * and the same row index in an output column. */ -struct row_evaluator { - friend struct row_output; - template - friend struct unary_row_output; - template - friend struct binary_row_output; +struct device_ast_plan { + device_span data_references; + device_span literals; + device_span operators; + device_span operator_source_indices; + cudf::size_type num_intermediates; + int shmem_per_thread; +}; + +/** + * @brief Preprocessor for an expression acting on tables to generate data suitable for AST + * expression evaluation on the GPU. + * + * On construction, an AST plan creates a single "packed" host buffer of all + * data arrays that will be necessary to evaluate an expression on a pair of + * tables. This data is copied to a single contiguous device buffer, and + * pointers are generated to the individual components. Because the plan tends + * to be small, this is the most efficient approach for low latency. All the + * data required on the GPU can be accessed via the convenient `dev_plan` + * member struct, which can be used to construct an `expression_evaluator` on + * the device. + * + * Note that the resulting device data cannot be used once this class goes out of scope. + */ +struct ast_plan { + /** + * @brief Construct an AST plan for an expression operating on two tables. + * + * @param expr The expression for which to construct a plan. + * @param left The left table on which the expression acts. + * @param right The right table on which the expression acts. + * @param has_nulls Boolean indicator of whether or not the data contains nulls. + * @param stream Stream view on which to allocate resources and queue execution. + * @param mr Device memory resource used to allocate the returned column's device. + */ + ast_plan(detail::node const& expr, + cudf::table_view left, + cudf::table_view right, + bool has_nulls, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) + : _linearizer(expr, left, right) + { + std::vector sizes; + std::vector data_pointers; + + extract_size_and_pointer(_linearizer.data_references(), sizes, data_pointers); + extract_size_and_pointer(_linearizer.literals(), sizes, data_pointers); + extract_size_and_pointer(_linearizer.operators(), sizes, data_pointers); + extract_size_and_pointer(_linearizer.operator_source_indices(), sizes, data_pointers); + + // Create device buffer + auto const buffer_size = std::accumulate(sizes.cbegin(), sizes.cend(), 0); + auto buffer_offsets = std::vector(sizes.size()); + thrust::exclusive_scan(sizes.cbegin(), sizes.cend(), buffer_offsets.begin(), 0); + + auto h_data_buffer = std::make_unique(buffer_size); + for (unsigned int i = 0; i < data_pointers.size(); ++i) { + std::memcpy(h_data_buffer.get() + buffer_offsets[i], data_pointers[i], sizes[i]); + } + + _device_data_buffer = rmm::device_buffer(h_data_buffer.get(), buffer_size, stream, mr); + stream.synchronize(); + + // Create device pointers to components of plan + auto device_data_buffer_ptr = static_cast(_device_data_buffer.data()); + dev_plan.data_references = device_span( + reinterpret_cast(device_data_buffer_ptr + + buffer_offsets[0]), + _linearizer.data_references().size()); + dev_plan.literals = device_span( + reinterpret_cast( + device_data_buffer_ptr + buffer_offsets[1]), + _linearizer.literals().size()); + dev_plan.operators = device_span( + reinterpret_cast(device_data_buffer_ptr + buffer_offsets[2]), + _linearizer.operators().size()); + dev_plan.operator_source_indices = device_span( + reinterpret_cast(device_data_buffer_ptr + buffer_offsets[3]), + _linearizer.operator_source_indices().size()); + dev_plan.num_intermediates = _linearizer.intermediate_count(); + dev_plan.shmem_per_thread = static_cast( + (has_nulls ? sizeof(IntermediateDataType) : sizeof(IntermediateDataType)) * + dev_plan.num_intermediates); + } + + /** + * @brief Construct an AST plan for an expression operating on one table. + * + * @param expr The expression for which to construct a plan. + * @param table The table on which the expression acts. + * @param has_nulls Boolean indicator of whether or not the data contains nulls. + * @param stream Stream view on which to allocate resources and queue execution. + * @param mr Device memory resource used to allocate the returned column's device. + */ + ast_plan(detail::node const& expr, + cudf::table_view table, + bool has_nulls, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) + : ast_plan(expr, table, table, has_nulls, stream, mr) + { + } + + cudf::data_type output_type() const { return _linearizer.root_data_type(); } + + device_ast_plan + dev_plan; ///< The collection of data required to evaluate the expression on the device. + + private: + /** + * @brief Helper function for adding components (operators, literals, etc) to AST plan + * + * @tparam T The underlying type of the input `std::vector` + * @param[in] v The `std::vector` containing components (operators, literals, etc). + * @param[in,out] sizes The `std::vector` containing the size of each data buffer. + * @param[in,out] data_pointers The `std::vector` containing pointers to each data buffer. + */ + template + void extract_size_and_pointer(std::vector const& v, + std::vector& sizes, + std::vector& data_pointers) + { + auto const data_size = sizeof(T) * v.size(); + sizes.push_back(data_size); + data_pointers.push_back(v.data()); + } + + rmm::device_buffer + _device_data_buffer; ///< The device-side data buffer containing the plan information, which is + ///< owned by this class and persists until it is destroyed. + linearizer const _linearizer; ///< The linearizer created from the provided expression that is + ///< used to construct device-side operators and references. +}; + +/** + * @brief The principal object for evaluating AST expressions on device. + * + * This class is designed for n-ary transform evaluation. It operates on two + * tables. + */ +template +struct expression_evaluator { public: /** - * @brief Construct a row evaluator. + * @brief Construct an expression evaluator acting on two tables. + * + * @param left View of the left table view used for evaluation. + * @param right View of the right table view used for evaluation. + * @param plan The collection of device references representing the expression to evaluate. + * @param thread_intermediate_storage Pointer to this thread's portion of shared memory for + * storing intermediates. + * @param compare_nulls Whether the equality operator returns true or false for two nulls. + + */ + __device__ expression_evaluator(table_device_view const& left, + table_device_view const& right, + device_ast_plan const& plan, + IntermediateDataType* thread_intermediate_storage, + null_equality compare_nulls = null_equality::EQUAL) + : left(left), + right(right), + plan(plan), + thread_intermediate_storage(thread_intermediate_storage), + compare_nulls(compare_nulls) + { + } + + /** + * @brief Construct an expression evaluator acting on one table. * - * @param table The table device view used for evaluation. - * @param literals Array of literal values used for evaluation. + * @param table View of the table view used for evaluation. + * @param plan The collection of device references representing the expression to evaluate. * @param thread_intermediate_storage Pointer to this thread's portion of shared memory for * storing intermediates. - * @param output_column The output column where results are stored. + * @param compare_nulls Whether the equality operator returns true or false for two nulls. */ - __device__ row_evaluator( - table_device_view const& table, - device_span literals, - std::int64_t* thread_intermediate_storage, - mutable_column_device_view* output_column) - : table(table), - literals(literals), + __device__ expression_evaluator(table_device_view const& table, + device_ast_plan const& plan, + IntermediateDataType* thread_intermediate_storage, + null_equality compare_nulls = null_equality::EQUAL) + : left(table), + right(table), + plan(plan), thread_intermediate_storage(thread_intermediate_storage), - output_column(output_column) + compare_nulls(compare_nulls) { } @@ -177,241 +422,437 @@ struct row_evaluator { * sizeof(std::int64_t). This requirement on intermediates is enforced by the linearizer. * * @tparam Element Type of element to return. + * @tparam has_nulls Whether or not the result data is nullable. * @param device_data_reference Data reference to resolve. * @param row_index Row index of data column. - * @return Element + * @return Element The type- and null-resolved data. */ template ())> - __device__ Element resolve_input(detail::device_data_reference device_data_reference, - cudf::size_type row_index) const + __device__ possibly_null_value_t resolve_input( + detail::device_data_reference device_data_reference, cudf::size_type row_index) const { auto const data_index = device_data_reference.data_index; auto const ref_type = device_data_reference.reference_type; + // TODO: Everywhere in the code assumes that the table reference is either + // left or right. Should we error-check somewhere to prevent + // table_reference::OUTPUT from being specified? + auto const& table = device_data_reference.table_source == table_reference::LEFT ? left : right; + using ReturnType = possibly_null_value_t; if (ref_type == detail::device_data_reference_type::COLUMN) { - return table.column(data_index).element(row_index); + // If we have nullable data, return an empty nullable type with no value if the data is null. + if constexpr (has_nulls) { + return table.column(data_index).is_valid(row_index) + ? ReturnType(table.column(data_index).element(row_index)) + : ReturnType(); + + } else { + return ReturnType(table.column(data_index).element(row_index)); + } } else if (ref_type == detail::device_data_reference_type::LITERAL) { - return literals[data_index].value(); + return ReturnType(plan.literals[data_index].value()); } else { // Assumes ref_type == detail::device_data_reference_type::INTERMEDIATE // Using memcpy instead of reinterpret_cast for safe type aliasing // Using a temporary variable ensures that the compiler knows the result is aligned - std::int64_t intermediate = thread_intermediate_storage[data_index]; - Element tmp; - memcpy(&tmp, &intermediate, sizeof(Element)); + IntermediateDataType intermediate = thread_intermediate_storage[data_index]; + ReturnType tmp; + memcpy(&tmp, &intermediate, sizeof(ReturnType)); return tmp; } + // Unreachable return used to silence compiler warnings. + return {}; } template ())> - __device__ Element resolve_input(detail::device_data_reference device_data_reference, - cudf::size_type row_index) const + __device__ possibly_null_value_t resolve_input( + detail::device_data_reference device_data_reference, cudf::size_type row_index) const { cudf_assert(false && "Unsupported type in resolve_input."); + // Unreachable return used to silence compiler warnings. return {}; } /** * @brief Callable to perform a unary operation. * - * @tparam OperatorFunctor Functor that performs desired operation when `operator()` is called. * @tparam Input Type of input value. - * @param row_index Row index of data column(s). + * @tparam OutputType The container type that data will be inserted into. + * + * @param output_object The container that data will be inserted into. + * @param input_row_index The row to pull the data from the input table. * @param input Input data reference. * @param output Output data reference. + * @param output_row_index The row in the output to insert the result. + * @param op The operator to act with. */ - template - __device__ void operator()(cudf::size_type row_index, - detail::device_data_reference input, - detail::device_data_reference output, - ast_operator op) const + template + __device__ void operator()(OutputType& output_object, + const cudf::size_type input_row_index, + const detail::device_data_reference input, + const detail::device_data_reference output, + const cudf::size_type output_row_index, + const ast_operator op) const { - auto const typed_input = resolve_input(input, row_index); - ast_operator_dispatcher(op, unary_row_output(*this), row_index, typed_input, output); + auto const typed_input = resolve_input(input, input_row_index); + ast_operator_dispatcher(op, + unary_expression_output_handler(*this), + output_object, + output_row_index, + typed_input, + output); } /** - * @brief Callable to perform a binary operation. + * @brief Callable to perform a unary operation. + * + * @tparam LHS Type of the left input value. + * @tparam RHS Type of the right input value. + * @tparam OutputType The container type that data will be inserted into. * - * @tparam OperatorFunctor Functor that performs desired operation when `operator()` is called. - * @tparam LHS Type of left input value. - * @tparam RHS Type of right input value. - * @param row_index Row index of data column(s). + * @param output_object The container that data will be inserted into. + * @param left_row_index The row to pull the data from the left table. + * @param right_row_index The row to pull the data from the right table. * @param lhs Left input data reference. * @param rhs Right input data reference. * @param output Output data reference. + * @param output_row_index The row in the output to insert the result. + * @param op The operator to act with. */ - template - __device__ void operator()(cudf::size_type row_index, - detail::device_data_reference lhs, - detail::device_data_reference rhs, - detail::device_data_reference output, - ast_operator op) const + template + __device__ void operator()(OutputType& output_object, + const cudf::size_type left_row_index, + const cudf::size_type right_row_index, + const detail::device_data_reference lhs, + const detail::device_data_reference rhs, + const detail::device_data_reference output, + const cudf::size_type output_row_index, + const ast_operator op) const { - auto const typed_lhs = resolve_input(lhs, row_index); - auto const typed_rhs = resolve_input(rhs, row_index); - ast_operator_dispatcher( - op, binary_row_output(*this), row_index, typed_lhs, typed_rhs, output); + auto const typed_lhs = resolve_input(lhs, left_row_index); + auto const typed_rhs = resolve_input(rhs, right_row_index); + ast_operator_dispatcher(op, + binary_expression_output_handler(*this), + output_object, + output_row_index, + typed_lhs, + typed_rhs, + output); } template >* = nullptr> - __device__ void operator()(cudf::size_type row_index, - detail::device_data_reference lhs, - detail::device_data_reference rhs, - detail::device_data_reference output) const + __device__ void operator()(OutputType& output_object, + cudf::size_type left_row_index, + cudf::size_type right_row_index, + const detail::device_data_reference lhs, + const detail::device_data_reference rhs, + const detail::device_data_reference output, + cudf::size_type output_row_index, + const ast_operator op) const { cudf_assert(false && "Invalid binary dispatch operator for the provided input."); } - private: - table_device_view const& table; - device_span literals; - std::int64_t* thread_intermediate_storage; - mutable_column_device_view* output_column; -}; - -template ()>*> -__device__ void row_output::resolve_output(detail::device_data_reference device_data_reference, - cudf::size_type row_index, - Element result) const -{ - auto const ref_type = device_data_reference.reference_type; - if (ref_type == detail::device_data_reference_type::COLUMN) { - evaluator.output_column->element(row_index) = result; - } else { // Assumes ref_type == detail::device_data_reference_type::INTERMEDIATE - // Using memcpy instead of reinterpret_cast for safe type aliasing. - // Using a temporary variable ensures that the compiler knows the result is aligned. - std::int64_t tmp; - memcpy(&tmp, &result, sizeof(Element)); - evaluator.thread_intermediate_storage[device_data_reference.data_index] = tmp; + /** + * @brief Evaluate an expression applied to a row. + * + * This function performs an n-ary transform for one row on one thread. + * + * @tparam OutputType The container type that data will be inserted into. + * + * @param output_object The container that data will be inserted into. + * @param row_index Row index of all input and output data column(s). + */ + template + __device__ void evaluate(OutputType& output_object, cudf::size_type const row_index) + { + evaluate(output_object, row_index, row_index, row_index); } -} -/** - * @brief Evaluate an expression applied to a row. - * - * This function performs an n-ary transform for one row on one thread. - * - * @param evaluator The row evaluator used for evaluation. - * @param data_references Array of data references. - * @param operators Array of operators to perform. - * @param operator_source_indices Array of source indices for the operators. - * @param num_operators Number of operators. - * @param row_index Row index of data column(s). - */ -__device__ void evaluate_row_expression( - detail::row_evaluator const& evaluator, - device_span data_references, - device_span operators, - device_span operator_source_indices, - cudf::size_type row_index) -{ - auto operator_source_index = static_cast(0); - for (cudf::size_type operator_index = 0; operator_index < operators.size(); operator_index++) { - // Execute operator - auto const op = operators[operator_index]; - auto const arity = ast_operator_arity(op); - if (arity == 1) { - // Unary operator - auto const input = data_references[operator_source_indices[operator_source_index]]; - auto const output = data_references[operator_source_indices[operator_source_index + 1]]; - operator_source_index += arity + 1; - type_dispatcher(input.data_type, evaluator, row_index, input, output, op); - } else if (arity == 2) { - // Binary operator - auto const lhs = data_references[operator_source_indices[operator_source_index]]; - auto const rhs = data_references[operator_source_indices[operator_source_index + 1]]; - auto const output = data_references[operator_source_indices[operator_source_index + 2]]; - operator_source_index += arity + 1; - type_dispatcher(lhs.data_type, - detail::single_dispatch_binary_operator{}, - evaluator, - row_index, - lhs, - rhs, - output, - op); - } else { - cudf_assert(false && "Invalid operator arity."); + /** + * @brief Evaluate an expression applied to a row. + * + * This function performs an n-ary transform for one row on one thread. + * + * @tparam OutputType The container type that data will be inserted into. + * + * @param output_object The container that data will be inserted into. + * @param left_row_index The row to pull the data from the left table. + * @param right_row_index The row to pull the data from the right table. + * @param output_row_index The row in the output to insert the result. + */ + template + __device__ void evaluate(OutputType& output_object, + cudf::size_type const left_row_index, + cudf::size_type const right_row_index, + cudf::size_type const output_row_index) + { + auto operator_source_index = static_cast(0); + for (cudf::size_type operator_index = 0; operator_index < plan.operators.size(); + operator_index++) { + // Execute operator + auto const op = plan.operators[operator_index]; + auto const arity = ast_operator_arity(op); + if (arity == 1) { + // Unary operator + auto const input = + plan.data_references[plan.operator_source_indices[operator_source_index]]; + auto const output = + plan.data_references[plan.operator_source_indices[operator_source_index + 1]]; + operator_source_index += arity + 1; + auto input_row_index = + input.table_source == table_reference::LEFT ? left_row_index : right_row_index; + type_dispatcher(input.data_type, + *this, + output_object, + input_row_index, + input, + output, + output_row_index, + op); + } else if (arity == 2) { + // Binary operator + auto const lhs = plan.data_references[plan.operator_source_indices[operator_source_index]]; + auto const rhs = + plan.data_references[plan.operator_source_indices[operator_source_index + 1]]; + auto const output = + plan.data_references[plan.operator_source_indices[operator_source_index + 2]]; + operator_source_index += arity + 1; + type_dispatcher(lhs.data_type, + detail::single_dispatch_binary_operator{}, + *this, + output_object, + left_row_index, + right_row_index, + lhs, + rhs, + output, + output_row_index, + op); + } else { + cudf_assert(false && "Invalid operator arity."); + } } } -} -/** - * @brief The AST plan creates a device buffer of data needed to execute an AST. - * - * On construction, an AST plan creates a single "packed" host buffer of all necessary data arrays, - * and copies that to the device with a single host-device memory copy. Because the plan tends to be - * small, this is the most efficient approach for low latency. - * - */ -struct ast_plan { - ast_plan(linearizer const& expr_linearizer, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - : _sizes{}, _data_pointers{} - { - add_to_plan(expr_linearizer.data_references()); - add_to_plan(expr_linearizer.literals()); - add_to_plan(expr_linearizer.operators()); - add_to_plan(expr_linearizer.operator_source_indices()); + private: + /** + * @brief Helper struct for type dispatch on the result of an expression. + * + * Evaluating an expression requires multiple levels of type dispatch to + * determine the input types, the operation type, and the output type. This + * helper class is a functor that handles the operator dispatch, invokes the + * operator, and dispatches output writing based on the resulting data type. + */ + struct expression_output_handler { + public: + __device__ expression_output_handler(expression_evaluator const& evaluator) + : evaluator(evaluator) + { + } - // Create device buffer - auto const buffer_size = std::accumulate(_sizes.cbegin(), _sizes.cend(), 0); - auto buffer_offsets = std::vector(_sizes.size()); - thrust::exclusive_scan(_sizes.cbegin(), _sizes.cend(), buffer_offsets.begin(), 0); + /** + * @brief Resolves an output data reference and assigns result value. + * + * Only output columns (COLUMN) and intermediates (INTERMEDIATE) are supported as output + * reference types. Intermediates must be of fixed width less than or equal to + * sizeof(std::int64_t). This requirement on intermediates is enforced by the linearizer. + * + * @tparam Element Type of result element. + * @tparam OutputType The container type that data will be inserted into. + * + * @param output_object The container that data will be inserted into. + * @param device_data_reference Data reference to resolve. + * @param row_index Row index of data column. + * @param result Value to assign to output. + */ + template ())> + __device__ void resolve_output(OutputType& output_object, + const detail::device_data_reference device_data_reference, + const cudf::size_type row_index, + const possibly_null_value_t result) const + { + auto const ref_type = device_data_reference.reference_type; + if (ref_type == detail::device_data_reference_type::COLUMN) { + output_object.template set_value(row_index, result); + } else { // Assumes ref_type == detail::device_data_reference_type::INTERMEDIATE + // Using memcpy instead of reinterpret_cast for safe type aliasing. + // Using a temporary variable ensures that the compiler knows the result is aligned. + IntermediateDataType tmp; + memcpy(&tmp, &result, sizeof(possibly_null_value_t)); + evaluator.thread_intermediate_storage[device_data_reference.data_index] = tmp; + } + } - auto h_data_buffer = std::make_unique(buffer_size); - for (unsigned int i = 0; i < _data_pointers.size(); ++i) { - std::memcpy(h_data_buffer.get() + buffer_offsets[i], _data_pointers[i], _sizes[i]); + template ())> + __device__ void resolve_output(OutputType& output_object, + const detail::device_data_reference device_data_reference, + const cudf::size_type row_index, + const possibly_null_value_t result) const + { + cudf_assert(false && "Invalid type in resolve_output."); } - _device_data_buffer = rmm::device_buffer(h_data_buffer.get(), buffer_size, stream, mr); + protected: + expression_evaluator const& evaluator; + }; - stream.synchronize(); + /** + * @brief Subclass of the expression output handler for unary operations. + * + * This functor's call operator is specialized to handle unary operations, + * which only require a single operand. + */ + template + struct unary_expression_output_handler : public expression_output_handler { + __device__ unary_expression_output_handler(expression_evaluator const& evaluator) + : expression_output_handler(evaluator) + { + } - // Create device pointers to components of plan - auto device_data_buffer_ptr = static_cast(_device_data_buffer.data()); - _device_data_references = device_span( - reinterpret_cast(device_data_buffer_ptr + - buffer_offsets[0]), - expr_linearizer.data_references().size()); - _device_literals = device_span( - reinterpret_cast( - device_data_buffer_ptr + buffer_offsets[1]), - expr_linearizer.literals().size()); - _device_operators = device_span( - reinterpret_cast(device_data_buffer_ptr + buffer_offsets[2]), - expr_linearizer.operators().size()); - _device_operator_source_indices = device_span( - reinterpret_cast(device_data_buffer_ptr + buffer_offsets[3]), - expr_linearizer.operator_source_indices().size()); - } + /** + * @brief Callable to perform a unary operation. + * + * @tparam op The operation to perform. + * @tparam OutputType The container type that data will be inserted into. + * + * @param output_object The container that data will be inserted into. + * @param outputrow_index The row in the output object to insert the data. + * @param input Input to the operation. + * @param output Output data reference. + */ + template < + ast_operator op, + typename OutputType, + std::enable_if_t, Input>>* = nullptr> + __device__ void operator()(OutputType& output_object, + const cudf::size_type output_row_index, + const possibly_null_value_t input, + const detail::device_data_reference output) const + { + using OperatorFunctor = detail::operator_functor; + using Out = cuda::std::invoke_result_t; + if constexpr (has_nulls) { + auto const result = input.has_value() + ? possibly_null_value_t(OperatorFunctor{}(*input)) + : possibly_null_value_t(); + this->template resolve_output(output_object, output, output_row_index, result); + } else { + this->template resolve_output( + output_object, output, output_row_index, OperatorFunctor{}(input)); + } + } + + template < + ast_operator op, + typename OutputType, + std::enable_if_t, Input>>* = nullptr> + __device__ void operator()(OutputType& output_object, + const cudf::size_type output_row_index, + const possibly_null_value_t input, + const detail::device_data_reference output) const + { + cudf_assert(false && "Invalid unary dispatch operator for the provided input."); + } + }; /** - * @brief Helper function for adding components (operators, literals, etc) to AST plan + * @brief Subclass of the expression output handler for binary operations. * - * @tparam T The underlying type of the input `std::vector` - * @param v The `std::vector` containing components (operators, literals, etc) + * This functor's call operator is specialized to handle binary operations, + * which require two operands. */ - template - void add_to_plan(std::vector const& v) - { - auto const data_size = sizeof(T) * v.size(); - _sizes.push_back(data_size); - _data_pointers.push_back(v.data()); - } + template + struct binary_expression_output_handler : public expression_output_handler { + __device__ binary_expression_output_handler(expression_evaluator const& evaluator) + : expression_output_handler(evaluator) + { + } - std::vector _sizes; - std::vector _data_pointers; + /** + * @brief Callable to perform a binary operation. + * + * @tparam op The operation to perform. + * @tparam OutputType The container type that data will be inserted into. + * + * @param output_object The container that data will be inserted into. + * @param output_row_index The row in the output to insert the result. + * @param lhs Left input to the operation. + * @param rhs Right input to the operation. + * @param output Output data reference. + */ + template , LHS, RHS>>* = nullptr> + __device__ void operator()(OutputType& output_object, + const cudf::size_type output_row_index, + const possibly_null_value_t lhs, + const possibly_null_value_t rhs, + const detail::device_data_reference output) const + { + using OperatorFunctor = detail::operator_functor; + using Out = cuda::std::invoke_result_t; + if constexpr (has_nulls) { + if constexpr (op == ast_operator::EQUAL) { + // Special handling of the equality operator based on what kind + // of null handling was requested. + possibly_null_value_t result; + if (!lhs.has_value() && !rhs.has_value()) { + // Case 1: Both null, so the output is based on compare_nulls. + result = possibly_null_value_t(this->evaluator.compare_nulls == + null_equality::EQUAL); + } else if (lhs.has_value() && rhs.has_value()) { + // Case 2: Neither is null, so the output is given by the operation. + result = possibly_null_value_t(OperatorFunctor{}(*lhs, *rhs)); + } else { + // Case 3: One value is null, while the other is not, so we simply propagate nulls. + result = possibly_null_value_t(); + } + this->template resolve_output(output_object, output, output_row_index, result); + } else { + // Default behavior for all other operators is to propagate nulls. + auto result = (lhs.has_value() && rhs.has_value()) + ? possibly_null_value_t(OperatorFunctor{}(*lhs, *rhs)) + : possibly_null_value_t(); + this->template resolve_output(output_object, output, output_row_index, result); + } + } else { + this->template resolve_output( + output_object, output, output_row_index, OperatorFunctor{}(lhs, rhs)); + } + } - rmm::device_buffer _device_data_buffer; - device_span _device_data_references; - device_span _device_literals; - device_span _device_operators; - device_span _device_operator_source_indices; + template , LHS, RHS>>* = nullptr> + __device__ void operator()(OutputType& output_object, + const cudf::size_type output_row_index, + const possibly_null_value_t lhs, + const possibly_null_value_t rhs, + const detail::device_data_reference output) const + { + cudf_assert(false && "Invalid binary dispatch operator for the provided input."); + } + }; + + table_device_view const& left; ///< The left table to operate on. + table_device_view const& right; ///< The right table to operate on. + device_ast_plan const& + plan; ///< The container of device data representing the expression to evaluate. + IntermediateDataType* + thread_intermediate_storage; ///< The shared memory store of intermediates produced during + ///< evaluation. + null_equality + compare_nulls; ///< Whether the equality operator returns true or false for two nulls. }; /** diff --git a/cpp/include/cudf/join.hpp b/cpp/include/cudf/join.hpp index 1f9ed71ce8c..725c0fc3699 100644 --- a/cpp/include/cudf/join.hpp +++ b/cpp/include/cudf/join.hpp @@ -16,6 +16,7 @@ #pragma once +#include #include #include @@ -23,6 +24,7 @@ #include #include +#include #include namespace cudf { @@ -647,5 +649,206 @@ class hash_join { const std::unique_ptr impl; }; +/** + * @brief Returns a pair of row index vectors corresponding to all pairs + * of rows between the specified tables where the predicate evaluates to true. + * + * The first returned vector contains the row indices from the left + * table that have a match in the right table (in unspecified order). + * The corresponding values in the second returned vector are + * the matched row indices from the right table. + * + * @code{.pseudo} + * Left: {{0, 1, 2}} + * Right: {{1, 2, 3}} + * Expression: Left.Column_0 == Right.Column_0 + * Result: {{1, 2}, {0, 1}} + * + * Left: {{0, 1, 2}, {3, 4, 5}} + * Right: {{1, 2, 3}, {4, 6, 7}} + * Expression: (Left.Column_0 == Right.Column_0) AND (Left.Column_1 == Right.Column_1) + * Result: {{1}, {0}} + * @endcode + * + * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` + * mismatch. + * + * @param left The left table + * @param right The right table + * @param binary_predicate The condition on which to join. + * @param compare_nulls Whether the equality operator returns true or false for two nulls. + * @param mr Device memory resource used to allocate the returned table and columns' device memory + * + * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct + * the result of performing a conditional inner join between two tables `left` and `right` . + */ +std::pair>, + std::unique_ptr>> +conditional_inner_join( + table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls = null_equality::EQUAL, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +/** + * @brief Returns a pair of row index vectors corresponding to all pairs + * of rows between the specified tables where the predicate evaluates to true, + * or null matches for rows in left that have no match in right. + * + * The first returned vector contains all the row indices from the left + * table (in unspecified order). The corresponding value in the + * second returned vector is either (1) the row index of the matched row + * from the right table, if there is a match or (2) an unspecified + * out-of-bounds value. + * + * @code{.pseudo} + * Left: {{0, 1, 2}} + * Right: {{1, 2, 3}} + * Expression: Left.Column_0 == Right.Column_0 + * Result: {{0, 1, 2}, {None, 0, 1}} + * + * Left: {{0, 1, 2}, {3, 4, 5}} + * Right: {{1, 2, 3}, {4, 6, 7}} + * Expression: (Left.Column_0 == Right.Column_0) AND (Left.Column_1 == Right.Column_1) + * Result: {{0, 1, 2}, {None, 0, None}} + * @endcode + * + * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` + * mismatch. + * + * @param left The left table + * @param right The right table + * @param binary_predicate The condition on which to join. + * @param compare_nulls Whether the equality operator returns true or false for two nulls. + * @param mr Device memory resource used to allocate the returned table and columns' device memory + * + * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct + * the result of performing a conditional left join between two tables `left` and `right` . + */ +std::pair>, + std::unique_ptr>> +conditional_left_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls = null_equality::EQUAL, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +/** + * @brief Returns a pair of row index vectors corresponding to all pairs + * of rows between the specified tables where the predicate evaluates to true, + * or null matches for rows in either table that have no match in the other. + * + * Taken pairwise, the values from the returned vectors are one of: + * (1) row indices corresponding to matching rows from the left and + * right tables, (2) a row index and an unspecified out-of-bounds value, + * representing a row from one table without a match in the other. + * + * @code{.pseudo} + * Left: {{0, 1, 2}} + * Right: {{1, 2, 3}} + * Expression: Left.Column_0 == Right.Column_0 + * Result: {{0, 1, 2, None}, {None, 0, 1, 2}} + * + * Left: {{0, 1, 2}, {3, 4, 5}} + * Right: {{1, 2, 3}, {4, 6, 7}} + * Expression: (Left.Column_0 == Right.Column_0) AND (Left.Column_1 == Right.Column_1) + * Result: {{0, 1, 2, None, None}, {None, 0, None, 1, 2}} + * @endcode + * + * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` + * mismatch. + * + * @param left The left table + * @param right The right table + * @param binary_predicate The condition on which to join. + * @param compare_nulls Whether the equality operator returns true or false for two nulls. + * @param mr Device memory resource used to allocate the returned table and columns' device memory + * + * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct + * the result of performing a conditional full join between two tables `left` and `right` . + */ +std::pair>, + std::unique_ptr>> +conditional_full_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls = null_equality::EQUAL, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +/** + * @brief Returns an index vector corresponding to all rows in the left table + * for which there exists some row in the right table where the predicate + * evaluates to true. + * + * @code{.pseudo} + * Left: {{0, 1, 2}} + * Right: {{1, 2, 3}} + * Expression: Left.Column_0 == Right.Column_0 + * Result: {1, 2} + * + * Left: {{0, 1, 2}, {3, 4, 5}} + * Right: {{1, 2, 3}, {4, 6, 7}} + * Expression: (Left.Column_0 == Right.Column_0) AND (Left.Column_1 == Right.Column_1) + * Result: {1} + * @endcode + * + * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` + * mismatch. + * + * @param left The left table + * @param right The right table + * @param binary_predicate The condition on which to join. + * @param compare_nulls Whether the equality operator returns true or false for two nulls. + * @param mr Device memory resource used to allocate the returned table and columns' device memory + * + * @return A vector `left_indices` that can be used to construct the result of + * performing a conditional left semi join between two tables `left` and + * `right` . + */ +std::unique_ptr> conditional_left_semi_join( + table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls = null_equality::EQUAL, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +/** + * @brief Returns an index vector corresponding to all rows in the left table + * for which there does not exist any row in the right table where the + * predicate evaluates to true. + * + * @code{.pseudo} + * Left: {{0, 1, 2}} + * Right: {{1, 2, 3}} + * Expression: Left.Column_0 == Right.Column_0 + * Result: {0} + * + * Left: {{0, 1, 2}, {3, 4, 5}} + * Right: {{1, 2, 3}, {4, 6, 7}} + * Expression: (Left.Column_0 == Right.Column_0) AND (Left.Column_1 == Right.Column_1) + * Result: {0, 2} + * @endcode + * + * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` + * mismatch. + * + * @param left The left table + * @param right The right table + * @param binary_predicate The condition on which to join. + * @param compare_nulls Whether the equality operator returns true or false for two nulls. + * @param mr Device memory resource used to allocate the returned table and columns' device memory + * + * @return A vector `left_indices` that can be used to construct the result of + * performing a conditional left anti join between two tables `left` and + * `right` . + */ +std::unique_ptr> conditional_left_anti_join( + table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls = null_equality::EQUAL, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + /** @} */ // end of group } // namespace cudf diff --git a/cpp/src/ast/linearizer.cpp b/cpp/src/ast/linearizer.cpp index 66a32ead35e..3e442305552 100644 --- a/cpp/src/ast/linearizer.cpp +++ b/cpp/src/ast/linearizer.cpp @@ -111,7 +111,9 @@ cudf::size_type linearizer::visit(column_reference const& expr) // Increment the node index _node_count++; // Resolve node type - auto const data_type = expr.get_data_type(_table); + auto const data_type = expr.get_table_source() == table_reference::LEFT + ? expr.get_data_type(_left) + : expr.get_data_type(_right); // Push data reference auto const source = detail::device_data_reference(detail::device_data_reference_type::COLUMN, data_type, diff --git a/cpp/src/ast/transform.cu b/cpp/src/ast/transform.cu index 43d3bde97c2..7aa89635c54 100644 --- a/cpp/src/ast/transform.cu +++ b/cpp/src/ast/transform.cu @@ -49,37 +49,37 @@ namespace detail { * This evaluates an expression over a table to produce a new column. Also called an n-ary * transform. * - * @tparam block_size + * @tparam max_block_size The size of the thread block, used to set launch + * bounds and minimize register usage. + * @tparam has_nulls whether or not the output column may contain nulls. + * * @param table The table device view used for evaluation. - * @param literals Array of literal values used for evaluation. - * @param output_column The output column where results are stored. - * @param data_references Array of data references. - * @param operators Array of operators to perform. - * @param operator_source_indices Array of source indices for the operators. - * @param num_operators Number of operators. - * @param num_intermediates Number of intermediates, used to allocate a portion of shared memory to - * each thread. + * @param plan Container of device data required to evaluate the desired expression. + * @param output_column The destination for the results of evaluating the expression. */ -template -__launch_bounds__(max_block_size) __global__ void compute_column_kernel( - table_device_view const table, - device_span literals, - mutable_column_device_view output_column, - device_span data_references, - device_span operators, - device_span operator_source_indices, - cudf::size_type num_intermediates) +template +__launch_bounds__(max_block_size) __global__ + void compute_column_kernel(table_device_view const table, + device_ast_plan plan, + mutable_column_device_view output_column) { - extern __shared__ std::int64_t intermediate_storage[]; - auto thread_intermediate_storage = &intermediate_storage[threadIdx.x * num_intermediates]; + // The (required) extern storage of the shared memory array leads to + // conflicting declarations between different templates. The easiest + // workaround is to declare an arbitrary (here char) array type then cast it + // after the fact to the appropriate type. + extern __shared__ char raw_intermediate_storage[]; + IntermediateDataType* intermediate_storage = + reinterpret_cast*>(raw_intermediate_storage); + + auto thread_intermediate_storage = &intermediate_storage[threadIdx.x * plan.num_intermediates]; auto const start_idx = static_cast(threadIdx.x + blockIdx.x * blockDim.x); auto const stride = static_cast(blockDim.x * gridDim.x); - auto const evaluator = - cudf::ast::detail::row_evaluator(table, literals, thread_intermediate_storage, &output_column); + auto evaluator = + cudf::ast::detail::expression_evaluator(table, plan, thread_intermediate_storage); for (cudf::size_type row_index = start_idx; row_index < table.num_rows(); row_index += stride) { - evaluate_row_expression( - evaluator, data_references, operators, operator_source_indices, row_index); + auto output_dest = mutable_column_expression_result(output_column); + evaluator.evaluate(output_dest, row_index); } } @@ -88,22 +88,30 @@ std::unique_ptr compute_column(table_view const table, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - auto const expr_linearizer = linearizer(expr, table); // Linearize the AST - auto const plan = ast_plan{expr_linearizer, stream, mr}; // Create ast_plan + // Prepare output column. Whether or not the output column is nullable is + // determined by whether any of the columns in the input table are nullable. + // If none of the input columns actually contain nulls, we can still use the + // non-nullable version of the expression evaluation code path for + // performance, so we capture that information as well. + auto const nullable = + std::any_of(table.begin(), table.end(), [](column_view c) { return c.nullable(); }); + auto const has_nulls = nullable && std::any_of(table.begin(), table.end(), [](column_view c) { + return c.nullable() && c.has_nulls(); + }); - // Create table device view - auto table_device = table_device_view::create(table, stream); - auto const table_num_rows = table.num_rows(); + auto const plan = ast_plan{expr, table, has_nulls, stream, mr}; + + auto const output_column_mask_state = + nullable ? (has_nulls ? mask_state::UNINITIALIZED : mask_state::ALL_VALID) + : mask_state::UNALLOCATED; - // Prepare output column auto output_column = cudf::make_fixed_width_column( - expr_linearizer.root_data_type(), table_num_rows, mask_state::UNALLOCATED, stream, mr); + plan.output_type(), table.num_rows(), output_column_mask_state, stream, mr); auto mutable_output_device = cudf::mutable_column_device_view::create(output_column->mutable_view(), stream); // Configure kernel parameters - auto const num_intermediates = expr_linearizer.intermediate_count(); - auto const shmem_size_per_thread = static_cast(sizeof(std::int64_t) * num_intermediates); + auto const& dev_plan = plan.dev_plan; int device_id; CUDA_TRY(cudaGetDevice(&device_id)); int shmem_limit_per_block; @@ -111,22 +119,23 @@ std::unique_ptr compute_column(table_view const table, cudaDeviceGetAttribute(&shmem_limit_per_block, cudaDevAttrMaxSharedMemoryPerBlock, device_id)); auto constexpr MAX_BLOCK_SIZE = 128; auto const block_size = - shmem_size_per_thread != 0 - ? std::min(MAX_BLOCK_SIZE, shmem_limit_per_block / shmem_size_per_thread) + dev_plan.shmem_per_thread != 0 + ? std::min(MAX_BLOCK_SIZE, shmem_limit_per_block / dev_plan.shmem_per_thread) : MAX_BLOCK_SIZE; - auto const config = cudf::detail::grid_1d{table_num_rows, block_size}; - auto const shmem_size_per_block = shmem_size_per_thread * config.num_threads_per_block; + auto const config = cudf::detail::grid_1d{table.num_rows(), block_size}; + auto const shmem_per_block = dev_plan.shmem_per_thread * config.num_threads_per_block; // Execute the kernel - cudf::ast::detail::compute_column_kernel - <<>>( - *table_device, - plan._device_literals, - *mutable_output_device, - plan._device_data_references, - plan._device_operators, - plan._device_operator_source_indices, - num_intermediates); + auto table_device = table_device_view::create(table, stream); + if (has_nulls) { + cudf::ast::detail::compute_column_kernel + <<>>( + *table_device, dev_plan, *mutable_output_device); + } else { + cudf::ast::detail::compute_column_kernel + <<>>( + *table_device, dev_plan, *mutable_output_device); + } CHECK_CUDA(stream.value()); return output_column; } diff --git a/cpp/src/join/hash_join.cu b/cpp/src/join/hash_join.cu index 1133477669d..e6110edfaa8 100644 --- a/cpp/src/join/hash_join.cu +++ b/cpp/src/join/hash_join.cu @@ -90,12 +90,11 @@ struct valid_range { */ std::pair>, std::unique_ptr>> -get_left_join_indices_complement( - std::unique_ptr>& right_indices, - size_type left_table_row_count, - size_type right_table_row_count, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) +get_left_join_indices_complement(std::unique_ptr>& right_indices, + size_type left_table_row_count, + size_type right_table_row_count, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { // Get array of indices that do not appear in right_indices diff --git a/cpp/src/join/hash_join.cuh b/cpp/src/join/hash_join.cuh index f9ccbd68c74..1b4cbf4ba1d 100644 --- a/cpp/src/join/hash_join.cuh +++ b/cpp/src/join/hash_join.cuh @@ -153,6 +153,17 @@ std::pair, std::unique_ptr> get_empty_joined_table std::unique_ptr combine_table_pair(std::unique_ptr&& left, std::unique_ptr&& right); +VectorPair concatenate_vector_pairs(VectorPair& a, VectorPair& b, rmm::cuda_stream_view stream); + +std::pair>, + std::unique_ptr>> +get_left_join_indices_complement( + std::unique_ptr>& right_indices, + size_type left_table_row_count, + size_type right_table_row_count, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + } // namespace detail struct hash_join::hash_join_impl { diff --git a/cpp/src/join/join.cu b/cpp/src/join/join.cu index 6cb04cadcac..cf711524f0b 100644 --- a/cpp/src/join/join.cu +++ b/cpp/src/join/join.cu @@ -15,6 +15,7 @@ */ #include #include +#include #include #include @@ -219,6 +220,21 @@ std::unique_ptr
full_join(table_view const& left_input, return combine_table_pair(std::move(left_result), std::move(right_result)); } +std::pair>, + std::unique_ptr>> +conditional_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + join_kind JoinKind, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_FUNC_RANGE(); + return get_conditional_join_indices( + left, right, JoinKind, binary_predicate, compare_nulls, stream, mr); +} + } // namespace detail hash_join::~hash_join() = default; @@ -356,4 +372,88 @@ std::unique_ptr
full_join(table_view const& left, left, right, left_on, right_on, compare_nulls, rmm::cuda_stream_default, mr); } +std::pair>, + std::unique_ptr>> +conditional_inner_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::INNER_JOIN, + rmm::cuda_stream_default, + mr); +} + +std::pair>, + std::unique_ptr>> +conditional_left_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::LEFT_JOIN, + rmm::cuda_stream_default, + mr); +} + +std::pair>, + std::unique_ptr>> +conditional_full_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::FULL_JOIN, + rmm::cuda_stream_default, + mr); +} + +std::unique_ptr> conditional_left_semi_join( + table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return std::move(detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::LEFT_SEMI_JOIN, + rmm::cuda_stream_default, + mr) + .first); +} + +std::unique_ptr> conditional_left_anti_join( + table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return std::move(detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::LEFT_ANTI_JOIN, + rmm::cuda_stream_default, + mr) + .first); +} } // namespace cudf diff --git a/cpp/src/join/join_kernels.cuh b/cpp/src/join/join_kernels.cuh index 4298706987c..6d0810ea800 100644 --- a/cpp/src/join/join_kernels.cuh +++ b/cpp/src/join/join_kernels.cuh @@ -18,12 +18,18 @@ #include #include +#include +#include +#include #include #include #include +#include #include "join_common_utils.hpp" +#include + namespace cudf { namespace detail { /** @@ -203,39 +209,63 @@ __global__ void compute_join_output_size(multimap_type multi_map, * @brief Computes the output size of joining the left table to the right table. * * This method uses a nested loop to iterate over the left and right tables and count the number of - * matches. + * matches according to a boolean expression. * * @tparam block_size The number of threads per block for this kernel + * @tparam has_nulls Whether or not the inputs may contain nulls. * * @param[in] left_table The left table * @param[in] right_table The right table * @param[in] JoinKind The type of join to be performed - * @param[in] check_row_equality The row equality comparator + * @param[in] compare_nulls Controls whether null join-key values should match or not. + * @param[in] plan Container of device data required to evaluate the desired expression. * @param[out] output_size The resulting output size */ -template -__global__ void compute_nested_loop_join_output_size(table_device_view left_table, +template +__global__ void compute_conditional_join_output_size(table_device_view left_table, table_device_view right_table, join_kind JoinKind, - row_equality check_row_equality, + null_equality compare_nulls, + ast::detail::device_ast_plan plan, cudf::size_type* output_size) { + // The (required) extern storage of the shared memory array leads to + // conflicting declarations between different templates. The easiest + // workaround is to declare an arbitrary (here char) array type then cast it + // after the fact to the appropriate type. + extern __shared__ char raw_intermediate_storage[]; + cudf::ast::detail::IntermediateDataType* intermediate_storage = + reinterpret_cast*>(raw_intermediate_storage); + auto thread_intermediate_storage = &intermediate_storage[threadIdx.x * plan.num_intermediates]; + cudf::size_type thread_counter(0); const cudf::size_type left_start_idx = threadIdx.x + blockIdx.x * blockDim.x; const cudf::size_type left_stride = blockDim.x * gridDim.x; const cudf::size_type left_num_rows = left_table.num_rows(); const cudf::size_type right_num_rows = right_table.num_rows(); + auto evaluator = cudf::ast::detail::expression_evaluator( + left_table, right_table, plan, thread_intermediate_storage, compare_nulls); + for (cudf::size_type left_row_index = left_start_idx; left_row_index < left_num_rows; left_row_index += left_stride) { bool found_match = false; for (cudf::size_type right_row_index = 0; right_row_index < right_num_rows; right_row_index++) { - if (check_row_equality(left_row_index, right_row_index)) { - ++thread_counter; + auto output_dest = cudf::ast::detail::value_expression_result(); + evaluator.evaluate(output_dest, left_row_index, right_row_index, 0); + if (output_dest.is_valid() && output_dest.value()) { + if ((JoinKind != join_kind::LEFT_ANTI_JOIN) && + !(JoinKind == join_kind::LEFT_SEMI_JOIN && found_match)) { + ++thread_counter; + } found_match = true; } } - if ((JoinKind == join_kind::LEFT_JOIN) && (!found_match)) { ++thread_counter; } + if ((JoinKind == join_kind::LEFT_JOIN || JoinKind == join_kind::LEFT_ANTI_JOIN || + JoinKind == join_kind::FULL_JOIN) && + (!found_match)) { + ++thread_counter; + } } using BlockReduce = cub::BlockReduce; @@ -428,32 +458,35 @@ __global__ void probe_hash_table(multimap_type multi_map, } /** - * @brief Performs a nested loop join to find all matching rows between the - * left and right tables and generate the output for the desired Join - * operation. + * @brief Performs a join conditioned on a predicate to find all matching rows + * between the left and right tables and generate the output for the desired + * Join operation. * * @tparam block_size The number of threads per block for this kernel * @tparam output_cache_size The side of the shared memory buffer to cache join * output results - + * @tparam has_nulls Whether or not the inputs may contain nulls. + * * @param[in] left_table The left table * @param[in] right_table The right table * @param[in] JoinKind The type of join to be performed - * @param[in] check_row_equality The row equality comparator + * @param compare_nulls Controls whether null join-key values should match or not. * @param[out] join_output_l The left result of the join operation * @param[out] join_output_r The right result of the join operation * @param[in,out] current_idx A global counter used by threads to coordinate * writes to the global output + * @param plan Container of device data required to evaluate the desired expression. * @param[in] max_size The maximum size of the output */ -template -__global__ void nested_loop_join(table_device_view left_table, +template +__global__ void conditional_join(table_device_view left_table, table_device_view right_table, join_kind JoinKind, - row_equality check_row_equality, + null_equality compare_nulls, cudf::size_type* join_output_l, cudf::size_type* join_output_r, cudf::size_type* current_idx, + cudf::ast::detail::device_ast_plan plan, const cudf::size_type max_size) { constexpr int num_warps = block_size / detail::warp_size; @@ -461,6 +494,15 @@ __global__ void nested_loop_join(table_device_view left_table, __shared__ cudf::size_type join_shared_l[num_warps][output_cache_size]; __shared__ cudf::size_type join_shared_r[num_warps][output_cache_size]; + // Normally the casting of a shared memory array is used to create multiple + // arrays of different types from the shared memory buffer, but here it is + // used to circumvent conflicts between arrays of different types between + // different template instantiations due to the extern specifier. + extern __shared__ char raw_intermediate_storage[]; + cudf::ast::detail::IntermediateDataType* intermediate_storage = + reinterpret_cast*>(raw_intermediate_storage); + auto thread_intermediate_storage = &intermediate_storage[threadIdx.x * plan.num_intermediates]; + const int warp_id = threadIdx.x / detail::warp_size; const int lane_id = threadIdx.x % detail::warp_size; const cudf::size_type left_num_rows = left_table.num_rows(); @@ -473,18 +515,34 @@ __global__ void nested_loop_join(table_device_view left_table, cudf::size_type left_row_index = threadIdx.x + blockIdx.x * blockDim.x; const unsigned int activemask = __ballot_sync(0xffffffff, left_row_index < left_num_rows); + + auto evaluator = cudf::ast::detail::expression_evaluator( + left_table, right_table, plan, thread_intermediate_storage, compare_nulls); + if (left_row_index < left_num_rows) { bool found_match = false; - for (size_type right_row_index(0); right_row_index < right_num_rows; right_row_index++) { - if (check_row_equality(left_row_index, right_row_index)) { + for (size_type right_row_index(0); right_row_index < right_num_rows; ++right_row_index) { + auto output_dest = cudf::ast::detail::value_expression_result(); + evaluator.evaluate(output_dest, left_row_index, right_row_index, 0); + + if (output_dest.is_valid() && output_dest.value()) { // If the rows are equal, then we have found a true match + // In the case of left anti joins we only add indices from left after + // the loop if we have found _no_ matches from the right. + // In the case of left semi joins we only add the first match (note + // that the current logic relies on the fact that we process all right + // table rows for a single left table row on a single thread so that no + // synchronization of found_match is required). + if ((JoinKind != join_kind::LEFT_ANTI_JOIN) && + !(JoinKind == join_kind::LEFT_SEMI_JOIN && found_match)) { + add_pair_to_cache(left_row_index, + right_row_index, + current_idx_shared, + warp_id, + join_shared_l[warp_id], + join_shared_r[warp_id]); + } found_match = true; - add_pair_to_cache(left_row_index, - right_row_index, - current_idx_shared, - warp_id, - join_shared_l[warp_id], - join_shared_r[warp_id]); } __syncwarp(activemask); @@ -506,8 +564,11 @@ __global__ void nested_loop_join(table_device_view left_table, } } - // If performing a LEFT join and no match was found, insert a Null into the output - if ((JoinKind == join_kind::LEFT_JOIN) && (!found_match)) { + // Left, left anti, and full joins all require saving left columns that + // aren't present in the right. + if ((JoinKind == join_kind::LEFT_JOIN || JoinKind == join_kind::LEFT_ANTI_JOIN || + JoinKind == join_kind::FULL_JOIN) && + (!found_match)) { add_pair_to_cache(left_row_index, static_cast(JoinNoneValue), current_idx_shared, diff --git a/cpp/src/join/nested_loop_join.cuh b/cpp/src/join/nested_loop_join.cuh index 5054305a41a..9848477a894 100644 --- a/cpp/src/join/nested_loop_join.cuh +++ b/cpp/src/join/nested_loop_join.cuh @@ -19,7 +19,8 @@ #include "join_common_utils.hpp" #include "join_kernels.cuh" -#include +#include +#include #include #include #include @@ -28,167 +29,153 @@ #include #include +#include -#include +#include + +#include namespace cudf { namespace detail { + /** - * @brief Gives an estimate of the size of the join output produced when - * joining two tables together. - * - * @throw cudf::logic_error if JoinKind is not INNER_JOIN or LEFT_JOIN + * @brief Computes the join operation between two tables and returns the + * output indices of left and right table as a combined table * - * @param left The left hand table - * @param right The right hand table + * @param left Table of left columns to join + * @param right Table of right columns to join + * tables have been flipped, meaning the output indices should also be flipped * @param JoinKind The type of join to be performed * @param compare_nulls Controls whether null join-key values should match or not. * @param stream CUDA stream used for device memory operations and kernel launches * - * @return An estimate of the size of the output of the join operation + * @return Join output indices vector pair */ -size_type estimate_nested_loop_join_output_size(table_device_view left, - table_device_view right, - join_kind JoinKind, - null_equality compare_nulls, - rmm::cuda_stream_view stream) +std::pair>, + std::unique_ptr>> +get_conditional_join_indices(table_view const& left, + table_view const& right, + join_kind JoinKind, + ast::expression binary_pred, + null_equality compare_nulls, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { - const size_type left_num_rows{left.num_rows()}; - const size_type right_num_rows{right.num_rows()}; - - if (right_num_rows == 0) { - // If the right table is empty, we know exactly how large the output - // will be for the different types of joins and can return immediately + // We can immediately filter out cases where the right table is empty. In + // some cases, we return all the rows of the left table with a corresponding + // null index for the right table; in others, we return an empty output. + if (right.num_rows() == 0) { switch (JoinKind) { - // Inner join with an empty table will have no output - case join_kind::INNER_JOIN: return 0; - - // Left join with an empty table will have an output of NULL rows - // equal to the number of rows in the left table - case join_kind::LEFT_JOIN: return left_num_rows; - - default: CUDF_FAIL("Unsupported join type"); + // Left, left anti, and full (which are effectively left because we are + // guaranteed that left has more rows than right) all return a all the + // row indices from left with a corresponding NULL from the right. + case join_kind::LEFT_JOIN: + case join_kind::LEFT_ANTI_JOIN: + case join_kind::FULL_JOIN: return get_trivial_left_join_indices(left, stream); + // Inner and left semi joins return empty output because no matches can exist. + case join_kind::INNER_JOIN: + case join_kind::LEFT_SEMI_JOIN: + return std::make_pair(std::make_unique>(0, stream, mr), + std::make_unique>(0, stream, mr)); } } - // Allocate storage for the counter used to get the size of the join output - size_type h_size_estimate{0}; - rmm::device_scalar size_estimate(0, stream); + // Prepare output column. Whether or not the output column is nullable is + // determined by whether any of the columns in the input table are nullable. + // If none of the input columns actually contain nulls, we can still use the + // non-nullable version of the expression evaluation code path for + // performance, so we capture that information as well. + auto const nullable = + std::any_of(left.begin(), left.end(), [](column_view c) { return c.nullable(); }) || + std::any_of(right.begin(), right.end(), [](column_view c) { return c.nullable(); }); + auto const has_nulls = + nullable && + (std::any_of( + left.begin(), left.end(), [](column_view c) { return c.nullable() && c.has_nulls(); }) || + std::any_of( + right.begin(), right.end(), [](column_view c) { return c.nullable() && c.has_nulls(); })); + + auto const plan = ast::detail::ast_plan{binary_pred, left, right, has_nulls, stream, mr}; + CUDF_EXPECTS(plan.output_type().id() == type_id::BOOL8, + "The expression must produce a boolean output."); - CHECK_CUDA(stream.value()); + auto left_table = table_device_view::create(left, stream); + auto right_table = table_device_view::create(right, stream); + // Allocate storage for the counter used to get the size of the join output + rmm::device_scalar size(0, stream, mr); + CHECK_CUDA(stream.value()); constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE}; - int numBlocks{-1}; + detail::grid_1d config(left_table->num_rows(), block_size); + auto const shmem_size_per_block = plan.dev_plan.shmem_per_thread * config.num_threads_per_block; - CUDA_TRY(cudaOccupancyMaxActiveBlocksPerMultiprocessor( - &numBlocks, compute_nested_loop_join_output_size, block_size, 0)); - - int dev_id{-1}; - CUDA_TRY(cudaGetDevice(&dev_id)); - - int num_sms{-1}; - CUDA_TRY(cudaDeviceGetAttribute(&num_sms, cudaDevAttrMultiProcessorCount, dev_id)); - - size_estimate.set_value_zero(stream); - - row_equality equality{left, right, compare_nulls == null_equality::EQUAL}; // Determine number of output rows without actually building the output to simply // find what the size of the output will be. - compute_nested_loop_join_output_size - <<>>( - left, right, JoinKind, equality, size_estimate.data()); + join_kind KernelJoinKind = JoinKind == join_kind::FULL_JOIN ? join_kind::LEFT_JOIN : JoinKind; + if (has_nulls) { + compute_conditional_join_output_size + <<>>( + *left_table, *right_table, KernelJoinKind, compare_nulls, plan.dev_plan, size.data()); + } else { + compute_conditional_join_output_size + <<>>( + *left_table, *right_table, KernelJoinKind, compare_nulls, plan.dev_plan, size.data()); + } CHECK_CUDA(stream.value()); - h_size_estimate = size_estimate.value(stream); + size_type const join_size = size.value(stream); - return h_size_estimate; -} - -/** - * @brief Computes the join operation between two tables and returns the - * output indices of left and right table as a combined table - * - * @param left Table of left columns to join - * @param right Table of right columns to join - * @param flip_join_indices Flag that indicates whether the left and right - * tables have been flipped, meaning the output indices should also be flipped - * @param JoinKind The type of join to be performed - * @param compare_nulls Controls whether null join-key values should match or not. - * @param stream CUDA stream used for device memory operations and kernel launches - * - * @return Join output indices vector pair - */ -std::pair, rmm::device_uvector> -get_base_nested_loop_join_indices(table_view const& left, - table_view const& right, - bool flip_join_indices, - join_kind JoinKind, - null_equality compare_nulls, - rmm::cuda_stream_view stream) -{ - // The `right` table is always used for the inner loop. We want to use the smaller table - // for the inner loop. Thus, if `left` is smaller than `right`, swap `left/right`. - if ((JoinKind == join_kind::INNER_JOIN) && (right.num_rows() > left.num_rows())) { - return get_base_nested_loop_join_indices(right, left, true, JoinKind, compare_nulls, stream); + // If the output size will be zero, we can return immediately. + if (join_size == 0) { + return std::make_pair(std::make_unique>(0, stream, mr), + std::make_unique>(0, stream, mr)); } - // Trivial left join case - exit early - if ((JoinKind == join_kind::LEFT_JOIN) && (right.num_rows() == 0)) { - return get_trivial_left_join_indices(left, stream); + + rmm::device_scalar write_index(0, stream); + + auto left_indices = std::make_unique>(join_size, stream, mr); + auto right_indices = std::make_unique>(join_size, stream, mr); + + const auto& join_output_l = left_indices->data(); + const auto& join_output_r = right_indices->data(); + if (has_nulls) { + conditional_join + <<>>( + *left_table, + *right_table, + KernelJoinKind, + compare_nulls, + join_output_l, + join_output_r, + write_index.data(), + plan.dev_plan, + join_size); + } else { + conditional_join + <<>>( + *left_table, + *right_table, + KernelJoinKind, + compare_nulls, + join_output_l, + join_output_r, + write_index.data(), + plan.dev_plan, + join_size); } - auto left_table = table_device_view::create(left, stream); - auto right_table = table_device_view::create(right, stream); + CHECK_CUDA(stream.value()); - size_type estimated_size = estimate_nested_loop_join_output_size( - *left_table, *right_table, JoinKind, compare_nulls, stream); + auto join_indices = std::make_pair(std::move(left_indices), std::move(right_indices)); - // If the estimated output size is zero, return immediately - if (estimated_size == 0) { - return std::make_pair(rmm::device_uvector{0, stream}, - rmm::device_uvector{0, stream}); + // For full joins, get the indices in the right table that were not joined to + // by any row in the left table. + if (JoinKind == join_kind::FULL_JOIN) { + auto complement_indices = detail::get_left_join_indices_complement( + join_indices.second, left.num_rows(), right.num_rows(), stream, mr); + join_indices = detail::concatenate_vector_pairs(join_indices, complement_indices, stream); } - - // Because we are approximating the number of joined elements, our approximation - // might be incorrect and we might have underestimated the number of joined elements. - // As such we will need to de-allocate memory and re-allocate memory to ensure - // that the final output is correct. - rmm::device_scalar write_index(0, stream); - size_type join_size{0}; - - rmm::device_uvector left_indices{0, stream}; - rmm::device_uvector right_indices{0, stream}; - auto current_estimated_size = estimated_size; - do { - left_indices.resize(estimated_size, stream); - right_indices.resize(estimated_size, stream); - - constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE}; - detail::grid_1d config(left_table->num_rows(), block_size); - write_index.set_value_zero(stream); - - row_equality equality{*left_table, *right_table, compare_nulls == null_equality::EQUAL}; - const auto& join_output_l = flip_join_indices ? right_indices.data() : left_indices.data(); - const auto& join_output_r = flip_join_indices ? left_indices.data() : right_indices.data(); - nested_loop_join - <<>>(*left_table, - *right_table, - JoinKind, - equality, - join_output_l, - join_output_r, - write_index.data(), - estimated_size); - - CHECK_CUDA(stream.value()); - - join_size = write_index.value(stream); - current_estimated_size = estimated_size; - estimated_size *= 2; - } while ((current_estimated_size < join_size)); - - left_indices.resize(join_size, stream); - right_indices.resize(join_size, stream); - return std::make_pair(std::move(left_indices), std::move(right_indices)); + return join_indices; } } // namespace detail diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 34fceb9015e..ddb5d88f2d0 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -88,6 +88,7 @@ ConfigureTest(GROUPBY_TEST # - join tests ------------------------------------------------------------------------------------ ConfigureTest(JOIN_TEST join/join_tests.cpp + join/conditional_join_tests.cu join/cross_join_tests.cpp join/semi_anti_join_tests.cpp) diff --git a/cpp/tests/ast/transform_tests.cpp b/cpp/tests/ast/transform_tests.cpp index 74937d4deea..48e19a2f587 100644 --- a/cpp/tests/ast/transform_tests.cpp +++ b/cpp/tests/ast/transform_tests.cpp @@ -31,10 +31,13 @@ #include #include -#include +#include +#include #include +#include #include +#include template using column_wrapper = cudf::test::fixed_width_column_wrapper; @@ -409,4 +412,46 @@ TEST_F(TransformTest, PyMod) cudf::test::expect_columns_equal(expected, result->view(), true); } +TEST_F(TransformTest, BasicAdditionNulls) +{ + auto c_0 = column_wrapper{{3, 20, 1, 50}, {0, 0, 1, 1}}; + auto c_1 = column_wrapper{{10, 7, 20, 0}, {0, 1, 0, 1}}; + auto table = cudf::table_view{{c_0, c_1}}; + + auto col_ref_0 = cudf::ast::column_reference(0); + auto col_ref_1 = cudf::ast::column_reference(1); + auto expression = cudf::ast::expression(cudf::ast::ast_operator::ADD, col_ref_0, col_ref_1); + + auto expected = column_wrapper{{0, 0, 0, 50}, {0, 0, 0, 1}}; + auto result = cudf::ast::compute_column(table, expression); + + cudf::test::expect_columns_equal(expected, result->view(), true); +} + +TEST_F(TransformTest, BasicAdditionLargeNulls) +{ + auto N = 2000; + auto a = thrust::make_counting_iterator(0); + + auto validities = std::vector(N); + std::fill(validities.begin(), validities.begin() + N / 2, 0); + std::fill(validities.begin() + (N / 2), validities.end(), 0); + + std::random_device rd; + std::mt19937 gen(rd()); + std::shuffle(validities.begin(), validities.end(), gen); + + auto col = column_wrapper(a, a + N, validities.begin()); + auto table = cudf::table_view{{col}}; + + auto col_ref = cudf::ast::column_reference(0); + auto expression = cudf::ast::expression(cudf::ast::ast_operator::ADD, col_ref, col_ref); + + auto b = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i * 2; }); + auto expected = column_wrapper(b, b + N, validities.begin()); + auto result = cudf::ast::compute_column(table, expression); + + cudf::test::expect_columns_equal(expected, result->view(), true); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/join/conditional_join_tests.cu b/cpp/tests/join/conditional_join_tests.cu new file mode 100644 index 00000000000..57abdf17aa6 --- /dev/null +++ b/cpp/tests/join/conditional_join_tests.cu @@ -0,0 +1,709 @@ +/* + * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +// Defining expressions for AST evaluation is currently a bit tedious, so we +// define some standard nodes here that can be easily reused elsewhere. +namespace { +constexpr cudf::size_type JoinNoneValue = + std::numeric_limits::min(); // TODO: how to test if this isn't public? + +// Common column references. +const auto col_ref_left_0 = cudf::ast::column_reference(0, cudf::ast::table_reference::LEFT); +const auto col_ref_right_0 = cudf::ast::column_reference(0, cudf::ast::table_reference::RIGHT); +const auto col_ref_left_1 = cudf::ast::column_reference(1, cudf::ast::table_reference::LEFT); +const auto col_ref_right_1 = cudf::ast::column_reference(1, cudf::ast::table_reference::RIGHT); + +// Common expressions. +auto left_zero_eq_right_zero = + cudf::ast::expression(cudf::ast::ast_operator::EQUAL, col_ref_left_0, col_ref_right_0); +} // namespace + +/** + * The principal fixture for all conditional joins. + */ +template +struct ConditionalJoinTest : public cudf::test::BaseFixture { + /** + * Convenience utility for parsing initializer lists of input data into + * suitable inputs for tables. + */ + std::tuple>, + std::vector>, + std::vector, + std::vector, + cudf::table_view, + cudf::table_view> + parse_input(std::vector> left_data, std::vector> right_data) + { + // Note that we need to maintain the column wrappers otherwise the + // resulting column views will be referencing potentially invalid memory. + std::vector> left_wrappers; + std::vector> right_wrappers; + + std::vector left_columns; + std::vector right_columns; + + for (auto v : left_data) { + left_wrappers.push_back(cudf::test::fixed_width_column_wrapper(v.begin(), v.end())); + left_columns.push_back(left_wrappers.back()); + } + + for (auto v : right_data) { + right_wrappers.push_back(cudf::test::fixed_width_column_wrapper(v.begin(), v.end())); + right_columns.push_back(right_wrappers.back()); + } + + return std::make_tuple(std::move(left_wrappers), + std::move(right_wrappers), + std::move(left_columns), + std::move(right_columns), + cudf::table_view(left_columns), + cudf::table_view(right_columns)); + } + + std::tuple>, + std::vector>, + std::vector, + std::vector, + cudf::table_view, + cudf::table_view> + parse_input(std::vector, std::vector>> left_data, + std::vector, std::vector>> right_data) + { + // Note that we need to maintain the column wrappers otherwise the + // resulting column views will be referencing potentially invalid memory. + std::vector> left_wrappers; + std::vector> right_wrappers; + + std::vector left_columns; + std::vector right_columns; + + for (auto v : left_data) { + left_wrappers.push_back(cudf::test::fixed_width_column_wrapper( + v.first.begin(), v.first.end(), v.second.begin())); + left_columns.push_back(left_wrappers.back()); + } + + for (auto v : right_data) { + right_wrappers.push_back(cudf::test::fixed_width_column_wrapper( + v.first.begin(), v.first.end(), v.second.begin())); + right_columns.push_back(right_wrappers.back()); + } + + return std::make_tuple(std::move(left_wrappers), + std::move(right_wrappers), + std::move(left_columns), + std::move(right_columns), + cudf::table_view(left_columns), + cudf::table_view(right_columns)); + } +}; + +/** + * Fixture for join types that return both left and right indices (inner, left, + * and full joins). + */ +template +struct ConditionalJoinPairReturnTest : public ConditionalJoinTest { + /* + * Perform a join of tables constructed from two input data sets according to + * the provided predicate and verify that the outputs match the expected + * outputs (up to order). + */ + void test(std::vector> left_data, + std::vector> right_data, + cudf::ast::expression predicate, + std::vector> expected_outputs) + { + // Note that we need to maintain the column wrappers otherwise the + // resulting column views will be referencing potentially invalid memory. + auto [left_wrappers, right_wrappers, left_columns, right_columns, left, right] = + this->parse_input(left_data, right_data); + auto result = this->join(left, right, predicate); + + std::vector> result_pairs; + for (size_t i = 0; i < result.first->size(); ++i) { + // Note: Not trying to be terribly efficient here since these tests are + // small, otherwise a batch copy to host before constructing the tuples + // would be important. + result_pairs.push_back({result.first->element(i, rmm::cuda_stream_default), + result.second->element(i, rmm::cuda_stream_default)}); + } + std::sort(result_pairs.begin(), result_pairs.end()); + std::sort(expected_outputs.begin(), expected_outputs.end()); + + EXPECT_TRUE(std::equal(result_pairs.begin(), result_pairs.end(), expected_outputs.begin())); + } + + void test_nulls(std::vector, std::vector>> left_data, + std::vector, std::vector>> right_data, + cudf::ast::expression predicate, + std::vector> expected_outputs) + { + // Note that we need to maintain the column wrappers otherwise the + // resulting column views will be referencing potentially invalid memory. + auto [left_wrappers, right_wrappers, left_columns, right_columns, left, right] = + this->parse_input(left_data, right_data); + auto result = this->join(left, right, predicate); + + std::vector> result_pairs; + for (size_t i = 0; i < result.first->size(); ++i) { + // Note: Not trying to be terribly efficient here since these tests are + // small, otherwise a batch copy to host before constructing the tuples + // would be important. + result_pairs.push_back({result.first->element(i, rmm::cuda_stream_default), + result.second->element(i, rmm::cuda_stream_default)}); + } + std::sort(result_pairs.begin(), result_pairs.end()); + std::sort(expected_outputs.begin(), expected_outputs.end()); + + EXPECT_TRUE(std::equal(result_pairs.begin(), result_pairs.end(), expected_outputs.begin())); + } + + /* + * Perform a join of tables constructed from two input data sets according to + * an equality predicate on all corresponding columns and verify that the outputs match the + * expected outputs (up to order). + */ + void compare_to_hash_join(std::vector> left_data, + std::vector> right_data) + { + // Note that we need to maintain the column wrappers otherwise the + // resulting column views will be referencing potentially invalid memory. + auto [left_wrappers, right_wrappers, left_columns, right_columns, left, right] = + this->parse_input(left_data, right_data); + // TODO: Generalize this to support multiple columns by automatically + // constructing the appropriate expression. + auto result = this->join(left, right, left_zero_eq_right_zero); + auto reference = this->reference_join(left, right); + + thrust::device_vector> result_pairs( + result.first->size()); + thrust::device_vector> reference_pairs( + reference.first->size()); + + thrust::transform(thrust::device, + result.first->begin(), + result.first->end(), + result.second->begin(), + result_pairs.begin(), + [] __device__(cudf::size_type first, cudf::size_type second) { + return thrust::make_pair(first, second); + }); + thrust::transform(thrust::device, + reference.first->begin(), + reference.first->end(), + reference.second->begin(), + reference_pairs.begin(), + [] __device__(cudf::size_type first, cudf::size_type second) { + return thrust::make_pair(first, second); + }); + + thrust::sort(thrust::device, result_pairs.begin(), result_pairs.end()); + thrust::sort(thrust::device, reference_pairs.begin(), reference_pairs.end()); + + EXPECT_TRUE(thrust::equal( + thrust::device, result_pairs.begin(), result_pairs.end(), reference_pairs.begin())); + } + + /** + * This method must be implemented by subclasses for specific types of joins. + * It should be a simply forwarding of arguments to the appropriate cudf + * conditional join API. + */ + virtual std::pair>, + std::unique_ptr>> + join(cudf::table_view left, cudf::table_view right, cudf::ast::expression predicate) = 0; + + /** + * This method must be implemented by subclasses for specific types of joins. + * It should be a simply forwarding of arguments to the appropriate cudf + * hash join API for comparison with conditional joins. + */ + virtual std::pair>, + std::unique_ptr>> + reference_join(cudf::table_view left, cudf::table_view right) = 0; +}; + +/** + * Tests of inner joins. + */ +template +struct ConditionalInnerJoinTest : public ConditionalJoinPairReturnTest { + std::pair>, + std::unique_ptr>> + join(cudf::table_view left, cudf::table_view right, cudf::ast::expression predicate) override + { + return cudf::conditional_inner_join(left, right, predicate); + } + + std::pair>, + std::unique_ptr>> + reference_join(cudf::table_view left, cudf::table_view right) override + { + return cudf::inner_join(left, right); + } +}; + +TYPED_TEST_CASE(ConditionalInnerJoinTest, cudf::test::IntegralTypesNotBool); + +TYPED_TEST(ConditionalInnerJoinTest, TestOneColumnOneRowAllEqual) +{ + this->test({{0}}, {{0}}, left_zero_eq_right_zero, {{0, 0}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestOneColumnTwoRowAllEqual) +{ + this->test({{0, 1}}, {{0, 0}}, left_zero_eq_right_zero, {{0, 0}, {0, 1}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestTwoColumnOneRowAllEqual) +{ + this->test({{0}, {0}}, {{0}, {0}}, left_zero_eq_right_zero, {{0, 0}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestTwoColumnThreeRowAllEqual) +{ + this->test({{0, 1, 2}, {10, 20, 30}}, + {{0, 1, 2}, {30, 40, 50}}, + left_zero_eq_right_zero, + {{0, 0}, {1, 1}, {2, 2}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestTwoColumnThreeRowSomeEqual) +{ + this->test({{0, 1, 2}, {10, 20, 30}}, + {{0, 1, 3}, {30, 40, 50}}, + left_zero_eq_right_zero, + {{0, 0}, {1, 1}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestNotComparison) +{ + auto col_ref_0 = cudf::ast::column_reference(0); + auto expression = cudf::ast::expression(cudf::ast::ast_operator::NOT, col_ref_0); + + this->test({{0, 1, 2}}, {{3, 4, 5}}, expression, {{0, 0}, {0, 1}, {0, 2}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestGreaterComparison) +{ + auto col_ref_0 = cudf::ast::column_reference(0); + auto col_ref_1 = cudf::ast::column_reference(0, cudf::ast::table_reference::RIGHT); + auto expression = cudf::ast::expression(cudf::ast::ast_operator::GREATER, col_ref_0, col_ref_1); + + this->test({{0, 1, 2}}, {{1, 0, 0}}, expression, {{1, 1}, {1, 2}, {2, 0}, {2, 1}, {2, 2}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestGreaterTwoColumnComparison) +{ + auto col_ref_0 = cudf::ast::column_reference(0); + auto col_ref_1 = cudf::ast::column_reference(1, cudf::ast::table_reference::RIGHT); + auto expression = cudf::ast::expression(cudf::ast::ast_operator::GREATER, col_ref_0, col_ref_1); + + this->test({{0, 1, 2}, {0, 0, 0}}, + {{0, 0, 0}, {1, 0, 0}}, + expression, + {{1, 1}, {1, 2}, {2, 0}, {2, 1}, {2, 2}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestGreaterDifferentNumberColumnComparison) +{ + auto col_ref_0 = cudf::ast::column_reference(0); + auto col_ref_1 = cudf::ast::column_reference(1, cudf::ast::table_reference::RIGHT); + auto expression = cudf::ast::expression(cudf::ast::ast_operator::GREATER, col_ref_0, col_ref_1); + + this->test( + {{0, 1, 2}}, {{0, 0, 0}, {1, 0, 0}}, expression, {{1, 1}, {1, 2}, {2, 0}, {2, 1}, {2, 2}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestGreaterDifferentNumberColumnDifferentSizeComparison) +{ + auto col_ref_0 = cudf::ast::column_reference(0); + auto col_ref_1 = cudf::ast::column_reference(1, cudf::ast::table_reference::RIGHT); + auto expression = cudf::ast::expression(cudf::ast::ast_operator::GREATER, col_ref_0, col_ref_1); + + this->test({{0, 1}}, {{0, 0, 0}, {1, 0, 0}}, expression, {{1, 1}, {1, 2}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestComplexConditionMultipleColumns) +{ + // LEFT is implicit, but specifying explicitly to validate that it works. + auto col_ref_0 = cudf::ast::column_reference(0, cudf::ast::table_reference::LEFT); + auto scalar_1 = cudf::numeric_scalar(1); + auto literal_1 = cudf::ast::literal(scalar_1); + auto left_0_equal_1 = cudf::ast::expression(cudf::ast::ast_operator::EQUAL, col_ref_0, literal_1); + + auto col_ref_1 = cudf::ast::column_reference(1, cudf::ast::table_reference::RIGHT); + auto comparison_filter = + cudf::ast::expression(cudf::ast::ast_operator::LESS, col_ref_1, col_ref_0); + + auto expression = + cudf::ast::expression(cudf::ast::ast_operator::LOGICAL_AND, left_0_equal_1, comparison_filter); + + this->test({{0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2}, {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}}, + {{0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2}, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + expression, + {{4, 0}, {5, 0}, {6, 0}, {7, 0}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestCompareRandomToHash) +{ + // Generate columns of 10 repeats of the integer range [0, 10), then merge + // a shuffled version and compare to hash join. + unsigned int N = 10000; + unsigned int num_repeats = 10; + unsigned int num_unique = N / num_repeats; + + std::vector left(N); + std::vector right(N); + + for (unsigned int i = 0; i < num_repeats; ++i) { + std::iota( + std::next(left.begin(), num_unique * i), std::next(left.begin(), num_unique * (i + 1)), 0); + std::iota( + std::next(right.begin(), num_unique * i), std::next(right.begin(), num_unique * (i + 1)), 0); + } + + std::random_device rd; + std::mt19937 gen(rd()); + std::shuffle(left.begin(), left.end(), gen); + std::shuffle(right.begin(), right.end(), gen); + + this->compare_to_hash_join({left}, {right}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestOneColumnTwoNullsRowAllEqual) +{ + this->test_nulls( + {{{0, 1}, {1, 0}}}, {{{0, 0}, {1, 1}}}, left_zero_eq_right_zero, {{0, 0}, {0, 1}}); +}; + +TYPED_TEST(ConditionalInnerJoinTest, TestOneColumnTwoNullsNoOutputRowAllEqual) +{ + this->test_nulls({{{0, 1}, {0, 1}}}, {{{0, 0}, {1, 1}}}, left_zero_eq_right_zero, {{}, {}}); +}; + +/** + * Tests of left joins. + */ +template +struct ConditionalLeftJoinTest : public ConditionalJoinPairReturnTest { + std::pair>, + std::unique_ptr>> + join(cudf::table_view left, cudf::table_view right, cudf::ast::expression predicate) override + { + return cudf::conditional_left_join(left, right, predicate); + } + + std::pair>, + std::unique_ptr>> + reference_join(cudf::table_view left, cudf::table_view right) override + { + return cudf::left_join(left, right); + } +}; + +TYPED_TEST_CASE(ConditionalLeftJoinTest, cudf::test::IntegralTypesNotBool); + +TYPED_TEST(ConditionalLeftJoinTest, TestTwoColumnThreeRowSomeEqual) +{ + this->test({{0, 1, 2}, {10, 20, 30}}, + {{0, 1, 3}, {30, 40, 50}}, + left_zero_eq_right_zero, + {{0, 0}, {1, 1}, {2, JoinNoneValue}}); +}; + +TYPED_TEST(ConditionalLeftJoinTest, TestCompareRandomToHash) +{ + // Generate columns of 10 repeats of the integer range [0, 10), then merge + // a shuffled version and compare to hash join. + unsigned int N = 10000; + unsigned int num_repeats = 10; + unsigned int num_unique = N / num_repeats; + + std::vector left(N); + std::vector right(N); + + for (unsigned int i = 0; i < num_repeats; ++i) { + std::iota( + std::next(left.begin(), num_unique * i), std::next(left.begin(), num_unique * (i + 1)), 0); + std::iota( + std::next(right.begin(), num_unique * i), std::next(right.begin(), num_unique * (i + 1)), 0); + } + + std::random_device rd; + std::mt19937 gen(rd()); + std::shuffle(left.begin(), left.end(), gen); + std::shuffle(right.begin(), right.end(), gen); + + this->compare_to_hash_join({left}, {right}); +}; + +/** + * Tests of full joins. + */ +template +struct ConditionalFullJoinTest : public ConditionalJoinPairReturnTest { + std::pair>, + std::unique_ptr>> + join(cudf::table_view left, cudf::table_view right, cudf::ast::expression predicate) override + { + return cudf::conditional_full_join(left, right, predicate); + } + + std::pair>, + std::unique_ptr>> + reference_join(cudf::table_view left, cudf::table_view right) override + { + return cudf::full_join(left, right); + } +}; + +TYPED_TEST_CASE(ConditionalFullJoinTest, cudf::test::IntegralTypesNotBool); + +TYPED_TEST(ConditionalFullJoinTest, TestTwoColumnThreeRowSomeEqual) +{ + this->test({{0, 1, 2}, {10, 20, 30}}, + {{0, 1, 3}, {30, 40, 50}}, + left_zero_eq_right_zero, + {{0, 0}, {1, 1}, {2, JoinNoneValue}, {JoinNoneValue, 2}}); +}; + +TYPED_TEST(ConditionalFullJoinTest, TestCompareRandomToHash) +{ + // Generate columns of 10 repeats of the integer range [0, 10), then merge + // a shuffled version and compare to hash join. + unsigned int N = 10000; + unsigned int num_repeats = 10; + unsigned int num_unique = N / num_repeats; + + std::vector left(N); + std::vector right(N); + + for (unsigned int i = 0; i < num_repeats; ++i) { + std::iota( + std::next(left.begin(), num_unique * i), std::next(left.begin(), num_unique * (i + 1)), 0); + std::iota( + std::next(right.begin(), num_unique * i), std::next(right.begin(), num_unique * (i + 1)), 0); + } + + std::random_device rd; + std::mt19937 gen(rd()); + std::shuffle(left.begin(), left.end(), gen); + std::shuffle(right.begin(), right.end(), gen); + + this->compare_to_hash_join({left}, {right}); +}; + +/** + * Fixture for join types that return both only left indices (left semi and + * left anti). + */ +template +struct ConditionalJoinSingleReturnTest : public ConditionalJoinTest { + /* + * Perform a join of tables constructed from two input data sets according to + * the provided predicate and verify that the outputs match the expected + * outputs (up to order). + */ + void test(std::vector> left_data, + std::vector> right_data, + cudf::ast::expression predicate, + std::vector expected_outputs) + { + auto [left_wrappers, right_wrappers, left_columns, right_columns, left, right] = + this->parse_input(left_data, right_data); + auto result = this->join(left, right, predicate); + + std::vector resulting_indices; + for (size_t i = 0; i < result->size(); ++i) { + // Note: Not trying to be terribly efficient here since these tests are + // small, otherwise a batch copy to host before constructing the tuples + // would be important. + resulting_indices.push_back(result->element(i, rmm::cuda_stream_default)); + } + std::sort(resulting_indices.begin(), resulting_indices.end()); + std::sort(expected_outputs.begin(), expected_outputs.end()); + EXPECT_TRUE( + std::equal(resulting_indices.begin(), resulting_indices.end(), expected_outputs.begin())); + } + + /* + * Perform a join of tables constructed from two input data sets according to + * an equality predicate on all corresponding columns and verify that the outputs match the + * expected outputs (up to order). + */ + void compare_to_hash_join(std::vector> left_data, + std::vector> right_data) + { + // Note that we need to maintain the column wrappers otherwise the + // resulting column views will be referencing potentially invalid memory. + auto [left_wrappers, right_wrappers, left_columns, right_columns, left, right] = + this->parse_input(left_data, right_data); + // TODO: Generalize this to support multiple columns by automatically + // constructing the appropriate expression. + auto result = this->join(left, right, left_zero_eq_right_zero); + auto reference = this->reference_join(left, right); + + thrust::sort(thrust::device, result->begin(), result->end()); + thrust::sort(thrust::device, reference->begin(), reference->end()); + + EXPECT_TRUE(thrust::equal(thrust::device, result->begin(), result->end(), reference->begin())); + } + + /** + * This method must be implemented by subclasses for specific types of joins. + * It should be a simply forwarding of arguments to the appropriate cudf + * conditional join API. + */ + virtual std::unique_ptr> join( + cudf::table_view left, cudf::table_view right, cudf::ast::expression predicate) = 0; + + /** + * This method must be implemented by subclasses for specific types of joins. + * It should be a simply forwarding of arguments to the appropriate cudf + * hash join API for comparison with conditional joins. + */ + virtual std::unique_ptr> reference_join( + cudf::table_view left, cudf::table_view right) = 0; +}; + +/** + * Tests of left semi joins. + */ +template +struct ConditionalLeftSemiJoinTest : public ConditionalJoinSingleReturnTest { + std::unique_ptr> join( + cudf::table_view left, cudf::table_view right, cudf::ast::expression predicate) override + { + return cudf::conditional_left_semi_join(left, right, predicate); + } + + std::unique_ptr> reference_join( + cudf::table_view left, cudf::table_view right) override + { + return cudf::left_semi_join(left, right); + } +}; + +TYPED_TEST_CASE(ConditionalLeftSemiJoinTest, cudf::test::IntegralTypesNotBool); + +TYPED_TEST(ConditionalLeftSemiJoinTest, TestTwoColumnThreeRowSomeEqual) +{ + this->test({{0, 1, 2}, {10, 20, 30}}, {{0, 1, 3}, {30, 40, 50}}, left_zero_eq_right_zero, {0, 1}); +}; + +TYPED_TEST(ConditionalLeftSemiJoinTest, TestCompareRandomToHash) +{ + // Generate columns of 10 repeats of the integer range [0, 10), then merge + // a shuffled version and compare to hash join. + unsigned int N = 10000; + unsigned int num_repeats = 10; + unsigned int num_unique = N / num_repeats; + + std::vector left(N); + std::vector right(N); + + for (unsigned int i = 0; i < num_repeats; ++i) { + std::iota( + std::next(left.begin(), num_unique * i), std::next(left.begin(), num_unique * (i + 1)), 0); + std::iota( + std::next(right.begin(), num_unique * i), std::next(right.begin(), num_unique * (i + 1)), 0); + } + + std::random_device rd; + std::mt19937 gen(rd()); + std::shuffle(left.begin(), left.end(), gen); + std::shuffle(right.begin(), right.end(), gen); + + this->compare_to_hash_join({left}, {right}); +}; + +/** + * Tests of left anti joins. + */ +template +struct ConditionalLeftAntiJoinTest : public ConditionalJoinSingleReturnTest { + std::unique_ptr> join( + cudf::table_view left, cudf::table_view right, cudf::ast::expression predicate) override + { + return cudf::conditional_left_anti_join(left, right, predicate); + } + + std::unique_ptr> reference_join( + cudf::table_view left, cudf::table_view right) override + { + return cudf::left_anti_join(left, right); + } +}; + +TYPED_TEST_CASE(ConditionalLeftAntiJoinTest, cudf::test::IntegralTypesNotBool); + +TYPED_TEST(ConditionalLeftAntiJoinTest, TestTwoColumnThreeRowSomeEqual) +{ + this->test({{0, 1, 2}, {10, 20, 30}}, {{0, 1, 3}, {30, 40, 50}}, left_zero_eq_right_zero, {2}); +}; + +TYPED_TEST(ConditionalLeftAntiJoinTest, TestCompareRandomToHash) +{ + // Generate columns of 10 repeats of the integer range [0, 10), then merge + // a shuffled version and compare to hash join. + unsigned int N = 10000; + unsigned int num_repeats = 10; + unsigned int num_unique = N / num_repeats; + + std::vector left(N); + std::vector right(N); + + for (unsigned int i = 0; i < num_repeats; ++i) { + std::iota( + std::next(left.begin(), num_unique * i), std::next(left.begin(), num_unique * (i + 1)), 0); + std::iota( + std::next(right.begin(), num_unique * i), std::next(right.begin(), num_unique * (i + 1)), 0); + } + + std::random_device rd; + std::mt19937 gen(rd()); + std::shuffle(left.begin(), left.end(), gen); + std::shuffle(right.begin(), right.end(), gen); + + this->compare_to_hash_join({left}, {right}); +};