Skip to content

Commit

Permalink
Make GetReplicasFromRuntimeObject work for DaemonSets
Browse files Browse the repository at this point in the history
  • Loading branch information
mm4tt committed Sep 12, 2019
1 parent 9d5a6c5 commit 5602a6c
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 19 deletions.
7 changes: 6 additions & 1 deletion clusterloader2/pkg/framework/client/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,14 @@ func RetryFunction(f func() error, options ...*ApiCallOptions) wait.ConditionFun

// ListNodes returns list of cluster nodes.
func ListNodes(c clientset.Interface) ([]apiv1.Node, error) {
return ListNodesWithOptions(c, metav1.ListOptions{})
}

// ListNodesWithOptions lists the cluster nodes using the provided options.
func ListNodesWithOptions(c clientset.Interface, listOpts metav1.ListOptions) ([]apiv1.Node, error) {
var nodes []apiv1.Node
listFunc := func() error {
nodesList, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
nodesList, err := c.CoreV1().Nodes().List(listOpts)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,12 @@ func (w *waitForControlledPodsRunningMeasurement) handleObject(oldObj, newObj in
}
}

func checkScaledown(oldObj, newObj runtime.Object) (bool, error) {
oldReplicas, err := runtimeobjects.GetReplicasFromRuntimeObject(oldObj)
func (w *waitForControlledPodsRunningMeasurement) checkScaledown(oldObj, newObj runtime.Object) (bool, error) {
oldReplicas, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), oldObj)
if err != nil {
return false, err
}
newReplicas, err := runtimeobjects.GetReplicasFromRuntimeObject(newObj)
newReplicas, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), newObj)
if err != nil {
return false, err
}
Expand All @@ -292,7 +292,7 @@ func checkScaledown(oldObj, newObj runtime.Object) (bool, error) {

func (w *waitForControlledPodsRunningMeasurement) handleObjectLocked(oldObj, newObj runtime.Object) error {
isObjDeleted := newObj == nil
isScalingDown, err := checkScaledown(oldObj, newObj)
isScalingDown, err := w.checkScaledown(oldObj, newObj)
if err != nil {
return fmt.Errorf("checkScaledown error: %v", err)
}
Expand Down Expand Up @@ -394,7 +394,7 @@ func (w *waitForControlledPodsRunningMeasurement) waitForRuntimeObject(obj runti
if err != nil {
return nil, err
}
runtimeObjectReplicas, err := runtimeobjects.GetReplicasFromRuntimeObject(obj)
runtimeObjectReplicas, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), obj)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/perf-tests/clusterloader2/pkg/framework/client"
"k8s.io/perf-tests/clusterloader2/pkg/util"
)

// ListRuntimeObjectsForKind returns objects of given kind that satisfy given namespace, labelSelector and fieldSelector.
Expand Down Expand Up @@ -260,13 +261,13 @@ func getSpecFromUnstrutured(obj *unstructured.Unstructured) (map[string]interfac
}

