Skip to content

Commit

Permalink
Merge pull request oatpp#117 from oatpp/async_executor_suggest_thread…
Browse files Browse the repository at this point in the history
…s_count

Feature async::Executor. Suggest threads count.
  • Loading branch information
lganzzzo committed Sep 2, 2019
2 parents 549ba11 + 693f1a1 commit 387c777
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 30 deletions.
3 changes: 0 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ option(OATPP_DISABLE_POOL_ALLOCATIONS "This will make oatpp::base::memory::Memor

set(OATPP_THREAD_HARDWARE_CONCURRENCY "AUTO" CACHE STRING "Predefined value for function oatpp::concurrency::Thread::getHardwareConcurrency()")
set(OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT "10" CACHE STRING "Number of shards of ThreadDistributedMemoryPool")
set(OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT "2" CACHE STRING "oatpp::async::Executor default number of threads")

option(OATPP_COMPAT_BUILD_NO_THREAD_LOCAL "Disable 'thread_local' feature" OFF)

Expand All @@ -43,7 +42,6 @@ message("OATPP_DISABLE_ENV_OBJECT_COUNTERS=${OATPP_DISABLE_ENV_OBJECT_COUNTERS}"
message("OATPP_DISABLE_POOL_ALLOCATIONS=${OATPP_DISABLE_POOL_ALLOCATIONS}")
message("OATPP_THREAD_HARDWARE_CONCURRENCY=${OATPP_THREAD_HARDWARE_CONCURRENCY}")
message("OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT=${OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT}")
message("OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT=${OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT}")

message("OATPP_COMPAT_BUILD_NO_THREAD_LOCAL=${OATPP_COMPAT_BUILD_NO_THREAD_LOCAL}")

Expand All @@ -64,7 +62,6 @@ endif()

add_definitions (
-DOATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT=${OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT}
-DOATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT=${OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT}
)

if(OATPP_COMPAT_BUILD_NO_THREAD_LOCAL)
Expand Down
39 changes: 36 additions & 3 deletions src/oatpp/core/async/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ void Executor::SubmissionProcessor::detach() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Executor

const v_int32 Executor::THREAD_NUM_DEFAULT = OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT;

Executor::Executor(v_int32 processorWorkersCount, v_int32 ioWorkersCount, v_int32 timerWorkersCount, bool useIOEventWorker)
: m_balancer(0)
{

processorWorkersCount = chooseProcessorWorkersCount(processorWorkersCount);
ioWorkersCount = chooseIOWorkersCount(processorWorkersCount, ioWorkersCount);
timerWorkersCount = chooseTimerWorkersCount(timerWorkersCount);

for(v_int32 i = 0; i < processorWorkersCount; i ++) {
m_processorWorkers.push_back(std::make_shared<SubmissionProcessor>());
}
Expand Down Expand Up @@ -112,7 +114,38 @@ Executor::Executor(v_int32 processorWorkersCount, v_int32 ioWorkersCount, v_int3

}

Executor::~Executor() {
v_int32 Executor::chooseProcessorWorkersCount(v_int32 processorWorkersCount) {
if(processorWorkersCount >= 1) {
return processorWorkersCount;
}
if(processorWorkersCount == VALUE_SUGGESTED) {
return oatpp::concurrency::getHardwareConcurrency();
}
throw std::runtime_error("[oatpp::async::Executor::chooseProcessorWorkersCount()]: Error. Invalid processor workers count specified.");
}

v_int32 Executor::chooseIOWorkersCount(v_int32 processorWorkersCount, v_int32 ioWorkersCount) {
if(ioWorkersCount >= 1) {
return ioWorkersCount;
}
if(ioWorkersCount == VALUE_SUGGESTED) {
v_int32 count = processorWorkersCount >> 1;
if(count == 0) {
count = 1;
}
return count;
}
throw std::runtime_error("[oatpp::async::Executor::chooseIOWorkersCount()]: Error. Invalid I/O workers count specified.");
}

v_int32 Executor::chooseTimerWorkersCount(v_int32 timerWorkersCount) {
if(timerWorkersCount >= 1) {
return timerWorkersCount;
}
if(timerWorkersCount == VALUE_SUGGESTED) {
return 1;
}
throw std::runtime_error("[oatpp::async::Executor::chooseTimerWorkersCount()]: Error. Invalid timer workers count specified.");
}

void Executor::linkWorkers(const std::vector<std::shared_ptr<worker::Worker>>& workers) {
Expand Down
15 changes: 9 additions & 6 deletions src/oatpp/core/async/Executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,18 @@ class Executor {

public:
/**
* Default number of threads to run coroutines.
* Special value to indicate that Executor should choose it's own the value of specified parameter.
*/
static const v_int32 THREAD_NUM_DEFAULT;
static constexpr const v_int32 VALUE_SUGGESTED = -1000;
private:
std::atomic<v_word32> m_balancer;
private:
std::vector<std::shared_ptr<SubmissionProcessor>> m_processorWorkers;
std::vector<std::shared_ptr<worker::Worker>> m_allWorkers;
private:
static v_int32 chooseProcessorWorkersCount(v_int32 processorWorkersCount);
static v_int32 chooseIOWorkersCount(v_int32 processorWorkersCount, v_int32 ioWorkersCount);
static v_int32 chooseTimerWorkersCount(v_int32 timerWorkersCount);
void linkWorkers(const std::vector<std::shared_ptr<worker::Worker>>& workers);
public:

Expand All @@ -99,9 +102,9 @@ class Executor {
* @param ioWorkersCount - number of I/O processing workers.
* @param timerWorkersCount - number of timer processing workers.
*/
Executor(v_int32 processorWorkersCount = THREAD_NUM_DEFAULT,
v_int32 ioWorkersCount = 1,
v_int32 timerWorkersCount = 1,
Executor(v_int32 processorWorkersCount = VALUE_SUGGESTED,
v_int32 ioWorkersCount = VALUE_SUGGESTED,
v_int32 timerWorkersCount = VALUE_SUGGESTED,
#if defined(WIN32) || defined(_WIN32)
bool useIOEventWorker = false
#else
Expand All @@ -112,7 +115,7 @@ class Executor {
/**
* Non-virtual Destructor.
*/
~Executor();
~Executor() = default;

/**
* Join all worker-threads.
Expand Down
7 changes: 0 additions & 7 deletions src/oatpp/core/base/Config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@
#define OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT 10
#endif

/**
* oatpp::async::Executor default number of threads
*/
#ifndef OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT
#define OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT 2
#endif

/**
* Disable `thread_local` feature. <br>
* See https://github.com/oatpp/oatpp/issues/81
Expand Down
1 change: 0 additions & 1 deletion src/oatpp/core/base/Environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ void Environment::printCompilationConfig() {
#endif

OATPP_LOGD("oatpp/Config", "OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT=%d", OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT);
OATPP_LOGD("oatpp/Config", "OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT=%d\n", OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT);

}

Expand Down
3 changes: 1 addition & 2 deletions src/oatpp/core/base/Environment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,7 @@ class Environment{
* - `OATPP_DISABLE_ENV_OBJECT_COUNTERS`<br>
* - `OATPP_DISABLE_POOL_ALLOCATIONS`<br>
* - `OATPP_THREAD_HARDWARE_CONCURRENCY`<br>
* - `OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT`<br>
* - `OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT`
* - `OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT`
*/
static void printCompilationConfig();

Expand Down
2 changes: 0 additions & 2 deletions src/oatpp/web/server/AsyncHttpConnectionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@

namespace oatpp { namespace web { namespace server {

const v_int32 AsyncHttpConnectionHandler::THREAD_NUM_DEFAULT = OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT;

AsyncHttpConnectionHandler::AsyncHttpConnectionHandler(const std::shared_ptr<HttpRouter>& router,
v_int32 threadCount)
: m_executor(std::make_shared<oatpp::async::Executor>(threadCount))
Expand Down
6 changes: 2 additions & 4 deletions src/oatpp/web/server/AsyncHttpConnectionHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ namespace oatpp { namespace web { namespace server {
class AsyncHttpConnectionHandler : public base::Countable, public network::server::ConnectionHandler {
private:
typedef oatpp::web::protocol::http::incoming::BodyDecoder BodyDecoder;
public:
static const v_int32 THREAD_NUM_DEFAULT;
private:
std::shared_ptr<oatpp::async::Executor> m_executor;
private:
Expand All @@ -53,12 +51,12 @@ class AsyncHttpConnectionHandler : public base::Countable, public network::serve
HttpProcessor::RequestInterceptors m_requestInterceptors;
std::shared_ptr<const BodyDecoder> m_bodyDecoder; // TODO make bodyDecoder configurable here
public:
AsyncHttpConnectionHandler(const std::shared_ptr<HttpRouter>& router, v_int32 threadCount = THREAD_NUM_DEFAULT);
AsyncHttpConnectionHandler(const std::shared_ptr<HttpRouter>& router, v_int32 threadCount = oatpp::async::Executor::VALUE_SUGGESTED);
AsyncHttpConnectionHandler(const std::shared_ptr<HttpRouter>& router, const std::shared_ptr<oatpp::async::Executor>& executor);
public:

static std::shared_ptr<AsyncHttpConnectionHandler> createShared(const std::shared_ptr<HttpRouter>& router,
v_int32 threadCount = THREAD_NUM_DEFAULT);
v_int32 threadCount = oatpp::async::Executor::VALUE_SUGGESTED);

static std::shared_ptr<AsyncHttpConnectionHandler> createShared(const std::shared_ptr<HttpRouter>& router,
const std::shared_ptr<oatpp::async::Executor>& executor);
Expand Down
4 changes: 2 additions & 2 deletions test/oatpp/web/FullAsyncClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ class ClientCoroutine_getRootAsync : public oatpp::async::Coroutine<ClientCorout
Action handleError(const std::shared_ptr<const Error>& error) override {
if(error->is<oatpp::data::AsyncIOError>()) {
auto e = static_cast<const oatpp::data::AsyncIOError*>(error.get());
OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_echoBodyAsync::handleError()]", "AsyncIOError. %s, %d", e->what(), e->getCode());
OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_getRootAsync::handleError()]", "AsyncIOError. %s, %d", e->what(), e->getCode());
} else {
OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_echoBodyAsync::handleError()]", "Error. %s", error->what());
OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_getRootAsync::handleError()]", "Error. %s", error->what());
}
return propagateError();
}
Expand Down

0 comments on commit 387c777

Please sign in to comment.