Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add block compaction delay metric #7635

Merged
merged 8 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
* [ENHANCEMENT] Store-gateway: Add `stage="wait_max_concurrent"` to `cortex_bucket_store_series_request_stage_duration_seconds` which records how long the query had to wait for its turn for `-blocks-storage.bucket-store.max-concurrent`. #7609
* [ENHANCEMENT] Querier: add `cortex_querier_federation_upstream_query_wait_duration_seconds` to observe time from when a querier picks up a cross-tenant query to when work begins on its single-tenant counterparts. #7209
* [ENHANCEMENT] Compactor: Add `cortex_compactor_block_compaction_delay_seconds` metric to track the delay between a block being uploaded and it being successfully compacted. #7635
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
* [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
Expand Down
17 changes: 17 additions & 0 deletions pkg/compactor/bucket_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -433,6 +434,13 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
// into the next planning cycle.
// Eventually the block we just uploaded should get synced into the job again (including sync-delay).
for _, meta := range toCompact {
attrs, err := block.GetMetaAttributes(ctx, meta, c.bkt)
if err != nil {
level.Warn(jobLogger).Log("msg", "failed to determine block upload time", "block", meta.ULID.String(), "err", err)
} else {
c.metrics.blockCompactionDelay.WithLabelValues(strconv.Itoa(meta.Compaction.Level)).Observe(compactionBegin.Sub(attrs.LastModified).Seconds())
}

if err := deleteBlock(c.bkt, meta.ULID, filepath.Join(subDir, meta.ULID.String()), jobLogger, c.metrics.blocksMarkedForDeletion); err != nil {
return false, nil, errors.Wrapf(err, "mark old block for deletion from bucket")
}
Expand Down Expand Up @@ -645,6 +653,7 @@ type BucketCompactorMetrics struct {
groupCompactionRunsCompleted prometheus.Counter
groupCompactionRunsFailed prometheus.Counter
groupCompactions prometheus.Counter
blockCompactionDelay *prometheus.HistogramVec
compactionBlocksVerificationFailed prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact *prometheus.CounterVec
Expand All @@ -670,6 +679,14 @@ func NewBucketCompactorMetrics(blocksMarkedForDeletion prometheus.Counter, reg p
Name: "cortex_compactor_group_compactions_total",
Help: "Total number of group compaction attempts that resulted in new block(s).",
}),
blockCompactionDelay: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_compactor_block_compaction_delay_seconds",
Help: "Delay between a block being uploaded and successfully compacting it.",
Buckets: []float64{60.0, 300.0, 600.0, 1800.0, 3600.0, 7200.0, 10800.0, 14400.0, 18000.0, 36000.0, 72000.0},
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
}, []string{"level"}),
compactionBlocksVerificationFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_blocks_verification_failures_total",
Help: "Total number of failures when verifying min/max time ranges of compacted blocks.",
Expand Down
7 changes: 2 additions & 5 deletions pkg/compactor/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"context"
"fmt"
"math"
"path"
"sort"
"time"

Expand Down Expand Up @@ -174,11 +173,9 @@ func jobWaitPeriodElapsed(ctx context.Context, job *Job, waitPeriod time.Duratio
continue
}

metaPath := path.Join(meta.ULID.String(), block.MetaFilename)

attrs, err := userBucket.Attributes(ctx, metaPath)
attrs, err := block.GetMetaAttributes(ctx, meta, userBucket)
if err != nil {
return false, meta, errors.Wrapf(err, "unable to get object attributes for %s", metaPath)
return false, meta, err
}

if attrs.LastModified.After(threshold) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/tsdb/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,16 @@ func GatherFileStats(blockDir string) (res []File, _ error) {
return res, err
}

// GetMetaAttributes returns the object storage attributes of the meta.json file
// for the block described by meta, read via the provided bucketReader.
func GetMetaAttributes(ctx context.Context, meta *Meta, bucketReader objstore.BucketReader) (objstore.ObjectAttributes, error) {
	objPath := path.Join(meta.ULID.String(), MetaFilename)
	attrs, attrsErr := bucketReader.Attributes(ctx, objPath)
	if attrsErr != nil {
		return objstore.ObjectAttributes{}, errors.Wrapf(attrsErr, "unable to get object attributes for %s", objPath)
	}
	return attrs, nil
}

// MarkForNoCompact creates a file which marks block to be not compacted.
func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason NoCompactReason, details string, markedForNoCompact prometheus.Counter) error {
m := path.Join(id.String(), NoCompactMarkFilename)
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/tsdb/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error)
//
// - The block upload is completed: this is the normal case. meta.json file still exists in the
// object storage and it's expected to match the locally cached one (because it's immutable by design).
//
// - The block has been marked for deletion: the deletion hasn't started yet, so the full block (including
// the meta.json file) is still in the object storage. This case is not different than the previous one.
//
Expand Down Expand Up @@ -417,7 +418,7 @@ func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*Meta, par
}

// FetchWithoutMarkedForDeletion returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket.
// This function excludes all blocks for deletion (no deletion delay applied).
// This function excludes all blocks marked for deletion (no deletion delay applied).
// It's caller responsibility to not change the returned metadata files. Maps can be modified.
//
// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
Expand Down
Loading