Skip to content

Commit

Permalink
Add warmup period to CPU utilization moving average (#5394)
Browse files Browse the repository at this point in the history
* Add warmup period to CPU utilization moving average

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Address review comments

Signed-off-by: Marco Pracucci <marco@pracucci.com>

---------

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored Jul 5, 2023
1 parent f552a65 commit f86040d
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 24 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* `cortex_frontend_query_result_cache_requests_total{request_type="query_range|cardinality"}`
* `cortex_frontend_query_result_cache_hits_total{request_type="query_range|cardinality"}`
* [FEATURE] Added `-<prefix>.s3.list-objects-version` flag to configure the S3 list objects version.
* [FEATURE] Ingester: Add optional CPU/memory utilization based read request limiting, considered experimental. Disabled by default, enable by configuring limits via both of the following flags: #5012 #5392
* [FEATURE] Ingester: Add optional CPU/memory utilization based read request limiting, considered experimental. Disabled by default, enable by configuring limits via both of the following flags: #5012 #5392 #5394
* `-ingester.read-path-cpu-utilization-limit`
* `-ingester.read-path-memory-utilization-limit`
* [FEATURE] Ruler: Support filtering results from rule status endpoint by `file`, `rule_group` and `rule_name`. #5291
Expand Down
54 changes: 34 additions & 20 deletions pkg/util/limiter/utilization.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import (
)

const (
// Interval for updating resource (CPU/memory) utilization
// Interval for updating resource (CPU/memory) utilization.
resourceUtilizationUpdateInterval = time.Second

// Length of the sliding window used to compute the moving average.
resourceUtilizationSlidingWindow = 60 * time.Second
)

type utilizationScanner interface {
Expand Down Expand Up @@ -55,6 +58,8 @@ type UtilizationBasedLimiter struct {
cpuLimit float64
// Last CPU time counter
lastCPUTime float64
// The time of the first update
firstUpdate time.Time
// The time of the last update
lastUpdate time.Time
cpuMovingAvg *math.EwmaRate
Expand All @@ -65,7 +70,7 @@ type UtilizationBasedLimiter struct {
func NewUtilizationBasedLimiter(cpuLimit float64, memoryLimit uint64, logger log.Logger) *UtilizationBasedLimiter {
// Calculate alpha for a minute long window
// https://github.com/VividCortex/ewma#choosing-alpha
alpha := 2 / (60/resourceUtilizationUpdateInterval.Seconds() + 1)
alpha := 2 / (resourceUtilizationSlidingWindow.Seconds()/resourceUtilizationUpdateInterval.Seconds() + 1)
l := &UtilizationBasedLimiter{
logger: logger,
cpuLimit: cpuLimit,
Expand Down Expand Up @@ -100,34 +105,41 @@ func (l *UtilizationBasedLimiter) update(_ context.Context) error {
return nil
}

func (l *UtilizationBasedLimiter) compute(now time.Time) {
cpuTime, memUtil, err := l.utilizationScanner.Scan()
// compute and return the current CPU and memory utilization.
// This function must be called at a regular interval (resourceUtilizationUpdateInterval) to get predictable behaviour.
func (l *UtilizationBasedLimiter) compute(now time.Time) (currCPUUtil float64, currMemoryUtil uint64) {
cpuTime, currMemoryUtil, err := l.utilizationScanner.Scan()
if err != nil {
level.Warn(l.logger).Log("msg", "failed to get CPU and memory stats", "err", err.Error())
// Disable any limiting, since we can't tell resource utilization
l.limitingReason.Store("")
return
}

lastUpdate := l.lastUpdate
l.lastUpdate = now
// Add the instant CPU utilization to the moving average. The instant CPU
// utilization can only be computed starting from the 2nd tick.
if prevUpdate, prevCPUTime := l.lastUpdate, l.lastCPUTime; !prevUpdate.IsZero() {
cpuUtil := (cpuTime - prevCPUTime) / now.Sub(prevUpdate).Seconds()
l.cpuMovingAvg.Add(int64(cpuUtil * 100))
l.cpuMovingAvg.Tick()
}

lastCPUTime := l.lastCPUTime
l.lastUpdate = now
l.lastCPUTime = cpuTime

if lastUpdate.IsZero() {
return
// The CPU utilization moving average requires a warmup period before getting
// stable results. In this implementation we use a warmup period equal to the
// sliding window. During the warmup, the reported CPU utilization will be 0.
if l.firstUpdate.IsZero() {
l.firstUpdate = now
} else if now.Sub(l.firstUpdate) >= resourceUtilizationSlidingWindow {
currCPUUtil = float64(l.cpuMovingAvg.Rate()) / 100
}

cpuUtil := (cpuTime - lastCPUTime) / now.Sub(lastUpdate).Seconds()
l.cpuMovingAvg.Add(int64(cpuUtil * 100))
l.cpuMovingAvg.Tick()
cpuA := float64(l.cpuMovingAvg.Rate()) / 100

var reason string
if l.memoryLimit > 0 && memUtil >= l.memoryLimit {
if l.memoryLimit > 0 && currMemoryUtil >= l.memoryLimit {
reason = "memory"
} else if l.cpuLimit > 0 && cpuA >= l.cpuLimit {
} else if l.cpuLimit > 0 && currCPUUtil >= l.cpuLimit {
reason = "cpu"
}

Expand All @@ -140,14 +152,16 @@ func (l *UtilizationBasedLimiter) compute(now time.Time) {

if enable {
level.Info(l.logger).Log("msg", "enabling resource utilization based limiting",
"reason", reason, "memory_limit", formatMemoryLimit(l.memoryLimit), "memory_utilization", formatMemory(memUtil),
"cpu_limit", formatCPULimit(l.cpuLimit), "cpu_utilization", formatCPU(cpuA))
"reason", reason, "memory_limit", formatMemoryLimit(l.memoryLimit), "memory_utilization", formatMemory(currMemoryUtil),
"cpu_limit", formatCPULimit(l.cpuLimit), "cpu_utilization", formatCPU(currCPUUtil))
} else {
level.Info(l.logger).Log("msg", "disabling resource utilization based limiting",
"memory_limit", formatMemoryLimit(l.memoryLimit), "memory_utilization", formatMemory(memUtil),
"cpu_limit", formatCPULimit(l.cpuLimit), "cpu_utilization", formatCPU(cpuA))
"memory_limit", formatMemoryLimit(l.memoryLimit), "memory_utilization", formatMemory(currMemoryUtil),
"cpu_limit", formatCPULimit(l.cpuLimit), "cpu_utilization", formatCPU(currCPUUtil))
}

l.limitingReason.Store(reason)
return
}

func formatCPU(value float64) string {
Expand Down
154 changes: 151 additions & 3 deletions pkg/util/limiter/utilization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package limiter

import (
"math"
"testing"
"time"

Expand All @@ -28,6 +29,12 @@ func TestUtilizationBasedLimiter(t *testing.T) {
t.Run("CPU based limiting should be enabled if set to a value greater than 0", func(t *testing.T) {
lim, _ := setup(t, 0.11, gigabyte)

// Warmup the CPU utilization.
for i := 0; i < int(resourceUtilizationSlidingWindow.Seconds()); i++ {
lim.compute(tim)
tim = tim.Add(time.Second)
}

// The fake utilization scanner linearly increases CPU usage for a minute
for i := 0; i < 59; i++ {
lim.compute(tim)
Expand All @@ -38,9 +45,12 @@ func TestUtilizationBasedLimiter(t *testing.T) {
tim = tim.Add(time.Second)
require.Equal(t, "cpu", lim.LimitingReason(), "Limiting should be enabled due to CPU")

// The fake utilization scanner drops CPU usage again after a minute
lim.compute(tim)
tim = tim.Add(time.Second)
// The fake utilization scanner drops CPU usage again after a minute, so we expect
// limiting to be disabled shortly.
for i := 0; i < 5; i++ {
lim.compute(tim)
tim = tim.Add(time.Second)
}
require.Empty(t, lim.LimitingReason(), "Limiting should be disabled again")
})

Expand Down Expand Up @@ -109,6 +119,102 @@ func TestFormatMemoryLimit(t *testing.T) {
assert.Equal(t, "1073741825", formatMemoryLimit((1024*1024*1024)+1))
}

// TestUtilizationBasedLimiter_CPUUtilizationSensitivity replays pre-recorded
// per-tick CPU values through the limiter, one compute() call per simulated
// second, and verifies the lowest and highest CPU utilization that compute()
// reported over the whole replay.
func TestUtilizationBasedLimiter_CPUUtilizationSensitivity(t *testing.T) {
	// join concatenates the given series, in order, into a single new slice.
	join := func(series ...[]float64) []float64 {
		var joined []float64
		for _, part := range series {
			joined = append(joined, part...)
		}
		return joined
	}

	// decayingSpike returns a fresh 10-tick spike going from 10 down to 1.
	decayingSpike := func() []float64 {
		return []float64{10, 9, 8, 7, 6, 5, 4, 3, 2, 1}
	}

	type scenario struct {
		instantCPUValues          []float64
		expectedMaxCPUUtilization float64
	}

	scenarios := map[string]scenario{
		"2 minutes idle": {
			instantCPUValues:          generateConstCPUUtilization(120, 0),
			expectedMaxCPUUtilization: 0,
		},
		"2 minutes at constant utilization": {
			instantCPUValues:          generateConstCPUUtilization(120, 2.00),
			expectedMaxCPUUtilization: 2,
		},
		"1 minute idle + 10 seconds spike + 50 seconds idle": {
			instantCPUValues: join(
				generateConstCPUUtilization(60, 0),
				decayingSpike(),
				generateConstCPUUtilization(50, 0),
			),
			expectedMaxCPUUtilization: 1.49,
		},
		"10 seconds spike + 110 seconds idle (moving average warms up the first 60 seconds)": {
			instantCPUValues: join(
				decayingSpike(),
				generateConstCPUUtilization(110, 0),
			),
			expectedMaxCPUUtilization: 1.44,
		},
		"1 minute base utilization + 10 seconds spike + 50 seconds base utilization": {
			instantCPUValues: join(
				generateConstCPUUtilization(60, 1.0),
				decayingSpike(),
				generateConstCPUUtilization(50, 1.0),
			),
			expectedMaxCPUUtilization: 2.25,
		},
		"1 minute base utilization + 10 seconds steady spike + 50 seconds base utilization": {
			instantCPUValues: join(
				generateConstCPUUtilization(60, 1.0),
				generateConstCPUUtilization(10, 10.0),
				generateConstCPUUtilization(50, 1.0),
			),
			expectedMaxCPUUtilization: 3.55,
		},
		"1 minute base utilization + 30 seconds steady spike + 30 seconds base utilization": {
			instantCPUValues: join(
				generateConstCPUUtilization(60, 1.0),
				generateConstCPUUtilization(30, 10.0),
				generateConstCPUUtilization(30, 1.0),
			),
			expectedMaxCPUUtilization: 6.69,
		},
		"linear increase and then linear decrease utilization": {
			instantCPUValues: join(
				generateLinearStepCPUUtilization(60, 0, 0.1),
				generateLinearStepCPUUtilization(60, 60*0.1, -0.1),
			),
			expectedMaxCPUUtilization: 4.13,
		},
	}

	for name, sc := range scenarios {
		t.Run(name, func(t *testing.T) {
			lim := NewUtilizationBasedLimiter(1, 0, log.NewNopLogger())
			lim.utilizationScanner = &preRecordedUtilizationScanner{instantCPUValues: sc.instantCPUValues}

			// Sentinels that any reported utilization will replace.
			lowest := float64(math.MaxInt64)
			highest := float64(math.MinInt64)

			// One compute() call per recorded value, advancing the clock by
			// one second each time.
			now := time.Now()
			for range sc.instantCPUValues {
				reported, _ := lim.compute(now)
				now = now.Add(time.Second)

				// Track both extremes of the utilization reported by the limiter.
				if reported < lowest {
					lowest = reported
				}
				if reported > highest {
					highest = reported
				}
			}

			// The minimum should always be 0 because of the warmup period.
			assert.InDelta(t, 0, lowest, 0.01)
			assert.InDelta(t, sc.expectedMaxCPUUtilization, highest, 0.01)
		})
	}
}

type fakeUtilizationScanner struct {
totalTime float64
counter int
Expand All @@ -121,3 +227,45 @@ func (s *fakeUtilizationScanner) Scan() (float64, uint64, error) {
s.counter %= 60
return s.totalTime, s.memoryUtilization, nil
}

// String returns the fixed name "fake" for this test scanner.
// NOTE(review): presumably needed to satisfy the utilizationScanner
// interface, whose full definition is not visible here — confirm.
func (s *fakeUtilizationScanner) String() string {
return "fake"
}

// preRecordedUtilizationScanner is a test scanner that replays a fixed
// sequence of instant CPU values, exposing them as a cumulative counter.
type preRecordedUtilizationScanner struct {
	// Instant CPU values still to be replayed; the head is consumed by each Scan().
	instantCPUValues []float64

	// Running sum of all the instant CPU values consumed so far.
	totalCPUUtilization float64
}

// Scan consumes the next pre-recorded instant CPU value (if any is left),
// adds it to the running total and returns that total. Once every value has
// been consumed the total stays unchanged. The memory utilization is always 0
// and the error is always nil.
func (s *preRecordedUtilizationScanner) Scan() (float64, uint64, error) {
	if len(s.instantCPUValues) > 0 {
		next := s.instantCPUValues[0]
		s.instantCPUValues = s.instantCPUValues[1:]
		s.totalCPUUtilization += next
	}

	return s.totalCPUUtilization, 0, nil
}

// String returns an empty name for this scanner.
func (s *preRecordedUtilizationScanner) String() string {
	return ""
}

// generateConstCPUUtilization returns a slice of count elements, each one
// set to value.
func generateConstCPUUtilization(count int, value float64) []float64 {
	series := make([]float64, count)
	for idx := range series {
		series[idx] = value
	}
	return series
}

// generateLinearStepCPUUtilization returns a slice of count elements where
// the element at index i is from + i*step.
func generateLinearStepCPUUtilization(count int, from, step float64) []float64 {
	series := make([]float64, count)
	for idx := range series {
		series[idx] = from + (float64(idx) * step)
	}
	return series
}

0 comments on commit f86040d

Please sign in to comment.