
Fix premature context cancellation in Distributor.QueryStream() when experimental ingest storage is enabled #7437

Merged
2 commits merged on Feb 22, 2024
2 changes: 1 addition & 1 deletion go.mod
@@ -20,7 +20,7 @@ require (
github.com/golang/snappy v0.0.4
github.com/google/gopacket v1.1.19
github.com/gorilla/mux v1.8.1
github.com/grafana/dskit v0.0.0-20240221153930-3050c8bcdb44
github.com/grafana/dskit v0.0.0-20240222125137-29d1a0513264
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/json-iterator/go v1.1.12
4 changes: 2 additions & 2 deletions go.sum
@@ -495,8 +495,8 @@ github.com/gosimple/slug v1.1.1 h1:fRu/digW+NMwBIP+RmviTK97Ho/bEj/C9swrCspN3D4=
github.com/gosimple/slug v1.1.1/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0=
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85Tnn+WEvr8fDpfwibmEPgfgFEaC87G24=
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
github.com/grafana/dskit v0.0.0-20240221153930-3050c8bcdb44 h1:97IsHGRXaaYirbFGMtso+hClAbhgiMTZxyVzWRUtTj4=
github.com/grafana/dskit v0.0.0-20240221153930-3050c8bcdb44/go.mod h1:x5DMwyr1kyirtHOxoFSZ7RnyOgHdGh03ZruupdPetQM=
github.com/grafana/dskit v0.0.0-20240222125137-29d1a0513264 h1:StdGUGGgaBYvFu/HB2LfvsyBJaaIDN2t6pdgG7/0MSg=
github.com/grafana/dskit v0.0.0-20240222125137-29d1a0513264/go.mod h1:x5DMwyr1kyirtHOxoFSZ7RnyOgHdGh03ZruupdPetQM=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae h1:Yxbw9jKGJVC6qAK5Ubzzb/qZwM6rRMMqaDc/d4Vp3pM=
73 changes: 48 additions & 25 deletions pkg/distributor/distributor_test.go
@@ -5599,9 +5599,6 @@ func (i *mockIngester) series() map[uint32]*mimirpb.PreallocTimeseries {
}

func (i *mockIngester) Check(context.Context, *grpc_health_v1.HealthCheckRequest, ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
i.Lock()
defer i.Unlock()

i.trackCall("Check")
@pracucci (Collaborator, Author) commented on Feb 21, 2024:

Note to reviewers: I moved the i.trackCall() calls to be the first statement in each method, and trackCall() now takes the lock itself. See details here.


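// Note: trackCall() above locks the mock internally, so Check() no longer needs to take the lock itself.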
return &grpc_health_v1.HealthCheckResponse{}, nil
@@ -5612,13 +5609,13 @@ func (i *mockIngester) Close() error {
}

func (i *mockIngester) Push(ctx context.Context, req *mimirpb.WriteRequest, _ ...grpc.CallOption) (*mimirpb.WriteResponse, error) {
i.trackCall("Push")

time.Sleep(i.pushDelay)

i.Lock()
defer i.Unlock()

i.trackCall("Push")

if !i.happy {
return nil, errFail
}
@@ -5690,17 +5687,19 @@ func makeWireChunk(c chunk.EncodedChunk) client.Chunk {
}

func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest, _ ...grpc.CallOption) (client.Ingester_QueryStreamClient, error) {
i.trackCall("QueryStream")
@pracucci (Collaborator, Author) commented:

Note to reviewers: moving the call to trackCall() fixes the TestDistributor_QueryStream_ShouldSupportIngestStorage() flakiness. Why? Because when ingester requests are NOT minimized, the context gets canceled as soon as we reach quorum. In some cases it gets canceled before enforceReadConsistency() is called, and in those cases trackCall() was never reached.


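// Note: trackCall() above is invoked before enforceReadConsistency() on purpose, so the call gets
// recorded even if the request context is canceled (e.g. once quorum is reached) before this point.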
if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

time.Sleep(i.queryDelay)
if err := i.enforceQueryDelay(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("QueryStream")

if !i.happy {
return nil, errFail
}
@@ -5845,20 +5844,21 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest
}

return &stream{
ctx: ctx,
results: results,
}, nil
}

func (i *mockIngester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest, _ ...grpc.CallOption) (*client.ExemplarQueryResponse, error) {
i.trackCall("QueryExemplars")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("QueryExemplars")

if !i.happy {
return nil, errFail
}
@@ -5922,15 +5922,15 @@ func (i *mockIngester) QueryExemplars(ctx context.Context, req *client.ExemplarQ
}

func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest, _ ...grpc.CallOption) (*client.MetricsForLabelMatchersResponse, error) {
i.trackCall("MetricsForLabelMatchers")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("MetricsForLabelMatchers")

if !i.happy {
return nil, errFail
}
@@ -5952,15 +5952,15 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.
}

func (i *mockIngester) LabelValues(ctx context.Context, req *client.LabelValuesRequest, _ ...grpc.CallOption) (*client.LabelValuesResponse, error) {
i.trackCall("LabelValues")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("LabelValues")

if !i.happy {
return nil, errFail
}
@@ -6003,15 +6003,15 @@ func (i *mockIngester) LabelValues(ctx context.Context, req *client.LabelValuesR
}

func (i *mockIngester) LabelNames(ctx context.Context, req *client.LabelNamesRequest, _ ...grpc.CallOption) (*client.LabelNamesResponse, error) {
i.trackCall("LabelNames")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("LabelNames")

if !i.happy {
return nil, errFail
}
@@ -6035,15 +6035,15 @@ func (i *mockIngester) LabelNames(ctx context.Context, req *client.LabelNamesReq
}

func (i *mockIngester) MetricsMetadata(ctx context.Context, _ *client.MetricsMetadataRequest, _ ...grpc.CallOption) (*client.MetricsMetadataResponse, error) {
i.trackCall("MetricsMetadata")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("MetricsMetadata")

if !i.happy {
return nil, errFail
}
@@ -6110,15 +6110,15 @@ func (s *labelNamesAndValuesMockStream) Recv() (*client.LabelNamesAndValuesRespo
}

func (i *mockIngester) LabelValuesCardinality(ctx context.Context, req *client.LabelValuesCardinalityRequest, _ ...grpc.CallOption) (client.Ingester_LabelValuesCardinalityClient, error) {
i.trackCall("LabelValuesCardinality")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("LabelValuesCardinality")

if !i.happy {
return nil, errFail
}
@@ -6184,15 +6184,15 @@ func (s *labelValuesCardinalityStream) Recv() (*client.LabelValuesCardinalityRes
}

func (i *mockIngester) ActiveSeries(ctx context.Context, req *client.ActiveSeriesRequest, _ ...grpc.CallOption) (client.Ingester_ActiveSeriesClient, error) {
i.trackCall("ActiveSeries")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("ActiveSeries")

if !i.happy {
return nil, errFail
}
@@ -6242,6 +6242,9 @@ func (s *activeSeriesStream) CloseSend() error {
}

func (i *mockIngester) trackCall(name string) {
i.Lock()
defer i.Unlock()

if i.calls == nil {
i.calls = map[string]int{}
}
@@ -6270,6 +6273,16 @@ func (i *mockIngester) enforceReadConsistency(ctx context.Context) error {
return i.partitionReader.WaitReadConsistency(ctx)
}

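// enforceQueryDelay simulates a slow ingester. Unlike a plain time.Sleep(), it returns early with the
// context error if the request context gets canceled while waiting (e.g. once quorum has been reached).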
func (i *mockIngester) enforceQueryDelay(ctx context.Context) error {
select {
case <-time.After(i.queryDelay):
return nil

case <-ctx.Done():
return ctx.Err()
}
}

type mockIngesterPusherAdapter struct {
ingester *mockIngester
}
@@ -6301,6 +6314,10 @@ func (i *noopIngester) Push(context.Context, *mimirpb.WriteRequest, ...grpc.Call

type stream struct {
grpc.ClientStream

// The mocked gRPC client's context.
ctx context.Context

i int
results []*client.QueryStreamResponse
}
@@ -6310,6 +6327,12 @@ func (*stream) CloseSend() error {
}

func (s *stream) Recv() (*client.QueryStreamResponse, error) {
// Check whether the context has been canceled, so that we can test the case where the context
// gets canceled while reading messages from the gRPC client.
if s.ctx.Err() != nil {
return nil, s.ctx.Err()
}

if s.i >= len(s.results) {
return nil, io.EOF
}
@@ -6319,23 +6342,23 @@ func (s *stream) Recv() (*client.QueryStreamResponse, error) {
}

func (s *stream) Context() context.Context {
return context.Background()
return s.ctx
}

func (i *mockIngester) AllUserStats(context.Context, *client.UserStatsRequest, ...grpc.CallOption) (*client.UsersStatsResponse, error) {
return &i.stats, nil
}

func (i *mockIngester) UserStats(ctx context.Context, _ *client.UserStatsRequest, _ ...grpc.CallOption) (*client.UserStatsResponse, error) {
i.trackCall("UserStats")

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
}

i.Lock()
defer i.Unlock()

i.trackCall("UserStats")

if !i.happy {
return nil, errFail
}
7 changes: 3 additions & 4 deletions pkg/distributor/query.go
@@ -14,7 +14,6 @@ import (

"github.com/go-kit/log/level"
"github.com/grafana/dskit/cancellation"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/instrument"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/tenant"
@@ -221,6 +220,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
reqStats := stats.FromContext(ctx)

// queryIngester MUST call cancelContext once processing is completed: ring.DoMultiUntilQuorumWithoutSuccessfulContextCancellation()
// relies on it to properly release per-call resources.
queryIngester := func(ctx context.Context, ing *ring.InstanceDesc, cancelContext context.CancelCauseFunc) (ingesterQueryResult, error) {
log, ctx := spanlogger.NewWithLogger(ctx, d.log, "Distributor.queryIngesterStream")
cleanup := func() {
@@ -352,9 +353,7 @@ quorumConfig := d.queryQuorumConfigForReplicationSets(ctx, replicationSets)
quorumConfig := d.queryQuorumConfigForReplicationSets(ctx, replicationSets)
quorumConfig.IsTerminalError = validation.IsLimitError

results, err := concurrency.ForEachJobMergeResults[ring.ReplicationSet, ingesterQueryResult](ctx, replicationSets, 0, func(ctx context.Context, replicationSet ring.ReplicationSet) ([]ingesterQueryResult, error) {
return ring.DoUntilQuorumWithoutSuccessfulContextCancellation(ctx, replicationSet, quorumConfig, queryIngester, cleanup)
})
results, err := ring.DoMultiUntilQuorumWithoutSuccessfulContextCancellation(ctx, replicationSets, quorumConfig, queryIngester, cleanup)
if err != nil {
return ingester_client.CombinedQueryStreamResponse{}, err
}
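The comment on queryIngester above describes the contract this change depends on: each per-ingester callback receives a cancel function for its own child context and must call it once its result has been fully consumed, so the quorum helper never cancels the context of a successful call itself. Below is a minimal, self-contained sketch of that shape; the doUntilQuorum helper, its signature, and the example types are illustrative assumptions, not the actual dskit API.

```go
package main

import (
	"context"
	"fmt"
)

// doUntilQuorum is a hypothetical stand-in for the dskit quorum helper: it runs the callback once
// per instance with its own cancellable context and relies on the callback (or a later cleanup of
// its result) to cancel that context, rather than cancelling it on success itself.
func doUntilQuorum[T any](ctx context.Context, instances []string,
	fn func(ctx context.Context, instance string, cancel context.CancelCauseFunc) (T, error)) ([]T, error) {

	results := make([]T, 0, len(instances))
	for _, instance := range instances {
		childCtx, cancelChild := context.WithCancelCause(ctx)

		res, err := fn(childCtx, instance, cancelChild)
		if err != nil {
			cancelChild(err)
			return nil, err
		}
		results = append(results, res)
	}
	return results, nil
}

func main() {
	queryInstance := func(ctx context.Context, instance string, cancel context.CancelCauseFunc) (string, error) {
		// The callback MUST eventually call cancel() once its result has been fully consumed,
		// otherwise the per-instance context (and anything tied to it, e.g. gRPC streams) leaks.
		defer cancel(nil) // Simplification: the real code defers this until streaming has completed.
		return "response from " + instance, nil
	}

	res, err := doUntilQuorum(context.Background(), []string{"ingester-1", "ingester-2"}, queryInstance)
	fmt.Println(res, err)
}
```

In the real code the cancellation is deferred until streaming from the ingester has finished, which appears to be why the callback, rather than the helper, owns the cancel call.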
4 changes: 2 additions & 2 deletions pkg/distributor/query_ingest_storage_test.go
@@ -502,7 +502,7 @@ func TestDistributor_QueryStream_ShouldSupportIngestStorage(t *testing.T) {
ingesterStateByZone: testData.ingesterStateByZone,
ingesterDataByZone: testData.ingesterDataByZone,
ingesterDataTenantID: tenantID,
queryDelay: 100 * time.Millisecond, // Give some time to start the calls to all ingesters before failures are received.
queryDelay: 250 * time.Millisecond, // Give some time to start the calls to all ingesters before failures are received.
replicationFactor: 1, // Ingest storage is not expected to use it.
limits: limits,
configure: func(config *Config) {
@@ -543,7 +543,7 @@

// Check how many ingesters have been queried.
// Because we return immediately on failures, it might take some time for all ingester calls to register.
test.Poll(t, 100*time.Millisecond, testData.expectedQueriedIngesters, func() any { return countMockIngestersCalls(ingesters, "QueryStream") })
test.Poll(t, 4*cfg.queryDelay, testData.expectedQueriedIngesters, func() any { return countMockIngestersCalls(ingesters, "QueryStream") })

// We expected the number of non-deduplicated chunks to be equal to the number of queried series
// given we expect 1 chunk per series.
64 changes: 64 additions & 0 deletions pkg/distributor/query_test.go
@@ -401,6 +401,70 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
assert.Equal(t, err, limiter.NewMaxChunkBytesHitLimitError(uint64(maxBytesLimit)))
}

func TestDistributor_QueryStream_ShouldSuccessfullyRunOnSlowIngesterWithStreamingChunksIsEnabled(t *testing.T) {
const (
numSeries = 20
numQueries = 3
)

for _, ingestStorageEnabled := range []bool{false, true} {
ingestStorageEnabled := ingestStorageEnabled

t.Run(fmt.Sprintf("ingest storage enabled: %t", ingestStorageEnabled), func(t *testing.T) {
t.Parallel()

// Prepare distributors.
distributors, ingesters, reg, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
replicationFactor: 1, // Use a replication factor of 1 so that we always wait for the response from all ingesters.
ingestStorageEnabled: ingestStorageEnabled,
ingestStoragePartitions: 3,
configure: func(cfg *Config) {
cfg.PreferStreamingChunksFromIngesters = true
},
})

// Mock 1 ingester to be slow.
ingesters[0].queryDelay = time.Second

// Ensure strong read consistency, required to avoid flaky tests when ingest storage is enabled.
ctx := user.InjectOrgID(context.Background(), "test")
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)

// Push series.
for seriesID := 0; seriesID < numSeries; seriesID++ {
_, err := distributors[0].Push(ctx, makeWriteRequest(0, 1, 0, false, false, fmt.Sprintf("series_%d", seriesID)))
require.NoError(t, err)
}

// Query back multiple times and ensure each response is consistent.
matchers := labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, "series_.*")
queryMetrics := stats.NewQueryMetrics(reg[0])

for i := 1; i <= numQueries; i++ {
t.Run(fmt.Sprintf("Query #%d", i), func(t *testing.T) {
t.Parallel()

res, err := distributors[0].QueryStream(ctx, queryMetrics, math.MinInt32, math.MaxInt32, matchers)
require.NoError(t, err)
require.Equal(t, numSeries, len(res.StreamingSeries))

// Read all chunks.
for _, series := range res.StreamingSeries {
for sourceIdx, source := range series.Sources {
_, err := source.StreamReader.GetChunks(source.SeriesIndex)
require.NoErrorf(t, err, "GetChunks() from stream reader for series %d from source %d", source.SeriesIndex, sourceIdx)
}
}
})
}
})
}

}

func TestMergeSamplesIntoFirstDuplicates(t *testing.T) {
a := []mimirpb.Sample{
{Value: 1.084537996, TimestampMs: 1583946732744},