Skip to content

Commit

Permalink
Adding test for hitting deadline when ingester.Push is used
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic committed May 28, 2024
1 parent e329020 commit c0ad0a5
Showing 1 changed file with 142 additions and 1 deletion.
143 changes: 142 additions & 1 deletion pkg/ingester/circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/failsafe-go/failsafe-go/circuitbreaker"
"github.com/go-kit/log"
"github.com/gogo/status"
"github.com/grafana/dskit/middleware"
Expand Down Expand Up @@ -250,7 +251,7 @@ func TestCircuitBreaker_FinishPushRequest(t *testing.T) {
}
}

func TestIngester_Push_CircuitBreaker(t *testing.T) {
func TestIngester_PushToStorage_CircuitBreaker(t *testing.T) {
pushTimeout := 100 * time.Millisecond
tests := map[string]struct {
expectedErrorWhenCircuitBreakerClosed error
Expand Down Expand Up @@ -543,3 +544,143 @@ func TestIngester_FinishPushRequest(t *testing.T) {
})
}
}

func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) {
pushTimeout := 100 * time.Millisecond
for initialDelayEnabled, initialDelayStatus := range map[bool]string{false: "disabled", true: "enabled"} {
t.Run(fmt.Sprintf("test slow push with initial delay %s", initialDelayStatus), func(t *testing.T) {
metricLabelAdapters := [][]mimirpb.LabelAdapter{{{Name: labels.MetricName, Value: "test"}}}
metricNames := []string{
"cortex_ingester_circuit_breaker_results_total",
"cortex_ingester_circuit_breaker_transitions_total",
"cortex_ingester_circuit_breaker_current_state",
}

registry := prometheus.NewRegistry()

// Create a mocked ingester
cfg := defaultIngesterTestConfig(t)
cfg.ActiveSeriesMetrics.IdleTimeout = 100 * time.Millisecond
failureThreshold := 2
var initialDelay time.Duration
if initialDelayEnabled {
initialDelay = 200 * time.Millisecond
}
cfg.CircuitBreakerConfig = CircuitBreakerConfig{
Enabled: true,
FailureThreshold: uint(failureThreshold),
CooldownPeriod: 10 * time.Second,
InitialDelay: initialDelay,
PushTimeout: pushTimeout,
testModeEnabled: true,
}

i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, registry)
require.NoError(t, err)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until the ingester is healthy
test.Poll(t, 100*time.Millisecond, 1, func() interface{} {
return i.lifecycler.HealthyInstancesCount()
})

// the first request is successful
ctx := user.InjectOrgID(context.Background(), "test-0")
req := mimirpb.ToWriteRequest(
metricLabelAdapters,
[]mimirpb.Sample{{Value: 1, TimestampMs: 8}},
nil,
nil,
mimirpb.API,
)
err = i.PushToStorage(ctx, req)
require.NoError(t, err)

count := 0

// Push timeseries for each user
for _, userID := range []string{"test-1", "test-2"} {
reqs := []*mimirpb.WriteRequest{
mimirpb.ToWriteRequest(
metricLabelAdapters,
[]mimirpb.Sample{{Value: 1, TimestampMs: 9}},
nil,
nil,
mimirpb.API,
),
mimirpb.ToWriteRequest(
metricLabelAdapters,
[]mimirpb.Sample{{Value: 2, TimestampMs: 10}},
nil,
nil,
mimirpb.API,
),
}

for _, req := range reqs {
ctx := user.InjectOrgID(context.Background(), userID)
ctx = context.WithValue(ctx, testDelayKey, 2*pushTimeout)
count++

ctx, err = i.StartPushRequest(ctx, int64(req.Size()))
if initialDelayEnabled || count <= failureThreshold {
// If initial delay is enabled we expect no deadline exceeded errors
// to be registered with the circuit breaker.
// If initial delay is disabled, and the circuit breaker registered
// less than failureThreshold deadline exceeded errors, it is still
// closed.
require.NoError(t, err)
require.Equal(t, circuitbreaker.ClosedState, i.circuitBreaker.State())
st, ok := ctx.Value(pushReqCtxKey).(*pushRequestState)
require.True(t, ok)
require.Equal(t, int64(req.Size()), st.requestSize)
_, err = i.Push(ctx, req)
require.NoError(t, err)
i.FinishPushRequest(ctx)
} else {
require.Equal(t, circuitbreaker.OpenState, i.circuitBreaker.State())
require.Nil(t, ctx)
checkCircuitBreakerOpenErr(ctx, err, t)
}
}
}

// Check tracked Prometheus metrics
var expectedMetrics string
if initialDelayEnabled {
expectedMetrics = `
# HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state.
# TYPE cortex_ingester_circuit_breaker_transitions_total counter
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 0
# HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name.
# TYPE cortex_ingester_circuit_breaker_current_state gauge
cortex_ingester_circuit_breaker_current_state{state="open"} 0
cortex_ingester_circuit_breaker_current_state{state="half-open"} 0
cortex_ingester_circuit_breaker_current_state{state="closed"} 1
`
} else {
expectedMetrics = `
# HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker.
# TYPE cortex_ingester_circuit_breaker_results_total counter
cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="circuit_breaker_open"} 2
cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="error"} 2
# HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state.
# TYPE cortex_ingester_circuit_breaker_transitions_total counter
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 1
# HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name.
# TYPE cortex_ingester_circuit_breaker_current_state gauge
cortex_ingester_circuit_breaker_current_state{state="open"} 1
cortex_ingester_circuit_breaker_current_state{state="half-open"} 0
cortex_ingester_circuit_breaker_current_state{state="closed"} 0
`
}
assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))
})
}
}

0 comments on commit c0ad0a5

Please sign in to comment.