Skip to content

Commit

Permalink
Fixing review findings
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic committed Jun 4, 2024
1 parent 1d3a8bb commit e136df3
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 79 deletions.
2 changes: 1 addition & 1 deletion cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -3206,7 +3206,7 @@
"kind": "field",
"name": "initial_delay",
"required": false,
"desc": "How long the circuit breaker should wait to start up after the corresponding ingester started. During that time both failures and successes will not be counted.",
"desc": "How long the circuit breaker should wait between an activation request and becoming effectively active. During that time both failures and successes will not be counted.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "ingester.circuit-breaker.initial-delay",
Expand Down
2 changes: 1 addition & 1 deletion cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ Usage of ./cmd/mimir/mimir:
-ingester.circuit-breaker.failure-threshold-percentage uint
[experimental] Max percentage of requests that can fail over period before the circuit breaker opens (default 10)
-ingester.circuit-breaker.initial-delay duration
[experimental] How long the circuit breaker should wait to start up after the corresponding ingester started. During that time both failures and successes will not be counted.
[experimental] How long the circuit breaker should wait between an activation request and becoming effectively active. During that time both failures and successes will not be counted.
-ingester.circuit-breaker.push-timeout duration
How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors (default 2s)
-ingester.circuit-breaker.thresholding-period duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1243,9 +1243,9 @@ circuit_breaker:
# CLI flag: -ingester.circuit-breaker.cooldown-period
[cooldown_period: <duration> | default = 10s]
# (experimental) How long the circuit breaker should wait to start up after
# the corresponding ingester started. During that time both failures and
# successes will not be counted.
# (experimental) How long the circuit breaker should wait between an
# activation request and becoming effectively active. During that time both
# failures and successes will not be counted.
# CLI flag: -ingester.circuit-breaker.initial-delay
[initial_delay: <duration> | default = 0s]
Expand Down
67 changes: 37 additions & 30 deletions pkg/ingester/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
)

// Label values passed to the circuitBreakerResults metric, and the default
// value of the push-timeout flag.
const (
resultSuccess = "success"
resultError = "error"
resultOpen = "circuit_breaker_open"
defaultPushTimeout = 2 * time.Second
circuitBreakerResultSuccess = "success"
circuitBreakerResultError = "error"
circuitBreakerResultOpen = "circuit_breaker_open"
circuitBreakerDefaultPushTimeout = 2 * time.Second
)

