Skip to content

Commit

Permalink
Unref unselected subchannels in Pick First.
Browse files Browse the repository at this point in the history
  • Loading branch information
bigfacebear committed Jun 20, 2019
1 parent 4d5f32e commit a75acbb
Showing 1 changed file with 59 additions and 58 deletions.
117 changes: 59 additions & 58 deletions src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ class PickFirst : public LoadBalancingPolicy {

void ShutdownLocked() override;

void AttemptToConnectUsingLatestUpdateArgsLocked();

// Lateset update args.
UpdateArgs latest_update_args_;
// All our subchannels.
OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
// Latest pending subchannel list.
Expand Down Expand Up @@ -167,18 +171,7 @@ void PickFirst::ExitIdleLocked() {
if (shutdown_) return;
if (idle_) {
idle_ = false;
if (subchannel_list_ == nullptr ||
subchannel_list_->num_subchannels() == 0) {
grpc_error* error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No addresses to connect to"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
} else {
subchannel_list_->subchannel(0)
->CheckConnectivityStateAndStartWatchingLocked();
}
AttemptToConnectUsingLatestUpdateArgsLocked();
}
}

Expand All @@ -189,45 +182,33 @@ void PickFirst::ResetBackoffLocked() {
}
}

void PickFirst::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
args.addresses.size());
}
grpc_arg new_arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(args.args, &new_arg, 1);
void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
// Create a subchannel list from the latest_update_args_.
auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
this, &grpc_lb_pick_first_trace, args.addresses, combiner(), *new_args);
grpc_channel_args_destroy(new_args);
this, &grpc_lb_pick_first_trace, latest_update_args_.addresses,
combiner(), *latest_update_args_.args);
// Empty update or no valid subchannels.
if (subchannel_list->num_subchannels() == 0) {
// Empty update or no valid subchannels. Unsubscribe from all current
// subchannels.
// Unsubscribe from all current subchannels.
subchannel_list_ = std::move(subchannel_list); // Empty list.
selected_ = nullptr;
// If not idle, put the channel in TRANSIENT_FAILURE.
// (If we are idle, then this will happen in ExitIdleLocked() if we
// haven't gotten a non-empty update by the time the application tries
// to start a new call.)
if (!idle_) {
grpc_error* error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
}
grpc_error* error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
return;
}
// If one of the subchannels in the new list is already in state
// READY, then select it immediately. This can happen when the
// currently selected subchannel is also present in the update. It
// can also happen if one of the subchannels in the update is already
// in the global subchannel pool because it's in use by another channel.
// TODO(roth): If we're in IDLE state, we should probably defer this
// check and instead do it in ExitIdleLocked().
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
grpc_connectivity_state state = sd->CheckConnectivityStateLocked();
Expand All @@ -239,10 +220,6 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
latest_pending_subchannel_list_.reset();
// Make sure that subsequent calls to ExitIdleLocked() don't cause
// us to start watching a subchannel other than the one we've
// selected.
idle_ = false;
return;
}
}
Expand All @@ -252,13 +229,11 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
subchannel_list_ = std::move(subchannel_list);
// If we're not in IDLE state, start trying to connect to the first
// subchannel in the new list.
if (!idle_) {
// Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
// here, since we've already checked the initial connectivity
// state of all subchannels above.
subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
subchannel_list_->subchannel(0)->subchannel()->AttemptToConnect();
}
// Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
// here, since we've already checked the initial connectivity
// state of all subchannels above.
subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
subchannel_list_->subchannel(0)->subchannel()->AttemptToConnect();
} else {
// We do have a selected subchannel (which means it's READY), so keep
// using it until one of the subchannels in the new list reports READY.
Expand All @@ -274,16 +249,35 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
latest_pending_subchannel_list_ = std::move(subchannel_list);
// If we're not in IDLE state, start trying to connect to the first
// subchannel in the new list.
if (!idle_) {
// Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
// here, since we've already checked the initial connectivity
// state of all subchannels above.
latest_pending_subchannel_list_->subchannel(0)
->StartConnectivityWatchLocked();
latest_pending_subchannel_list_->subchannel(0)
->subchannel()
->AttemptToConnect();
}
// Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
// here, since we've already checked the initial connectivity
// state of all subchannels above.
latest_pending_subchannel_list_->subchannel(0)
->StartConnectivityWatchLocked();
latest_pending_subchannel_list_->subchannel(0)
->subchannel()
->AttemptToConnect();
}
}

void PickFirst::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
args.addresses.size());
}
// Update the latest_update_args_
grpc_arg new_arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
const grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(args.args, &new_arg, 1);
GPR_SWAP(const grpc_channel_args*, new_args, args.args);
grpc_channel_args_destroy(new_args);
latest_update_args_ = std::move(args);
// If we are not in idle, start connection attempt immediately.
// Otherwise, we defer the attempt into ExitIdleLocked().
if (!idle_) {
AttemptToConnectUsingLatestUpdateArgsLocked();
}
}

Expand Down Expand Up @@ -338,10 +332,12 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// also set the channel state to IDLE. The reason is that if the new
// state is TRANSIENT_FAILURE due to a GOAWAY reception we don't want
// to connect to the re-resolved backends until we leave IDLE state.
// TODO(qianchengz): We may want to request re-resolution in
// ExitIdleLocked().
p->idle_ = true;
p->channel_control_helper()->RequestReresolution();
p->selected_ = nullptr;
CancelConnectivityWatchLocked("selected subchannel failed; going IDLE");
p->subchannel_list_.reset();
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, UniquePtr<SubchannelPicker>(New<QueuePicker>(
p->Ref(DEBUG_LOCATION, "QueuePicker"))));
Expand Down Expand Up @@ -454,6 +450,11 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
}
for (size_t i = 0; i < subchannel_list()->num_subchannels(); ++i) {
if (i != Index()) {
subchannel_list()->subchannel(i)->ShutdownLocked();
}
}
}

void PickFirst::PickFirstSubchannelData::
Expand Down

0 comments on commit a75acbb

Please sign in to comment.