diff --git a/pkg/ingester/circuitbreaker.go b/pkg/ingester/circuitbreaker.go index a87c79e078f..ef24aa309d4 100644 --- a/pkg/ingester/circuitbreaker.go +++ b/pkg/ingester/circuitbreaker.go @@ -7,15 +7,12 @@ import ( "flag" "time" - "github.com/failsafe-go/failsafe-go" "github.com/failsafe-go/failsafe-go/circuitbreaker" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/dskit/middleware" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "google.golang.org/grpc/codes" "github.com/grafana/mimir/pkg/mimirpb" ) @@ -23,11 +20,10 @@ import ( type testCtxKey string const ( - resultSuccess = "success" - resultError = "error" - resultOpen = "circuit_breaker_open" - defaultTimeout = 2 * time.Second - testDelayKey testCtxKey = "test-delay" + resultSuccess = "success" + resultError = "error" + resultOpen = "circuit_breaker_open" + testDelayKey testCtxKey = "test-delay" ) type circuitBreakerMetrics struct { @@ -37,10 +33,6 @@ type circuitBreakerMetrics struct { circuitBreakerResults *prometheus.CounterVec } -type startPushRequestFn func(context.Context, int64) (context.Context, bool, error) - -type pushRequestFn func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) - func newCircuitBreakerMetrics(r prometheus.Registerer) *circuitBreakerMetrics { return &circuitBreakerMetrics{ circuitBreakerCurrentState: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ @@ -85,9 +77,8 @@ func (cfg *CircuitBreakerConfig) Validate() error { } type circuitBreaker struct { - cfg CircuitBreakerConfig circuitbreaker.CircuitBreaker[any] - executor failsafe.Executor[any] + cfg CircuitBreakerConfig ingesterID string logger log.Logger metrics *circuitBreakerMetrics @@ -100,8 +91,6 @@ func newCircuitBreaker(cfg CircuitBreakerConfig, ingesterID string, logger log.L transitionOpen := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.OpenState.String()) transitionHalfOpen := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.HalfOpenState.String()) transitionClosed := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.ClosedState.String()) - countSuccess := metrics.circuitBreakerResults.WithLabelValues(ingesterID, resultSuccess) - countError := metrics.circuitBreakerResults.WithLabelValues(ingesterID, resultError) gaugeOpen := metrics.circuitBreakerCurrentState.WithLabelValues(circuitbreaker.OpenState.String()) gaugeHalfOpen := metrics.circuitBreakerCurrentState.WithLabelValues(circuitbreaker.HalfOpenState.String()) gaugeClosed := metrics.circuitBreakerCurrentState.WithLabelValues(circuitbreaker.ClosedState.String()) @@ -109,12 +98,6 @@ func newCircuitBreaker(cfg CircuitBreakerConfig, ingesterID string, logger log.L cbBuilder := circuitbreaker.Builder[any](). WithFailureThreshold(cfg.FailureThreshold). WithDelay(cfg.CooldownPeriod). - OnFailure(func(failsafe.ExecutionEvent[any]) { - countError.Inc() - }). - OnSuccess(func(failsafe.ExecutionEvent[any]) { - countSuccess.Inc() - }). OnClose(func(event circuitbreaker.StateChangedEvent) { transitionClosed.Inc() gaugeOpen.Set(0) @@ -151,7 +134,6 @@ func newCircuitBreaker(cfg CircuitBreakerConfig, ingesterID string, logger log.L return &circuitBreaker{ cfg: cfg, CircuitBreaker: cb, - executor: failsafe.NewExecutor[any](cb), ingesterID: ingesterID, logger: logger, metrics: metrics, @@ -187,68 +169,48 @@ func (cb *circuitBreaker) isActive() bool { return cb.startTime.Before(time.Now()) } -func (cb *circuitBreaker) get(ctx context.Context, op func() (any, error)) (any, error) { - res, err := cb.executor.Get(op) - if err != nil && errors.Is(err, circuitbreaker.ErrOpen) { +func (cb *circuitBreaker) tryAcquirePermit() error { + if !cb.isActive() { + return nil + } + if !cb.CircuitBreaker.TryAcquirePermit() { cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultOpen).Inc() - cbOpenErr := middleware.DoNotLogError{Err: newCircuitBreakerOpenError(cb.RemainingDelay())} - return res, newErrorWithStatus(cbOpenErr, codes.Unavailable) + return newCircuitBreakerOpenError(cb.RemainingDelay()) } - return res, cb.processError(ctx, err) + return nil } -func (cb *circuitBreaker) processError(ctx context.Context, err error) error { - if err == nil { - return nil - } - if errors.Is(err, ctx.Err()) { - // ctx.Err() was registered with the circuit breaker's executor, but we don't propagate it - return nil +func (cb *circuitBreaker) recordSuccess() { + if !cb.isActive() { + return } + cb.CircuitBreaker.RecordSuccess() + cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultSuccess).Inc() +} - return err +func (cb *circuitBreaker) recordError(err error) { + if !cb.isActive() { + return + } + cb.CircuitBreaker.RecordError(err) + cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultError).Inc() } -func (cb *circuitBreaker) contextWithTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { - if timeout == 0 { - timeout = defaultTimeout +func (cb *circuitBreaker) finishPushRequest(ctx context.Context, startTimestamp time.Time, err error) { + if !cb.isActive() { + return } - ctx, cancel := context.WithTimeout(context.WithoutCancel(parent), timeout) if cb.cfg.testModeEnabled { - if initialDelay, ok := parent.Value(testDelayKey).(time.Duration); ok { + if initialDelay, ok := ctx.Value(testDelayKey).(time.Duration); ok { time.Sleep(initialDelay) } } - return ctx, cancel -} - -func (cb *circuitBreaker) StartPushRequest(ctx context.Context, reqSize int64, startPushRequest startPushRequestFn) (context.Context, error) { - callbackCtx, callbackErr := cb.get(ctx, func() (any, error) { - callbackCtx, _, callbackErr := startPushRequest(ctx, reqSize) - return callbackCtx, callbackErr - }) - if callbackErr == nil { - return callbackCtx.(context.Context), nil + if cb.cfg.PushTimeout < time.Since(startTimestamp) { + err = context.DeadlineExceeded } - return nil, callbackErr -} - -func (cb *circuitBreaker) Push(parent context.Context, req *mimirpb.WriteRequest, push pushRequestFn) (*mimirpb.WriteResponse, error) { - ctx, cancel := cb.contextWithTimeout(parent, cb.cfg.PushTimeout) - defer cancel() - - callbackResult, callbackErr := cb.get(ctx, func() (any, error) { - callbackResult, callbackErr := push(ctx, req) - if callbackErr != nil { - return callbackResult, callbackErr - } - - // We return ctx.Err() in order to register it with the circuit breaker's executor. - return callbackResult, ctx.Err() - }) - - if callbackResult == nil { - return nil, callbackErr + if err == nil { + cb.recordSuccess() + } else { + cb.recordError(err) } - return callbackResult.(*mimirpb.WriteResponse), callbackErr } diff --git a/pkg/ingester/circuitbreaker_test.go b/pkg/ingester/circuitbreaker_test.go index 58cf861ce93..37f8e11bff9 100644 --- a/pkg/ingester/circuitbreaker_test.go +++ b/pkg/ingester/circuitbreaker_test.go @@ -9,7 +9,6 @@ import ( "testing" "time" - "github.com/grafana/dskit/grpcutil" "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/services" "github.com/grafana/dskit/test" @@ -19,9 +18,9 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc/codes" "github.com/grafana/mimir/pkg/mimirpb" + "github.com/grafana/mimir/pkg/util/validation" ) func TestIngester_Push_CircuitBreaker(t *testing.T) { @@ -76,8 +75,9 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) { testModeEnabled: true, } - i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, registry) + overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) + i, _, _ := createTestIngesterWithIngestStorage(t, &cfg, overrides, registry) require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -95,7 +95,7 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) { nil, mimirpb.API, ) - _, err = i.Push(ctx, req) + err = i.PushToStorage(ctx, req) require.NoError(t, err) count := 0 @@ -125,7 +125,7 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) { if testCase.ctx != nil { ctx = testCase.ctx(ctx) } - _, err = i.Push(ctx, req) + err = i.PushToStorage(ctx, req) if initialDelayEnabled { if testCase.expectedErrorWhenCircuitBreakerClosed != nil { require.ErrorAs(t, err, &testCase.expectedErrorWhenCircuitBreakerClosed) @@ -148,15 +148,11 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) { var expectedMetrics string if initialDelayEnabled { expectedMetrics = ` - # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. - # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="error"} 0 - cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="success"} 0 # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. # TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="open"} 0 # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. # TYPE cortex_ingester_circuit_breaker_current_state gauge cortex_ingester_circuit_breaker_current_state{state="open"} 0 @@ -167,14 +163,14 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) { expectedMetrics = ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="circuit_breaker_open"} 2 - cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="error"} 2 - cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="success"} 1 + cortex_ingester_circuit_breaker_results_total{ingester="ingester-zone-a-0",result="circuit_breaker_open"} 2 + cortex_ingester_circuit_breaker_results_total{ingester="ingester-zone-a-0",result="error"} 2 + cortex_ingester_circuit_breaker_results_total{ingester="ingester-zone-a-0",result="success"} 1 # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. # TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0 - cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 1 + cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{ingester="ingester-zone-a-0",state="open"} 1 # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. # TYPE cortex_ingester_circuit_breaker_current_state gauge cortex_ingester_circuit_breaker_current_state{state="open"} 1 @@ -197,8 +193,4 @@ func checkCircuitBreakerOpenErr(ctx context.Context, err error, t *testing.T) { shouldLog, _ := optional.ShouldLog(ctx) require.False(t, shouldLog, "expected not to log via .ShouldLog()") - - s, ok := grpcutil.ErrorToStatus(err) - require.True(t, ok, "expected to be able to convert to gRPC status") - require.Equal(t, codes.Unavailable, s.Code()) } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index d376eafcfce..1a44a7fba33 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -974,9 +974,6 @@ type pushRequestState struct { // StartPushRequest checks if ingester can start push request, and increments relevant counters. // If new push request cannot be started, errors convertible to gRPC status code are returned, and metrics are updated. func (i *Ingester) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) { - if i.circuitBreaker.isActive() { - return i.circuitBreaker.StartPushRequest(ctx, reqSize, i.startPushRequest) - } ctx, _, err := i.startPushRequest(ctx, reqSize) return ctx, err } @@ -996,6 +993,15 @@ func (i *Ingester) FinishPushRequest(ctx context.Context) { // // The shouldFinish flag tells if the caller must call finish on this request. If not, there is already someone in the call stack who will do that. func (i *Ingester) startPushRequest(ctx context.Context, reqSize int64) (_ context.Context, shouldFinish bool, err error) { + if err := i.circuitBreaker.tryAcquirePermit(); err != nil { + return nil, false, err + } + defer func() { + if err != nil { + i.circuitBreaker.recordError(err) + } + }() + if err := i.checkAvailableForPush(); err != nil { return nil, false, err } @@ -1061,7 +1067,7 @@ func (i *Ingester) finishPushRequest(reqSize int64) { } // PushWithCleanup is the Push() implementation for blocks storage and takes a WriteRequest and adds it to the TSDB head. -func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteRequest, cleanUp func()) error { +func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteRequest, cleanUp func()) (err error) { // NOTE: because we use `unsafe` in deserialisation, we must not // retain anything from `req` past the exit from this function. defer cleanUp() @@ -1080,6 +1086,11 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques } } + start := time.Now() + defer func() { + i.circuitBreaker.finishPushRequest(ctx, start, err) + }() + userID, err := tenant.TenantID(ctx) if err != nil { return err @@ -3768,13 +3779,6 @@ func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) // Push implements client.IngesterServer, which is registered into gRPC server. func (i *Ingester) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) { - if i.circuitBreaker.isActive() { - return i.circuitBreaker.Push(ctx, req, i.push) - } - return i.push(ctx, req) -} - -func (i *Ingester) push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) { if !i.cfg.PushGrpcMethodEnabled { return nil, errPushGrpcDisabled }