Add update checking to WaitForControlledPods
mborsz committed Feb 7, 2020
1 parent a3b1243 commit 7caf90f
Showing 6 changed files with 240 additions and 19 deletions.
@@ -111,7 +111,7 @@ func (s *schedulingThroughputMeasurement) start(clientSet clientset.Interface, s
return
case <-time.After(defaultWaitForPodsInterval):
pods := ps.List()
podsStatus := measurementutil.ComputePodsStartupStatus(pods, 0)
podsStatus := measurementutil.ComputePodsStartupStatus(pods, 0, nil /* updatePodPredicate */)
throughput := float64(podsStatus.Scheduled-lastScheduledCount) / float64(defaultWaitForPodsInterval/time.Second)
s.schedulingThroughputs = append(s.schedulingThroughputs, throughput)
lastScheduledCount = podsStatus.Scheduled
41 changes: 28 additions & 13 deletions clusterloader2/pkg/measurement/common/wait_for_controlled_pods.go
@@ -22,6 +22,7 @@ import (
"sync"
"time"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -63,19 +64,20 @@ func createWaitForControlledPodsRunningMeasurement() measurement.Measurement {
}

type waitForControlledPodsRunningMeasurement struct {
apiVersion string
kind string
selector *measurementutil.ObjectSelector
operationTimeout time.Duration
stopCh chan struct{}
isRunning bool
queue workerqueue.Interface
handlingGroup wait.Group
lock sync.Mutex
opResourceVersion uint64
gvr schema.GroupVersionResource
checkerMap checker.CheckerMap
clusterFramework *framework.Framework
apiVersion string
kind string
selector *measurementutil.ObjectSelector
operationTimeout time.Duration
stopCh chan struct{}
isRunning bool
queue workerqueue.Interface
handlingGroup wait.Group
lock sync.Mutex
opResourceVersion uint64
gvr schema.GroupVersionResource
checkerMap checker.CheckerMap
clusterFramework *framework.Framework
checkIfPodsAreUpdated bool
}

// Execute waits until all specified controlling objects have all pods running or until timeout happens.
@@ -108,6 +110,11 @@ func (w *waitForControlledPodsRunningMeasurement) Execute(config *measurement.Me
if err != nil {
return nil, err
}
// TODO(mborsz): Change default to true.
w.checkIfPodsAreUpdated, err = util.GetBoolOrDefault(config.Params, "checkIfPodsAreUpdated", false)
if err != nil {
return nil, err
}
return nil, w.start()
case "gather":
syncTimeout, err := util.GetDurationOrDefault(config.Params, "syncTimeout", defaultSyncTimeout)
@@ -400,6 +407,13 @@ func (w *waitForControlledPodsRunningMeasurement) waitForRuntimeObject(obj runti
if err != nil {
return nil, err
}
var isPodUpdated func(*v1.Pod) bool
if w.checkIfPodsAreUpdated {
isPodUpdated, err = runtimeobjects.GetIsPodUpdatedPredicateFromRuntimeObject(obj)
if err != nil {
return nil, err
}
}
if isDeleted {
runtimeObjectReplicas = 0
}
@@ -422,6 +436,7 @@
EnableLogging: true,
CallerName: w.String(),
WaitForPodsInterval: defaultWaitForPodsInterval,
IsPodUpdated: isPodUpdated,
}
// This function sets the status (and error message) for the object checker.
// The handling of bad statuses and errors is done by the gather() function of the measurement.
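
As a reading aid, the sketch below condenses how the new option is meant to flow through this measurement: read the `checkIfPodsAreUpdated` param, derive a per-controller predicate from the object's pod template, and hand it to the pod waiter via `WaitForPodOptions`. It is an illustration only; the helper name `buildUpdatePredicate` and the import path are assumptions based on the repository layout, not code from this commit.

```go
// Sketch only (not part of the commit): how the optional predicate could be
// obtained for a controller object.
package example

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

	"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/runtimeobjects"
)

// buildUpdatePredicate returns a predicate for the controller's pods when the
// checkIfPodsAreUpdated param is enabled. A nil predicate preserves the old
// behavior: every running pod counts as updated.
func buildUpdatePredicate(obj *unstructured.Unstructured, checkIfPodsAreUpdated bool) (func(*corev1.Pod) bool, error) {
	if !checkIfPodsAreUpdated {
		return nil, nil
	}
	return runtimeobjects.GetIsPodUpdatedPredicateFromRuntimeObject(obj)
}
```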
10 changes: 7 additions & 3 deletions clusterloader2/pkg/measurement/util/pods.go
@@ -39,16 +39,17 @@ type PodsStartupStatus struct {
Unknown int
Inactive int
Created int
RunningUpdated int
}

// String returns string representation for podsStartupStatus.
func (s *PodsStartupStatus) String() string {
return fmt.Sprintf("Pods: %d out of %d created, %d running, %d pending scheduled, %d not scheduled, %d inactive, %d terminating, %d unknown, %d runningButNotReady ",
s.Created, s.Expected, s.Running, s.Pending, s.Waiting, s.Inactive, s.Terminating, s.Unknown, s.RunningButNotReady)
return fmt.Sprintf("Pods: %d out of %d created, %d running (%d updated), %d pending scheduled, %d not scheduled, %d inactive, %d terminating, %d unknown, %d runningButNotReady ",
s.Created, s.Expected, s.Running, s.RunningUpdated, s.Pending, s.Waiting, s.Inactive, s.Terminating, s.Unknown, s.RunningButNotReady)
}

// ComputePodsStartupStatus computes PodsStartupStatus for a group of pods.
func ComputePodsStartupStatus(pods []*corev1.Pod, expected int) PodsStartupStatus {
func ComputePodsStartupStatus(pods []*corev1.Pod, expected int, isPodUpdated func(*corev1.Pod) bool) PodsStartupStatus {
startupStatus := PodsStartupStatus{
Expected: expected,
}
@@ -69,6 +70,9 @@ func ComputePodsStartupStatus(pods []*corev1.Pod, expected int) PodsStartupStatu
if ready {
// Only count a pod as running when it is also ready.
startupStatus.Running++
if isPodUpdated == nil || isPodUpdated(p) {
startupStatus.RunningUpdated++
}
} else {
startupStatus.RunningButNotReady++
}
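
To make the new counter concrete, here is a minimal sketch of how Running and RunningUpdated diverge once a predicate is supplied. It assumes, consistent with the hunk above, that a pod whose phase is Running and whose PodReady condition is True is counted as running; the pods, image names, predicate, and import path are illustrative assumptions, not part of the change.

```go
// Illustration only: Running vs RunningUpdated with and without a predicate.
package example

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
)

// runningReadyPod builds a pod that is assumed to be counted as Running.
func runningReadyPod(image string) *corev1.Pod {
	return &corev1.Pod{
		Spec: corev1.PodSpec{Containers: []corev1.Container{{Image: image}}},
		Status: corev1.PodStatus{
			Phase:      corev1.PodRunning,
			Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}},
		},
	}
}

func demoStartupStatus() {
	pods := []*corev1.Pod{runningReadyPod("app:v2"), runningReadyPod("app:v1")}

	// Hypothetical predicate: a pod counts as "updated" once it runs the v2 image.
	isUpdated := func(p *corev1.Pod) bool { return p.Spec.Containers[0].Image == "app:v2" }

	status := measurementutil.ComputePodsStartupStatus(pods, 2, isUpdated)
	fmt.Println(status.Running, status.RunningUpdated) // expected: 2 1

	// With a nil predicate RunningUpdated tracks Running, as before the change.
	status = measurementutil.ComputePodsStartupStatus(pods, 2, nil)
	fmt.Println(status.Running, status.RunningUpdated) // expected: 2 2
}
```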
@@ -20,6 +20,7 @@ import (
"fmt"
"strconv"

goerrors "github.com/go-errors/errors"
appsv1 "k8s.io/api/apps/v1"
batch "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
@@ -235,6 +236,35 @@ func getSelectorFromUnstrutured(obj *unstructured.Unstructured) (labels.Selector
}
}

// GetIsPodUpdatedPredicateFromRuntimeObject returns a func(*corev1.Pod) bool predicate
// that can be used to check whether a given pod matches the desired state defined by the object's pod template.
func GetIsPodUpdatedPredicateFromRuntimeObject(obj runtime.Object) (func(*corev1.Pod) bool, error) {
switch typed := obj.(type) {
case *unstructured.Unstructured:
return getIsPodUpdatedPodPredicateFromUnstructured(typed)
default:
return nil, goerrors.Errorf("unsupported kind when getting updated pod predicate: %v", obj)
}
}

func getIsPodUpdatedPodPredicateFromUnstructured(obj *unstructured.Unstructured) (func(_ *corev1.Pod) bool, error) {
templateMap, ok, err := unstructured.NestedMap(obj.UnstructuredContent(), "spec", "template")
if err != nil {
return nil, goerrors.Errorf("failed to get pod template: %v", err)
}
if !ok {
return nil, goerrors.Errorf("spec.template is not set in object %v", obj.UnstructuredContent())
}
template := corev1.PodTemplateSpec{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(templateMap, &template); err != nil {
return nil, goerrors.Errorf("failed to parse spec.teemplate as v1.PodTemplateSpec")
}

return func(pod *corev1.Pod) bool {
return equality.Semantic.DeepDerivative(template.Spec, pod.Spec)
}, nil
}

// GetSpecFromRuntimeObject returns spec of given runtime object.
func GetSpecFromRuntimeObject(obj runtime.Object) (interface{}, error) {
if obj == nil {
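
The predicate above hinges on `equality.Semantic.DeepDerivative`, which ignores every field left unset (zero-valued) in its first argument. That is what keeps the check tolerant of server-side defaulting and scheduling: values such as `nodeName` or `terminationGracePeriodSeconds` that exist only on the live pod do not break the match, while a field the template does set, such as a container image, must match exactly. A standalone sketch of that behavior (the specs here are made up):

```go
// Standalone sketch of the DeepDerivative semantics the predicate relies on.
package example

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
)

func demoDeepDerivative() {
	// Pod spec as declared in the controller's spec.template.
	template := corev1.PodSpec{
		Containers: []corev1.Container{{Name: "app", Image: "app:v2"}},
	}

	// Live pod spec: scheduling and defaulting filled in fields the template
	// leaves empty; DeepDerivative ignores them because they are unset above.
	live := *template.DeepCopy()
	live.NodeName = "node-1"
	grace := int64(30)
	live.TerminationGracePeriodSeconds = &grace
	fmt.Println(equality.Semantic.DeepDerivative(template, live)) // true

	// A field the template does set must match: an old image fails the check.
	outdated := *live.DeepCopy()
	outdated.Containers[0].Image = "app:v1"
	fmt.Println(equality.Semantic.DeepDerivative(template, outdated)) // false
}
```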
@@ -45,6 +45,7 @@ var (
var (
simpleLabel = map[string]string{"foo": "bar"}
affinityLabel = map[string]string{"foo": "bar", "affinity": "true"}
image = "gcr.io/some-project/some-image"
)

var node1 = corev1.Node{
@@ -177,17 +178,80 @@ var job = &batch.Job{
},
}

// pod is a sample pod that could have been created by a replicationcontroller,
// replicaset, deployment, or job (NOT a daemonset).
var pod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: controllerName + "-abcd",
Namespace: testNamespace,
ResourceVersion: defaultResourceVersion,
},
Spec: alterPodSpec(resourcePodSpec("", "50M", "0.5", nil, nil)),
}

var daemonsetPod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: controllerName + "-abcd",
Namespace: testNamespace,
ResourceVersion: defaultResourceVersion,
},
Spec: alterPodSpec(resourcePodSpec("", "50M", "0.5", simpleLabel, affinity)),
}

func resourcePodSpec(nodeName, memory, cpu string, nodeSelector map[string]string, affinity *v1.Affinity) v1.PodSpec {
return v1.PodSpec{
NodeName: nodeName,
Containers: []v1.Container{{
Resources: v1.ResourceRequirements{
Requests: allocatableResources(memory, cpu),
},
Image: image,
Env: []v1.EnvVar{
{
Name: "env1",
Value: "val1",
},
},
}},
NodeSelector: nodeSelector,
Affinity: affinity,
Tolerations: []v1.Toleration{
{
Key: "default-toleration",
Value: "default-value",
Effect: v1.TaintEffectNoSchedule,
},
},
}
}

// alterPodSpec changes podSpec to simulate possible differences between the template and the final pod.
func alterPodSpec(in v1.PodSpec) v1.PodSpec {
out := in.DeepCopy()
// append some tolerations
out.Tolerations = append(out.Tolerations, v1.Toleration{
Key: "test",
Value: "value",
Effect: v1.TaintEffectNoExecute,
})
// set some defaults
i := int64(30)
out.TerminationGracePeriodSeconds = &i
out.ActiveDeadlineSeconds = &i

// Simulate scheduling.
if out.NodeName == "" {
out.NodeName = node1.Name
}

// Copy resources
for i := range out.Containers {
c := &out.Containers[i]
if c.Resources.Requests == nil {
c.Resources.Requests = c.Resources.Limits.DeepCopy()
}
}
return *out
}

func allocatableResources(memory, cpu string) v1.ResourceList {
@@ -337,6 +401,109 @@ func TestGetSpecFromRuntimeObject(t *testing.T) {
}
}

func changeImage(in *v1.Pod) *v1.Pod {
out := in.DeepCopy()

for i := range out.Spec.Containers {
c := &out.Spec.Containers[i]
c.Image = c.Image + "-diff"
}

return out
}

func changeEnv(in *v1.Pod) *v1.Pod {
out := in.DeepCopy()

for i := range out.Spec.Containers {
c := &out.Spec.Containers[i]
for j := range c.Env {
e := &c.Env[j]
e.Value = e.Value + "-diff"
}
}

return out
}

func TestGetIsPodUpdatedPredicateFromRuntimeObject(t *testing.T) {
testCases := []struct {
name string
obj runtime.Object
pod *corev1.Pod
wantErr bool
want bool
}{
{
name: "deployment, positive",
obj: deployment,
pod: pod,
want: true,
},
{
name: "deployment, different env",
obj: deployment,
pod: changeEnv(pod),
want: false,
},
{
name: "deployment, different image",
obj: deployment,
pod: changeImage(pod),
want: false,
},
{
name: "replicaset, positive",
obj: replicaset,
pod: pod,
want: true,
},
{
name: "replicationcontroller, positive",
obj: replicationcontroller,
pod: pod,
want: true,
},
{
name: "daemonset, positive",
obj: daemonset,
pod: daemonsetPod,
want: true,
},
{
name: "job, positive",
obj: job,
pod: pod,
want: true,
},
{
name: "no spec.template",
obj: pod, // pod has no spec.template field.
pod: pod,
wantErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
unstructured := &unstructured.Unstructured{}
if err := scheme.Scheme.Convert(tc.obj, unstructured, nil); err != nil {
t.Fatalf("error converting controller to unstructured: %v", err)
}
pred, err := runtimeobjects.GetIsPodUpdatedPredicateFromRuntimeObject(unstructured)
if (err != nil) != tc.wantErr {
t.Errorf("unexpected error; want: %v; got %v", tc.wantErr, err)
}
if err != nil {
return
}
if got := pred(tc.pod); got != tc.want {
t.Errorf("pred(tc.pod) = %v; want %v", got, tc.want)
}
})
}
}

func TestGetReplicasFromRuntimeObject(t *testing.T) {
objects := []runtime.Object{
replicationcontroller,
9 changes: 7 additions & 2 deletions clusterloader2/pkg/measurement/util/wait_for_pods.go
@@ -21,6 +21,7 @@ import (
"strings"
"time"

"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog"
)
@@ -39,6 +40,10 @@ type WaitForPodOptions struct {
EnableLogging bool
CallerName string
WaitForPodsInterval time.Duration

// IsPodUpdated can be used to detect which pods have already been updated.
// A nil value means all pods are considered updated.
IsPodUpdated func(*v1.Pod) bool
}

// WaitForPods waits until the desired number of pods is running.
@@ -72,7 +77,7 @@ func WaitForPods(clientSet clientset.Interface, stopCh <-chan struct{}, options
options.DesiredPodCount, options.Selector.Namespace, options.Selector.LabelSelector, options.Selector.FieldSelector, podsStatus.Running)
case <-time.After(options.WaitForPodsInterval):
pods := ps.List()
podsStatus = ComputePodsStartupStatus(pods, options.DesiredPodCount)
podsStatus = ComputePodsStartupStatus(pods, options.DesiredPodCount, options.IsPodUpdated)

diff := DiffPods(oldPods, pods)
deletedPods := diff.DeletedPods()
@@ -88,7 +93,7 @@
}
// We allow inactive pods (e.g. eviction happened).
// We wait until there is a desired number of pods running and all other pods are inactive.
if len(pods) == (podsStatus.Running+podsStatus.Inactive) && podsStatus.Running == options.DesiredPodCount {
if len(pods) == (podsStatus.Running+podsStatus.Inactive) && podsStatus.Running == podsStatus.RunningUpdated && podsStatus.RunningUpdated == options.DesiredPodCount {
return nil
}
oldPods = pods
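
Taken together with the ComputePodsStartupStatus change, the new success gate in WaitForPods requires three things each polling interval: every observed pod is either running-and-ready or inactive, every running pod passes the update predicate, and the updated count equals the desired count. The helper below merely restates that condition for readability; the function name and import path are assumptions, and with a nil IsPodUpdated it reduces to the previous check because RunningUpdated then mirrors Running.

```go
// Restatement of the success gate (sketch; this helper is not in the commit).
package example

import (
	measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
)

func podsReachedDesiredState(status measurementutil.PodsStartupStatus, observedPods, desiredPods int) bool {
	allSettled := observedPods == status.Running+status.Inactive
	allUpdated := status.Running == status.RunningUpdated
	return allSettled && allUpdated && status.RunningUpdated == desiredPods
}
```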
