From 61a1c8b495480c5d557d3d6025a76876da00bf80 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 3 May 2024 14:24:20 +0200 Subject: [PATCH] store-gateway: add timeout to query gate wait (#7777) * store-gateway: add timeout to query gate wait This PR adds a timeout to query gate waiting. The store-gateway returns an empty response. The querier treats the empty response as if the blocks weren't discovered by the store-gateway and tries them again on another store-gateway. Signed-off-by: Dimitar Dimitrov * Add CHANGELOG.md entry Signed-off-by: Dimitar Dimitrov * Update pkg/storage/tsdb/config.go Co-authored-by: Charles Korn * Update CHANGELOG.md Co-authored-by: Charles Korn * Remote TODO Signed-off-by: Dimitar Dimitrov * Add a spanLog to timeoutGate Signed-off-by: Dimitar Dimitrov * Add comment on why timeoutGate is in Mimir Signed-off-by: Dimitar Dimitrov * Update docs Signed-off-by: Dimitar Dimitrov * Propagate errors from store-gateway to querier when hitting concurrency limit Signed-off-by: Dimitar Dimitrov * Fix formatting Signed-off-by: Dimitar Dimitrov * Update autogenerated config Signed-off-by: Dimitar Dimitrov * Add license header Signed-off-by: Dimitar Dimitrov * Add tests for timeoutGate Signed-off-by: Dimitar Dimitrov * Report timeoutGate timeouts as rejected_deadline_exceeded not rejected_other Signed-off-by: Dimitar Dimitrov * Properly detect custom errors in timeoutGate Signed-off-by: Dimitar Dimitrov --------- Signed-off-by: Dimitar Dimitrov Co-authored-by: Charles Korn --- CHANGELOG.md | 1 + cmd/mimir/config-descriptor.json | 11 +++ cmd/mimir/help-all.txt.tmpl | 2 + .../configuration-parameters/index.md | 7 ++ pkg/querier/blocks_store_queryable_test.go | 73 ++++++++++++++++ pkg/storage/tsdb/config.go | 26 +++--- pkg/storegateway/bucket.go | 39 ++++++--- pkg/storegateway/bucket_store_server_test.go | 12 ++- pkg/storegateway/bucket_stores.go | 36 ++++++++ pkg/storegateway/bucket_test.go | 87 ++++++++++++++++++- pkg/storegateway/error.go | 23 +++++ 11 files changed, 287 insertions(+), 30 deletions(-) create mode 100644 pkg/storegateway/error.go diff --git a/CHANGELOG.md b/CHANGELOG.md index c7198721d2e..725d0458522 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ * [ENHANCEMENT] Rules: Add metric `cortex_prometheus_rule_group_last_restore_duration_seconds` which measures how long it takes to restore rule groups using the `ALERTS_FOR_STATE` series #7974 * [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #8012 * [ENHANCEMENT] Querying: Remove OpEmptyMatch from regex concatenations. #8012 +* [ENHANCEMENT] Store-gateway: add `-blocks-storage.bucket-store.max-concurrent-queue-timeout`. When set, queries at the store-gateway's query gate will not wait longer than that to execute. If a query reaches the wait timeout, then the querier will retry the blocks on a different store-gateway. If all store-gateways are unavailable, then the query will fail with `err-mimir-store-consistency-check-failed`. #7777 * [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567 * [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520 * [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index c6cb29906f3..1ba03ebb171 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -6502,6 +6502,17 @@ "fieldType": "int", "fieldCategory": "advanced" }, + { + "kind": "field", + "name": "max_concurrent_queue_timeout", + "required": false, + "desc": "Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retried on another store-gateway. 0 means no timeout and all queries will wait indefinitely for their turn.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "blocks-storage.bucket-store.max-concurrent-queue-timeout", + "fieldType": "duration", + "fieldCategory": "advanced" + }, { "kind": "field", "name": "tenant_sync_concurrency", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 9d3d01d968f..b91968c49ea 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -537,6 +537,8 @@ Usage of ./cmd/mimir/mimir: If true, verify the checksum of index headers upon loading them (either on startup or lazily when lazy loading is enabled). Setting to true helps detect disk corruption at the cost of slowing down index header loading. -blocks-storage.bucket-store.max-concurrent int Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants. (default 100) + -blocks-storage.bucket-store.max-concurrent-queue-timeout duration + Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retried on another store-gateway. 0 means no timeout and all queries will wait indefinitely for their turn. -blocks-storage.bucket-store.meta-sync-concurrency int Number of Go routines to use when syncing block meta files from object storage per tenant. (default 20) -blocks-storage.bucket-store.metadata-cache.backend string diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 73b6422693e..8e96d7d2e2f 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -3526,6 +3526,13 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.max-concurrent [max_concurrent: | default = 100] + # (advanced) Timeout for the queue of queries waiting for execution. If the + # queue is full and the timeout is reached, the query will be retried on + # another store-gateway. 0 means no timeout and all queries will wait + # indefinitely for their turn. + # CLI flag: -blocks-storage.bucket-store.max-concurrent-queue-timeout + [max_concurrent_queue_timeout: | default = 0s] + # (advanced) Maximum number of concurrent tenants synching blocks. # CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency [tenant_sync_concurrency: | default = 1] diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 97ba1b52981..04d1f7b4281 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -43,6 +43,7 @@ import ( "go.uber.org/atomic" "golang.org/x/exp/slices" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -54,6 +55,7 @@ import ( "github.com/grafana/mimir/pkg/storegateway/storegatewaypb" "github.com/grafana/mimir/pkg/storegateway/storepb" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/globalerror" "github.com/grafana/mimir/pkg/util/limiter" "github.com/grafana/mimir/pkg/util/test" ) @@ -625,6 +627,49 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { ` }, }, + "a single store-gateway instance returns no response implies querying another instance for the same blocks (consistency check passed)": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + // First attempt returns a client whose response does not include all expected blocks. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", mockedSeriesResponses: nil, + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{mockHintsResponse(block1)}, + }: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + }, + "two store-gateway instances returning no response causes consistency check to fail": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + // First attempt returns a client whose response does not include all expected blocks. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", mockedSeriesResponses: nil, + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", mockedSeriesResponses: nil, + }: {block1}, + }, + // Third attempt returns an error because there are no other store-gateways left. + errors.New("no store-gateway remaining after exclude"), + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedErr: newStoreConsistencyCheckFailedError([]ulid.ULID{block1}), + }, "a single store-gateway instance has some missing blocks (consistency check failed)": { finderResult: bucketindex.Blocks{ {ID: block1}, @@ -3243,6 +3288,34 @@ func TestShouldStopQueryFunc(t *testing.T) { err: errors.New("test"), expected: false, }, + "should not stop query on store-gateway instance limit": { + err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Aborted, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}), + expected: false, + }, + "should not stop query on store-gateway instance limit; shouldn't look at the gRPC code, only Mimir error cause": { + err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Internal, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}), + expected: false, + }, + "should not stop query on any other mimirpb error": { + err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Internal, &mimirpb.ErrorDetails{Cause: mimirpb.TOO_BUSY}), + expected: false, + }, + "should not stop query on any unknown error detail": { + err: func() error { + st, createErr := status.New(codes.Internal, "test").WithDetails(&hintspb.Block{Id: "123"}) + require.NoError(t, createErr) + return st.Err() + }(), + expected: false, + }, + "should not stop query on multiple error details": { + err: func() error { + st, createErr := status.New(codes.Internal, "test").WithDetails(&hintspb.Block{Id: "123"}, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}) + require.NoError(t, createErr) + return st.Err() + }(), + expected: false, + }, } for testName, testData := range tests { diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 5a6bc1ceb9e..cd0765e30b5 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -386,18 +386,19 @@ func (cfg *TSDBConfig) IsBlocksShippingEnabled() bool { // BucketStoreConfig holds the config information for Bucket Stores used by the querier and store-gateway. type BucketStoreConfig struct { - SyncDir string `yaml:"sync_dir"` - SyncInterval time.Duration `yaml:"sync_interval" category:"advanced"` - MaxConcurrent int `yaml:"max_concurrent" category:"advanced"` - TenantSyncConcurrency int `yaml:"tenant_sync_concurrency" category:"advanced"` - BlockSyncConcurrency int `yaml:"block_sync_concurrency" category:"advanced"` - MetaSyncConcurrency int `yaml:"meta_sync_concurrency" category:"advanced"` - IndexCache IndexCacheConfig `yaml:"index_cache"` - ChunksCache ChunksCacheConfig `yaml:"chunks_cache"` - MetadataCache MetadataCacheConfig `yaml:"metadata_cache"` - IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay" category:"advanced"` - BucketIndex BucketIndexConfig `yaml:"bucket_index"` - IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within" category:"advanced"` + SyncDir string `yaml:"sync_dir"` + SyncInterval time.Duration `yaml:"sync_interval" category:"advanced"` + MaxConcurrent int `yaml:"max_concurrent" category:"advanced"` + MaxConcurrentQueueTimeout time.Duration `yaml:"max_concurrent_queue_timeout" category:"advanced"` + TenantSyncConcurrency int `yaml:"tenant_sync_concurrency" category:"advanced"` + BlockSyncConcurrency int `yaml:"block_sync_concurrency" category:"advanced"` + MetaSyncConcurrency int `yaml:"meta_sync_concurrency" category:"advanced"` + IndexCache IndexCacheConfig `yaml:"index_cache"` + ChunksCache ChunksCacheConfig `yaml:"chunks_cache"` + MetadataCache MetadataCacheConfig `yaml:"metadata_cache"` + IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay" category:"advanced"` + BucketIndex BucketIndexConfig `yaml:"bucket_index"` + IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within" category:"advanced"` // Series hash cache. SeriesHashCacheMaxBytes uint64 `yaml:"series_hash_cache_max_size_bytes" category:"advanced"` @@ -448,6 +449,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.SyncInterval, "blocks-storage.bucket-store.sync-interval", 15*time.Minute, "How frequently to scan the bucket, or to refresh the bucket index (if enabled), in order to look for changes (new blocks shipped by ingesters and blocks deleted by retention or compaction).") f.Uint64Var(&cfg.SeriesHashCacheMaxBytes, "blocks-storage.bucket-store.series-hash-cache-max-size-bytes", uint64(1*units.Gibibyte), "Max size - in bytes - of the in-memory series hash cache. The cache is shared across all tenants and it's used only when query sharding is enabled.") f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.") + f.DurationVar(&cfg.MaxConcurrentQueueTimeout, "blocks-storage.bucket-store.max-concurrent-queue-timeout", 0, "Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retried on another store-gateway. 0 means no timeout and all queries will wait indefinitely for their turn.") f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 1, "Maximum number of concurrent tenants synching blocks.") f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 4, "Maximum number of concurrent blocks synching per tenant.") f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.") diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index 5406757df9f..675cc214fb6 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -55,6 +55,7 @@ import ( "github.com/grafana/mimir/pkg/storegateway/storegatewaypb" "github.com/grafana/mimir/pkg/storegateway/storepb" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/globalerror" "github.com/grafana/mimir/pkg/util/pool" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -546,18 +547,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor // We don't do the streaming call if we are not requesting the chunks. req.StreamingChunksBatchSize = 0 } - defer func() { - if err == nil { - return - } - code := codes.Internal - if st, ok := grpcutil.ErrorToStatus(err); ok { - code = st.Code() - } else if errors.Is(err, context.Canceled) { - code = codes.Canceled - } - err = status.Error(code, err.Error()) - }() + defer func() { err = mapSeriesError(err) }() matchers, err := storepb.MatchersToPromMatchers(req.Matchers...) if err != nil { @@ -709,6 +699,31 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor return nil } +func mapSeriesError(err error) error { + if err == nil { + return err + } + + var stGwErr storeGatewayError + switch { + case errors.As(err, &stGwErr): + switch cause := stGwErr.errorCause(); cause { + case mimirpb.INSTANCE_LIMIT: + return globalerror.NewErrorWithGRPCStatus(stGwErr, codes.Unavailable, &mimirpb.ErrorDetails{Cause: cause}) + default: + return globalerror.NewErrorWithGRPCStatus(stGwErr, codes.Internal, &mimirpb.ErrorDetails{Cause: cause}) + } + default: + code := codes.Internal + if st, ok := grpcutil.ErrorToStatus(err); ok { + code = st.Code() + } else if errors.Is(err, context.Canceled) { + code = codes.Canceled + } + return status.Error(code, err.Error()) + } +} + func (s *BucketStore) recordRequestAmbientTime(stats *safeQueryStats, requestStart time.Time) { stats.update(func(stats *queryStats) { stats.streamingSeriesAmbientTime += time.Since(requestStart) diff --git a/pkg/storegateway/bucket_store_server_test.go b/pkg/storegateway/bucket_store_server_test.go index 15914622a98..989d24fc54f 100644 --- a/pkg/storegateway/bucket_store_server_test.go +++ b/pkg/storegateway/bucket_store_server_test.go @@ -75,10 +75,7 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest ) // Create a gRPC connection to the server. - conn, err = grpc.Dial(s.serverListener.Addr().String(), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*1024)), - ) + conn, err = s.dialConn() if err != nil { return } @@ -258,6 +255,13 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest return } +func (s *storeTestServer) dialConn() (*grpc.ClientConn, error) { + return grpc.Dial(s.serverListener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*1024)), + ) +} + // Close releases all resources. func (s *storeTestServer) Close() { s.server.GracefulStop() diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 0d565b6db59..72d5c48bda4 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -26,6 +26,7 @@ import ( "github.com/thanos-io/objstore" "google.golang.org/grpc/metadata" + "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/storage/bucket" "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" @@ -106,6 +107,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra queryGateReg := prometheus.WrapRegistererWith(prometheus.Labels{"gate": "query"}, gateReg) queryGate := gate.NewBlocking(cfg.BucketStore.MaxConcurrent) queryGate = gate.NewInstrumented(queryGateReg, cfg.BucketStore.MaxConcurrent, queryGate) + queryGate = timeoutGate{delegate: queryGate, timeout: cfg.BucketStore.MaxConcurrentQueueTimeout} // The number of concurrent index header loads from storegateway are limited. lazyLoadingGateReg := prometheus.WrapRegistererWith(prometheus.Labels{"gate": "index_header"}, gateReg) @@ -420,6 +422,40 @@ func (u *BucketStores) syncDirForUser(userID string) string { return filepath.Join(u.cfg.BucketStore.SyncDir, userID) } +// timeoutGate returns errGateTimeout when the timeout is reached while still waiting for the delegate gate. +// timeoutGate belongs better in dskit. However, at the time of writing dskit supports go 1.20. +// go 1.20 doesn't have context.WithTimeoutCause yet, +// so we choose to implement timeoutGate here instead of implementing context.WithTimeoutCause ourselves in dskit. +// It also allows to keep the span logger in timeoutGate as opposed to in the bucket store. +type timeoutGate struct { + delegate gate.Gate + timeout time.Duration +} + +var errGateTimeout = staticError{cause: mimirpb.INSTANCE_LIMIT, msg: "timeout waiting for concurrency gate"} + +func (t timeoutGate) Start(ctx context.Context) error { + if t.timeout == 0 { + return t.delegate.Start(ctx) + } + + // Inject our own error so that we can differentiate between a timeout caused by this gate + // or a timeout in the original request timeout. + ctx, cancel := context.WithTimeoutCause(ctx, t.timeout, errGateTimeout) + defer cancel() + + err := t.delegate.Start(ctx) + if errors.Is(context.Cause(ctx), errGateTimeout) { + _ = spanlogger.FromContext(ctx, log.NewNopLogger()).Error(err) + err = errGateTimeout + } + return err +} + +func (t timeoutGate) Done() { + t.delegate.Done() +} + func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) { // Check if the store already exists. bs := u.getStore(userID) diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 1d89be74739..1969cf61d2e 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -993,7 +993,7 @@ func BenchmarkBucketIndexReader_ExpandedPostings(b *testing.B) { benchmarkExpandedPostings(test.NewTB(b), newTestBucketBlock, series) } -func prepareTestBlock(tb test.TB, dataSetup ...func(tb testing.TB, appenderFactory func() storage.Appender)) func() *bucketBlock { +func prepareTestBlock(tb test.TB, dataSetup ...testBlockDataSetup) func() *bucketBlock { tmpDir := tb.TempDir() bucketDir := filepath.Join(tmpDir, "bkt") @@ -1036,7 +1036,9 @@ type localBucket struct { dir string } -func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, dataSetup []func(tb testing.TB, appenderFactory func() storage.Appender)) (_ ulid.ULID, minT int64, maxT int64) { +type testBlockDataSetup = func(tb testing.TB, appenderFactory func() storage.Appender) + +func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, dataSetup []testBlockDataSetup) (_ ulid.ULID, minT int64, maxT int64) { headOpts := tsdb.DefaultHeadOptions() headOpts.ChunkDirRoot = tmpDir headOpts.ChunkRange = 1000 @@ -2040,6 +2042,87 @@ func TestBucketStore_Series_CanceledRequest(t *testing.T) { assert.Equal(t, codes.Canceled, s.Code()) } +func TestBucketStore_Series_TimeoutGate(t *testing.T) { + const ( + maxConcurrent = 1 + maxConcurrentWaitTimeout = time.Millisecond + ) + t.Parallel() + tmpDir := t.TempDir() + bktDir := filepath.Join(tmpDir, "bkt") + bkt, err := filesystem.NewBucket(bktDir) + assert.NoError(t, err) + defer func() { assert.NoError(t, bkt.Close()) }() + + logger := log.NewNopLogger() + instrBkt := objstore.WithNoopInstr(bkt) + fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, tmpDir, nil, nil) + assert.NoError(t, err) + + _, blockMinT, blockMaxT := uploadTestBlock(t, tmpDir, instrBkt, []testBlockDataSetup{appendTestSeries(100)}) + + store, err := NewBucketStore( + "test", + instrBkt, + fetcher, + tmpDir, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: 1, + BlockSyncConcurrency: 10, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + }, + selectAllStrategy{}, + newStaticChunksLimiterFactory(100), + newStaticSeriesLimiterFactory(0), + newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), + hashcache.NewSeriesHashCache(1024*1024), + NewBucketStoreMetrics(nil), + WithLogger(logger), + WithQueryGate(timeoutGate{timeout: maxConcurrentWaitTimeout, delegate: gate.NewBlocking(maxConcurrent)}), + ) + assert.NoError(t, err) + defer func() { assert.NoError(t, store.RemoveBlocksAndClose()) }() + require.NoError(t, store.SyncBlocks(context.Background())) + + req := &storepb.SeriesRequest{ + MinTime: blockMinT, + MaxTime: blockMaxT, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "i", Value: ".*"}, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + firstRequestStarted := make(chan struct{}) + + srv := newStoreGatewayTestServer(t, store) + + go func() { + // Start the first Series call, but do not read responses. + // This keeps the request in flight in the store-gateway. + conn, err := srv.dialConn() + assert.NoError(t, err) + t.Cleanup(func() { _ = conn.Close() }) + _, err = srv.requestSeries(ctx, conn, req) + assert.NoError(t, err) + close(firstRequestStarted) + <-ctx.Done() + }() + + <-firstRequestStarted + + // Start the second Series call. This should wait until the first request is done because + // of the concurrency limit. Because we've blocked the first request, the second request + // should eventually time out at the timeout gate. + _, _, _, _, err = srv.Series(ctx, req) + assert.Error(t, err) + s, ok := grpcutil.ErrorToStatus(err) + assert.True(t, ok, err) + assert.Len(t, s.Details(), 1, err) + assert.Equal(t, s.Details()[0].(*mimirpb.ErrorDetails).GetCause(), mimirpb.INSTANCE_LIMIT, err) +} + func TestBucketStore_Series_InvalidRequest(t *testing.T) { tmpDir := t.TempDir() bktDir := filepath.Join(tmpDir, "bkt") diff --git a/pkg/storegateway/error.go b/pkg/storegateway/error.go new file mode 100644 index 00000000000..34cf7fdaee9 --- /dev/null +++ b/pkg/storegateway/error.go @@ -0,0 +1,23 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package storegateway + +import "github.com/grafana/mimir/pkg/mimirpb" + +type storeGatewayError interface { + error + errorCause() mimirpb.ErrorCause +} + +type staticError struct { + cause mimirpb.ErrorCause + msg string +} + +func (s staticError) Error() string { + return s.msg +} + +func (s staticError) errorCause() mimirpb.ErrorCause { + return s.cause +}