diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go
index d2953ae9c5..69e27737e5 100644
--- a/pkg/block/fetcher.go
+++ b/pkg/block/fetcher.go
@@ -586,24 +586,28 @@ func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*
 	return nil
 }
 
-var _ MetadataFilter = &DeduplicateFilter{}
+var _ MetadataFilter = &DefaultDeduplicateFilter{}
 
-// DeduplicateFilter is a BaseFetcher filter that filters out older blocks that have exactly the same data.
+type DeduplicateFilter interface {
+	DuplicateIDs() []ulid.ULID
+}
+
+// DefaultDeduplicateFilter is a BaseFetcher filter that filters out older blocks that have exactly the same data.
 // Not go-routine safe.
-type DeduplicateFilter struct {
+type DefaultDeduplicateFilter struct {
 	duplicateIDs []ulid.ULID
 	concurrency  int
 	mu           sync.Mutex
 }
 
-// NewDeduplicateFilter creates DeduplicateFilter.
-func NewDeduplicateFilter(concurrency int) *DeduplicateFilter {
-	return &DeduplicateFilter{concurrency: concurrency}
+// NewDeduplicateFilter creates DefaultDeduplicateFilter.
+func NewDeduplicateFilter(concurrency int) *DefaultDeduplicateFilter {
+	return &DefaultDeduplicateFilter{concurrency: concurrency}
 }
 
 // Filter filters out duplicate blocks that can be formed
 // from two or more overlapping blocks that fully submatches the source blocks of the older blocks.
-func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error {
+func (f *DefaultDeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error {
 	f.duplicateIDs = f.duplicateIDs[:0]
 
 	var wg sync.WaitGroup
@@ -635,7 +639,7 @@ func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metad
 	return nil
 }
 
-func (f *DeduplicateFilter) filterGroup(metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) {
+func (f *DefaultDeduplicateFilter) filterGroup(metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) {
 	sort.Slice(metaSlice, func(i, j int) bool {
 		ilen := len(metaSlice[i].Compaction.Sources)
 		jlen := len(metaSlice[j].Compaction.Sources)
@@ -677,8 +681,8 @@ childLoop:
 	f.mu.Unlock()
 }
 
-// DuplicateIDs returns slice of block ids that are filtered out by DeduplicateFilter.
-func (f *DeduplicateFilter) DuplicateIDs() []ulid.ULID {
+// DuplicateIDs returns slice of block ids that are filtered out by DefaultDeduplicateFilter.
+func (f *DefaultDeduplicateFilter) DuplicateIDs() []ulid.ULID {
 	return f.duplicateIDs
 }
diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go
index 0f6f7be29d..a390ff7ae3 100644
--- a/pkg/block/metadata/meta.go
+++ b/pkg/block/metadata/meta.go
@@ -93,6 +93,9 @@ type Thanos struct {
 
 	// IndexStats contains stats info related to block index.
 	IndexStats IndexStats `json:"index_stats,omitempty"`
+
+	// Extensions allows plugging in arbitrary additional information for the block. Optional.
+	Extensions any `json:"extensions,omitempty"`
 }
 
 type IndexStats struct {
@@ -100,6 +103,26 @@ type IndexStats struct {
 	ChunkMaxSize int64 `json:"chunk_max_size,omitempty"`
 }
 
+func (m *Thanos) ParseExtensions(v any) (any, error) {
+	return ConvertExtensions(m.Extensions, v)
+}
+
+// ConvertExtensions converts extensions with `any` type into specific type `v`
+// that the caller expects.
+func ConvertExtensions(extensions any, v any) (any, error) {
+	if extensions == nil {
+		return nil, nil
+	}
+	extensionsContent, err := json.Marshal(extensions)
+	if err != nil {
+		return nil, err
+	}
+	if err = json.Unmarshal(extensionsContent, v); err != nil {
+		return nil, err
+	}
+	return v, nil
+}
+
 type Rewrite struct {
 	// ULIDs of all source head blocks that went into the block.
 	Sources []ulid.ULID `json:"sources,omitempty"`
diff --git a/pkg/block/metadata/meta_test.go b/pkg/block/metadata/meta_test.go
index 94de50da64..ecfa075228 100644
--- a/pkg/block/metadata/meta_test.go
+++ b/pkg/block/metadata/meta_test.go
@@ -221,4 +221,127 @@ func TestMeta_ReadWrite(t *testing.T) {
 		m1.Thanos.Labels = map[string]string{}
 		testutil.Equals(t, m1, *retMeta)
 	})
+
+	t.Run("extensions write/read/write", func(t *testing.T) {
+		b := bytes.Buffer{}
+		m1 := Meta{
+			BlockMeta: tsdb.BlockMeta{
+				ULID:    ulid.MustNew(5, nil),
+				MinTime: 2424,
+				MaxTime: 134,
+				Version: 1,
+				Compaction: tsdb.BlockMetaCompaction{
+					Level: 123,
+				},
+				Stats: tsdb.BlockStats{NumChunks: 14, NumSamples: 245, NumSeries: 4},
+			},
+			Thanos: Thanos{
+				Labels: map[string]string{"ext": "lset1"},
+				Source: ReceiveSource,
+				Downsample: ThanosDownsample{
+					Resolution: 123144,
+				},
+				Extensions: &TestExtensions{
+					Field1: 1,
+					Field2: "test_string",
+				},
+			},
+		}
+		testutil.Ok(t, m1.Write(&b))
+		testutil.Equals(t, `{
+	"ulid": "00000000050000000000000000",
+	"minTime": 2424,
+	"maxTime": 134,
+	"stats": {
+		"numSamples": 245,
+		"numSeries": 4,
+		"numChunks": 14
+	},
+	"compaction": {
+		"level": 123
+	},
+	"version": 1,
+	"thanos": {
+		"labels": {
+			"ext": "lset1"
+		},
+		"downsample": {
+			"resolution": 123144
+		},
+		"source": "receive",
+		"index_stats": {},
+		"extensions": {
+			"field1": 1,
+			"field2": "test_string"
+		}
+	}
+}
+`, b.String())
+		retMeta, err := Read(io.NopCloser(&b))
+		testutil.Ok(t, err)
+		retExtensions, err := retMeta.Thanos.ParseExtensions(&TestExtensions{})
+		_, ok := retExtensions.(*TestExtensions)
+		testutil.Equals(t, true, ok)
+		testutil.Ok(t, err)
+		testutil.Equals(t, m1.Thanos.Extensions, retExtensions)
+	})
+
+	t.Run("empty extensions write/read/write", func(t *testing.T) {
+		b := bytes.Buffer{}
+		m1 := Meta{
+			BlockMeta: tsdb.BlockMeta{
+				ULID:    ulid.MustNew(5, nil),
+				MinTime: 2424,
+				MaxTime: 134,
+				Version: 1,
+				Compaction: tsdb.BlockMetaCompaction{
+					Level: 123,
+				},
+				Stats: tsdb.BlockStats{NumChunks: 14, NumSamples: 245, NumSeries: 4},
+			},
+			Thanos: Thanos{
+				Labels: map[string]string{"ext": "lset1"},
+				Source: ReceiveSource,
+				Downsample: ThanosDownsample{
+					Resolution: 123144,
+				},
+			},
+		}
+		testutil.Ok(t, m1.Write(&b))
+		testutil.Equals(t, `{
+	"ulid": "00000000050000000000000000",
+	"minTime": 2424,
+	"maxTime": 134,
+	"stats": {
+		"numSamples": 245,
+		"numSeries": 4,
+		"numChunks": 14
+	},
+	"compaction": {
+		"level": 123
+	},
+	"version": 1,
+	"thanos": {
+		"labels": {
+			"ext": "lset1"
+		},
+		"downsample": {
+			"resolution": 123144
+		},
+		"source": "receive",
+		"index_stats": {}
+	}
+}
+`, b.String())
+		retMeta, err := Read(io.NopCloser(&b))
+		testutil.Ok(t, err)
+		retExtensions, err := retMeta.Thanos.ParseExtensions(&TestExtensions{})
+		testutil.Ok(t, err)
+		testutil.Equals(t, m1.Thanos.Extensions, retExtensions)
+	})
+}
+
+type TestExtensions struct {
+	Field1 int    `json:"field1"`
+	Field2 string `json:"field2"`
 }
diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index f36e70dc81..2e20ad73db 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -58,7 +58,7 @@ type Syncer struct {
 	blocks                   map[ulid.ULID]*metadata.Meta
 	partial                  map[ulid.ULID]error
 	metrics                  *syncerMetrics
-	duplicateBlocksFilter    *block.DeduplicateFilter
+	duplicateBlocksFilter    block.DeduplicateFilter
 	ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter
 }
 
@@ -95,7 +95,7 @@ func newSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion, garbag
 
 // NewMetaSyncer returns a new Syncer for the given Bucket and directory.
 // Blocks must be at least as old as the sync delay for being considered.
-func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter *block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter) (*Syncer, error) {
+func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter) (*Syncer, error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -351,6 +351,7 @@ type Group struct {
 	hashFunc                      metadata.HashFunc
 	blockFilesConcurrency         int
 	compactBlocksFetchConcurrency int
+	extensions                    any
 }
 
 // NewGroup returns a new compaction group.
@@ -491,6 +492,14 @@ func (cg *Group) Resolution() int64 {
 	return cg.resolution
 }
 
+func (cg *Group) Extensions() any {
+	return cg.extensions
+}
+
+func (cg *Group) SetExtensions(extensions any) {
+	cg.extensions = extensions
+}
+
 // CompactProgressMetrics contains Prometheus metrics related to compaction progress.
 type CompactProgressMetrics struct {
 	NumberOfCompactionRuns prometheus.Gauge
@@ -536,7 +545,7 @@ func (ps *CompactionProgressCalculator) ProgressCalculate(ctx context.Context, g
 		if len(g.IDs()) == 1 {
 			continue
 		}
-		plan, err := ps.planner.Plan(ctx, g.metasByMinTime)
+		plan, err := ps.planner.Plan(ctx, g.metasByMinTime, nil, g.extensions)
 		if err != nil {
 			return errors.Wrapf(err, "could not plan")
 		}
@@ -728,7 +737,50 @@ func (rs *RetentionProgressCalculator) ProgressCalculate(ctx context.Context, gr
 type Planner interface {
 	// Plan returns a list of blocks that should be compacted into single one.
 	// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
-	Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error)
+	Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error)
+}
+
+type BlockDeletableChecker interface {
+	CanDelete(group *Group, blockID ulid.ULID) bool
+}
+
+type DefaultBlockDeletableChecker struct {
+}
+
+func (c DefaultBlockDeletableChecker) CanDelete(_ *Group, _ ulid.ULID) bool {
+	return true
+}
+
+type CompactionLifecycleCallback interface {
+	PreCompactionCallback(ctx context.Context, logger log.Logger, group *Group, toCompactBlocks []*metadata.Meta) error
+	PostCompactionCallback(ctx context.Context, logger log.Logger, group *Group, blockID ulid.ULID) error
+	GetBlockPopulator(ctx context.Context, logger log.Logger, group *Group) (tsdb.BlockPopulator, error)
+}
+
+type DefaultCompactionLifecycleCallback struct {
+}
+
+func (c DefaultCompactionLifecycleCallback) PreCompactionCallback(_ context.Context, _ log.Logger, _ *Group, toCompactBlocks []*metadata.Meta) error {
+	// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
+	// This is one potential source of how we could end up with duplicated chunks.
+	uniqueSources := map[ulid.ULID]struct{}{}
+	for _, m := range toCompactBlocks {
+		for _, s := range m.Compaction.Sources {
+			if _, ok := uniqueSources[s]; ok {
+				return halt(errors.Errorf("overlapping sources detected for plan %v", toCompactBlocks))
+			}
+			uniqueSources[s] = struct{}{}
+		}
+	}
+	return nil
+}
+
+func (c DefaultCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error {
+	return nil
+}
+
+func (c DefaultCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) {
+	return tsdb.DefaultBlockPopulator{}, nil
 }
 
 // Compactor provides compaction against an underlying storage of time series data.
@@ -748,11 +800,12 @@ type Compactor interface {
 	// * The source dirs are marked Deletable.
 	// * Returns empty ulid.ULID{}.
 	Compact(dest string, dirs []string, open []*tsdb.Block) (ulid.ULID, error)
+	CompactWithBlockPopulator(dest string, dirs []string, open []*tsdb.Block, blockPopulator tsdb.BlockPopulator) (ulid.ULID, error)
 }
 
 // Compact plans and runs a single compaction against the group. The compacted result
 // is uploaded into the bucket the blocks were retrieved from.
-func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, rerr error) {
+func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback) (shouldRerun bool, compID ulid.ULID, rerr error) {
 	cg.compactionRunsStarted.Inc()
 
 	subDir := filepath.Join(dir, cg.Key())
@@ -772,10 +825,13 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp
 		return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
 	}
 
+	errChan := make(chan error, 1)
 	err := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (err error) {
-		shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp)
+		shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp, blockDeletableChecker, compactionLifecycleCallback, errChan)
 		return err
 	}, opentracing.Tags{"group.key": cg.Key()})
+	errChan <- err
+	close(errChan)
 	if err != nil {
 		cg.compactionFailures.Inc()
 		return false, ulid.ULID{}, err
@@ -975,7 +1031,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
 	return nil
 }
 
-func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, _ error) {
+func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback, errChan chan error) (shouldRerun bool, compID ulid.ULID, _ error) {
 	cg.mtx.Lock()
 	defer cg.mtx.Unlock()
 
@@ -993,7 +1049,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 
 	var toCompact []*metadata.Meta
 	if err := tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) (e error) {
-		toCompact, e = planner.Plan(ctx, cg.metasByMinTime)
+		toCompact, e = planner.Plan(ctx, cg.metasByMinTime, errChan, cg.extensions)
 		return e
 	}); err != nil {
 		return false, ulid.ULID{}, errors.Wrap(err, "plan compaction")
@@ -1003,35 +1059,35 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 		return false, ulid.ULID{}, nil
 	}
 
-	level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", toCompact))
-
-	// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
-	// This is one potential source of how we could end up with duplicated chunks.
-	uniqueSources := map[ulid.ULID]struct{}{}
+	level.Info(cg.logger).Log("msg", "compaction available and planned", "plan", fmt.Sprintf("%v", toCompact))
 
 	// Once we have a plan we need to download the actual data.
 	groupCompactionBegin := time.Now()
 	begin := groupCompactionBegin
+
+	if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil {
+		return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact))
+	}
+	level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "plan", fmt.Sprintf("%v", toCompact), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
+
+	begin = time.Now()
+
 	g, errCtx := errgroup.WithContext(ctx)
 	g.SetLimit(cg.compactBlocksFetchConcurrency)
 	toCompactDirs := make([]string, 0, len(toCompact))
 	for _, m := range toCompact {
 		bdir := filepath.Join(dir, m.ULID.String())
-		for _, s := range m.Compaction.Sources {
-			if _, ok := uniqueSources[s]; ok {
-				return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", toCompact))
-			}
-			uniqueSources[s] = struct{}{}
-		}
 
 		func(ctx context.Context, meta *metadata.Meta) {
 			g.Go(func() error {
+				start := time.Now()
 				if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
 					return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency))
 				}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
 					return retry(errors.Wrapf(err, "download block %s", meta.ULID))
 				}
+				level.Debug(cg.logger).Log("msg", "downloaded block", "block", meta.ULID.String(), "duration", time.Since(start), "duration_ms", time.Since(start).Milliseconds())
 
+				start = time.Now()
 				// Ensure all input blocks are valid.
 				var stats block.HealthStats
 				if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) {
@@ -1057,6 +1113,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 					return errors.Wrapf(err, "block id %s, try running with --debug.accept-malformed-index", meta.ULID)
 				}
 
+				level.Debug(cg.logger).Log("msg", "verified block", "block", meta.ULID.String(), "duration", time.Since(start), "duration_ms", time.Since(start).Milliseconds())
 				return nil
 			})
 		}(errCtx, m)
@@ -1073,7 +1130,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 
 	begin = time.Now()
 	if err := tracing.DoInSpanWithErr(ctx, "compaction", func(ctx context.Context) (e error) {
-		compID, e = comp.Compact(dir, toCompactDirs, nil)
+		populateBlockFunc, e := compactionLifecycleCallback.GetBlockPopulator(ctx, cg.logger, cg)
+		if e != nil {
+			return e
+		}
+		compID, e = comp.CompactWithBlockPopulator(dir, toCompactDirs, nil, populateBlockFunc)
 		return e
 	}); err != nil {
 		return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs))
@@ -1083,7 +1144,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 		level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", sourceBlockStr)
 		for _, meta := range toCompact {
 			if meta.Stats.NumSamples == 0 {
-				if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil {
+				if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()), blockDeletableChecker); err != nil {
 					level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID)
 				}
 			}
@@ -1128,6 +1189,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 			Downsample:   metadata.ThanosDownsample{Resolution: cg.resolution},
 			Source:       metadata.CompactorSource,
 			SegmentFiles: block.GetSegmentFiles(bdir),
+			Extensions:   cg.extensions,
 		}
 		if stats.ChunkMaxSize > 0 {
 			thanosMeta.IndexStats.ChunkMaxSize = stats.ChunkMaxSize
@@ -1163,7 +1225,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
 	for _, meta := range toCompact {
 		err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
-			return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()))
+			return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()), blockDeletableChecker)
 		}, opentracing.Tags{"block.id": meta.ULID})
 		if err != nil {
 			return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
@@ -1171,22 +1233,30 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 		cg.groupGarbageCollectedBlocks.Inc()
 	}
 
+	level.Info(cg.logger).Log("msg", "running post compaction callback", "result_block", compID)
+	if err := compactionLifecycleCallback.PostCompactionCallback(ctx, cg.logger, cg, compID); err != nil {
+		return false, ulid.ULID{}, retry(errors.Wrapf(err, "failed to run post compaction callback for result block %s", compID))
+	}
+	level.Info(cg.logger).Log("msg", "finished running post compaction callback", "result_block", compID)
+
 	level.Info(cg.logger).Log("msg", "finished compacting blocks", "result_block", compID, "source_blocks", sourceBlockStr, "duration", time.Since(groupCompactionBegin), "duration_ms", time.Since(groupCompactionBegin).Milliseconds())
 	return true, compID, nil
 }
 
-func (cg *Group) deleteBlock(id ulid.ULID, bdir string) error {
+func (cg *Group) deleteBlock(id ulid.ULID, bdir string, blockDeletableChecker BlockDeletableChecker) error {
 	if err := os.RemoveAll(bdir); err != nil {
 		return errors.Wrapf(err, "remove old block dir %s", id)
 	}
 
-	// Spawn a new context so we always mark a block for deletion in full on shutdown.
-	delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
-	defer cancel()
-	level.Info(cg.logger).Log("msg", "marking compacted block for deletion", "old_block", id)
-	if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id, "source of compacted block", cg.blocksMarkedForDeletion); err != nil {
-		return errors.Wrapf(err, "mark block %s for deletion from bucket", id)
+	if blockDeletableChecker.CanDelete(cg, id) {
+		// Spawn a new context so we always mark a block for deletion in full on shutdown.
+		delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+		defer cancel()
+		level.Info(cg.logger).Log("msg", "marking compacted block for deletion", "old_block", id)
+		if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id, "source of compacted block", cg.blocksMarkedForDeletion); err != nil {
+			return errors.Wrapf(err, "mark block %s for deletion from bucket", id)
+		}
 	}
 	return nil
 }
@@ -1198,6 +1268,8 @@ type BucketCompactor struct {
 	grouper                     Grouper
 	comp                        Compactor
 	planner                     Planner
+	blockDeletableChecker       BlockDeletableChecker
+	compactionLifecycleCallback CompactionLifecycleCallback
 	compactDir                  string
 	bkt                         objstore.Bucket
 	concurrency                 int
@@ -1215,6 +1287,37 @@ func NewBucketCompactor(
 	bkt objstore.Bucket,
 	concurrency int,
 	skipBlocksWithOutOfOrderChunks bool,
+) (*BucketCompactor, error) {
+	if concurrency <= 0 {
+		return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency)
+	}
+	return NewBucketCompactorWithCheckerAndCallback(
+		logger,
+		sy,
+		grouper,
+		planner,
+		comp,
+		DefaultBlockDeletableChecker{},
+		DefaultCompactionLifecycleCallback{},
+		compactDir,
+		bkt,
+		concurrency,
+		skipBlocksWithOutOfOrderChunks,
+	)
+}
+
+func NewBucketCompactorWithCheckerAndCallback(
+	logger log.Logger,
+	sy *Syncer,
+	grouper Grouper,
+	planner Planner,
+	comp Compactor,
+	blockDeletableChecker BlockDeletableChecker,
+	compactionLifecycleCallback CompactionLifecycleCallback,
+	compactDir string,
+	bkt objstore.Bucket,
+	concurrency int,
+	skipBlocksWithOutOfOrderChunks bool,
 ) (*BucketCompactor, error) {
 	if concurrency <= 0 {
 		return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency)
 	}
@@ -1225,6 +1328,8 @@ func NewBucketCompactor(
 		grouper:                     grouper,
 		planner:                     planner,
 		comp:                        comp,
+		blockDeletableChecker:       blockDeletableChecker,
+		compactionLifecycleCallback: compactionLifecycleCallback,
 		compactDir:                  compactDir,
 		bkt:                         bkt,
 		concurrency:                 concurrency,
@@ -1265,7 +1370,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
 		go func() {
 			defer wg.Done()
 			for g := range groupChan {
-				shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp)
+				shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp, c.blockDeletableChecker, c.compactionLifecycleCallback)
 				if err == nil {
 					if shouldRerunGroup {
 						mtx.Lock()
diff --git a/pkg/compact/planner.go b/pkg/compact/planner.go
index 5c2a93df8d..783191cacf 100644
--- a/pkg/compact/planner.go
+++ b/pkg/compact/planner.go
@@ -49,7 +49,7 @@ func NewPlanner(logger log.Logger, ranges []int64, noCompBlocks *GatherNoCompact
 }
 
 // TODO(bwplotka): Consider smarter algorithm, this prefers smaller iterative compactions vs big single one: https://github.com/thanos-io/thanos/issues/3405
-func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
+func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) {
 	return p.plan(p.noCompBlocksFunc(), metasByMinTime)
 }
 
@@ -243,7 +243,7 @@ func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket,
 	return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact}
 }
 
-func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
+func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) {
 	noCompactMarked := t.noCompBlocksFunc()
 	copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked))
 	for k, v := range noCompactMarked {
diff --git a/pkg/compact/planner_test.go b/pkg/compact/planner_test.go
index 9c36dbd54b..256a8be7bb 100644
--- a/pkg/compact/planner_test.go
+++ b/pkg/compact/planner_test.go
@@ -30,7 +30,7 @@ type tsdbPlannerAdapter struct {
 	comp tsdb.Compactor
 }
 
-func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
+func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata.Meta, errChan chan error, _ any) ([]*metadata.Meta, error) {
 	// TSDB planning works based on the meta.json files in the given dir. Mock it up.
 	for _, meta := range metasByMinTime {
 		bdir := filepath.Join(p.dir, meta.ULID.String())
@@ -364,7 +364,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				})
 
 				tsdbPlanner.dir = dir
-				plan, err := tsdbPlanner.Plan(context.Background(), metasByMinTime)
+				plan, err := tsdbPlanner.Plan(context.Background(), metasByMinTime, nil, nil)
 				testutil.Ok(t, err)
 				testutil.Equals(t, c.expected, plan)
 			})
@@ -377,7 +377,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 					return metasByMinTime[i].MinTime < metasByMinTime[j].MinTime
 				})
 
-				plan, err := tsdbBasedPlanner.Plan(context.Background(), metasByMinTime)
+				plan, err := tsdbBasedPlanner.Plan(context.Background(), metasByMinTime, nil, nil)
 				testutil.Ok(t, err)
 				testutil.Equals(t, c.expected, plan)
 			})
@@ -440,12 +440,12 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
 			defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
 			tsdbPlanner.dir = dir
-			plan, err := tsdbPlanner.Plan(context.Background(), c.metas)
+			plan, err := tsdbPlanner.Plan(context.Background(), c.metas, nil, nil)
 			testutil.Ok(t, err)
 			testutil.Equals(t, []*metadata.Meta(nil), plan)
 		})
 		t.Run("tsdbBasedPlanner", func(t *testing.T) {
-			plan, err := tsdbBasedPlanner.Plan(context.Background(), c.metas)
+			plan, err := tsdbBasedPlanner.Plan(context.Background(), c.metas, nil, nil)
 			testutil.Ok(t, err)
 			testutil.Equals(t, []*metadata.Meta(nil), plan)
 		})
@@ -638,7 +638,7 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) {
 				return metasByMinTime[i].MinTime < metasByMinTime[j].MinTime
 			})
 			g.noCompactMarkedMap = c.noCompactMarks
-			plan, err := tsdbBasedPlanner.Plan(context.Background(), metasByMinTime)
+			plan, err := tsdbBasedPlanner.Plan(context.Background(), metasByMinTime, nil, nil)
 			testutil.Ok(t, err)
 			testutil.Equals(t, c.expected, plan)
 		})
@@ -814,7 +814,7 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) {
 				return metasByMinTime[i].MinTime < metasByMinTime[j].MinTime
 			})
 
-			plan, err := planner.Plan(context.Background(), metasByMinTime)
+			plan, err := planner.Plan(context.Background(), metasByMinTime, nil, nil)
 			testutil.Ok(t, err)
 
 			for _, m := range plan {
@@ -847,7 +847,7 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) {
 				m.Thanos = metadata.Thanos{}
 			}
 
-			plan, err := planner.Plan(context.Background(), metasByMinTime)
+			plan, err := planner.Plan(context.Background(), metasByMinTime, nil, nil)
 			testutil.Ok(t, err)
 			testutil.Equals(t, c.expected, plan)
 			testutil.Equals(t, c.expectedMarks, promtest.ToFloat64(marked)-lastMarkValue)
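
Illustrative sketch, not part of the patch above: one way a downstream caller might use the extension points this change introduces. The interfaces, their method signatures, and NewBucketCompactorWithCheckerAndCallback come from the diff; the auditingCallback type, its log messages, and the package name are hypothetical.

// Hypothetical example package; only the compact/metadata/tsdb identifiers below are taken from the patch.
package callbackexample

import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/tsdb"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/compact"
)

// auditingCallback is a made-up CompactionLifecycleCallback that only logs around
// each compaction and keeps the default TSDB block population behaviour.
type auditingCallback struct{}

// PreCompactionCallback runs before the blocks of a plan are downloaded and compacted.
func (auditingCallback) PreCompactionCallback(_ context.Context, logger log.Logger, g *compact.Group, blocks []*metadata.Meta) error {
	level.Info(logger).Log("msg", "about to compact", "group", g.Key(), "blocks", len(blocks))
	return nil
}

// PostCompactionCallback runs after the resulting block has been uploaded.
func (auditingCallback) PostCompactionCallback(_ context.Context, logger log.Logger, _ *compact.Group, id ulid.ULID) error {
	level.Info(logger).Log("msg", "compaction produced block", "block", id)
	return nil
}

// GetBlockPopulator keeps the upstream default populator.
func (auditingCallback) GetBlockPopulator(context.Context, log.Logger, *compact.Group) (tsdb.BlockPopulator, error) {
	return tsdb.DefaultBlockPopulator{}, nil
}

Under these assumptions, the callback would be wired in by passing auditingCallback{} (together with, for example, compact.DefaultBlockDeletableChecker{}) to compact.NewBucketCompactorWithCheckerAndCallback in place of the defaults used by compact.NewBucketCompactor.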