Auto-detect ingestStoragePartitions in tests
Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci committed Feb 16, 2024
1 parent 8c721df commit 7916307
Showing 4 changed files with 42 additions and 51 deletions.
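In short, the commit removes hand-rolled partition arithmetic from individual tests: prepare() now defaults ingestStoragePartitions to the largest per-zone ingester count whenever ingest storage is enabled and no explicit value is set. A minimal, self-contained sketch of that detection rule (zoneState and partitionsFor are simplified stand-ins for illustration, not Mimir's actual test types):

```go
package main

import "fmt"

// zoneState is a simplified stand-in for the per-zone ingester config used by
// the tests: a zone is described either by a plain ingester count or by an
// explicit list of per-ingester states.
type zoneState struct {
	numIngesters int
	states       []string
}

// partitionsFor mirrors the commit's rule: the partition count defaults to the
// max number of ingesters found in any single zone, falling back to the flat
// numIngesters count when no zones are configured.
func partitionsFor(numIngesters int, byZone map[string]zoneState) int {
	n := numIngesters
	for _, z := range byZone {
		// Go 1.21's built-in max; the removed per-test loops computed
		// the same thing with the util_math.Max helper.
		n = max(n, max(z.numIngesters, len(z.states)))
	}
	return n
}

func main() {
	zones := map[string]zoneState{
		"zone-a": {numIngesters: 2},
		"zone-b": {states: []string{"happy", "happy", "unhappy"}},
	}
	fmt.Println(partitionsFor(0, zones)) // 3: zone-b has three ingesters
}
```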
15 changes: 4 additions & 11 deletions pkg/distributor/distributor_ingest_storage_test.go
@@ -519,19 +519,12 @@ func TestDistributor_UserStats_ShouldSupportIngestStorage(t *testing.T) {
         t.Run(fmt.Sprintf("minimize ingester requests: %t", minimizeIngesterRequests), func(t *testing.T) {
             t.Parallel()
 
-            // Find the max number of ingesters in a zone. It will be used as the number of partitions.
-            numPartitions := 0
-            for _, state := range testData.ingesterStateByZone {
-                numPartitions = max(numPartitions, max(state.numIngesters, len(state.states)))
-            }
-
             // Create distributor
             distributors, _, _, _ := prepare(t, prepConfig{
-                numDistributors:         1,
-                ingesterStateByZone:     testData.ingesterStateByZone,
-                ingesterDataByZone:      testData.ingesterDataByZone,
-                ingestStorageEnabled:    true,
-                ingestStoragePartitions: int32(numPartitions),
+                numDistributors:      1,
+                ingesterStateByZone:  testData.ingesterStateByZone,
+                ingesterDataByZone:   testData.ingesterDataByZone,
+                ingestStorageEnabled: true,
                 configure: func(config *Config) {
                     config.PreferAvailabilityZone   = preferredZone
                     config.MinimizeIngesterRequests = minimizeIngesterRequests
71 changes: 38 additions & 33 deletions pkg/distributor/distributor_test.go
@@ -2246,7 +2246,6 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
 
             if ingestStorageEnabled {
                 testConfig.ingestStorageEnabled = true
-                testConfig.ingestStoragePartitions = numIngesters
                 testConfig.limits = prepareDefaultLimits()
                 testConfig.limits.IngestionPartitionsTenantShardSize = testData.shuffleShardSize
             } else {
@@ -2510,7 +2509,6 @@ func TestDistributor_LabelNames(t *testing.T) {
 
             if ingestStorageEnabled {
                 testConfig.ingestStorageEnabled = true
-                testConfig.ingestStoragePartitions = numIngesters
 
                 testConfig.limits = prepareDefaultLimits()
                 testConfig.limits.IngestionPartitionsTenantShardSize = testData.shuffleShardSize
@@ -2591,7 +2589,6 @@ func TestDistributor_MetricsMetadata(t *testing.T) {
 
             if ingestStorageEnabled {
                 testConfig.ingestStorageEnabled = true
-                testConfig.ingestStoragePartitions = numIngesters
                 testConfig.limits = prepareDefaultLimits()
                 testConfig.limits.IngestionPartitionsTenantShardSize = testData.shuffleShardSize
             } else {
@@ -2688,14 +2685,11 @@ func TestDistributor_LabelNamesAndValuesLimitTest(t *testing.T) {
             flagext.DefaultValues(&limits)
             limits.LabelNamesAndValuesResultsMaxSizeBytes = testData.sizeLimitBytes
             ds, _, _, _ := prepare(t, prepConfig{
-                numIngesters:    3,
-                happyIngesters:  3,
-                numDistributors: 1,
-                limits:          &limits,
-
-                // Ingest storage config is ignored when disabled.
-                ingestStorageEnabled:    ingestStorageEnabled,
-                ingestStoragePartitions: 3,
+                numIngesters:         3,
+                happyIngesters:       3,
+                numDistributors:      1,
+                limits:               &limits,
+                ingestStorageEnabled: ingestStorageEnabled,
             })
 
             // Push fixtures
@@ -2768,14 +2762,11 @@ func TestDistributor_LabelValuesForLabelName(t *testing.T) {
 
             // Create distributor
             ds, _, _, _ := prepare(t, prepConfig{
-                numIngesters:      12,
-                happyIngesters:    12,
-                numDistributors:   1,
-                replicationFactor: 3,
-
-                // Ingest storage config is ignored when disabled.
-                ingestStorageEnabled:    ingestStorageEnabled,
-                ingestStoragePartitions: 12,
+                numIngesters:         12,
+                happyIngesters:       12,
+                numDistributors:      1,
+                replicationFactor:    3,
+                ingestStorageEnabled: ingestStorageEnabled,
             })
 
             // Push fixtures
@@ -2832,12 +2823,11 @@ func TestDistributor_LabelNamesAndValues(t *testing.T) {
 
             // Create distributor
             ds, _, _, _ := prepare(t, prepConfig{
-                numIngesters:            12,
-                happyIngesters:          12,
-                numDistributors:         1,
-                replicationFactor:       3,
-                ingestStorageEnabled:    ingestStorageEnabled,
-                ingestStoragePartitions: 12,
+                numIngesters:         12,
+                happyIngesters:       12,
+                numDistributors:      1,
+                replicationFactor:    3,
+                ingestStorageEnabled: ingestStorageEnabled,
             })
 
             // Push fixtures
@@ -4105,7 +4095,7 @@ type prepConfig struct {
 
     // Ingest storage specific configuration.
     ingestStorageEnabled    bool
-    ingestStoragePartitions int32 // Number of partitions.
+    ingestStoragePartitions int32 // Number of partitions. Auto-detected from configured ingesters if not explicitly set.
     ingestStorageKafka      *kfake.Cluster
 }
 
@@ -4138,6 +4128,19 @@ func (c prepConfig) totalZones() int {
     return len(c.ingesterStateByZone)
 }
 
+// maxIngestersPerZone returns the max number of ingesters per zone. For example,
+// if a zone has 2 ingesters and another zone has 3 ingesters, this function
+// returns 3.
+func (c prepConfig) maxIngestersPerZone() int {
+    maxIngestersPerZone := c.numIngesters
+
+    for _, state := range c.ingesterStateByZone {
+        maxIngestersPerZone = max(maxIngestersPerZone, max(state.numIngesters, len(state.states)))
+    }
+
+    return maxIngestersPerZone
+}
+
 func (c prepConfig) ingesterRingState(zone string, id int) ring.InstanceState {
     if len(c.ingesterStateByZone[zone].ringStates) == 0 {
         return ring.ACTIVE
@@ -4320,6 +4323,14 @@ func prepareDefaultLimits() *validation.Limits {
 func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*prometheus.Registry, *kfake.Cluster) {
     ctx := context.Background()
 
+    // Apply default config.
+    if cfg.replicationFactor == 0 {
+        cfg.replicationFactor = 3
+    }
+    if cfg.ingestStorageEnabled && cfg.ingestStoragePartitions == 0 {
+        cfg.ingestStoragePartitions = int32(cfg.maxIngestersPerZone())
+    }
+
     cfg.validate(t)
 
     logger := log.NewNopLogger()
@@ -4343,19 +4354,13 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*prometheus.Registry, *kfake.Cluster) {
     )
     require.NoError(t, err)
 
-    // Use a default replication factor of 3 if there isn't a provided replication factor.
-    rf := cfg.replicationFactor
-    if rf == 0 {
-        rf = 3
-    }
-
     ingestersHeartbeatTimeout := 60 * time.Minute
     ingestersRing, err := ring.New(ring.Config{
         KVStore: kv.Config{
             Mock: kvStore,
         },
         HeartbeatTimeout:     ingestersHeartbeatTimeout,
-        ReplicationFactor:    rf,
+        ReplicationFactor:    cfg.replicationFactor,
         ZoneAwarenessEnabled: cfg.totalZones() > 1,
     }, ingester.IngesterRingKey, ingester.IngesterRingKey, logger, nil)
     require.NoError(t, err)
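The payoff for test authors, adapted from the TestDistributor_LabelNamesAndValues hunk above: a test that spins up 12 ingesters no longer repeats 12 as a partition count, so the two values can no longer drift apart.

```go
// With the new defaults in prepare(), ingestStoragePartitions is omitted and
// resolves to 12 (the max ingesters in any zone); replicationFactor could be
// omitted too, since it now defaults to 3.
ds, _, _, _ := prepare(t, prepConfig{
	numIngesters:         12,
	happyIngesters:       12,
	numDistributors:      1,
	replicationFactor:    3,
	ingestStorageEnabled: ingestStorageEnabled,
})
```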
6 changes: 0 additions & 6 deletions pkg/distributor/query_ingest_storage_test.go
@@ -21,7 +21,6 @@ import (
     ingester_client "github.com/grafana/mimir/pkg/ingester/client"
     "github.com/grafana/mimir/pkg/mimirpb"
     "github.com/grafana/mimir/pkg/querier/stats"
-    util_math "github.com/grafana/mimir/pkg/util/math"
 )
 
 func TestDistributor_QueryStream_ShouldSupportIngestStorage(t *testing.T) {
@@ -512,11 +511,6 @@ func TestDistributor_QueryStream_ShouldSupportIngestStorage(t *testing.T) {
                 },
             }
 
-            // Detect the number of partitions from test scenario.
-            for _, zone := range testData.ingesterStateByZone {
-                cfg.ingestStoragePartitions = util_math.Max(cfg.ingestStoragePartitions, int32(util_math.Max(zone.numIngesters, len(zone.states))))
-            }
-
             distributors, ingesters, distributorRegistries, _ := prepare(t, cfg)
             require.Len(t, distributors, 1)
             require.Len(t, distributorRegistries, 1)
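Dropping this per-test detection loop also removed the file's last use of util_math.Max, which is why the import at the top of the file could go; the centralized maxIngestersPerZone helper uses Go 1.21's built-in max instead. Side by side (both lines quoted from the hunks above):

```go
// Removed, repeated per test (needed the util_math import and int32 conversions):
cfg.ingestStoragePartitions = util_math.Max(cfg.ingestStoragePartitions, int32(util_math.Max(zone.numIngesters, len(zone.states))))

// Added once, in maxIngestersPerZone (built-in max, plain ints):
maxIngestersPerZone = max(maxIngestersPerZone, max(state.numIngesters, len(state.states)))
```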
1 change: 0 additions & 1 deletion pkg/distributor/query_test.go
@@ -114,7 +114,6 @@ func TestDistributor_QueryExemplars(t *testing.T) {
 
             if ingestStorageEnabled {
                 testConfig.ingestStorageEnabled = true
-                testConfig.ingestStoragePartitions = numIngesters
                 testConfig.limits.IngestionPartitionsTenantShardSize = testData.shuffleShardSize
             } else {
                 testConfig.shuffleShardSize = testData.shuffleShardSize
