Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Auto scaler pendingTaskBased provisioning strategy to handle when there are no currently running worker node better #11440

Merged
merged 9 commits into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ There are additional configs for autoscaling (if it is enabled):
|`druid.indexer.autoscale.pendingTaskTimeout`|How long a task can be in "pending" state before the Overlord tries to scale up.|PT30S|
|`druid.indexer.autoscale.workerVersion`|If set, will only create nodes of set version during autoscaling. Overrides dynamic configuration. |null|
|`druid.indexer.autoscale.workerPort`|The port that MiddleManagers will run on.|8080|
|`druid.indexer.autoscale.workerCapacityFallback`| Worker capacity for determining the number of workers needed for auto scaling when there is currently no worker running. If unset or set to value of 0 or less, auto scaler will scale to `minNumWorkers` in autoScaler config instead. Note: this config is only applicable to `pendingTaskBased` provisioning strategy|-1|
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"fallback" in the name does not seem intuitive to me. How about workerCapacityHint? Also, it would be nice to document how to set this value. For example, the doc can say "this value should be typically equal to druid.worker.capacity when you have a homogeneous cluster". For heterogeneous clusters, does it make sense to set this to the average capacity?

Copy link
Contributor Author

@maytasm maytasm Jul 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the config name to workerCapacityHint. Updated the docs. I think for heterogeneous clusters, it does make sense to set this to the average capacity. I think the auto scaler should use the average capacity when there is running worker nodes too but that can be a separate change/discussion.


