Skip to content

Commit

Permalink
Merge pull request kubernetes#638 from krzysied/cluster_loader_extrac…
Browse files Browse the repository at this point in the history
…t_wait_for_pods

ClusterLoader - Extracting wait for pods method
  • Loading branch information
k8s-ci-robot committed Jul 15, 2019
2 parents cba7882 + f6d45a1 commit 4e2c449
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/perf-tests/clusterloader2/pkg/errors"
"k8s.io/perf-tests/clusterloader2/pkg/framework"
"k8s.io/perf-tests/clusterloader2/pkg/measurement"
measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/informer"
"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/runtimeobjects"
"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/workerqueue"
Expand Down Expand Up @@ -436,9 +437,18 @@ func (w *waitForControlledPodsRunningMeasurement) waitForRuntimeObject(obj runti
o.lock.Lock()
defer o.lock.Unlock()
w.handlingGroup.Start(func() {
options := &measurementutil.WaitForPodOptions{
Namespace: runtimeObjectNamespace,
LabelSelector: runtimeObjectSelector.String(),
FieldSelector: "",
DesiredPodCount: int(runtimeObjectReplicas),
EnableLogging: true,
CallerName: w.String(),
WaitForPodsInterval: defaultWaitForPodsInterval,
}
// This function sets the status (and error message) for the object checker.
// The handling of bad statuses and errors is done by gather() function of the measurement.
err = waitForPods(w.clusterFramework.GetClientSets().GetClient(), runtimeObjectNamespace, runtimeObjectSelector.String(), "", int(runtimeObjectReplicas), o.stopCh, true, w.String())
err = measurementutil.WaitForPods(w.clusterFramework.GetClientSets().GetClient(), o.stopCh, options)
o.lock.Lock()
defer o.lock.Unlock()
if err != nil {
Expand Down
79 changes: 11 additions & 68 deletions clusterloader2/pkg/measurement/common/simple/wait_for_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,9 @@ limitations under the License.
package simple

import (
"fmt"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog"
"k8s.io/perf-tests/clusterloader2/pkg/measurement"
measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
Expand Down Expand Up @@ -52,7 +47,7 @@ type waitForRunningPodsMeasurement struct{}
// Execute waits until desired number of pods are running or until timeout happens.
// Pods can be specified by field and/or label selectors.
// If namespace is not passed by parameter, all-namespace scope is assumed.
func (*waitForRunningPodsMeasurement) Execute(config *measurement.MeasurementConfig) ([]measurement.Summary, error) {
func (w *waitForRunningPodsMeasurement) Execute(config *measurement.MeasurementConfig) ([]measurement.Summary, error) {
desiredPodCount, err := util.GetInt(config.Params, "desiredPodCount")
if err != nil {
return nil, err
Expand All @@ -78,7 +73,16 @@ func (*waitForRunningPodsMeasurement) Execute(config *measurement.MeasurementCon
time.AfterFunc(timeout, func() {
close(stopCh)
})
return nil, waitForPods(config.ClusterFramework.GetClientSets().GetClient(), namespace, labelSelector, fieldSelector, desiredPodCount, stopCh, true, waitForRunningPodsMeasurementName)
options := &measurementutil.WaitForPodOptions{
Namespace: namespace,
LabelSelector: labelSelector,
FieldSelector: fieldSelector,
DesiredPodCount: desiredPodCount,
EnableLogging: true,
CallerName: w.String(),
WaitForPodsInterval: defaultWaitForPodsInterval,
}
return nil, measurementutil.WaitForPods(config.ClusterFramework.GetClientSets().GetClient(), stopCh, options)
}

// Dispose cleans up after the measurement.
Expand All @@ -88,64 +92,3 @@ func (*waitForRunningPodsMeasurement) Dispose() {}
func (*waitForRunningPodsMeasurement) String() string {
return waitForRunningPodsMeasurementName
}

const (
uninitialized = iota
up
down
none
)

func waitForPods(clientSet clientset.Interface, namespace, labelSelector, fieldSelector string, desiredPodCount int, stopCh <-chan struct{}, log bool, callerName string) error {
// TODO(#269): Change to shared podStore.
ps, err := measurementutil.NewPodStore(clientSet, namespace, labelSelector, fieldSelector)
if err != nil {
return fmt.Errorf("pod store creation error: %v", err)
}
defer ps.Stop()

var podsStatus measurementutil.PodsStartupStatus
selectorsString := measurementutil.CreateSelectorsString(namespace, labelSelector, fieldSelector)
scaling := uninitialized
var oldPods []*corev1.Pod
for {
select {
case <-stopCh:
return fmt.Errorf("timeout while waiting for %d pods to be running in namespace '%v' with labels '%v' and fields '%v' - only %d found running", desiredPodCount, namespace, labelSelector, fieldSelector, podsStatus.Running)
case <-time.After(defaultWaitForPodsInterval):
pods := ps.List()
podsStatus = measurementutil.ComputePodsStartupStatus(pods, desiredPodCount)
if scaling != uninitialized {
diff := measurementutil.DiffPods(oldPods, pods)
deletedPods := diff.DeletedPods()
if scaling != down && len(deletedPods) > 0 {
klog.Errorf("%s: %s: %d pods disappeared: %v", callerName, selectorsString, len(deletedPods), strings.Join(deletedPods, ", "))
klog.Infof("%s: %v", callerName, diff.String(sets.NewString()))
}
addedPods := diff.AddedPods()
if scaling != up && len(addedPods) > 0 {
klog.Errorf("%s: %s: %d pods appeared: %v", callerName, selectorsString, len(deletedPods), strings.Join(deletedPods, ", "))
klog.Infof("%s: %v", callerName, diff.String(sets.NewString()))
}
} else {
switch {
case len(pods) == desiredPodCount:
scaling = none
case len(pods) < desiredPodCount:
scaling = up
case len(pods) > desiredPodCount:
scaling = down
}
}
if log {
klog.Infof("%s: %s: %s", callerName, selectorsString, podsStatus.String())
}
// We allow inactive pods (e.g. eviction happened).
// We wait until there is a desired number of pods running and all other pods are inactive.
if len(pods) == (podsStatus.Running+podsStatus.Inactive) && podsStatus.Running == desiredPodCount {
return nil
}
oldPods = pods
}
}
}
104 changes: 104 additions & 0 deletions clusterloader2/pkg/measurement/util/wait_for_pods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"fmt"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog"
)

const (
uninitialized = iota
up
down
none
)

// WaitForPodOptions is an options used by WaitForPods methods.
type WaitForPodOptions struct {
Namespace string
LabelSelector string
FieldSelector string
DesiredPodCount int
EnableLogging bool
CallerName string
WaitForPodsInterval time.Duration
}

// WaitForPods waits till disire nuber of pods is running.
// Pods are be specified by namespace, field and/or label selectors.
// If stopCh is closed before all pods are running, the error will be returned.
func WaitForPods(clientSet clientset.Interface, stopCh <-chan struct{}, options *WaitForPodOptions) error {
// TODO(#269): Change to shared podStore.
ps, err := NewPodStore(clientSet, options.Namespace, options.LabelSelector, options.FieldSelector)
if err != nil {
return fmt.Errorf("pod store creation error: %v", err)
}
defer ps.Stop()

var podsStatus PodsStartupStatus
selectorsString := CreateSelectorsString(options.Namespace, options.LabelSelector, options.FieldSelector)
scaling := uninitialized
var oldPods []*corev1.Pod
for {
select {
case <-stopCh:
return fmt.Errorf("timeout while waiting for %d pods to be running in namespace '%v' with labels '%v' and fields '%v' - only %d found running",
options.DesiredPodCount, options.Namespace, options.LabelSelector, options.FieldSelector, podsStatus.Running)
case <-time.After(options.WaitForPodsInterval):
pods := ps.List()
podsStatus = ComputePodsStartupStatus(pods, options.DesiredPodCount)
if scaling != uninitialized {
diff := DiffPods(oldPods, pods)
deletedPods := diff.DeletedPods()
if scaling != down && len(deletedPods) > 0 {
klog.Errorf("%s: %s: %d pods disappeared: %v", options.CallerName, selectorsString, len(deletedPods), strings.Join(deletedPods, ", "))
klog.Infof("%s: %v", options.CallerName, diff.String(sets.NewString()))
}
addedPods := diff.AddedPods()
if scaling != up && len(addedPods) > 0 {
klog.Errorf("%s: %s: %d pods appeared: %v", options.CallerName, selectorsString, len(deletedPods), strings.Join(deletedPods, ", "))
klog.Infof("%s: %v", options.CallerName, diff.String(sets.NewString()))
}
} else {
switch {
case len(pods) == options.DesiredPodCount:
scaling = none
case len(pods) < options.DesiredPodCount:
scaling = up
case len(pods) > options.DesiredPodCount:
scaling = down
}
}
if options.EnableLogging {
klog.Infof("%s: %s: %s", options.CallerName, selectorsString, podsStatus.String())
}
// We allow inactive pods (e.g. eviction happened).
// We wait until there is a desired number of pods running and all other pods are inactive.
if len(pods) == (podsStatus.Running+podsStatus.Inactive) && podsStatus.Running == options.DesiredPodCount {
return nil
}
oldPods = pods
}
}
}

0 comments on commit 4e2c449

Please sign in to comment.