store: add consistency delay to fetch blocks
Signed-off-by: khyatisoneji <khyatisoneji5@gmail.com>
khyatisoneji committed Feb 13, 2020
1 parent 5a93f51 commit d6bed5f
Showing 8 changed files with 213 additions and 191 deletions.
37 changes: 7 additions & 30 deletions cmd/thanos/compact.go
@@ -16,7 +16,6 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -184,17 +183,11 @@ func runCompact(
Name: "thanos_compactor_iterations_total",
Help: "Total number of iterations that were executed successfully.",
})
consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "thanos_consistency_delay_seconds",
Help: "Configured consistency delay in seconds.",
}, func() float64 {
return consistencyDelay.Seconds()
})
partialUploadDeleteAttempts := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total",
Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.",
})
reg.MustRegister(halted, retried, iterations, consistencyDelayMetric, partialUploadDeleteAttempts)
reg.MustRegister(halted, retried, iterations, partialUploadDeleteAttempts)

downsampleMetrics := newDownsampleMetrics(reg)

@@ -247,15 +240,18 @@ func runCompact(
}
}()

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg),
duplicateBlocksFilter := block.NewDeduplicateFilter()
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
(&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
duplicateBlocksFilter.Filter,
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, blockSyncConcurrency, acceptMalformedIndex, false)
sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, blockSyncConcurrency, acceptMalformedIndex, false)
if err != nil {
return errors.Wrap(err, "create syncer")
}
@@ -392,25 +388,6 @@ func runCompact(
return nil
}

type consistencyDelayMetaFilter struct {
logger log.Logger
consistencyDelay time.Duration
}

func (f *consistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) {
for id, meta := range metas {
if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(block.TooFreshMeta).Inc()
delete(metas, id)
}
}
}

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error {
genIndex := prometheus.NewCounter(prometheus.CounterOpts{
9 changes: 8 additions & 1 deletion cmd/thanos/store.go
@@ -81,6 +81,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage.").
Hidden().Default("false").Bool()

consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are read.").
Default("30m"))

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
@@ -116,6 +119,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
selectorRelabelConf,
*advertiseCompatibilityLabel,
*enableIndexHeader,
time.Duration(*consistencyDelay),
)
}
}
@@ -148,6 +152,7 @@ func runStore(
selectorRelabelConf *extflag.PathOrContent,
advertiseCompatibilityLabel bool,
enableIndexHeader bool,
consistencyDelay time.Duration,
) error {
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
@@ -220,9 +225,11 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer,
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
block.NewDeduplicateFilter().Filter,
)
if err != nil {
15 changes: 9 additions & 6 deletions docs/components/store.md
@@ -27,7 +27,8 @@ In general about 1MB of local disk space is required per TSDB block stored in th
## Flags
[embedmd]:# (flags/store.txt $)
[embedmd]: # "flags/store.txt $"
```$
usage: thanos store [<flags>]

@@ -137,6 +138,7 @@ Flags:
Prometheus relabel-config syntax. See format
details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--consistency-delay=30m Minimum age of all blocks before they are read.

```

@@ -179,7 +181,8 @@ The `in-memory` index cache is enabled by default and its max size can be config

Alternatively, the `in-memory` index cache can also be configured using `--index-cache.config-file` to reference the configuration file or `--index-cache.config` to put the YAML config directly:

[embedmd]:# (../flags/config_index_cache_in_memory.txt yaml)
[embedmd]: # "../flags/config_index_cache_in_memory.txt yaml"

```yaml
type: IN-MEMORY
config:
@@ -196,7 +199,8 @@ All the settings are **optional**:

The `memcached` index cache allows using [Memcached](https://memcached.org) as the cache backend. This cache type is configured using `--index-cache.config-file` to reference the configuration file or `--index-cache.config` to put the YAML config directly:

[embedmd]:# (../flags/config_index_cache_memcached.txt yaml)
[embedmd]: # "../flags/config_index_cache_memcached.txt yaml"

```yaml
type: MEMCACHED
config:
@@ -224,13 +228,12 @@ While the remaining settings are **optional**:
- `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, keys are internally split into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.


## Index Header

In order to query series inside blocks from object storage, the Store Gateway has to know certain initial info about each block, such as:

* symbols table to unintern string values
* postings offset for posting lookup
- symbols table to unintern string values
- postings offset for posting lookup

To achieve this, on startup an `index-header` is built for each block from pieces of the original block's index and stored on disk.
Such an `index-header` file is then mmapped and used by the Store Gateway.
43 changes: 43 additions & 0 deletions pkg/block/fetcher.go
@@ -463,6 +463,12 @@ func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced Ga
}
}

// GetDuplicateIDs returns a slice of block IDs
// that are filtered out by the DeduplicateFilter.
func (f *DeduplicateFilter) GetDuplicateIDs() []ulid.ULID {
return f.DuplicateIDs
}

func addNodeBySources(root *Node, add *Node) bool {
var rootNode *Node
for _, node := range root.Children {
@@ -506,3 +512,40 @@ func contains(s1 []ulid.ULID, s2 []ulid.ULID) bool {
}
return true
}

// ConsistencyDelayMetaFilter is a MetaFetcher filter that filters out blocks that are younger than the specified consistency delay.
type ConsistencyDelayMetaFilter struct {
logger log.Logger
consistencyDelay time.Duration
}

// NewConsistencyDelayMetaFilter creates ConsistencyDelayMetaFilter.
func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Duration, reg prometheus.Registerer) *ConsistencyDelayMetaFilter {
consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "consistency_delay_seconds",
Help: "Configured consistency delay in seconds.",
}, func() float64 {
return consistencyDelay.Seconds()
})
reg.MustRegister(consistencyDelayMetric)

return &ConsistencyDelayMetaFilter{
logger: logger,
consistencyDelay: consistencyDelay,
}
}

// Filter filters out blocks that were uploaded within the consistency delay window, unless they come from a trusted (compactor or repair) source.
func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
for id, meta := range metas {
if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(TooFreshMeta).Inc()
delete(metas, id)
}
}
}
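
For context: the new filter's freshness check compares the block ULID's embedded timestamp against the configured delay, in milliseconds. Below is a minimal, self-contained sketch of that arithmetic (illustration only, not part of this commit; it assumes `github.com/oklog/ulid` as imported in the files above).

```go
package main

import (
	"crypto/rand"
	"fmt"
	"time"

	"github.com/oklog/ulid"
)

func main() {
	consistencyDelay := 30 * time.Minute

	// A block ULID minted "now" embeds the current Unix time in milliseconds.
	id := ulid.MustNew(ulid.Now(), rand.Reader)

	// Same comparison as ConsistencyDelayMetaFilter.Filter: block age (ms) vs. delay (ms).
	// A just-created block counts as too fresh and would be filtered out,
	// unless it was produced by the compactor or a repair tool.
	ageMs := ulid.Now() - id.Time()
	tooFresh := ageMs < uint64(consistencyDelay/time.Millisecond)
	fmt.Printf("block %s too fresh: %v\n", id, tooFresh) // prints: true
}
```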
107 changes: 15 additions & 92 deletions pkg/compact/compact.go
@@ -49,6 +49,7 @@ type Syncer struct {
metrics *syncerMetrics
acceptMalformedIndex bool
enableVerticalCompaction bool
duplicateBlocksFilter *block.DeduplicateFilter
}

type syncerMetrics struct {
@@ -123,19 +124,20 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay to be considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter *block.DeduplicateFilter, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
return &Syncer{
logger: logger,
reg: reg,
bkt: bkt,
fetcher: fetcher,
blocks: map[ulid.ULID]*metadata.Meta{},
metrics: newSyncerMetrics(reg),
blockSyncConcurrency: blockSyncConcurrency,
acceptMalformedIndex: acceptMalformedIndex,
logger: logger,
reg: reg,
bkt: bkt,
fetcher: fetcher,
blocks: map[ulid.ULID]*metadata.Meta{},
metrics: newSyncerMetrics(reg),
duplicateBlocksFilter: duplicateBlocksFilter,
blockSyncConcurrency: blockSyncConcurrency,
acceptMalformedIndex: acceptMalformedIndex,
// The syncer offers an option to enable vertical compaction, even if it's
// not currently used by Thanos, because the compactor is also used by Cortex
// which needs vertical compaction.
@@ -231,89 +233,7 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error {

begin := time.Now()

// Run a separate round of garbage collections for each valid resolution.
for _, res := range []int64{
downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2,
} {
err := s.garbageCollect(ctx, res)
if err != nil {
s.metrics.garbageCollectionFailures.Inc()
}
s.metrics.garbageCollections.Inc()
s.metrics.garbageCollectionDuration.Observe(time.Since(begin).Seconds())

if err != nil {
return errors.Wrapf(err, "garbage collect resolution %d", res)
}
}
return nil
}

func (s *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) {
// Map each block to its highest priority parent. Initial blocks have themselves
// in their source section, i.e. are their own parent.
parents := map[ulid.ULID]ulid.ULID{}

for id, meta := range s.blocks {
// Skip any block that has a different resolution.
if meta.Thanos.Downsample.Resolution != resolution {
continue
}

// For each source block we contain, check whether we are the highest priority parent block.
for _, sid := range meta.Compaction.Sources {
pid, ok := parents[sid]
// No parents for the source block so far.
if !ok {
parents[sid] = id
continue
}
pmeta, ok := s.blocks[pid]
if !ok {
return nil, errors.Errorf("previous parent block %s not found", pid)
}
// The current block is the higher priority parent for the source if its
// compaction level is higher than that of the previously set parent.
// If compaction levels are equal, the more recent ULID wins.
//
// The ULID recency alone is not sufficient since races, e.g. induced
// by downtime of garbage collection, may re-compact blocks that
// were already compacted into higher-level blocks multiple times.
level, plevel := meta.Compaction.Level, pmeta.Compaction.Level

if level > plevel || (level == plevel && id.Compare(pid) > 0) {
parents[sid] = id
}
}
}

// A block can safely be deleted if it is not the highest priority parent for
// any source block.
topParents := map[ulid.ULID]struct{}{}
for _, pid := range parents {
topParents[pid] = struct{}{}
}

for id, meta := range s.blocks {
// Skip any block that has a different resolution.
if meta.Thanos.Downsample.Resolution != resolution {
continue
}
if _, ok := topParents[id]; ok {
continue
}

ids = append(ids, id)
}
return ids, nil
}

func (s *Syncer) garbageCollect(ctx context.Context, resolution int64) error {
garbageIds, err := s.GarbageBlocks(resolution)
if err != nil {
return err
}

garbageIds := s.duplicateBlocksFilter.GetDuplicateIDs()
for _, id := range garbageIds {
if ctx.Err() != nil {
return ctx.Err()
@@ -327,6 +247,7 @@ func (s *Syncer) garbageCollect(ctx context.Context, resolution int64) error {
err := block.Delete(delCtx, s.logger, s.bkt, id)
cancel()
if err != nil {
s.metrics.garbageCollectionFailures.Inc()
return retry(errors.Wrapf(err, "delete block %s from bucket", id))
}

@@ -335,6 +256,8 @@ func (s *Syncer) garbageCollect(ctx context.Context, resolution int64) error {
delete(s.blocks, id)
s.metrics.garbageCollectedBlocks.Inc()
}
s.metrics.garbageCollections.Inc()
s.metrics.garbageCollectionDuration.Observe(time.Since(begin).Seconds())
return nil
}
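
For context: after this change the garbage-collection path no longer recomputes parent/child relationships per resolution; it deletes whatever the shared DeduplicateFilter marked as duplicates during the last metadata fetch. The sketch below shows hypothetical wiring based on the signatures in this diff; the `inmem` test bucket, the trimmed error handling, and the `SyncMetas` call are assumptions, not taken from this commit.

```go
package main

import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/compact"
	"github.com/thanos-io/thanos/pkg/objstore/inmem"
)

func main() {
	logger := log.NewNopLogger()
	reg := prometheus.NewRegistry()
	bkt := inmem.NewBucket()

	// The same DeduplicateFilter instance is shared by the fetcher and the syncer:
	// fetching metadata populates its duplicate list, garbage collection consumes it.
	duplicateBlocksFilter := block.NewDeduplicateFilter()
	fetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", reg,
		duplicateBlocksFilter.Filter,
	)
	if err != nil {
		panic(err)
	}

	sy, err := compact.NewSyncer(logger, reg, bkt, fetcher, duplicateBlocksFilter, 20, false, false)
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	_ = sy.SyncMetas(ctx)      // runs the filters; duplicates are recorded on the filter
	_ = sy.GarbageCollect(ctx) // deletes exactly the blocks in GetDuplicateIDs()
}
```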

(Diffs for the remaining changed files are not shown.)
