Skip to content

Commit

Permalink
ARROW-9398: [C++] Register SIMD sum variants to function instance.
Browse files Browse the repository at this point in the history
Enable the simd_level feature of kernels and use it in DispatchExactImpl.
Add simd_level as a template parameter of the sum implementation to make sure every SIMD kernel gets its own template instantiation.
Also expand the sum/mean test cases to cover the BitBlockCounter method.

Signed-off-by: Frank Du <frank.du@intel.com>

Closes apache#7700 from jianxind/sum_variants_to_function

Authored-by: Frank Du <frank.du@intel.com>
Signed-off-by: Wes McKinney <wesm@apache.org>
  • Loading branch information
frankdjx authored and wesm committed Jul 30, 2020
1 parent fa141ef commit 6efba62
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 98 deletions.
25 changes: 24 additions & 1 deletion cpp/src/arrow/compute/function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "arrow/compute/exec.h"
#include "arrow/compute/exec_internal.h"
#include "arrow/datum.h"
#include "arrow/util/cpu_info.h"

namespace arrow {
namespace compute {
Expand Down Expand Up @@ -58,6 +59,7 @@ Result<const KernelType*> DispatchExactImpl(const Function& func,
const std::vector<KernelType>& kernels,
const std::vector<DescrType>& values) {
const int passed_num_args = static_cast<int>(values.size());
const KernelType* kernel_matches[SimdLevel::MAX] = {NULL};

// Validate arity
const Arity arity = func.arity();
Expand All @@ -70,9 +72,30 @@ Result<const KernelType*> DispatchExactImpl(const Function& func,
}
for (const auto& kernel : kernels) {
if (kernel.signature->MatchesInputs(values)) {
return &kernel;
kernel_matches[kernel.simd_level] = &kernel;
}
}

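// Dispatch according to the CPU features available at runtime, trying the
// AVX512 match first, then AVX2, and finally the non-SIMD kernel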
auto cpu_info = arrow::internal::CpuInfo::GetInstance();
#if defined(ARROW_HAVE_RUNTIME_AVX512)
if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX512)) {
if (kernel_matches[SimdLevel::AVX512]) {
return kernel_matches[SimdLevel::AVX512];
}
}
#endif
#if defined(ARROW_HAVE_RUNTIME_AVX2)
if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX2)) {
if (kernel_matches[SimdLevel::AVX2]) {
return kernel_matches[SimdLevel::AVX2];
}
}
#endif
if (kernel_matches[SimdLevel::NONE]) {
return kernel_matches[SimdLevel::NONE];
}

return Status::NotImplemented("Function ", func.name(),
" has no kernel matching input types ",
FormatArgTypes(values));
Expand Down
9 changes: 4 additions & 5 deletions cpp/src/arrow/compute/kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ class ARROW_EXPORT KernelSignature {
/// type combination for different SIMD levels. Based on the active system's
/// CPU info or the user's preferences, we can elect to use one over the other.
struct SimdLevel {
enum type { NONE, SSE4_2, AVX, AVX2, AVX512, NEON };
enum type { NONE = 0, SSE4_2, AVX, AVX2, AVX512, NEON, MAX };
};

/// \brief The strategy to use for propagating or otherwise populating the
Expand Down Expand Up @@ -555,10 +555,9 @@ struct Kernel {
bool parallelizable = true;

/// \brief Indicates the level of SIMD instruction support in the host CPU is
/// required to use the function. Currently this is not used, but the
/// intention is for functions to be able to contain multiple kernels with
/// the same signature but different levels of SIMD, so that the most
/// optimized kernel supported on a host's processor can be chosen.
/// required to use the function. The intention is for functions to be able to
/// contain multiple kernels with the same signature but different levels of SIMD,
/// so that the most optimized kernel supported on a host's processor can be chosen.
SimdLevel::type simd_level = SimdLevel::NONE;
};

Expand Down
40 changes: 30 additions & 10 deletions cpp/src/arrow/compute/kernels/aggregate_basic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ struct RoundSizeDefault<uint32_t> {
template <typename ArrowType>
struct SumImplDefault
: public SumImpl<RoundSizeDefault<typename TypeTraits<ArrowType>::CType>::size,
ArrowType> {};
ArrowType, SimdLevel::NONE> {};

template <typename ArrowType>
struct MeanImplDefault
: public MeanImpl<RoundSizeDefault<typename TypeTraits<ArrowType>::CType>::size,
ArrowType> {};
ArrowType, SimdLevel::NONE> {};

std::unique_ptr<KernelState> SumInit(KernelContext* ctx, const KernelInitArgs& args) {
SumLikeInit<SumImplDefault> visitor(ctx, *args.inputs[0].type);
Expand Down Expand Up @@ -341,29 +341,35 @@ std::unique_ptr<KernelState> MinMaxInit(KernelContext* ctx, const KernelInitArgs
}

void AddAggKernel(std::shared_ptr<KernelSignature> sig, KernelInit init,
ScalarAggregateFunction* func) {
DCHECK_OK(func->AddKernel(ScalarAggregateKernel(std::move(sig), init, AggregateConsume,
AggregateMerge, AggregateFinalize)));
ScalarAggregateFunction* func,
SimdLevel::type simd_level = SimdLevel::NONE) {
ScalarAggregateKernel kernel(std::move(sig), init, AggregateConsume, AggregateMerge,
AggregateFinalize);
// Set the simd level
kernel.simd_level = simd_level;
DCHECK_OK(func->AddKernel(kernel));
}

void AddBasicAggKernels(KernelInit init,
const std::vector<std::shared_ptr<DataType>>& types,
std::shared_ptr<DataType> out_ty, ScalarAggregateFunction* func) {
std::shared_ptr<DataType> out_ty, ScalarAggregateFunction* func,
SimdLevel::type simd_level) {
for (const auto& ty : types) {
// array[InT] -> scalar[OutT]
auto sig = KernelSignature::Make({InputType::Array(ty)}, ValueDescr::Scalar(out_ty));
AddAggKernel(std::move(sig), init, func);
AddAggKernel(std::move(sig), init, func, simd_level);
}
}

void AddMinMaxKernels(KernelInit init,
const std::vector<std::shared_ptr<DataType>>& types,
ScalarAggregateFunction* func) {
ScalarAggregateFunction* func,
SimdLevel::type simd_level = SimdLevel::NONE) {
for (const auto& ty : types) {
// array[T] -> scalar[struct<min: T, max: T>]
auto out_ty = struct_({field("min", ty), field("max", ty)});
auto sig = KernelSignature::Make({InputType::Array(ty)}, ValueDescr::Scalar(out_ty));
AddAggKernel(std::move(sig), init, func);
AddAggKernel(std::move(sig), init, func, simd_level);
}
}

Expand All @@ -375,7 +381,7 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
auto func = std::make_shared<ScalarAggregateFunction>("count", Arity::Unary(),
&default_count_options);

/// Takes any array input, outputs int64 scalar
// Takes any array input, outputs int64 scalar
InputType any_array(ValueDescr::ARRAY);
aggregate::AddAggKernel(KernelSignature::Make({any_array}, ValueDescr::Scalar(int64())),
aggregate::CountInit, func.get());
Expand All @@ -389,12 +395,26 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
func.get());
aggregate::AddBasicAggKernels(aggregate::SumInit, FloatingPointTypes(), float64(),
func.get());
// Add the SIMD variants for sum
#if defined(ARROW_HAVE_RUNTIME_AVX2)
aggregate::AddSumAvx2AggKernels(func.get());
#endif
#if defined(ARROW_HAVE_RUNTIME_AVX512)
aggregate::AddSumAvx512AggKernels(func.get());
#endif
DCHECK_OK(registry->AddFunction(std::move(func)));

func = std::make_shared<ScalarAggregateFunction>("mean", Arity::Unary());
aggregate::AddBasicAggKernels(aggregate::MeanInit, {boolean()}, float64(), func.get());
aggregate::AddBasicAggKernels(aggregate::MeanInit, NumericTypes(), float64(),
func.get());
// Add the SIMD variants for mean
#if defined(ARROW_HAVE_RUNTIME_AVX2)
aggregate::AddMeanAvx2AggKernels(func.get());
#endif
#if defined(ARROW_HAVE_RUNTIME_AVX512)
aggregate::AddMeanAvx512AggKernels(func.get());
#endif
DCHECK_OK(registry->AddFunction(std::move(func)));

static auto default_minmax_options = MinMaxOptions::Defaults();
Expand Down
30 changes: 19 additions & 11 deletions cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,23 @@ struct ScalarAggregator : public KernelState {

void AddBasicAggKernels(KernelInit init,
const std::vector<std::shared_ptr<DataType>>& types,
std::shared_ptr<DataType> out_ty, ScalarAggregateFunction* func);
std::shared_ptr<DataType> out_ty, ScalarAggregateFunction* func,
SimdLevel::type simd_level = SimdLevel::NONE);

// SIMD variants for kernels
void AddSumAvx2AggKernels(ScalarAggregateFunction* func);
void AddMeanAvx2AggKernels(ScalarAggregateFunction* func);

void AddSumAvx512AggKernels(ScalarAggregateFunction* func);
void AddMeanAvx512AggKernels(ScalarAggregateFunction* func);

// ----------------------------------------------------------------------
// Sum implementation

template <int64_t kRoundSize, typename ArrowType>
template <int64_t kRoundSize, typename ArrowType, SimdLevel::type simd_level>
struct SumState {
using SumType = typename FindAccumulatorType<ArrowType>::Type;
using ThisType = SumState<kRoundSize, ArrowType>;
using ThisType = SumState<kRoundSize, ArrowType, simd_level>;
using T = typename TypeTraits<ArrowType>::CType;
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;

Expand Down Expand Up @@ -203,10 +211,10 @@ struct SumState {
}
};

template <int64_t kRoundSize>
struct SumState<kRoundSize, BooleanType> {
template <int64_t kRoundSize, SimdLevel::type simd_level>
struct SumState<kRoundSize, BooleanType, simd_level> {
using SumType = typename FindAccumulatorType<BooleanType>::Type;
using ThisType = SumState<kRoundSize, BooleanType>;
using ThisType = SumState<kRoundSize, BooleanType, simd_level>;

ThisType& operator+=(const ThisType& rhs) {
this->count += rhs.count;
Expand All @@ -225,10 +233,10 @@ struct SumState<kRoundSize, BooleanType> {
typename SumType::c_type sum = 0;
};

template <uint64_t kRoundSize, typename ArrowType>
template <uint64_t kRoundSize, typename ArrowType, SimdLevel::type simd_level>
struct SumImpl : public ScalarAggregator {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
using ThisType = SumImpl<kRoundSize, ArrowType>;
using ThisType = SumImpl<kRoundSize, ArrowType, simd_level>;
using SumType = typename FindAccumulatorType<ArrowType>::Type;
using OutputType = typename TypeTraits<SumType>::ScalarType;

Expand All @@ -249,11 +257,11 @@ struct SumImpl : public ScalarAggregator {
}
}

SumState<kRoundSize, ArrowType> state;
SumState<kRoundSize, ArrowType, simd_level> state;
};

template <int64_t kRoundSize, typename ArrowType>
struct MeanImpl : public SumImpl<kRoundSize, ArrowType> {
template <int64_t kRoundSize, typename ArrowType, SimdLevel::type simd_level>
struct MeanImpl : public SumImpl<kRoundSize, ArrowType, simd_level> {
void Finalize(KernelContext*, Datum* out) override {
const bool is_valid = this->state.count > 0;
const double divisor = static_cast<double>(is_valid ? this->state.count : 1UL);
Expand Down
39 changes: 14 additions & 25 deletions cpp/src/arrow/compute/kernels/aggregate_sum_avx2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ struct RoundSizeAvx2<uint32_t> {
template <typename ArrowType>
struct SumImplAvx2
: public SumImpl<RoundSizeAvx2<typename TypeTraits<ArrowType>::CType>::size,
ArrowType> {};
ArrowType, SimdLevel::AVX2> {};

template <typename ArrowType>
struct MeanImplAvx2
: public MeanImpl<RoundSizeAvx2<typename TypeTraits<ArrowType>::CType>::size,
ArrowType> {};
ArrowType, SimdLevel::AVX2> {};

std::unique_ptr<KernelState> SumInitAvx2(KernelContext* ctx, const KernelInitArgs& args) {
SumLikeInit<SumImplAvx2> visitor(ctx, *args.inputs[0].type);
Expand All @@ -67,31 +67,20 @@ std::unique_ptr<KernelState> MeanInitAvx2(KernelContext* ctx,
return visitor.Create();
}

} // namespace aggregate

namespace internal {

void RegisterScalarAggregateSumAvx2(FunctionRegistry* registry) {
auto func = std::make_shared<ScalarAggregateFunction>("sum", Arity::Unary());
aggregate::AddBasicAggKernels(aggregate::SumInitAvx2, {boolean()}, int64(), func.get());
aggregate::AddBasicAggKernels(aggregate::SumInitAvx2, SignedIntTypes(), int64(),
func.get());
aggregate::AddBasicAggKernels(aggregate::SumInitAvx2, UnsignedIntTypes(), uint64(),
func.get());
aggregate::AddBasicAggKernels(aggregate::SumInitAvx2, FloatingPointTypes(), float64(),
func.get());
// Register the override AVX2 version
DCHECK_OK(registry->AddFunction(std::move(func), /*allow_overwrite=*/true));
/// \brief Append the AVX2 variants of the sum kernels to `func`.
///
/// Kernels are added for the signed integer, unsigned integer and floating
/// point input types, producing int64, uint64 and float64 scalars
/// respectively. No boolean kernel is added by this function.
///
/// \param[in,out] func the aggregate function receiving the kernels
void AddSumAvx2AggKernels(ScalarAggregateFunction* func) {
  // Every kernel added below carries the same SIMD level.
  constexpr SimdLevel::type kLevel = SimdLevel::AVX2;

  // signed integers -> int64
  AddBasicAggKernels(SumInitAvx2, internal::SignedIntTypes(), int64(), func, kLevel);
  // unsigned integers -> uint64
  AddBasicAggKernels(SumInitAvx2, internal::UnsignedIntTypes(), uint64(), func, kLevel);
  // floating point -> float64
  AddBasicAggKernels(SumInitAvx2, internal::FloatingPointTypes(), float64(), func,
                     kLevel);
}

func = std::make_shared<ScalarAggregateFunction>("mean", Arity::Unary());
aggregate::AddBasicAggKernels(aggregate::MeanInitAvx2, {boolean()}, float64(),
func.get());
aggregate::AddBasicAggKernels(aggregate::MeanInitAvx2, NumericTypes(), float64(),
func.get());
// Register the override AVX2 version
DCHECK_OK(registry->AddFunction(std::move(func), /*allow_overwrite=*/true));
/// \brief Append the AVX2 variants of the mean kernels to `func`.
///
/// One kernel is added per numeric input type; each produces a float64
/// scalar. No boolean kernel is added by this function.
///
/// \param[in,out] func the aggregate function receiving the kernels
void AddMeanAvx2AggKernels(ScalarAggregateFunction* func) {
  constexpr SimdLevel::type kLevel = SimdLevel::AVX2;
  // numeric types -> float64
  AddBasicAggKernels(MeanInitAvx2, internal::NumericTypes(), float64(), func, kLevel);
}

} // namespace internal
} // namespace aggregate
} // namespace compute
} // namespace arrow
40 changes: 14 additions & 26 deletions cpp/src/arrow/compute/kernels/aggregate_sum_avx512.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ struct RoundSizeAvx512<uint32_t> {
template <typename ArrowType>
struct SumImplAvx512
: public SumImpl<RoundSizeAvx512<typename TypeTraits<ArrowType>::CType>::size,
ArrowType> {};
ArrowType, SimdLevel::AVX512> {};

template <typename ArrowType>
struct MeanImplAvx512
: public MeanImpl<RoundSizeAvx512<typename TypeTraits<ArrowType>::CType>::size,
ArrowType> {};
ArrowType, SimdLevel::AVX512> {};

std::unique_ptr<KernelState> SumInitAvx512(KernelContext* ctx,
const KernelInitArgs& args) {
Expand All @@ -68,32 +68,20 @@ std::unique_ptr<KernelState> MeanInitAvx512(KernelContext* ctx,
return visitor.Create();
}

} // namespace aggregate

namespace internal {

void RegisterScalarAggregateSumAvx512(FunctionRegistry* registry) {
auto func = std::make_shared<ScalarAggregateFunction>("sum", Arity::Unary());
aggregate::AddBasicAggKernels(aggregate::SumInitAvx512, {boolean()}, int64(),
func.get());
aggregate::AddBasicAggKernels(aggregate::SumInitAvx512, SignedIntTypes(), int64(),
func.get());
aggregate::AddBasicAggKernels(aggregate::SumInitAvx512, UnsignedIntTypes(), uint64(),
func.get());
aggregate::AddBasicAggKernels(aggregate::SumInitAvx512, FloatingPointTypes(), float64(),
func.get());
// Register the override AVX512 version
DCHECK_OK(registry->AddFunction(std::move(func), /*allow_overwrite=*/true));
/// \brief Append the AVX512 variants of the sum kernels to `func`.
///
/// Kernels are added for the signed integer, unsigned integer and floating
/// point input types, producing int64, uint64 and float64 scalars
/// respectively. No boolean kernel is added by this function.
///
/// \param[in,out] func the aggregate function receiving the kernels
void AddSumAvx512AggKernels(ScalarAggregateFunction* func) {
  // Every kernel added below carries the same SIMD level.
  constexpr SimdLevel::type kLevel = SimdLevel::AVX512;

  // signed integers -> int64
  AddBasicAggKernels(SumInitAvx512, internal::SignedIntTypes(), int64(), func, kLevel);
  // unsigned integers -> uint64
  AddBasicAggKernels(SumInitAvx512, internal::UnsignedIntTypes(), uint64(), func,
                     kLevel);
  // floating point -> float64
  AddBasicAggKernels(SumInitAvx512, internal::FloatingPointTypes(), float64(), func,
                     kLevel);
}

func = std::make_shared<ScalarAggregateFunction>("mean", Arity::Unary());
aggregate::AddBasicAggKernels(aggregate::MeanInitAvx512, {boolean()}, float64(),
func.get());
aggregate::AddBasicAggKernels(aggregate::MeanInitAvx512, NumericTypes(), float64(),
func.get());
// Register the override AVX512 version
DCHECK_OK(registry->AddFunction(std::move(func), /*allow_overwrite=*/true));
/// \brief Append the AVX512 variants of the mean kernels to `func`.
///
/// One kernel is added per numeric input type; each produces a float64
/// scalar. No boolean kernel is added by this function.
///
/// \param[in,out] func the aggregate function receiving the kernels
void AddMeanAvx512AggKernels(ScalarAggregateFunction* func) {
  // Called unqualified, like the sibling Add*AggKernels helpers in this
  // namespace; the explicit `aggregate::` prefix was redundant.
  AddBasicAggKernels(MeanInitAvx512, internal::NumericTypes(), float64(), func,
                     SimdLevel::AVX512);
}

} // namespace internal
} // namespace aggregate
} // namespace compute
} // namespace arrow
8 changes: 5 additions & 3 deletions cpp/src/arrow/compute/kernels/aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@ class TestRandomNumericSumKernel : public ::testing::Test {};
TYPED_TEST_SUITE(TestRandomNumericSumKernel, NumericArrowTypes);
TYPED_TEST(TestRandomNumericSumKernel, RandomArraySum) {
auto rand = random::RandomArrayGenerator(0x5487655);
for (size_t i = 3; i < 10; i += 2) {
for (auto null_probability : {0.0, 0.1, 0.5, 1.0}) {
// Test size up to 1<<13 (8192).
for (size_t i = 3; i < 14; i += 2) {
for (auto null_probability : {0.0, 0.001, 0.1, 0.5, 0.999, 1.0}) {
for (auto length_adjust : {-2, -1, 0, 1, 2}) {
int64_t length = (1UL << i) + length_adjust;
auto array = rand.Numeric<TypeParam>(length, 0, 100, null_probability);
Expand Down Expand Up @@ -389,8 +390,9 @@ class TestRandomNumericMeanKernel : public ::testing::Test {};
TYPED_TEST_SUITE(TestRandomNumericMeanKernel, NumericArrowTypes);
TYPED_TEST(TestRandomNumericMeanKernel, RandomArrayMean) {
auto rand = random::RandomArrayGenerator(0x8afc055);
// Test size up to 1<<13 (8192).
for (size_t i = 3; i < 14; i += 2) {
for (auto null_probability : {0.0, 0.1, 0.5, 1.0}) {
for (auto null_probability : {0.0, 0.001, 0.1, 0.5, 0.999, 1.0}) {
for (auto length_adjust : {-2, -1, 0, 1, 2}) {
int64_t length = (1UL << i) + length_adjust;
auto array = rand.Numeric<TypeParam>(length, 0, 100, null_probability);
Expand Down
Loading

0 comments on commit 6efba62

Please sign in to comment.