diff --git a/runtime/jitter/interval.go b/runtime/jitter/interval.go new file mode 100644 index 00000000..0ec4d6ab --- /dev/null +++ b/runtime/jitter/interval.go @@ -0,0 +1,122 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package jitter + +import ( + "errors" + "github.com/spf13/pflag" + "math/rand" + "sync" + "time" + + ctrl "sigs.k8s.io/controller-runtime" +) + +const ( + flagIntervalJitter = "interval-jitter-percentage" +) + +var ( + globalIntervalJitter Duration = NoJitter + globalIntervalJitterOnce sync.Once + + errInvalidIntervalJitter = errors.New("the interval jitter percentage must be a non-negative value and less than 100") +) + +// SetGlobalIntervalJitter sets the global interval jitter. It is safe to call +// this method multiple times, but only the first call will have an effect. +func SetGlobalIntervalJitter(p float64, rand *rand.Rand) { + globalIntervalJitterOnce.Do(func() { + globalIntervalJitter = Percent(p, rand) + }) +} + +// RequeueAfterResult returns a result with a requeue-after interval that has +// been jittered. It will not modify the result if it is zero or is marked +// to requeue immediately. +// +// To use this function, you must first initialize the global jitter with +// SetGlobalIntervalJitter. +func RequeueAfterResult(res ctrl.Result) ctrl.Result { + if res.IsZero() || res.Requeue == true { + return res + } + if after := res.RequeueAfter; after > 0 { + res.RequeueAfter = globalIntervalJitter(after) + } + return res +} + +// IntervalDuration returns a jittered duration based on the given interval. +// +// To use this function, you must first initialize the global jitter with +// SetGlobalIntervalJitter. +func IntervalDuration(d time.Duration) time.Duration { + return globalIntervalJitter(d) +} + +// Interval is used to configure the interval jitter for a controller using +// command line flags. To use it, create an Interval and call BindFlags, then +// call SetGlobalJitter with a rand.Rand (or nil to use the default). +// +// Applying jitter to the interval duration can be useful to mitigate spikes in +// memory and CPU usage caused by many resources being configured with the same +// interval. +// +// When 1000 resources are configured to requeue every 5 minutes with a +// concurrency setting of 50 and a process time of approximately 1 second per +// resource. +// +// Without jitter, all 1000 resources will requeue every 5 minutes, resulting +// in 50 resources requeueing simultaneously every second over a 20-second +// window. +// +// However, when we apply +/-10% jitter to the interval duration, the requeueing +// will be spread out over a 1-minute window. As a result, the number of +// resources requeueing per second will vary between approximately 15 to 18.33. +// +// This smoother workload distribution can result in significant reductions in +// the impact of CPU and memory spikes. This improvement in workload +// distribution also translates into benefits for the Go garbage collector. +// Notably, the garbage collector experiences reduced GC bursts and more +// frequent collections, leading to improved overall performance. +type Interval struct { + // Percentage of jitter to apply to interval durations. A value of 10 + // will apply a jitter of +/-10% to the interval duration. It can not be negative, + // and must be less than 100. + Percentage uint8 +} + +// BindFlags will parse the given pflag.FlagSet and load the interval jitter. +func (o *Interval) BindFlags(fs *pflag.FlagSet) { + fs.Uint8Var(&o.Percentage, flagIntervalJitter, 0, + "Percentage of jitter to apply to interval durations. A value of 10 "+ + "will apply a jitter of +/-10% to the interval duration. It cannot be "+ + "negative, and must be less than 100.") +} + +// SetGlobalJitter sets the global interval jitter. It is safe to call this +// method multiple times, but only the first call will have an effect. +func (o *Interval) SetGlobalJitter(rand *rand.Rand) error { + if o.Percentage >= 100 { + return errInvalidIntervalJitter + } + if o.Percentage > 0 && o.Percentage < 100 { + SetGlobalIntervalJitter(float64(o.Percentage)/100.0, rand) + } + return nil +} diff --git a/runtime/jitter/interval_test.go b/runtime/jitter/interval_test.go new file mode 100644 index 00000000..b5ecf45b --- /dev/null +++ b/runtime/jitter/interval_test.go @@ -0,0 +1,108 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package jitter + +import ( + "github.com/spf13/pflag" + "math/rand" + "testing" + "time" + + . "github.com/onsi/gomega" + ctrl "sigs.k8s.io/controller-runtime" +) + +func TestRequeueAfterResult(t *testing.T) { + r := rand.New(rand.NewSource(int64(12345))) + p := 0.2 + SetGlobalIntervalJitter(p, r) + + tests := []struct { + name string + res ctrl.Result + expectModified bool + }{ + {res: ctrl.Result{Requeue: true}, expectModified: false}, + {res: ctrl.Result{RequeueAfter: 0}, expectModified: false}, + {res: ctrl.Result{RequeueAfter: 10 * time.Second}, expectModified: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + if tt.expectModified { + lowerBound := float64(tt.res.RequeueAfter) * (1 - p) + upperBound := float64(tt.res.RequeueAfter) * (1 + p) + + for i := 0; i < 100; i++ { + res := RequeueAfterResult(tt.res) + + g.Expect(res.RequeueAfter).To(BeNumerically(">=", lowerBound)) + g.Expect(res.RequeueAfter).To(BeNumerically("<=", upperBound)) + g.Expect(res.RequeueAfter).ToNot(Equal(tt.res.RequeueAfter)) + } + } else { + res := RequeueAfterResult(tt.res) + g.Expect(res).To(Equal(tt.res)) + } + }) + } +} + +func TestIntervalDuration(t *testing.T) { + g := NewWithT(t) + + r := rand.New(rand.NewSource(int64(12345))) + p := 0.5 + SetGlobalIntervalJitter(p, r) + + interval := 10 * time.Second + lowerBound := float64(interval) * (1 - p) + upperBound := float64(interval) * (1 + p) + + for i := 0; i < 100; i++ { + d := IntervalDuration(interval) + + g.Expect(d).To(BeNumerically(">=", lowerBound)) + g.Expect(d).To(BeNumerically("<=", upperBound)) + g.Expect(d).ToNot(Equal(interval)) + } +} + +func TestInterval_BindFlags(t *testing.T) { + g := NewWithT(t) + + interval := &Interval{} + + fs := pflag.NewFlagSet("test", pflag.ContinueOnError) + interval.BindFlags(fs) + + err := fs.Set(flagIntervalJitter, "20") + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(interval.Percentage).To(Equal(uint8(20))) +} + +func TestInterval_SetGlobalJitter(t *testing.T) { + t.Run("invalid percentage >=100", func(t *testing.T) { + g := NewWithT(t) + + interval := &Interval{Percentage: uint8(100)} + err := interval.SetGlobalJitter(nil) + g.Expect(err).To(MatchError(errInvalidIntervalJitter)) + }) +} diff --git a/runtime/jitter/jitter.go b/runtime/jitter/jitter.go new file mode 100644 index 00000000..cd3b3513 --- /dev/null +++ b/runtime/jitter/jitter.go @@ -0,0 +1,61 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package jitter + +import ( + "math/rand" + "time" +) + +// Duration is a function that takes a duration and returns a modified duration +// with jitter added. +type Duration func(time.Duration) time.Duration + +// NoJitter returns a Duration function that will return the given duration +// without modification. +func NoJitter(d time.Duration) time.Duration { + return d +} + +// Percent returns a Duration function that will modify the given duration +// by a random percentage between 0 and p, with the sign chosen randomly. +// +// For example, if percent is 0.1, the returned Duration will modify the duration +// by a random percentage between -10% and 10%. +// +// When p <= 0 or p >= 1, duration is returned without a modification. +// If r is nil, a new rand.Rand will be created using the current time as the +// seed. +func Percent(p float64, r *rand.Rand) Duration { + r = defaultOrRand(r) + if p <= 0 || p >= 1 { + return NoJitter + } + return func(d time.Duration) time.Duration { + randomP := p * (2*r.Float64() - 1) + return time.Duration(float64(d) * (1 + randomP)) + } +} + +// defaultOrRand returns the given rand.Rand if it is not nil, otherwise it +// returns a new rand.Rand +func defaultOrRand(r *rand.Rand) *rand.Rand { + if r == nil { + return rand.New(rand.NewSource(time.Now().UnixNano())) + } + return r +} diff --git a/runtime/jitter/jitter_test.go b/runtime/jitter/jitter_test.go new file mode 100644 index 00000000..bc6327ee --- /dev/null +++ b/runtime/jitter/jitter_test.go @@ -0,0 +1,71 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package jitter + +import ( + "fmt" + "math/rand" + "testing" + "time" + + . "github.com/onsi/gomega" +) + +func TestNoJitter(t *testing.T) { + g := NewWithT(t) + + g.Expect(NoJitter(10 * time.Second)).To(Equal(10 * time.Second)) + g.Expect(NoJitter(0)).To(Equal(0 * time.Second)) + g.Expect(NoJitter(-10 * time.Second)).To(Equal(-10 * time.Second)) +} + +func TestPercent(t *testing.T) { + r := rand.New(rand.NewSource(int64(12345))) + + tests := []struct { + p float64 + duration time.Duration + }{ + {p: 0.1, duration: 100 * time.Millisecond}, + {p: 0, duration: 100 * time.Millisecond}, + {p: 1, duration: 100 * time.Millisecond}, + {p: -1, duration: 100 * time.Millisecond}, + {p: 2, duration: 100 * time.Millisecond}, + } + + for _, tt := range tests { + t.Run(fmt.Sprintf("p=%v, duration=%v", tt.p, tt.duration), func(t *testing.T) { + g := NewWithT(t) + + fn := Percent(tt.p, r) + + if tt.p > 0 && tt.p < 1 { + for i := 0; i < 100; i++ { + lowerBound := float64(tt.duration) * (1 - tt.p) + upperBound := float64(tt.duration) * (1 + tt.p) + + d := fn(tt.duration) + g.Expect(d).To(BeNumerically(">=", lowerBound)) + g.Expect(d).To(BeNumerically("<=", upperBound)) + g.Expect(d).ToNot(Equal(tt.duration)) + } + } else { + g.Expect(fn(tt.duration)).To(Equal(tt.duration)) + } + }) + } +}