Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: map_rect_concurrent with parallel STL backends #1085

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions stan/math/parallel/for_each.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#ifndef STAN_MATH_PARALLEL_FOR_EACH_HPP
#define STAN_MATH_PARALLEL_FOR_EACH_HPP

#ifdef STAN_PSTL_CPP17

// Standard C++17 parallel facilities
#include <execution>
#include <numeric>
#include <algorithm>

#define std_par std

#elif STAN_PSTL_INTEL

// Intel Parallel STL
#include <pstl/execution>
#include <pstl/numeric>
#include <pstl/algorithm>

#define std_par std

#else

#include <stan/math/parallel/get_num_threads.hpp>

// fall-back solution which is based on C++11 features only (async)
#include <algorithm>
#include <type_traits>
#include <iterator>

#include <vector>
#include <thread>
#include <future>

#define std_par stan::math::internal

namespace stan {
namespace math {
namespace internal {

namespace execution {

// sequential execution policy:
struct sequenced_policy {
typedef std::false_type parallel_mode;
typedef std::false_type unsequenced_mode;
};

// parallel execution policy:
struct parallel_policy {
typedef std::true_type parallel_mode;
typedef std::false_type unsequenced_mode;
};
struct parallel_unsequenced_policy {
typedef std::true_type parallel_mode;
typedef std::true_type unsequenced_mode;
};

template <class T>
struct is_execution_policy : std::false_type {};
template <>
struct is_execution_policy<sequenced_policy> : std::true_type {};
template <>
struct is_execution_policy<parallel_policy> : std::true_type {};
template <>
struct is_execution_policy<parallel_unsequenced_policy> : std::true_type {};

// global execution policy objects:
static constexpr sequenced_policy seq{};
static constexpr parallel_policy par{};
static constexpr parallel_unsequenced_policy par_unseq{};

} // namespace execution

template <class InputIt, class UnaryFunction>
UnaryFunction for_each_impl(InputIt first, InputIt last, UnaryFunction f,
std::true_type /* parallel mode */) {
const int num_jobs = std::distance(first, last);
const int num_threads = std::min(get_num_threads(), num_jobs);
const int small_job_size = num_jobs / num_threads;
const int num_big_jobs = num_jobs % num_threads;

std::vector<int> job_chunk_size(num_threads, small_job_size);

for (std::size_t i = 0; i < num_big_jobs; ++i)
++job_chunk_size[num_threads - i - 1];

std::vector<std::future<void> > job_futures;

InputIt cur_job_start = first;
for (std::size_t i = 0; i < num_threads; ++i) {
InputIt cur_job_end = cur_job_start;
std::advance(cur_job_end, job_chunk_size[i]);
job_futures.emplace_back(std::async(
i == 0 ? std::launch::deferred : std::launch::async, [=]() -> void {
std::for_each<InputIt, UnaryFunction>(cur_job_start, cur_job_end, f);
}));
cur_job_start = cur_job_end;
}

for (std::size_t i = 0; i < num_threads; ++i)
job_futures[i].wait();

return std::move(f);
}

template <class InputIt, class UnaryFunction>
UnaryFunction for_each_impl(InputIt first, InputIt last, UnaryFunction f,
std::false_type /* parallel mode */) {
return std::for_each(first, last, f);
}

/**
* C++11 based implementation of the C++17 for_each which supports
* sequenced and parallel execution policies, see
* https://en.cppreference.com/w/cpp/algorithm/for_each for details.
*
* While the C++17 does not allow to control the number of threads,
* this version creates at most STAN_NUM_THREADS concurrent threads
* (which is defined in the environment of the running program).
*
* Note: The implemented parallel version should only be used with
* random-access iterators.
*/
template <class ExecutionPolicy, class InputIt, class UnaryFunction>
UnaryFunction for_each(ExecutionPolicy& exec_policy, InputIt first,
InputIt last, UnaryFunction f) {
return for_each_impl(first, last, f,
typename ExecutionPolicy::parallel_mode());
}

} // namespace internal
} // namespace math
} // namespace stan

#endif

#endif
58 changes: 58 additions & 0 deletions stan/math/parallel/get_num_threads.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#ifndef STAN_MATH_PARALLEL_GET_NUM_THREADS_HPP
#define STAN_MATH_PARALLEL_GET_NUM_THREADS_HPP

