Increase page size in slo-monitor to 5000 (attempt #2).
mborsz committed Sep 2, 2020
1 parent efd097d commit 62bba03
Showing 4 changed files with 214 additions and 22 deletions.
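What changed, in short: pod_monitor.go previously paginated its List calls by hand through k8s.io/client-go/tools/pager; this commit hands the raw calls to a cache.Reflector and pushes the 5000-item page size down to it through a forked controller, because cache.Config in the vendored client-go does not yet expose WatchListPageSize (see kubernetes/kubernetes#94363). A rough before/after sketch — the package, function names, and variables below are illustrative, not from the repo, and the two-argument List signatures assume the pre-context-aware client-go this repo vendors:

// Illustrative sketch only; names are hypothetical.
package sketch

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/pager"
)

// listPodsBefore is the pre-commit shape: each ListFunc paginates
// explicitly through the pager package.
func listPodsBefore(client kubernetes.Interface, options metav1.ListOptions) (runtime.Object, error) {
	pg := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
		return client.CoreV1().Pods(v1.NamespaceAll).List(opts)
	}))
	pg.PageSize = 5000 // default is 500
	return pg.List(context.Background(), options)
}

// reflectorAfter is the post-commit shape: the raw List call goes to a
// Reflector, and its WatchListPageSize drives the chunking instead.
func reflectorAfter(lw cache.ListerWatcher, store cache.Store) *cache.Reflector {
	r := cache.NewReflector(lw, &v1.Pod{}, store, 0)
	r.WatchListPageSize = 5000
	return r
}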
2 changes: 1 addition & 1 deletion slo-monitor/Makefile
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 PACKAGE = k8s.io/perf-tests/slo-monitor
-TAG = 0.13.0
+TAG = 0.14.0
 # Image should be pulled from k8s.gcr.io, which will auto-detect
 # region (us, eu, asia, ...) and pull from the closest.
 REPOSITORY?=k8s.gcr.io
17 changes: 4 additions & 13 deletions slo-monitor/src/monitors/pod_monitor.go
@@ -17,7 +17,6 @@ limitations under the License.
 package monitors
 
 import (
-	"context"
 	"strings"
 	"sync"
 	"time"
@@ -31,7 +30,6 @@ import (
 
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/tools/pager"
 
 	kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
 
@@ -131,10 +129,8 @@ func (pm *PodStartupLatencyDataMonitor) Run(stopCh chan struct{}) error {
 	controller := NewWatcherWithHandler(
 		&cache.ListWatch{
 			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-				pg := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
-					return pm.kubeClient.CoreV1().Pods(v1.NamespaceAll).List(opts)
-				}))
-				return pg.List(context.Background(), options)
+				// This call is paginated by cache.Reflector.
+				return pm.kubeClient.CoreV1().Pods(v1.NamespaceAll).List(options)
 			},
 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 				return pm.kubeClient.CoreV1().Pods(v1.NamespaceAll).Watch(options)
@@ -180,13 +176,8 @@ func (pm *PodStartupLatencyDataMonitor) Run(stopCh chan struct{}) error {
 		&cache.ListWatch{
 			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 				options.FieldSelector = eventSelector
-				pg := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
-					return pm.kubeClient.CoreV1().Events(v1.NamespaceAll).List(opts)
-				}))
-				// Increase page size from default 500 to 5000 to increase listing throughput.
-				// In our test setup, 5000 objects should have a total size of ~4MB, which seems to be an acceptable page size.
-				pg.PageSize = 5000
-				return pg.List(context.Background(), options)
+				// This call is paginated by cache.Reflector.
+				return pm.kubeClient.CoreV1().Events(v1.NamespaceAll).List(options)
 			},
 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 				options.FieldSelector = eventSelector
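A note on the `// This call is paginated by cache.Reflector.` comments above: dropping the explicit pager is safe because the listing still happens in 5000-object chunks, just one layer down. API chunking passes Limit on the first request and follows the returned Continue token. A simplified, hypothetical sketch of that loop (listAllPods is illustrative, not the actual client-go code; the two-argument List signature matches the vendored client-go above):

// Hypothetical helper mirroring the chunked-list protocol.
package sketch

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listAllPods pages through pods the way a Reflector does once its
// WatchListPageSize is set: request Limit-sized chunks and follow the
// Continue token until the server reports there are no more.
func listAllPods(client kubernetes.Interface, pageSize int64) ([]v1.Pod, error) {
	var pods []v1.Pod
	opts := metav1.ListOptions{Limit: pageSize}
	for {
		list, err := client.CoreV1().Pods(v1.NamespaceAll).List(opts)
		if err != nil {
			return nil, err
		}
		pods = append(pods, list.Items...)
		if list.Continue == "" { // last chunk
			return pods, nil
		}
		opts.Continue = list.Continue
	}
}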
20 changes: 12 additions & 8 deletions slo-monitor/src/monitors/watcher.go
@@ -23,22 +23,25 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 
 	"k8s.io/client-go/tools/cache"
+	cachefork "k8s.io/perf-tests/slo-monitor/src/third_party/forked/k8s.io/client-go/tools/cache"
 )
 
 const (
-	resyncPeriod = time.Duration(0)
+	resyncPeriod      = time.Duration(0)
+	watchListPageSize = 5000
 )
 
 // NewWatcherWithHandler creates a simple watcher that will call `h` for all coming objects
 func NewWatcherWithHandler(lw cache.ListerWatcher, objType runtime.Object, setHandler, deleteHandler func(obj interface{}) error) cache.Controller {
 	fifo := NewNoStoreQueue()
 
-	cfg := &cache.Config{
-		Queue:            fifo,
-		ListerWatcher:    lw,
-		ObjectType:       objType,
-		FullResyncPeriod: resyncPeriod,
-		RetryOnError:     false,
+	cfg := &cachefork.Config{
+		Queue:             fifo,
+		ListerWatcher:     lw,
+		ObjectType:        objType,
+		FullResyncPeriod:  resyncPeriod,
+		RetryOnError:      false,
+		WatchListPageSize: watchListPageSize,
 
 		Process: func(obj interface{}) error {
 			workItem := obj.(workItem)
@@ -52,5 +55,6 @@ func NewWatcherWithHandler(lw cache.ListerWatcher, objType runtime.Object, setHandler, deleteHandler func(obj interface{}) error) cache.Controller {
 			}
 		},
 	}
-	return cache.New(cfg)
+	return cachefork.New(cfg)
+
 }
197 changes: 197 additions & 0 deletions slo-monitor/src/third_party/forked/k8s.io/client-go/tools/cache/controller.go
@@ -0,0 +1,197 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"time"
+
+	"k8s.io/apimachinery/pkg/runtime"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/tools/cache"
+)
+
+//////////////////////////////////////////////////
+// This is a fork of Config and Controller structs from k8s.io/client-go/tools/cache/controller.go with
+// cherry pick of https://github.com/kubernetes/kubernetes/pull/94363.
+// TODO(maciejborsz): Get rid of it once https://github.com/kubernetes/kubernetes/pull/94363 is available
+// in version of client-go used in slo-monitor.
+/////////////////////////////////////////////////
+
+// Config contains all the settings for a Controller.
+type Config struct {
+	// The queue for your objects - has to be a DeltaFIFO due to
+	// assumptions in the implementation. Your Process() function
+	// should accept the output of this Queue's Pop() method.
+	cache.Queue
+
+	// Something that can list and watch your objects.
+	cache.ListerWatcher
+
+	// Something that can process a popped Deltas.
+	Process ProcessFunc
+
+	// ObjectType is an example object of the type this controller is
+	// expected to handle. Only the type needs to be right, except
+	// that when that is `unstructured.Unstructured` the object's
+	// `"apiVersion"` and `"kind"` must also be right.
+	ObjectType runtime.Object
+
+	// FullResyncPeriod is the period at which ShouldResync is considered.
+	FullResyncPeriod time.Duration
+
+	// ShouldResync is periodically used by the reflector to determine
+	// whether to Resync the Queue. If ShouldResync is `nil` or
+	// returns true, it means the reflector should proceed with the
+	// resync.
+	ShouldResync ShouldResyncFunc
+
+	// If true, when Process() returns an error, re-enqueue the object.
+	// TODO: add interface to let you inject a delay/backoff or drop
+	// the object completely if desired. Pass the object in
+	// question to this interface as a parameter. This is probably moot
+	// now that this functionality appears at a higher level.
+	RetryOnError bool
+
+	// XXX: Not possible to implement in a fork.
+	// Called whenever the ListAndWatch drops the connection with an error.
+	// WatchErrorHandler cache.WatchErrorHandler
+
+	// WatchListPageSize is the requested chunk size of initial and relist watch lists.
+	WatchListPageSize int64
+}
+
+// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
+// resync or not. It can be used by a shared informer to support multiple event handlers with custom
+// resync periods.
+type ShouldResyncFunc func() bool
+
+// ProcessFunc processes a single object.
+type ProcessFunc func(obj interface{}) error
+
+// `*controller` implements Controller
+type controller struct {
+	config         Config
+	reflector      *cache.Reflector
+	reflectorMutex sync.RWMutex
+	// XXX: Not possible to implement in a fork.
+	// clock clock.Clock
+}
+
+// Controller is a low-level controller that is parameterized by a
+// Config and used in sharedIndexInformer.
+type Controller interface {
+	// Run does two things. One is to construct and run a Reflector
+	// to pump objects/notifications from the Config's ListerWatcher
+	// to the Config's Queue and possibly invoke the occasional Resync
+	// on that Queue. The other is to repeatedly Pop from the Queue
+	// and process with the Config's ProcessFunc. Both of these
+	// continue until `stopCh` is closed.
+	Run(stopCh <-chan struct{})
+
+	// HasSynced delegates to the Config's Queue
+	HasSynced() bool
+
+	// LastSyncResourceVersion delegates to the Reflector when there
+	// is one, otherwise returns the empty string
+	LastSyncResourceVersion() string
+}
+
+// New makes a new Controller from the given Config.
+func New(c *Config) Controller {
+	ctlr := &controller{
+		config: *c,
+		// XXX: Not possible to implement in a fork.
+		// clock: &clock.RealClock{},
+	}
+	return ctlr
+}
+
+// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
+// It's an error to call Run more than once.
+// Run blocks; call via go.
+func (c *controller) Run(stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+	go func() {
+		<-stopCh
+		c.config.Queue.Close()
+	}()
+	r := cache.NewReflector(
+		c.config.ListerWatcher,
+		c.config.ObjectType,
+		c.config.Queue,
+		c.config.FullResyncPeriod,
+	)
+	r.ShouldResync = c.config.ShouldResync
+	r.WatchListPageSize = c.config.WatchListPageSize
+
+	// XXX: Not possible to implement in a fork.
+	// r.clock = c.clock
+	// if c.config.WatchErrorHandler != nil {
+	// 	r.watchErrorHandler = c.config.WatchErrorHandler
+	// }
+
+	c.reflectorMutex.Lock()
+	c.reflector = r
+	c.reflectorMutex.Unlock()
+
+	var wg wait.Group
+	defer wg.Wait()
+
+	wg.StartWithChannel(stopCh, r.Run)
+
+	wait.Until(c.processLoop, time.Second, stopCh)
+}
+
+// Returns true once this controller has completed an initial resource listing
+func (c *controller) HasSynced() bool {
+	return c.config.Queue.HasSynced()
+}
+
+func (c *controller) LastSyncResourceVersion() string {
+	c.reflectorMutex.RLock()
+	defer c.reflectorMutex.RUnlock()
+	if c.reflector == nil {
+		return ""
+	}
+	return c.reflector.LastSyncResourceVersion()
+}
+
+// processLoop drains the work queue.
+// TODO: Consider doing the processing in parallel. This will require a little thought
+// to make sure that we don't end up processing the same object multiple times
+// concurrently.
+//
+// TODO: Plumb through the stopCh here (and down to the queue) so that this can
+// actually exit when the controller is stopped. Or just give up on this stuff
+// ever being stoppable. Converting this whole package to use Context would
+// also be helpful.
+func (c *controller) processLoop() {
+	for {
+		obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process))
+		if err != nil {
+			if err == cache.FIFOClosedError {
+				return
+			}
+			if c.config.RetryOnError {
+				// This is the safe way to re-enqueue.
+				c.config.Queue.AddIfNotPresent(obj)
+			}
+		}
+	}
+}
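For reference, a hypothetical caller of this forked package, wired the same way watcher.go wires it, with the 5000-item page size flowing down to the Reflector. newPodWatcher and the Process body are illustrative only, and the two-argument List/Watch signatures again assume the vendored client-go:

// Hypothetical usage sketch; not part of this commit.
package sketch

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	cachefork "k8s.io/perf-tests/slo-monitor/src/third_party/forked/k8s.io/client-go/tools/cache"
)

// newPodWatcher builds a controller whose Reflector lists pods in
// 5000-object chunks and then watches for changes.
func newPodWatcher(client kubernetes.Interface, queue cache.Queue) cachefork.Controller {
	return cachefork.New(&cachefork.Config{
		Queue: queue,
		ListerWatcher: &cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				// Unpaginated call; the Reflector chunks it via WatchListPageSize.
				return client.CoreV1().Pods(v1.NamespaceAll).List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				return client.CoreV1().Pods(v1.NamespaceAll).Watch(options)
			},
		},
		ObjectType:        &v1.Pod{},
		FullResyncPeriod:  time.Duration(0),
		RetryOnError:      false,
		WatchListPageSize: 5000,
		Process: func(obj interface{}) error {
			fmt.Printf("processing %T\n", obj) // placeholder handler
			return nil
		},
	})
}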
