Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loadbalancer-experimental: add LB observer method for when the host set changes #3003

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ public void onNoActiveHostsAvailable(int hostSetSize, NoActiveHostException exce
LOGGER.debug("{}- No active hosts available. Host set size: {}.", lbDescription, hostSetSize, exception);
}

@Override
public void onHostSetChanged(Collection<? extends Host> newHosts) {
if (LOGGER.isDebugEnabled()) {
int healthyCount = 0;
for (Host host : newHosts) {
if (host.isHealthy()) {
healthyCount++;
}
}
LOGGER.debug("{}- onHostSetChanged(host set size: {}, healthy: {}). New hosts: {}", lbDescription,
newHosts.size(), healthyCount, newHosts);
}
}

private final class HostObserverImpl implements HostObserver {

private final Object resolvedAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,9 @@ private void sequentialUpdateUsedHosts(List<PrioritizedHostImpl<ResolvedAddress,
for (PrioritizedHostImpl<?, ?> host : nextHosts) {
host.loadBalancingWeight(host.serviceDiscoveryWeight());
}
this.hostSelector = hostSelector.rebuildWithHosts(priorityStrategy.prioritize(usedHosts));
nextHosts = priorityStrategy.prioritize(nextHosts);
this.hostSelector = hostSelector.rebuildWithHosts(nextHosts);
loadBalancerObserver.onHostSetChanged(Collections.unmodifiableList(nextHosts));
}

@Override
Expand Down Expand Up @@ -649,7 +651,7 @@ List<PrioritizedHostImpl<ResolvedAddress, C>> hosts() {
}

static final class PrioritizedHostImpl<ResolvedAddress, C extends LoadBalancedConnection>
implements Host<ResolvedAddress, C>, PrioritizedHost {
implements Host<ResolvedAddress, C>, PrioritizedHost, LoadBalancerObserver.Host {
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved
private final Host<ResolvedAddress, C> delegate;
private int priority;
private double serviceDiscoveryWeight;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,17 @@ void onServiceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<?>> eve
void onNoActiveHostsAvailable(int hostSetSize, NoActiveHostException exception);

/**
* An observer for {@link Host} events.
* Callback for when the set of hosts used by the load balancer has changed. This set may not
* exactly reflect the state of the service discovery system due to filtering of zero-weight
* hosts and forms of sub-setting and thus may only represent the hosts that the selection
* algorithm may use.
* @param newHosts the new set of hosts used by the selection algorithm.
*/
default void onHostSetChanged(Collection<? extends Host> newHosts) {
}

/**
* An observer for {@link io.servicetalk.loadbalancer.Host} events.
*/
interface HostObserver {

Expand Down Expand Up @@ -84,14 +94,38 @@ interface HostObserver {
void onExpiredHostRemoved(int connectionCount);

/**
* Callback for when a {@link Host} transitions from healthy to unhealthy.
* Callback for when a {@link io.servicetalk.loadbalancer.Host} transitions from healthy to unhealthy.
* @param cause the most recent cause of the transition.
*/
void onHostMarkedUnhealthy(@Nullable Throwable cause);

/**
* Callback for when a {@link Host} transitions from unhealthy to healthy.
* Callback for when a {@link io.servicetalk.loadbalancer.Host} transitions from unhealthy to healthy.
*/
void onHostRevived();
}

/**
* A description of a host.
*/
interface Host {
/**
* The address of the host.
* @return the address of the host.
*/
Object address();

/**
* Determine the health status of this host.
* @return whether the host considers itself healthy enough to serve traffic. This is best effort and does not
* guarantee that the request will succeed.
*/
boolean isHealthy();

/**
* The weight of the host, relative to the weights of associated hosts as used for load balancing.
* @return the relative weight of the host.
*/
double loadBalancingWeight();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public void onServiceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<
// noop
}

@Override
public void onHostSetChanged(Collection<? extends Host> newHosts) {
// noop
}

private static final class NoopHostObserver implements LoadBalancerObserver.HostObserver {

private static final HostObserver INSTANCE = new NoopHostObserver();
Expand Down
Loading