diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 30dbee9410d..c3fd7280576 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -3149,12 +3149,12 @@ }, { "kind": "field", - "name": "failure_threshold", + "name": "failure_threshold_percentage", "required": false, "desc": "Max percentage of requests that can fail over period before the circuit breaker opens", "fieldValue": null, "fieldDefaultValue": 10, - "fieldFlag": "ingester.circuit-breaker.failure-threshold", + "fieldFlag": "ingester.circuit-breaker.failure-threshold-percentage", "fieldType": "int", "fieldCategory": "experimental" }, @@ -3208,7 +3208,7 @@ "required": false, "desc": "How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors", "fieldValue": null, - "fieldDefaultValue": 0, + "fieldDefaultValue": 2000000000, "fieldFlag": "ingester.circuit-breaker.push-timeout", "fieldType": "duration", "fieldCategory": "experiment" diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 2a41ff88e0b..0cc5602f105 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1307,12 +1307,12 @@ Usage of ./cmd/mimir/mimir: [experimental] Enable circuit breaking when making requests to ingesters -ingester.circuit-breaker.failure-execution-threshold uint [experimental] How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures (default 100) - -ingester.circuit-breaker.failure-threshold uint + -ingester.circuit-breaker.failure-threshold-percentage uint [experimental] Max percentage of requests that can fail over period before the circuit breaker opens (default 10) -ingester.circuit-breaker.initial-delay duration [experimental] How long the circuit breaker should wait between creation and 
starting up. During that time both failures and successes will not be counted. -ingester.circuit-breaker.push-timeout duration - How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors + How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors (default 2s) -ingester.circuit-breaker.thresholding-period duration [experimental] Moving window of time that the percentage of failed requests is computed over (default 1m0s) -ingester.client.backoff-max-period duration diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl index b9ee5965b1d..0d5aec9b73d 100644 --- a/cmd/mimir/help.txt.tmpl +++ b/cmd/mimir/help.txt.tmpl @@ -386,7 +386,7 @@ Usage of ./cmd/mimir/mimir: -help-all Print help, also including advanced and experimental parameters. -ingester.circuit-breaker.push-timeout duration - How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors + How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors (default 2s) -ingester.max-global-metadata-per-metric int The maximum number of metadata per metric, across the cluster. 0 to disable. 
-ingester.max-global-metadata-per-user int diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 5674155ceb9..9b2753f842b 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1220,8 +1220,8 @@ circuit_breaker: # (experimental) Max percentage of requests that can fail over period before # the circuit breaker opens - # CLI flag: -ingester.circuit-breaker.failure-threshold - [failure_threshold: <int> | default = 10] + # CLI flag: -ingester.circuit-breaker.failure-threshold-percentage + [failure_threshold_percentage: <int> | default = 10] # (experimental) How many requests must have been executed in period for the # circuit breaker to be eligible to open for the rate of failures @@ -1249,7 +1249,7 @@ circuit_breaker: # used for circuit breakers only, and timeout expirations are not reported as # errors # CLI flag: -ingester.circuit-breaker.push-timeout - [push_timeout: <duration> | default = 0s] + [push_timeout: <duration> | default = 2s] ``` ### querier diff --git a/pkg/ingester/circuitbreaker.go b/pkg/ingester/circuitbreaker.go index 765c637e511..a1f7b78f9fe 100644 --- a/pkg/ingester/circuitbreaker.go +++ b/pkg/ingester/circuitbreaker.go @@ -23,144 +23,122 @@ import ( type testCtxKey string const ( - resultSuccess = "success" - resultError = "error" - resultOpen = "circuit_breaker_open" - testDelayKey testCtxKey = "test-delay" - - circuitBreakerCurrentStateGaugeName = "cortex_ingester_circuit_breaker_current_state" - circuitBreakerCurrentStateGaugeHelp = "Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name." 
- circuitBreakerCurrentStateGaugeLabel = "state" + resultSuccess = "success" + resultError = "error" + resultOpen = "circuit_breaker_open" + defaultPushTimeout = 2 * time.Second + testDelayKey testCtxKey = "test-delay" ) type circuitBreakerMetrics struct { - circuitBreakerOpenStateGauge prometheus.GaugeFunc - circuitBreakerHalfOpenStateGauge prometheus.GaugeFunc - circuitBreakerClosedStateGauge prometheus.GaugeFunc - circuitBreakerTransitions *prometheus.CounterVec circuitBreakerResults *prometheus.CounterVec } func newCircuitBreakerMetrics(r prometheus.Registerer, currentStateFn func() circuitbreaker.State) *circuitBreakerMetrics { - return &circuitBreakerMetrics{ - circuitBreakerOpenStateGauge: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{ - Name: circuitBreakerCurrentStateGaugeName, - Help: circuitBreakerCurrentStateGaugeHelp, - ConstLabels: map[string]string{circuitBreakerCurrentStateGaugeLabel: circuitbreaker.OpenState.String()}, - }, func() float64 { - if currentStateFn() == circuitbreaker.OpenState { - return 1 - } - return 0 - }), - circuitBreakerHalfOpenStateGauge: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{ - Name: circuitBreakerCurrentStateGaugeName, - Help: circuitBreakerCurrentStateGaugeHelp, - ConstLabels: map[string]string{circuitBreakerCurrentStateGaugeLabel: circuitbreaker.HalfOpenState.String()}, - }, func() float64 { - if currentStateFn() == circuitbreaker.HalfOpenState { - return 1 - } - return 0 - }), - circuitBreakerClosedStateGauge: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{ - Name: circuitBreakerCurrentStateGaugeName, - Help: circuitBreakerCurrentStateGaugeHelp, - ConstLabels: map[string]string{circuitBreakerCurrentStateGaugeLabel: circuitbreaker.ClosedState.String()}, - }, func() float64 { - if currentStateFn() == circuitbreaker.ClosedState { - return 1 - } - return 0 - }), + cbMetrics := &circuitBreakerMetrics{ circuitBreakerTransitions: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: 
"cortex_ingester_circuit_breaker_transitions_total", Help: "Number of times the circuit breaker has entered a state.", - }, []string{"ingester", "state"}), + }, []string{"state"}), circuitBreakerResults: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingester_circuit_breaker_results_total", Help: "Results of executing requests via the circuit breaker.", - }, []string{"ingester", "result"}), + }, []string{"result"}), + } + circuitBreakerCurrentStateGaugeFn := func(state circuitbreaker.State) prometheus.GaugeFunc { + return promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_ingester_circuit_breaker_current_state", + Help: "Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name.", + ConstLabels: map[string]string{"state": state.String()}, + }, func() float64 { + if currentStateFn() == state { + return 1 + } + return 0 + }) + } + for _, s := range []circuitbreaker.State{circuitbreaker.OpenState, circuitbreaker.HalfOpenState, circuitbreaker.ClosedState} { + circuitBreakerCurrentStateGaugeFn(s) + // We initialize all possible states for the circuitBreakerTransitions metrics + cbMetrics.circuitBreakerTransitions.WithLabelValues(s.String()) + } + for _, r := range []string{resultSuccess, resultError, resultOpen} { + // We initialize all possible results for the circuitBreakerResults metrics + cbMetrics.circuitBreakerResults.WithLabelValues(r) } + return cbMetrics } type CircuitBreakerConfig struct { - Enabled bool `yaml:"enabled" category:"experimental"` - FailureThreshold uint `yaml:"failure_threshold" category:"experimental"` - FailureExecutionThreshold uint `yaml:"failure_execution_threshold" category:"experimental"` - ThresholdingPeriod time.Duration `yaml:"thresholding_period" category:"experimental"` - CooldownPeriod time.Duration `yaml:"cooldown_period" category:"experimental"` - InitialDelay time.Duration `yaml:"initial_delay" category:"experimental"` - PushTimeout time.Duration 
`yaml:"push_timeout" category:"experiment"` - testModeEnabled bool `yaml:"-"` + Enabled bool `yaml:"enabled" category:"experimental"` + FailureThresholdPercentage uint `yaml:"failure_threshold_percentage" category:"experimental"` + FailureExecutionThreshold uint `yaml:"failure_execution_threshold" category:"experimental"` + ThresholdingPeriod time.Duration `yaml:"thresholding_period" category:"experimental"` + CooldownPeriod time.Duration `yaml:"cooldown_period" category:"experimental"` + InitialDelay time.Duration `yaml:"initial_delay" category:"experimental"` + PushTimeout time.Duration `yaml:"push_timeout" category:"experiment"` + testModeEnabled bool `yaml:"-"` } func (cfg *CircuitBreakerConfig) RegisterFlags(f *flag.FlagSet) { prefix := "ingester.circuit-breaker." f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Enable circuit breaking when making requests to ingesters") - f.UintVar(&cfg.FailureThreshold, prefix+"failure-threshold", 10, "Max percentage of requests that can fail over period before the circuit breaker opens") + f.UintVar(&cfg.FailureThresholdPercentage, prefix+"failure-threshold-percentage", 10, "Max percentage of requests that can fail over period before the circuit breaker opens") f.UintVar(&cfg.FailureExecutionThreshold, prefix+"failure-execution-threshold", 100, "How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures") f.DurationVar(&cfg.ThresholdingPeriod, prefix+"thresholding-period", time.Minute, "Moving window of time that the percentage of failed requests is computed over") f.DurationVar(&cfg.CooldownPeriod, prefix+"cooldown-period", 10*time.Second, "How long the circuit breaker will stay in the open state before allowing some requests") f.DurationVar(&cfg.InitialDelay, prefix+"initial-delay", 0, "How long the circuit breaker should wait between creation and starting up. 
During that time both failures and successes will not be counted.") - f.DurationVar(&cfg.PushTimeout, prefix+"push-timeout", 0, "How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors") -} - -func (cfg *CircuitBreakerConfig) Validate() error { - return nil + f.DurationVar(&cfg.PushTimeout, prefix+"push-timeout", defaultPushTimeout, "How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors") } type circuitBreaker struct { - circuitbreaker.CircuitBreaker[any] - cfg CircuitBreakerConfig - ingesterID string - logger log.Logger - metrics *circuitBreakerMetrics - startTime time.Time + cfg CircuitBreakerConfig + logger log.Logger + metrics *circuitBreakerMetrics + startTime time.Time + cb circuitbreaker.CircuitBreaker[any] } -func newCircuitBreaker(cfg CircuitBreakerConfig, ingesterID string, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker { +func newCircuitBreaker(cfg CircuitBreakerConfig, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker { cb := circuitBreaker{ - cfg: cfg, - ingesterID: ingesterID, - logger: logger, - startTime: time.Now().Add(cfg.InitialDelay), + cfg: cfg, + logger: logger, + startTime: time.Now().Add(cfg.InitialDelay), } - cb.metrics = newCircuitBreakerMetrics(registerer, cb.State) - // Initialize each of the known labels for circuit breaker metrics for this particular ingester. 
- transitionOpen := cb.metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.OpenState.String()) - transitionHalfOpen := cb.metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.HalfOpenState.String()) - transitionClosed := cb.metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.ClosedState.String()) + circuitBreakerTransitionsCounterFn := func(metrics *circuitBreakerMetrics, state circuitbreaker.State) prometheus.Counter { + return metrics.circuitBreakerTransitions.WithLabelValues(state.String()) + } cbBuilder := circuitbreaker.Builder[any](). - WithFailureThreshold(cfg.FailureThreshold). + WithFailureThreshold(cfg.FailureThresholdPercentage). WithDelay(cfg.CooldownPeriod). OnClose(func(event circuitbreaker.StateChangedEvent) { - transitionClosed.Inc() - level.Info(logger).Log("msg", "circuit breaker is closed", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState) + circuitBreakerTransitionsCounterFn(cb.metrics, circuitbreaker.ClosedState).Inc() + level.Info(logger).Log("msg", "circuit breaker is closed", "previous", event.OldState, "current", event.NewState) }). OnOpen(func(event circuitbreaker.StateChangedEvent) { - transitionOpen.Inc() - level.Warn(logger).Log("msg", "circuit breaker is open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState) + circuitBreakerTransitionsCounterFn(cb.metrics, circuitbreaker.OpenState).Inc() + level.Warn(logger).Log("msg", "circuit breaker is open", "previous", event.OldState, "current", event.NewState) }). 
OnHalfOpen(func(event circuitbreaker.StateChangedEvent) { - transitionHalfOpen.Inc() - level.Info(logger).Log("msg", "circuit breaker is half-open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState) + circuitBreakerTransitionsCounterFn(cb.metrics, circuitbreaker.HalfOpenState).Inc() + level.Info(logger).Log("msg", "circuit breaker is half-open", "previous", event.OldState, "current", event.NewState) }). HandleIf(func(_ any, err error) bool { return isFailure(err) }) if cfg.testModeEnabled { // In case of testing purposes, we initialize the circuit breaker with count based failure thresholding, // since it is more deterministic, and therefore it is easier to predict the outcome. - cbBuilder = cbBuilder.WithFailureThreshold(cfg.FailureThreshold) + cbBuilder = cbBuilder.WithFailureThreshold(cfg.FailureThresholdPercentage) } else { // In case of production code, we prefer time based failure thresholding. - cbBuilder = cbBuilder.WithFailureRateThreshold(cfg.FailureThreshold, cfg.FailureExecutionThreshold, cfg.ThresholdingPeriod) + cbBuilder = cbBuilder.WithFailureRateThreshold(cfg.FailureThresholdPercentage, cfg.FailureExecutionThreshold, cfg.ThresholdingPeriod) } - cb.CircuitBreaker = cbBuilder.Build() + cb.cb = cbBuilder.Build() + cb.metrics = newCircuitBreakerMetrics(registerer, cb.cb.State) return &cb } @@ -190,10 +168,6 @@ func isFailure(err error) bool { return false } -func (cb *circuitBreaker) State() circuitbreaker.State { - return cb.CircuitBreaker.State() -} - func (cb *circuitBreaker) isActive() bool { if cb == nil { return false @@ -201,49 +175,55 @@ func (cb *circuitBreaker) isActive() bool { return cb.startTime.Before(time.Now()) } -func (cb *circuitBreaker) tryAcquirePermit() error { +// tryAcquirePermit tries to acquire a permit to use the circuit breaker and returns whether a permit was acquired. +// If the circuit breaker is not yet active, a status false and no error are returned. 
+// If it was not possible to acquire a permit, this means that the circuit breaker is open. In this case, a status +// false and a circuitBreakerOpenError are returned. +// If it was possible to acquire a permit, a status true and no error are returned. In this case, the permit +// will be automatically released once a result is recorded by calling recordResult. +func (cb *circuitBreaker) tryAcquirePermit() (bool, error) { if !cb.isActive() { - return nil + return false, nil } - if !cb.CircuitBreaker.TryAcquirePermit() { - cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultOpen).Inc() - return middleware.DoNotLogError{Err: newCircuitBreakerOpenError(cb.RemainingDelay())} + if !cb.cb.TryAcquirePermit() { + cb.metrics.circuitBreakerResults.WithLabelValues(resultOpen).Inc() + return false, middleware.DoNotLogError{Err: newCircuitBreakerOpenError(cb.cb.RemainingDelay())} } - return nil + return true, nil } -func (cb *circuitBreaker) recordSuccess() { +func (cb *circuitBreaker) recordResult(err error) { if !cb.isActive() { return } - cb.CircuitBreaker.RecordSuccess() - cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultSuccess).Inc() -} - -func (cb *circuitBreaker) recordError(err error) { - if !cb.isActive() { - return + if err != nil && isFailure(err) { + cb.cb.RecordError(err) + cb.metrics.circuitBreakerResults.WithLabelValues(resultError).Inc() + } else { + cb.metrics.circuitBreakerResults.WithLabelValues(resultSuccess).Inc() + cb.cb.RecordSuccess() } - cb.CircuitBreaker.RecordError(err) - cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultError).Inc() } -func (cb *circuitBreaker) finishPushRequest(ctx context.Context, duration time.Duration, err error) error { +// finishPushRequest records the result of a push request with this circuit breaker. +// If the circuit breaker is not active, finishPushRequest does nothing. 
+// If the passed duration of a push request exceeds the configured maximal push request duration, +// a context.DeadlineExceeded error is recorded and returned, independently of the passed pushErr, +// the actual error that occurred during the push request. +// Otherwise, the given pushErr is recorded and returned. +// The returned error is needed only for testing purposes. +func (cb *circuitBreaker) finishPushRequest(ctx context.Context, duration time.Duration, pushErr error) error { if !cb.isActive() { return nil } if cb.cfg.testModeEnabled { - if initialDelay, ok := ctx.Value(testDelayKey).(time.Duration); ok { - duration += initialDelay + if testDelay, ok := ctx.Value(testDelayKey).(time.Duration); ok { + duration += testDelay } } if cb.cfg.PushTimeout < duration { - err = context.DeadlineExceeded - } - if err == nil { - cb.recordSuccess() - } else { - cb.recordError(err) + pushErr = context.DeadlineExceeded } - return err + cb.recordResult(pushErr) + return pushErr } diff --git a/pkg/ingester/circuitbreaker_test.go b/pkg/ingester/circuitbreaker_test.go index d6b90613695..4eb6e99e90e 100644 --- a/pkg/ingester/circuitbreaker_test.go +++ b/pkg/ingester/circuitbreaker_test.go @@ -78,7 +78,7 @@ func TestCircuitBreaker_IsActive(t *testing.T) { registry := prometheus.NewRegistry() cfg := CircuitBreakerConfig{Enabled: true, InitialDelay: 10 * time.Millisecond} - cb = newCircuitBreaker(cfg, "ingester", log.NewNopLogger(), registry) + cb = newCircuitBreaker(cfg, log.NewNopLogger(), registry) // When InitialDelay is set, circuit breaker is not immediately active. 
require.False(t, cb.isActive()) @@ -93,28 +93,39 @@ func TestCircuitBreaker_TryAcquirePermit(t *testing.T) { "cortex_ingester_circuit_breaker_transitions_total", "cortex_ingester_circuit_breaker_current_state", } - cfg := CircuitBreakerConfig{Enabled: true, CooldownPeriod: 10 * time.Second} testCases := map[string]struct { + initialDelay time.Duration circuitBreakerSetup func(*circuitBreaker) + expectedStatus bool expectedCircuitBreakerError bool expectedMetrics string }{ - "if circuit breaker closed, no error returned": { - circuitBreakerSetup: func(cb *circuitBreaker) { cb.Close() }, + "if circuit breaker is not active, status false and no error are returned": { + initialDelay: 1 * time.Minute, + circuitBreakerSetup: func(cb *circuitBreaker) { cb.cb.Close() }, + expectedStatus: false, + expectedCircuitBreakerError: false, + }, + "if circuit breaker closed, status true and no error are returned": { + circuitBreakerSetup: func(cb *circuitBreaker) { cb.cb.Close() }, + expectedStatus: true, expectedCircuitBreakerError: false, }, - "if circuit breaker open, a circuitBreakerErrorOpen is returned": { - circuitBreakerSetup: func(cb *circuitBreaker) { cb.Open() }, + "if circuit breaker open, status false and a circuitBreakerErrorOpen are returned": { + circuitBreakerSetup: func(cb *circuitBreaker) { cb.cb.Open() }, + expectedStatus: false, expectedCircuitBreakerError: true, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="ingester",result="circuit_breaker_open"} 1 + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 1 # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. # TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{ingester="ingester",state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="ingester",state="half-open"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="ingester",state="open"} 1 + cortex_ingester_circuit_breaker_transitions_total{state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="open"} 1 # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. # TYPE cortex_ingester_circuit_breaker_current_state gauge cortex_ingester_circuit_breaker_current_state{state="open"} 1 @@ -122,18 +133,21 @@ func TestCircuitBreaker_TryAcquirePermit(t *testing.T) { cortex_ingester_circuit_breaker_current_state{state="closed"} 0 `, }, - "if circuit breaker half-open, a circuitBreakerErrorOpen is returned": { - circuitBreakerSetup: func(cb *circuitBreaker) { cb.HalfOpen() }, + "if circuit breaker half-open, status false and a circuitBreakerErrorOpen are returned": { + circuitBreakerSetup: func(cb *circuitBreaker) { cb.cb.HalfOpen() }, + expectedStatus: false, expectedCircuitBreakerError: true, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="ingester",result="circuit_breaker_open"} 1 + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 1 # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. # TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{ingester="ingester",state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="ingester",state="half-open"} 1 - cortex_ingester_circuit_breaker_transitions_total{ingester="ingester",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="half-open"} 1 + cortex_ingester_circuit_breaker_transitions_total{state="open"} 0 # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
# TYPE cortex_ingester_circuit_breaker_current_state gauge cortex_ingester_circuit_breaker_current_state{state="open"} 0 @@ -146,9 +160,11 @@ func TestCircuitBreaker_TryAcquirePermit(t *testing.T) { for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { registry := prometheus.NewRegistry() - cb := newCircuitBreaker(cfg, "ingester", log.NewNopLogger(), registry) + cfg := CircuitBreakerConfig{Enabled: true, CooldownPeriod: 10 * time.Second, InitialDelay: testCase.initialDelay} + cb := newCircuitBreaker(cfg, log.NewNopLogger(), registry) testCase.circuitBreakerSetup(cb) - err := cb.tryAcquirePermit() + status, err := cb.tryAcquirePermit() + require.Equal(t, testCase.expectedStatus, status) if testCase.expectedCircuitBreakerError { checkCircuitBreakerOpenErr(ctx, err, t) assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(testCase.expectedMetrics), metricNames...)) @@ -159,88 +175,152 @@ func TestCircuitBreaker_TryAcquirePermit(t *testing.T) { } } -func TestCircuitBreaker_RecordSuccess(t *testing.T) { - registry := prometheus.NewRegistry() +func TestCircuitBreaker_RecordResult(t *testing.T) { metricNames := []string{ "cortex_ingester_circuit_breaker_results_total", } - cfg := CircuitBreakerConfig{Enabled: true, CooldownPeriod: 10 * time.Second} - cb := newCircuitBreaker(cfg, "ingester", log.NewNopLogger(), registry) - cb.recordSuccess() - expectedMetrics := ` - # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
- # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="ingester",result="success"} 1 - ` - assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...)) -} - -func TestCircuitBreaker_RecordError(t *testing.T) { - registry := prometheus.NewRegistry() - metricNames := []string{ - "cortex_ingester_circuit_breaker_results_total", + testCases := map[string]struct { + err error + expectedMetrics string + }{ + "successful execution records a success": { + err: nil, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{result="success"} 1 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + `, + }, + "erroneous execution not passing the failure check records a success": { + err: context.Canceled, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{result="success"} 1 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + `, + }, + "erroneous execution passing the failure check records an error": { + err: context.DeadlineExceeded, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
+ # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 1 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + `, + }, } cfg := CircuitBreakerConfig{Enabled: true, CooldownPeriod: 10 * time.Second} - cb := newCircuitBreaker(cfg, "ingester", log.NewNopLogger(), registry) - cb.recordError(context.DeadlineExceeded) - expectedMetrics := ` - # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. - # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="ingester",result="error"} 1 - ` - assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...)) + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + registry := prometheus.NewRegistry() + cb := newCircuitBreaker(cfg, log.NewNopLogger(), registry) + cb.recordResult(testCase.err) + assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(testCase.expectedMetrics), metricNames...)) + }) + } } func TestCircuitBreaker_FinishPushRequest(t *testing.T) { metricNames := []string{ "cortex_ingester_circuit_breaker_results_total", } - cfg := CircuitBreakerConfig{ - Enabled: true, - PushTimeout: 2 * time.Second, - } testCases := map[string]struct { - delay time.Duration - err error - expectedErr error - expectedMetrics string + pushRequestDuration time.Duration + initialDelay time.Duration + err error + expectedErr error + expectedMetrics string }{ - "delay lower than PushTimeout and no input err give success": { - delay: 1 * time.Second, - err: nil, + "with a permit acquired, pushRequestDuration lower than PushTimeout and no input error, finishPushRequest gives success": { + pushRequestDuration: 1 * time.Second, + err: nil, expectedMetrics: ` # HELP 
cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="ingester",result="success"} 1 + cortex_ingester_circuit_breaker_results_total{result="success"} 1 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 `, }, - "delay higher than PushTimeout and no input error give context deadline exceeded error": { - delay: 3 * time.Second, - err: nil, - expectedErr: context.DeadlineExceeded, + "with circuit breaker not active, pushRequestDuration lower than PushTimeout and no input error, finishPushRequest does nothing": { + pushRequestDuration: 1 * time.Second, + initialDelay: 1 * time.Minute, + err: nil, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="ingester",result="error"} 1 + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 `, }, - "delay higher than PushTimeout and an input error different from context deadline exceeded give context deadline exceeded error": { - delay: 3 * time.Second, - err: newInstanceLimitReachedError("error"), - expectedErr: context.DeadlineExceeded, + "with circuit breaker active, pushRequestDuration higher than PushTimeout and no input error, finishPushRequest gives context deadline exceeded error": { + pushRequestDuration: 3 * time.Second, + err: nil, + expectedErr: context.DeadlineExceeded, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="ingester",result="error"} 1 + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 1 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + `, + }, + "with circuit breaker not active, pushRequestDuration higher than PushTimeout and no input error, finishPushRequest does nothing": { + pushRequestDuration: 3 * time.Second, + initialDelay: 1 * time.Minute, + err: nil, + expectedErr: nil, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + `, + }, + "with circuit breaker active, pushRequestDuration higher than PushTimeout and an input error different from context deadline exceeded, finishPushRequest gives context deadline exceeded error": { + pushRequestDuration: 3 * time.Second, + err: newInstanceLimitReachedError("error"), + expectedErr: context.DeadlineExceeded, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
+ # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 1 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + `, + }, + "with circuit breaker not active, pushRequestDuration higher than PushTimeout and an input error different from context deadline exceeded, finishPushRequest does nothing": { + pushRequestDuration: 3 * time.Second, + initialDelay: 1 * time.Minute, + err: newInstanceLimitReachedError("error"), + expectedErr: nil, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 `, }, } for testName, testCase := range testCases { - registry := prometheus.NewRegistry() - ctx := context.Background() - cb := newCircuitBreaker(cfg, "ingester", log.NewNopLogger(), registry) t.Run(testName, func(t *testing.T) { - err := cb.finishPushRequest(ctx, testCase.delay, testCase.err) + registry := prometheus.NewRegistry() + ctx := context.Background() + cfg := CircuitBreakerConfig{ + Enabled: true, + InitialDelay: testCase.initialDelay, + PushTimeout: 2 * time.Second, + } + cb := newCircuitBreaker(cfg, log.NewNopLogger(), registry) + err := cb.finishPushRequest(ctx, testCase.pushRequestDuration, testCase.err) if testCase.expectedErr == nil { require.NoError(t, err) } else { @@ -295,12 +375,12 @@ func TestIngester_PushToStorage_CircuitBreaker(t *testing.T) { initialDelay = 200 * time.Millisecond } cfg.CircuitBreakerConfig = CircuitBreakerConfig{ - Enabled: true, - FailureThreshold: uint(failureThreshold), - CooldownPeriod: 10 * time.Second, - InitialDelay: 
initialDelay, - PushTimeout: pushTimeout, - testModeEnabled: true, + Enabled: true, + FailureThresholdPercentage: uint(failureThreshold), + CooldownPeriod: 10 * time.Second, + InitialDelay: initialDelay, + PushTimeout: pushTimeout, + testModeEnabled: true, } overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) @@ -376,11 +456,16 @@ func TestIngester_PushToStorage_CircuitBreaker(t *testing.T) { var expectedMetrics string if initialDelayEnabled { expectedMetrics = ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="success"} 0 # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. # TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="half-open"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="open"} 0 # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
# TYPE cortex_ingester_circuit_breaker_current_state gauge cortex_ingester_circuit_breaker_current_state{state="open"} 0 @@ -391,14 +476,14 @@ func TestIngester_PushToStorage_CircuitBreaker(t *testing.T) { expectedMetrics = ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="ingester-zone-a-0",result="circuit_breaker_open"} 2 - cortex_ingester_circuit_breaker_results_total{ingester="ingester-zone-a-0",result="error"} 2 - cortex_ingester_circuit_breaker_results_total{ingester="ingester-zone-a-0",result="success"} 1 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 2 + cortex_ingester_circuit_breaker_results_total{result="error"} 2 + cortex_ingester_circuit_breaker_results_total{result="success"} 1 # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. # TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="half-open"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="open"} 1 + cortex_ingester_circuit_breaker_transitions_total{state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="open"} 1 # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
# TYPE cortex_ingester_circuit_breaker_current_state gauge cortex_ingester_circuit_breaker_current_state{state="open"} 1 @@ -441,12 +526,12 @@ func TestIngester_StartPushRequest_CircuitBreakerOpen(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "test") // If i's circuit breaker is closed, StartPushRequest is successful. - i.circuitBreaker.Close() + i.circuitBreaker.cb.Close() _, err = i.StartPushRequest(ctx, 0) require.NoError(t, err) // If i's circuit breaker is open, StartPushRequest returns a circuitBreakerOpenError. - i.circuitBreaker.Open() + i.circuitBreaker.cb.Open() _, err = i.StartPushRequest(ctx, 0) require.Error(t, err) require.ErrorAs(t, err, &circuitBreakerOpenError{}) @@ -459,12 +544,14 @@ func TestIngester_StartPushRequest_CircuitBreakerOpen(t *testing.T) { expectedMetrics := ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="circuit_breaker_open"} 1 + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 1 # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
# TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 1 + cortex_ingester_circuit_breaker_transitions_total{state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="open"} 1 # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. # TYPE cortex_ingester_circuit_breaker_current_state gauge cortex_ingester_circuit_breaker_current_state{state="open"} 1 @@ -479,35 +566,81 @@ func TestIngester_FinishPushRequest(t *testing.T) { "cortex_ingester_circuit_breaker_results_total", } testCases := map[string]struct { - delay time.Duration - err error - expectedMetrics string + pushRequestDuration time.Duration + acquiredCircuitBreakerPermit bool + err error + expectedMetrics string }{ - "delay lower than PushTimeout and no input err give success": { - delay: 1 * time.Second, - err: nil, + "with a permit acquired, pushRequestDuration lower than PushTimeout and no input err, finishPushRequest gives success": { + pushRequestDuration: 1 * time.Second, + acquiredCircuitBreakerPermit: true, + err: nil, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="success"} 1 + cortex_ingester_circuit_breaker_results_total{result="success"} 1 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 `, }, - "delay higher than PushTimeout and no input error give context deadline exceeded error": { - delay: 3 * time.Second, - err: nil, + "with a permit not acquired, pushRequestDuration lower than PushTimeout and no input err, finishPushRequest does nothing": { + pushRequestDuration: 1 * time.Second, + acquiredCircuitBreakerPermit: false, + err: nil, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="error"} 1 + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 `, }, - "delay higher than PushTimeout and an input error different from context deadline exceeded give context deadline exceeded error": { - delay: 3 * time.Second, - err: newInstanceLimitReachedError("error"), + "with a permit acquired, pushRequestDuration higher than PushTimeout and no input error, finishPushRequest gives context deadline exceeded error": { + pushRequestDuration: 3 * time.Second, + acquiredCircuitBreakerPermit: true, + err: nil, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker.
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="error"} 1 + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 1 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + `, + }, + "with a permit not acquired, pushRequestDuration higher than PushTimeout and no input error, finishPushRequest does nothing": { + pushRequestDuration: 3 * time.Second, + acquiredCircuitBreakerPermit: false, + err: nil, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + `, + }, + "with a permit acquired, pushRequestDuration higher than PushTimeout and an input error different from context deadline exceeded, finishPushRequest gives context deadline exceeded error": { + pushRequestDuration: 3 * time.Second, + acquiredCircuitBreakerPermit: true, + err: newInstanceLimitReachedError("error"), + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
+ # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 1 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + `, + }, + "with a permit not acquired, pushRequestDuration higher than PushTimeout and an input error different from context deadline exceeded, finishPushRequest does nothing": { + pushRequestDuration: 3 * time.Second, + acquiredCircuitBreakerPermit: false, + err: newInstanceLimitReachedError("error"), + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 `, }, } @@ -533,8 +666,9 @@ func TestIngester_FinishPushRequest(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "test") st := &pushRequestState{ - requestDuration: testCase.delay, - err: testCase.err, + requestDuration: testCase.pushRequestDuration, + acquiredCircuitBreakerPermit: testCase.acquiredCircuitBreakerPermit, + err: testCase.err, } ctx = context.WithValue(ctx, pushReqCtxKey, st) @@ -567,12 +701,12 @@ func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) { initialDelay = 200 * time.Millisecond } cfg.CircuitBreakerConfig = CircuitBreakerConfig{ - Enabled: true, - FailureThreshold: uint(failureThreshold), - CooldownPeriod: 10 * time.Second, - InitialDelay: initialDelay, - PushTimeout: pushTimeout, - testModeEnabled: true, + Enabled: true, + FailureThresholdPercentage: uint(failureThreshold), + CooldownPeriod: 10 * time.Second, + InitialDelay: initialDelay, + PushTimeout: pushTimeout, + testModeEnabled: true, } i, err := prepareIngesterWithBlocksStorage(t, 
cfg, nil, registry) @@ -632,7 +766,7 @@ func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) { // less than failureThreshold deadline exceeded errors, it is still // closed. require.NoError(t, err) - require.Equal(t, circuitbreaker.ClosedState, i.circuitBreaker.State()) + require.Equal(t, circuitbreaker.ClosedState, i.circuitBreaker.cb.State()) st, ok := ctx.Value(pushReqCtxKey).(*pushRequestState) require.True(t, ok) require.Equal(t, int64(req.Size()), st.requestSize) @@ -640,7 +774,7 @@ func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) { require.NoError(t, err) i.FinishPushRequest(ctx) } else { - require.Equal(t, circuitbreaker.OpenState, i.circuitBreaker.State()) + require.Equal(t, circuitbreaker.OpenState, i.circuitBreaker.cb.State()) require.Nil(t, ctx) checkCircuitBreakerOpenErr(ctx, err, t) } @@ -651,11 +785,16 @@ func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) { var expectedMetrics string if initialDelayEnabled { expectedMetrics = ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 0 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
# TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="open"} 0 # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. # TYPE cortex_ingester_circuit_breaker_current_state gauge cortex_ingester_circuit_breaker_current_state{state="open"} 0 @@ -666,13 +805,14 @@ func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) { expectedMetrics = ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="circuit_breaker_open"} 2 - cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="error"} 2 + cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{result="error"} 2 + cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 2 # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
# TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 1 + cortex_ingester_circuit_breaker_transitions_total{state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{state="open"} 1 # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. # TYPE cortex_ingester_circuit_breaker_current_state gauge cortex_ingester_circuit_breaker_current_state{state="open"} 1 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 07dfbffc231..5b73a609670 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -261,13 +261,7 @@ func (cfg *Config) Validate(logger log.Logger) error { util.WarnDeprecatedConfig(deprecatedReturnOnlyGRPCErrorsFlag, logger) } - err := cfg.IngesterRing.Validate() - - if err != nil { - return err - } - - return cfg.CircuitBreakerConfig.Validate() + return cfg.IngesterRing.Validate() } func (cfg *Config) getIgnoreSeriesLimitForMetricNamesMap() map[string]struct{} { @@ -405,7 +399,7 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, i.activeGroups = activeGroupsCleanupService if cfg.CircuitBreakerConfig.Enabled { - i.circuitBreaker = newCircuitBreaker(cfg.CircuitBreakerConfig, cfg.IngesterRing.InstanceID, logger, registerer) + i.circuitBreaker = newCircuitBreaker(cfg.CircuitBreakerConfig, logger, registerer) } if registerer != nil { @@ -968,15 +962,16 @@ type ctxKey int var pushReqCtxKey ctxKey = 1 type pushRequestState struct { - requestSize int64 - requestDuration time.Duration - err error + requestSize int64 + requestDuration time.Duration + 
acquiredCircuitBreakerPermit bool + err error } // StartPushRequest checks if ingester can start push request, and increments relevant counters. // If new push request cannot be started, errors convertible to gRPC status code are returned, and metrics are updated. func (i *Ingester) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) { - ctx, _, err := i.startPushRequest(ctx, reqSize) + ctx, _, _, err := i.startPushRequest(ctx, reqSize) return ctx, err } @@ -985,7 +980,7 @@ func (i *Ingester) FinishPushRequest(ctx context.Context) { if !ok { return } - i.finishPushRequest(ctx, st.requestSize, st.requestDuration, st.err) + i.finishPushRequest(ctx, st.requestSize, st.requestDuration, st.acquiredCircuitBreakerPermit, st.err) } // This method can be called in two ways: 1. Ingester.PushWithCleanup, or 2. Ingester.StartPushRequest via gRPC server's method limiter. @@ -994,26 +989,30 @@ func (i *Ingester) FinishPushRequest(ctx context.Context) { // In the second case, returned errors will not be logged, because request will not reach any middleware. // // The shouldFinish flag tells if the caller must call finish on this request. If not, there is already someone in the call stack who will do that. -func (i *Ingester) startPushRequest(ctx context.Context, reqSize int64) (_ context.Context, shouldFinish bool, err error) { +// The acquiredCircuitBreakerPermit flag tells the caller whether this call has acquired a circuit breaker permit, +// and whether the caller should take care of returning the permit back. +func (i *Ingester) startPushRequest(ctx context.Context, reqSize int64) (_ context.Context, shouldFinish, acquiredCircuitBreakerPermit bool, err error) { if err := i.checkAvailableForPush(); err != nil { - return nil, false, err + return nil, false, false, err } if _, ok := ctx.Value(pushReqCtxKey).(*pushRequestState); ok { // If state is already in context, this means we already passed through StartPushRequest for this request.
- return ctx, false, nil + return ctx, false, false, nil } // We try to acquire a permit from the circuit breaker. // If it is not possible, it is because the circuit breaker is open, and a circuitBreakerOpenError is returned. // If it is possible, a permit has to be released by recording either a success or a failure with the circuit // breaker. This is done in finishPushRequest(). - if err := i.circuitBreaker.tryAcquirePermit(); err != nil { - return nil, false, err + acquiredCircuitBreakerPermit, err = i.circuitBreaker.tryAcquirePermit() + if err != nil { + return nil, false, acquiredCircuitBreakerPermit, err } st := &pushRequestState{ - requestSize: reqSize, + requestSize: reqSize, + acquiredCircuitBreakerPermit: acquiredCircuitBreakerPermit, } ctx = context.WithValue(ctx, pushReqCtxKey, st) @@ -1035,7 +1034,7 @@ func (i *Ingester) startPushRequest(ctx context.Context, reqSize int64) (_ conte // it is because one the per-instance limits has been hit. In this case, the // corresponding error has to be passed to finishPushRequest() in order to // record the failure with the circuit breaker. 
- i.finishPushRequest(ctx, reqSize, 0, err) + i.finishPushRequest(ctx, reqSize, 0, acquiredCircuitBreakerPermit, err) } }() @@ -1043,34 +1042,36 @@ func (i *Ingester) startPushRequest(ctx context.Context, reqSize int64) (_ conte if il != nil { if il.MaxInflightPushRequests > 0 && inflight > il.MaxInflightPushRequests { i.metrics.rejected.WithLabelValues(reasonIngesterMaxInflightPushRequests).Inc() - return nil, false, errMaxInflightRequestsReached + return nil, false, acquiredCircuitBreakerPermit, errMaxInflightRequestsReached } if il.MaxInflightPushRequestsBytes > 0 { if (rejectEqualInflightBytes && inflightBytes >= il.MaxInflightPushRequestsBytes) || inflightBytes > il.MaxInflightPushRequestsBytes { i.metrics.rejected.WithLabelValues(reasonIngesterMaxInflightPushRequestsBytes).Inc() - return nil, false, errMaxInflightRequestsBytesReached + return nil, false, acquiredCircuitBreakerPermit, errMaxInflightRequestsBytesReached } } if il.MaxIngestionRate > 0 { if rate := i.ingestionRate.Rate(); rate >= il.MaxIngestionRate { i.metrics.rejected.WithLabelValues(reasonIngesterMaxIngestionRate).Inc() - return nil, false, errMaxIngestionRateReached + return nil, false, acquiredCircuitBreakerPermit, errMaxIngestionRateReached } } } finishRequestInDefer = false - return ctx, true, nil + return ctx, true, acquiredCircuitBreakerPermit, nil } -func (i *Ingester) finishPushRequest(ctx context.Context, reqSize int64, duration time.Duration, err error) { +func (i *Ingester) finishPushRequest(ctx context.Context, reqSize int64, duration time.Duration, acquiredCircuitBreakerPermit bool, err error) { i.inflightPushRequests.Dec() if reqSize > 0 { i.inflightPushRequestsBytes.Sub(reqSize) } - _ = i.circuitBreaker.finishPushRequest(ctx, duration, err) + if acquiredCircuitBreakerPermit { + _ = i.circuitBreaker.finishPushRequest(ctx, duration, err) + } } // PushWithCleanup is the Push() implementation for blocks storage and takes a WriteRequest and adds it to the TSDB head. 
@@ -1085,28 +1086,27 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques // startPushRequest handles this. if i.cfg.IngestStorageConfig.Enabled || !i.cfg.LimitInflightRequestsUsingGrpcMethodLimiter { reqSize := int64(req.Size()) - _, shouldFinish, startPushErr := i.startPushRequest(ctx, reqSize) + _, shouldFinish, acquiredCircuitBreakerPermit, startPushErr := i.startPushRequest(ctx, reqSize) if startPushErr != nil { return middleware.DoNotLogError{Err: startPushErr} } if shouldFinish { defer func() { - i.finishPushRequest(ctx, reqSize, time.Since(start), err) + i.finishPushRequest(ctx, reqSize, time.Since(start), acquiredCircuitBreakerPermit, err) }() } - } else { - defer func() { - // We enrich the pushRequestState contained in the context with this PushWithCleanUp() - // call duration, and a possible error it returns. These data are needed during a - // successive call to FinishPushRequest(). - if st, ok := ctx.Value(pushReqCtxKey).(*pushRequestState); ok { - st.requestDuration = time.Since(start) - st.err = err - ctx = context.WithValue(ctx, pushReqCtxKey, st) - } - }() } + defer func() { + // We enrich the pushRequestState contained in the context with this PushWithCleanUp() + // call duration, and a possible error it returns. These data are needed during a + // successive call to FinishPushRequest(). + if st, ok := ctx.Value(pushReqCtxKey).(*pushRequestState); ok { + st.requestDuration = time.Since(start) + st.err = err + } + }() + userID, err := tenant.TenantID(ctx) if err != nil { return err