Skip to content

Commit

Permalink
Revert "Restrict the number of threads in C++ sync server"
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolasnoble committed Aug 1, 2018
1 parent c3ce44e commit bea98c3
Show file tree
Hide file tree
Showing 16 changed files with 63 additions and 445 deletions.
1 change: 0 additions & 1 deletion grpc.def
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ EXPORTS
grpc_resource_quota_ref
grpc_resource_quota_unref
grpc_resource_quota_resize
grpc_resource_quota_set_max_threads
grpc_resource_quota_arg_vtable
grpc_channelz_get_top_channels
grpc_channelz_get_channel
Expand Down
4 changes: 0 additions & 4 deletions include/grpc/grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -450,10 +450,6 @@ GRPCAPI void grpc_resource_quota_unref(grpc_resource_quota* resource_quota);
GRPCAPI void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
size_t new_size);

/** Update the size of the maximum number of threads allowed */
GRPCAPI void grpc_resource_quota_set_max_threads(
grpc_resource_quota* resource_quota, int new_max_threads);

/** Fetch a vtable for a grpc_channel_arg that points to a grpc_resource_quota
*/
GRPCAPI const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void);
Expand Down
16 changes: 3 additions & 13 deletions include/grpcpp/resource_quota.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ struct grpc_resource_quota;

