Skip to content

Commit

Permalink
Merge pull request kubernetes#2117 from tosi3k/fix-deployments
Browse files Browse the repository at this point in the history
Fix WaitForControlledPodsRunning for deletion of deployments
  • Loading branch information
k8s-ci-robot committed Aug 16, 2022
2 parents bdcf75f + 639af9e commit 59ee553
Show file tree
Hide file tree
Showing 2 changed files with 296 additions and 56 deletions.
214 changes: 184 additions & 30 deletions clusterloader2/pkg/measurement/util/controlled_pods_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package util
import (
"context"
"fmt"
"sync"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -39,9 +39,186 @@ const (
type ControlledPodsIndexer struct {
podsIndexer cache.Indexer
podsSynced cache.InformerSynced
rsSynced cache.InformerSynced

rsIndexer cache.Indexer
rsSynced cache.InformerSynced
// deploymentUIDToRsUIDs is a map between a Deployment object's UID and the set of ReplicaSet objects' UIDs
// that are owned by that particular Deployment object.
deploymentUIDToRsUIDs map[types.UID]UIDSet

// rsUIDToState is a map between a ReplicaSet object's UID and its state.
rsUIDToState map[types.UID]*ReplicaSetState

// UIDLock is a lock for accessing rsUIDToState and deploymentUIDToRsUIDs fields in ControlledPodsIndexer.
UIDLock sync.Mutex
}

// ReplicaSetState stores information relevant to a specific ReplicaSet object,
// i.e. how many pods it owns exist, whether the RS object itself exists
// and its latest known owner's UID.
type ReplicaSetState struct {
NumPods int
Exists bool
OwnerUID types.UID
}

// UIDSet is a collection of ReplicaSet objects UIDs.
type UIDSet map[types.UID]bool

// NewControlledPodsIndexer creates a new ControlledPodsIndexer instance.
func NewControlledPodsIndexer(podsInformer coreinformers.PodInformer, rsInformer appsinformers.ReplicaSetInformer) (*ControlledPodsIndexer, error) {
if err := podsInformer.Informer().AddIndexers(cache.Indexers{controllerUIDIndex: controllerUIDIndexFunc}); err != nil {
return nil, fmt.Errorf("failed to register indexer: %w", err)
}

cpi := &ControlledPodsIndexer{
podsIndexer: podsInformer.Informer().GetIndexer(),
podsSynced: podsInformer.Informer().HasSynced,
rsSynced: rsInformer.Informer().HasSynced,
rsUIDToState: make(map[types.UID]*ReplicaSetState),
deploymentUIDToRsUIDs: make(map[types.UID]UIDSet),
}

podsInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cpi.UIDLock.Lock()
defer cpi.UIDLock.Unlock()

ownerUID, ownerKind := getControllerInfo(obj)
if ownerUID == "" || ownerKind != "ReplicaSet" {
return
}

cpi.updatePodRefCountLocked(ownerUID, 1)
},
UpdateFunc: func(oldObj, newObj interface{}) {
cpi.UIDLock.Lock()
defer cpi.UIDLock.Unlock()

oldOwnerUID, oldOwnerKind := getControllerInfo(oldObj)
newOwnerUID, newOwnerKind := getControllerInfo(newObj)
if oldOwnerUID == newOwnerUID {
return
}

if oldOwnerUID != "" && oldOwnerKind == "ReplicaSet" {
cpi.updatePodRefCountLocked(oldOwnerUID, -1)
cpi.clearRSDataIfPossibleLocked(oldOwnerUID)
}
if newOwnerUID != "" && newOwnerKind == "ReplicaSet" {
cpi.updatePodRefCountLocked(newOwnerUID, 1)
}
},
DeleteFunc: func(obj interface{}) {
cpi.UIDLock.Lock()
defer cpi.UIDLock.Unlock()

ownerUID, ownerKind := getControllerInfo(obj)
if ownerUID == "" || ownerKind != "ReplicaSet" {
return
}

cpi.updatePodRefCountLocked(ownerUID, -1)
cpi.clearRSDataIfPossibleLocked(ownerUID)
},
},
)
rsInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cpi.UIDLock.Lock()
defer cpi.UIDLock.Unlock()

cpi.handleIncomingRSAddEventLocked(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
cpi.UIDLock.Lock()
defer cpi.UIDLock.Unlock()

oldDeploymentUID, _ := getControllerInfo(oldObj)
newDeploymentUID, _ := getControllerInfo(newObj)
if oldDeploymentUID == newDeploymentUID {
return
}

if oldDeploymentUID != "" {
cpi.removeDeploymentDataLocked(oldDeploymentUID, getObjUID(oldObj))
}
cpi.handleIncomingRSAddEventLocked(newObj)
},
DeleteFunc: func(obj interface{}) {
cpi.UIDLock.Lock()
defer cpi.UIDLock.Unlock()
rsUID := getObjUID(obj)
cpi.rsUIDToState[rsUID].Exists = false
cpi.clearRSDataIfPossibleLocked(rsUID)
},
},
)

return cpi, nil
}