#include <stan/math/prim/scal/err/invalid_argument.hpp>
#include <boost/lexical_cast.hpp>
#include <thread>
#include <cstdlib>

namespace stan {
namespace math {
namespace internal {
/**
* Get number of threads to use. The function uses the environment
* variable STAN_NUM_THREADS and follows these conventions:
*
* - STAN_NUM_THREADS is not defined => num_threads=1
* - STAN_NUM_THREADS is positive => num_threads is set to the
* specified number
* - STAN_NUM_THREADS is set to -1 => num_threads is the number of
* available cores on the machine
* - STAN_NUM_THREADS < -1, STAN_NUM_THREADS = 0 or STAN_NUM_THREADS is
* not numeric => throws an exception
*
* @return number of threads to use
* @throws std::runtime_error if the value of STAN_NUM_THREADS env. variable
* is invalid
*/
inline int get_num_threads() {
int num_threads = 1;
const char* env_stan_num_threads = std::getenv("STAN_NUM_THREADS");
if (env_stan_num_threads != nullptr) {
try {
const int env_num_threads
= boost::lexical_cast<int>(env_stan_num_threads);
if (env_num_threads > 0)
num_threads = env_num_threads;
else if (env_num_threads == -1)
num_threads = std::thread::hardware_concurrency();
else
invalid_argument("get_num_threads(int)", "STAN_NUM_THREADS",
env_stan_num_threads,
"The STAN_NUM_THREADS environment variable is '",
"' but it must be positive or -1");
} catch (boost::bad_lexical_cast) {
invalid_argument("get_num_threads(int)", "STAN_NUM_THREADS",
env_stan_num_threads,
"The STAN_NUM_THREADS environment variable is '",
"' but it must be a positive number or -1");
}
}
return num_threads;
}

} // namespace internal
} // namespace math
} // namespace stan

#endif
132 changes: 27 additions & 105 deletions stan/math/prim/mat/functor/map_rect_concurrent.hpp
Original file line number Diff line number Diff line change
@@ -1,73 +1,19 @@
#ifndef STAN_MATH_PRIM_MAT_FUNCTOR_MAP_RECT_CONCURRENT_HPP
#define STAN_MATH_PRIM_MAT_FUNCTOR_MAP_RECT_CONCURRENT_HPP

#include <stan/math/parallel/for_each.hpp>
#include <stan/math/prim/mat/fun/typedefs.hpp>

#include <stan/math/prim/mat/functor/map_rect_reduce.hpp>
#include <stan/math/prim/mat/functor/map_rect_combine.hpp>
#include <stan/math/prim/scal/err/invalid_argument.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/iterator/counting_iterator.hpp>

#include <vector>
#include <thread>
#include <future>
#include <cstdlib>

