From 67026f0978c62c56748feb04cc9722e86696a50d Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Tue, 2 Apr 2024 20:17:29 +0200 Subject: [PATCH 01/15] store-gateway: add timeout to query gate wait This PR adds a timeout to query gate waiting. The store-gateway returns an empty response. The querier treats the empty response as if the blocks weren't discovered by the store-gateway and tries them again on another store-gateway. Signed-off-by: Dimitar Dimitrov --- cmd/mimir/config-descriptor.json | 11 ++++++ cmd/mimir/help-all.txt.tmpl | 2 + pkg/querier/blocks_store_queryable_test.go | 43 ++++++++++++++++++++++ pkg/storage/tsdb/config.go | 26 +++++++------ pkg/storegateway/bucket.go | 5 +++ pkg/storegateway/bucket_stores.go | 22 +++++++++++ 6 files changed, 97 insertions(+), 12 deletions(-) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index c6cb29906f3..b241d9489c2 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -6502,6 +6502,17 @@ "fieldType": "int", "fieldCategory": "advanced" }, + { + "kind": "field", + "name": "max_concurrent_queue_timeout", + "required": false, + "desc": "Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retired on another store-gateway. 0 means no timeout and all queries will wait for their turn.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "blocks-storage.bucket-store.max-concurrent-queue-timeout", + "fieldType": "duration", + "fieldCategory": "advanced" + }, { "kind": "field", "name": "tenant_sync_concurrency", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 9d3d01d968f..2b998d854b6 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -537,6 +537,8 @@ Usage of ./cmd/mimir/mimir: If true, verify the checksum of index headers upon loading them (either on startup or lazily when lazy loading is enabled). 
Setting to true helps detect disk corruption at the cost of slowing down index header loading. -blocks-storage.bucket-store.max-concurrent int Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants. (default 100) + -blocks-storage.bucket-store.max-concurrent-queue-timeout duration + Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retired on another store-gateway. 0 means no timeout and all queries will wait for their turn. -blocks-storage.bucket-store.meta-sync-concurrency int Number of Go routines to use when syncing block meta files from object storage per tenant. (default 20) -blocks-storage.bucket-store.metadata-cache.backend string diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 97ba1b52981..8627fb3f81c 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -625,6 +625,49 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { ` }, }, + "a single store-gateway instance returns no response implies querying another instance for the same blocks (consistency check passed)": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + // First attempt returns a client whose response does not include all expected blocks. 
+ map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", mockedSeriesResponses: nil, + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{mockHintsResponse(block1)}, + }: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + }, + "two store-gateway instances returning no response causes consistency check to fail": { // TODO dimitarvdimitrov + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + // First attempt returns a client whose response does not include all expected blocks. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", mockedSeriesResponses: nil, + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", mockedSeriesResponses: nil, + }: {block1}, + }, + // Third attempt returns an error because there are no other store-gateways left. + errors.New("no store-gateway remaining after exclude"), + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedErr: newStoreConsistencyCheckFailedError([]ulid.ULID{block1}), + }, "a single store-gateway instance has some missing blocks (consistency check failed)": { finderResult: bucketindex.Blocks{ {ID: block1}, diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 5a6bc1ceb9e..d022d3fcb46 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -386,18 +386,19 @@ func (cfg *TSDBConfig) IsBlocksShippingEnabled() bool { // BucketStoreConfig holds the config information for Bucket Stores used by the querier and store-gateway. 
type BucketStoreConfig struct { - SyncDir string `yaml:"sync_dir"` - SyncInterval time.Duration `yaml:"sync_interval" category:"advanced"` - MaxConcurrent int `yaml:"max_concurrent" category:"advanced"` - TenantSyncConcurrency int `yaml:"tenant_sync_concurrency" category:"advanced"` - BlockSyncConcurrency int `yaml:"block_sync_concurrency" category:"advanced"` - MetaSyncConcurrency int `yaml:"meta_sync_concurrency" category:"advanced"` - IndexCache IndexCacheConfig `yaml:"index_cache"` - ChunksCache ChunksCacheConfig `yaml:"chunks_cache"` - MetadataCache MetadataCacheConfig `yaml:"metadata_cache"` - IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay" category:"advanced"` - BucketIndex BucketIndexConfig `yaml:"bucket_index"` - IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within" category:"advanced"` + SyncDir string `yaml:"sync_dir"` + SyncInterval time.Duration `yaml:"sync_interval" category:"advanced"` + MaxConcurrent int `yaml:"max_concurrent" category:"advanced"` + MaxConcurrentQueueTimeout time.Duration `yaml:"max_concurrent_queue_timeout" category:"advanced"` + TenantSyncConcurrency int `yaml:"tenant_sync_concurrency" category:"advanced"` + BlockSyncConcurrency int `yaml:"block_sync_concurrency" category:"advanced"` + MetaSyncConcurrency int `yaml:"meta_sync_concurrency" category:"advanced"` + IndexCache IndexCacheConfig `yaml:"index_cache"` + ChunksCache ChunksCacheConfig `yaml:"chunks_cache"` + MetadataCache MetadataCacheConfig `yaml:"metadata_cache"` + IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay" category:"advanced"` + BucketIndex BucketIndexConfig `yaml:"bucket_index"` + IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within" category:"advanced"` // Series hash cache. 
SeriesHashCacheMaxBytes uint64 `yaml:"series_hash_cache_max_size_bytes" category:"advanced"` @@ -448,6 +449,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.SyncInterval, "blocks-storage.bucket-store.sync-interval", 15*time.Minute, "How frequently to scan the bucket, or to refresh the bucket index (if enabled), in order to look for changes (new blocks shipped by ingesters and blocks deleted by retention or compaction).") f.Uint64Var(&cfg.SeriesHashCacheMaxBytes, "blocks-storage.bucket-store.series-hash-cache-max-size-bytes", uint64(1*units.Gibibyte), "Max size - in bytes - of the in-memory series hash cache. The cache is shared across all tenants and it's used only when query sharding is enabled.") f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.") + f.DurationVar(&cfg.MaxConcurrentQueueTimeout, "blocks-storage.bucket-store.max-concurrent-queue-timeout", 0, "Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retired on another store-gateway. 
0 means no timeout and all queries will wait for their turn.") f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 1, "Maximum number of concurrent tenants synching blocks.") f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 4, "Maximum number of concurrent blocks synching per tenant.") f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.") diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index 5406757df9f..fa92cbf3f97 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -611,6 +611,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor // but sometimes it can take minutes if the block isn't loaded and there is a surge in queries for unloaded blocks. done, err := s.limitConcurrentQueries(ctx, stats) if err != nil { + if errors.Is(err, errGateTimeout) { + // If the gate timed out, then we behave in the same way as if the blocks aren't discovered yet. + // The querier will try the blocks again on a different store-gateway replica. + return nil + } return err } defer done() diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 0d565b6db59..9f37d290f9b 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -105,6 +105,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra // The number of concurrent queries against the tenants BucketStores are limited. 
queryGateReg := prometheus.WrapRegistererWith(prometheus.Labels{"gate": "query"}, gateReg) queryGate := gate.NewBlocking(cfg.BucketStore.MaxConcurrent) + queryGate = timeoutGate{delegate: queryGate, timeout: cfg.BucketStore.MaxConcurrentQueueTimeout} queryGate = gate.NewInstrumented(queryGateReg, cfg.BucketStore.MaxConcurrent, queryGate) // The number of concurrent index header loads from storegateway are limited. @@ -420,6 +421,27 @@ func (u *BucketStores) syncDirForUser(userID string) string { return filepath.Join(u.cfg.BucketStore.SyncDir, userID) } +type timeoutGate struct { + delegate gate.Gate + timeout time.Duration +} + +var errGateTimeout = errors.New("concurrency gate waiting timeout") + +func (t timeoutGate) Start(ctx context.Context) error { + if t.timeout == 0 { + return t.delegate.Start(ctx) + } + ctx, cancel := context.WithTimeoutCause(ctx, t.timeout, errGateTimeout) + defer cancel() + + return t.delegate.Start(ctx) +} + +func (t timeoutGate) Done() { + t.delegate.Done() +} + func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) { // Check if the store already exists. bs := u.getStore(userID) From 87abd77d804cfb879ec51395d32983d2a7833116 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Tue, 2 Apr 2024 20:21:43 +0200 Subject: [PATCH 02/15] Add CHANGELOG.md entry Signed-off-by: Dimitar Dimitrov --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7306edd04c9..74cfb337437 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ * [ENHANCEMENT] Rules: Add metric `cortex_prometheus_rule_group_last_restore_duration_seconds` which measures how long it takes to restore rule groups using the `ALERTS_FOR_STATE` series #7974 * [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #8012 * [ENHANCEMENT] Querying: Remove OpEmptyMatch from regex concatenations. 
#8012 +* [ENHANCEMENT] Store-gateway: add `-blocks-storage.bucket-store.max-concurrent-queue-timeout`. When set, queries at the store-gateway's query gate will not wait longer than that to execute. If a query reaches the wait timeout, then the querier will retry the blocks on a different store-gateway. If all store-gateways are unavailable, then the query will fail with `err-mimi-store-consistency-check-failed`. #7777 * [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567 * [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520 * [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624 From 63adee83e2fa39b498758233f2c1bade9459dda6 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 3 Apr 2024 11:18:21 +0200 Subject: [PATCH 03/15] Update pkg/storage/tsdb/config.go Co-authored-by: Charles Korn --- pkg/storage/tsdb/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index d022d3fcb46..cd0765e30b5 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -449,7 +449,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.SyncInterval, "blocks-storage.bucket-store.sync-interval", 15*time.Minute, "How frequently to scan the bucket, or to refresh the bucket index (if enabled), in order to look for changes (new blocks shipped by ingesters and blocks deleted by retention or compaction).") f.Uint64Var(&cfg.SeriesHashCacheMaxBytes, "blocks-storage.bucket-store.series-hash-cache-max-size-bytes", uint64(1*units.Gibibyte), "Max size - in bytes - of the in-memory series hash cache. The cache is shared across all tenants and it's used only when query sharding is enabled.") f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. 
The limit is shared across all tenants.") - f.DurationVar(&cfg.MaxConcurrentQueueTimeout, "blocks-storage.bucket-store.max-concurrent-queue-timeout", 0, "Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retired on another store-gateway. 0 means no timeout and all queries will wait for their turn.") + f.DurationVar(&cfg.MaxConcurrentQueueTimeout, "blocks-storage.bucket-store.max-concurrent-queue-timeout", 0, "Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retried on another store-gateway. 0 means no timeout and all queries will wait indefinitely for their turn.") f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 1, "Maximum number of concurrent tenants synching blocks.") f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 4, "Maximum number of concurrent blocks synching per tenant.") f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.") From 52d2596a3f105a4b82f2679fa6b529912001492c Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 3 Apr 2024 11:18:26 +0200 Subject: [PATCH 04/15] Update CHANGELOG.md Co-authored-by: Charles Korn --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 74cfb337437..81bd135e4da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,7 +30,7 @@ * [ENHANCEMENT] Rules: Add metric `cortex_prometheus_rule_group_last_restore_duration_seconds` which measures how long it takes to restore rule groups using the `ALERTS_FOR_STATE` series #7974 * [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. 
#8012 * [ENHANCEMENT] Querying: Remove OpEmptyMatch from regex concatenations. #8012 -* [ENHANCEMENT] Store-gateway: add `-blocks-storage.bucket-store.max-concurrent-queue-timeout`. When set, queries at the store-gateway's query gate will not wait longer than that to execute. If a query reaches the wait timeout, then the querier will retry the blocks on a different store-gateway. If all store-gateways are unavailable, then the query will fail with `err-mimi-store-consistency-check-failed`. #7777 +* [ENHANCEMENT] Store-gateway: add `-blocks-storage.bucket-store.max-concurrent-queue-timeout`. When set, queries at the store-gateway's query gate will not wait longer than that to execute. If a query reaches the wait timeout, then the querier will retry the blocks on a different store-gateway. If all store-gateways are unavailable, then the query will fail with `err-mimir-store-consistency-check-failed`. #7777 * [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567 * [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520 * [BUGFIX] Ingester: Prevent timely compaction of empty blocks. 
#7624 From 608b5a946ac92379d503863acf14ef7010d2ae20 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 3 Apr 2024 10:53:27 +0200 Subject: [PATCH 05/15] Remove TODO Signed-off-by: Dimitar Dimitrov --- pkg/querier/blocks_store_queryable_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 8627fb3f81c..e8b80d5c195 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -645,7 +645,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { limits: &blocksStoreLimitsMock{}, queryLimiter: noOpQueryLimiter, }, - "two store-gateway instances returning no response causes consistency check to fail": { // TODO dimitarvdimitrov + "two store-gateway instances returning no response causes consistency check to fail": { finderResult: bucketindex.Blocks{ {ID: block1}, }, From 7a0d772813a7c7e3597c601ab17d84e5e496e03f Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 3 Apr 2024 11:18:07 +0200 Subject: [PATCH 06/15] Add a spanLog to timeoutGate Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_stores.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 9f37d290f9b..327d7c4ab53 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -435,7 +435,11 @@ func (t timeoutGate) Start(ctx context.Context) error { ctx, cancel := context.WithTimeoutCause(ctx, t.timeout, errGateTimeout) defer cancel() - return t.delegate.Start(ctx) + err := t.delegate.Start(ctx) + if errors.Is(err, errGateTimeout) { + _ = spanlogger.FromContext(ctx, log.NewNopLogger()).Error(err) + } + return err } func (t timeoutGate) Done() { From 573ec0d318f6c0c1991bb8afd48ab73d082c1ad4 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 3 Apr 2024 11:56:22 +0200 Subject: [PATCH 07/15] Add comment on why
timeoutGate is in Mimir Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_stores.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 327d7c4ab53..b6b1a164f20 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -421,6 +421,10 @@ func (u *BucketStores) syncDirForUser(userID string) string { return filepath.Join(u.cfg.BucketStore.SyncDir, userID) } +// timeoutGate belongs better in dskit. However, at the time of writing dskit supports go 1.20. +// go 1.20 doesn't have context.WithTimeoutCause yet, +// so we choose to implement timeoutGate here instead of implementing context.WithTimeoutCause ourselves in dskit. +// It also allows to keep the span logger in timeoutGate as opposed to in the bucket store. type timeoutGate struct { delegate gate.Gate timeout time.Duration From 68a0024551b4e4bbd530da2f310d29210fa95563 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 3 Apr 2024 13:46:48 +0200 Subject: [PATCH 08/15] Update docs Signed-off-by: Dimitar Dimitrov --- .../mimir/configure/configuration-parameters/index.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 73b6422693e..8e96d7d2e2f 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -3526,6 +3526,13 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.max-concurrent [max_concurrent: | default = 100] + # (advanced) Timeout for the queue of queries waiting for execution. If the + # queue is full and the timeout is reached, the query will be retried on + # another store-gateway. 0 means no timeout and all queries will wait + # indefinitely for their turn. 
+ # CLI flag: -blocks-storage.bucket-store.max-concurrent-queue-timeout + [max_concurrent_queue_timeout: | default = 0s] + # (advanced) Maximum number of concurrent tenants synching blocks. # CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency [tenant_sync_concurrency: | default = 1] From ed7012cf91b08df98959d0483c8b16281ea1fe1b Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 3 May 2024 10:26:25 +0200 Subject: [PATCH 09/15] Propagate errors from store-gateway to querier when hitting concurrency limit Signed-off-by: Dimitar Dimitrov --- pkg/querier/blocks_store_queryable_test.go | 30 +++++++++++++++ pkg/storegateway/bucket.go | 44 +++++++++++++--------- pkg/storegateway/bucket_stores.go | 3 +- pkg/storegateway/error.go | 21 +++++++++++ 4 files changed, 80 insertions(+), 18 deletions(-) create mode 100644 pkg/storegateway/error.go diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index e8b80d5c195..0536ea22969 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -43,6 +43,7 @@ import ( "go.uber.org/atomic" "golang.org/x/exp/slices" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -54,6 +55,7 @@ import ( "github.com/grafana/mimir/pkg/storegateway/storegatewaypb" "github.com/grafana/mimir/pkg/storegateway/storepb" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/globalerror" "github.com/grafana/mimir/pkg/util/limiter" "github.com/grafana/mimir/pkg/util/test" ) @@ -3286,6 +3288,34 @@ func TestShouldStopQueryFunc(t *testing.T) { err: errors.New("test"), expected: false, }, + "should not stop query on store-gateway instance limit": { + err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Aborted, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}), + expected: false, + }, + "should not stop query on store-gateway instance 
limit; shouldn't look at the gRPC code, only Mimir error cause": { + err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Internal, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}), + expected: false, + }, + "should not stop query on any other mimirpb error": { + err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Internal, &mimirpb.ErrorDetails{Cause: mimirpb.TOO_BUSY}), + expected: false, + }, + "should not stop query on any unknown error detail": { + err: func() error { + st, createErr := status.New(codes.Internal, "test").WithDetails(&hintspb.Block{Id: "123"}) + require.NoError(t, createErr) + return st.Err() + }(), + expected: false, + }, + "should not stop query on multiple error details": { + err: func() error { + st, createErr := status.New(codes.Internal, "test").WithDetails(&hintspb.Block{Id: "123"}, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}) + require.NoError(t, createErr) + return st.Err() + }(), + expected: false, + }, } for testName, testData := range tests { diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index fa92cbf3f97..675cc214fb6 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -55,6 +55,7 @@ import ( "github.com/grafana/mimir/pkg/storegateway/storegatewaypb" "github.com/grafana/mimir/pkg/storegateway/storepb" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/globalerror" "github.com/grafana/mimir/pkg/util/pool" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -546,18 +547,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor // We don't do the streaming call if we are not requesting the chunks. 
req.StreamingChunksBatchSize = 0 } - defer func() { - if err == nil { - return - } - code := codes.Internal - if st, ok := grpcutil.ErrorToStatus(err); ok { - code = st.Code() - } else if errors.Is(err, context.Canceled) { - code = codes.Canceled - } - err = status.Error(code, err.Error()) - }() + defer func() { err = mapSeriesError(err) }() matchers, err := storepb.MatchersToPromMatchers(req.Matchers...) if err != nil { @@ -611,11 +601,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor // but sometimes it can take minutes if the block isn't loaded and there is a surge in queries for unloaded blocks. done, err := s.limitConcurrentQueries(ctx, stats) if err != nil { - if errors.Is(err, errGateTimeout) { - // If the gate timed out, then we behave in the same way as if the blocks aren't discovered yet. - // The querier will try the blocks again on a different store-gateway replica. - return nil - } return err } defer done() @@ -714,6 +699,31 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor return nil } +func mapSeriesError(err error) error { + if err == nil { + return err + } + + var stGwErr storeGatewayError + switch { + case errors.As(err, &stGwErr): + switch cause := stGwErr.errorCause(); cause { + case mimirpb.INSTANCE_LIMIT: + return globalerror.NewErrorWithGRPCStatus(stGwErr, codes.Unavailable, &mimirpb.ErrorDetails{Cause: cause}) + default: + return globalerror.NewErrorWithGRPCStatus(stGwErr, codes.Internal, &mimirpb.ErrorDetails{Cause: cause}) + } + default: + code := codes.Internal + if st, ok := grpcutil.ErrorToStatus(err); ok { + code = st.Code() + } else if errors.Is(err, context.Canceled) { + code = codes.Canceled + } + return status.Error(code, err.Error()) + } +} + func (s *BucketStore) recordRequestAmbientTime(stats *safeQueryStats, requestStart time.Time) { stats.update(func(stats *queryStats) { stats.streamingSeriesAmbientTime += time.Since(requestStart) diff --git 
a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index b6b1a164f20..39660522fca 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -26,6 +26,7 @@ import ( "github.com/thanos-io/objstore" "google.golang.org/grpc/metadata" + "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/storage/bucket" "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" @@ -430,7 +431,7 @@ type timeoutGate struct { timeout time.Duration } -var errGateTimeout = errors.New("concurrency gate waiting timeout") +var errGateTimeout = staticError{cause: mimirpb.INSTANCE_LIMIT, msg: "timeout waiting for concurrency gate"} func (t timeoutGate) Start(ctx context.Context) error { if t.timeout == 0 { diff --git a/pkg/storegateway/error.go b/pkg/storegateway/error.go new file mode 100644 index 00000000000..dc0bfcfb320 --- /dev/null +++ b/pkg/storegateway/error.go @@ -0,0 +1,21 @@ +package storegateway + +import "github.com/grafana/mimir/pkg/mimirpb" + +type storeGatewayError interface { + error + errorCause() mimirpb.ErrorCause +} + +type staticError struct { + cause mimirpb.ErrorCause + msg string +} + +func (s staticError) Error() string { + return s.msg +} + +func (s staticError) errorCause() mimirpb.ErrorCause { + return s.cause +} From f09508cc7232fbc810bd4ce22e53ab7685606a27 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 3 May 2024 10:36:18 +0200 Subject: [PATCH 10/15] Fix formatting Signed-off-by: Dimitar Dimitrov --- pkg/querier/blocks_store_queryable_test.go | 60 +++++++++++----------- pkg/storegateway/error.go | 12 ++--- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 0536ea22969..04d1f7b4281 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -43,7 +43,7 @@ import ( "go.uber.org/atomic" 
"golang.org/x/exp/slices" "google.golang.org/grpc" - "google.golang.org/grpc/codes" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -55,7 +55,7 @@ import ( "github.com/grafana/mimir/pkg/storegateway/storegatewaypb" "github.com/grafana/mimir/pkg/storegateway/storepb" "github.com/grafana/mimir/pkg/util" - "github.com/grafana/mimir/pkg/util/globalerror" + "github.com/grafana/mimir/pkg/util/globalerror" "github.com/grafana/mimir/pkg/util/limiter" "github.com/grafana/mimir/pkg/util/test" ) @@ -3288,34 +3288,34 @@ func TestShouldStopQueryFunc(t *testing.T) { err: errors.New("test"), expected: false, }, - "should not stop query on store-gateway instance limit": { - err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Aborted, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}), - expected: false, - }, - "should not stop query on store-gateway instance limit; shouldn't look at the gRPC code, only Mimir error cause": { - err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Internal, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}), - expected: false, - }, - "should not stop query on any other mimirpb error": { - err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Internal, &mimirpb.ErrorDetails{Cause: mimirpb.TOO_BUSY}), - expected: false, - }, - "should not stop query on any unknown error detail": { - err: func() error { - st, createErr := status.New(codes.Internal, "test").WithDetails(&hintspb.Block{Id: "123"}) - require.NoError(t, createErr) - return st.Err() - }(), - expected: false, - }, - "should not stop query on multiple error details": { - err: func() error { - st, createErr := status.New(codes.Internal, "test").WithDetails(&hintspb.Block{Id: "123"}, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}) - require.NoError(t, createErr) - return st.Err() - }(), - expected: false, - }, + "should not stop query on store-gateway instance 
limit": { + err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Aborted, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}), + expected: false, + }, + "should not stop query on store-gateway instance limit; shouldn't look at the gRPC code, only Mimir error cause": { + err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Internal, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}), + expected: false, + }, + "should not stop query on any other mimirpb error": { + err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Internal, &mimirpb.ErrorDetails{Cause: mimirpb.TOO_BUSY}), + expected: false, + }, + "should not stop query on any unknown error detail": { + err: func() error { + st, createErr := status.New(codes.Internal, "test").WithDetails(&hintspb.Block{Id: "123"}) + require.NoError(t, createErr) + return st.Err() + }(), + expected: false, + }, + "should not stop query on multiple error details": { + err: func() error { + st, createErr := status.New(codes.Internal, "test").WithDetails(&hintspb.Block{Id: "123"}, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}) + require.NoError(t, createErr) + return st.Err() + }(), + expected: false, + }, } for testName, testData := range tests { diff --git a/pkg/storegateway/error.go b/pkg/storegateway/error.go index dc0bfcfb320..29f7b990b84 100644 --- a/pkg/storegateway/error.go +++ b/pkg/storegateway/error.go @@ -3,19 +3,19 @@ package storegateway import "github.com/grafana/mimir/pkg/mimirpb" type storeGatewayError interface { - error - errorCause() mimirpb.ErrorCause + error + errorCause() mimirpb.ErrorCause } type staticError struct { - cause mimirpb.ErrorCause - msg string + cause mimirpb.ErrorCause + msg string } func (s staticError) Error() string { - return s.msg + return s.msg } func (s staticError) errorCause() mimirpb.ErrorCause { - return s.cause + return s.cause } From 523f26af04ab7bb8636ba3d9f96123f3b0aba509 Mon Sep 17 00:00:00 2001 From: 
Dimitar Dimitrov Date: Fri, 3 May 2024 10:44:00 +0200 Subject: [PATCH 11/15] Update autogenerated config Signed-off-by: Dimitar Dimitrov --- cmd/mimir/config-descriptor.json | 2 +- cmd/mimir/help-all.txt.tmpl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index b241d9489c2..1ba03ebb171 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -6506,7 +6506,7 @@ "kind": "field", "name": "max_concurrent_queue_timeout", "required": false, - "desc": "Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retired on another store-gateway. 0 means no timeout and all queries will wait for their turn.", + "desc": "Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retried on another store-gateway. 0 means no timeout and all queries will wait indefinitely for their turn.", "fieldValue": null, "fieldDefaultValue": 0, "fieldFlag": "blocks-storage.bucket-store.max-concurrent-queue-timeout", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 2b998d854b6..b91968c49ea 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -538,7 +538,7 @@ Usage of ./cmd/mimir/mimir: -blocks-storage.bucket-store.max-concurrent int Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants. (default 100) -blocks-storage.bucket-store.max-concurrent-queue-timeout duration - Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retired on another store-gateway. 0 means no timeout and all queries will wait for their turn. + Timeout for the queue of queries waiting for execution. If the queue is full and the timeout is reached, the query will be retried on another store-gateway. 
0 means no timeout and all queries will wait indefinitely for their turn. -blocks-storage.bucket-store.meta-sync-concurrency int Number of Go routines to use when syncing block meta files from object storage per tenant. (default 20) -blocks-storage.bucket-store.metadata-cache.backend string From 28b00ff5da8ec6ee3be6d2e161c301457e8f3cd1 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 3 May 2024 10:44:29 +0200 Subject: [PATCH 12/15] Add license header Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/error.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/storegateway/error.go b/pkg/storegateway/error.go index 29f7b990b84..34cf7fdaee9 100644 --- a/pkg/storegateway/error.go +++ b/pkg/storegateway/error.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: AGPL-3.0-only + package storegateway import "github.com/grafana/mimir/pkg/mimirpb" From 375a044e76277ce3b026d0bde70ae7ba47b63d9e Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 3 May 2024 13:02:48 +0200 Subject: [PATCH 13/15] Add tests for timeoutGate Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_store_server_test.go | 12 ++- pkg/storegateway/bucket_test.go | 87 +++++++++++++++++++- 2 files changed, 93 insertions(+), 6 deletions(-) diff --git a/pkg/storegateway/bucket_store_server_test.go b/pkg/storegateway/bucket_store_server_test.go index 15914622a98..989d24fc54f 100644 --- a/pkg/storegateway/bucket_store_server_test.go +++ b/pkg/storegateway/bucket_store_server_test.go @@ -75,10 +75,7 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest ) // Create a gRPC connection to the server. 
- conn, err = grpc.Dial(s.serverListener.Addr().String(), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*1024)), - ) + conn, err = s.dialConn() if err != nil { return } @@ -258,6 +255,13 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest return } +func (s *storeTestServer) dialConn() (*grpc.ClientConn, error) { + return grpc.Dial(s.serverListener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*1024)), + ) +} + // Close releases all resources. func (s *storeTestServer) Close() { s.server.GracefulStop() diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 1d89be74739..1969cf61d2e 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -993,7 +993,7 @@ func BenchmarkBucketIndexReader_ExpandedPostings(b *testing.B) { benchmarkExpandedPostings(test.NewTB(b), newTestBucketBlock, series) } -func prepareTestBlock(tb test.TB, dataSetup ...func(tb testing.TB, appenderFactory func() storage.Appender)) func() *bucketBlock { +func prepareTestBlock(tb test.TB, dataSetup ...testBlockDataSetup) func() *bucketBlock { tmpDir := tb.TempDir() bucketDir := filepath.Join(tmpDir, "bkt") @@ -1036,7 +1036,9 @@ type localBucket struct { dir string } -func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, dataSetup []func(tb testing.TB, appenderFactory func() storage.Appender)) (_ ulid.ULID, minT int64, maxT int64) { +type testBlockDataSetup = func(tb testing.TB, appenderFactory func() storage.Appender) + +func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, dataSetup []testBlockDataSetup) (_ ulid.ULID, minT int64, maxT int64) { headOpts := tsdb.DefaultHeadOptions() headOpts.ChunkDirRoot = tmpDir headOpts.ChunkRange = 1000 @@ -2040,6 +2042,87 @@ func TestBucketStore_Series_CanceledRequest(t 
*testing.T) { assert.Equal(t, codes.Canceled, s.Code()) } +func TestBucketStore_Series_TimeoutGate(t *testing.T) { + const ( + maxConcurrent = 1 + maxConcurrentWaitTimeout = time.Millisecond + ) + t.Parallel() + tmpDir := t.TempDir() + bktDir := filepath.Join(tmpDir, "bkt") + bkt, err := filesystem.NewBucket(bktDir) + assert.NoError(t, err) + defer func() { assert.NoError(t, bkt.Close()) }() + + logger := log.NewNopLogger() + instrBkt := objstore.WithNoopInstr(bkt) + fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, tmpDir, nil, nil) + assert.NoError(t, err) + + _, blockMinT, blockMaxT := uploadTestBlock(t, tmpDir, instrBkt, []testBlockDataSetup{appendTestSeries(100)}) + + store, err := NewBucketStore( + "test", + instrBkt, + fetcher, + tmpDir, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: 1, + BlockSyncConcurrency: 10, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + }, + selectAllStrategy{}, + newStaticChunksLimiterFactory(100), + newStaticSeriesLimiterFactory(0), + newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), + hashcache.NewSeriesHashCache(1024*1024), + NewBucketStoreMetrics(nil), + WithLogger(logger), + WithQueryGate(timeoutGate{timeout: maxConcurrentWaitTimeout, delegate: gate.NewBlocking(maxConcurrent)}), + ) + assert.NoError(t, err) + defer func() { assert.NoError(t, store.RemoveBlocksAndClose()) }() + require.NoError(t, store.SyncBlocks(context.Background())) + + req := &storepb.SeriesRequest{ + MinTime: blockMinT, + MaxTime: blockMaxT, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "i", Value: ".*"}, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + firstRequestStarted := make(chan struct{}) + + srv := newStoreGatewayTestServer(t, store) + + go func() { + // Start the first Series call, but do not read responses. + // This keeps the request in flight in the store-gateway. 
+ conn, err := srv.dialConn() + assert.NoError(t, err) + t.Cleanup(func() { _ = conn.Close() }) + _, err = srv.requestSeries(ctx, conn, req) + assert.NoError(t, err) + close(firstRequestStarted) + <-ctx.Done() + }() + + <-firstRequestStarted + + // Start the second Series call. This should wait until the first request is done because + // of the concurrency limit. Because we've blocked the first request, the second request + // should eventually time out at the timeout gate. + _, _, _, _, err = srv.Series(ctx, req) + assert.Error(t, err) + s, ok := grpcutil.ErrorToStatus(err) + assert.True(t, ok, err) + assert.Len(t, s.Details(), 1, err) + assert.Equal(t, s.Details()[0].(*mimirpb.ErrorDetails).GetCause(), mimirpb.INSTANCE_LIMIT, err) +} + func TestBucketStore_Series_InvalidRequest(t *testing.T) { tmpDir := t.TempDir() bktDir := filepath.Join(tmpDir, "bkt") From 413a415d67a651764ce143fa9663dbe756a036cf Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 3 May 2024 13:11:27 +0200 Subject: [PATCH 14/15] Report timeoutGate timeouts as rejected_deadline_exceeded not rejected_other Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_stores.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 39660522fca..c2d71953c86 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -106,8 +106,8 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra // The number of concurrent queries against the tenants BucketStores are limited. 
queryGateReg := prometheus.WrapRegistererWith(prometheus.Labels{"gate": "query"}, gateReg) queryGate := gate.NewBlocking(cfg.BucketStore.MaxConcurrent) - queryGate = timeoutGate{delegate: queryGate, timeout: cfg.BucketStore.MaxConcurrentQueueTimeout} queryGate = gate.NewInstrumented(queryGateReg, cfg.BucketStore.MaxConcurrent, queryGate) + queryGate = timeoutGate{delegate: queryGate, timeout: cfg.BucketStore.MaxConcurrentQueueTimeout} // The number of concurrent index header loads from storegateway are limited. lazyLoadingGateReg := prometheus.WrapRegistererWith(prometheus.Labels{"gate": "index_header"}, gateReg) From df43e5f94386b2eaae75bbd03a3a7dcfaf9a6128 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 3 May 2024 13:11:41 +0200 Subject: [PATCH 15/15] Properly detect custom errors in timeoutGate Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_stores.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index c2d71953c86..72d5c48bda4 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -422,6 +422,7 @@ func (u *BucketStores) syncDirForUser(userID string) string { return filepath.Join(u.cfg.BucketStore.SyncDir, userID) } +// timeoutGate returns errGateTimeout when the timeout is reached while still waiting for the delegate gate. // timeoutGate belongs better in dskit. However, at the time of writing dskit supports go 1.20. // go 1.20 doesn't have context.WithTimeoutCause yet, // so we choose to implement timeoutGate here instead of implementing context.WithTimeoutCause ourselves in dskit. @@ -437,12 +438,16 @@ func (t timeoutGate) Start(ctx context.Context) error { if t.timeout == 0 { return t.delegate.Start(ctx) } + + // Inject our own error as the timeout cause so that we can distinguish a timeout caused by this gate + // from a deadline or cancellation of the original request context.
ctx, cancel := context.WithTimeoutCause(ctx, t.timeout, errGateTimeout) defer cancel() err := t.delegate.Start(ctx) - if errors.Is(err, errGateTimeout) { + if errors.Is(context.Cause(ctx), errGateTimeout) { _ = spanlogger.FromContext(ctx, log.NewNopLogger()).Error(err) + err = errGateTimeout } return err }