Skip to content

Commit

Permalink
Introduce "block-meta-fetch-concurrency" flag for compact and store (#…
Browse files Browse the repository at this point in the history
…3752)

* Introduce "block-meta-fetch-concurrency" flag for compact and store

Furthermore consolidate the fetcher concurrency by moving the
fetcherConcurrency const to the fetcher code of the block package.

Signed-off-by: Johannes Frey <johannes.frey@daimler.com>

* Document flags

Signed-off-by: Johannes Frey <johannes.frey@daimler.com>
  • Loading branch information
Johannes Frey authored Jan 27, 2021
1 parent 69bbf13 commit d61675d
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 14 deletions.
7 changes: 5 additions & 2 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,10 @@ func runCompact(
// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, deleteDelay/2, fetcherConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, deleteDelay/2, conf.blockMetaFetchConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter()

baseMetaFetcher, err := block.NewBaseFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down Expand Up @@ -523,6 +523,7 @@ type compactConfig struct {
waitInterval time.Duration
disableDownsampling bool
blockSyncConcurrency int
blockMetaFetchConcurrency int
blockViewerSyncBlockInterval time.Duration
cleanupBlocksInterval time.Duration
compactionConcurrency int
Expand Down Expand Up @@ -574,6 +575,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
Default("20").IntVar(&cc.blockSyncConcurrency)
cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Default("32").IntVar(&cc.blockMetaFetchConcurrency)
cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI.").
Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval)
cmd.Flag("compact.cleanup-interval", "How often we should clean up partially uploaded blocks and blocks with deletion mark in the background when --wait has been enabled. Setting it to \"0s\" disables it - the cleaning will only happen at the end of an iteration.").
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func RunDownsample(
return err
}

metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(),
}, nil)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos))))
metaFetcher, err := block.NewMetaFetcher(nil, fetcherConcurrency, bkt, "", nil, nil, nil)
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil, nil)
testutil.Ok(t, err)

metas, _, err := metaFetcher.Fetch(ctx)
Expand Down
11 changes: 7 additions & 4 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ import (
"github.com/thanos-io/thanos/pkg/ui"
)

const fetcherConcurrency = 32

// registerStore registers a store command.
func registerStore(app *extkingpin.App) {
cmd := app.Command(component.Store.String(), "Store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift, Tencent COS and Aliyun OSS.")
Expand Down Expand Up @@ -81,6 +79,9 @@ func registerStore(app *extkingpin.App) {
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage.").
Default("20").Int()

blockMetaFetchConcurrency := cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Default("32").Int()

minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))

Expand Down Expand Up @@ -145,6 +146,7 @@ func registerStore(app *extkingpin.App) {
debugLogging,
*syncInterval,
*blockSyncConcurrency,
*blockMetaFetchConcurrency,
&store.FilterConfig{
MinTime: *minTime,
MaxTime: *maxTime,
Expand Down Expand Up @@ -183,6 +185,7 @@ func runStore(
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
metaFetchConcurrency int,
filterConf *store.FilterConfig,
selectorRelabelConf *extflag.PathOrContent,
advertiseCompatibilityLabel bool,
Expand Down Expand Up @@ -277,8 +280,8 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay, fetcherConcurrency)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay, metaFetchConcurrency)
metaFetcher, err := block.NewMetaFetcher(logger, metaFetchConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
Expand Down
12 changes: 6 additions & 6 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -383,7 +383,7 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC
}

// TODO(bwplotka): Allow Bucket UI to visualize the state of block as well.
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -537,15 +537,15 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, *deleteDelay/2, fetcherConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, *deleteDelay/2, block.FetcherConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter()
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, *deleteDelay, stubCounter, stubCounter)

ctx := context.Background()

var sy *compact.Syncer
{
baseMetaFetcher, err := block.NewBaseFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
baseMetaFetcher, err := block.NewBaseFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down
3 changes: 3 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,9 @@ Flags:
--block-sync-concurrency=20
Number of goroutines to use when syncing block
metadata from object storage.
--block-meta-fetch-concurrency=32
Number of goroutines to use when fetching block
metadata from object storage.
--block-viewer.global.sync-block-interval=1m
Repeat interval for syncing the blocks between
local and remote view for /global Block Viewer
Expand Down
3 changes: 3 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ Flags:
--block-sync-concurrency=20
Number of goroutines to use when constructing
index-cache.json blocks from object storage.
--block-meta-fetch-concurrency=32
Number of goroutines to use when fetching block
metadata from object storage.
--min-time=0000-01-01T00:00:00Z
Start of time range limit to serve. Thanos
Store will serve only metrics, which happened
Expand Down
3 changes: 3 additions & 0 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
)

const FetcherConcurrency = 32

type fetcherMetrics struct {
syncs prometheus.Counter
syncFailures prometheus.Counter
Expand Down Expand Up @@ -301,6 +303,7 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
ch = make(chan ulid.ULID, f.concurrency)
mtx sync.Mutex
)
level.Debug(f.logger).Log("msg", "fetching meta data", "concurrency", f.concurrency)
for i := 0; i < f.concurrency; i++ {
eg.Go(func() error {
for id := range ch {
Expand Down

0 comments on commit d61675d

Please sign in to comment.