diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 5cac76ae6b..755b82111a 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -201,10 +201,10 @@ func runCompact( // While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter. // The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet. // This is to make sure compactor will not accidentally perform compactions with gap instead. - ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, deleteDelay/2, fetcherConcurrency) + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, deleteDelay/2, conf.blockMetaFetchConcurrency) duplicateBlocksFilter := block.NewDeduplicateFilter() - baseMetaFetcher, err := block.NewBaseFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg)) + baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg)) if err != nil { return errors.Wrap(err, "create meta fetcher") } @@ -523,6 +523,7 @@ type compactConfig struct { waitInterval time.Duration disableDownsampling bool blockSyncConcurrency int + blockMetaFetchConcurrency int blockViewerSyncBlockInterval time.Duration cleanupBlocksInterval time.Duration compactionConcurrency int @@ -574,6 +575,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage."). Default("20").IntVar(&cc.blockSyncConcurrency) + cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). + Default("32").IntVar(&cc.blockMetaFetchConcurrency) cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI."). Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval) cmd.Flag("compact.cleanup-interval", "How often we should clean up partially uploaded blocks and blocks with deletion mark in the background when --wait has been enabled. Setting it to \"0s\" disables it - the cleaning will only happen at the end of an iteration."). diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 40331d361a..a945efcb99 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -72,7 +72,7 @@ func RunDownsample( return err } - metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ + metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ block.NewDeduplicateFilter(), }, nil) if err != nil { diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go index 08468abb71..127c3236ac 100644 --- a/cmd/thanos/main_test.go +++ b/cmd/thanos/main_test.go @@ -52,7 +52,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) { metrics := newDownsampleMetrics(prometheus.NewRegistry()) testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos)))) - metaFetcher, err := block.NewMetaFetcher(nil, fetcherConcurrency, bkt, "", nil, nil, nil) + metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil, nil) testutil.Ok(t, err) metas, _, err := metaFetcher.Fetch(ctx) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 6fb822698e..eadcf9340f 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -38,8 +38,6 @@ import ( "github.com/thanos-io/thanos/pkg/ui" ) -const fetcherConcurrency = 32 - // registerStore registers a store command. func registerStore(app *extkingpin.App) { cmd := app.Command(component.Store.String(), "Store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift, Tencent COS and Aliyun OSS.") @@ -81,6 +79,9 @@ func registerStore(app *extkingpin.App) { blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage."). Default("20").Int() + blockMetaFetchConcurrency := cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). + Default("32").Int() + minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). Default("0000-01-01T00:00:00Z")) @@ -145,6 +146,7 @@ func registerStore(app *extkingpin.App) { debugLogging, *syncInterval, *blockSyncConcurrency, + *blockMetaFetchConcurrency, &store.FilterConfig{ MinTime: *minTime, MaxTime: *maxTime, @@ -183,6 +185,7 @@ func runStore( verbose bool, syncInterval time.Duration, blockSyncConcurrency int, + metaFetchConcurrency int, filterConf *store.FilterConfig, selectorRelabelConf *extflag.PathOrContent, advertiseCompatibilityLabel bool, @@ -277,8 +280,8 @@ func runStore( return errors.Wrap(err, "create index cache") } - ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay, fetcherConcurrency) - metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay, metaFetchConcurrency) + metaFetcher, err := block.NewMetaFetcher(logger, metaFetchConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), block.NewLabelShardedMetaFilter(relabelConfig), diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index d12aeb730d..5c2f1114bf 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -141,7 +141,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path return err } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil) + fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil) if err != nil { return err } @@ -189,7 +189,7 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo return err } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil) + fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil) if err != nil { return err } @@ -289,7 +289,7 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat return err } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil) + fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil) if err != nil { return err } @@ -383,7 +383,7 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC } // TODO(bwplotka): Allow Bucket UI to visualize the state of block as well. - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil) + fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil) if err != nil { return err } @@ -537,7 +537,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat // While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter. // The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet. // This is to make sure compactor will not accidentally perform compactions with gap instead. - ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, *deleteDelay/2, fetcherConcurrency) + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, *deleteDelay/2, block.FetcherConcurrency) duplicateBlocksFilter := block.NewDeduplicateFilter() blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, *deleteDelay, stubCounter, stubCounter) @@ -545,7 +545,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat var sy *compact.Syncer { - baseMetaFetcher, err := block.NewBaseFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) + baseMetaFetcher, err := block.NewBaseFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) if err != nil { return errors.Wrap(err, "create meta fetcher") } diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 3a05e97eb3..fd0abe3b48 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -35,6 +35,8 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" ) +const FetcherConcurrency = 32 + type fetcherMetrics struct { syncs prometheus.Counter syncFailures prometheus.Counter @@ -301,6 +303,7 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { ch = make(chan ulid.ULID, f.concurrency) mtx sync.Mutex ) + level.Debug(f.logger).Log("msg", "fetching meta data", "concurrency", f.concurrency) for i := 0; i < f.concurrency; i++ { eg.Go(func() error { for id := range ch {