Skip to content

Commit

Permalink
Add WaitForGenericK8sObjects measurement
Browse files Browse the repository at this point in the history
The new measurement lists all objects from given resource
accesses 'status.conditions' field and compares it with
list of expected conditions.
  • Loading branch information
kisieland committed Sep 7, 2023
1 parent b59b123 commit 54abd9b
Show file tree
Hide file tree
Showing 5 changed files with 648 additions and 0 deletions.
4 changes: 4 additions & 0 deletions clusterloader2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ Pods can be specified by label selector, field selector and namespace.
In case of timeout test continues to run, with error (causing marking test as failed) being logged.
- **Sleep** \
This is a barrier that waits until requested amount of the time passes.
- **WaitForGenericK8sObjects** \
This is a barrier that waits until required number of k8s object fulfill given condition requirements.
Those conditions can be specified as a list of requirements of `Type=Status` format, e.g.: `NodeReady=True`.
In case of timeout test continues to run, with error (causing marking test as failed) being logged.

## Prometheus metrics

Expand Down
152 changes: 152 additions & 0 deletions clusterloader2/pkg/measurement/common/wait_for_generic_k8s_object.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

import (
"context"
"time"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
"k8s.io/perf-tests/clusterloader2/pkg/measurement"
measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
"k8s.io/perf-tests/clusterloader2/pkg/util"
)

const (
defaultWaitForGenericK8sObjectsTimeout = 30 * time.Minute
defaultWaitForGenericK8sObjectsInterval = 30 * time.Second
waitForGenericK8sObjectsMeasurementName = "WaitForGenericK8sObjects"
)

func init() {
if err := measurement.Register(waitForGenericK8sObjectsMeasurementName, createWaitForGenericK8sObjectsMeasurement); err != nil {
klog.Fatalf("Cannot register %s: %v", waitForGenericK8sObjectsMeasurementName, err)
}
}

func createWaitForGenericK8sObjectsMeasurement() measurement.Measurement {
return &waitForGenericK8sObjectsMeasurement{}
}

type waitForGenericK8sObjectsMeasurement struct{}

// Execute waits until desired number of k8s objects reached given conditions.
// Conditions can denote either success or failure. The measurement assumes the
// k8s object has a status.conditions field, which contains a []metav1.Condition.
// More here: https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#typical-status-properties
// Measurement will timeout if not enough objects have required conditions.
func (w *waitForGenericK8sObjectsMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) {
groupVersionResource, err := getGroupVersionResource(config.Params)
if err != nil {
return nil, err
}
namespaces, err := getNamespaces(config.ClusterFramework.GetAutomanagedNamespacePrefix(), config.Params)
if err != nil {
return nil, err
}
timeout, err := util.GetDurationOrDefault(config.Params, "timeout", defaultWaitForGenericK8sObjectsTimeout)
if err != nil {
return nil, err
}
refreshInterval, err := util.GetDurationOrDefault(config.Params, "refreshInterval", defaultWaitForGenericK8sObjectsInterval)
if err != nil {
return nil, err
}
successfulConditions, err := util.GetStringArray(config.Params, "successfulConditions")
if err != nil {
return nil, err
}
failedConditions, err := util.GetStringArray(config.Params, "failedConditions")
if err != nil {
return nil, err
}
minDesiredObjectCount, err := util.GetInt(config.Params, "minDesiredObjectCount")
if err != nil {
return nil, err
}
maxFailedObjectCount, err := util.GetInt(config.Params, "maxFailedObjectCount")
if err != nil {
return nil, err
}

dynamicClient := config.ClusterFramework.GetDynamicClients().GetClient()
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()

options := &measurementutil.WaitForGenericK8sObjectsOptions{
GroupVersionResource: groupVersionResource,
Namespaces: namespaces,
SuccessfulConditions: successfulConditions,
FailedConditions: failedConditions,
MinDesiredObjectCount: minDesiredObjectCount,
MaxFailedObjectCount: maxFailedObjectCount,
CallerName: w.String(),
WaitInterval: refreshInterval,
}
return nil, measurementutil.WaitForGenericK8sObjects(ctx, dynamicClient, options)
}

