Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/branch-21.12' into catch-oom
Browse files Browse the repository at this point in the history
  • Loading branch information
rongou committed Oct 27, 2021
2 parents 79ecb49 + 306e42f commit c6c0eb7
Show file tree
Hide file tree
Showing 14 changed files with 213 additions and 70 deletions.
11 changes: 8 additions & 3 deletions cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,20 +503,25 @@ std::unique_ptr<Base> make_merge_m2_aggregation();
*
* Compute covariance between two columns.
* The input columns are the child columns of a non-nullable structs column.
* @param min_periods Minimum number of non-null observations required to produce a result.
* @param ddof Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N is
* the number of non-null observations.
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_covariance_aggregation();
std::unique_ptr<Base> make_covariance_aggregation(size_type min_periods = 1, size_type ddof = 1);

/**
* @brief Factory to create a CORRELATION aggregation
*
* Compute correlation coefficient between two columns.
* The input columns are the child columns of a non-nullable structs column.
*
* @param[in] type: correlation_type
* @param type correlation_type
* @param min_periods Minimum number of non-null observations required to produce a result.
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_correlation_aggregation(correlation_type type);
std::unique_ptr<Base> make_correlation_aggregation(correlation_type type,
size_type min_periods = 1);

/**
* @brief Factory to create a TDIGEST aggregation
Expand Down
26 changes: 23 additions & 3 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,14 @@ class merge_m2_aggregation final : public groupby_aggregation {
*/
class covariance_aggregation final : public groupby_aggregation {
public:
explicit covariance_aggregation() : aggregation{COVARIANCE} {}
explicit covariance_aggregation(size_type min_periods, size_type ddof)
: aggregation{COVARIANCE}, _min_periods{min_periods}, _ddof(ddof)
{
}
size_type _min_periods;
size_type _ddof;

size_t do_hash() const override { return this->aggregation::do_hash() ^ hash_impl(); }

std::unique_ptr<aggregation> clone() const override
{
Expand All @@ -913,15 +920,25 @@ class covariance_aggregation final : public groupby_aggregation {
return collector.visit(col_type, *this);
}
void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }

protected:
size_t hash_impl() const
{
return std::hash<size_type>{}(_min_periods) ^ std::hash<size_type>{}(_ddof);
}
};

