Skip to content

Commit

Permalink
Merge pull request grpc#19696 from yunjiaw26/chunkedlist
Browse files Browse the repository at this point in the history
Improve MPMCQueue Performance
  • Loading branch information
yunjiaw26 committed Jul 30, 2019
2 parents f14f1dd + 7c10819 commit 0fd05f4
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 45 deletions.
110 changes: 84 additions & 26 deletions src/core/lib/iomgr/executor/mpmcqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,15 @@ DebugOnlyTraceFlag grpc_thread_pool_trace(false, "thread_pool");

inline void* InfLenFIFOQueue::PopFront() {
// Caller should already check queue is not empty and has already held the
// mutex. This function will only do the job of removal.
// mutex. This function will assume that there is at least one element in the
// queue (i.e. queue_head_->content is valid).
void* result = queue_head_->content;
Node* head_to_remove = queue_head_;
queue_head_ = queue_head_->next;

count_.Store(count_.Load(MemoryOrder::RELAXED) - 1, MemoryOrder::RELAXED);

// Updates Stats when trace flag turned on.
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
gpr_timespec wait_time =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), head_to_remove->insert_time);

// Updates Stats info
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), queue_head_->insert_time);
stats_.num_completed++;
stats_.total_queue_time = gpr_time_add(stats_.total_queue_time, wait_time);
stats_.max_queue_time = gpr_time_max(
Expand All @@ -58,44 +55,89 @@ inline void* InfLenFIFOQueue::PopFront() {
gpr_timespec_to_micros(stats_.busy_queue_time));
}

Delete(head_to_remove);
queue_head_ = queue_head_->next;
// Signal waiting thread
if (count_.Load(MemoryOrder::RELAXED) > 0 && num_waiters_ > 0) {
wait_nonempty_.Signal();
if (count_.Load(MemoryOrder::RELAXED) > 0) {
TopWaiter()->cv.Signal();
}

return result;
}

InfLenFIFOQueue::Node* InfLenFIFOQueue::AllocateNodes(int num) {
num_nodes_ = num_nodes_ + num;
Node* new_chunk = static_cast<Node*>(gpr_zalloc(sizeof(Node) * num));
new_chunk[0].next = &new_chunk[1];
new_chunk[num - 1].prev = &new_chunk[num - 2];
for (int i = 1; i < num - 1; ++i) {
new_chunk[i].prev = &new_chunk[i - 1];
new_chunk[i].next = &new_chunk[i + 1];
}
return new_chunk;
}

InfLenFIFOQueue::InfLenFIFOQueue() {
delete_list_size_ = kDeleteListInitSize;
delete_list_ =
static_cast<Node**>(gpr_zalloc(sizeof(Node*) * delete_list_size_));

Node* new_chunk = AllocateNodes(kQueueInitNumNodes);
delete_list_[delete_list_count_++] = new_chunk;
queue_head_ = queue_tail_ = new_chunk;
new_chunk[0].prev = &new_chunk[kQueueInitNumNodes - 1];
new_chunk[kQueueInitNumNodes - 1].next = &new_chunk[0];

waiters_.next = &waiters_;
waiters_.prev = &waiters_;
}

InfLenFIFOQueue::~InfLenFIFOQueue() {
GPR_ASSERT(count_.Load(MemoryOrder::RELAXED) == 0);
GPR_ASSERT(num_waiters_ == 0);
for (size_t i = 0; i < delete_list_count_; ++i) {
gpr_free(delete_list_[i]);
}
gpr_free(delete_list_);
}

void InfLenFIFOQueue::Put(void* elem) {
MutexLock l(&mu_);

Node* new_node = New<Node>(elem);
if (count_.Load(MemoryOrder::RELAXED) == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
busy_time = gpr_now(GPR_CLOCK_MONOTONIC);
int curr_count = count_.Load(MemoryOrder::RELAXED);

if (queue_tail_ == queue_head_ && curr_count != 0) {
// List is full. Expands list to double size by inserting new chunk of nodes
Node* new_chunk = AllocateNodes(curr_count);
delete_list_[delete_list_count_++] = new_chunk;
// Expands delete list on full.
if (delete_list_count_ == delete_list_size_) {
delete_list_size_ = delete_list_size_ * 2;
delete_list_ = static_cast<Node**>(
gpr_realloc(delete_list_, sizeof(Node*) * delete_list_size_));
}
queue_head_ = queue_tail_ = new_node;
} else {
queue_tail_->next = new_node;
queue_tail_ = queue_tail_->next;
new_chunk[0].prev = queue_tail_->prev;
new_chunk[curr_count - 1].next = queue_head_;
queue_tail_->prev->next = new_chunk;
queue_head_->prev = &new_chunk[curr_count - 1];
queue_tail_ = new_chunk;
}
count_.Store(count_.Load(MemoryOrder::RELAXED) + 1, MemoryOrder::RELAXED);
queue_tail_->content = static_cast<void*>(elem);

// Updates Stats info
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
stats_.num_started++;
gpr_log(GPR_INFO, "[InfLenFIFOQueue Put] num_started: %" PRIu64,
stats_.num_started);
auto current_time = gpr_now(GPR_CLOCK_MONOTONIC);
if (curr_count == 0) {
busy_time = current_time;
}
queue_tail_->insert_time = current_time;
}

if (num_waiters_ > 0) {
wait_nonempty_.Signal();
}
count_.Store(curr_count + 1, MemoryOrder::RELAXED);
queue_tail_ = queue_tail_->next;

TopWaiter()->cv.Signal();
}

