From f86040d0b31d89de4ed225adbaf24853a2b05c80 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 5 Jul 2023 10:06:21 +0200 Subject: [PATCH] Add warmup period to CPU utilization moving average (#5394) * Add warmup period to CPU utilization moving average Signed-off-by: Marco Pracucci * Address review comments Signed-off-by: Marco Pracucci --------- Signed-off-by: Marco Pracucci --- CHANGELOG.md | 2 +- pkg/util/limiter/utilization.go | 54 ++++++---- pkg/util/limiter/utilization_test.go | 154 ++++++++++++++++++++++++++- 3 files changed, 186 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 46834b86e90..87fa7f0d838 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ * `cortex_frontend_query_result_cache_requests_total{request_type="query_range|cardinality"}` * `cortex_frontend_query_result_cache_hits_total{request_type="query_range|cardinality"}` * [FEATURE] Added `-.s3.list-objects-version` flag to configure the S3 list objects version. -* [FEATURE] Ingester: Add optional CPU/memory utilization based read request limiting, considered experimental. Disabled by default, enable by configuring limits via both of the following flags: #5012 #5392 +* [FEATURE] Ingester: Add optional CPU/memory utilization based read request limiting, considered experimental. Disabled by default, enable by configuring limits via both of the following flags: #5012 #5392 #5394 * `-ingester.read-path-cpu-utilization-limit` * `-ingester.read-path-memory-utilization-limit` * [FEATURE] Ruler: Support filtering results from rule status endpoint by `file`, `rule_group` and `rule_name`. #5291 diff --git a/pkg/util/limiter/utilization.go b/pkg/util/limiter/utilization.go index 172b821f33c..733593dd27a 100644 --- a/pkg/util/limiter/utilization.go +++ b/pkg/util/limiter/utilization.go @@ -18,8 +18,11 @@ import ( ) const ( - // Interval for updating resource (CPU/memory) utilization + // Interval for updating resource (CPU/memory) utilization. resourceUtilizationUpdateInterval = time.Second + + // How long is the sliding window used to compute the moving average. + resourceUtilizationSlidingWindow = 60 * time.Second ) type utilizationScanner interface { @@ -55,6 +58,8 @@ type UtilizationBasedLimiter struct { cpuLimit float64 // Last CPU time counter lastCPUTime float64 + // The time of the first update + firstUpdate time.Time // The time of the last update lastUpdate time.Time cpuMovingAvg *math.EwmaRate @@ -65,7 +70,7 @@ type UtilizationBasedLimiter struct { func NewUtilizationBasedLimiter(cpuLimit float64, memoryLimit uint64, logger log.Logger) *UtilizationBasedLimiter { // Calculate alpha for a minute long window // https://github.com/VividCortex/ewma#choosing-alpha - alpha := 2 / (60/resourceUtilizationUpdateInterval.Seconds() + 1) + alpha := 2 / (resourceUtilizationSlidingWindow.Seconds()/resourceUtilizationUpdateInterval.Seconds() + 1) l := &UtilizationBasedLimiter{ logger: logger, cpuLimit: cpuLimit, @@ -100,8 +105,10 @@ func (l *UtilizationBasedLimiter) update(_ context.Context) error { return nil } -func (l *UtilizationBasedLimiter) compute(now time.Time) { - cpuTime, memUtil, err := l.utilizationScanner.Scan() +// compute and return the current CPU and memory utilization. +// This function must be called at a regular interval (resourceUtilizationUpdateInterval) to get a predictable behaviour. +func (l *UtilizationBasedLimiter) compute(now time.Time) (currCPUUtil float64, currMemoryUtil uint64) { + cpuTime, currMemoryUtil, err := l.utilizationScanner.Scan() if err != nil { level.Warn(l.logger).Log("msg", "failed to get CPU and memory stats", "err", err.Error()) // Disable any limiting, since we can't tell resource utilization @@ -109,25 +116,30 @@ func (l *UtilizationBasedLimiter) compute(now time.Time) { return } - lastUpdate := l.lastUpdate - l.lastUpdate = now + // Add the instant CPU utilization to the moving average. The instant CPU + // utilization can only be computed starting from the 2nd tick. + if prevUpdate, prevCPUTime := l.lastUpdate, l.lastCPUTime; !prevUpdate.IsZero() { + cpuUtil := (cpuTime - prevCPUTime) / now.Sub(prevUpdate).Seconds() + l.cpuMovingAvg.Add(int64(cpuUtil * 100)) + l.cpuMovingAvg.Tick() + } - lastCPUTime := l.lastCPUTime + l.lastUpdate = now l.lastCPUTime = cpuTime - if lastUpdate.IsZero() { - return + // The CPU utilization moving average requires a warmup period before getting + // stable results. In this implementation we use a warmup period equal to the + // sliding window. During the warmup, the reported CPU utilization will be 0. + if l.firstUpdate.IsZero() { + l.firstUpdate = now + } else if now.Sub(l.firstUpdate) >= resourceUtilizationSlidingWindow { + currCPUUtil = float64(l.cpuMovingAvg.Rate()) / 100 } - cpuUtil := (cpuTime - lastCPUTime) / now.Sub(lastUpdate).Seconds() - l.cpuMovingAvg.Add(int64(cpuUtil * 100)) - l.cpuMovingAvg.Tick() - cpuA := float64(l.cpuMovingAvg.Rate()) / 100 - var reason string - if l.memoryLimit > 0 && memUtil >= l.memoryLimit { + if l.memoryLimit > 0 && currMemoryUtil >= l.memoryLimit { reason = "memory" - } else if l.cpuLimit > 0 && cpuA >= l.cpuLimit { + } else if l.cpuLimit > 0 && currCPUUtil >= l.cpuLimit { reason = "cpu" } @@ -140,14 +152,16 @@ func (l *UtilizationBasedLimiter) compute(now time.Time) { if enable { level.Info(l.logger).Log("msg", "enabling resource utilization based limiting", - "reason", reason, "memory_limit", formatMemoryLimit(l.memoryLimit), "memory_utilization", formatMemory(memUtil), - "cpu_limit", formatCPULimit(l.cpuLimit), "cpu_utilization", formatCPU(cpuA)) + "reason", reason, "memory_limit", formatMemoryLimit(l.memoryLimit), "memory_utilization", formatMemory(currMemoryUtil), + "cpu_limit", formatCPULimit(l.cpuLimit), "cpu_utilization", formatCPU(currCPUUtil)) } else { level.Info(l.logger).Log("msg", "disabling resource utilization based limiting", - "memory_limit", formatMemoryLimit(l.memoryLimit), "memory_utilization", formatMemory(memUtil), - "cpu_limit", formatCPULimit(l.cpuLimit), "cpu_utilization", formatCPU(cpuA)) + "memory_limit", formatMemoryLimit(l.memoryLimit), "memory_utilization", formatMemory(currMemoryUtil), + "cpu_limit", formatCPULimit(l.cpuLimit), "cpu_utilization", formatCPU(currCPUUtil)) } + l.limitingReason.Store(reason) + return } func formatCPU(value float64) string { diff --git a/pkg/util/limiter/utilization_test.go b/pkg/util/limiter/utilization_test.go index 23eb6695c15..7387c91a662 100644 --- a/pkg/util/limiter/utilization_test.go +++ b/pkg/util/limiter/utilization_test.go @@ -3,6 +3,7 @@ package limiter import ( + "math" "testing" "time" @@ -28,6 +29,12 @@ func TestUtilizationBasedLimiter(t *testing.T) { t.Run("CPU based limiting should be enabled if set to a value greater than 0", func(t *testing.T) { lim, _ := setup(t, 0.11, gigabyte) + // Warmup the CPU utilization. + for i := 0; i < int(resourceUtilizationSlidingWindow.Seconds()); i++ { + lim.compute(tim) + tim = tim.Add(time.Second) + } + // The fake utilization scanner linearly increases CPU usage for a minute for i := 0; i < 59; i++ { lim.compute(tim) @@ -38,9 +45,12 @@ func TestUtilizationBasedLimiter(t *testing.T) { tim = tim.Add(time.Second) require.Equal(t, "cpu", lim.LimitingReason(), "Limiting should be enabled due to CPU") - // The fake utilization scanner drops CPU usage again after a minute - lim.compute(tim) - tim = tim.Add(time.Second) + // The fake utilization scanner drops CPU usage again after a minute, so we expect + // limiting to be disabled shortly. + for i := 0; i < 5; i++ { + lim.compute(tim) + tim = tim.Add(time.Second) + } require.Empty(t, lim.LimitingReason(), "Limiting should be disabled again") }) @@ -109,6 +119,102 @@ func TestFormatMemoryLimit(t *testing.T) { assert.Equal(t, "1073741825", formatMemoryLimit((1024*1024*1024)+1)) } +func TestUtilizationBasedLimiter_CPUUtilizationSensitivity(t *testing.T) { + tests := map[string]struct { + instantCPUValues []float64 + expectedMaxCPUUtilization float64 + }{ + "2 minutes idle": { + instantCPUValues: generateConstCPUUtilization(120, 0), + expectedMaxCPUUtilization: 0, + }, + "2 minutes at constant utilization": { + instantCPUValues: generateConstCPUUtilization(120, 2.00), + expectedMaxCPUUtilization: 2, + }, + "1 minute idle + 10 seconds spike + 50 seconds idle": { + instantCPUValues: func() []float64 { + values := generateConstCPUUtilization(60, 0) + values = append(values, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1) + values = append(values, generateConstCPUUtilization(50, 0)...) + return values + }(), + expectedMaxCPUUtilization: 1.49, + }, + "10 seconds spike + 110 seconds idle (moving average warms up the first 60 seconds)": { + instantCPUValues: func() []float64 { + values := []float64{10, 9, 8, 7, 6, 5, 4, 3, 2, 1} + values = append(values, generateConstCPUUtilization(110, 0)...) + return values + }(), + expectedMaxCPUUtilization: 1.44, + }, + "1 minute base utilization + 10 seconds spike + 50 seconds base utilization": { + instantCPUValues: func() []float64 { + values := generateConstCPUUtilization(60, 1.0) + values = append(values, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1) + values = append(values, generateConstCPUUtilization(50, 1.0)...) + return values + }(), + expectedMaxCPUUtilization: 2.25, + }, + "1 minute base utilization + 10 seconds steady spike + 50 seconds base utilization": { + instantCPUValues: func() []float64 { + values := generateConstCPUUtilization(60, 1.0) + values = append(values, generateConstCPUUtilization(10, 10.0)...) + values = append(values, generateConstCPUUtilization(50, 1.0)...) + return values + }(), + expectedMaxCPUUtilization: 3.55, + }, + "1 minute base utilization + 30 seconds steady spike + 30 seconds base utilization": { + instantCPUValues: func() []float64 { + values := generateConstCPUUtilization(60, 1.0) + values = append(values, generateConstCPUUtilization(30, 10.0)...) + values = append(values, generateConstCPUUtilization(30, 1.0)...) + return values + }(), + expectedMaxCPUUtilization: 6.69, + }, + "linear increase and then linear decrease utilization": { + instantCPUValues: func() []float64 { + values := generateLinearStepCPUUtilization(60, 0, 0.1) + values = append(values, generateLinearStepCPUUtilization(60, 60*0.1, -0.1)...) + return values + }(), + expectedMaxCPUUtilization: 4.13, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + scanner := &preRecordedUtilizationScanner{instantCPUValues: testData.instantCPUValues} + + lim := NewUtilizationBasedLimiter(1, 0, log.NewNopLogger()) + lim.utilizationScanner = scanner + + minCPUUtilization := float64(math.MaxInt64) + maxCPUUtilization := float64(math.MinInt64) + + for i, ts := 0, time.Now(); i < len(testData.instantCPUValues); i++ { + currCPUUtilization, _ := lim.compute(ts) + ts = ts.Add(time.Second) + + // Keep track of the max CPU utilization as computed by the limiter. + if currCPUUtilization < minCPUUtilization { + minCPUUtilization = currCPUUtilization + } + if currCPUUtilization > maxCPUUtilization { + maxCPUUtilization = currCPUUtilization + } + } + + assert.InDelta(t, 0, minCPUUtilization, 0.01) // The minimum should always be 0 because of the warmup period. + assert.InDelta(t, testData.expectedMaxCPUUtilization, maxCPUUtilization, 0.01) + }) + } +} + type fakeUtilizationScanner struct { totalTime float64 counter int @@ -121,3 +227,45 @@ func (s *fakeUtilizationScanner) Scan() (float64, uint64, error) { s.counter %= 60 return s.totalTime, s.memoryUtilization, nil } + +func (s *fakeUtilizationScanner) String() string { + return "fake" +} + +// preRecordedUtilizationScanner allows to replay CPU values. +type preRecordedUtilizationScanner struct { + instantCPUValues []float64 + + // Keeps track of the accumulated CPU utilization. + totalCPUUtilization float64 +} + +func (s *preRecordedUtilizationScanner) Scan() (float64, uint64, error) { + if len(s.instantCPUValues) == 0 { + return s.totalCPUUtilization, 0, nil + } + + s.totalCPUUtilization += s.instantCPUValues[0] + s.instantCPUValues = s.instantCPUValues[1:] + return s.totalCPUUtilization, 0, nil +} + +func (s *preRecordedUtilizationScanner) String() string { + return "" +} + +func generateConstCPUUtilization(count int, value float64) []float64 { + values := make([]float64, 0, count) + for i := 0; i < count; i++ { + values = append(values, value) + } + return values +} + +func generateLinearStepCPUUtilization(count int, from, step float64) []float64 { + values := make([]float64, 0, count) + for i := 0; i < count; i++ { + values = append(values, from+(float64(i)*step)) + } + return values +}