Skip to content

Commit

Permalink
Fixing review findings
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic committed May 31, 2024
1 parent c0ad0a5 commit c5f7fff
Show file tree
Hide file tree
Showing 3 changed files with 397 additions and 277 deletions.
210 changes: 95 additions & 115 deletions pkg/ingester/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,144 +23,122 @@ import (
type testCtxKey string

const (
resultSuccess = "success"
resultError = "error"
resultOpen = "circuit_breaker_open"
testDelayKey testCtxKey = "test-delay"

circuitBreakerCurrentStateGaugeName = "cortex_ingester_circuit_breaker_current_state"
circuitBreakerCurrentStateGaugeHelp = "Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name."
circuitBreakerCurrentStateGaugeLabel = "state"
resultSuccess = "success"
resultError = "error"
resultOpen = "circuit_breaker_open"
defaultPushTimeout = 2 * time.Second
testDelayKey testCtxKey = "test-delay"
)

type circuitBreakerMetrics struct {
circuitBreakerOpenStateGauge prometheus.GaugeFunc
circuitBreakerHalfOpenStateGauge prometheus.GaugeFunc
circuitBreakerClosedStateGauge prometheus.GaugeFunc

circuitBreakerTransitions *prometheus.CounterVec
circuitBreakerResults *prometheus.CounterVec
}

func newCircuitBreakerMetrics(r prometheus.Registerer, currentStateFn func() circuitbreaker.State) *circuitBreakerMetrics {
return &circuitBreakerMetrics{
circuitBreakerOpenStateGauge: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Name: circuitBreakerCurrentStateGaugeName,
Help: circuitBreakerCurrentStateGaugeHelp,
ConstLabels: map[string]string{circuitBreakerCurrentStateGaugeLabel: circuitbreaker.OpenState.String()},
}, func() float64 {
if currentStateFn() == circuitbreaker.OpenState {
return 1
}
return 0
}),
circuitBreakerHalfOpenStateGauge: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Name: circuitBreakerCurrentStateGaugeName,
Help: circuitBreakerCurrentStateGaugeHelp,
ConstLabels: map[string]string{circuitBreakerCurrentStateGaugeLabel: circuitbreaker.HalfOpenState.String()},
}, func() float64 {
if currentStateFn() == circuitbreaker.HalfOpenState {
return 1
}
return 0
}),
circuitBreakerClosedStateGauge: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Name: circuitBreakerCurrentStateGaugeName,
Help: circuitBreakerCurrentStateGaugeHelp,
ConstLabels: map[string]string{circuitBreakerCurrentStateGaugeLabel: circuitbreaker.ClosedState.String()},
}, func() float64 {
if currentStateFn() == circuitbreaker.ClosedState {
return 1
}
return 0
}),
cbMetrics := &circuitBreakerMetrics{
circuitBreakerTransitions: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_circuit_breaker_transitions_total",
Help: "Number of times the circuit breaker has entered a state.",
}, []string{"ingester", "state"}),
}, []string{"state"}),
circuitBreakerResults: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_circuit_breaker_results_total",
Help: "Results of executing requests via the circuit breaker.",
}, []string{"ingester", "result"}),
}, []string{"result"}),
}
circuitBreakerCurrentStateGaugeFn := func(state circuitbreaker.State) prometheus.GaugeFunc {
return promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_ingester_circuit_breaker_current_state",
Help: "Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name.",
ConstLabels: map[string]string{"state": state.String()},
}, func() float64 {
if currentStateFn() == state {
return 1
}
return 0
})
}
for _, s := range []circuitbreaker.State{circuitbreaker.OpenState, circuitbreaker.HalfOpenState, circuitbreaker.ClosedState} {
circuitBreakerCurrentStateGaugeFn(s)
// We initialize all possible states for the circuitBreakerTransitions metrics
cbMetrics.circuitBreakerTransitions.WithLabelValues(s.String())
}
for _, r := range []string{resultSuccess, resultError, resultOpen} {
// We initialize all possible results for the circuitBreakerResults metrics
cbMetrics.circuitBreakerResults.WithLabelValues(r)
}
return cbMetrics
}

type CircuitBreakerConfig struct {
Enabled bool `yaml:"enabled" category:"experimental"`
FailureThreshold uint `yaml:"failure_threshold" category:"experimental"`
FailureExecutionThreshold uint `yaml:"failure_execution_threshold" category:"experimental"`
ThresholdingPeriod time.Duration `yaml:"thresholding_period" category:"experimental"`
CooldownPeriod time.Duration `yaml:"cooldown_period" category:"experimental"`
InitialDelay time.Duration `yaml:"initial_delay" category:"experimental"`
PushTimeout time.Duration `yaml:"push_timeout" category:"experiment"`
testModeEnabled bool `yaml:"-"`
Enabled bool `yaml:"enabled" category:"experimental"`
FailureThresholdPercentage uint `yaml:"failure_threshold_percentage" category:"experimental"`
FailureExecutionThreshold uint `yaml:"failure_execution_threshold" category:"experimental"`
ThresholdingPeriod time.Duration `yaml:"thresholding_period" category:"experimental"`
CooldownPeriod time.Duration `yaml:"cooldown_period" category:"experimental"`
InitialDelay time.Duration `yaml:"initial_delay" category:"experimental"`
PushTimeout time.Duration `yaml:"push_timeout" category:"experiment"`
testModeEnabled bool `yaml:"-"`
}

