Skip to content

Commit

Permalink
Add tracking for async deletions in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
domenicbozzuto committed Jul 10, 2024
1 parent 5cdfbf6 commit d5b6d30
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ func TestGetFilteredAutoscalingGroupsVmss(t *testing.T) {
curSize: 3,
sizeRefreshPeriod: manager.azureCache.refreshInterval,
instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod,
vmDeletionsInProgress: make(map[string]struct{}),
}}
assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs)
}
Expand Down
45 changes: 39 additions & 6 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type ScaleSet struct {
instanceMutex sync.Mutex
instanceCache []cloudprovider.Instance
lastInstanceRefresh time.Time

vmDeletionsInProgressMutex sync.Mutex
vmDeletionsInProgress map[string]struct{}
}

// NewScaleSet creates a new NewScaleSet.
Expand All @@ -78,6 +81,7 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64) (
sizeRefreshPeriod: az.azureCache.refreshInterval,
enableDynamicInstanceList: az.config.EnableDynamicInstanceList,
instancesRefreshJitter: az.config.VmssVmsCacheJitter,
vmDeletionsInProgress: make(map[string]struct{}),
}

if az.config.VmssVmsCacheTTL != 0 {
Expand Down Expand Up @@ -177,18 +181,22 @@ func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) {
return scaleSet.getCurSize()
}

func (scaleSet *ScaleSet) waitForDeleteInstances(future *azure.Future, requiredIds *compute.VirtualMachineScaleSetVMInstanceRequiredIDs) {
// func (scaleSet *ScaleSet) waitForDeleteInstances(future *azure.Future, requiredIds *compute.VirtualMachineScaleSetVMInstanceRequiredIDs) {
func (scaleSet *ScaleSet) waitForDeleteInstances(future *azure.Future, instances []*azureRef) {
ctx, cancel := getContextWithCancel()
defer cancel()

klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(%v) for %s", requiredIds.InstanceIds, scaleSet.Name)
klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(%v) for %s", instances, scaleSet.Name)
httpResponse, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(ctx, future, scaleSet.manager.config.ResourceGroup)
for _, instance := range instances {
scaleSet.trackDeletionAttemptCompleted(instance.Name)
}
isSuccess, err := isSuccessHTTPResponse(httpResponse, err)
if isSuccess {
klog.V(3).Infof("virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(%v) for %s success", requiredIds.InstanceIds, scaleSet.Name)
klog.V(3).Infof("virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(%v) for %s success", instances, scaleSet.Name)
return
}
klog.Errorf("virtualMachineScaleSetsClient.WaitForDeleteInstancesResult - DeleteInstances for instances %v for %s failed with error: %v", requiredIds.InstanceIds, scaleSet.Name, err)
klog.Errorf("virtualMachineScaleSetsClient.WaitForDeleteInstancesResult - DeleteInstances for instances %v for %s failed with error: %v", instances, scaleSet.Name, err)
}

// updateVMSSCapacity invokes virtualMachineScaleSetsClient to update the capacity for VMSS.
Expand Down Expand Up @@ -412,11 +420,16 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered
return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.Name, commonAsg)
}

// Check both if the instance state was reported as deleting (from a previous deletion attempt) and if
// there's an in-flight deletion request for the instance
if cpi, found := scaleSet.getInstanceByProviderID(instance.Name); found && cpi.Status != nil && cpi.Status.State == cloudprovider.InstanceDeleting {
klog.V(3).Infof("Skipping deleting instance %s as its current state is deleting", instance.Name)
continue
}
instancesToDelete = append(instancesToDelete, instance)
if scaleSet.isDeletionAttemptInProgress(instance.Name) {
klog.V(3).Infof("Skipping deleting instance %s which already has an in-flight deletion request", instance.Name)
continue
}
}

// nothing to delete
Expand Down Expand Up @@ -467,9 +480,10 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered
// Proactively set the status of the instances to be deleted in cache
for _, instance := range instancesToDelete {
scaleSet.setInstanceStatusByProviderID(instance.Name, cloudprovider.InstanceStatus{State: cloudprovider.InstanceDeleting})
scaleSet.trackDeletionAttemptStarted(instance.Name)
}

go scaleSet.waitForDeleteInstances(future, requiredIds)
go scaleSet.waitForDeleteInstances(future, instancesToDelete)

return nil
}
Expand Down Expand Up @@ -755,3 +769,22 @@ func (scaleSet *ScaleSet) getOrchestrationMode() (compute.OrchestrationMode, err
}
return vmss.OrchestrationMode, nil
}

func (scaleSet *ScaleSet) trackDeletionAttemptStarted(instanceId string) {
scaleSet.vmDeletionsInProgressMutex.Lock()
defer scaleSet.vmDeletionsInProgressMutex.Unlock()
scaleSet.vmDeletionsInProgress[instanceId] = struct{}{}
}

func (scaleSet *ScaleSet) trackDeletionAttemptCompleted(instanceId string) {
scaleSet.vmDeletionsInProgressMutex.Lock()
defer scaleSet.vmDeletionsInProgressMutex.Unlock()
delete(scaleSet.vmDeletionsInProgress, instanceId)
}

func (scaleSet *ScaleSet) isDeletionAttemptInProgress(instanceId string) bool {
scaleSet.vmDeletionsInProgressMutex.Lock()
defer scaleSet.vmDeletionsInProgressMutex.Unlock()
_, exists := scaleSet.vmDeletionsInProgress[instanceId]
return exists
}

0 comments on commit d5b6d30

Please sign in to comment.