Skip to content

Commit

Permalink
atomic client idle filter
Browse files Browse the repository at this point in the history
  • Loading branch information
bigfacebear committed Aug 16, 2019
1 parent e02ef35 commit 1391c93
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 38 deletions.
6 changes: 1 addition & 5 deletions include/grpc/impl/codegen/grpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,7 @@ typedef struct {
#define GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS "grpc.max_connection_age_grace_ms"
/** Timeout after the last RPC finishes on the client channel at which the
* channel goes back into IDLE state. Int valued, milliseconds. INT_MAX means
* unlimited. */
/** TODO(qianchengz): Currently the default value is INT_MAX, which means the
* client idle filter is disabled by default. After the client idle filter
* proves no performance issue, we will change the default value to a reasonable
* value. */
* unlimited. The default value is 30 minutes and the min value is 1 second. */
#define GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS "grpc.client_idle_timeout_ms"
/** Enable/disable support for per-message compression. Defaults to 1, unless
GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0. */
Expand Down
234 changes: 205 additions & 29 deletions src/core/ext/filters/client_idle/client_idle_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/http2_errors.h"

// The idle filter is disabled in client channel by default.
// To enable the idle filter, set GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS to [0, INT_MAX)
// in channel args.
// TODO(qianchengz): Find a reasonable default value. Maybe check what default
// value Java uses.
#define DEFAULT_IDLE_TIMEOUT_MS INT_MAX
// The idle filter is enabled in client channel by default.
// Set GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS to [1000, INT_MAX) in channel args to
// configure the idle timeout.
#define DEFAULT_IDLE_TIMEOUT_MS (30 /*minutes*/ * 60 * 1000)
// A user-supplied idle timeout smaller than this is raised (clamped) to it.
#define MIN_IDLE_TIMEOUT_MS (1 /*second*/ * 1000)

namespace grpc_core {

Expand All @@ -47,10 +47,82 @@ TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter");

namespace {

/*
client_idle_filter maintains a state tracking if there are active calls in the
channel and its internal idle_timer_. The states are specified as following:
+--------------------------------------------+-------------+---------+
|               ChannelState                 | idle_timer_ | channel |
+--------------------------------------------+-------------+---------+
| IDLE                                       | unset       | idle    |
| CALLS_ACTIVE                               | unset       | busy    |
| TIMER_PENDING                              | set-valid   | idle    |
| TIMER_PENDING_CALLS_ACTIVE                 | set-invalid | busy    |
| TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START | set-invalid | idle    |
+--------------------------------------------+-------------+---------+
IDLE: The initial state of the client_idle_filter, indicating the channel is
in IDLE.
CALLS_ACTIVE: The channel has one or more active calls and the timer is not
set.
TIMER_PENDING: The state after the timer is set and no calls have arrived
after the timer is set. The channel must have 0 active calls in this state. If
the timer fires in this state, the channel will go into IDLE state.
TIMER_PENDING_CALLS_ACTIVE: The state after the timer is set and at least one
call has arrived after the timer is set. The channel must have one or more
active calls in this state. If the timer fires in this state, we won't
reschedule it.
TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: The state after the timer is set
and at least one call has arrived after the timer is set, BUT the channel
currently has 0 active calls. If the timer fires in this state, we will
reschedule it according to the finish time of the latest call.
PROCESSING: The state set to block other threads when the setting thread is
doing some work to keep state consistency.
idle_timer_ will not be cancelled (unless the channel is shutting down).
If the timer callback is called when the idle_timer_ is valid (i.e. state_
is TIMER_PENDING), the channel will enter IDLE; otherwise the channel won't be
changed.
State transitions:
                                            IDLE
                                            |  ^
            ---------------------------------  *
            |                                  *
            v                                  *
      CALLS_ACTIVE =================> TIMER_PENDING
            ^                               |  ^
            *  ------------------------------  *
            *  |                               *
            *  v                               *
TIMER_PENDING_CALLS_ACTIVE ===> TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
            ^                               |
            |                               |
            ---------------------------------
---> Triggered by IncreaseCallCount()
===> Triggered by DecreaseCallCount()
***> Triggered by IdleTimerCallback()
*/
// Combined state of the channel's call activity and of idle_timer_, stored in
// ChannelData::state_ and updated atomically by IncreaseCallCount(),
// DecreaseCallCount() and IdleTimerCallback().
enum ChannelState {
// No active calls and no idle timer set. This is the initial value of state_.
IDLE,
// At least one call is active and the idle timer is not set.
CALLS_ACTIVE,
// The idle timer is set and no call has arrived since it was started. When
// the timer callback runs in this state it calls EnterIdle() and moves to IDLE.
TIMER_PENDING,
// The idle timer is set and a call that arrived after it was started is still
// active. The timer callback moves this state to CALLS_ACTIVE without
// restarting the timer.
TIMER_PENDING_CALLS_ACTIVE,
// The idle timer is set, a call arrived after it was started, and the call
// count has since dropped back to zero. The timer callback restarts the timer
// and moves to TIMER_PENDING.
TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START,
// Transient state set by the timer callback while it runs EnterIdle() or
// StartIdleTimer(). The call-count paths keep reloading state_ until it
// changes to one of the states they handle.
PROCESSING
};

grpc_millis GetClientIdleTimeout(const grpc_channel_args* args) {
return grpc_channel_arg_get_integer(
grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS),
{DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX});
return GPR_MAX(
grpc_channel_arg_get_integer(
grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS),
{DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}),
MIN_IDLE_TIMEOUT_MS);
}

class ChannelData {
Expand Down Expand Up @@ -86,8 +158,9 @@ class ChannelData {
const grpc_millis client_idle_timeout_;

// Member data used to track the state of channel.
Mutex call_count_mu_;
size_t call_count_;
grpc_millis last_idle_time_;
Atomic<intptr_t> call_count_{0};
Atomic<ChannelState> state_{IDLE};

// Idle timer and its callback closure.
grpc_timer idle_timer_;
Expand Down Expand Up @@ -115,37 +188,105 @@ void ChannelData::StartTransportOp(grpc_channel_element* elem,
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Catch the disconnect_with_error transport op.
if (op->disconnect_with_error != nullptr) {
// Disconnect. Cancel the timer if we set it before.
// IncreaseCallCount() introduces a dummy call. It will cancel the timer and
// prevent the timer from being reset by other threads.
// IncreaseCallCount() introduces a dummy call and prevents the timer from
// being reset by other threads.
chand->IncreaseCallCount();
// If the timer has been set, cancel the timer.
// No synchronization issues here. grpc_timer_cancel() is valid as long as
// the timer has been init()ed before.
grpc_timer_cancel(&chand->idle_timer_);
}
// Pass the op to the next filter.
grpc_channel_next_op(elem, op);
}

void ChannelData::IncreaseCallCount() {
MutexLock lock(&call_count_mu_);
if (call_count_++ == 0) {
grpc_timer_cancel(&idle_timer_);
const intptr_t previous_value = call_count_.FetchAdd(1, MemoryOrder::RELAXED);
GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIuPTR,
previous_value + 1);
if (previous_value == 0) {
// This call is the one that makes the channel busy.
// Loop here to make sure the previous decrease operation has finished.
ChannelState state = state_.Load(MemoryOrder::RELAXED);
while (true) {
switch (state) {
// Timer has not been set. Switch to CALLS_ACTIVE.
case IDLE:
// In this case, no other threads will modify the state, so we can
// just store the value.
state_.Store(CALLS_ACTIVE, MemoryOrder::RELAXED);
return;
// Timer has been set. Switch to TIMER_PENDING_CALLS_ACTIVE.
case TIMER_PENDING:
case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START:
// At this point, the state may have been switched to IDLE by the
// idle timer callback. Therefore, use CAS operation to change the
// state atomically.
// Use MemoryOrder::ACQUIRE on success to ensure last_idle_time_ has
// been properly set in DecreaseCallCount().
if (state_.CompareExchangeWeak(&state, TIMER_PENDING_CALLS_ACTIVE,
MemoryOrder::ACQUIRE,
MemoryOrder::RELAXED)) {
return;
}
break;
default:
// The state has not been switched to desired value yet, try again.
state = state_.Load(MemoryOrder::RELAXED);
break;
}
}
}
GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIuPTR, call_count_);
}

void ChannelData::DecreaseCallCount() {
MutexLock lock(&call_count_mu_);
if (call_count_-- == 1) {
StartIdleTimer();
const intptr_t previous_value = call_count_.FetchSub(1, MemoryOrder::RELAXED);
GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIuPTR,
previous_value - 1);
if (previous_value == 1) {
// This call is the one that makes the channel idle.
// last_idle_time_ does not need to be Atomic<> because busy-loops in
// IncreaseCallCount(), DecreaseCallCount() and IdleTimerCallback() will
// prevent multiple threads from simultaneously accessing this variable.
last_idle_time_ = ExecCtx::Get()->Now();
ChannelState state = state_.Load(MemoryOrder::RELAXED);
while (true) {
switch (state) {
// Timer has not been set. Set the timer and switch to TIMER_PENDING
case CALLS_ACTIVE:
// Release store here to make other threads see the updated value of
// last_idle_time_.
StartIdleTimer();
state_.Store(TIMER_PENDING, MemoryOrder::RELEASE);
return;
// Timer has been set. Switch to
// TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
case TIMER_PENDING_CALLS_ACTIVE:
// At this point, the state may have been switched to CALLS_ACTIVE by
// the idle timer callback. Therefore, use CAS operation to change the
// state atomically.
// Release store here to make the idle timer callback see the updated
// value of last_idle_time_ to properly reset the idle timer.
if (state_.CompareExchangeWeak(
&state, TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START,
MemoryOrder::RELEASE, MemoryOrder::RELAXED)) {
return;
}
break;
default:
// The state has not been switched to desired value yet, try again.
state = state_.Load(MemoryOrder::RELAXED);
break;
}
}
}
GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIuPTR, call_count_);
}

ChannelData::ChannelData(grpc_channel_element* elem,
grpc_channel_element_args* args, grpc_error** error)
: elem_(elem),
channel_stack_(args->channel_stack),
client_idle_timeout_(GetClientIdleTimeout(args->channel_args)),
call_count_(0) {
client_idle_timeout_(GetClientIdleTimeout(args->channel_args)) {
// If the idle filter is explicitly disabled in channel args, this ctor should
// not get called.
GPR_ASSERT(client_idle_timeout_ != GRPC_MILLIS_INF_FUTURE);
Expand All @@ -165,10 +306,45 @@ ChannelData::ChannelData(grpc_channel_element* elem,
void ChannelData::IdleTimerCallback(void* arg, grpc_error* error) {
GRPC_IDLE_FILTER_LOG("timer alarms");
ChannelData* chand = static_cast<ChannelData*>(arg);
{
MutexLock lock(&chand->call_count_mu_);
if (error == GRPC_ERROR_NONE && chand->call_count_ == 0) {
chand->EnterIdle();
if (error != GRPC_ERROR_NONE) {
GRPC_IDLE_FILTER_LOG("timer canceled");
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback");
return;
}
bool finished = false;
ChannelState state = chand->state_.Load(MemoryOrder::RELAXED);
while (!finished) {
switch (state) {
case TIMER_PENDING:
// Change the state to PROCESSING to block IncreaseCallCount() until the
// EnterIdle() operation finishes, preventing mistakenly entering IDLE
// when an active RPC exists.
finished = chand->state_.CompareExchangeWeak(
&state, PROCESSING, MemoryOrder::RELAXED, MemoryOrder::RELAXED);
if (finished) {
chand->EnterIdle();
chand->state_.Store(IDLE, MemoryOrder::RELAXED);
}
break;
case TIMER_PENDING_CALLS_ACTIVE:
finished = chand->state_.CompareExchangeWeak(
&state, CALLS_ACTIVE, MemoryOrder::RELAXED, MemoryOrder::RELAXED);
break;
case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START:
// Change the state to PROCESSING to block IncreaseCallCount() until the
// StartIdleTimer() operation finishes, preventing mistakenly restarting
// the timer after grpc_timer_cancel() during shutdown.
finished = chand->state_.CompareExchangeWeak(
&state, PROCESSING, MemoryOrder::ACQUIRE, MemoryOrder::RELAXED);
if (finished) {
chand->StartIdleTimer();
chand->state_.Store(TIMER_PENDING, MemoryOrder::RELAXED);
}
break;
default:
// The state has not been switched to desired value yet, try again.
state = chand->state_.Load(MemoryOrder::RELAXED);
break;
}
}
GRPC_IDLE_FILTER_LOG("timer finishes");
Expand All @@ -185,7 +361,7 @@ void ChannelData::StartIdleTimer() {
GRPC_IDLE_FILTER_LOG("timer has started");
// Hold a ref to the channel stack for the timer callback.
GRPC_CHANNEL_STACK_REF(channel_stack_, "max idle timer callback");
grpc_timer_init(&idle_timer_, ExecCtx::Get()->Now() + client_idle_timeout_,
grpc_timer_init(&idle_timer_, last_idle_time_ + client_idle_timeout_,
&idle_timer_callback_);
}

Expand Down
4 changes: 2 additions & 2 deletions test/core/channel/minimal_stack_is_minimal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ int main(int argc, char** argv) {
errors += CHECK_STACK("chttp2", nullptr, GRPC_SERVER_CHANNEL, "server",
"message_size", "deadline", "http-server",
"message_compress", "connected", NULL);
errors += CHECK_STACK(nullptr, nullptr, GRPC_CLIENT_CHANNEL, "client-channel",
NULL);
errors += CHECK_STACK(nullptr, nullptr, GRPC_CLIENT_CHANNEL,
"client_idle, client-channel", NULL);

GPR_ASSERT(errors == 0);
grpc_shutdown();
Expand Down
4 changes: 2 additions & 2 deletions test/cpp/end2end/client_lb_end2end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,7 @@ TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
StartServers(kNumServers);
// Set max idle time and build the channel.
ChannelArguments args;
args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 100);
args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 1000);
auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("", response_generator, args);
auto stub = BuildStub(channel);
Expand All @@ -1483,7 +1483,7 @@ TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// After a period time not using the channel, the channel state should switch
// to IDLE.
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(120));
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200));
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
// Sending a new RPC should awake the IDLE channel.
response_generator.SetNextResolution(GetServersPorts());
Expand Down

0 comments on commit 1391c93

Please sign in to comment.