From 1d3a8bb22563f8711749023aecc205ea6584eef4 Mon Sep 17 00:00:00 2001 From: Yuri Nikolic Date: Mon, 3 Jun 2024 23:56:18 +0200 Subject: [PATCH] Get rid of test-delay key from in context Signed-off-by: Yuri Nikolic --- pkg/ingester/circuitbreaker.go | 20 +++++++++----------- pkg/ingester/circuitbreaker_test.go | 16 ++++++---------- pkg/ingester/ingester.go | 2 +- 3 files changed, 16 insertions(+), 22 deletions(-) diff --git a/pkg/ingester/circuitbreaker.go b/pkg/ingester/circuitbreaker.go index da9477fc643..b4684b75f7a 100644 --- a/pkg/ingester/circuitbreaker.go +++ b/pkg/ingester/circuitbreaker.go @@ -20,14 +20,11 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" ) -type testCtxKey string - const ( - resultSuccess = "success" - resultError = "error" - resultOpen = "circuit_breaker_open" - defaultPushTimeout = 2 * time.Second - testDelayKey testCtxKey = "test-delay" + resultSuccess = "success" + resultError = "error" + resultOpen = "circuit_breaker_open" + defaultPushTimeout = 2 * time.Second ) type circuitBreakerMetrics struct { @@ -98,6 +95,9 @@ type circuitBreaker struct { metrics *circuitBreakerMetrics active atomic.Bool cb circuitbreaker.CircuitBreaker[any] + + // testRequestDelay is needed for testing purposes to simulate long lasting requests + testRequestDelay time.Duration } func newCircuitBreaker(cfg CircuitBreakerConfig, isActive bool, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker { @@ -216,14 +216,12 @@ func (cb *circuitBreaker) recordResult(err error) { // It records the result of the push request with the circuit breaker. Push requests // that lasted longer than the configured timeout are treated as a failure. // The returned error is only used for testing purposes. -func (cb *circuitBreaker) finishPushRequest(ctx context.Context, duration time.Duration, pushErr error) error { +func (cb *circuitBreaker) finishPushRequest(duration time.Duration, pushErr error) error { if !cb.isActive() { return nil } if cb.cfg.testModeEnabled { - if testDelay, ok := ctx.Value(testDelayKey).(time.Duration); ok { - duration += testDelay - } + duration += cb.testRequestDelay } if cb.cfg.PushTimeout < duration { pushErr = context.DeadlineExceeded diff --git a/pkg/ingester/circuitbreaker_test.go b/pkg/ingester/circuitbreaker_test.go index f84e2008437..4d65214859a 100644 --- a/pkg/ingester/circuitbreaker_test.go +++ b/pkg/ingester/circuitbreaker_test.go @@ -315,14 +315,13 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) { for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { registry := prometheus.NewRegistry() - ctx := context.Background() cfg := CircuitBreakerConfig{ Enabled: true, InitialDelay: testCase.initialDelay, PushTimeout: 2 * time.Second, } cb := newCircuitBreaker(cfg, cfg.InitialDelay == 0, log.NewNopLogger(), registry) - err := cb.finishPushRequest(ctx, testCase.pushRequestDuration, testCase.err) + err := cb.finishPushRequest(testCase.pushRequestDuration, testCase.err) if testCase.expectedErr == nil { require.NoError(t, err) } else { @@ -337,15 +336,13 @@ func TestIngester_PushToStorage_CircuitBreaker(t *testing.T) { pushTimeout := 100 * time.Millisecond tests := map[string]struct { expectedErrorWhenCircuitBreakerClosed error - ctx func(context.Context) context.Context + pushRequestDelay time.Duration limits InstanceLimits }{ "deadline exceeded": { expectedErrorWhenCircuitBreakerClosed: nil, limits: InstanceLimits{MaxInMemoryTenants: 3}, - ctx: func(ctx context.Context) context.Context { - return context.WithValue(ctx, testDelayKey, 2*pushTimeout) - }, + pushRequestDelay: pushTimeout, }, "instance limit hit": { expectedErrorWhenCircuitBreakerClosed: instanceLimitReachedError{}, @@ -432,9 +429,7 @@ func TestIngester_PushToStorage_CircuitBreaker(t *testing.T) { for _, req := range reqs { ctx := user.InjectOrgID(context.Background(), userID) count++ - if testCase.ctx != nil { - ctx = testCase.ctx(ctx) - } + i.circuitBreaker.testRequestDelay = testCase.pushRequestDelay err = i.PushToStorage(ctx, req) if initialDelayEnabled { if testCase.expectedErrorWhenCircuitBreakerClosed != nil { @@ -746,7 +741,8 @@ func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) { for _, req := range reqs { ctx := user.InjectOrgID(context.Background(), userID) - ctx = context.WithValue(ctx, testDelayKey, 2*pushTimeout) + // Configure circuit breaker to delay push requests. + i.circuitBreaker.testRequestDelay = pushTimeout count++ ctx, err = i.StartPushRequest(ctx, int64(req.Size())) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 0767ca861db..f44c65fb27d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1005,7 +1005,7 @@ func (i *Ingester) FinishPushRequest(ctx context.Context) { i.inflightPushRequestsBytes.Sub(st.requestSize) } if st.acquiredCircuitBreakerPermit { - _ = i.circuitBreaker.finishPushRequest(ctx, st.requestDuration, st.pushErr) + _ = i.circuitBreaker.finishPushRequest(st.requestDuration, st.pushErr) } }