Skip to content

Commit

Permalink
Fixing review findings
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic committed May 31, 2024
1 parent c0ad0a5 commit 6b915ca
Show file tree
Hide file tree
Showing 7 changed files with 406 additions and 286 deletions.
6 changes: 3 additions & 3 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -3149,12 +3149,12 @@
},
{
"kind": "field",
"name": "failure_threshold",
"name": "failure_threshold_percentage",
"required": false,
"desc": "Max percentage of requests that can fail over period before the circuit breaker opens",
"fieldValue": null,
"fieldDefaultValue": 10,
"fieldFlag": "ingester.circuit-breaker.failure-threshold",
"fieldFlag": "ingester.circuit-breaker.failure-threshold-percentage",
"fieldType": "int",
"fieldCategory": "experimental"
},
Expand Down Expand Up @@ -3208,7 +3208,7 @@
"required": false,
"desc": "How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldDefaultValue": 2000000000,
"fieldFlag": "ingester.circuit-breaker.push-timeout",
"fieldType": "duration",
"fieldCategory": "experiment"
Expand Down
4 changes: 2 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1307,12 +1307,12 @@ Usage of ./cmd/mimir/mimir:
[experimental] Enable circuit breaking when making requests to ingesters
-ingester.circuit-breaker.failure-execution-threshold uint
[experimental] How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures (default 100)
-ingester.circuit-breaker.failure-threshold uint
-ingester.circuit-breaker.failure-threshold-percentage uint
[experimental] Max percentage of requests that can fail over period before the circuit breaker opens (default 10)
-ingester.circuit-breaker.initial-delay duration
[experimental] How long the circuit breaker should wait between creation and starting up. During that time both failures and successes will not be counted.
-ingester.circuit-breaker.push-timeout duration
How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors
How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors (default 2s)
-ingester.circuit-breaker.thresholding-period duration
[experimental] Moving window of time that the percentage of failed requests is computed over (default 1m0s)
-ingester.client.backoff-max-period duration
Expand Down
2 changes: 1 addition & 1 deletion cmd/mimir/help.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ Usage of ./cmd/mimir/mimir:
-help-all
Print help, also including advanced and experimental parameters.
-ingester.circuit-breaker.push-timeout duration
How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors
How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors (default 2s)
-ingester.max-global-metadata-per-metric int
The maximum number of metadata per metric, across the cluster. 0 to disable.
-ingester.max-global-metadata-per-user int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1220,8 +1220,8 @@ circuit_breaker:
# (experimental) Max percentage of requests that can fail over period before
# the circuit breaker opens
# CLI flag: -ingester.circuit-breaker.failure-threshold
[failure_threshold: <int> | default = 10]
# CLI flag: -ingester.circuit-breaker.failure-threshold-percentage
[failure_threshold_percentage: <int> | default = 10]
# (experimental) How many requests must have been executed in period for the
# circuit breaker to be eligible to open for the rate of failures
Expand Down Expand Up @@ -1249,7 +1249,7 @@ circuit_breaker:
# used for circuit breakers only, and timeout expirations are not reported as
# errors
# CLI flag: -ingester.circuit-breaker.push-timeout
[push_timeout: <duration> | default = 0s]
[push_timeout: <duration> | default = 2s]
```

### querier
Expand Down
210 changes: 95 additions & 115 deletions pkg/ingester/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,144 +23,122 @@ import (
type testCtxKey string

const (
resultSuccess = "success"
resultError = "error"
resultOpen = "circuit_breaker_open"
testDelayKey testCtxKey = "test-delay"

circuitBreakerCurrentStateGaugeName = "cortex_ingester_circuit_breaker_current_state"
circuitBreakerCurrentStateGaugeHelp = "Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name."
circuitBreakerCurrentStateGaugeLabel = "state"
resultSuccess = "success"
resultError = "error"
resultOpen = "circuit_breaker_open"
defaultPushTimeout = 2 * time.Second
testDelayKey testCtxKey = "test-delay"
)

type circuitBreakerMetrics struct {
circuitBreakerOpenStateGauge prometheus.GaugeFunc
circuitBreakerHalfOpenStateGauge prometheus.GaugeFunc
circuitBreakerClosedStateGauge prometheus.GaugeFunc

circuitBreakerTransitions *prometheus.CounterVec
circuitBreakerResults *prometheus.CounterVec
}

func newCircuitBreakerMetrics(r prometheus.Registerer, currentStateFn func() circuitbreaker.State) *circuitBreakerMetrics {
return &circuitBreakerMetrics{
circuitBreakerOpenStateGauge: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Name: circuitBreakerCurrentStateGaugeName,
Help: circuitBreakerCurrentStateGaugeHelp,
ConstLabels: map[string]string{circuitBreakerCurrentStateGaugeLabel: circuitbreaker.OpenState.String()},
}, func() float64 {
if currentStateFn() == circuitbreaker.OpenState {
return 1
}
return 0
}),
circuitBreakerHalfOpenStateGauge: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Name: circuitBreakerCurrentStateGaugeName,
Help: circuitBreakerCurrentStateGaugeHelp,
ConstLabels: map[string]string{circuitBreakerCurrentStateGaugeLabel: circuitbreaker.HalfOpenState.String()},
}, func() float64 {
if currentStateFn() == circuitbreaker.HalfOpenState {
return 1
}
return 0
}),
circuitBreakerClosedStateGauge: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Name: circuitBreakerCurrentStateGaugeName,
Help: circuitBreakerCurrentStateGaugeHelp,
ConstLabels: map[string]string{circuitBreakerCurrentStateGaugeLabel: circuitbreaker.ClosedState.String()},
}, func() float64 {
if currentStateFn() == circuitbreaker.ClosedState {
return 1
}
return 0
}),
cbMetrics := &circuitBreakerMetrics{
circuitBreakerTransitions: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_circuit_breaker_transitions_total",
Help: "Number of times the circuit breaker has entered a state.",
}, []string{"ingester", "state"}),
}, []string{"state"}),
circuitBreakerResults: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_circuit_breaker_results_total",
Help: "Results of executing requests via the circuit breaker.",
}, []string{"ingester", "result"}),
}, []string{"result"}),
}
circuitBreakerCurrentStateGaugeFn := func(state circuitbreaker.State) prometheus.GaugeFunc {
return promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_ingester_circuit_breaker_current_state",
Help: "Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name.",
ConstLabels: map[string]string{"state": state.String()},
}, func() float64 {
if currentStateFn() == state {
return 1
}
return 0
})
}
for _, s := range []circuitbreaker.State{circuitbreaker.OpenState, circuitbreaker.HalfOpenState, circuitbreaker.ClosedState} {
circuitBreakerCurrentStateGaugeFn(s)
// We initialize all possible states for the circuitBreakerTransitions metrics
cbMetrics.circuitBreakerTransitions.WithLabelValues(s.String())
}
for _, r := range []string{resultSuccess, resultError, resultOpen} {
// We initialize all possible results for the circuitBreakerResults metrics
cbMetrics.circuitBreakerResults.WithLabelValues(r)
}
return cbMetrics
}

type CircuitBreakerConfig struct {
Enabled bool `yaml:"enabled" category:"experimental"`
FailureThreshold uint `yaml:"failure_threshold" category:"experimental"`
FailureExecutionThreshold uint `yaml:"failure_execution_threshold" category:"experimental"`
ThresholdingPeriod time.Duration `yaml:"thresholding_period" category:"experimental"`
CooldownPeriod time.Duration `yaml:"cooldown_period" category:"experimental"`
InitialDelay time.Duration `yaml:"initial_delay" category:"experimental"`
PushTimeout time.Duration `yaml:"push_timeout" category:"experiment"`
testModeEnabled bool `yaml:"-"`
Enabled bool `yaml:"enabled" category:"experimental"`
FailureThresholdPercentage uint `yaml:"failure_threshold_percentage" category:"experimental"`
FailureExecutionThreshold uint `yaml:"failure_execution_threshold" category:"experimental"`
ThresholdingPeriod time.Duration `yaml:"thresholding_period" category:"experimental"`
CooldownPeriod time.Duration `yaml:"cooldown_period" category:"experimental"`
InitialDelay time.Duration `yaml:"initial_delay" category:"experimental"`
PushTimeout time.Duration `yaml:"push_timeout" category:"experiment"`
testModeEnabled bool `yaml:"-"`
}

func (cfg *CircuitBreakerConfig) RegisterFlags(f *flag.FlagSet) {
prefix := "ingester.circuit-breaker."
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Enable circuit breaking when making requests to ingesters")
f.UintVar(&cfg.FailureThreshold, prefix+"failure-threshold", 10, "Max percentage of requests that can fail over period before the circuit breaker opens")
f.UintVar(&cfg.FailureThresholdPercentage, prefix+"failure-threshold-percentage", 10, "Max percentage of requests that can fail over period before the circuit breaker opens")
f.UintVar(&cfg.FailureExecutionThreshold, prefix+"failure-execution-threshold", 100, "How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures")
f.DurationVar(&cfg.ThresholdingPeriod, prefix+"thresholding-period", time.Minute, "Moving window of time that the percentage of failed requests is computed over")
f.DurationVar(&cfg.CooldownPeriod, prefix+"cooldown-period", 10*time.Second, "How long the circuit breaker will stay in the open state before allowing some requests")
f.DurationVar(&cfg.InitialDelay, prefix+"initial-delay", 0, "How long the circuit breaker should wait between creation and starting up. During that time both failures and successes will not be counted.")
f.DurationVar(&cfg.PushTimeout, prefix+"push-timeout", 0, "How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors")
}

func (cfg *CircuitBreakerConfig) Validate() error {
return nil
f.DurationVar(&cfg.PushTimeout, prefix+"push-timeout", defaultPushTimeout, "How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors")
}

type circuitBreaker struct {
circuitbreaker.CircuitBreaker[any]
cfg CircuitBreakerConfig
ingesterID string
logger log.Logger
metrics *circuitBreakerMetrics
startTime time.Time
cfg CircuitBreakerConfig
logger log.Logger
metrics *circuitBreakerMetrics
startTime time.Time
cb circuitbreaker.CircuitBreaker[any]
}

func newCircuitBreaker(cfg CircuitBreakerConfig, ingesterID string, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker {
func newCircuitBreaker(cfg CircuitBreakerConfig, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker {
cb := circuitBreaker{
cfg: cfg,
ingesterID: ingesterID,
logger: logger,
startTime: time.Now().Add(cfg.InitialDelay),
cfg: cfg,
logger: logger,
startTime: time.Now().Add(cfg.InitialDelay),
}

cb.metrics = newCircuitBreakerMetrics(registerer, cb.State)
// Initialize each of the known labels for circuit breaker metrics for this particular ingester.
transitionOpen := cb.metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.OpenState.String())
transitionHalfOpen := cb.metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.HalfOpenState.String())
transitionClosed := cb.metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.ClosedState.String())
circuitBreakerTransitionsCounterFn := func(metrics *circuitBreakerMetrics, state circuitbreaker.State) prometheus.Counter {
return metrics.circuitBreakerTransitions.WithLabelValues(state.String())
}

cbBuilder := circuitbreaker.Builder[any]().
WithFailureThreshold(cfg.FailureThreshold).
WithFailureThreshold(cfg.FailureThresholdPercentage).
WithDelay(cfg.CooldownPeriod).
OnClose(func(event circuitbreaker.StateChangedEvent) {
transitionClosed.Inc()
level.Info(logger).Log("msg", "circuit breaker is closed", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState)
circuitBreakerTransitionsCounterFn(cb.metrics, circuitbreaker.ClosedState).Inc()
level.Info(logger).Log("msg", "circuit breaker is closed", "previous", event.OldState, "current", event.NewState)
}).
OnOpen(func(event circuitbreaker.StateChangedEvent) {
transitionOpen.Inc()
level.Warn(logger).Log("msg", "circuit breaker is open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState)
circuitBreakerTransitionsCounterFn(cb.metrics, circuitbreaker.OpenState).Inc()
level.Warn(logger).Log("msg", "circuit breaker is open", "previous", event.OldState, "current", event.NewState)
}).
OnHalfOpen(func(event circuitbreaker.StateChangedEvent) {
transitionHalfOpen.Inc()
level.Info(logger).Log("msg", "circuit breaker is half-open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState)
circuitBreakerTransitionsCounterFn(cb.metrics, circuitbreaker.HalfOpenState).Inc()
level.Info(logger).Log("msg", "circuit breaker is half-open", "previous", event.OldState, "current", event.NewState)
}).
HandleIf(func(_ any, err error) bool { return isFailure(err) })

if cfg.testModeEnabled {
// For testing purposes, we initialize the circuit breaker with count-based failure thresholding,
// since it is more deterministic, and therefore it is easier to predict the outcome.
cbBuilder = cbBuilder.WithFailureThreshold(cfg.FailureThreshold)
cbBuilder = cbBuilder.WithFailureThreshold(cfg.FailureThresholdPercentage)
} else {
// In case of production code, we prefer time based failure thresholding.
cbBuilder = cbBuilder.WithFailureRateThreshold(cfg.FailureThreshold, cfg.FailureExecutionThreshold, cfg.ThresholdingPeriod)
cbBuilder = cbBuilder.WithFailureRateThreshold(cfg.FailureThresholdPercentage, cfg.FailureExecutionThreshold, cfg.ThresholdingPeriod)
}

cb.CircuitBreaker = cbBuilder.Build()
cb.cb = cbBuilder.Build()
cb.metrics = newCircuitBreakerMetrics(registerer, cb.cb.State)
return &cb
}

Expand Down Expand Up @@ -190,60 +168,62 @@ func isFailure(err error) bool {
return false
}

func (cb *circuitBreaker) State() circuitbreaker.State {
return cb.CircuitBreaker.State()
}

// isActive reports whether this circuit breaker should currently be used.
// It returns false for a nil receiver, and otherwise returns true only once
// the breaker's startTime lies strictly before the current time.
func (cb *circuitBreaker) isActive() bool {
	return cb != nil && cb.startTime.Before(time.Now())
}

func (cb *circuitBreaker) tryAcquirePermit() error {
// tryAcquirePermit tries to acquire a permit to use the circuit breaker and returns whether a permit was acquired.
// If the circuit breaker is not yet active, a status false and no error are returned.
// If it was not possible to acquire a permit, this means that the circuit breaker is open. In this case, a status
// false and a circuitBreakerOpenError are returned.
// If it was possible to acquire a permit, a status true and no error are returned. In this case, the permission
// will be automatically released once a result is recorded by calling recordResult.
func (cb *circuitBreaker) tryAcquirePermit() (bool, error) {
if !cb.isActive() {
return nil
return false, nil
}
if !cb.CircuitBreaker.TryAcquirePermit() {
cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultOpen).Inc()
return middleware.DoNotLogError{Err: newCircuitBreakerOpenError(cb.RemainingDelay())}
if !cb.cb.TryAcquirePermit() {
cb.metrics.circuitBreakerResults.WithLabelValues(resultOpen).Inc()
return false, middleware.DoNotLogError{Err: newCircuitBreakerOpenError(cb.cb.RemainingDelay())}
}
return nil
return true, nil
}

func (cb *circuitBreaker) recordSuccess() {
func (cb *circuitBreaker) recordResult(err error) {
if !cb.isActive() {
return
}
cb.CircuitBreaker.RecordSuccess()
cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultSuccess).Inc()
}

func (cb *circuitBreaker) recordError(err error) {
if !cb.isActive() {
return
if err != nil && isFailure(err) {
cb.cb.RecordError(err)
cb.metrics.circuitBreakerResults.WithLabelValues(resultError).Inc()
} else {
cb.metrics.circuitBreakerResults.WithLabelValues(resultSuccess).Inc()
cb.cb.RecordSuccess()
}
cb.CircuitBreaker.RecordError(err)
cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultError).Inc()
}

func (cb *circuitBreaker) finishPushRequest(ctx context.Context, duration time.Duration, err error) error {
// finishPushRequest records the result of a push request with this circuit breaker.
// If the circuit breaker is not active, finishPushRequest does nothing.
// If the passed duration of a push request exceeds the configured maximal push request duration,
// a context.DeadlineExceeded error is recorded and returned, independently of the passed pushErr,
// the actual error that occurred during the push request.
// Otherwise, the given pushErr is recorded and returned.
// The returned error is needed only for testing purposes.
func (cb *circuitBreaker) finishPushRequest(ctx context.Context, duration time.Duration, pushErr error) error {
if !cb.isActive() {
return nil
}
if cb.cfg.testModeEnabled {
if initialDelay, ok := ctx.Value(testDelayKey).(time.Duration); ok {
duration += initialDelay
if testDelay, ok := ctx.Value(testDelayKey).(time.Duration); ok {
duration += testDelay
}
}
if cb.cfg.PushTimeout < duration {
err = context.DeadlineExceeded
}
if err == nil {
cb.recordSuccess()
} else {
cb.recordError(err)
pushErr = context.DeadlineExceeded
}
return err
cb.recordResult(pushErr)
return pushErr
}
Loading

0 comments on commit 6b915ca

Please sign in to comment.