/**
* @brief Derived aggregation class for specifying CORRELATION aggregation
*/
class correlation_aggregation final : public groupby_aggregation {
public:
explicit correlation_aggregation(correlation_type type) : aggregation{CORRELATION}, _type{type} {}
explicit correlation_aggregation(correlation_type type, size_type min_periods)
: aggregation{CORRELATION}, _type{type}, _min_periods{min_periods}
{
}
correlation_type _type;
size_type _min_periods;

bool is_equal(aggregation const& _other) const override
{
Expand All @@ -944,7 +961,10 @@ class correlation_aggregation final : public groupby_aggregation {
void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }

protected:
size_t hash_impl() const { return std::hash<int>{}(static_cast<int>(_type)); }
size_t hash_impl() const
{
return std::hash<int>{}(static_cast<int>(_type)) ^ std::hash<size_type>{}(_min_periods);
}
};

/**
Expand Down
18 changes: 10 additions & 8 deletions cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -713,23 +713,25 @@ template std::unique_ptr<groupby_aggregation> make_merge_m2_aggregation<groupby_

/// Factory to create a COVARIANCE aggregation
template <typename Base>
std::unique_ptr<Base> make_covariance_aggregation()
std::unique_ptr<Base> make_covariance_aggregation(size_type min_periods, size_type ddof)
{
return std::make_unique<detail::covariance_aggregation>();
return std::make_unique<detail::covariance_aggregation>(min_periods, ddof);
}
template std::unique_ptr<aggregation> make_covariance_aggregation<aggregation>();
template std::unique_ptr<groupby_aggregation> make_covariance_aggregation<groupby_aggregation>();
template std::unique_ptr<aggregation> make_covariance_aggregation<aggregation>(
size_type min_periods, size_type ddof);
template std::unique_ptr<groupby_aggregation> make_covariance_aggregation<groupby_aggregation>(
size_type min_periods, size_type ddof);

/// Factory to create a CORRELATION aggregation
template <typename Base>
std::unique_ptr<Base> make_correlation_aggregation(correlation_type type)
std::unique_ptr<Base> make_correlation_aggregation(correlation_type type, size_type min_periods)
{
return std::make_unique<detail::correlation_aggregation>(type);
return std::make_unique<detail::correlation_aggregation>(type, min_periods);
}
template std::unique_ptr<aggregation> make_correlation_aggregation<aggregation>(
correlation_type type);
correlation_type type, size_type min_periods);
template std::unique_ptr<groupby_aggregation> make_correlation_aggregation<groupby_aggregation>(
correlation_type type);
correlation_type type, size_type min_periods);

template <typename Base>
std::unique_ptr<Base> make_tdigest_aggregation(int max_centroids)
Expand Down
50 changes: 29 additions & 21 deletions cpp/src/groupby/sort/aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ void aggregate_result_functor::operator()<aggregation::COVARIANCE>(aggregation c
CUDF_EXPECTS(values.num_children() == 2,
"Input to `groupby covariance` must be a structs column having 2 children columns.");

auto const& cov_agg = dynamic_cast<cudf::detail::covariance_aggregation const&>(agg);
// Covariance only for valid values in both columns.
// When the null masks are not identical, this prevents caching of the STD, MEAN and COUNT results.
auto [_, values_child0, values_child1] =
Expand All @@ -596,6 +597,8 @@ void aggregate_result_functor::operator()<aggregation::COVARIANCE>(aggregation c
count,
mean0,
mean1,
cov_agg._min_periods,
cov_agg._ddof,
stream,
mr));
};
Expand Down Expand Up @@ -629,28 +632,33 @@ void aggregate_result_functor::operator()<aggregation::CORRELATION>(aggregation
aggregate_result_functor(values_child0, helper, cache, stream, mr).operator()<aggregation::STD>(*std_agg);
aggregate_result_functor(values_child1, helper, cache, stream, mr).operator()<aggregation::STD>(*std_agg);

auto const stddev0 = cache.get_result(values_child0, *std_agg);
auto const stddev1 = cache.get_result(values_child1, *std_agg);

auto mean_agg = make_mean_aggregation();
auto const mean0 = cache.get_result(values_child0, *mean_agg);
auto const mean1 = cache.get_result(values_child1, *mean_agg);
auto count_agg = make_count_aggregation();
auto const count = cache.get_result(values_child0, *count_agg);

// Compute covariance here to avoid repeated computation of mean & count
auto cov_agg = make_covariance_aggregation();
cache.add_result(values,
*cov_agg,
detail::group_covariance(get_grouped_values().child(0),
get_grouped_values().child(1),
helper.group_labels(stream),
helper.num_groups(stream),
count,
mean0,
mean1,
stream,
mr));
auto cov_agg = make_covariance_aggregation(corr_agg._min_periods);
if (not cache.has_result(values, *cov_agg)) {
auto mean_agg = make_mean_aggregation();
auto const mean0 = cache.get_result(values_child0, *mean_agg);
auto const mean1 = cache.get_result(values_child1, *mean_agg);
auto count_agg = make_count_aggregation();
auto const count = cache.get_result(values_child0, *count_agg);

auto const& cov_agg_obj = dynamic_cast<cudf::detail::covariance_aggregation const&>(*cov_agg);
cache.add_result(values,
*cov_agg,
detail::group_covariance(get_grouped_values().child(0),
get_grouped_values().child(1),
helper.group_labels(stream),
helper.num_groups(stream),
count,
mean0,
mean1,
cov_agg_obj._min_periods,
cov_agg_obj._ddof,
stream,
mr));
}

auto const stddev0 = cache.get_result(values_child0, *std_agg);
auto const stddev1 = cache.get_result(values_child1, *std_agg);
auto const covariance = cache.get_result(values, *cov_agg);
cache.add_result(
values, agg, detail::group_correlation(covariance, stddev0, stddev1, stream, mr));
Expand Down
15 changes: 11 additions & 4 deletions cpp/src/groupby/sort/group_correlation.cu
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ std::unique_ptr<column> group_covariance(column_view const& values_0,
column_view const& count,
column_view const& mean_0,
column_view const& mean_1,
size_type min_periods,
size_type ddof,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
Expand Down Expand Up @@ -140,8 +142,13 @@ std::unique_ptr<column> group_covariance(column_view const& values_0,

auto d_values_0 = column_device_view::create(values_0, stream);
auto d_values_1 = column_device_view::create(values_1, stream);
covariance_transform<result_type> covariance_transform_op{
*d_values_0, *d_values_1, mean0_ptr, mean1_ptr, count.data<size_type>(), group_labels.begin()};
covariance_transform<result_type> covariance_transform_op{*d_values_0,
*d_values_1,
mean0_ptr,
mean1_ptr,
count.data<size_type>(),
group_labels.begin(),
ddof};

auto result = make_numeric_column(
data_type(type_to_id<result_type>()), num_groups, mask_state::UNALLOCATED, stream, mr);
Expand All @@ -157,8 +164,8 @@ std::unique_ptr<column> group_covariance(column_view const& values_0,
thrust::make_discard_iterator(),
d_result);

auto is_null = [ddof = covariance_transform_op.ddof] __device__(size_type group_size) {
return not(group_size == 0 or group_size - ddof <= 0);
auto is_null = [ddof, min_periods] __device__(size_type group_size) {
return not(group_size == 0 or group_size - ddof <= 0 or group_size < min_periods);
};
auto [new_nullmask, null_count] =
cudf::detail::valid_if(count.begin<size_type>(), count.end<size_type>(), is_null, stream, mr);
Expand Down
6 changes: 2 additions & 4 deletions cpp/src/groupby/sort/group_rank_scan.cu
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,12 @@ std::unique_ptr<column> rank_generator(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto const superimposed = structs::detail::superimpose_parent_nulls(order_by, stream, mr);
table_view const order_table{{std::get<0>(superimposed)}};
auto const flattened = cudf::structs::detail::flatten_nested_columns(
order_table, {}, {}, structs::detail::column_nullability::MATCH_INCOMING);
table_view{{order_by}}, {}, {}, structs::detail::column_nullability::MATCH_INCOMING);
auto const d_flat_order = table_device_view::create(flattened, stream);
row_equality_comparator<has_nulls> comparator(*d_flat_order, *d_flat_order, true);
auto ranks = make_fixed_width_column(data_type{type_to_id<size_type>()},
order_table.num_rows(),
flattened.flattened_columns().num_rows(),
mask_state::UNALLOCATED,
stream,
mr);
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/groupby/sort/group_reductions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ std::unique_ptr<column> group_merge_m2(column_view const& values,
* @param count The count of valid rows of the grouped values of both columns
* @param mean_0 The mean of the first grouped values column
* @param mean_1 The mean of the second grouped values column
* @param min_periods The minimum number of non-null rows a group must have to produce a covariance; groups with fewer rows yield null
* @param ddof The delta degrees of freedom used in the calculation of the covariance
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*/
Expand All @@ -461,6 +463,8 @@ std::unique_ptr<column> group_covariance(column_view const& values_0,
column_view const& count,
column_view const& mean_0,
column_view const& mean_1,
size_type min_periods,
size_type ddof,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

Expand Down
6 changes: 2 additions & 4 deletions cpp/src/reductions/scan/rank_scan.cu
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,12 @@ std::unique_ptr<column> rank_generator(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto const superimposed = structs::detail::superimpose_parent_nulls(order_by, stream, mr);
table_view const order_table{{std::get<0>(superimposed)}};
auto const flattened = cudf::structs::detail::flatten_nested_columns(
order_table, {}, {}, structs::detail::column_nullability::MATCH_INCOMING);
table_view{{order_by}}, {}, {}, structs::detail::column_nullability::MATCH_INCOMING);
auto const d_flat_order = table_device_view::create(flattened, stream);
row_equality_comparator<has_nulls> comparator(*d_flat_order, *d_flat_order, true);
auto ranks = make_fixed_width_column(data_type{type_to_id<size_type>()},
order_table.num_rows(),
flattened.flattened_columns().num_rows(),
mask_state::UNALLOCATED,
stream,
mr);
Expand Down
13 changes: 8 additions & 5 deletions cpp/src/strings/regex/regcomp.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2019-2021, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -567,11 +567,11 @@ class regex_compiler {
case LBRA: /* must have been RBRA */
op1 = popand('(');
id_inst2 = m_prog.add_inst(RBRA);
m_prog.inst_at(id_inst2).u1.subid = ator.subid; // subidstack[subidstack.size()-1];
m_prog.inst_at(id_inst2).u1.subid = ator.subid;
m_prog.inst_at(op1.id_last).u2.next_id = id_inst2;
id_inst1 = m_prog.add_inst(LBRA);
m_prog.inst_at(id_inst1).u1.subid = ator.subid; // subidstack[subidstack.size() - 1];
m_prog.inst_at(id_inst1).u2.next_id = op1.id_first;
m_prog.inst_at(id_inst1).u1.subid = ator.subid;
m_prog.inst_at(id_inst1).u2.next_id = op1.id_first;
pushand(id_inst1, id_inst2);
return;
case OR:
Expand Down Expand Up @@ -826,7 +826,8 @@ reprog reprog::create_from(const char32_t* pattern)
{
reprog rtn;
regex_compiler compiler(pattern, ANY, rtn); // future feature: ANYNL
// rtn->print();
// for debugging, it can be helpful to call rtn.print() here to dump
// out the instructions that have been created from the given pattern
return rtn;
}

Expand Down Expand Up @@ -912,6 +913,7 @@ void reprog::optimize2()
_startinst_ids.push_back(-1); // terminator mark
}

#ifndef NDEBUG
void reprog::print()
{
printf("Instructions:\n");
Expand Down Expand Up @@ -992,6 +994,7 @@ void reprog::print()
}
if (_num_capturing_groups) printf("Number of capturing groups: %d\n", _num_capturing_groups);
}
#endif

} // namespace detail
} // namespace strings
Expand Down
10 changes: 6 additions & 4 deletions cpp/src/strings/regex/regex.inl
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,10 @@ __device__ inline int32_t reprog_device::regexec(
{
int32_t match = 0;
auto checkstart = jnk.starttype;
auto txtlen = dstr.length();
auto pos = begin;
auto eos = end;
char32_t c = 0;
auto last_character = false;
string_view::const_iterator itr = string_view::const_iterator(dstr, pos);

jnk.list1->reset();
Expand Down Expand Up @@ -235,7 +235,9 @@ __device__ inline int32_t reprog_device::regexec(
jnk.list1->activate(ids[i++], (group_id == 0 ? pos : -1), -1);
}

c = static_cast<char32_t>(pos >= txtlen ? 0 : *itr);
last_character = (pos >= dstr.length());

c = static_cast<char32_t>(last_character ? 0 : *itr);

// expand LBRA, RBRA, BOL, EOL, BOW, NBOW, and OR
bool expanded = false;
Expand Down Expand Up @@ -274,7 +276,7 @@ __device__ inline int32_t reprog_device::regexec(
}
break;
case EOL:
if ((c == 0) || (inst->u1.c == '$' && c == '\n')) {
if (last_character || (inst->u1.c == '$' && c == '\n')) {
id_activate = inst->u2.next_id;
expanded = true;
}
Expand Down Expand Up @@ -360,7 +362,7 @@ __device__ inline int32_t reprog_device::regexec(
++itr;
swaplist(jnk.list1, jnk.list2);
checkstart = jnk.list1->size > 0 ? 0 : 1;
} while (c && (jnk.list1->size > 0 || match == 0));
} while (!last_character && (jnk.list1->size > 0 || match == 0));

return match;
}
Expand Down
Loading

0 comments on commit c6c0eb7

Please sign in to comment.