Skip to content

Commit

Permalink
Revamp subchannel connectivity state monitoring APIs.
Browse files Browse the repository at this point in the history
  • Loading branch information
markdroth committed May 10, 2019
1 parent 04da62c commit a4d4bb8
Show file tree
Hide file tree
Showing 10 changed files with 663 additions and 431 deletions.
51 changes: 25 additions & 26 deletions src/core/ext/filters/client_channel/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class ChannelData {
~ChannelData();

static bool ProcessResolverResultLocked(
void* arg, const Resolver::Result& result, const char** lb_policy_name,
void* arg, Resolver::Result* result, const char** lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
grpc_error** service_config_error);

Expand Down Expand Up @@ -271,7 +271,6 @@ class ChannelData {
OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy_;
grpc_connectivity_state_tracker state_tracker_;
ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
UniquePtr<char> health_check_service_name_;
RefCountedPtr<ServiceConfig> saved_service_config_;
bool received_first_resolver_result_ = false;

Expand Down Expand Up @@ -951,18 +950,10 @@ class ChannelData::ClientChannelControlHelper
}

Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
grpc_arg args_to_add[2];
int num_args_to_add = 0;
if (chand_->health_check_service_name_ != nullptr) {
args_to_add[0] = grpc_channel_arg_string_create(
const_cast<char*>("grpc.temp.health_check"),
const_cast<char*>(chand_->health_check_service_name_.get()));
num_args_to_add++;
}
args_to_add[num_args_to_add++] = SubchannelPoolInterface::CreateChannelArg(
grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
chand_->subchannel_pool_.get());
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(&args, args_to_add, num_args_to_add);
grpc_channel_args_copy_and_add(&args, &arg, 1);
Subchannel* subchannel =
chand_->client_channel_factory_->CreateSubchannel(new_args);
grpc_channel_args_destroy(new_args);
Expand Down Expand Up @@ -1201,14 +1192,14 @@ void ChannelData::ProcessLbPolicy(
// Synchronous callback from ResolvingLoadBalancingPolicy to process a
// resolver result update.
bool ChannelData::ProcessResolverResultLocked(
void* arg, const Resolver::Result& result, const char** lb_policy_name,
void* arg, Resolver::Result* result, const char** lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
grpc_error** service_config_error) {
ChannelData* chand = static_cast<ChannelData*>(arg);
RefCountedPtr<ServiceConfig> service_config;
// If the resolver did not return a service config or returned an invalid
// service config, we need a fallback service config.
if (result.service_config_error != GRPC_ERROR_NONE) {
if (result->service_config_error != GRPC_ERROR_NONE) {
// If the service config was invalid, then fallback to the saved service
// config. If there is no saved config either, use the default service
// config.
Expand All @@ -1229,7 +1220,7 @@ bool ChannelData::ProcessResolverResultLocked(
}
service_config = chand->default_service_config_;
}
} else if (result.service_config == nullptr) {
} else if (result->service_config == nullptr) {
if (chand->default_service_config_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
Expand All @@ -1240,15 +1231,15 @@ bool ChannelData::ProcessResolverResultLocked(
service_config = chand->default_service_config_;
}
} else {
service_config = result.service_config;
service_config = result->service_config;
}
*service_config_error = GRPC_ERROR_REF(result.service_config_error);
*service_config_error = GRPC_ERROR_REF(result->service_config_error);
if (service_config == nullptr &&
result.service_config_error != GRPC_ERROR_NONE) {
result->service_config_error != GRPC_ERROR_NONE) {
return false;
}
UniquePtr<char> service_config_json;
// Process service config.
UniquePtr<char> service_config_json;
const internal::ClientChannelGlobalParsedObject* parsed_service_config =
nullptr;
if (service_config != nullptr) {
Expand All @@ -1257,6 +1248,20 @@ bool ChannelData::ProcessResolverResultLocked(
service_config->GetParsedGlobalServiceConfigObject(
internal::ClientChannelServiceConfigParser::ParserIndex()));
}
// TODO(roth): Eliminate this hack as part of hiding health check
// service name from LB policy API. As part of this, change the API
// for this function to pass in result as a const reference.
if (parsed_service_config != nullptr &&
parsed_service_config->health_check_service_name() != nullptr) {
grpc_arg new_arg = grpc_channel_arg_string_create(
const_cast<char*>("grpc.temp.health_check"),
const_cast<char*>(parsed_service_config->health_check_service_name()));
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(result->args, &new_arg, 1);
grpc_channel_args_destroy(result->args);
result->args = new_args;
}
// Check if the config has changed.
const bool service_config_changed =
((service_config == nullptr) !=
(chand->saved_service_config_ == nullptr)) ||
Expand All @@ -1273,12 +1278,6 @@ bool ChannelData::ProcessResolverResultLocked(
chand, service_config_json.get());
}
chand->saved_service_config_ = std::move(service_config);
if (parsed_service_config != nullptr) {
chand->health_check_service_name_.reset(
gpr_strdup(parsed_service_config->health_check_service_name()));
} else {
chand->health_check_service_name_.reset();
}
}
// We want to set the service config at least once. This should not really be
// needed, but we are doing it as a defensive approach. This can be removed,
Expand All @@ -1296,7 +1295,7 @@ bool ChannelData::ProcessResolverResultLocked(
chand->saved_service_config_);
}
UniquePtr<char> processed_lb_policy_name;
chand->ProcessLbPolicy(result, parsed_service_config,
chand->ProcessLbPolicy(*result, parsed_service_config,
&processed_lb_policy_name, lb_policy_config);
// Swap out the data used by GetChannelInfo().
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ void SubchannelNode::PopulateConnectivityState(grpc_json* json) {
if (subchannel_ == nullptr) {
state = GRPC_CHANNEL_SHUTDOWN;
} else {
state = subchannel_->CheckConnectivity(true /* inhibit_health_checking */);
state = subchannel_->CheckConnectivityState(
nullptr /* health_check_service_name */,
nullptr /* connected_subchannel */);
}
json = grpc_json_create_child(nullptr, json, "state", nullptr,
GRPC_JSON_OBJECT, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ class PickFirst : public LoadBalancingPolicy {
PickFirstSubchannelData(
SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
subchannel_list,
const ServerAddress& address, Subchannel* subchannel,
grpc_combiner* combiner)
: SubchannelData(subchannel_list, address, subchannel, combiner) {}
const ServerAddress& address, Subchannel* subchannel)
: SubchannelData(subchannel_list, address, subchannel) {}

void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) override;
Expand Down Expand Up @@ -312,6 +311,7 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
// here, since we've already checked the initial connectivity
// state of all subchannels above.
subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
subchannel_list_->subchannel(0)->subchannel()->AttemptToConnect();
}
} else {
// We do have a selected subchannel (which means it's READY), so keep
Expand All @@ -334,6 +334,9 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
// state of all subchannels above.
latest_pending_subchannel_list_->subchannel(0)
->StartConnectivityWatchLocked();
latest_pending_subchannel_list_->subchannel(0)
->subchannel()
->AttemptToConnect();
}
}
}
Expand Down Expand Up @@ -366,7 +369,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->subchannel_list_.get());
}
p->selected_ = nullptr;
StopConnectivityWatchLocked();
CancelConnectivityWatchLocked(
"selected subchannel failed; switching to pending update");
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
// Set our state to that of the pending subchannel list.
if (p->subchannel_list_->in_transient_failure()) {
Expand All @@ -391,7 +395,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->idle_ = true;
p->channel_control_helper()->RequestReresolution();
p->selected_ = nullptr;
StopConnectivityWatchLocked();
CancelConnectivityWatchLocked("selected subchannel failed; going IDLE");
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE,
UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
Expand All @@ -408,8 +412,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
connectivity_state,
UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
}
// Renew notification.
RenewConnectivityWatchLocked();
}
}
return;
Expand All @@ -426,13 +428,11 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
subchannel_list()->set_in_transient_failure(false);
switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
// Renew notification.
RenewConnectivityWatchLocked();
ProcessUnselectedReadyLocked();
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
StopConnectivityWatchLocked();
CancelConnectivityWatchLocked("connection attempt failed");
PickFirstSubchannelData* sd = this;
size_t next_index =
(sd->Index() + 1) % subchannel_list()->num_subchannels();
Expand Down Expand Up @@ -468,8 +468,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
GRPC_CHANNEL_CONNECTING,
UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
}
// Renew notification.
RenewConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_SHUTDOWN:
Expand Down Expand Up @@ -521,8 +519,11 @@ void PickFirst::PickFirstSubchannelData::
// If current state is READY, select the subchannel now, since we started
// watching from this state and will not get a notification of it
// transitioning into this state.
if (p->selected_ != this && current_state == GRPC_CHANNEL_READY) {
ProcessUnselectedReadyLocked();
// If the current state is not READY, attempt to connect.
if (current_state == GRPC_CHANNEL_READY) {
if (p->selected_ != this) ProcessUnselectedReadyLocked();
} else {
subchannel()->AttemptToConnect();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,8 @@ class RoundRobin : public LoadBalancingPolicy {
RoundRobinSubchannelData(
SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
subchannel_list,
const ServerAddress& address, Subchannel* subchannel,
grpc_combiner* combiner)
: SubchannelData(subchannel_list, address, subchannel, combiner) {}
const ServerAddress& address, Subchannel* subchannel)
: SubchannelData(subchannel_list, address, subchannel) {}

grpc_connectivity_state connectivity_state() const {
return last_connectivity_state_;
Expand Down Expand Up @@ -320,6 +319,7 @@ void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
for (size_t i = 0; i < num_subchannels(); i++) {
if (subchannel(i)->subchannel() != nullptr) {
subchannel(i)->StartConnectivityWatchLocked();
subchannel(i)->subchannel()->AttemptToConnect();
}
}
// Now set the LB policy's state based on the subchannels' states.
Expand Down Expand Up @@ -448,6 +448,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
// Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
// when the subchannel list was created, we'd wind up in a constant
// loop of re-resolution.
// Also attempt to reconnect.
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
Expand All @@ -456,9 +457,8 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
p, subchannel());
}
p->channel_control_helper()->RequestReresolution();
subchannel()->AttemptToConnect();
}
// Renew connectivity watch.
RenewConnectivityWatchLocked();
// Update state counters.
UpdateConnectivityStateLocked(connectivity_state);
// Update overall state and renew notification.
Expand Down
Loading

0 comments on commit a4d4bb8

Please sign in to comment.