func getControllerInfo(obj interface{}) (types.UID, string) {
metaAccessor, err := meta.Accessor(obj)
if err != nil {
return "", ""
}
controller := metav1.GetControllerOf(metaAccessor)
if controller == nil {
return "", ""
}
return controller.UID, controller.Kind
}

func getObjUID(obj interface{}) types.UID {
metaAccessor, err := meta.Accessor(obj)
if err != nil {
return ""
}
return metaAccessor.GetUID()
}

func (p *ControlledPodsIndexer) clearRSDataIfPossibleLocked(rsUID types.UID) {
state := p.rsUIDToState[rsUID]
if state != nil && !state.Exists && state.NumPods == 0 {
delete(p.rsUIDToState, rsUID)
ownerUID := state.OwnerUID
if _, ok := p.deploymentUIDToRsUIDs[ownerUID]; ok {
p.removeDeploymentDataLocked(ownerUID, rsUID)
}
}
}

func (p *ControlledPodsIndexer) removeDeploymentDataLocked(ownerUID, rsUID types.UID) {
delete(p.deploymentUIDToRsUIDs[ownerUID], rsUID)
if len(p.deploymentUIDToRsUIDs[ownerUID]) == 0 {
delete(p.deploymentUIDToRsUIDs, ownerUID)
}
}

func (p *ControlledPodsIndexer) updatePodRefCountLocked(rsUID types.UID, diff int) {
if _, ok := p.rsUIDToState[rsUID]; !ok {
p.rsUIDToState[rsUID] = &ReplicaSetState{}
}
p.rsUIDToState[rsUID].NumPods += diff
}

func (p *ControlledPodsIndexer) handleIncomingRSAddEventLocked(obj interface{}) {
ownerUID, _ := getControllerInfo(obj)
rsUID := getObjUID(obj)
if _, ok := p.rsUIDToState[rsUID]; !ok {
p.rsUIDToState[rsUID] = &ReplicaSetState{}
}
p.rsUIDToState[rsUID].Exists = true
p.rsUIDToState[rsUID].OwnerUID = ownerUID
if ownerUID == "" {
return
}
if _, ok := p.deploymentUIDToRsUIDs[ownerUID]; !ok {
p.deploymentUIDToRsUIDs[ownerUID] = UIDSet{}
}
p.deploymentUIDToRsUIDs[ownerUID][rsUID] = true
}

func controllerUIDIndexFunc(obj interface{}) ([]string, error) {
Expand All @@ -56,23 +233,6 @@ func controllerUIDIndexFunc(obj interface{}) ([]string, error) {
return []string{string(controllerRef.UID)}, nil
}

// NewControlledPodsIndexer creates a new ControlledPodsIndexer instance.
func NewControlledPodsIndexer(podsInformer coreinformers.PodInformer, rsInformer appsinformers.ReplicaSetInformer) (*ControlledPodsIndexer, error) {
if err := podsInformer.Informer().AddIndexers(cache.Indexers{controllerUIDIndex: controllerUIDIndexFunc}); err != nil {
return nil, fmt.Errorf("failed to register indexer: %w", err)
}
if err := rsInformer.Informer().AddIndexers(cache.Indexers{controllerUIDIndex: controllerUIDIndexFunc}); err != nil {
return nil, fmt.Errorf("failed to register indexer: %w", err)
}

return &ControlledPodsIndexer{
podsIndexer: podsInformer.Informer().GetIndexer(),
podsSynced: podsInformer.Informer().HasSynced,
rsIndexer: rsInformer.Informer().GetIndexer(),
rsSynced: rsInformer.Informer().HasSynced,
}, nil
}

// WaitForCacheSync waits for all required informers to be initialized.
func (p *ControlledPodsIndexer) WaitForCacheSync(ctx context.Context) bool {
return cache.WaitForNamedCacheSync("PodsIndexer", ctx.Done(), p.podsSynced, p.rsSynced)
Expand All @@ -92,17 +252,11 @@ func (p *ControlledPodsIndexer) PodsControlledBy(obj interface{}) ([]*corev1.Pod
var podOwners []types.UID
switch typeAccessor.GetKind() {
case "Deployment":
replicaSets, err := p.rsIndexer.ByIndex(controllerUIDIndex, string(metaAccessor.GetUID()))
if err != nil {
return nil, fmt.Errorf("failed to get replicasets controlled by %v: %w", metaAccessor.GetUID(), err)
}
for _, replicaSet := range replicaSets {
replicaSet, ok := replicaSet.(*appsv1.ReplicaSet)
if !ok {
return nil, fmt.Errorf("expected *appsv1.ReplicaSet; got: %T", replicaSet)
}
podOwners = append(podOwners, replicaSet.GetUID())
p.UIDLock.Lock()
for k := range p.deploymentUIDToRsUIDs[metaAccessor.GetUID()] {
podOwners = append(podOwners, k)
}
p.UIDLock.Unlock()
default:
podOwners = append(podOwners, metaAccessor.GetUID())
}
Expand Down
Loading

0 comments on commit 59ee553

Please sign in to comment.