From f55cd4fd0462e5c68bdec19744b6ed60c23adfe1 Mon Sep 17 00:00:00 2001
From: Jonathan Halterman
Date: Mon, 1 Apr 2024 16:04:26 -0700
Subject: [PATCH] Add block compaction delay metric (#7635)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add block compaction delay histogram metric

Co-authored-by: Peter Štibraný
(cherry picked from commit 96952cf7bbfbdbf5fa77f22032e834a12a8ac236)
---
 CHANGELOG.md                      |  1 +
 pkg/compactor/bucket_compactor.go | 17 +++++++++++++++++
 pkg/compactor/job.go              |  7 ++-----
 pkg/storage/tsdb/block/block.go   | 10 ++++++++++
 pkg/storage/tsdb/block/fetcher.go |  3 ++-
 5 files changed, 32 insertions(+), 6 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b3e8a4f8737..8e5647e67cb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@
 * [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
 * [ENHANCEMENT] Store-gateway: Add `stage="wait_max_concurrent"` to `cortex_bucket_store_series_request_stage_duration_seconds` which records how long the query had to wait for its turn for `-blocks-storage.bucket-store.max-concurrent`. #7609
 * [ENHANCEMENT] Querier: add `cortex_querier_federation_upstream_query_wait_duration_seconds` to observe time from when a querier picks up a cross-tenant query to when work begins on its single-tenant counterparts. #7209
+* [ENHANCEMENT] Compactor: Add `cortex_compactor_block_compaction_delay_seconds` metric to track the delay between a block being uploaded and successfully compacted. #7635
 * [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
 * [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
 * [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
diff --git a/pkg/compactor/bucket_compactor.go b/pkg/compactor/bucket_compactor.go
index a31b9fe6e67..ea6ab82c017 100644
--- a/pkg/compactor/bucket_compactor.go
+++ b/pkg/compactor/bucket_compactor.go
@@ -11,6 +11,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -433,6 +434,13 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
 	// into the next planning cycle.
 	// Eventually the block we just uploaded should get synced into the job again (including sync-delay).
 	for _, meta := range toCompact {
+		attrs, err := block.GetMetaAttributes(ctx, meta, c.bkt)
+		if err != nil {
+			level.Warn(jobLogger).Log("msg", "failed to determine block upload time", "block", meta.ULID.String(), "err", err)
+		} else {
+			c.metrics.blockCompactionDelay.WithLabelValues(strconv.Itoa(meta.Compaction.Level)).Observe(compactionBegin.Sub(attrs.LastModified).Seconds())
+		}
+
 		if err := deleteBlock(c.bkt, meta.ULID, filepath.Join(subDir, meta.ULID.String()), jobLogger, c.metrics.blocksMarkedForDeletion); err != nil {
 			return false, nil, errors.Wrapf(err, "mark old block for deletion from bucket")
 		}
@@ -645,6 +653,7 @@ type BucketCompactorMetrics struct {
 	groupCompactionRunsCompleted       prometheus.Counter
 	groupCompactionRunsFailed          prometheus.Counter
 	groupCompactions                   prometheus.Counter
+	blockCompactionDelay               *prometheus.HistogramVec
 	compactionBlocksVerificationFailed prometheus.Counter
 	blocksMarkedForDeletion            prometheus.Counter
 	blocksMarkedForNoCompact           *prometheus.CounterVec
@@ -670,6 +679,14 @@ func NewBucketCompactorMetrics(blocksMarkedForDeletion prometheus.Counter, reg p
 			Name: "cortex_compactor_group_compactions_total",
 			Help: "Total number of group compaction attempts that resulted in new block(s).",
 		}),
+		blockCompactionDelay: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+			Name:                            "cortex_compactor_block_compaction_delay_seconds",
+			Help:                            "Delay between a block being uploaded and it being successfully compacted.",
+			Buckets:                         []float64{60.0, 300.0, 600.0, 1800.0, 3600.0, 7200.0, 10800.0, 14400.0, 18000.0, 36000.0, 72000.0},
+			NativeHistogramBucketFactor:     1.1,
+			NativeHistogramMaxBucketNumber:  100,
+			NativeHistogramMinResetDuration: 1 * time.Hour,
+		}, []string{"level"}),
 		compactionBlocksVerificationFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_compactor_blocks_verification_failures_total",
 			Help: "Total number of failures when verifying min/max time ranges of compacted blocks.",
diff --git a/pkg/compactor/job.go b/pkg/compactor/job.go
index ffae8878b48..b4794a72f6c 100644
--- a/pkg/compactor/job.go
+++ b/pkg/compactor/job.go
@@ -6,7 +6,6 @@ import (
 	"context"
 	"fmt"
 	"math"
-	"path"
 	"sort"
 	"time"
 
@@ -174,11 +173,9 @@ func jobWaitPeriodElapsed(ctx context.Context, job *Job, waitPeriod time.Duratio
 			continue
 		}
 
-		metaPath := path.Join(meta.ULID.String(), block.MetaFilename)
-
-		attrs, err := userBucket.Attributes(ctx, metaPath)
+		attrs, err := block.GetMetaAttributes(ctx, meta, userBucket)
 		if err != nil {
-			return false, meta, errors.Wrapf(err, "unable to get object attributes for %s", metaPath)
+			return false, meta, err
 		}
 
 		if attrs.LastModified.After(threshold) {
diff --git a/pkg/storage/tsdb/block/block.go b/pkg/storage/tsdb/block/block.go
index 8e97622e332..be8d04ee3ca 100644
--- a/pkg/storage/tsdb/block/block.go
+++ b/pkg/storage/tsdb/block/block.go
@@ -325,6 +325,16 @@ func GatherFileStats(blockDir string) (res []File, _ error) {
 	return res, err
 }
 
+// GetMetaAttributes returns the object attributes of the meta.json file for the block associated with the given meta, using the given bucketReader to read them.
+func GetMetaAttributes(ctx context.Context, meta *Meta, bucketReader objstore.BucketReader) (objstore.ObjectAttributes, error) {
+	metaPath := path.Join(meta.ULID.String(), MetaFilename)
+	attrs, err := bucketReader.Attributes(ctx, metaPath)
+	if err != nil {
+		return objstore.ObjectAttributes{}, errors.Wrapf(err, "unable to get object attributes for %s", metaPath)
+	}
+	return attrs, nil
+}
+
 // MarkForNoCompact creates a file which marks block to be not compacted.
 func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason NoCompactReason, details string, markedForNoCompact prometheus.Counter) error {
 	m := path.Join(id.String(), NoCompactMarkFilename)
diff --git a/pkg/storage/tsdb/block/fetcher.go b/pkg/storage/tsdb/block/fetcher.go
index 27c50f8809e..fad9a792155 100644
--- a/pkg/storage/tsdb/block/fetcher.go
+++ b/pkg/storage/tsdb/block/fetcher.go
@@ -194,6 +194,7 @@ func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error)
 	//
 	// - The block upload is completed: this is the normal case. meta.json file still exists in the
 	//   object storage and it's expected to match the locally cached one (because it's immutable by design).
+	//
 	// - The block has been marked for deletion: the deletion hasn't started yet, so the full block (including
 	//   the meta.json file) is still in the object storage. This case is not different than the previous one.
 	//
@@ -417,7 +418,7 @@ func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*Meta, par
 }
 
 // FetchWithoutMarkedForDeletion returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket.
-// This function excludes all blocks for deletion (no deletion delay applied).
+// This function excludes all blocks marked for deletion (no deletion delay applied).
 // It's the caller's responsibility not to change the returned metadata files. Maps can be modified.
 //
 // Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
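
Reviewer note (not part of the committed patch): below is a minimal, self-contained Go sketch showing, in isolation, how the new histogram is shaped and fed. The metric name, buckets, native histogram settings, and "level" label are taken from the patch; the standalone registry, the hard-coded compaction level, and the simulated timestamps are hypothetical stand-ins for the compactor's real state (attrs.LastModified read via block.GetMetaAttributes, and compactionBegin captured when the compaction job starts).

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same shape as the metric added by this patch: classic buckets from
	// 1 minute to 20 hours, plus native histogram settings, partitioned
	// by the source block's compaction level.
	blockCompactionDelay := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:                            "cortex_compactor_block_compaction_delay_seconds",
		Help:                            "Delay between a block being uploaded and it being successfully compacted.",
		Buckets:                         []float64{60, 300, 600, 1800, 3600, 7200, 10800, 14400, 18000, 36000, 72000},
		NativeHistogramBucketFactor:     1.1,
		NativeHistogramMaxBucketNumber:  100,
		NativeHistogramMinResetDuration: 1 * time.Hour,
	}, []string{"level"})
	prometheus.NewRegistry().MustRegister(blockCompactionDelay)

	// Hypothetical stand-ins: in the compactor, lastModified is the
	// LastModified attribute of the source block's meta.json object (its
	// upload time), and compactionBegin is recorded when the job starts.
	lastModified := time.Now().Add(-45 * time.Minute)
	compactionBegin := time.Now()
	level := 2 // meta.Compaction.Level of the source block

	// The delay is observed only on the success path, after the old block
	// has been compacted, mirroring runCompactionJob above.
	delay := compactionBegin.Sub(lastModified)
	blockCompactionDelay.WithLabelValues(strconv.Itoa(level)).Observe(delay.Seconds())
	fmt.Printf("observed %s compaction delay for level %d\n", delay, level)
}

With the classic buckets, per-level delay percentiles can then be read with a query along the lines of histogram_quantile(0.9, sum by (le, level) (rate(cortex_compactor_block_compaction_delay_seconds_bucket[1h]))).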