Skip to content

Commit

Permalink
Implement WaitForNodes measurement
Browse files Browse the repository at this point in the history
  • Loading branch information
mm4tt committed Jan 3, 2020
1 parent 59089be commit df1290e
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 0 deletions.
95 changes: 95 additions & 0 deletions clusterloader2/pkg/measurement/common/wait_for_nodes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

import (
"time"

"k8s.io/klog"
"k8s.io/perf-tests/clusterloader2/pkg/measurement"
measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
"k8s.io/perf-tests/clusterloader2/pkg/util"
)

const (
defaultWaitForNodesTimeout = 30 * time.Minute
defaultWaitForNodesInterval = 30 * time.Second
waitForNodesMeasurementName = "WaitForNodes"
)

func init() {
if err := measurement.Register(waitForNodesMeasurementName, createWaitForNodesMeasurement); err != nil {
klog.Fatalf("Cannot register %s: %v", waitForNodesMeasurementName, err)
}
}

func createWaitForNodesMeasurement() measurement.Measurement {
return &waitForNodesMeasurement{}
}

type waitForNodesMeasurement struct{}

// Execute waits until desired number of Nodes are ready or until a
// timeout happens. Nodes can be optionally specified by field and/or label
// selectors.
func (w *waitForNodesMeasurement) Execute(config *measurement.MeasurementConfig) ([]measurement.Summary, error) {
minNodeCount, maxNodeCount, err := getMinMaxDesiredNodeCount(config.Params)
if err != nil {
return nil, err
}

selector := measurementutil.NewObjectSelector()
if err := selector.Parse(config.Params); err != nil {
return nil, err
}

timeout, err := util.GetDurationOrDefault(config.Params, "timeout", defaultWaitForNodesTimeout)
if err != nil {
return nil, err
}
stopCh := make(chan struct{})
time.AfterFunc(timeout, func() {
close(stopCh)
})

options := &measurementutil.WaitForNodeOptions{
Selector: selector,
MinDesiredNodeCount: minNodeCount,
MaxDesiredNodeCount: maxNodeCount,
EnableLogging: true,
CallerName: w.String(),
WaitForNodesInterval: defaultWaitForNodesInterval,
}
return nil, measurementutil.WaitForNodes(config.ClusterFramework.GetClientSets().GetClient(), stopCh, options)
}

// Dispose cleans up after the measurement.
func (*waitForNodesMeasurement) Dispose() {}

// String returns a string representation of the measurement.
func (*waitForNodesMeasurement) String() string {
return waitForNodesMeasurementName
}

func getMinMaxDesiredNodeCount(params map[string]interface{}) (minDesiredNodeCount, maxDesiredNodeCount int, err error) {
minDesiredNodeCount, err = util.GetInt(params, "minDesiredNodeCount")
if err != nil {
return
}
maxDesiredNodeCount, err = util.GetInt(params, "maxDesiredNodeCount")
return
}
36 changes: 36 additions & 0 deletions clusterloader2/pkg/measurement/util/object_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,39 @@ func (s *PVStore) List() []*v1.PersistentVolume {
}
return pvs
}

// NodeStore is a convenient wrapper around cache.Store.
type NodeStore struct {
*ObjectStore
}

// NewNodeStore creates NodeStore based on a given object selector.
func NewNodeStore(c clientset.Interface, selector *ObjectSelector) (*NodeStore, error) {
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = selector.LabelSelector
options.FieldSelector = selector.FieldSelector
return c.CoreV1().Nodes().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = selector.LabelSelector
options.FieldSelector = selector.FieldSelector
return c.CoreV1().Nodes().Watch(options)
},
}
objectStore, err := newObjectStore(&v1.Node{}, lw, selector)
if err != nil {
return nil, err
}
return &NodeStore{ObjectStore: objectStore}, nil
}

// List returns list of nodes that satisfy conditions provided to NewNodeStore.
func (s *NodeStore) List() []*v1.Node {
objects := s.Store.List()
nodes := make([]*v1.Node, 0, len(objects))
for _, o := range objects {
nodes = append(nodes, o.(*v1.Node))
}
return nodes
}
74 changes: 74 additions & 0 deletions clusterloader2/pkg/measurement/util/wait_for_nodes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"fmt"
"time"

v1 "k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog"
"k8s.io/perf-tests/clusterloader2/pkg/util"
)

// WaitForNodeOptions is an options object used by WaitForNodes methods.
type WaitForNodeOptions struct {
Selector *ObjectSelector
MinDesiredNodeCount int
MaxDesiredNodeCount int
EnableLogging bool
CallerName string
WaitForNodesInterval time.Duration
}

// WaitForNodes waits till the desired number of nodes is ready.
// If stopCh is closed before all nodes are ready, the error will be returned.
func WaitForNodes(clientSet clientset.Interface, stopCh <-chan struct{}, options *WaitForNodeOptions) error {
ps, err := NewNodeStore(clientSet, options.Selector)
if err != nil {
return fmt.Errorf("node store creation error: %v", err)
}
defer ps.Stop()

nodeCount := getNumReadyNodes(ps.List())
for {
select {
case <-stopCh:
return fmt.Errorf("timeout while waiting for [%d-%d] Nodes with selector '%v' to be ready - currently there is %d Nodes",
options.MinDesiredNodeCount, options.MaxDesiredNodeCount, options.Selector.String(), nodeCount)
case <-time.After(options.WaitForNodesInterval):
nodeCount = getNumReadyNodes(ps.List())
if options.EnableLogging {
klog.Infof("%s: node count (selector = %v): %d", options.CallerName, options.Selector.String(), nodeCount)
}
if options.MinDesiredNodeCount <= nodeCount && nodeCount <= options.MaxDesiredNodeCount {
return nil
}
}
}
}

func getNumReadyNodes(nodes []*v1.Node) int {
nReady := 0
for _, n := range nodes {
if util.IsNodeSchedulableAndUntainted(n) {
nReady++
}
}
return nReady
}

0 comments on commit df1290e

Please sign in to comment.