Skip to content

Commit

Permalink
[src] Assert empty workqueue on ThreadPoolLight destruction (kaldi-as…
Browse files Browse the repository at this point in the history
…r#4568)

Other minor tweaks:
 * Convert runtime asserts to static asserts in template.
 * Remove ThreadPoolLight::nworkers_ as it's equal to the
   .size() of the worker vector, and was used only once.
 * Replace pointer indirection with std::move and make
   ThreadPoolLightWorker::thread_ a direct class member.
 * Update to coding guidelines and IWYU.
  • Loading branch information
kkm000 committed Jun 17, 2021
1 parent 9796afd commit 4973514
Showing 1 changed file with 43 additions and 38 deletions.
81 changes: 43 additions & 38 deletions src/cudadecoder/thread-pool-light.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
#define KALDI_CUDADECODER_THREAD_POOL_LIGHT_H_

#include <atomic>
#include <memory>
#include <thread>
#include <vector>

#include "util/stl-utils.h"

namespace kaldi {
namespace cuda_decoder {

const double kSleepForWorkerAvailable = 1e-3;
constexpr double kSleepForWorkAvailable = 1e-3;
constexpr double kSleepForWorkerAvailable = 1e-3;

struct ThreadPoolLightTask {
void (*func_ptr)(void *, uint64_t, void *);
Expand All @@ -39,20 +39,17 @@ struct ThreadPoolLightTask {
template <int QUEUE_SIZE>
// Single producer, multiple consumer
class ThreadPoolLightSPMCQueue {
static const unsigned int QUEUE_MASK = QUEUE_SIZE - 1;
static constexpr unsigned int QUEUE_MASK = QUEUE_SIZE - 1;
std::vector<ThreadPoolLightTask> tasks_;
std::atomic<int> back_;
std::atomic<int> front_;
int inc(int curr) { return ((curr + 1) & QUEUE_MASK); }
static int inc(int curr) { return ((curr + 1) & QUEUE_MASK); }

public:
ThreadPoolLightSPMCQueue() {
KALDI_ASSERT(QUEUE_SIZE > 1);
bool is_power_of_2 = ((QUEUE_SIZE & (QUEUE_SIZE - 1)) == 0);
KALDI_ASSERT(is_power_of_2); // validity of QUEUE_MASK
tasks_.resize(QUEUE_SIZE);
front_.store(0);
back_.store(0);
ThreadPoolLightSPMCQueue() : tasks_(QUEUE_SIZE), front_(0), back_(0) {
KALDI_COMPILE_TIME_ASSERT(QUEUE_SIZE > 1);
constexpr bool is_power_of_2 = ((QUEUE_SIZE & (QUEUE_SIZE - 1)) == 0);
KALDI_COMPILE_TIME_ASSERT(is_power_of_2); // validity of QUEUE_MASK
}

bool TryPush(const ThreadPoolLightTask &task) {
Expand All @@ -70,33 +67,35 @@ class ThreadPoolLightSPMCQueue {
bool TryPop(ThreadPoolLightTask *front_task) {
while (true) {
int front = front_.load(std::memory_order_relaxed);
if (front == back_.load(std::memory_order_acquire))
if (front == back_.load(std::memory_order_acquire)) {
return false; // queue is empty
}
*front_task = tasks_[front];
if (front_.compare_exchange_weak(front, inc(front),
std::memory_order_release))
std::memory_order_release)) {
return true;
}
}
}
};

class ThreadPoolLightWorker {
class ThreadPoolLightWorker final {
// Multi consumer queue, because worker can steal work
ThreadPoolLightSPMCQueue<512> queue_;
// If this thread has no more work to do, it will try to steal work from
// other
std::unique_ptr<std::thread> thread_;
bool run_thread_;
std::thread thread_;
volatile bool run_thread_;
ThreadPoolLightTask curr_task_;
std::weak_ptr<ThreadPoolLightWorker> other_;

void Work() {
while (run_thread_) {
bool got_task = queue_.TryPop(&curr_task_);
if (!got_task) {
if (auto other_sp = other_.lock()) {
got_task = other_sp->TrySteal(&curr_task_);
}
if (auto other_sp = other_.lock()) {
got_task = other_sp->TrySteal(&curr_task_);
}
}
if (got_task) {
// Not calling func_ptr as a member function,
Expand All @@ -106,71 +105,77 @@ class ThreadPoolLightWorker {
(curr_task_.func_ptr)(curr_task_.obj_ptr, curr_task_.arg1,
curr_task_.arg2);
} else {
Sleep(1e-3f); // TODO
Sleep(kSleepForWorkAvailable); // TODO
}
}
}

protected:
// Another worker can steal a task from this queue
// This is done so that a very long task computed by one thread does not
// hold the entire threadpool to complete a time-sensitive task
bool TrySteal(ThreadPoolLightTask *task) { return queue_.TryPop(task); }

public:
ThreadPoolLightWorker() : run_thread_(true), other_() {}
virtual ~ThreadPoolLightWorker() { Stop(); }
bool TryPush(const ThreadPoolLightTask &task) { return queue_.TryPush(task); }
~ThreadPoolLightWorker() {
KALDI_ASSERT(!queue_.TryPop(&curr_task_));
}
bool TryPush(const ThreadPoolLightTask &task) {
return queue_.TryPush(task);
}
void SetOtherWorkerToStealFrom(
const std::shared_ptr<ThreadPoolLightWorker>& other) {
other_ = other;
}
void Start() {
KALDI_ASSERT("Please call SetOtherWorkerToStealFrom() first" && !other_.expired());
thread_.reset(new std::thread(&ThreadPoolLightWorker::Work, this));
thread_ = std::move(std::thread(&ThreadPoolLightWorker::Work, this));
}
void Stop() {
run_thread_ = false;
thread_->join();
thread_.join();
other_.reset();
}
};

class ThreadPoolLight {
std::vector<std::shared_ptr<ThreadPoolLightWorker>> workers_;
int curr_iworker_; // next call on tryPush will post work on this
// worker
int nworkers_;

public:
ThreadPoolLight(int32 nworkers = std::thread::hardware_concurrency())
: curr_iworker_(0), nworkers_(nworkers) {
: workers_(nworkers), curr_iworker_(0) {
KALDI_ASSERT(nworkers > 1);
workers_.resize(nworkers);
for (int i = 0; i < workers_.size(); ++i)
for (size_t i = 0; i < workers_.size(); ++i) {
workers_[i] = std::make_shared<ThreadPoolLightWorker>();

for (int i = 0; i < workers_.size(); ++i) {
}
for (size_t i = 0; i < workers_.size(); ++i) {
int iother = (i + nworkers / 2) % nworkers;
workers_[i]->SetOtherWorkerToStealFrom(workers_[iother]);
workers_[i]->Start();
}
}

~ThreadPoolLight() {
for (auto& wkr : workers_) wkr->Stop();
}

bool TryPush(const ThreadPoolLightTask &task) {
if (!workers_[curr_iworker_]->TryPush(task)) return false;
++curr_iworker_;
if (curr_iworker_ == nworkers_) curr_iworker_ = 0;
if (curr_iworker_ == workers_.size()) curr_iworker_ = 0;
return true;
}

void Push(const ThreadPoolLightTask &task) {
// Could try another curr_iworker_
while (!TryPush(task))
while (!TryPush(task)) {
Sleep(kSleepForWorkerAvailable);
}
}
};

} // end namespace cuda_decoder
} // end namespace kaldi
} // namespace cuda_decoder
} // namespace kaldi

#endif // KALDI_CUDADECODER_THREAD_POOL_H_
#endif // KALDI_CUDADECODER_THREAD_POOL_LIGHT_H_

0 comments on commit 4973514

Please sign in to comment.