##### Supervisors

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public GceAutoScaler(
@JsonProperty("envConfig") GceEnvironmentConfig envConfig
)
{
Preconditions.checkArgument(minNumWorkers > 0,
"minNumWorkers must be greater than 0");
Preconditions.checkArgument(minNumWorkers >= 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the overlord should fail to start if minNumWorkers = 0 and the new config is not set to a positive because auto scaler will not work anyway. It would be better to fail early.

Copy link
Contributor Author

@maytasm maytasm Jul 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's is only if you are using PendingTaskBasedWorkerProvisioningStrategy. You can have minNumWorkers = 0 and the new config not set if you use SimpleWorkerProvisioningStrategy. I think it makes more sense to have the check done in the PendingTaskBasedWorkerProvisioningStrategy, instead of having a check in every AutoScaler (EC2, GCE, etc) to check if the ProvisioningStategy == PendingTaskBasedWorkerProvisioningStrategy and minNumWorkers = 0 and the new config is not set. However, this means that the failure would only show when the auto scaler cycle runs. I think this is ok as the PendingTaskBasedWorkerProvisioningStrategy#getDefaultWorkerBehaviorConfig already have some check that make sure the config/AutoScaler set is valid.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sure, I didn't mean that the check code should be in each autoScaler because it's not possible as autoScaler doesn't know about provisioningStrategyConfig. The check code should be in PendingTaskBasedWorkerProvisioningStrategy instead. I just didn't find a good place to leave my comment earlier 😅

However, this means that the failure would only show when the auto scaler cycle runs. I think this is ok as the PendingTaskBasedWorkerProvisioningStrategy#getDefaultWorkerBehaviorConfig already have some check that make sure the config/AutoScaler set is valid.

Hmm, the check can throw an exception in the constructor of PendingTaskBasedWorkerProvisioningStrategy, can't it? Because the constructor accepts both WorkerBehaviorConfig and PendingTaskBasedWorkerProvisioningConfig as its parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PendingTaskBasedWorkerProvisioningStrategy gets created once at startup which at that time may not have an auto scaler configured or if auto scaler was configured the minWorkerCount can change later. This is because the auto scaler along with the minWorkerCount can be configured dynamically after the cluster is already up and running. Hence, if the check is in the constructor of PendingTaskBasedWorkerProvisioningStrategy , then we can miss the cases I mentioned above. I think it is better to have the check in PendingTaskBasedWorkerProvisioningStrategy#getDefaultWorkerBehaviorConfig which is every time we do terminate/provision, etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah you are right. Thanks for the explanation.

"minNumWorkers must be greater than or equal to 0");
this.minNumWorkers = minNumWorkers;
Preconditions.checkArgument(maxNumWorkers > 0,
"maxNumWorkers must be greater than 0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class PendingTaskBasedWorkerProvisioningConfig extends SimpleWorkerProvis
@JsonProperty
private int maxScalingStep = 10;

@JsonProperty
private int workerCapacityFallback = -1;

public int getMaxScalingStep()
{
Expand Down Expand Up @@ -76,4 +78,14 @@ public PendingTaskBasedWorkerProvisioningConfig setPendingTaskTimeout(Period pen
return this;
}

public int getWorkerCapacityFallback()
{
return workerCapacityFallback;
}

public PendingTaskBasedWorkerProvisioningConfig setWorkerCapacityFallback(int workerCapacityFallback)
{
this.workerCapacityFallback = workerCapacityFallback;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,16 @@ private int getScaleUpNodeCount(
log.info("Min/max workers: %d/%d", minWorkerCount, maxWorkerCount);
final int currValidWorkers = getCurrValidWorkers(workers);

// If there are no worker, spin up minWorkerCount, we cannot determine the exact capacity here to fulfill the need
// since we are not aware of the expectedWorkerCapacity.
int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks(
// If there are no worker and workerCapacityFallback config is not set (-1) or invalid (<= 0), then spin up minWorkerCount
// as we cannot determine the exact capacity here to fulfill the need.
// However, if there are no worker but workerCapacityFallback config is set (>0), then we can
// determine the number of workers needed using workerCapacityFallback config as expected worker capacity
int moreWorkersNeeded = currValidWorkers == 0 && config.getWorkerCapacityFallback() <= 0 ? minWorkerCount : getWorkersNeededToAssignTasks(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this line is too long. Please break the line. One example would be

      int moreWorkersNeeded = currValidWorkers == 0 && config.getWorkerCapacityFallback() <= 0
                              ? minWorkerCount
                              : getWorkersNeededToAssignTasks(
                                  remoteTaskRunnerConfig,
                                  workerConfig,
                                  pendingTasks,
                                  workers,
                                  config.getWorkerCapacityFallback()
                              );

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

remoteTaskRunnerConfig,
workerConfig,
pendingTasks,
workers
workers,
config.getWorkerCapacityFallback()
);
log.debug("More workers needed: %d", moreWorkersNeeded);

Expand Down Expand Up @@ -280,7 +283,8 @@ private int getWorkersNeededToAssignTasks(
final WorkerTaskRunnerConfig workerTaskRunnerConfig,
final DefaultWorkerBehaviorConfig workerConfig,
final Collection<Task> pendingTasks,
final Collection<ImmutableWorkerInfo> workers
final Collection<ImmutableWorkerInfo> workers,
final int workerCapacityFallback
)
{
final Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(
Expand All @@ -295,7 +299,7 @@ private int getWorkersNeededToAssignTasks(
}
WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy();
int need = 0;
int capacity = getExpectedWorkerCapacity(workers);
int capacity = getExpectedWorkerCapacity(workers, workerCapacityFallback);
log.info("Expected worker capacity: %d", capacity);

// Simulate assigning tasks to dummy workers using configured workerSelectStrategy
Expand Down Expand Up @@ -441,12 +445,18 @@ private int getCurrValidWorkers(Collection<ImmutableWorkerInfo> workers)
return currValidWorkers;
}

private static int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo> workers)
private static int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo> workers, final int workerCapacityFallback)
{
int size = workers.size();
if (size == 0) {
// No existing workers assume capacity per worker as 1
return 1;
// No existing workers
if (workerCapacityFallback > 0) {
// Return workerCapacityFallback if it is set in config
return workerCapacityFallback;
} else {
// Assume capacity per worker as 1
return 1;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on this function I don't think you need to detect the case where workerCapacityFallback is unset. You can simply set the default value to 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having user set the config to 1 and the default value being 1 should not be the same due to https://github.com/apache/druid/pull/11440/files#diff-86276fbfe645ed104be05c850062fdb8e89fce43c656e41725c1e6d6ed671c25R253

If you have no worker and the new config NOT SET then you do the current behavior which is scale to minNumWorkers (this basically disregard capacity and pending tasks you have)
if you have no worker and the new config SET (i.e. set to 1) then you use that value to determine how many worker of said capacity needed to process the pending tasks you have...which may not be the same as the configured minNumWorkers

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I see. Sorry I missed that earlier

} else {
// Assume all workers have same capacity
return workers.iterator().next().getWorker().getCapacity();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.indexing.overlord.autoscaling;

import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.guava.DSuppliers;
import org.apache.druid.indexer.TaskLocation;
Expand Down Expand Up @@ -137,6 +138,101 @@ public void testSuccessfulInitialMinWorkersProvision()
}
}

@Test
public void testProvisionNoCurrentlyRunningWorkerWithCapacityFallbackSetAndNoPendingTaskShouldProvisionMinimumAsCurrentIsBelowMinimum()
{
PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(10)
.setPendingTaskTimeout(new Period(0))
.setWorkerVersion(MIN_VERSION)
.setMaxScalingStep(2)
.setWorkerCapacityFallback(30);
strategy = new PendingTaskBasedWorkerProvisioningStrategy(
config,
DSuppliers.of(workerConfig),
new ProvisioningSchedulerConfig(),
new Supplier<ScheduledExecutorService>()
{
@Override
public ScheduledExecutorService get()
{
return executorService;
}
}
);
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(new ArrayList<String>());
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
// No pending tasks
EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
new ArrayList<>()
);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.emptyList()
);
EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Collections.singletonList("aNode"))
).times(3);
EasyMock.replay(runner, autoScaler);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertTrue(provisionedSomething);
Assert.assertTrue(provisioner.getStats().toList().size() == 3);
for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) {
Assert.assertTrue(
event.getEvent() == ScalingStats.EVENT.PROVISION
);
}
}

@Test
public void testProvisionNoCurrentlyRunningWorkerWithCapacityFallbackSetAndNoPendingTaskShouldNotProvisionAsMinimumIsZero()
{
PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(10)
.setPendingTaskTimeout(new Period(0))
.setWorkerVersion(MIN_VERSION)
.setMaxScalingStep(2)
.setWorkerCapacityFallback(30);
strategy = new PendingTaskBasedWorkerProvisioningStrategy(
config,
DSuppliers.of(workerConfig),
new ProvisioningSchedulerConfig(),
new Supplier<ScheduledExecutorService>()
{
@Override
public ScheduledExecutorService get()
{
return executorService;
}
}
);
// minWorkerCount is 0
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(new ArrayList<String>());
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
// No pending tasks
EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
new ArrayList<>()
);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.emptyList()
);
EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
EasyMock.replay(runner, autoScaler);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertFalse(provisionedSomething);
Assert.assertEquals(0, provisioner.getStats().toList().size());
}

