Skip to content

Commit

Permalink
Introduce "block-meta-fetch-concurrency" flag for compact and store
Browse files Browse the repository at this point in the history
Furthermore consolidate the fetcher concurrency by moving the
fetcherConcurrency const to the fetcher code of the block package.

Signed-off-by: Johannes Frey <johannes.frey@daimler.com>
  • Loading branch information
Johannes Frey committed Jan 27, 2021
1 parent 69bbf13 commit eeaa3bd
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 14 deletions.
7 changes: 5 additions & 2 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,10 @@ func runCompact(
// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, deleteDelay/2, fetcherConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, deleteDelay/2, conf.blockMetaFetchConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter()

baseMetaFetcher, err := block.NewBaseFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down Expand Up @@ -523,6 +523,7 @@ type compactConfig struct {
waitInterval time.Duration
disableDownsampling bool
blockSyncConcurrency int
blockMetaFetchConcurrency int
blockViewerSyncBlockInterval time.Duration
cleanupBlocksInterval time.Duration
compactionConcurrency int
Expand Down Expand Up @@ -574,6 +575,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
Default("20").IntVar(&cc.blockSyncConcurrency)
cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Default("32").IntVar(&cc.blockMetaFetchConcurrency)
cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI.").
Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval)
cmd.Flag("compact.cleanup-interval", "How often we should clean up partially uploaded blocks and blocks with deletion mark in the background when --wait has been enabled. Setting it to \"0s\" disables it - the cleaning will only happen at the end of an iteration.").
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func RunDownsample(
return err
}

metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(),
}, nil)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos))))
metaFetcher, err := block.NewMetaFetcher(nil, fetcherConcurrency, bkt, "", nil, nil, nil)
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil, nil)
testutil.Ok(t, err)

metas, _, err := metaFetcher.Fetch(ctx)
Expand Down
11 changes: 7 additions & 4 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ import (
"github.com/thanos-io/thanos/pkg/ui"
)

const fetcherConcurrency = 32

// registerStore registers a store command.
func registerStore(app *extkingpin.App) {
cmd := app.Command(component.Store.String(), "Store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift, Tencent COS and Aliyun OSS.")
Expand Down Expand Up @@ -81,6 +79,9 @@ func registerStore(app *extkingpin.App) {
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage.").
Default("20").Int()

blockMetaFetchConcurrency := cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Default("32").Int()

minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))

Expand Down Expand Up @@ -145,6 +146,7 @@ func registerStore(app *extkingpin.App) {
debugLogging,
*syncInterval,
*blockSyncConcurrency,
*blockMetaFetchConcurrency,
&store.FilterConfig{
MinTime: *minTime,
MaxTime: *maxTime,
Expand Down Expand Up @@ -183,6 +185,7 @@ func runStore(
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
metaFetchConcurrency int,
filterConf *store.FilterConfig,
selectorRelabelConf *extflag.PathOrContent,
advertiseCompatibilityLabel bool,
Expand Down Expand Up @@ -277,8 +280,8 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay, fetcherConcurrency)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay, metaFetchConcurrency)
metaFetcher, err := block.NewMetaFetcher(logger, metaFetchConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
Expand Down
12 changes: 6 additions & 6 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -383,7 +383,7 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC
}

// TODO(bwplotka): Allow Bucket UI to visualize the state of block as well.
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -537,15 +537,15 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, *deleteDelay/2, fetcherConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, *deleteDelay/2, block.FetcherConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter()
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, *deleteDelay, stubCounter, stubCounter)

ctx := context.Background()

var sy *compact.Syncer
{
baseMetaFetcher, err := block.NewBaseFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
baseMetaFetcher, err := block.NewBaseFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
)

const FetcherConcurrency = 32

type fetcherMetrics struct {
syncs prometheus.Counter
syncFailures prometheus.Counter
Expand Down Expand Up @@ -301,6 +303,7 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
ch = make(chan ulid.ULID, f.concurrency)
mtx sync.Mutex
)
level.Debug(f.logger).Log("msg", "fetching meta data", "concurrency", f.concurrency)
for i := 0; i < f.concurrency; i++ {
eg.Go(func() error {
for id := range ch {
Expand Down

0 comments on commit eeaa3bd

Please sign in to comment.