namespace grpc {

/// ResourceQuota represents a bound on memory and thread usage by the gRPC
/// library. A ResourceQuota can be attached to a server (via \a ServerBuilder),
/// ResourceQuota represents a bound on memory usage by the gRPC library.
/// A ResourceQuota can be attached to a server (via \a ServerBuilder),
/// or a client channel (via \a ChannelArguments).
/// gRPC will attempt to keep memory and threads used by all attached entities
/// gRPC will attempt to keep memory used by all attached entities
/// below the ResourceQuota bound.
class ResourceQuota final : private GrpcLibraryCodegen {
public:
Expand All @@ -44,16 +44,6 @@ class ResourceQuota final : private GrpcLibraryCodegen {
/// No time bound is given for this to occur however.
ResourceQuota& Resize(size_t new_size);

/// Set the max number of threads that can be allocated from this
/// ResourceQuota object.
///
/// If the new_max_threads value is smaller than the current value, no new
/// threads are allocated until the number of active threads fall below
/// new_max_threads. There is no time bound on when this may happen i.e none
/// of the current threads are forcefully destroyed and all threads run their
/// normal course.
ResourceQuota& SetMaxThreads(int new_max_threads);

grpc_resource_quota* c_resource_quota() const { return impl_; }

private:
Expand Down
3 changes: 1 addition & 2 deletions include/grpcpp/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
Server(int max_message_size, ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
grpc_resource_quota* server_rq, int min_pollers, int max_pollers,
int sync_cq_timeout_msec);
int min_pollers, int max_pollers, int sync_cq_timeout_msec);

/// Start the server.
///
Expand Down
78 changes: 0 additions & 78 deletions src/core/lib/iomgr/resource_quota.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ struct grpc_resource_user {
list, false otherwise */
bool added_to_free_pool;

/* The number of threads currently allocated to this resource user */
gpr_atm num_threads_allocated;

/* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
*/
grpc_closure* reclaimers[2];
Expand Down Expand Up @@ -138,33 +135,12 @@ struct grpc_resource_quota {

gpr_atm last_size;

/* Mutex to protect max_threads and num_threads_allocated */
/* Note: We could have used gpr_atm for max_threads and num_threads_allocated
* and avoid having this mutex; but in that case, each invocation of the
* function grpc_resource_user_allocate_threads() would have had to do at
* least two atomic loads (for max_threads and num_threads_allocated) followed
* by a CAS (on num_threads_allocated).
* Moreover, we expect grpc_resource_user_allocate_threads() to be often
* called concurrently thereby increasing the chances of failing the CAS
* operation. This additional complexity is not worth the tiny perf gain we
* may (or may not) have by using atomics */
gpr_mu thread_count_mu;

/* Max number of threads allowed */
int max_threads;

/* Number of threads currently allocated via this resource_quota object */
int num_threads_allocated;

/* Has rq_step been scheduled to occur? */
bool step_scheduled;

/* Are we currently reclaiming memory */
bool reclaiming;

/* Closure around rq_step */
grpc_closure rq_step_closure;

/* Closure around rq_reclamation_done */
grpc_closure rq_reclamation_done_closure;

Expand Down Expand Up @@ -548,11 +524,6 @@ static void ru_shutdown(void* ru, grpc_error* error) {
static void ru_destroy(void* ru, grpc_error* error) {
grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
// Free all the remaining thread quota
grpc_resource_user_free_threads(resource_user,
static_cast<int>(gpr_atm_no_barrier_load(
&resource_user->num_threads_allocated)));

for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, static_cast<grpc_rulist>(i));
}
Expand Down Expand Up @@ -623,9 +594,6 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) {
resource_quota->free_pool = INT64_MAX;
resource_quota->size = INT64_MAX;
gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
gpr_mu_init(&resource_quota->thread_count_mu);
resource_quota->max_threads = INT_MAX;
resource_quota->num_threads_allocated = 0;
resource_quota->step_scheduled = false;
resource_quota->reclaiming = false;
gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
Expand All @@ -648,8 +616,6 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) {

void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
if (gpr_unref(&resource_quota->refs)) {
// No outstanding thread quota
GPR_ASSERT(resource_quota->num_threads_allocated == 0);
GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
gpr_free(resource_quota->name);
gpr_free(resource_quota);
Expand Down Expand Up @@ -680,15 +646,6 @@ double grpc_resource_quota_get_memory_pressure(
(static_cast<double>(MEMORY_USAGE_ESTIMATION_MAX));
}

/* Public API */
void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
int new_max_threads) {
GPR_ASSERT(new_max_threads >= 0);
gpr_mu_lock(&resource_quota->thread_count_mu);
resource_quota->max_threads = new_max_threads;
gpr_mu_unlock(&resource_quota->thread_count_mu);
}

/* Public API */
void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
size_t size) {
Expand Down Expand Up @@ -774,7 +731,6 @@ grpc_resource_user* grpc_resource_user_create(
grpc_closure_list_init(&resource_user->on_allocated);
resource_user->allocating = false;
resource_user->added_to_free_pool = false;
gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
resource_user->reclaimers[0] = nullptr;
resource_user->reclaimers[1] = nullptr;
resource_user->new_reclaimers[0] = nullptr;
Expand Down Expand Up @@ -829,40 +785,6 @@ void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
}
}

bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
int thread_count) {
GPR_ASSERT(thread_count >= 0);
bool is_success = false;
gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
grpc_resource_quota* rq = resource_user->resource_quota;
if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
rq->num_threads_allocated += thread_count;
gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
thread_count);
is_success = true;
}
gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
return is_success;
}

void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
int thread_count) {
GPR_ASSERT(thread_count >= 0);
gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
grpc_resource_quota* rq = resource_user->resource_quota;
rq->num_threads_allocated -= thread_count;
int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
&resource_user->num_threads_allocated, -thread_count));
if (old_count < thread_count || rq->num_threads_allocated < 0) {
gpr_log(GPR_ERROR,
"Releasing more threads (%d) than currently allocated (rq threads: "
"%d, ru threads: %d)",
thread_count, rq->num_threads_allocated + thread_count, old_count);
abort();
}
gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
}

