Skip to content

Commit

Permalink
Fix TestDistributor_QueryStream_ShouldSupportIngestStorage() tracking…
Browse files Browse the repository at this point in the history
… the ingester calls as first thing

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed Feb 22, 2024
1 parent 09aea36 commit dfee880
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 25 deletions.
46 changes: 23 additions & 23 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5599,9 +5599,6 @@ func (i *mockIngester) series() map[uint32]*mimirpb.PreallocTimeseries {
}

func (i *mockIngester) Check(context.Context, *grpc_health_v1.HealthCheckRequest, ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
i.Lock()
defer i.Unlock()

i.trackCall("Check")

return &grpc_health_v1.HealthCheckResponse{}, nil
Expand All @@ -5612,13 +5609,13 @@ func (i *mockIngester) Close() error {
}

func (i *mockIngester) Push(ctx context.Context, req *mimirpb.WriteRequest, _ ...grpc.CallOption) (*mimirpb.WriteResponse, error) {
i.trackCall("Push")

time.Sleep(i.pushDelay)

i.Lock()
defer i.Unlock()

i.trackCall("Push")

if !i.happy {
return nil, errFail
}
Expand Down Expand Up @@ -5690,6 +5687,8 @@ func makeWireChunk(c chunk.EncodedChunk) client.Chunk {
}

func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest, _ ...grpc.CallOption) (client.Ingester_QueryStreamClient, error) {
i.trackCall("QueryStream")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}
Expand All @@ -5701,8 +5700,6 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest
i.Lock()
defer i.Unlock()

i.trackCall("QueryStream")

if !i.happy {
return nil, errFail
}
Expand Down Expand Up @@ -5853,15 +5850,15 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest
}

func (i *mockIngester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest, _ ...grpc.CallOption) (*client.ExemplarQueryResponse, error) {
i.trackCall("QueryExemplars")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("QueryExemplars")

if !i.happy {
return nil, errFail
}
Expand Down Expand Up @@ -5925,15 +5922,15 @@ func (i *mockIngester) QueryExemplars(ctx context.Context, req *client.ExemplarQ
}

func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest, _ ...grpc.CallOption) (*client.MetricsForLabelMatchersResponse, error) {
i.trackCall("MetricsForLabelMatchers")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("MetricsForLabelMatchers")

if !i.happy {
return nil, errFail
}
Expand All @@ -5955,15 +5952,15 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.
}

func (i *mockIngester) LabelValues(ctx context.Context, req *client.LabelValuesRequest, _ ...grpc.CallOption) (*client.LabelValuesResponse, error) {
i.trackCall("LabelValues")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("LabelValues")

if !i.happy {
return nil, errFail
}
Expand Down Expand Up @@ -6006,15 +6003,15 @@ func (i *mockIngester) LabelValues(ctx context.Context, req *client.LabelValuesR
}

func (i *mockIngester) LabelNames(ctx context.Context, req *client.LabelNamesRequest, _ ...grpc.CallOption) (*client.LabelNamesResponse, error) {
i.trackCall("LabelNames")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("LabelNames")

if !i.happy {
return nil, errFail
}
Expand All @@ -6038,15 +6035,15 @@ func (i *mockIngester) LabelNames(ctx context.Context, req *client.LabelNamesReq
}

func (i *mockIngester) MetricsMetadata(ctx context.Context, _ *client.MetricsMetadataRequest, _ ...grpc.CallOption) (*client.MetricsMetadataResponse, error) {
i.trackCall("MetricsMetadata")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("MetricsMetadata")

if !i.happy {
return nil, errFail
}
Expand Down Expand Up @@ -6113,15 +6110,15 @@ func (s *labelNamesAndValuesMockStream) Recv() (*client.LabelNamesAndValuesRespo
}

func (i *mockIngester) LabelValuesCardinality(ctx context.Context, req *client.LabelValuesCardinalityRequest, _ ...grpc.CallOption) (client.Ingester_LabelValuesCardinalityClient, error) {
i.trackCall("LabelValuesCardinality")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("LabelValuesCardinality")

if !i.happy {
return nil, errFail
}
Expand Down Expand Up @@ -6187,15 +6184,15 @@ func (s *labelValuesCardinalityStream) Recv() (*client.LabelValuesCardinalityRes
}

func (i *mockIngester) ActiveSeries(ctx context.Context, req *client.ActiveSeriesRequest, _ ...grpc.CallOption) (client.Ingester_ActiveSeriesClient, error) {
i.trackCall("ActiveSeries")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("ActiveSeries")

if !i.happy {
return nil, errFail
}
Expand Down Expand Up @@ -6245,6 +6242,9 @@ func (s *activeSeriesStream) CloseSend() error {
}

func (i *mockIngester) trackCall(name string) {
i.Lock()
defer i.Unlock()

if i.calls == nil {
i.calls = map[string]int{}
}
Expand Down Expand Up @@ -6350,15 +6350,15 @@ func (i *mockIngester) AllUserStats(context.Context, *client.UserStatsRequest, .
}

func (i *mockIngester) UserStats(ctx context.Context, _ *client.UserStatsRequest, _ ...grpc.CallOption) (*client.UserStatsResponse, error) {
i.trackCall("UserStats")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("UserStats")

if !i.happy {
return nil, errFail
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/distributor/query_ingest_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func TestDistributor_QueryStream_ShouldSupportIngestStorage(t *testing.T) {
ingesterStateByZone: testData.ingesterStateByZone,
ingesterDataByZone: testData.ingesterDataByZone,
ingesterDataTenantID: tenantID,
queryDelay: 100 * time.Millisecond, // Give some time to start the calls to all ingesters before failures are received.
queryDelay: 250 * time.Millisecond, // Give some time to start the calls to all ingesters before failures are received.
replicationFactor: 1, // Ingest storage is not expected to use it.
limits: limits,
configure: func(config *Config) {
Expand Down Expand Up @@ -543,7 +543,7 @@ func TestDistributor_QueryStream_ShouldSupportIngestStorage(t *testing.T) {

// Check how many ingesters have been queried.
// Because we return immediately on failures, it might take some time for all ingester calls to register.
test.Poll(t, 100*time.Millisecond, testData.expectedQueriedIngesters, func() any { return countMockIngestersCalls(ingesters, "QueryStream") })
test.Poll(t, 4*cfg.queryDelay, testData.expectedQueriedIngesters, func() any { return countMockIngestersCalls(ingesters, "QueryStream") })

// We expected the number of non-deduplicated chunks to be equal to the number of queried series
// given we expect 1 chunk per series.
Expand Down

0 comments on commit dfee880

Please sign in to comment.