Skip to content

Commit

Permalink
Migrate NewInformer to cache.ListerWatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
mborsz committed Apr 7, 2021
1 parent 54890c1 commit 880c465
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 23 deletions.
15 changes: 11 additions & 4 deletions clusterloader2/pkg/imagepreload/imagepreload.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"k8s.io/perf-tests/clusterloader2/pkg/config"
"k8s.io/perf-tests/clusterloader2/pkg/flags"
"k8s.io/perf-tests/clusterloader2/pkg/framework"
"k8s.io/perf-tests/clusterloader2/pkg/framework/client"
"k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/informer"
"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/runtimeobjects"
)
Expand Down Expand Up @@ -98,9 +100,14 @@ func (c *controller) PreloadImages() error {
defer close(stopCh)

nodeInformer := informer.NewInformer(
kclient,
"nodes",
util.NewObjectSelector(),
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return kclient.CoreV1().Nodes().List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return kclient.CoreV1().Nodes().Watch(context.TODO(), options)
},
},
func(old, new interface{}) { c.checkNode(new, doneNodes) })
if err := informer.StartAndSync(nodeInformer, stopCh, informerTimeout); err != nil {
return err
Expand Down
18 changes: 15 additions & 3 deletions clusterloader2/pkg/measurement/common/service_creation_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ limitations under the License.
package common

import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -137,9 +142,16 @@ func (s *serviceCreationLatencyMeasurement) start() error {
s.stopCh = make(chan struct{})

i := informer.NewInformer(
s.client,
"services",
s.selector,
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
s.selector.ApplySelectors(&options)
return s.client.CoreV1().Services(s.selector.Namespace).List(context.TODO(), options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
s.selector.ApplySelectors(&options)
return s.client.CoreV1().Services(s.selector.Namespace).Watch(context.TODO(), options)
},
},
func(oldObj, newObj interface{}) {
f := func() {
s.handleObject(oldObj, newObj)
Expand Down
16 changes: 13 additions & 3 deletions clusterloader2/pkg/measurement/common/slos/pod_startup_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"k8s.io/perf-tests/clusterloader2/pkg/errors"
"k8s.io/perf-tests/clusterloader2/pkg/measurement"
Expand Down Expand Up @@ -115,9 +118,16 @@ func (p *podStartupLatencyMeasurement) start(c clientset.Interface) error {
p.isRunning = true
p.stopCh = make(chan struct{})
i := informer.NewInformer(
c,
"pods",
p.selector,
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
p.selector.ApplySelectors(&options)
return c.CoreV1().Pods(p.selector.Namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
p.selector.ApplySelectors(&options)
return c.CoreV1().Pods(p.selector.Namespace).Watch(context.TODO(), options)
},
},
p.checkPod,
)
return informer.StartAndSync(i, p.stopCh, informerSyncTimeout)
Expand Down
16 changes: 3 additions & 13 deletions clusterloader2/pkg/measurement/util/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,18 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
)

// NewInformer creates a new informer
// for given kind, namespace, fieldSelector and labelSelector.
// NewInformer creates a new informer.
func NewInformer(
c clientset.Interface,
kind string,
selector *measurementutil.ObjectSelector,
lw cache.ListerWatcher,
handleObj func(interface{}, interface{}),
) cache.SharedInformer {
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = selector.FieldSelector
options.LabelSelector = selector.LabelSelector
}
listerWatcher := cache.NewFilteredListWatchFromClient(c.CoreV1().RESTClient(), kind, selector.Namespace, optionsModifier)
informer := cache.NewSharedInformer(listerWatcher, nil, 0)
informer := cache.NewSharedInformer(lw, nil, 0)
addEventHandler(informer, handleObj)

return informer
}

Expand Down
6 changes: 6 additions & 0 deletions clusterloader2/pkg/measurement/util/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ func (o *ObjectSelector) String() string {
return CreateSelectorsString(o.Namespace, o.LabelSelector, o.FieldSelector)
}

// ApplySelectors sets label and field selectors in a given ListOptions object.
func (o *ObjectSelector) ApplySelectors(options *metav1.ListOptions) {
options.FieldSelector = o.FieldSelector
options.LabelSelector = o.LabelSelector
}

// CreateSelectorsString creates a string representation for given namespace, label selector and field selector.
func CreateSelectorsString(namespace, labelSelector, fieldSelector string) string {
var selectorsStrings []string
Expand Down

0 comments on commit 880c465

Please sign in to comment.