// GetReplicasFromRuntimeObject returns replicas number from given runtime object.
func GetReplicasFromRuntimeObject(obj runtime.Object) (int32, error) {
func GetReplicasFromRuntimeObject(c clientset.Interface, obj runtime.Object) (int32, error) {
if obj == nil {
return 0, nil
}
switch typed := obj.(type) {
case *unstructured.Unstructured:
return getReplicasFromUnstrutured(typed)
return getReplicasFromUnstrutured(c, typed)
case *corev1.ReplicationController:
if typed.Spec.Replicas != nil {
return *typed.Spec.Replicas, nil
Expand All @@ -288,7 +289,8 @@ func GetReplicasFromRuntimeObject(obj runtime.Object) (int32, error) {
}
return 0, nil
case *appsv1.DaemonSet:
return 0, nil
// TODO(#790): In addition to nodeSelector the affinity should be also taken into account
return GetNumSchedulableNodesMatchingSelector(c, typed.Spec.Template.Spec.NodeSelector)
case *batch.Job:
if typed.Spec.Parallelism != nil {
return *typed.Spec.Parallelism, nil
Expand All @@ -299,20 +301,44 @@ func GetReplicasFromRuntimeObject(obj runtime.Object) (int32, error) {
}
}

// Note: This function assumes each controller has field Spec.Replicas, except Daemonset and Job.
func getReplicasFromUnstrutured(obj *unstructured.Unstructured) (int32, error) {
// GetNumSchedulableNodesMatchingSelector returns the number of schedulable nodes matching the provided selector.
func GetNumSchedulableNodesMatchingSelector(c clientset.Interface, nodeSelector map[string]string) (int32, error) {
selector, err := metav1.LabelSelectorAsSelector(metav1.SetAsLabelSelector(nodeSelector))
if err != nil {
return 0, err
}
listOpts := metav1.ListOptions{LabelSelector: selector.String()}
list, err := client.ListNodesWithOptions(c, listOpts)
if err != nil {
return 0, err
}
var numSchedulableNodes int32
for _, node := range list {
if util.IsNodeSchedulableAndUntainted(&node) {
numSchedulableNodes++
}
}
return numSchedulableNodes, nil
}

// Note: This function assumes each controller has field Spec.Replicas, except DaemonSets and Job.
func getReplicasFromUnstrutured(c clientset.Interface, obj *unstructured.Unstructured) (int32, error) {
spec, err := getSpecFromUnstrutured(obj)
if err != nil {
return -1, err
}

return tryAcquireReplicasFromUnstructuredSpec(spec, obj.GetKind())
return tryAcquireReplicasFromUnstructuredSpec(c, spec, obj.GetKind())
}

func tryAcquireReplicasFromUnstructuredSpec(spec map[string]interface{}, kind string) (int32, error) {
func tryAcquireReplicasFromUnstructuredSpec(c clientset.Interface, spec map[string]interface{}, kind string) (int32, error) {
switch kind {
case "DaemonSet":
return 0, nil
nodeSelector, err := getDaemonSetNodeSelectorFromUnstructuredSpec(spec)
if err != nil {
return 0, err
}
// TODO(#790): In addition to nodeSelector the affinity should be also taken into account
return GetNumSchedulableNodesMatchingSelector(c, nodeSelector)
case "Job":
replicas, found, err := unstructured.NestedInt64(spec, "parallelism")
if err != nil {
Expand All @@ -334,6 +360,19 @@ func tryAcquireReplicasFromUnstructuredSpec(spec map[string]interface{}, kind st
}
}

func getDaemonSetNodeSelectorFromUnstructuredSpec(spec map[string]interface{}) (map[string]string, error) {
template, found, err := unstructured.NestedMap(spec, "template")
if err != nil || !found {
return nil, err
}
podSpec, found, err := unstructured.NestedMap(template, "spec")
if err != nil || !found {
return nil, err
}
nodeSelector, found, err := unstructured.NestedStringMap(podSpec, "nodeSelector")
return nodeSelector, err
}

// IsEqualRuntimeObjectsSpec returns true if given runtime objects have identical specs.
func IsEqualRuntimeObjectsSpec(runtimeObj1, runtimeObj2 runtime.Object) (bool, error) {
runtimeObj1Spec, err := GetSpecFromRuntimeObject(runtimeObj1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/runtimeobjects"
)
Expand Down Expand Up @@ -306,21 +307,21 @@ func TestGetReplicasFromRuntimeObject(t *testing.T) {
replicaset,
deployment,
job,
daemonset,
}
expected := []int32{
defaultReplicas,
defaultReplicas,
defaultReplicas,
defaultReplicas,
0,
}
// TODO(mm4tt): Use fake client and test logic for DaemonSets
var client kubernetes.Interface
for i, obj := range objects {
unstructured := &unstructured.Unstructured{}
if err := scheme.Scheme.Convert(obj, unstructured, nil); err != nil {
t.Fatalf("error converting controller to unstructured: %v", err)
}
replicas, err := runtimeobjects.GetReplicasFromRuntimeObject(unstructured)
replicas, err := runtimeobjects.GetReplicasFromRuntimeObject(client, unstructured)
if err != nil {
t.Fatalf("get replicas from runtime object failed: %v", err)
}
Expand Down
7 changes: 6 additions & 1 deletion clusterloader2/pkg/util/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func GetSchedulableUntainedNodes(c clientset.Interface) ([]corev1.Node, error) {
}
var filtered []corev1.Node
for i := range nodeList {
if isNodeSchedulable(&nodeList[i]) && isNodeUntainted(&nodeList[i]) {
if IsNodeSchedulableAndUntainted(&nodeList[i]) {
filtered = append(filtered, nodeList[i])
}
}
Expand Down Expand Up @@ -70,6 +70,11 @@ func LogClusterNodes(c clientset.Interface) error {
return nil
}

// IsNodeSchedulableAndUntainted returns true whether node is schedulable and untainted.
func IsNodeSchedulableAndUntainted(node *corev1.Node) bool {
return isNodeSchedulable(node) && isNodeUntainted(node)
}

// Node is schedulable if:
// 1) doesn't have "unschedulable" field set
// 2) it's Ready condition is set to true
Expand Down

0 comments on commit 5602a6c

Please sign in to comment.