@Test
public void testSuccessfulMinWorkersProvision()
{
Expand Down Expand Up @@ -207,7 +303,7 @@ public void testSuccessfulMinWorkersProvisionWithOldVersionNodeRunning()
}

@Test
public void testSomethingProvisioning()
public void testProvisioning()
{
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1);
Expand Down Expand Up @@ -257,6 +353,153 @@ public void testSomethingProvisioning()
EasyMock.verify(runner);
}

@Test
public void testProvisionWithPendingTaskAndWorkerCapacityFallbackSetButNonEmptyCurrentlyRunningWorkerShouldUseCapcityFromRunningWorker()
{
PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(10)
.setPendingTaskTimeout(new Period(0))
.setWorkerVersion(MIN_VERSION)
.setMaxScalingStep(2)
.setWorkerCapacityFallback(30);
strategy = new PendingTaskBasedWorkerProvisioningStrategy(
config,
DSuppliers.of(workerConfig),
new ProvisioningSchedulerConfig(),
new Supplier<ScheduledExecutorService>()
{
@Override
public ScheduledExecutorService get()
{
return executorService;
}
}
);
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(new ArrayList<String>()).times(2);
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Collections.singletonList("fake"))
).times(2);
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
// two pending tasks
EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
ImmutableList.of(
NoopTask.create(),
NoopTask.create()
)
).times(2);
// Capacity for current worker is 1
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.asList(
new TestZkWorker(testTask).toImmutable(),
new TestZkWorker(testTask, "http", "h1", "n1", INVALID_VERSION).toImmutable() // Invalid version node
)
).times(2);
EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1);
EasyMock.replay(runner);
EasyMock.replay(autoScaler);

Provisioner provisioner = strategy.makeProvisioner(runner);
boolean provisionedSomething = provisioner.doProvision();

// Expect to use capacity from current worker (which is 1)
// and since there are two pending tasks, we will need two more workers
Assert.assertTrue(provisionedSomething);
Assert.assertEquals(2, provisioner.getStats().toList().size());
DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp();
Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(0).getEvent());
Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(1).getEvent());

provisionedSomething = provisioner.doProvision();

Assert.assertFalse(provisionedSomething);
Assert.assertTrue(
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(
createdTime.equals(anotherCreatedTime)
);

EasyMock.verify(autoScaler);
EasyMock.verify(runner);
}

@Test
public void testProvisionWithPendingTaskAndWorkerCapacityFallbackSetButEmptyCurrentlyRunningWorkerShouldUseCapcityFromFallbackConfig()
{
PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(10)
.setPendingTaskTimeout(new Period(0))
.setWorkerVersion(MIN_VERSION)
.setMaxScalingStep(2)
.setWorkerCapacityFallback(30);
strategy = new PendingTaskBasedWorkerProvisioningStrategy(
config,
DSuppliers.of(workerConfig),
new ProvisioningSchedulerConfig(),
new Supplier<ScheduledExecutorService>()
{
@Override
public ScheduledExecutorService get()
{
return executorService;
}
}
);
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(new ArrayList<String>()).times(2);
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Collections.singletonList("fake"))
).times(1);
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
// two pending tasks
EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
ImmutableList.of(
NoopTask.create(),
NoopTask.create()
)
).times(2);
// No currently running worker node
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.emptyList()
).times(2);

EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1);
EasyMock.replay(runner);
EasyMock.replay(autoScaler);

Provisioner provisioner = strategy.makeProvisioner(runner);
boolean provisionedSomething = provisioner.doProvision();

// Expect to use capacity from workerCapacityFallback config (which is 30)
// and since there are two pending tasks, we will need one more worker
Assert.assertTrue(provisionedSomething);
Assert.assertEquals(1, provisioner.getStats().toList().size());
DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp();
Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(0).getEvent());

provisionedSomething = provisioner.doProvision();

Assert.assertFalse(provisionedSomething);
Assert.assertTrue(
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(
createdTime.equals(anotherCreatedTime)
);

EasyMock.verify(autoScaler);
EasyMock.verify(runner);
}

@Test
public void testProvisionAlert() throws Exception
{
Expand Down