type circuitBreakerMetrics struct {
Expand Down Expand Up @@ -60,7 +60,7 @@ func newCircuitBreakerMetrics(r prometheus.Registerer, currentStateFn func() cir
// We initialize all possible states for the circuitBreakerTransitions metrics
cbMetrics.circuitBreakerTransitions.WithLabelValues(s.String())
}
for _, r := range []string{resultSuccess, resultError, resultOpen} {
for _, r := range []string{circuitBreakerResultSuccess, circuitBreakerResultError, circuitBreakerResultOpen} {
// We initialize all possible results for the circuitBreakerResults metrics
cbMetrics.circuitBreakerResults.WithLabelValues(r)
}
Expand All @@ -85,10 +85,12 @@ func (cfg *CircuitBreakerConfig) RegisterFlags(f *flag.FlagSet) {
f.UintVar(&cfg.FailureExecutionThreshold, prefix+"failure-execution-threshold", 100, "How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures")
f.DurationVar(&cfg.ThresholdingPeriod, prefix+"thresholding-period", time.Minute, "Moving window of time that the percentage of failed requests is computed over")
f.DurationVar(&cfg.CooldownPeriod, prefix+"cooldown-period", 10*time.Second, "How long the circuit breaker will stay in the open state before allowing some requests")
f.DurationVar(&cfg.InitialDelay, prefix+"initial-delay", 0, "How long the circuit breaker should wait to start up after the corresponding ingester started. During that time both failures and successes will not be counted.")
f.DurationVar(&cfg.PushTimeout, prefix+"push-timeout", defaultPushTimeout, "How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors")
f.DurationVar(&cfg.InitialDelay, prefix+"initial-delay", 0, "How long the circuit breaker should wait between an activation request and becoming effectively active. During that time both failures and successes will not be counted.")
f.DurationVar(&cfg.PushTimeout, prefix+"push-timeout", circuitBreakerDefaultPushTimeout, "How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors")
}

// circuitBreaker abstracts the ingester's server-side circuit breaker functionality.
// A nil *circuitBreaker is a valid noop implementation.
type circuitBreaker struct {
cfg CircuitBreakerConfig
logger log.Logger
Expand All @@ -100,8 +102,11 @@ type circuitBreaker struct {
testRequestDelay time.Duration
}

func newCircuitBreaker(cfg CircuitBreakerConfig, isActive bool, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker {
active := atomic.NewBool(isActive)
func newCircuitBreaker(cfg CircuitBreakerConfig, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker {
if !cfg.Enabled {
return nil
}
active := atomic.NewBool(false)
cb := circuitBreaker{
cfg: cfg,
logger: logger,
Expand Down Expand Up @@ -142,7 +147,7 @@ func newCircuitBreaker(cfg CircuitBreakerConfig, isActive bool, logger log.Logge
return &cb
}

func isFailure(err error) bool {
func isCircuitBreakerFailure(err error) bool {
if err == nil {
return false
}
Expand All @@ -169,17 +174,19 @@ func isFailure(err error) bool {
}

// isActive reports whether the circuit breaker is currently active.
// It may be called on a nil *circuitBreaker, which is never active.
func (cb *circuitBreaker) isActive() bool {
	if cb != nil {
		return cb.active.Load()
	}
	return false
}

func (cb *circuitBreaker) setActive() {
func (cb *circuitBreaker) activate() {
if cb == nil {
return
}
cb.active.Store(true)
if cb.cfg.InitialDelay == 0 {
cb.active.Store(true)
}
time.AfterFunc(cb.cfg.InitialDelay, func() {
cb.active.Store(true)
})
}

// tryAcquirePermit tries to acquire a permit to use the circuit breaker and returns whether a permit was acquired.
Expand All @@ -192,25 +199,12 @@ func (cb *circuitBreaker) tryAcquirePermit() (bool, error) {
return false, nil
}
if !cb.cb.TryAcquirePermit() {
cb.metrics.circuitBreakerResults.WithLabelValues(resultOpen).Inc()
cb.metrics.circuitBreakerResults.WithLabelValues(circuitBreakerResultOpen).Inc()
return false, newCircuitBreakerOpenError(cb.cb.RemainingDelay())
}
return true, nil
}

// recordResult reports the outcome of a request to the circuit breaker and to
// the circuitBreakerResults metric. It does nothing while the circuit breaker
// is not active.
func (cb *circuitBreaker) recordResult(err error) {
if !cb.isActive() {
return
}
// Only a non-nil error that isFailure classifies as a failure counts against
// the circuit breaker; everything else is recorded as a success.
if err != nil && isFailure(err) {
cb.cb.RecordFailure()
cb.metrics.circuitBreakerResults.WithLabelValues(resultError).Inc()
} else {
cb.metrics.circuitBreakerResults.WithLabelValues(resultSuccess).Inc()
cb.cb.RecordSuccess()
}
}

// finishPushRequest should be called to complete the push request executed upon a
// successfully acquired circuit breaker permit.
// It records the result of the push request with the circuit breaker. Push requests
Expand All @@ -229,3 +223,16 @@ func (cb *circuitBreaker) finishPushRequest(duration time.Duration, pushErr erro
cb.recordResult(pushErr)
return pushErr
}

// recordResult registers the outcome of a request with the circuit breaker
// and with the circuitBreakerResults metric. Nothing is recorded while the
// circuit breaker is not active. A nil error, or an error that
// isCircuitBreakerFailure does not classify as a failure, counts as a success.
func (cb *circuitBreaker) recordResult(err error) {
	if !cb.isActive() {
		return
	}
	if err == nil || !isCircuitBreakerFailure(err) {
		cb.metrics.circuitBreakerResults.WithLabelValues(circuitBreakerResultSuccess).Inc()
		cb.cb.RecordSuccess()
		return
	}
	cb.cb.RecordFailure()
	cb.metrics.circuitBreakerResults.WithLabelValues(circuitBreakerResultError).Inc()
}
80 changes: 47 additions & 33 deletions pkg/ingester/circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,45 +28,45 @@ import (

// TestIsFailure verifies which errors the circuit breaker's failure predicate
// classifies as failures. Every error is checked both directly and wrapped
// with fmt.Errorf("%w", ...), so the classification must survive wrapping.
func TestIsFailure(t *testing.T) {
t.Run("no error", func(t *testing.T) {
require.False(t, isFailure(nil))
require.False(t, isCircuitBreakerFailure(nil))
})

// Cancellations, whether plain context errors or gRPC status errors, are not failures.
t.Run("context cancelled", func(t *testing.T) {
require.False(t, isFailure(context.Canceled))
require.False(t, isFailure(fmt.Errorf("%w", context.Canceled)))
require.False(t, isCircuitBreakerFailure(context.Canceled))
require.False(t, isCircuitBreakerFailure(fmt.Errorf("%w", context.Canceled)))
})

t.Run("gRPC context cancelled", func(t *testing.T) {
err := status.Error(codes.Canceled, "cancelled!")
require.False(t, isFailure(err))
require.False(t, isFailure(fmt.Errorf("%w", err)))
require.False(t, isCircuitBreakerFailure(err))
require.False(t, isCircuitBreakerFailure(fmt.Errorf("%w", err)))
})

// A gRPC deadline-exceeded status is a failure.
t.Run("gRPC deadline exceeded", func(t *testing.T) {
err := status.Error(codes.DeadlineExceeded, "broken!")
require.True(t, isFailure(err))
require.True(t, isFailure(fmt.Errorf("%w", err)))
require.True(t, isCircuitBreakerFailure(err))
require.True(t, isCircuitBreakerFailure(fmt.Errorf("%w", err)))
})

// The error built by newInstanceLimitReachedError is a failure.
t.Run("gRPC unavailable with INSTANCE_LIMIT details", func(t *testing.T) {
err := newInstanceLimitReachedError("broken")
require.True(t, isFailure(err))
require.True(t, isFailure(fmt.Errorf("%w", err)))
require.True(t, isCircuitBreakerFailure(err))
require.True(t, isCircuitBreakerFailure(fmt.Errorf("%w", err)))
})

// An Unavailable status is not a failure when its details carry the
// SERVICE_UNAVAILABLE cause, nor when it carries no details at all.
t.Run("gRPC unavailable with SERVICE_UNAVAILABLE details is not a failure", func(t *testing.T) {
stat := status.New(codes.Unavailable, "broken!")
stat, err := stat.WithDetails(&mimirpb.ErrorDetails{Cause: mimirpb.SERVICE_UNAVAILABLE})
require.NoError(t, err)
err = stat.Err()
require.False(t, isFailure(err))
require.False(t, isFailure(fmt.Errorf("%w", err)))
require.False(t, isCircuitBreakerFailure(err))
require.False(t, isCircuitBreakerFailure(fmt.Errorf("%w", err)))
})

t.Run("gRPC unavailable without details is not a failure", func(t *testing.T) {
err := status.Error(codes.Unavailable, "broken!")
require.False(t, isFailure(err))
require.False(t, isFailure(fmt.Errorf("%w", err)))
require.False(t, isCircuitBreakerFailure(err))
require.False(t, isCircuitBreakerFailure(fmt.Errorf("%w", err)))
})
}

Expand All @@ -77,10 +77,9 @@ func TestCircuitBreaker_IsActive(t *testing.T) {

registry := prometheus.NewRegistry()
cfg := CircuitBreakerConfig{Enabled: true, InitialDelay: 10 * time.Millisecond}
cb = newCircuitBreaker(cfg, false, log.NewNopLogger(), registry)
time.AfterFunc(cfg.InitialDelay, func() {
cb.setActive()
})
cb = newCircuitBreaker(cfg, log.NewNopLogger(), registry)
cb.activate()

// When InitialDelay is set, circuit breaker is not immediately active.
require.False(t, cb.isActive())

Expand All @@ -104,18 +103,26 @@ func TestCircuitBreaker_TryAcquirePermit(t *testing.T) {
expectedMetrics string
}{
"if circuit breaker is not active, status false and no error are returned": {
initialDelay: 1 * time.Minute,
circuitBreakerSetup: func(cb *circuitBreaker) { cb.cb.Close() },
initialDelay: 1 * time.Minute,
circuitBreakerSetup: func(cb *circuitBreaker) {
cb.active.Store(false)
},
expectedSuccess: false,
expectedCircuitBreakerError: false,
},
"if circuit breaker closed, status true and no error are returned": {
circuitBreakerSetup: func(cb *circuitBreaker) { cb.cb.Close() },
circuitBreakerSetup: func(cb *circuitBreaker) {
cb.activate()
cb.cb.Close()
},
expectedSuccess: true,
expectedCircuitBreakerError: false,
},
"if circuit breaker open, status false and a circuitBreakerErrorOpen are returned": {
circuitBreakerSetup: func(cb *circuitBreaker) { cb.cb.Open() },
circuitBreakerSetup: func(cb *circuitBreaker) {
cb.activate()
cb.cb.Open()
},
expectedSuccess: false,
expectedCircuitBreakerError: true,
expectedMetrics: `
Expand All @@ -137,7 +144,10 @@ func TestCircuitBreaker_TryAcquirePermit(t *testing.T) {
`,
},
"if circuit breaker half-open, status false and a circuitBreakerErrorOpen are returned": {
circuitBreakerSetup: func(cb *circuitBreaker) { cb.cb.HalfOpen() },
circuitBreakerSetup: func(cb *circuitBreaker) {
cb.activate()
cb.cb.HalfOpen()
},
expectedSuccess: false,
expectedCircuitBreakerError: true,
expectedMetrics: `
Expand All @@ -162,8 +172,8 @@ func TestCircuitBreaker_TryAcquirePermit(t *testing.T) {
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
registry := prometheus.NewRegistry()
cfg := CircuitBreakerConfig{Enabled: true, CooldownPeriod: 10 * time.Second, InitialDelay: testCase.initialDelay}
cb := newCircuitBreaker(cfg, cfg.InitialDelay == 0, log.NewNopLogger(), registry)
cfg := CircuitBreakerConfig{Enabled: true, CooldownPeriod: 10 * time.Second}
cb := newCircuitBreaker(cfg, log.NewNopLogger(), registry)
testCase.circuitBreakerSetup(cb)
status, err := cb.tryAcquirePermit()
require.Equal(t, testCase.expectedSuccess, status)
Expand Down Expand Up @@ -220,7 +230,8 @@ func TestCircuitBreaker_RecordResult(t *testing.T) {
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
registry := prometheus.NewRegistry()
cb := newCircuitBreaker(cfg, true, log.NewNopLogger(), registry)
cb := newCircuitBreaker(cfg, log.NewNopLogger(), registry)
cb.activate()
cb.recordResult(testCase.err)
assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(testCase.expectedMetrics), metricNames...))
})
Expand All @@ -233,13 +244,14 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) {
}
testCases := map[string]struct {
pushRequestDuration time.Duration
initialDelay time.Duration
isActive bool
err error
expectedErr error
expectedMetrics string
}{
"with a permit acquired, pushRequestDuration lower than PushTimeout and no input error, finishPushRequest gives success": {
pushRequestDuration: 1 * time.Second,
isActive: true,
err: nil,
expectedMetrics: `
# HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker.
Expand All @@ -251,7 +263,7 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) {
},
"with circuit breaker not active, pushRequestDuration lower than PushTimeout and no input error, finishPushRequest does nothing": {
pushRequestDuration: 1 * time.Second,
initialDelay: 1 * time.Minute,
isActive: false,
err: nil,
expectedMetrics: `
# HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker.
Expand All @@ -263,6 +275,7 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) {
},
"with circuit breaker active, pushRequestDuration higher than PushTimeout and no input error, finishPushRequest gives context deadline exceeded error": {
pushRequestDuration: 3 * time.Second,
isActive: true,
err: nil,
expectedErr: context.DeadlineExceeded,
expectedMetrics: `
Expand All @@ -275,7 +288,7 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) {
},
"with circuit breaker not active, pushRequestDuration higher than PushTimeout and no input error, finishPushRequest does nothing": {
pushRequestDuration: 3 * time.Second,
initialDelay: 1 * time.Minute,
isActive: false,
err: nil,
expectedErr: nil,
expectedMetrics: `
Expand All @@ -288,6 +301,7 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) {
},
"with circuit breaker active, pushRequestDuration higher than PushTimeout and an input error different from context deadline exceeded, finishPushRequest gives context deadline exceeded error": {
pushRequestDuration: 3 * time.Second,
isActive: true,
err: newInstanceLimitReachedError("error"),
expectedErr: context.DeadlineExceeded,
expectedMetrics: `
Expand All @@ -300,7 +314,7 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) {
},
"with circuit breaker not active, pushRequestDuration higher than PushTimeout and an input error different from context deadline exceeded, finishPushRequest does nothing": {
pushRequestDuration: 3 * time.Second,
initialDelay: 1 * time.Minute,
isActive: false,
err: newInstanceLimitReachedError("error"),
expectedErr: nil,
expectedMetrics: `
Expand All @@ -316,11 +330,11 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) {
t.Run(testName, func(t *testing.T) {
registry := prometheus.NewRegistry()
cfg := CircuitBreakerConfig{
Enabled: true,
InitialDelay: testCase.initialDelay,
PushTimeout: 2 * time.Second,
Enabled: true,
PushTimeout: 2 * time.Second,
}
cb := newCircuitBreaker(cfg, cfg.InitialDelay == 0, log.NewNopLogger(), registry)
cb := newCircuitBreaker(cfg, log.NewNopLogger(), registry)
cb.active.Store(testCase.isActive)
err := cb.finishPushRequest(testCase.pushRequestDuration, testCase.err)
if testCase.expectedErr == nil {
require.NoError(t, err)
Expand Down
Loading

0 comments on commit e136df3

Please sign in to comment.