// Dispose cleans up after the measurement.
func (*waitForGenericK8sObjectsMeasurement) Dispose() {}

// String returns a string representation of the measurement.
func (*waitForGenericK8sObjectsMeasurement) String() string {
return waitForGenericK8sObjectsMeasurementName
}

func getGroupVersionResource(params map[string]interface{}) (schema.GroupVersionResource, error) {
group, err := util.GetString(params, "objectGroup")
if err != nil {
return schema.GroupVersionResource{}, err
}
version, err := util.GetString(params, "objectVersion")
if err != nil {
return schema.GroupVersionResource{}, err
}
resource, err := util.GetString(params, "objectResource")
if err != nil {
return schema.GroupVersionResource{}, err
}

return schema.GroupVersionResource{
Group: group,
Version: version,
Resource: resource,
}, nil
}

func getNamespaces(namespacesPrefix string, params map[string]interface{}) (measurementutil.NamespacesRange, error) {
namespaceRange, err := util.GetMap(params, "namespaceRange")
if err != nil {
return measurementutil.NamespacesRange{}, err
}
min, err := util.GetInt(namespaceRange, "min")
if err != nil {
return measurementutil.NamespacesRange{}, err
}
max, err := util.GetInt(namespaceRange, "max")
if err != nil {
return measurementutil.NamespacesRange{}, err
}

return measurementutil.NamespacesRange{
Prefix: namespacesPrefix,
Min: min,
Max: max,
}, nil
}
109 changes: 109 additions & 0 deletions clusterloader2/pkg/measurement/util/object_store_dynamic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
This file is copy of https://github.com/kubernetes/kubernetes/blob/master/test/utils/pod_store.go
with slight changes regarding labelSelector and flagSelector applied.
*/

package util

import (
"context"
"encoding/json"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
)

const (
defaultResyncInterval = 10 * time.Second
)

// DynamicObjectStore is a convenient wrapper around cache.GenericLister.
type DynamicObjectStore struct {
cache.GenericLister
namespaces map[string]bool
}

// NewDynamicObjectStore creates DynamicObjectStore based on given object version resource and selector.
func NewDynamicObjectStore(ctx context.Context, dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, namespaces map[string]bool) (*DynamicObjectStore, error) {
informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, defaultResyncInterval)
lister := informerFactory.ForResource(gvr).Lister()
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())

return &DynamicObjectStore{
GenericLister: lister,
namespaces: namespaces,
}, nil
}

// ListConditions returns list of conditions for each object that was returned by lister.
func (s *DynamicObjectStore) ListConditions() ([][]metav1.Condition, error) {
objects, err := s.GenericLister.List(labels.Everything())
if err != nil {
return nil, err
}

result := make([][]metav1.Condition, 0, len(objects))
for _, o := range objects {
os, err := getObjectSimplification(o)
if err != nil {
return nil, err
}
if !s.namespaces[os.Metadata.Namespace] {
continue
}
result = append(result, os.Status.Conditions)
}
return result, nil
}

// ObjectSimplification represents the content of the object
// that is needed to be handled by this measurement.
type ObjectSimplification struct {
Metadata metav1.ObjectMeta `json:"metadata"`
Status StatusWithConditions `json:"status"`
}

// StatusWithConditions represents the content of the status field
// that is required to be handled by this measurement.
type StatusWithConditions struct {
Conditions []metav1.Condition `json:"conditions"`
}

func getObjectSimplification(o runtime.Object) (ObjectSimplification, error) {
dataMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(o)
if err != nil {
return ObjectSimplification{}, err
}

jsonBytes, err := json.Marshal(dataMap)
if err != nil {
return ObjectSimplification{}, err
}

object := ObjectSimplification{}
err = json.Unmarshal(jsonBytes, &object)
return object, err
}
Loading

0 comments on commit 54abd9b

Please sign in to comment.