void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) {
Expand All @@ -108,11 +150,12 @@ void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) {
start_time = gpr_now(GPR_CLOCK_MONOTONIC);
}

num_waiters_++;
Waiter self;
PushWaiter(&self);
do {
wait_nonempty_.Wait(&mu_);
self.cv.Wait(&mu_);
} while (count_.Load(MemoryOrder::RELAXED) == 0);
num_waiters_--;
RemoveWaiter(&self);
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) &&
wait_time != nullptr) {
*wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time);
Expand All @@ -122,4 +165,19 @@ void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) {
return PopFront();
}

void InfLenFIFOQueue::PushWaiter(Waiter* waiter) {
waiter->next = waiters_.next;
waiter->prev = &waiters_;
waiter->next->prev = waiter;
waiter->prev->next = waiter;
}

void InfLenFIFOQueue::RemoveWaiter(Waiter* waiter) {
GPR_DEBUG_ASSERT(waiter != &waiters_);
waiter->next->prev = waiter->prev;
waiter->prev->next = waiter->next;
}

InfLenFIFOQueue::Waiter* InfLenFIFOQueue::TopWaiter() { return waiters_.next; }

} // namespace grpc_core
85 changes: 66 additions & 19 deletions src/core/lib/iomgr/executor/mpmcqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class MPMCQueueInterface {
class InfLenFIFOQueue : public MPMCQueueInterface {
public:
// Creates a new MPMC Queue. The queue created will have infinite length.
InfLenFIFOQueue() {}
InfLenFIFOQueue();

// Releases all resources held by the queue. The queue must be empty, and no
// one waits on conditional variables.
Expand All @@ -66,33 +66,42 @@ class InfLenFIFOQueue : public MPMCQueueInterface {

// Removes the oldest element from the queue and returns it.
// This routine will cause the thread to block if queue is currently empty.
// Argument wait_time should be passed in when turning on the trace flag
// grpc_thread_pool_trace (for collecting stats info purpose.)
// Argument wait_time should be passed in when trace flag turning on (for
// collecting stats info purpose.)
void* Get(gpr_timespec* wait_time = nullptr);

// Returns number of elements in queue currently.
// There might be concurrently add/remove on queue, so count might change
// quickly.
int count() const { return count_.Load(MemoryOrder::RELAXED); }

private:
// For Internal Use Only.
// Removes the oldest element from the queue and returns it. This routine
// will NOT check whether queue is empty, and it will NOT acquire mutex.
// Caller should do the check and acquire mutex before callling.
void* PopFront();

struct Node {
Node* next; // Linking
Node* next; // Linking
Node* prev;
void* content; // Points to actual element
gpr_timespec insert_time; // Time for stats

Node(void* c) : content(c) {
next = nullptr;
insert_time = gpr_now(GPR_CLOCK_MONOTONIC);
Node() {
next = prev = nullptr;
content = nullptr;
}
};

// For test purpose only. Returns number of nodes allocated in queue.
// Any allocated node will be alive until the destruction of the queue.
int num_nodes() const { return num_nodes_; }

// For test purpose only. Returns the initial number of nodes in queue.
int init_num_nodes() const { return kQueueInitNumNodes; }

private:
// For Internal Use Only.
// Removes the oldest element from the queue and returns it. This routine
// will NOT check whether queue is empty, and it will NOT acquire mutex.
// Caller MUST check that queue is not empty and must acquire mutex before
// callling.
void* PopFront();

// Stats of queue. This will only be collect when debug trace mode is on.
// All printed stats info will have time measurement in microsecond.
struct Stats {
Expand All @@ -115,15 +124,53 @@ class InfLenFIFOQueue : public MPMCQueueInterface {
}
};

Mutex mu_; // Protecting lock
CondVar wait_nonempty_; // Wait on empty queue on get
int num_waiters_ = 0; // Number of waiters
// Node for waiting thread queue. Stands for one waiting thread, should have
// exact one thread waiting on its CondVar.
// Using a doubly linked list for waiting thread queue to wake up waiting
// threads in LIFO order to reduce cache misses.
struct Waiter {
CondVar cv;
Waiter* next;
Waiter* prev;
};

// Pushs waiter to the front of queue, require caller held mutex
void PushWaiter(Waiter* waiter);

// Removes waiter from queue, require caller held mutex
void RemoveWaiter(Waiter* waiter);

// Returns pointer to the waiter that should be waken up next, should be the
// last added waiter.
Waiter* TopWaiter();

Mutex mu_; // Protecting lock
Waiter waiters_; // Head of waiting thread queue

// Initial size for delete list
static const int kDeleteListInitSize = 1024;
// Initial number of nodes allocated
static const int kQueueInitNumNodes = 1024;

Node** delete_list_ = nullptr; // Keeps track of all allocated array entries
// for deleting on destruction
size_t delete_list_count_ = 0; // Number of entries in list
size_t delete_list_size_ = 0; // Size of the list. List will be expanded to
// double size on full

Node* queue_head_ = nullptr; // Head of the queue, remove position
Node* queue_tail_ = nullptr; // End of queue, insert position
Atomic<int> count_{0}; // Number of elements in queue
Stats stats_; // Stats info
gpr_timespec busy_time; // Start time of busy queue
int num_nodes_ = 0; // Number of nodes allocated

Stats stats_; // Stats info
gpr_timespec busy_time; // Start time of busy queue

// Internal Helper.
// Allocates an array of nodes of size "num", links all nodes together except
// the first node's prev and last node's next. They should be set by caller
// manually afterward.
Node* AllocateNodes(int num);
};

} // namespace grpc_core
Expand Down
52 changes: 52 additions & 0 deletions test/core/iomgr/mpmcqueue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,57 @@ static void test_FIFO(void) {
}
}

// Test if queue's behavior of expanding is correct. (Only does expansion when
// it gets full, and each time expands to doubled size).
static void test_space_efficiency(void) {
gpr_log(GPR_INFO, "test_space_efficiency");
grpc_core::InfLenFIFOQueue queue;
for (int i = 0; i < queue.init_num_nodes(); ++i) {
queue.Put(static_cast<void*>(grpc_core::New<WorkItem>(i)));
}
// Queue should not have been expanded at this time.
GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
for (int i = 0; i < queue.init_num_nodes(); ++i) {
WorkItem* item = static_cast<WorkItem*>(queue.Get());
queue.Put(item);
}
GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
for (int i = 0; i < queue.init_num_nodes(); ++i) {
WorkItem* item = static_cast<WorkItem*>(queue.Get());
grpc_core::Delete(item);
}
// Queue never shrinks even it is empty.
GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
GPR_ASSERT(queue.count() == 0);
// queue empty now
for (int i = 0; i < queue.init_num_nodes() * 2; ++i) {
queue.Put(static_cast<void*>(grpc_core::New<WorkItem>(i)));
}
GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2);
// Queue should have been expanded once.
GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2);
for (int i = 0; i < queue.init_num_nodes(); ++i) {
WorkItem* item = static_cast<WorkItem*>(queue.Get());
grpc_core::Delete(item);
}
GPR_ASSERT(queue.count() == queue.init_num_nodes());
// Queue will never shrink, should keep same number of node as before.
GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2);
for (int i = 0; i < queue.init_num_nodes() + 1; ++i) {
queue.Put(static_cast<void*>(grpc_core::New<WorkItem>(i)));
}
GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2 + 1);
// Queue should have been expanded twice.
GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4);
for (int i = 0; i < queue.init_num_nodes() * 2 + 1; ++i) {
WorkItem* item = static_cast<WorkItem*>(queue.Get());
grpc_core::Delete(item);
}
GPR_ASSERT(queue.count() == 0);
GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4);
gpr_log(GPR_DEBUG, "Done.");
}

static void test_many_thread(void) {
gpr_log(GPR_INFO, "test_many_thread");
const int num_producer_threads = 10;
Expand Down Expand Up @@ -172,6 +223,7 @@ int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
grpc_init();
test_FIFO();
test_space_efficiency();
test_many_thread();
grpc_shutdown();
return 0;
Expand Down

0 comments on commit 0fd05f4

Please sign in to comment.