diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index cb6785a4ae4..df83ac4d831 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -3206,7 +3206,7 @@ "kind": "field", "name": "initial_delay", "required": false, - "desc": "How long the circuit breaker should wait to start up after the corresponding ingester started. During that time both failures and successes will not be counted.", + "desc": "How long the circuit breaker should wait between an activation request and becoming effectively active. During that time both failures and successes will not be counted.", "fieldValue": null, "fieldDefaultValue": 0, "fieldFlag": "ingester.circuit-breaker.initial-delay", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 5df86e53bfc..e7ae95878ad 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1316,7 +1316,7 @@ Usage of ./cmd/mimir/mimir: -ingester.circuit-breaker.failure-threshold-percentage uint [experimental] Max percentage of requests that can fail over period before the circuit breaker opens (default 10) -ingester.circuit-breaker.initial-delay duration - [experimental] How long the circuit breaker should wait to start up after the corresponding ingester started. During that time both failures and successes will not be counted. + [experimental] How long the circuit breaker should wait between an activation request and becoming effectively active. During that time both failures and successes will not be counted. -ingester.circuit-breaker.push-timeout duration How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors (default 2s) -ingester.circuit-breaker.thresholding-period duration diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 58eaf184db7..a28ce1394b6 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1243,9 +1243,9 @@ circuit_breaker: # CLI flag: -ingester.circuit-breaker.cooldown-period [cooldown_period: | default = 10s] - # (experimental) How long the circuit breaker should wait to start up after - # the corresponding ingester started. During that time both failures and - # successes will not be counted. + # (experimental) How long the circuit breaker should wait between an + # activation request and becoming effectively active. During that time both + # failures and successes will not be counted. # CLI flag: -ingester.circuit-breaker.initial-delay [initial_delay: | default = 0s] diff --git a/pkg/ingester/circuitbreaker.go b/pkg/ingester/circuitbreaker.go index b4684b75f7a..897294a4bb6 100644 --- a/pkg/ingester/circuitbreaker.go +++ b/pkg/ingester/circuitbreaker.go @@ -21,10 +21,10 @@ import ( ) const ( - resultSuccess = "success" - resultError = "error" - resultOpen = "circuit_breaker_open" - defaultPushTimeout = 2 * time.Second + circuitBreakerResultSuccess = "success" + circuitBreakerResultError = "error" + circuitBreakerResultOpen = "circuit_breaker_open" + circuitBreakerDefaultPushTimeout = 2 * time.Second ) type circuitBreakerMetrics struct { @@ -60,7 +60,7 @@ func newCircuitBreakerMetrics(r prometheus.Registerer, currentStateFn func() cir // We initialize all possible states for the circuitBreakerTransitions metrics cbMetrics.circuitBreakerTransitions.WithLabelValues(s.String()) } - for _, r := range []string{resultSuccess, resultError, resultOpen} { + for _, r := range []string{circuitBreakerResultSuccess, circuitBreakerResultError, circuitBreakerResultOpen} { // We initialize all possible results for the circuitBreakerResults metrics cbMetrics.circuitBreakerResults.WithLabelValues(r) } @@ -85,10 +85,12 @@ func (cfg *CircuitBreakerConfig) RegisterFlags(f *flag.FlagSet) { f.UintVar(&cfg.FailureExecutionThreshold, prefix+"failure-execution-threshold", 100, "How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures") f.DurationVar(&cfg.ThresholdingPeriod, prefix+"thresholding-period", time.Minute, "Moving window of time that the percentage of failed requests is computed over") f.DurationVar(&cfg.CooldownPeriod, prefix+"cooldown-period", 10*time.Second, "How long the circuit breaker will stay in the open state before allowing some requests") - f.DurationVar(&cfg.InitialDelay, prefix+"initial-delay", 0, "How long the circuit breaker should wait to start up after the corresponding ingester started. During that time both failures and successes will not be counted.") - f.DurationVar(&cfg.PushTimeout, prefix+"push-timeout", defaultPushTimeout, "How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors") + f.DurationVar(&cfg.InitialDelay, prefix+"initial-delay", 0, "How long the circuit breaker should wait between an activation request and becoming effectively active. During that time both failures and successes will not be counted.") + f.DurationVar(&cfg.PushTimeout, prefix+"push-timeout", circuitBreakerDefaultPushTimeout, "How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors") } +// circuitBreaker abstracts the ingester's server-side circuit breaker functionality. +// A nil *circuitBreaker is a valid noop implementation. type circuitBreaker struct { cfg CircuitBreakerConfig logger log.Logger @@ -100,8 +102,11 @@ type circuitBreaker struct { testRequestDelay time.Duration } -func newCircuitBreaker(cfg CircuitBreakerConfig, isActive bool, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker { - active := atomic.NewBool(isActive) +func newCircuitBreaker(cfg CircuitBreakerConfig, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker { + if !cfg.Enabled { + return nil + } + active := atomic.NewBool(false) cb := circuitBreaker{ cfg: cfg, logger: logger, @@ -142,7 +147,7 @@ func newCircuitBreaker(cfg CircuitBreakerConfig, isActive bool, logger log.Logge return &cb } -func isFailure(err error) bool { +func isCircuitBreakerFailure(err error) bool { if err == nil { return false } @@ -169,17 +174,19 @@ func isFailure(err error) bool { } func (cb *circuitBreaker) isActive() bool { - if cb == nil { - return false - } - return cb.active.Load() + return cb != nil && cb.active.Load() } -func (cb *circuitBreaker) setActive() { +func (cb *circuitBreaker) activate() { if cb == nil { return } - cb.active.Store(true) + if cb.cfg.InitialDelay == 0 { + cb.active.Store(true) + } + time.AfterFunc(cb.cfg.InitialDelay, func() { + cb.active.Store(true) + }) } // tryAcquirePermit tries to acquire a permit to use the circuit breaker and returns whether a permit was acquired. @@ -192,25 +199,12 @@ func (cb *circuitBreaker) tryAcquirePermit() (bool, error) { return false, nil } if !cb.cb.TryAcquirePermit() { - cb.metrics.circuitBreakerResults.WithLabelValues(resultOpen).Inc() + cb.metrics.circuitBreakerResults.WithLabelValues(circuitBreakerResultOpen).Inc() return false, newCircuitBreakerOpenError(cb.cb.RemainingDelay()) } return true, nil } -func (cb *circuitBreaker) recordResult(err error) { - if !cb.isActive() { - return - } - if err != nil && isFailure(err) { - cb.cb.RecordFailure() - cb.metrics.circuitBreakerResults.WithLabelValues(resultError).Inc() - } else { - cb.metrics.circuitBreakerResults.WithLabelValues(resultSuccess).Inc() - cb.cb.RecordSuccess() - } -} - // finishPushRequest should be called to complete the push request executed upon a // successfully acquired circuit breaker permit. // It records the result of the push request with the circuit breaker. Push requests @@ -229,3 +223,16 @@ func (cb *circuitBreaker) finishPushRequest(duration time.Duration, pushErr erro cb.recordResult(pushErr) return pushErr } + +func (cb *circuitBreaker) recordResult(err error) { + if !cb.isActive() { + return + } + if err != nil && isCircuitBreakerFailure(err) { + cb.cb.RecordFailure() + cb.metrics.circuitBreakerResults.WithLabelValues(circuitBreakerResultError).Inc() + } else { + cb.metrics.circuitBreakerResults.WithLabelValues(circuitBreakerResultSuccess).Inc() + cb.cb.RecordSuccess() + } +} diff --git a/pkg/ingester/circuitbreaker_test.go b/pkg/ingester/circuitbreaker_test.go index 4d65214859a..7240b689b82 100644 --- a/pkg/ingester/circuitbreaker_test.go +++ b/pkg/ingester/circuitbreaker_test.go @@ -28,30 +28,30 @@ import ( func TestIsFailure(t *testing.T) { t.Run("no error", func(t *testing.T) { - require.False(t, isFailure(nil)) + require.False(t, isCircuitBreakerFailure(nil)) }) t.Run("context cancelled", func(t *testing.T) { - require.False(t, isFailure(context.Canceled)) - require.False(t, isFailure(fmt.Errorf("%w", context.Canceled))) + require.False(t, isCircuitBreakerFailure(context.Canceled)) + require.False(t, isCircuitBreakerFailure(fmt.Errorf("%w", context.Canceled))) }) t.Run("gRPC context cancelled", func(t *testing.T) { err := status.Error(codes.Canceled, "cancelled!") - require.False(t, isFailure(err)) - require.False(t, isFailure(fmt.Errorf("%w", err))) + require.False(t, isCircuitBreakerFailure(err)) + require.False(t, isCircuitBreakerFailure(fmt.Errorf("%w", err))) }) t.Run("gRPC deadline exceeded", func(t *testing.T) { err := status.Error(codes.DeadlineExceeded, "broken!") - require.True(t, isFailure(err)) - require.True(t, isFailure(fmt.Errorf("%w", err))) + require.True(t, isCircuitBreakerFailure(err)) + require.True(t, isCircuitBreakerFailure(fmt.Errorf("%w", err))) }) t.Run("gRPC unavailable with INSTANCE_LIMIT details", func(t *testing.T) { err := newInstanceLimitReachedError("broken") - require.True(t, isFailure(err)) - require.True(t, isFailure(fmt.Errorf("%w", err))) + require.True(t, isCircuitBreakerFailure(err)) + require.True(t, isCircuitBreakerFailure(fmt.Errorf("%w", err))) }) t.Run("gRPC unavailable with SERVICE_UNAVAILABLE details is not a failure", func(t *testing.T) { @@ -59,14 +59,14 @@ func TestIsFailure(t *testing.T) { stat, err := stat.WithDetails(&mimirpb.ErrorDetails{Cause: mimirpb.SERVICE_UNAVAILABLE}) require.NoError(t, err) err = stat.Err() - require.False(t, isFailure(err)) - require.False(t, isFailure(fmt.Errorf("%w", err))) + require.False(t, isCircuitBreakerFailure(err)) + require.False(t, isCircuitBreakerFailure(fmt.Errorf("%w", err))) }) t.Run("gRPC unavailable without details is not a failure", func(t *testing.T) { err := status.Error(codes.Unavailable, "broken!") - require.False(t, isFailure(err)) - require.False(t, isFailure(fmt.Errorf("%w", err))) + require.False(t, isCircuitBreakerFailure(err)) + require.False(t, isCircuitBreakerFailure(fmt.Errorf("%w", err))) }) } @@ -77,10 +77,9 @@ func TestCircuitBreaker_IsActive(t *testing.T) { registry := prometheus.NewRegistry() cfg := CircuitBreakerConfig{Enabled: true, InitialDelay: 10 * time.Millisecond} - cb = newCircuitBreaker(cfg, false, log.NewNopLogger(), registry) - time.AfterFunc(cfg.InitialDelay, func() { - cb.setActive() - }) + cb = newCircuitBreaker(cfg, log.NewNopLogger(), registry) + cb.activate() + // When InitialDelay is set, circuit breaker is not immediately active. require.False(t, cb.isActive()) @@ -104,18 +103,26 @@ func TestCircuitBreaker_TryAcquirePermit(t *testing.T) { expectedMetrics string }{ "if circuit breaker is not active, status false and no error are returned": { - initialDelay: 1 * time.Minute, - circuitBreakerSetup: func(cb *circuitBreaker) { cb.cb.Close() }, + initialDelay: 1 * time.Minute, + circuitBreakerSetup: func(cb *circuitBreaker) { + cb.active.Store(false) + }, expectedSuccess: false, expectedCircuitBreakerError: false, }, "if circuit breaker closed, status true and no error are returned": { - circuitBreakerSetup: func(cb *circuitBreaker) { cb.cb.Close() }, + circuitBreakerSetup: func(cb *circuitBreaker) { + cb.activate() + cb.cb.Close() + }, expectedSuccess: true, expectedCircuitBreakerError: false, }, "if circuit breaker open, status false and a circuitBreakerErrorOpen are returned": { - circuitBreakerSetup: func(cb *circuitBreaker) { cb.cb.Open() }, + circuitBreakerSetup: func(cb *circuitBreaker) { + cb.activate() + cb.cb.Open() + }, expectedSuccess: false, expectedCircuitBreakerError: true, expectedMetrics: ` @@ -137,7 +144,10 @@ func TestCircuitBreaker_TryAcquirePermit(t *testing.T) { `, }, "if circuit breaker half-open, status false and a circuitBreakerErrorOpen are returned": { - circuitBreakerSetup: func(cb *circuitBreaker) { cb.cb.HalfOpen() }, + circuitBreakerSetup: func(cb *circuitBreaker) { + cb.activate() + cb.cb.HalfOpen() + }, expectedSuccess: false, expectedCircuitBreakerError: true, expectedMetrics: ` @@ -162,8 +172,8 @@ func TestCircuitBreaker_TryAcquirePermit(t *testing.T) { for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { registry := prometheus.NewRegistry() - cfg := CircuitBreakerConfig{Enabled: true, CooldownPeriod: 10 * time.Second, InitialDelay: testCase.initialDelay} - cb := newCircuitBreaker(cfg, cfg.InitialDelay == 0, log.NewNopLogger(), registry) + cfg := CircuitBreakerConfig{Enabled: true, CooldownPeriod: 10 * time.Second} + cb := newCircuitBreaker(cfg, log.NewNopLogger(), registry) testCase.circuitBreakerSetup(cb) status, err := cb.tryAcquirePermit() require.Equal(t, testCase.expectedSuccess, status) @@ -220,7 +230,8 @@ func TestCircuitBreaker_RecordResult(t *testing.T) { for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { registry := prometheus.NewRegistry() - cb := newCircuitBreaker(cfg, true, log.NewNopLogger(), registry) + cb := newCircuitBreaker(cfg, log.NewNopLogger(), registry) + cb.activate() cb.recordResult(testCase.err) assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(testCase.expectedMetrics), metricNames...)) }) @@ -233,13 +244,14 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) { } testCases := map[string]struct { pushRequestDuration time.Duration - initialDelay time.Duration + isActive bool err error expectedErr error expectedMetrics string }{ "with a permit acquired, pushRequestDuration lower than PushTimeout and no input error, finishPushRequest gives success": { pushRequestDuration: 1 * time.Second, + isActive: true, err: nil, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. @@ -251,7 +263,7 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) { }, "with circuit breaker not active, pushRequestDuration lower than PushTimeout and no input error, finishPushRequest does nothing": { pushRequestDuration: 1 * time.Second, - initialDelay: 1 * time.Minute, + isActive: false, err: nil, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. @@ -263,6 +275,7 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) { }, "with circuit breaker active, pushRequestDuration higher than PushTimeout and no input error, finishPushRequest gives context deadline exceeded error": { pushRequestDuration: 3 * time.Second, + isActive: true, err: nil, expectedErr: context.DeadlineExceeded, expectedMetrics: ` @@ -275,7 +288,7 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) { }, "with circuit breaker not active, pushRequestDuration higher than PushTimeout and no input error, finishPushRequest does nothing": { pushRequestDuration: 3 * time.Second, - initialDelay: 1 * time.Minute, + isActive: false, err: nil, expectedErr: nil, expectedMetrics: ` @@ -288,6 +301,7 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) { }, "with circuit breaker active, pushRequestDuration higher than PushTimeout and an input error different from context deadline exceeded, finishPushRequest gives context deadline exceeded error": { pushRequestDuration: 3 * time.Second, + isActive: true, err: newInstanceLimitReachedError("error"), expectedErr: context.DeadlineExceeded, expectedMetrics: ` @@ -300,7 +314,7 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) { }, "with circuit breaker not active, pushRequestDuration higher than PushTimeout and an input error different from context deadline exceeded, finishPushRequest does nothing": { pushRequestDuration: 3 * time.Second, - initialDelay: 1 * time.Minute, + isActive: false, err: newInstanceLimitReachedError("error"), expectedErr: nil, expectedMetrics: ` @@ -316,11 +330,11 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) { t.Run(testName, func(t *testing.T) { registry := prometheus.NewRegistry() cfg := CircuitBreakerConfig{ - Enabled: true, - InitialDelay: testCase.initialDelay, - PushTimeout: 2 * time.Second, + Enabled: true, + PushTimeout: 2 * time.Second, } - cb := newCircuitBreaker(cfg, cfg.InitialDelay == 0, log.NewNopLogger(), registry) + cb := newCircuitBreaker(cfg, log.NewNopLogger(), registry) + cb.active.Store(testCase.isActive) err := cb.finishPushRequest(testCase.pushRequestDuration, testCase.err) if testCase.expectedErr == nil { require.NoError(t, err) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index f44c65fb27d..77e84bde583 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -398,10 +398,8 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, i.metrics = newIngesterMetrics(registerer, cfg.ActiveSeriesMetrics.Enabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests, &i.inflightPushRequestsBytes) i.activeGroups = activeGroupsCleanupService - if cfg.CircuitBreakerConfig.Enabled { - // We create an inactive circuit breaker, which will be activated on a successful completion of starting. - i.circuitBreaker = newCircuitBreaker(cfg.CircuitBreakerConfig, false, logger, registerer) - } + // We create a circuit breaker, which will be activated on a successful completion of starting. + i.circuitBreaker = newCircuitBreaker(cfg.CircuitBreakerConfig, logger, registerer) if registerer != nil { promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ @@ -644,13 +642,7 @@ func (i *Ingester) starting(ctx context.Context) (err error) { return errors.Wrap(err, "failed to start ingester subservices after ingester ring lifecycler") } - if i.cfg.CircuitBreakerConfig.InitialDelay == 0 { - i.circuitBreaker.setActive() - } else { - time.AfterFunc(i.cfg.CircuitBreakerConfig.InitialDelay, func() { - i.circuitBreaker.setActive() - }) - } + i.circuitBreaker.activate() return nil } @@ -1054,12 +1046,15 @@ func (i *Ingester) startPushRequest(ctx context.Context, reqSize int64) (context instanceLimitsErr := i.checkInstanceLimits(inflight, inflightBytes, rejectEqualInflightBytes) if instanceLimitsErr == nil { + // In this case a pull request has been successfully started, and we return + // the context enriched with the corresponding pushRequestState object. return ctx, true, nil } // In this case a per-instance limit has been hit, and the corresponding error has to be passed // to FinishPushRequest, which finishes the push request, records the error with the circuit breaker, // and gives it a possibly acquired permit back. + st.pushErr = instanceLimitsErr i.FinishPushRequest(ctx) return nil, false, instanceLimitsErr }