func (cfg *CircuitBreakerConfig) RegisterFlags(f *flag.FlagSet) {
prefix := "ingester.circuit-breaker."
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Enable circuit breaking when making requests to ingesters")
f.UintVar(&cfg.FailureThreshold, prefix+"failure-threshold", 10, "Max percentage of requests that can fail over period before the circuit breaker opens")
f.UintVar(&cfg.FailureThresholdPercentage, prefix+"failure-threshold-percentage", 10, "Max percentage of requests that can fail over period before the circuit breaker opens")
f.UintVar(&cfg.FailureExecutionThreshold, prefix+"failure-execution-threshold", 100, "How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures")
f.DurationVar(&cfg.ThresholdingPeriod, prefix+"thresholding-period", time.Minute, "Moving window of time that the percentage of failed requests is computed over")
f.DurationVar(&cfg.CooldownPeriod, prefix+"cooldown-period", 10*time.Second, "How long the circuit breaker will stay in the open state before allowing some requests")
f.DurationVar(&cfg.InitialDelay, prefix+"initial-delay", 0, "How long the circuit breaker should wait between creation and starting up. During that time both failures and successes will not be counted.")
f.DurationVar(&cfg.PushTimeout, prefix+"push-timeout", 0, "How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors")
}

func (cfg *CircuitBreakerConfig) Validate() error {
return nil
f.DurationVar(&cfg.PushTimeout, prefix+"push-timeout", defaultPushTimeout, "How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors")
}

type circuitBreaker struct {
circuitbreaker.CircuitBreaker[any]
cfg CircuitBreakerConfig
ingesterID string
logger log.Logger
metrics *circuitBreakerMetrics
startTime time.Time
cfg CircuitBreakerConfig
logger log.Logger
metrics *circuitBreakerMetrics
startTime time.Time
cb circuitbreaker.CircuitBreaker[any]
}