namespace stan {
namespace math {
namespace internal {

/**
* Get number of threads to use for num_jobs jobs. The function uses
* the environment variable STAN_NUM_THREADS and follows these
* conventions:
*
* - STAN_NUM_THREADS is not defined => num_threads=1
* - STAN_NUM_THREADS is positive => num_threads is set to the
* specified number
* - STAN_NUM_THREADS is set to -1 => num_threads is the number of
* available cores on the machine
* - STAN_NUM_THREADS < -1, STAN_NUM_THREADS = 0 or STAN_NUM_THREADS is
* not numeric => throws an exception
*
* Should num_threads exceed the number of jobs, then num_threads will
* be set equal to the number of jobs.
*
* @param num_jobs number of jobs
* @return number of threads to use
* @throws std::runtime_error if the value of STAN_NUM_THREADS env. variable
* is invalid
*/
inline int get_num_threads(int num_jobs) {
int num_threads = 1;
#ifdef STAN_THREADS
const char* env_stan_num_threads = std::getenv("STAN_NUM_THREADS");
if (env_stan_num_threads != nullptr) {
try {
const int env_num_threads
= boost::lexical_cast<int>(env_stan_num_threads);
if (env_num_threads > 0)
num_threads = env_num_threads;
else if (env_num_threads == -1)
num_threads = std::thread::hardware_concurrency();
else
invalid_argument("get_num_threads(int)", "STAN_NUM_THREADS",
env_stan_num_threads,
"The STAN_NUM_THREADS environment variable is '",
"' but it must be positive or -1");
} catch (boost::bad_lexical_cast) {
invalid_argument("get_num_threads(int)", "STAN_NUM_THREADS",
env_stan_num_threads,
"The STAN_NUM_THREADS environment variable is '",
"' but it must be a positive number or -1");
}
}
if (num_threads > num_jobs)
num_threads = num_jobs;
#endif
return num_threads;
}

template <int call_id, typename F, typename T_shared_param,
typename T_job_param>
Eigen::Matrix<typename stan::return_type<T_shared_param, T_job_param>::type,
Expand All @@ -80,66 +26,42 @@ map_rect_concurrent(
const std::vector<std::vector<int>>& x_i, std::ostream* msgs = nullptr) {
typedef map_rect_reduce<F, T_shared_param, T_job_param> ReduceF;
typedef map_rect_combine<F, T_shared_param, T_job_param> CombineF;
typedef boost::counting_iterator<int> count_iter;

#ifdef STAN_THREADS
constexpr std_par::execution::parallel_unsequenced_policy exec_policy
= std_par::execution::par_unseq;
#else
constexpr std_par::execution::sequenced_policy exec_policy
= std_par::execution::seq;
#endif
using std_par::for_each;

const int num_jobs = job_params.size();
const vector_d shared_params_dbl = value_of(shared_params);
std::vector<std::future<std::vector<matrix_d>>> futures;

auto execute_chunk = [&](int start, int size) -> std::vector<matrix_d> {
const int end = start + size;
std::vector<matrix_d> chunk_f_out;
chunk_f_out.reserve(size);
for (int i = start; i != end; i++)
chunk_f_out.push_back(ReduceF()(
shared_params_dbl, value_of(job_params[i]), x_r[i], x_i[i], msgs));
return chunk_f_out;
};
std::vector<int> world_f_out(num_jobs);
std::vector<matrix_d> world_job_output(num_jobs);

int num_threads = get_num_threads(num_jobs);
int num_jobs_per_thread = num_jobs / num_threads;
futures.emplace_back(
std::async(std::launch::deferred, execute_chunk, 0, num_jobs_per_thread));

#ifdef STAN_THREADS
if (num_threads > 1) {
const int num_big_threads
= (num_jobs - num_jobs_per_thread) % (num_threads - 1);
const int first_big_thread = num_threads - num_big_threads;
for (int i = 1, job_start = num_jobs_per_thread, job_size = 0;
i < num_threads; ++i, job_start += job_size) {
job_size = i >= first_big_thread ? num_jobs_per_thread + 1
: num_jobs_per_thread;
futures.emplace_back(
std::async(std::launch::async, execute_chunk, job_start, job_size));
}
}
#endif
for_each(exec_policy, count_iter(0), count_iter(num_jobs), [&](int i) {
world_job_output[i] = ReduceF()(shared_params_dbl, value_of(job_params[i]),
x_r[i], x_i[i], msgs);
world_f_out[i] = world_job_output[i].cols();
});

// collect results
std::vector<int> world_f_out;
world_f_out.reserve(num_jobs);
matrix_d world_output(0, 0);
const int num_world_output
= std::accumulate(world_f_out.begin(), world_f_out.end(), 0);
matrix_d world_output(world_job_output[0].rows(), num_world_output);

int offset = 0;
for (std::size_t i = 0; i < futures.size(); ++i) {
const std::vector<matrix_d>& chunk_result = futures[i].get();
if (i == 0)
world_output.resize(chunk_result[0].rows(),
num_jobs * chunk_result[0].cols());

for (const auto& job_result : chunk_result) {
const int num_job_outputs = job_result.cols();
world_f_out.push_back(num_job_outputs);

if (world_output.cols() < offset + num_job_outputs)
world_output.conservativeResize(Eigen::NoChange,
2 * (offset + num_job_outputs));
for (const auto& job_result : world_job_output) {
const int num_job_outputs = job_result.cols();

world_output.block(0, offset, world_output.rows(), num_job_outputs)
= job_result;
world_output.block(0, offset, world_output.rows(), num_job_outputs)
= job_result;

offset += num_job_outputs;
}
offset += num_job_outputs;
}
CombineF combine(shared_params, job_params);
return combine(world_output, world_f_out);
Expand Down
Loading