void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
grpc_closure* optional_on_done) {
gpr_mu_lock(&resource_user->mu);
Expand Down
16 changes: 0 additions & 16 deletions src/core/lib/iomgr/resource_quota.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,6 @@ void grpc_resource_user_ref(grpc_resource_user* resource_user);
void grpc_resource_user_unref(grpc_resource_user* resource_user);
void grpc_resource_user_shutdown(grpc_resource_user* resource_user);

/* Attempts to get quota (from the resource_user) to create 'thd_count' number
* of threads. Returns true if successful (i.e the caller is now free to create
* 'thd_count' number of threads) or false if quota is not available */
bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
int thd_count);
/* Releases 'thd_count' worth of quota back to the resource user. The quota
* should have been previously obtained successfully by calling
* grpc_resource_user_allocate_threads().
*
* Note: There need not be an exact one-to-one correspondence between
* grpc_resource_user_allocate_threads() and grpc_resource_user_free_threads()
* calls. The only requirement is that the number of threads allocated should
* all be eventually released */
void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
int thd_count);

/* Allocate from the resource user (and its quota).
If optional_on_done is NULL, then allocate immediately. This may push the
quota over-limit, at which point reclamation will kick in.
Expand Down
4 changes: 0 additions & 4 deletions src/cpp/common/resource_quota_cc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,4 @@ ResourceQuota& ResourceQuota::Resize(size_t new_size) {
return *this;
}

ResourceQuota& ResourceQuota::SetMaxThreads(int new_max_threads) {
grpc_resource_quota_set_max_threads(impl_, new_max_threads);
return *this;
}
} // namespace grpc
2 changes: 1 addition & 1 deletion src/cpp/server/server_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
}

std::unique_ptr<Server> server(new Server(
max_receive_message_size_, &args, sync_server_cqs, resource_quota_,
max_receive_message_size_, &args, sync_server_cqs,
sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
sync_server_settings_.cq_timeout_msec));

Expand Down
31 changes: 6 additions & 25 deletions src/cpp/server/server_cc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,6 @@
namespace grpc {
namespace {

// The default value for maximum number of threads that can be created in the
// sync server. This value of 500 is empirically chosen. To increase the max
// number of threads in a sync server, pass a custom ResourceQuota object (with
// the desired number of max-threads set) to the server builder
#define DEFAULT_MAX_SYNC_SERVER_THREADS 500

class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
public:
~DefaultGlobalCallbacks() override {}
Expand Down Expand Up @@ -272,9 +266,9 @@ class Server::SyncRequestThreadManager : public ThreadManager {
public:
SyncRequestThreadManager(Server* server, CompletionQueue* server_cq,
std::shared_ptr<GlobalCallbacks> global_callbacks,
grpc_resource_quota* rq, int min_pollers,
int max_pollers, int cq_timeout_msec)
: ThreadManager("SyncServer", rq, min_pollers, max_pollers),
int min_pollers, int max_pollers,
int cq_timeout_msec)
: ThreadManager(min_pollers, max_pollers),
server_(server),
server_cq_(server_cq),
cq_timeout_msec_(cq_timeout_msec),
Expand Down Expand Up @@ -382,8 +376,7 @@ Server::Server(
int max_receive_message_size, ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
grpc_resource_quota* server_rq, int min_pollers, int max_pollers,
int sync_cq_timeout_msec)
int min_pollers, int max_pollers, int sync_cq_timeout_msec)
: max_receive_message_size_(max_receive_message_size),
sync_server_cqs_(std::move(sync_server_cqs)),
started_(false),
Expand All @@ -399,22 +392,10 @@ Server::Server(
global_callbacks_->UpdateArguments(args);

if (sync_server_cqs_ != nullptr) {
bool default_rq_created = false;
if (server_rq == nullptr) {
server_rq = grpc_resource_quota_create("SyncServer-default-rq");
grpc_resource_quota_set_max_threads(server_rq,
DEFAULT_MAX_SYNC_SERVER_THREADS);
default_rq_created = true;
}

for (const auto& it : *sync_server_cqs_) {
sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
this, it.get(), global_callbacks_, server_rq, min_pollers,
max_pollers, sync_cq_timeout_msec));
}

if (default_rq_created) {
grpc_resource_quota_unref(server_rq);
this, it.get(), global_callbacks_, min_pollers, max_pollers,
sync_cq_timeout_msec));
}
}

Expand Down
Loading

0 comments on commit bea98c3

Please sign in to comment.