func newCircuitBreaker(cfg CircuitBreakerConfig, ingesterID string, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker {
func newCircuitBreaker(cfg CircuitBreakerConfig, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker {
cb := circuitBreaker{
cfg: cfg,
ingesterID: ingesterID,
logger: logger,
startTime: time.Now().Add(cfg.InitialDelay),
cfg: cfg,
logger: logger,
startTime: time.Now().Add(cfg.InitialDelay),
}

cb.metrics = newCircuitBreakerMetrics(registerer, cb.State)
// Initialize each of the known labels for circuit breaker metrics for this particular ingester.
transitionOpen := cb.metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.OpenState.String())
transitionHalfOpen := cb.metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.HalfOpenState.String())
transitionClosed := cb.metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.ClosedState.String())
circuitBreakerTransitionsCounterFn := func(metrics *circuitBreakerMetrics, state circuitbreaker.State) prometheus.Counter {
return metrics.circuitBreakerTransitions.WithLabelValues(state.String())
}

cbBuilder := circuitbreaker.Builder[any]().
WithFailureThreshold(cfg.FailureThreshold).
WithFailureThreshold(cfg.FailureThresholdPercentage).
WithDelay(cfg.CooldownPeriod).
OnClose(func(event circuitbreaker.StateChangedEvent) {
transitionClosed.Inc()
level.Info(logger).Log("msg", "circuit breaker is closed", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState)
circuitBreakerTransitionsCounterFn(cb.metrics, circuitbreaker.ClosedState).Inc()
level.Info(logger).Log("msg", "circuit breaker is closed", "previous", event.OldState, "current", event.NewState)
}).
OnOpen(func(event circuitbreaker.StateChangedEvent) {
transitionOpen.Inc()
level.Warn(logger).Log("msg", "circuit breaker is open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState)
circuitBreakerTransitionsCounterFn(cb.metrics, circuitbreaker.OpenState).Inc()
level.Warn(logger).Log("msg", "circuit breaker is open", "previous", event.OldState, "current", event.NewState)
}).
OnHalfOpen(func(event circuitbreaker.StateChangedEvent) {
transitionHalfOpen.Inc()
level.Info(logger).Log("msg", "circuit breaker is half-open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState)
circuitBreakerTransitionsCounterFn(cb.metrics, circuitbreaker.HalfOpenState).Inc()
level.Info(logger).Log("msg", "circuit breaker is half-open", "previous", event.OldState, "current", event.NewState)
}).
HandleIf(func(_ any, err error) bool { return isFailure(err) })

if cfg.testModeEnabled {
// In case of testing purposes, we initialize the circuit breaker with count based failure thresholding,
// since it is more deterministic, and therefore it is easier to predict the outcome.
cbBuilder = cbBuilder.WithFailureThreshold(cfg.FailureThreshold)
cbBuilder = cbBuilder.WithFailureThreshold(cfg.FailureThresholdPercentage)
} else {
// In case of production code, we prefer time based failure thresholding.
cbBuilder = cbBuilder.WithFailureRateThreshold(cfg.FailureThreshold, cfg.FailureExecutionThreshold, cfg.ThresholdingPeriod)
cbBuilder = cbBuilder.WithFailureRateThreshold(cfg.FailureThresholdPercentage, cfg.FailureExecutionThreshold, cfg.ThresholdingPeriod)
}

cb.CircuitBreaker = cbBuilder.Build()
cb.cb = cbBuilder.Build()
cb.metrics = newCircuitBreakerMetrics(registerer, cb.cb.State)
return &cb
}

Expand Down Expand Up @@ -190,60 +168,62 @@ func isFailure(err error) bool {
return false
}

func (cb *circuitBreaker) State() circuitbreaker.State {
return cb.CircuitBreaker.State()
}

func (cb *circuitBreaker) isActive() bool {
if cb == nil {
return false
}
return cb.startTime.Before(time.Now())
}

func (cb *circuitBreaker) tryAcquirePermit() error {
// tryAcquirePermit tries to acquire a permit to use the circuit breaker and returns whether a permit was acquired.
// If the circuit breaker is not yet active, a status false and no error are returned.
// If it was not possible to acquire a permit, this means that the circuit breaker is open. In this case, a status
// false and an circuitBreakerOpenError are returned.
// If it was possible to acquire a permit, a status true and no error are returned. In this case, the permission
// will be automatically released once when a result is recorded by calling resultRecorded.
func (cb *circuitBreaker) tryAcquirePermit() (bool, error) {
if !cb.isActive() {
return nil
return false, nil
}
if !cb.CircuitBreaker.TryAcquirePermit() {
cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultOpen).Inc()
return middleware.DoNotLogError{Err: newCircuitBreakerOpenError(cb.RemainingDelay())}
if !cb.cb.TryAcquirePermit() {
cb.metrics.circuitBreakerResults.WithLabelValues(resultOpen).Inc()
return false, middleware.DoNotLogError{Err: newCircuitBreakerOpenError(cb.cb.RemainingDelay())}
}
return nil
return true, nil
}

func (cb *circuitBreaker) recordSuccess() {
func (cb *circuitBreaker) recordResult(err error) {
if !cb.isActive() {
return
}
cb.CircuitBreaker.RecordSuccess()
cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultSuccess).Inc()
}

func (cb *circuitBreaker) recordError(err error) {
if !cb.isActive() {
return
if err != nil && isFailure(err) {
cb.cb.RecordError(err)
cb.metrics.circuitBreakerResults.WithLabelValues(resultError).Inc()
} else {
cb.metrics.circuitBreakerResults.WithLabelValues(resultSuccess).Inc()
cb.cb.RecordSuccess()
}
cb.CircuitBreaker.RecordError(err)
cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultError).Inc()
}

func (cb *circuitBreaker) finishPushRequest(ctx context.Context, duration time.Duration, err error) error {
// finishPushRequest records the result of a push request with this circuit breaker.
// If the circuit breaker is not active, finishPushRequest does nothing.
// If the passed duration of a push request exceeds the configured maximal push request duration,
// a context.DeadlineExceeded error is recorded and returned, independently of the passed pushErr,
// the actual error that occurred during the push request.
// Otherwise, the given pushErr is recorded and returned.
// The returned error is needed only for testing purposes.
func (cb *circuitBreaker) finishPushRequest(ctx context.Context, duration time.Duration, pushErr error) error {
if !cb.isActive() {
return nil
}
if cb.cfg.testModeEnabled {
if initialDelay, ok := ctx.Value(testDelayKey).(time.Duration); ok {
duration += initialDelay
if testDelay, ok := ctx.Value(testDelayKey).(time.Duration); ok {
duration += testDelay
}
}
if cb.cfg.PushTimeout < duration {
err = context.DeadlineExceeded
}
if err == nil {
cb.recordSuccess()
} else {
cb.recordError(err)
pushErr = context.DeadlineExceeded
}
return err
cb.recordResult(pushErr)
return pushErr
}
Loading

0 comments on commit c5f7fff

Please sign in to comment.