Skip to content

Commit

Permalink
Fixing review findings
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic committed May 28, 2024
1 parent 1121ffe commit cd7e066
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 105 deletions.
106 changes: 34 additions & 72 deletions pkg/ingester/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,23 @@ import (
"flag"
"time"

"github.com/failsafe-go/failsafe-go"
"github.com/failsafe-go/failsafe-go/circuitbreaker"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/middleware"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/codes"

"github.com/grafana/mimir/pkg/mimirpb"
)

type testCtxKey string

const (
resultSuccess = "success"
resultError = "error"
resultOpen = "circuit_breaker_open"
defaultTimeout = 2 * time.Second
testDelayKey testCtxKey = "test-delay"
resultSuccess = "success"
resultError = "error"
resultOpen = "circuit_breaker_open"
testDelayKey testCtxKey = "test-delay"
)

type circuitBreakerMetrics struct {
Expand All @@ -37,10 +33,6 @@ type circuitBreakerMetrics struct {
circuitBreakerResults *prometheus.CounterVec
}

type startPushRequestFn func(context.Context, int64) (context.Context, bool, error)

type pushRequestFn func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error)

func newCircuitBreakerMetrics(r prometheus.Registerer) *circuitBreakerMetrics {
return &circuitBreakerMetrics{
circuitBreakerCurrentState: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Expand Down Expand Up @@ -85,9 +77,8 @@ func (cfg *CircuitBreakerConfig) Validate() error {
}

type circuitBreaker struct {
cfg CircuitBreakerConfig
circuitbreaker.CircuitBreaker[any]
executor failsafe.Executor[any]
cfg CircuitBreakerConfig
ingesterID string
logger log.Logger
metrics *circuitBreakerMetrics
Expand All @@ -100,21 +91,13 @@ func newCircuitBreaker(cfg CircuitBreakerConfig, ingesterID string, logger log.L
transitionOpen := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.OpenState.String())
transitionHalfOpen := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.HalfOpenState.String())
transitionClosed := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.ClosedState.String())
countSuccess := metrics.circuitBreakerResults.WithLabelValues(ingesterID, resultSuccess)
countError := metrics.circuitBreakerResults.WithLabelValues(ingesterID, resultError)
gaugeOpen := metrics.circuitBreakerCurrentState.WithLabelValues(circuitbreaker.OpenState.String())
gaugeHalfOpen := metrics.circuitBreakerCurrentState.WithLabelValues(circuitbreaker.HalfOpenState.String())
gaugeClosed := metrics.circuitBreakerCurrentState.WithLabelValues(circuitbreaker.ClosedState.String())

cbBuilder := circuitbreaker.Builder[any]().
WithFailureThreshold(cfg.FailureThreshold).
WithDelay(cfg.CooldownPeriod).
OnFailure(func(failsafe.ExecutionEvent[any]) {
countError.Inc()
}).
OnSuccess(func(failsafe.ExecutionEvent[any]) {
countSuccess.Inc()
}).
OnClose(func(event circuitbreaker.StateChangedEvent) {
transitionClosed.Inc()
gaugeOpen.Set(0)
Expand Down Expand Up @@ -151,7 +134,6 @@ func newCircuitBreaker(cfg CircuitBreakerConfig, ingesterID string, logger log.L
return &circuitBreaker{
cfg: cfg,
CircuitBreaker: cb,
executor: failsafe.NewExecutor[any](cb),
ingesterID: ingesterID,
logger: logger,
metrics: metrics,
Expand Down Expand Up @@ -187,68 +169,48 @@ func (cb *circuitBreaker) isActive() bool {
return cb.startTime.Before(time.Now())
}

func (cb *circuitBreaker) get(ctx context.Context, op func() (any, error)) (any, error) {
res, err := cb.executor.Get(op)
if err != nil && errors.Is(err, circuitbreaker.ErrOpen) {
func (cb *circuitBreaker) tryAcquirePermit() error {
if !cb.isActive() {
return nil
}
if !cb.CircuitBreaker.TryAcquirePermit() {
cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultOpen).Inc()
cbOpenErr := middleware.DoNotLogError{Err: newCircuitBreakerOpenError(cb.RemainingDelay())}
return res, newErrorWithStatus(cbOpenErr, codes.Unavailable)
return newCircuitBreakerOpenError(cb.RemainingDelay())
}
return res, cb.processError(ctx, err)
return nil
}

func (cb *circuitBreaker) processError(ctx context.Context, err error) error {
if err == nil {
return nil
}
if errors.Is(err, ctx.Err()) {
// ctx.Err() was registered with the circuit breaker's executor, but we don't propagate it
return nil
func (cb *circuitBreaker) recordSuccess() {
if !cb.isActive() {
return
}
cb.CircuitBreaker.RecordSuccess()
cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultSuccess).Inc()
}

return err
func (cb *circuitBreaker) recordError(err error) {
if !cb.isActive() {
return
}
cb.CircuitBreaker.RecordError(err)
cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultError).Inc()
}

func (cb *circuitBreaker) contextWithTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout == 0 {
timeout = defaultTimeout
func (cb *circuitBreaker) finishPushRequest(ctx context.Context, startTimestamp time.Time, err error) {
if !cb.isActive() {
return
}
ctx, cancel := context.WithTimeout(context.WithoutCancel(parent), timeout)
if cb.cfg.testModeEnabled {
if initialDelay, ok := parent.Value(testDelayKey).(time.Duration); ok {
if initialDelay, ok := ctx.Value(testDelayKey).(time.Duration); ok {
time.Sleep(initialDelay)
}
}
return ctx, cancel
}

func (cb *circuitBreaker) StartPushRequest(ctx context.Context, reqSize int64, startPushRequest startPushRequestFn) (context.Context, error) {
callbackCtx, callbackErr := cb.get(ctx, func() (any, error) {
callbackCtx, _, callbackErr := startPushRequest(ctx, reqSize)
return callbackCtx, callbackErr
})
if callbackErr == nil {
return callbackCtx.(context.Context), nil
if cb.cfg.PushTimeout < time.Since(startTimestamp) {
err = context.DeadlineExceeded
}
return nil, callbackErr
}

func (cb *circuitBreaker) Push(parent context.Context, req *mimirpb.WriteRequest, push pushRequestFn) (*mimirpb.WriteResponse, error) {
ctx, cancel := cb.contextWithTimeout(parent, cb.cfg.PushTimeout)
defer cancel()

callbackResult, callbackErr := cb.get(ctx, func() (any, error) {
callbackResult, callbackErr := push(ctx, req)
if callbackErr != nil {
return callbackResult, callbackErr
}

// We return ctx.Err() in order to register it with the circuit breaker's executor.
return callbackResult, ctx.Err()
})

if callbackResult == nil {
return nil, callbackErr
if err == nil {
cb.recordSuccess()
} else {
cb.recordError(err)
}
return callbackResult.(*mimirpb.WriteResponse), callbackErr
}
36 changes: 14 additions & 22 deletions pkg/ingester/circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"testing"
"time"

"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/test"
Expand All @@ -19,9 +18,9 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util/validation"
)

func TestIngester_Push_CircuitBreaker(t *testing.T) {
Expand Down Expand Up @@ -76,8 +75,9 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) {
testModeEnabled: true,
}

i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, registry)
overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
i, _, _ := createTestIngesterWithIngestStorage(t, &cfg, overrides, registry)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

Expand All @@ -95,7 +95,7 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) {
nil,
mimirpb.API,
)
_, err = i.Push(ctx, req)
err = i.PushToStorage(ctx, req)
require.NoError(t, err)

count := 0
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) {
if testCase.ctx != nil {
ctx = testCase.ctx(ctx)
}
_, err = i.Push(ctx, req)
err = i.PushToStorage(ctx, req)
if initialDelayEnabled {
if testCase.expectedErrorWhenCircuitBreakerClosed != nil {
require.ErrorAs(t, err, &testCase.expectedErrorWhenCircuitBreakerClosed)
Expand All @@ -148,15 +148,11 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) {
var expectedMetrics string
if initialDelayEnabled {
expectedMetrics = `
# HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker.
# TYPE cortex_ingester_circuit_breaker_results_total counter
cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="error"} 0
cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="success"} 0
# HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state.
# TYPE cortex_ingester_circuit_breaker_transitions_total counter
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="closed"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="half-open"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="open"} 0
# HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name.
# TYPE cortex_ingester_circuit_breaker_current_state gauge
cortex_ingester_circuit_breaker_current_state{state="open"} 0
Expand All @@ -167,14 +163,14 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) {
expectedMetrics = `
# HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker.
# TYPE cortex_ingester_circuit_breaker_results_total counter
cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="circuit_breaker_open"} 2
cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="error"} 2
cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="success"} 1
cortex_ingester_circuit_breaker_results_total{ingester="ingester-zone-a-0",result="circuit_breaker_open"} 2
cortex_ingester_circuit_breaker_results_total{ingester="ingester-zone-a-0",result="error"} 2
cortex_ingester_circuit_breaker_results_total{ingester="ingester-zone-a-0",result="success"} 1
# HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state.
# TYPE cortex_ingester_circuit_breaker_transitions_total counter
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 1
cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="closed"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="half-open"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="open"} 1
# HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name.
# TYPE cortex_ingester_circuit_breaker_current_state gauge
cortex_ingester_circuit_breaker_current_state{state="open"} 1
Expand All @@ -197,8 +193,4 @@ func checkCircuitBreakerOpenErr(ctx context.Context, err error, t *testing.T) {

shouldLog, _ := optional.ShouldLog(ctx)
require.False(t, shouldLog, "expected not to log via .ShouldLog()")

s, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok, "expected to be able to convert to gRPC status")
require.Equal(t, codes.Unavailable, s.Code())
}
26 changes: 15 additions & 11 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,9 +974,6 @@ type pushRequestState struct {
// StartPushRequest checks if ingester can start push request, and increments relevant counters.
// If new push request cannot be started, errors convertible to gRPC status code are returned, and metrics are updated.
func (i *Ingester) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) {
if i.circuitBreaker.isActive() {
return i.circuitBreaker.StartPushRequest(ctx, reqSize, i.startPushRequest)
}
ctx, _, err := i.startPushRequest(ctx, reqSize)
return ctx, err
}
Expand All @@ -996,6 +993,15 @@ func (i *Ingester) FinishPushRequest(ctx context.Context) {
//
// The shouldFinish flag tells if the caller must call finish on this request. If not, there is already someone in the call stack who will do that.
func (i *Ingester) startPushRequest(ctx context.Context, reqSize int64) (_ context.Context, shouldFinish bool, err error) {
if err := i.circuitBreaker.tryAcquirePermit(); err != nil {
return nil, false, err
}
defer func() {
if err != nil {
i.circuitBreaker.recordError(err)
}
}()

if err := i.checkAvailableForPush(); err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -1061,7 +1067,7 @@ func (i *Ingester) finishPushRequest(reqSize int64) {
}

// PushWithCleanup is the Push() implementation for blocks storage and takes a WriteRequest and adds it to the TSDB head.
func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteRequest, cleanUp func()) error {
func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteRequest, cleanUp func()) (err error) {
// NOTE: because we use `unsafe` in deserialisation, we must not
// retain anything from `req` past the exit from this function.
defer cleanUp()
Expand All @@ -1080,6 +1086,11 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
}
}

start := time.Now()
defer func() {
i.circuitBreaker.finishPushRequest(ctx, start, err)
}()

userID, err := tenant.TenantID(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -3768,13 +3779,6 @@ func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest)

// Push implements client.IngesterServer, which is registered into gRPC server.
func (i *Ingester) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
if i.circuitBreaker.isActive() {
return i.circuitBreaker.Push(ctx, req, i.push)
}
return i.push(ctx, req)
}

func (i *Ingester) push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
if !i.cfg.PushGrpcMethodEnabled {
return nil, errPushGrpcDisabled
}
Expand Down

0 comments on commit cd7e066

Please sign in to comment.