Skip to content

Commit

Permalink
Support different types of MiddleManagers by Autoscaler apache#8695
Browse files Browse the repository at this point in the history
 - Put category information into Autoscaler
 - Changed structure of behavior config
 - Changed CategoriedProvisioningStrategy accordingly
 - Fixed bug in PendingTaskBasedWorkerProvisioningStrategy
 - Extended unit tests
 - Code cleanup and refactoring
  • Loading branch information
Vladimir Iordanov committed Dec 4, 2019
1 parent 66fbec5 commit 8d94ff2
Show file tree
Hide file tree
Showing 10 changed files with 324 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@
import org.apache.druid.indexing.overlord.autoscaling.AutoScaler;
import org.apache.druid.indexing.overlord.autoscaling.AutoScalingData;
import org.apache.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningConfig;
import org.apache.druid.indexing.overlord.setup.CategoriedWorkerBehaviorConfig;
import org.apache.druid.java.util.emitter.EmittingLogger;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
*/
Expand All @@ -68,7 +68,7 @@ public EC2AutoScaler(
@JsonProperty("envConfig") EC2EnvironmentConfig envConfig,
@JacksonInject AmazonEC2 amazonEC2Client,
@JacksonInject SimpleWorkerProvisioningConfig config,
@JsonProperty(value = "category", defaultValue = CategoriedWorkerBehaviorConfig.DEFAULT_AUTOSCALER_CATEGORY) String category
@JsonProperty("category") String category
)
{
this.minNumWorkers = minNumWorkers;
Expand All @@ -94,6 +94,7 @@ public int getMaxNumWorkers()
}

@Override
@JsonProperty
public String getCategory()
{
return category;
Expand Down Expand Up @@ -341,6 +342,7 @@ public String toString()
"envConfig=" + envConfig +
", maxNumWorkers=" + maxNumWorkers +
", minNumWorkers=" + minNumWorkers +
", category=" + category +
'}';
}

Expand All @@ -353,28 +355,16 @@ public boolean equals(Object o)
if (o == null || getClass() != o.getClass()) {
return false;
}

EC2AutoScaler that = (EC2AutoScaler) o;

if (maxNumWorkers != that.maxNumWorkers) {
return false;
}
if (minNumWorkers != that.minNumWorkers) {
return false;
}
if (envConfig != null ? !envConfig.equals(that.envConfig) : that.envConfig != null) {
return false;
}

return true;
return minNumWorkers == that.minNumWorkers &&
maxNumWorkers == that.maxNumWorkers &&
Objects.equals(category, that.category) &&
Objects.equals(envConfig, that.envConfig);
}

