Skip to content

Commit

Permalink
Get rid of test-delay key from in context
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic committed Jun 4, 2024
1 parent e3b0849 commit 1d3a8bb
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 22 deletions.
20 changes: 9 additions & 11 deletions pkg/ingester/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ import (
"github.com/grafana/mimir/pkg/mimirpb"
)

type testCtxKey string

const (
resultSuccess = "success"
resultError = "error"
resultOpen = "circuit_breaker_open"
defaultPushTimeout = 2 * time.Second
testDelayKey testCtxKey = "test-delay"
resultSuccess = "success"
resultError = "error"
resultOpen = "circuit_breaker_open"
defaultPushTimeout = 2 * time.Second
)

type circuitBreakerMetrics struct {
Expand Down Expand Up @@ -98,6 +95,9 @@ type circuitBreaker struct {
metrics *circuitBreakerMetrics
active atomic.Bool
cb circuitbreaker.CircuitBreaker[any]

// testRequestDelay is needed for testing purposes to simulate long lasting requests
testRequestDelay time.Duration
}

func newCircuitBreaker(cfg CircuitBreakerConfig, isActive bool, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker {
Expand Down Expand Up @@ -216,14 +216,12 @@ func (cb *circuitBreaker) recordResult(err error) {
// It records the result of the push request with the circuit breaker. Push requests
// that lasted longer than the configured timeout are treated as a failure.
// The returned error is only used for testing purposes.
func (cb *circuitBreaker) finishPushRequest(ctx context.Context, duration time.Duration, pushErr error) error {
func (cb *circuitBreaker) finishPushRequest(duration time.Duration, pushErr error) error {
if !cb.isActive() {
return nil
}
if cb.cfg.testModeEnabled {
if testDelay, ok := ctx.Value(testDelayKey).(time.Duration); ok {
duration += testDelay
}
duration += cb.testRequestDelay
}
if cb.cfg.PushTimeout < duration {
pushErr = context.DeadlineExceeded
Expand Down
16 changes: 6 additions & 10 deletions pkg/ingester/circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,13 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) {
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
registry := prometheus.NewRegistry()
ctx := context.Background()
cfg := CircuitBreakerConfig{
Enabled: true,
InitialDelay: testCase.initialDelay,
PushTimeout: 2 * time.Second,
}
cb := newCircuitBreaker(cfg, cfg.InitialDelay == 0, log.NewNopLogger(), registry)
err := cb.finishPushRequest(ctx, testCase.pushRequestDuration, testCase.err)
err := cb.finishPushRequest(testCase.pushRequestDuration, testCase.err)
if testCase.expectedErr == nil {
require.NoError(t, err)
} else {
Expand All @@ -337,15 +336,13 @@ func TestIngester_PushToStorage_CircuitBreaker(t *testing.T) {
pushTimeout := 100 * time.Millisecond
tests := map[string]struct {
expectedErrorWhenCircuitBreakerClosed error
ctx func(context.Context) context.Context
pushRequestDelay time.Duration
limits InstanceLimits
}{
"deadline exceeded": {
expectedErrorWhenCircuitBreakerClosed: nil,
limits: InstanceLimits{MaxInMemoryTenants: 3},
ctx: func(ctx context.Context) context.Context {
return context.WithValue(ctx, testDelayKey, 2*pushTimeout)
},
pushRequestDelay: pushTimeout,
},
"instance limit hit": {
expectedErrorWhenCircuitBreakerClosed: instanceLimitReachedError{},
Expand Down Expand Up @@ -432,9 +429,7 @@ func TestIngester_PushToStorage_CircuitBreaker(t *testing.T) {
for _, req := range reqs {
ctx := user.InjectOrgID(context.Background(), userID)
count++
if testCase.ctx != nil {
ctx = testCase.ctx(ctx)
}
i.circuitBreaker.testRequestDelay = testCase.pushRequestDelay
err = i.PushToStorage(ctx, req)
if initialDelayEnabled {
if testCase.expectedErrorWhenCircuitBreakerClosed != nil {
Expand Down Expand Up @@ -746,7 +741,8 @@ func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) {

for _, req := range reqs {
ctx := user.InjectOrgID(context.Background(), userID)
ctx = context.WithValue(ctx, testDelayKey, 2*pushTimeout)
// Configure circuit breaker to delay push requests.
i.circuitBreaker.testRequestDelay = pushTimeout
count++

ctx, err = i.StartPushRequest(ctx, int64(req.Size()))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ func (i *Ingester) FinishPushRequest(ctx context.Context) {
i.inflightPushRequestsBytes.Sub(st.requestSize)
}
if st.acquiredCircuitBreakerPermit {
_ = i.circuitBreaker.finishPushRequest(ctx, st.requestDuration, st.pushErr)
_ = i.circuitBreaker.finishPushRequest(st.requestDuration, st.pushErr)
}
}

Expand Down

0 comments on commit 1d3a8bb

Please sign in to comment.