Skip to content

Commit

Permalink
gce: concurrent zonal List()s + opportunistic basename fill
Browse files Browse the repository at this point in the history
FetchAllMigs (unfiltered InstanceGroupManagers.List) is costly as it's not
bounded to MIGs attached to the current cluster, but rather lists all MIGs
in the project/zone, and therefore equally affects all clusters in that
project/zone. Running the calls concurrently over the region's zones (so at
most, 4 concurrent API calls, about once per minute) contains that impact.

findMigsInRegion might be scoped to the current cluster (name pattern),
but also benefits from the same improvement, as it's also costly and
called at each refreshInterval (1 min).

Also: we're calling out to the GCE mig.Get() API again for each MIG (at ~300ms per
API call, in my tests), sequentially and with the global cache lock held
(when updateClusterState -> ...-> GetMigForInstance kicks in). Yet we
already get that bit of information (MIG's basename) from any other
mig.Get or mig.List call, like the one fetching target sizes. Leveraging
this helps significantly on large fleets (for instance this shaves 8 min of
startup time on the large cluster I tested on).
  • Loading branch information
bpineau committed May 5, 2021
1 parent 200415e commit 6432c27
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 13 deletions.
15 changes: 12 additions & 3 deletions cluster-autoscaler/cloudprovider/gce/gce_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,20 @@ func (m *gceManagerImpl) findMigsInRegion(region string, name *regexp.Regexp) ([
if err != nil {
return nil, err
}
for _, z := range zones {
zl, err := m.GceService.FetchMigsWithName(z, name)

zoneLinks := make([][]string, len(zones))
errors := make([]error, len(zones))
workqueue.ParallelizeUntil(context.Background(), len(zones), len(zones), func(piece int) {
zoneLinks[piece], errors[piece] = m.GceService.FetchMigsWithName(zones[piece], name)
})

for _, err := range errors {
if err != nil {
return nil, err
return nil, fmt.Errorf("%v", errors)
}
}

for _, zl := range zoneLinks {
for _, link := range zl {
links = append(links, link)
}
Expand Down
39 changes: 29 additions & 10 deletions cluster-autoscaler/cloudprovider/gce/mig_target_sizes_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ limitations under the License.
package gce

import (
"context"
"fmt"
"sync"

gce "google.golang.org/api/compute/v1"
"k8s.io/client-go/util/workqueue"
klog "k8s.io/klog/v2"
)

Expand Down Expand Up @@ -54,7 +58,7 @@ func (c *cachingMigTargetSizesProvider) GetMigTargetSize(migRef GceRef) (int64,
return targetSize, nil
}

newTargetSizes, err := c.fillInMigTargetSizeCache()
newTargetSizes, err := c.fillInMigTargetSizeAndBaseNameCaches()

size, found := newTargetSizes[migRef]
if err != nil || !found {
Expand All @@ -71,20 +75,30 @@ func (c *cachingMigTargetSizesProvider) GetMigTargetSize(migRef GceRef) (int64,
return size, nil
}

func (c *cachingMigTargetSizesProvider) fillInMigTargetSizeCache() (map[GceRef]int64, error) {
zones := c.listAllZonesForMigs()
func (c *cachingMigTargetSizesProvider) fillInMigTargetSizeAndBaseNameCaches() (map[GceRef]int64, error) {
var zones []string
for zone := range c.listAllZonesForMigs() {
zones = append(zones, zone)
}

newMigTargetSizeCache := map[GceRef]int64{}
for zone := range zones {
zoneMigs, err := c.gceClient.FetchAllMigs(zone)
migs := make([][]*gce.InstanceGroupManager, len(zones))
errors := make([]error, len(zones))
workqueue.ParallelizeUntil(context.Background(), len(zones), len(zones), func(piece int) {
migs[piece], errors[piece] = c.gceClient.FetchAllMigs(zones[piece])
})

for idx, err := range errors {
if err != nil {
klog.Errorf("Error listing migs from zone %v; err=%v", zone, err)
return nil, err
klog.Errorf("Error listing migs from zone %v; err=%v", zones[idx], err)
return nil, fmt.Errorf("%v", errors)
}
}

newMigTargetSizeCache := map[GceRef]int64{}
newMigBasenameCache := map[GceRef]string{}
for idx, zone := range zones {
registeredMigRefs := c.getMigRefs()

for _, zoneMig := range zoneMigs {
for _, zoneMig := range migs[idx] {
zoneMigRef := GceRef{
c.projectId,
zone,
Expand All @@ -93,6 +107,7 @@ func (c *cachingMigTargetSizesProvider) fillInMigTargetSizeCache() (map[GceRef]i

if registeredMigRefs[zoneMigRef] {
newMigTargetSizeCache[zoneMigRef] = zoneMig.TargetSize
newMigBasenameCache[zoneMigRef] = zoneMig.BaseInstanceName
}
}
}
Expand All @@ -101,6 +116,10 @@ func (c *cachingMigTargetSizesProvider) fillInMigTargetSizeCache() (map[GceRef]i
c.cache.SetMigTargetSize(migRef, targetSize)
}

for migRef, baseName := range newMigBasenameCache {
c.cache.SetMigBasename(migRef, baseName)
}

return newMigTargetSizeCache, nil
}

Expand Down

0 comments on commit 6432c27

Please sign in to comment.