@Override
public int hashCode()
{
int result = minNumWorkers;
result = 31 * result + maxNumWorkers;
result = 31 * result + (envConfig != null ? envConfig.hashCode() : 0);
return result;
return Objects.hash(minNumWorkers, maxNumWorkers, category, envConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.indexing.overlord.setup.CategoriedWorkerBehaviorConfig;

import javax.annotation.Nullable;
import java.util.List;
Expand All @@ -36,7 +37,10 @@ public interface AutoScaler<T>

int getMaxNumWorkers();

String getCategory();
default String getCategory()
{
return CategoriedWorkerBehaviorConfig.DEFAULT_AUTOSCALER_CATEGORY;
}

/**
* This method is unused, but AutoScaler is an {@link ExtensionPoint}, so we cannot remove it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private static CategoriedWorkerBehaviorConfig getCategoriedWorkerBehaviorConfig(
}
if (!(workerBehaviorConfig instanceof CategoriedWorkerBehaviorConfig)) {
log.error(
"Only DefaultWorkerBehaviorConfig is supported as WorkerBehaviorConfig, [%s] given, cannot %s workers",
"Only CategoriedWorkerBehaviorConfig is supported as WorkerBehaviorConfig, [%s] given, cannot %s workers",
workerBehaviorConfig,
action
);
Expand Down Expand Up @@ -219,6 +219,11 @@ private boolean doTerminate(
AutoScaler autoScaler
)
{
if (autoScaler == null) {
log.error("No autoScaler available, cannot execute doTerminate for workers of category %s", category);
return false;
}

boolean didTerminate = false;
final Collection<String> workerNodeIds = getWorkerNodeIDs(runner.getLazyWorkers(), autoScaler);
log.debug(
Expand Down Expand Up @@ -472,9 +477,9 @@ private int getScaleUpNodeCount(
log.info("Min/max workers: %d/%d", minWorkerCount, maxWorkerCount);
final int currValidWorkers = getCurrValidWorkers(workers);

// If there are no worker, spin up minWorkerCount, we cannot determine the exact capacity here to fulfill the need
// If there are no worker, spin up minWorkerCount (or 1 if minWorkerCount is 0), we cannot determine the exact capacity here to fulfill the need
// since we are not aware of the expectedWorkerCapacity.
int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks(
int moreWorkersNeeded = currValidWorkers == 0 ? Math.max(minWorkerCount, 1) : getWorkersNeededToAssignTasks(
remoteTaskRunnerConfig,
workerConfig,
pendingTasks,
Expand Down Expand Up @@ -655,18 +660,23 @@ private AutoScaler getCategoryAutoscaler(String category, Map<String, AutoScaler
);
return null;
}
return autoScaler == null ? autoscalersByCategory.get(CategoriedWorkerBehaviorConfig.DEFAULT_AUTOSCALER_CATEGORY) : autoScaler;
return autoScaler == null
? autoscalersByCategory.get(CategoriedWorkerBehaviorConfig.DEFAULT_AUTOSCALER_CATEGORY)
: autoScaler;
}

private Map<String, AutoScaler> mapAutoscalerByCategory(List<AutoScaler> autoScalers)
{
Map<String, AutoScaler> result = autoScalers.stream().collect(Collectors.groupingBy(
AutoScaler::getCategory,
autoScaler -> autoScaler.getCategory() == null
? CategoriedWorkerBehaviorConfig.DEFAULT_AUTOSCALER_CATEGORY
: autoScaler.getCategory(),
Collectors.collectingAndThen(Collectors.toList(), values -> values.get(0))
));

if (result.size() != autoScalers.size()) {
log.warn("Probably autoscalers with duplicated categories were defined. The first instance will be used.");
log.warn(
"Probably autoscalers with duplicated categories were defined. The first instance of each duplicate category will be used.");
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.apache.druid.indexing.overlord.autoscaling;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.overlord.setup.CategoriedWorkerBehaviorConfig;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.emitter.EmittingLogger;

Expand All @@ -34,8 +34,9 @@ public class NoopAutoScaler<Void> implements AutoScaler<Void>
private static final EmittingLogger log = new EmittingLogger(NoopAutoScaler.class);
private final String category;

@JsonCreator
public NoopAutoScaler(
@JsonProperty(value = "category", defaultValue = CategoriedWorkerBehaviorConfig.DEFAULT_AUTOSCALER_CATEGORY) String category
@JsonProperty("category") String category
)
{
this.category = category;
Expand All @@ -54,6 +55,7 @@ public int getMaxNumWorkers()
}

@Override
@JsonProperty
public String getCategory()
{
return category;
Expand All @@ -68,35 +70,43 @@ public Void getEnvConfig()
@Override
public AutoScalingData provision()
{
log.info("If I were a real strategy I'd create something now in category %s", category);
log.info("If I were a real strategy I'd create something now [category: %s]", category);
return null;
}

@Override
public AutoScalingData terminate(List<String> ips)
{
log.info("If I were a real strategy I'd terminate %s now in category %s", ips, category);
log.info("If I were a real strategy I'd terminate %s now [category: %s]", ips, category);
return null;
}

@Override
public AutoScalingData terminateWithIds(List<String> ids)
{
log.info("If I were a real strategy I'd terminate %s now", ids);
log.info("If I were a real strategy I'd terminate %s now [category: %s]", ids, category);
return null;
}

@Override
public List<String> ipToIdLookup(List<String> ips)
{
log.info("I'm not a real strategy so I'm returning what I got %s", ips);
log.info("I'm not a real strategy so I'm returning what I got %s [category: %s]", ips, category);
return ips;
}

@Override
public List<String> idToIpLookup(List<String> nodeIds)
{
log.info("I'm not a real strategy so I'm returning what I got %s", nodeIds);
log.info("I'm not a real strategy so I'm returning what I got %s [category: %s]", nodeIds, category);
return nodeIds;
}

@Override
public String toString()
{
return "NoopAutoScaler{" +
"category='" + category + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,9 @@ private int getScaleUpNodeCount(
log.info("Min/max workers: %d/%d", minWorkerCount, maxWorkerCount);
final int currValidWorkers = getCurrValidWorkers(workers);

// If there are no worker, spin up minWorkerCount, we cannot determine the exact capacity here to fulfill the need
// If there are no worker, spin up minWorkerCount (or 1 if minWorkerCount is 0), we cannot determine the exact capacity here to fulfill the need
// since we are not aware of the expectedWorkerCapacity.
int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks(
int moreWorkersNeeded = currValidWorkers == 0 ? Math.max(minWorkerCount, 1) : getWorkersNeededToAssignTasks(
remoteTaskRunnerConfig,
workerConfig,
pendingTasks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public String apply(ImmutableWorkerInfo input)
updateTargetWorkerCount(workerConfig, pendingTasks, workers);

int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size());
log.info("Want workers: %d", want);
while (want > 0) {
final AutoScalingData provisioned = workerConfig.getAutoScaler().provision();
final List<String> newNodes;
Expand Down
Loading

0 comments on commit 8d94ff2

Please sign in to comment.