From ccd36b77f1b4e9b47157b91c3b4ee50da5448abd Mon Sep 17 00:00:00 2001
From: Jonathan Halterman
Date: Fri, 15 Mar 2024 12:55:14 -0700
Subject: [PATCH 1/8] Optimize marking blocks as queried

---
 pkg/storegateway/bucket.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go
index 5406757df9f..e009d8a2f98 100644
--- a/pkg/storegateway/bucket.go
+++ b/pkg/storegateway/bucket.go
@@ -628,7 +628,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
 			level.Debug(s.logger).Log("msg", "queried non-compacted block", "blockId", b.meta.ULID, "ooo", b.meta.Compaction.FromOutOfOrder())
 		}
 
-		b.queried.Store(true)
+		b.queried.CompareAndSwap(false, true)
 	}
 
 	if err := s.sendHints(srv, resHints); err != nil {
 		return err

From 05f4aa219bd0162af7fe4d283b5bb3ff200eb533 Mon Sep 17 00:00:00 2001
From: Jonathan Halterman
Date: Fri, 15 Mar 2024 18:09:13 -0700
Subject: [PATCH 2/8] Add block compaction delay summary metric

---
 pkg/compactor/bucket_compactor.go      | 10 ++++++++++
 pkg/compactor/job.go                   |  7 ++-----
 pkg/storage/tsdb/block/block.go        | 10 ++++++++++
 pkg/storage/tsdb/block/fetcher.go      |  9 ++++++++-
 pkg/storage/tsdb/block/fetcher_test.go |  2 +-
 pkg/storage/tsdb/block/meta.go         |  4 +++-
 6 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/pkg/compactor/bucket_compactor.go b/pkg/compactor/bucket_compactor.go
index a31b9fe6e67..def2f6cb564 100644
--- a/pkg/compactor/bucket_compactor.go
+++ b/pkg/compactor/bucket_compactor.go
@@ -11,6 +11,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -433,6 +434,10 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
 	// into the next planning cycle.
 	// Eventually the block we just uploaded should get synced into the job again (including sync-delay).
 	for _, meta := range toCompact {
+		if meta.LastModified != nil {
+			c.metrics.blockCompactionDelay.WithLabelValues(strconv.Itoa(meta.Compaction.Level)).Observe(compactionBegin.Sub(*meta.LastModified).Seconds())
+		}
+
 		if err := deleteBlock(c.bkt, meta.ULID, filepath.Join(subDir, meta.ULID.String()), jobLogger, c.metrics.blocksMarkedForDeletion); err != nil {
 			return false, nil, errors.Wrapf(err, "mark old block for deletion from bucket")
 		}
@@ -645,6 +650,7 @@ type BucketCompactorMetrics struct {
 	groupCompactionRunsCompleted       prometheus.Counter
 	groupCompactionRunsFailed          prometheus.Counter
 	groupCompactions                   prometheus.Counter
+	blockCompactionDelay               *prometheus.SummaryVec
 	compactionBlocksVerificationFailed prometheus.Counter
 	blocksMarkedForDeletion            prometheus.Counter
 	blocksMarkedForNoCompact           *prometheus.CounterVec
@@ -670,6 +676,10 @@ func NewBucketCompactorMetrics(blocksMarkedForDeletion prometheus.Counter, reg p
 			Name: "cortex_compactor_group_compactions_total",
 			Help: "Total number of group compaction attempts that resulted in new block(s).",
 		}),
+		blockCompactionDelay: promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{
+			Name: "cortex_compactor_block_compaction_delay_seconds",
+			Help: "Delay between a block being created and successfully compacting it in seconds.",
+		}, []string{"level"}),
 		compactionBlocksVerificationFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_compactor_blocks_verification_failures_total",
 			Help: "Total number of failures when verifying min/max time ranges of compacted blocks.",
diff --git a/pkg/compactor/job.go b/pkg/compactor/job.go
index ffae8878b48..abc06c635d5 100644
--- a/pkg/compactor/job.go
+++ b/pkg/compactor/job.go
@@ -6,7 +6,6 @@ import (
 	"context"
 	"fmt"
 	"math"
-	"path"
 	"sort"
 	"time"
 
@@ -174,11 +173,9 @@ func jobWaitPeriodElapsed(ctx context.Context, job *Job, waitPeriod time.Duratio
 			continue
 		}
 
-		metaPath := path.Join(meta.ULID.String(), block.MetaFilename)
-
-		attrs, err := userBucket.Attributes(ctx, metaPath)
+		attrs, err := block.GetAttributes(ctx, meta, userBucket)
 		if err != nil {
-			return false, meta, errors.Wrapf(err, "unable to get object attributes for %s", metaPath)
+			return false, meta, err
 		}
 
 		if attrs.LastModified.After(threshold) {
diff --git a/pkg/storage/tsdb/block/block.go b/pkg/storage/tsdb/block/block.go
index 8e97622e332..512a053b208 100644
--- a/pkg/storage/tsdb/block/block.go
+++ b/pkg/storage/tsdb/block/block.go
@@ -325,6 +325,16 @@ func GatherFileStats(blockDir string) (res []File, _ error) {
 	return res, err
 }
 
+// GetAttributes returns the attributes for the block associated with the meta, using the userBucket to read the attributes.
+func GetAttributes(ctx context.Context, meta *Meta, bucketReader objstore.BucketReader) (objstore.ObjectAttributes, error) {
+	metaPath := path.Join(meta.ULID.String(), MetaFilename)
+	attrs, err := bucketReader.Attributes(ctx, metaPath)
+	if err != nil {
+		return objstore.ObjectAttributes{}, errors.Wrapf(err, "unable to get object attributes for %s", metaPath)
+	}
+	return attrs, nil
+}
+
 // MarkForNoCompact creates a file which marks block to be not compacted.
 func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason NoCompactReason, details string, markedForNoCompact prometheus.Counter) error {
 	m := path.Join(id.String(), NoCompactMarkFilename)
diff --git a/pkg/storage/tsdb/block/fetcher.go b/pkg/storage/tsdb/block/fetcher.go
index 27c50f8809e..e96685af9f6 100644
--- a/pkg/storage/tsdb/block/fetcher.go
+++ b/pkg/storage/tsdb/block/fetcher.go
@@ -194,6 +194,7 @@ func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error)
 	//
 	// - The block upload is completed: this is the normal case. meta.json file still exists in the
 	//   object storage and it's expected to match the locally cached one (because it's immutable by design).
+	//
 	// - The block has been marked for deletion: the deletion hasn't started yet, so the full block (including
 	//   the meta.json file) is still in the object storage. This case is not different than the previous one.
 	//
@@ -248,6 +249,12 @@ func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error)
 		return nil, errors.Errorf("unexpected meta file: %s version: %d", metaFile, m.Version)
 	}
 
+	attrs, err := GetAttributes(ctx, m, f.bkt)
+	if err != nil {
+		return nil, err
+	}
+	m.LastModified = &attrs.LastModified
+
 	// Best effort cache in local dir.
 	if f.cacheDir != "" {
 		if err := os.MkdirAll(cachedBlockDir, os.ModePerm); err != nil {
@@ -417,7 +424,7 @@ func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*Meta, par
 }
 
 // FetchWithoutMarkedForDeletion returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket.
-// This function excludes all blocks for deletion (no deletion delay applied).
+// This function excludes all blocks marked for deletion (no deletion delay applied).
 // It's caller responsibility to not change the returned metadata files. Maps can be modified.
 //
 // Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
diff --git a/pkg/storage/tsdb/block/fetcher_test.go b/pkg/storage/tsdb/block/fetcher_test.go
index 74edbbbb537..a42d18c49ec 100644
--- a/pkg/storage/tsdb/block/fetcher_test.go
+++ b/pkg/storage/tsdb/block/fetcher_test.go
@@ -295,7 +295,7 @@ func TestMetaFetcher_ShouldNotIssueAnyAPICallToObjectStorageIfAllBlockMetasAreCa
 	assert.NoError(t, testutil.GatherAndCompare(reg1, strings.NewReader(`
 		# HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket.
 		# TYPE thanos_objstore_bucket_operations_total counter
-		thanos_objstore_bucket_operations_total{bucket="test",operation="attributes"} 0
+		thanos_objstore_bucket_operations_total{bucket="test",operation="attributes"} 2
 		thanos_objstore_bucket_operations_total{bucket="test",operation="delete"} 0
 		thanos_objstore_bucket_operations_total{bucket="test",operation="exists"} 0
 		thanos_objstore_bucket_operations_total{bucket="test",operation="get"} 2
diff --git a/pkg/storage/tsdb/block/meta.go b/pkg/storage/tsdb/block/meta.go
index cc3a2ffc54e..00e221ab48e 100644
--- a/pkg/storage/tsdb/block/meta.go
+++ b/pkg/storage/tsdb/block/meta.go
@@ -11,6 +11,7 @@ import (
 	"io"
 	"os"
 	"path/filepath"
+	"time"
 
 	"github.com/go-kit/log"
 	"github.com/grafana/dskit/runutil"
@@ -44,7 +45,8 @@ const (
 type Meta struct {
 	tsdb.BlockMeta
 
-	Thanos ThanosMeta `json:"thanos"`
+	Thanos       ThanosMeta `json:"thanos"`
+	LastModified *time.Time `json:"last_modified,omitempty"`
 }
 
 func (m *Meta) String() string {

From b0d062bed1e1c7128ed6e04eb4231eeed1888e8f Mon Sep 17 00:00:00 2001
From: Jonathan Halterman
Date: Thu, 21 Mar 2024 19:01:01 -0700
Subject: [PATCH 3/8] Address PR comments

- Change summary to a histogram
- Make LastUpdated field transient
- Fetch attributes only when compactor is fetching meta
- Use LastUpdated when filtering jobs
---
 pkg/compactor/bucket_compactor.go | 18 +++++++++-------
 pkg/compactor/job.go              | 11 ++--------
 pkg/compactor/job_test.go         |  3 +--
 pkg/storage/tsdb/block/fetcher.go | 35 +++++++++++++++++++------------
 pkg/storage/tsdb/block/meta.go    |  2 +-
 5 files changed, 37 insertions(+), 32 deletions(-)

diff --git a/pkg/compactor/bucket_compactor.go b/pkg/compactor/bucket_compactor.go
index def2f6cb564..6b2683bc267 100644
--- a/pkg/compactor/bucket_compactor.go
+++ b/pkg/compactor/bucket_compactor.go
@@ -650,7 +650,7 @@ type BucketCompactorMetrics struct {
 	groupCompactionRunsCompleted       prometheus.Counter
 	groupCompactionRunsFailed          prometheus.Counter
 	groupCompactions                   prometheus.Counter
-	blockCompactionDelay               *prometheus.SummaryVec
+	blockCompactionDelay               *prometheus.HistogramVec
 	compactionBlocksVerificationFailed prometheus.Counter
 	blocksMarkedForDeletion            prometheus.Counter
 	blocksMarkedForNoCompact           *prometheus.CounterVec
@@ -676,9 +676,13 @@ func NewBucketCompactorMetrics(blocksMarkedForDeletion prometheus.Counter, reg p
 			Name: "cortex_compactor_group_compactions_total",
 			Help: "Total number of group compaction attempts that resulted in new block(s).",
 		}),
-		blockCompactionDelay: promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{
-			Name: "cortex_compactor_block_compaction_delay_seconds",
-			Help: "Delay between a block being created and successfully compacting it in seconds.",
+		blockCompactionDelay: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+			Name:                            "cortex_compactor_block_compaction_delay_seconds",
+			Help:                            "Delay between a block being created and successfully compacting it in seconds.",
+			Buckets:                         []float64{1.0, 5.0, 10.0, 15.0, 30.0, 60.0, 120.0, 300.0, 600.0, 1200.0, 2400.0, 3600.0, 7200.0},
+			NativeHistogramBucketFactor:     1.1,
+			NativeHistogramMaxBucketNumber:  100,
+			NativeHistogramMinResetDuration: 1 * time.Hour,
 		}, []string{"level"}),
 		compactionBlocksVerificationFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_compactor_blocks_verification_failures_total",
@@ -923,7 +927,7 @@ func (c *BucketCompactor) Compact(ctx context.Context, maxCompactionTime time.Du
 	}
 
 	// Skip jobs for which the wait period hasn't been honored yet.
-	jobs = c.filterJobsByWaitPeriod(ctx, jobs)
+	jobs = c.filterJobsByWaitPeriod(jobs)
 
 	// Sort jobs based on the configured ordering algorithm.
 	jobs = c.sortJobs(jobs)
@@ -1009,9 +1013,9 @@ func (c *BucketCompactor) filterOwnJobs(jobs []*Job) ([]*Job, error) {
 }
 
 // filterJobsByWaitPeriod filters out jobs for which the configured wait period hasn't been honored yet.
-func (c *BucketCompactor) filterJobsByWaitPeriod(ctx context.Context, jobs []*Job) []*Job {
+func (c *BucketCompactor) filterJobsByWaitPeriod(jobs []*Job) []*Job {
 	for i := 0; i < len(jobs); {
-		if elapsed, notElapsedBlock, err := jobWaitPeriodElapsed(ctx, jobs[i], c.waitPeriod, c.bkt); err != nil {
+		if elapsed, notElapsedBlock, err := jobWaitPeriodElapsed(jobs[i], c.waitPeriod); err != nil {
 			level.Warn(c.logger).Log("msg", "not enforcing compaction wait period because the check if compaction job contains recently uploaded blocks has failed", "groupKey", jobs[i].Key(), "err", err)
 
 			// Keep the job.
diff --git a/pkg/compactor/job.go b/pkg/compactor/job.go
index abc06c635d5..b994f6f8c59 100644
--- a/pkg/compactor/job.go
+++ b/pkg/compactor/job.go
@@ -3,7 +3,6 @@
 package compactor
 
 import (
-	"context"
 	"fmt"
 	"math"
 	"sort"
@@ -12,7 +11,6 @@ import (
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/model/labels"
-	"github.com/thanos-io/objstore"
 
 	"github.com/grafana/mimir/pkg/storage/tsdb/block"
 )
@@ -155,7 +153,7 @@ func (job *Job) String() string {
 // elapsed for the input job. If the wait period has not elapsed, then this function
 // also returns the Meta of the first source block encountered for which the wait
 // period has not elapsed yet.
-func jobWaitPeriodElapsed(ctx context.Context, job *Job, waitPeriod time.Duration, userBucket objstore.Bucket) (bool, *block.Meta, error) {
+func jobWaitPeriodElapsed(job *Job, waitPeriod time.Duration) (bool, *block.Meta, error) {
 	if waitPeriod <= 0 {
 		return true, nil, nil
 	}
@@ -173,12 +171,7 @@ func jobWaitPeriodElapsed(ctx context.Context, job *Job, waitPeriod time.Duratio
 			continue
 		}
 
-		attrs, err := block.GetAttributes(ctx, meta, userBucket)
-		if err != nil {
-			return false, meta, err
-		}
-
-		if attrs.LastModified.After(threshold) {
+		if meta.LastModified.After(threshold) {
 			return false, meta, nil
 		}
 	}
diff --git a/pkg/compactor/job_test.go b/pkg/compactor/job_test.go
index 7290da1eea8..5c34b868fdb 100644
--- a/pkg/compactor/job_test.go
+++ b/pkg/compactor/job_test.go
@@ -3,7 +3,6 @@
 package compactor
 
 import (
-	"context"
 	"path"
 	"testing"
 	"time"
@@ -131,7 +130,7 @@ func TestJobWaitPeriodElapsed(t *testing.T) {
 				userBucket.MockAttributes(path.Join(b.meta.ULID.String(), block.MetaFilename), b.attrs, b.attrsErr)
 			}
 
-			elapsed, meta, err := jobWaitPeriodElapsed(context.Background(), job, testData.waitPeriod, userBucket)
+			elapsed, meta, err := jobWaitPeriodElapsed(job, testData.waitPeriod)
 			if testData.expectedErr != "" {
 				require.Error(t, err)
 				assert.ErrorContains(t, err, testData.expectedErr)
diff --git a/pkg/storage/tsdb/block/fetcher.go b/pkg/storage/tsdb/block/fetcher.go
index e96685af9f6..b53d618c37b 100644
--- a/pkg/storage/tsdb/block/fetcher.go
+++ b/pkg/storage/tsdb/block/fetcher.go
@@ -175,13 +175,27 @@ var (
 )
 
 // loadMeta returns metadata from object storage or error.
+// fetchAttributes indicates whether file attributes, such as lastModified, should be fetched and populated on the resulting meta.
 // It returns ErrorSyncMetaNotFound and ErrorSyncMetaCorrupted sentinel errors in those cases.
-func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error) {
+func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID, fetchAttributes bool) (meta *Meta, err error) {
 	var (
 		metaFile       = path.Join(id.String(), MetaFilename)
 		cachedBlockDir = filepath.Join(f.cacheDir, id.String())
 	)
 
+	if fetchAttributes {
+		defer func() {
+			if err == nil && meta != nil && meta.LastModified == nil {
+				attrs, getErr := GetAttributes(ctx, meta, f.bkt)
+				if getErr != nil {
+					err = getErr
+				} else {
+					meta.LastModified = &attrs.LastModified
+				}
+			}
+		}()
+	}
+
 	// Block meta.json file is immutable, so we lookup the cache as first thing without issuing
 	// any API call to the object storage. This significantly reduce the pressure on the object
 	// storage.
@@ -249,12 +263,6 @@ func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error)
 		return nil, errors.Errorf("unexpected meta file: %s version: %d", metaFile, m.Version)
 	}
 
-	attrs, err := GetAttributes(ctx, m, f.bkt)
-	if err != nil {
-		return nil, err
-	}
-	m.LastModified = &attrs.LastModified
-
 	// Best effort cache in local dir.
 	if f.cacheDir != "" {
 		if err := os.MkdirAll(cachedBlockDir, os.ModePerm); err != nil {
@@ -281,7 +289,7 @@ type response struct {
 	markedForDeletionCount float64
 }
 
-func (f *MetaFetcher) fetchMetadata(ctx context.Context, excludeMarkedForDeletion bool) (interface{}, error) {
+func (f *MetaFetcher) fetchMetadata(ctx context.Context, excludeMarkedForDeletion bool, fetchAttributes bool) (interface{}, error) {
 	var (
 		resp = response{
 			metas: make(map[ulid.ULID]*Meta),
@@ -308,7 +316,7 @@ func (f *MetaFetcher) fetchMetadata(ctx context.Context, excludeMarkedForDeletio
 	for i := 0; i < f.concurrency; i++ {
 		eg.Go(func() error {
 			for id := range ch {
-				meta, err := f.loadMeta(ctx, id)
+				meta, err := f.loadMeta(ctx, id, fetchAttributes)
 				if err == nil {
 					mtx.Lock()
 					resp.metas[id] = meta
@@ -419,21 +427,22 @@ func (f *MetaFetcher) fetchMetadata(ctx context.Context, excludeMarkedForDeletio
 //
 // Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
func (f *MetaFetcher) FetchWithoutMarkedForDeletion(ctx context.Context) (metas map[ulid.ULID]*Meta, partials map[ulid.ULID]error, err error) { - metas, partials, err = f.fetch(ctx, true) + metas, partials, err = f.fetch(ctx, true, true) return } -func (f *MetaFetcher) fetch(ctx context.Context, excludeMarkedForDeletion bool) (_ map[ulid.ULID]*Meta, _ map[ulid.ULID]error, err error) { +func (f *MetaFetcher) fetch(ctx context.Context, excludeMarkedForDeletion bool, fetchAttributes bool) (_ map[ulid.ULID]*Meta, _ map[ulid.ULID]error, err error) { start := time.Now() defer func() { f.metrics.SyncDuration.Observe(time.Since(start).Seconds()) @@ -447,7 +456,7 @@ func (f *MetaFetcher) fetch(ctx context.Context, excludeMarkedForDeletion bool) // Run this in thread safe run group. v, err := f.g.Do("", func() (i interface{}, err error) { // NOTE: First go routine context will go through. - return f.fetchMetadata(ctx, excludeMarkedForDeletion) + return f.fetchMetadata(ctx, excludeMarkedForDeletion, fetchAttributes) }) if err != nil { return nil, nil, err diff --git a/pkg/storage/tsdb/block/meta.go b/pkg/storage/tsdb/block/meta.go index 00e221ab48e..ed062f0330c 100644 --- a/pkg/storage/tsdb/block/meta.go +++ b/pkg/storage/tsdb/block/meta.go @@ -46,7 +46,7 @@ type Meta struct { tsdb.BlockMeta Thanos ThanosMeta `json:"thanos"` - LastModified *time.Time `json:"last_modified,omitempty"` + LastModified *time.Time `json:"-"` } func (m *Meta) String() string { From f735336ff743db85782f54f1fefcea40cd3ea49b Mon Sep 17 00:00:00 2001 From: Jonathan Halterman Date: Fri, 22 Mar 2024 12:43:26 -0700 Subject: [PATCH 4/8] Address PR comments - remove LastUpdated from block meta - change metric to minutes and use buckets with longer durations - only fetch attributes for compaction job filtering and recording the new metric --- pkg/compactor/bucket_compactor.go | 19 ++++++++++------- pkg/compactor/job.go | 11 ++++++++-- pkg/compactor/job_test.go | 3 ++- pkg/storage/tsdb/block/fetcher.go | 29 +++++++------------------- pkg/storage/tsdb/block/fetcher_test.go | 2 +- pkg/storage/tsdb/block/meta.go | 4 +--- 6 files changed, 31 insertions(+), 37 deletions(-) diff --git a/pkg/compactor/bucket_compactor.go b/pkg/compactor/bucket_compactor.go index 6b2683bc267..a3d6d724d2d 100644 --- a/pkg/compactor/bucket_compactor.go +++ b/pkg/compactor/bucket_compactor.go @@ -434,8 +434,11 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul // into the next planning cycle. // Eventually the block we just uploaded should get synced into the job again (including sync-delay). 
 	for _, meta := range toCompact {
-		if meta.LastModified != nil {
-			c.metrics.blockCompactionDelay.WithLabelValues(strconv.Itoa(meta.Compaction.Level)).Observe(compactionBegin.Sub(*meta.LastModified).Seconds())
+		attrs, err := block.GetAttributes(ctx, meta, c.bkt)
+		if err != nil {
+			level.Warn(jobLogger).Log("err", err)
+		} else {
+			c.metrics.blockCompactionDelay.WithLabelValues(strconv.Itoa(meta.Compaction.Level)).Observe(compactionBegin.Sub(attrs.LastModified).Minutes())
 		}
 
 		if err := deleteBlock(c.bkt, meta.ULID, filepath.Join(subDir, meta.ULID.String()), jobLogger, c.metrics.blocksMarkedForDeletion); err != nil {
@@ -677,9 +680,9 @@ func NewBucketCompactorMetrics(blocksMarkedForDeletion prometheus.Counter, reg p
 			Help: "Total number of group compaction attempts that resulted in new block(s).",
 		}),
 		blockCompactionDelay: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
-			Name:                            "cortex_compactor_block_compaction_delay_seconds",
-			Help:                            "Delay between a block being created and successfully compacting it in seconds.",
-			Buckets:                         []float64{1.0, 5.0, 10.0, 15.0, 30.0, 60.0, 120.0, 300.0, 600.0, 1200.0, 2400.0, 3600.0, 7200.0},
+			Name:                            "cortex_compactor_block_compaction_delay_minutes",
+			Help:                            "Delay between a block being created and successfully compacting it in minutes.",
+			Buckets:                         []float64{1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 180.0, 240.0, 300.0, 600.0, 1200.0},
 			NativeHistogramBucketFactor:     1.1,
 			NativeHistogramMaxBucketNumber:  100,
 			NativeHistogramMinResetDuration: 1 * time.Hour,
 		}, []string{"level"}),
 		compactionBlocksVerificationFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_compactor_blocks_verification_failures_total",
@@ -927,7 +930,7 @@ func (c *BucketCompactor) Compact(ctx context.Context, maxCompactionTime time.Du
 	}
 
 	// Skip jobs for which the wait period hasn't been honored yet.
-	jobs = c.filterJobsByWaitPeriod(jobs)
+	jobs = c.filterJobsByWaitPeriod(ctx, jobs)
 
 	// Sort jobs based on the configured ordering algorithm.
 	jobs = c.sortJobs(jobs)
@@ -1013,9 +1016,9 @@ func (c *BucketCompactor) filterOwnJobs(jobs []*Job) ([]*Job, error) {
 }
 
 // filterJobsByWaitPeriod filters out jobs for which the configured wait period hasn't been honored yet.
-func (c *BucketCompactor) filterJobsByWaitPeriod(jobs []*Job) []*Job {
+func (c *BucketCompactor) filterJobsByWaitPeriod(ctx context.Context, jobs []*Job) []*Job {
 	for i := 0; i < len(jobs); {
-		if elapsed, notElapsedBlock, err := jobWaitPeriodElapsed(jobs[i], c.waitPeriod); err != nil {
+		if elapsed, notElapsedBlock, err := jobWaitPeriodElapsed(ctx, jobs[i], c.waitPeriod, c.bkt); err != nil {
 			level.Warn(c.logger).Log("msg", "not enforcing compaction wait period because the check if compaction job contains recently uploaded blocks has failed", "groupKey", jobs[i].Key(), "err", err)
 
 			// Keep the job.
diff --git a/pkg/compactor/job.go b/pkg/compactor/job.go
index b994f6f8c59..abc06c635d5 100644
--- a/pkg/compactor/job.go
+++ b/pkg/compactor/job.go
@@ -3,6 +3,7 @@
 package compactor
 
 import (
+	"context"
 	"fmt"
 	"math"
 	"sort"
@@ -11,6 +12,7 @@ import (
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/thanos-io/objstore"
 
 	"github.com/grafana/mimir/pkg/storage/tsdb/block"
 )
@@ -153,7 +155,7 @@ func (job *Job) String() string {
 // elapsed for the input job. If the wait period has not elapsed, then this function
 // also returns the Meta of the first source block encountered for which the wait
 // period has not elapsed yet.
-func jobWaitPeriodElapsed(job *Job, waitPeriod time.Duration) (bool, *block.Meta, error) {
+func jobWaitPeriodElapsed(ctx context.Context, job *Job, waitPeriod time.Duration, userBucket objstore.Bucket) (bool, *block.Meta, error) {
 	if waitPeriod <= 0 {
 		return true, nil, nil
 	}
@@ -171,7 +173,12 @@ func jobWaitPeriodElapsed(job *Job, waitPeriod time.Duratio
 			continue
 		}
 
-		if meta.LastModified.After(threshold) {
+		attrs, err := block.GetAttributes(ctx, meta, userBucket)
+		if err != nil {
+			return false, meta, err
+		}
+
+		if attrs.LastModified.After(threshold) {
 			return false, meta, nil
 		}
 	}
diff --git a/pkg/compactor/job_test.go b/pkg/compactor/job_test.go
index 5c34b868fdb..7290da1eea8 100644
--- a/pkg/compactor/job_test.go
+++ b/pkg/compactor/job_test.go
@@ -3,6 +3,7 @@
 package compactor
 
 import (
+	"context"
 	"path"
 	"testing"
 	"time"
@@ -130,7 +131,7 @@ func TestJobWaitPeriodElapsed(t *testing.T) {
 				userBucket.MockAttributes(path.Join(b.meta.ULID.String(), block.MetaFilename), b.attrs, b.attrsErr)
 			}
 
-			elapsed, meta, err := jobWaitPeriodElapsed(job, testData.waitPeriod)
+			elapsed, meta, err := jobWaitPeriodElapsed(context.Background(), job, testData.waitPeriod, userBucket)
 			if testData.expectedErr != "" {
 				require.Error(t, err)
 				assert.ErrorContains(t, err, testData.expectedErr)
diff --git a/pkg/storage/tsdb/block/fetcher.go b/pkg/storage/tsdb/block/fetcher.go
index b53d618c37b..fad9a792155 100644
--- a/pkg/storage/tsdb/block/fetcher.go
+++ b/pkg/storage/tsdb/block/fetcher.go
@@ -175,27 +175,13 @@ var (
 )
 
 // loadMeta returns metadata from object storage or error.
-// fetchAttributes indicates whether file attributes, such as lastModified, should be fetched and populated on the resulting meta.
 // It returns ErrorSyncMetaNotFound and ErrorSyncMetaCorrupted sentinel errors in those cases.
-func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID, fetchAttributes bool) (meta *Meta, err error) {
+func (f *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*Meta, error) {
 	var (
 		metaFile       = path.Join(id.String(), MetaFilename)
 		cachedBlockDir = filepath.Join(f.cacheDir, id.String())
 	)
 
-	if fetchAttributes {
-		defer func() {
-			if err == nil && meta != nil && meta.LastModified == nil {
-				attrs, getErr := GetAttributes(ctx, meta, f.bkt)
-				if getErr != nil {
-					err = getErr
-				} else {
-					meta.LastModified = &attrs.LastModified
-				}
-			}
-		}()
-	}
-
 	// Block meta.json file is immutable, so we lookup the cache as first thing without issuing
 	// any API call to the object storage. This significantly reduce the pressure on the object
 	// storage.
@@ -289,7 +281,7 @@ type response struct {
 	markedForDeletionCount float64
 }
 
-func (f *MetaFetcher) fetchMetadata(ctx context.Context, excludeMarkedForDeletion bool, fetchAttributes bool) (interface{}, error) {
+func (f *MetaFetcher) fetchMetadata(ctx context.Context, excludeMarkedForDeletion bool) (interface{}, error) {
 	var (
 		resp = response{
 			metas: make(map[ulid.ULID]*Meta),
@@ -316,7 +302,7 @@ func (f *MetaFetcher) fetchMetadata(ctx context.Context, excludeMarkedForDeletio
 	for i := 0; i < f.concurrency; i++ {
 		eg.Go(func() error {
 			for id := range ch {
-				meta, err := f.loadMeta(ctx, id, fetchAttributes)
+				meta, err := f.loadMeta(ctx, id)
 				if err == nil {
 					mtx.Lock()
 					resp.metas[id] = meta
@@ -427,22 +413,21 @@ func (f *MetaFetcher) fetchMetadata(ctx context.Context, excludeMarkedForDeletio
 //
 // Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
 func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*Meta, partials map[ulid.ULID]error, err error) {
-	metas, partials, err = f.fetch(ctx, false, false)
+	metas, partials, err = f.fetch(ctx, false)
 	return
 }
 
 // FetchWithoutMarkedForDeletion returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket.
-// Attributes, such as LastModified, are also fetched and populated on resulting block metas.
 // This function excludes all blocks marked for deletion (no deletion delay applied).
 // It's caller responsibility to not change the returned metadata files. Maps can be modified.
 //
 // Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
 func (f *MetaFetcher) FetchWithoutMarkedForDeletion(ctx context.Context) (metas map[ulid.ULID]*Meta, partials map[ulid.ULID]error, err error) {
-	metas, partials, err = f.fetch(ctx, true, true)
+	metas, partials, err = f.fetch(ctx, true)
 	return
 }
 
-func (f *MetaFetcher) fetch(ctx context.Context, excludeMarkedForDeletion bool, fetchAttributes bool) (_ map[ulid.ULID]*Meta, _ map[ulid.ULID]error, err error) {
+func (f *MetaFetcher) fetch(ctx context.Context, excludeMarkedForDeletion bool) (_ map[ulid.ULID]*Meta, _ map[ulid.ULID]error, err error) {
 	start := time.Now()
 	defer func() {
 		f.metrics.SyncDuration.Observe(time.Since(start).Seconds())
@@ -456,7 +441,7 @@ func (f *MetaFetcher) fetch(ctx context.Context, excludeMarkedForDeletion bool,
 	// Run this in thread safe run group.
 	v, err := f.g.Do("", func() (i interface{}, err error) {
 		// NOTE: First go routine context will go through.
-		return f.fetchMetadata(ctx, excludeMarkedForDeletion, fetchAttributes)
+		return f.fetchMetadata(ctx, excludeMarkedForDeletion)
 	})
 	if err != nil {
 		return nil, nil, err
diff --git a/pkg/storage/tsdb/block/fetcher_test.go b/pkg/storage/tsdb/block/fetcher_test.go
index a42d18c49ec..74edbbbb537 100644
--- a/pkg/storage/tsdb/block/fetcher_test.go
+++ b/pkg/storage/tsdb/block/fetcher_test.go
@@ -295,7 +295,7 @@ func TestMetaFetcher_ShouldNotIssueAnyAPICallToObjectStorageIfAllBlockMetasAreCa
 	assert.NoError(t, testutil.GatherAndCompare(reg1, strings.NewReader(`
 		# HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket.
 		# TYPE thanos_objstore_bucket_operations_total counter
-		thanos_objstore_bucket_operations_total{bucket="test",operation="attributes"} 2
+		thanos_objstore_bucket_operations_total{bucket="test",operation="attributes"} 0
 		thanos_objstore_bucket_operations_total{bucket="test",operation="delete"} 0
 		thanos_objstore_bucket_operations_total{bucket="test",operation="exists"} 0
 		thanos_objstore_bucket_operations_total{bucket="test",operation="get"} 2
diff --git a/pkg/storage/tsdb/block/meta.go b/pkg/storage/tsdb/block/meta.go
index ed062f0330c..cc3a2ffc54e 100644
--- a/pkg/storage/tsdb/block/meta.go
+++ b/pkg/storage/tsdb/block/meta.go
@@ -11,7 +11,6 @@ import (
 	"io"
 	"os"
 	"path/filepath"
-	"time"
 
 	"github.com/go-kit/log"
 	"github.com/grafana/dskit/runutil"
@@ -45,8 +44,7 @@ const (
 type Meta struct {
 	tsdb.BlockMeta
 
-	Thanos       ThanosMeta `json:"thanos"`
-	LastModified *time.Time `json:"-"`
+	Thanos ThanosMeta `json:"thanos"`
 }
 
 func (m *Meta) String() string {

From 54f25764051c75a475028d64a4a9656972b9cb19 Mon Sep 17 00:00:00 2001
From: Jonathan Halterman
Date: Tue, 26 Mar 2024 09:28:57 -0700
Subject: [PATCH 5/8] Update pkg/compactor/bucket_compactor.go
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Peter Štibraný
---
 pkg/compactor/bucket_compactor.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/compactor/bucket_compactor.go b/pkg/compactor/bucket_compactor.go
index a3d6d724d2d..05dcd0c9cc9 100644
--- a/pkg/compactor/bucket_compactor.go
+++ b/pkg/compactor/bucket_compactor.go
@@ -436,7 +436,7 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
 	for _, meta := range toCompact {
 		attrs, err := block.GetAttributes(ctx, meta, c.bkt)
 		if err != nil {
-			level.Warn(jobLogger).Log("err", err)
+			level.Warn(jobLogger).Log("msg", "failed to determine block upload time", "block", meta.ULID.String(), "err", err)
 		} else {
 			c.metrics.blockCompactionDelay.WithLabelValues(strconv.Itoa(meta.Compaction.Level)).Observe(compactionBegin.Sub(attrs.LastModified).Minutes())
 		}

From 95e2ff844d7d58fb33ba8102fd2a4f5a2ff01664 Mon Sep 17 00:00:00 2001
From: Jonathan Halterman
Date: Tue, 26 Mar 2024 16:14:08 -0700
Subject: [PATCH 6/8] Revert "Optimize marking blocks as queried"

This reverts commit 55ef1ddd10b5b688a002b2cfb998055c92789b08.
---
 pkg/storegateway/bucket.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go
index e009d8a2f98..5406757df9f 100644
--- a/pkg/storegateway/bucket.go
+++ b/pkg/storegateway/bucket.go
@@ -628,7 +628,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
 			level.Debug(s.logger).Log("msg", "queried non-compacted block", "blockId", b.meta.ULID, "ooo", b.meta.Compaction.FromOutOfOrder())
 		}
 
-		b.queried.CompareAndSwap(false, true)
+		b.queried.Store(true)
 	}
 
 	if err := s.sendHints(srv, resHints); err != nil {
 		return err

From 8e67121f4f651a0ebbe6b763a3ac1e6b705d3976 Mon Sep 17 00:00:00 2001
From: Jonathan Halterman
Date: Tue, 26 Mar 2024 16:15:02 -0700
Subject: [PATCH 7/8] Address PR comments

---
 pkg/compactor/bucket_compactor.go | 10 +++++-----
 pkg/compactor/job.go              |  2 +-
 pkg/storage/tsdb/block/block.go   |  4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/pkg/compactor/bucket_compactor.go b/pkg/compactor/bucket_compactor.go
index 05dcd0c9cc9..ea6ab82c017 100644
--- a/pkg/compactor/bucket_compactor.go
+++ b/pkg/compactor/bucket_compactor.go
@@ -434,11 +434,11 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
 	// into the next planning cycle.
 	// Eventually the block we just uploaded should get synced into the job again (including sync-delay).
 	for _, meta := range toCompact {
-		attrs, err := block.GetAttributes(ctx, meta, c.bkt)
+		attrs, err := block.GetMetaAttributes(ctx, meta, c.bkt)
 		if err != nil {
 			level.Warn(jobLogger).Log("msg", "failed to determine block upload time", "block", meta.ULID.String(), "err", err)
 		} else {
-			c.metrics.blockCompactionDelay.WithLabelValues(strconv.Itoa(meta.Compaction.Level)).Observe(compactionBegin.Sub(attrs.LastModified).Minutes())
+			c.metrics.blockCompactionDelay.WithLabelValues(strconv.Itoa(meta.Compaction.Level)).Observe(compactionBegin.Sub(attrs.LastModified).Seconds())
 		}
 
 		if err := deleteBlock(c.bkt, meta.ULID, filepath.Join(subDir, meta.ULID.String()), jobLogger, c.metrics.blocksMarkedForDeletion); err != nil {
@@ -680,9 +680,9 @@ func NewBucketCompactorMetrics(blocksMarkedForDeletion prometheus.Counter, reg p
 			Help: "Total number of group compaction attempts that resulted in new block(s).",
 		}),
 		blockCompactionDelay: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
-			Name:                            "cortex_compactor_block_compaction_delay_minutes",
-			Help:                            "Delay between a block being created and successfully compacting it in minutes.",
-			Buckets:                         []float64{1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 180.0, 240.0, 300.0, 600.0, 1200.0},
+			Name:                            "cortex_compactor_block_compaction_delay_seconds",
+			Help:                            "Delay between a block being uploaded and successfully compacting it.",
+			Buckets:                         []float64{60.0, 300.0, 600.0, 1800.0, 3600.0, 7200.0, 10800.0, 14400.0, 18000.0, 36000.0, 72000.0},
 			NativeHistogramBucketFactor:     1.1,
 			NativeHistogramMaxBucketNumber:  100,
 			NativeHistogramMinResetDuration: 1 * time.Hour,
diff --git a/pkg/compactor/job.go b/pkg/compactor/job.go
index abc06c635d5..b4794a72f6c 100644
--- a/pkg/compactor/job.go
+++ b/pkg/compactor/job.go
@@ -173,7 +173,7 @@ func jobWaitPeriodElapsed(ctx context.Context, job *Job, waitPeriod time.Duratio
 			continue
 		}
 
-		attrs, err := block.GetAttributes(ctx, meta, userBucket)
+		attrs, err := block.GetMetaAttributes(ctx, meta, userBucket)
 		if err != nil {
 			return false, meta, err
 		}
diff --git a/pkg/storage/tsdb/block/block.go b/pkg/storage/tsdb/block/block.go
index 512a053b208..be8d04ee3ca 100644
--- a/pkg/storage/tsdb/block/block.go
+++ b/pkg/storage/tsdb/block/block.go
@@ -325,8 +325,8 @@ func GatherFileStats(blockDir string) (res []File, _ error) {
 	return res, err
 }
 
-// GetAttributes returns the attributes for the block associated with the meta, using the userBucket to read the attributes.
-func GetAttributes(ctx context.Context, meta *Meta, bucketReader objstore.BucketReader) (objstore.ObjectAttributes, error) {
+// GetMetaAttributes returns the attributes for the block associated with the meta, using the userBucket to read the attributes.
+func GetMetaAttributes(ctx context.Context, meta *Meta, bucketReader objstore.BucketReader) (objstore.ObjectAttributes, error) {
 	metaPath := path.Join(meta.ULID.String(), MetaFilename)
 	attrs, err := bucketReader.Attributes(ctx, metaPath)
 	if err != nil {

From cabea8d6e378e9bdd46477763d75ee235ab4f6b6 Mon Sep 17 00:00:00 2001
From: Jonathan Halterman
Date: Mon, 1 Apr 2024 11:19:43 -0700
Subject: [PATCH 8/8] Update changelog

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e6bf457b976..424ce1863f3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@
 * [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
 * [ENHANCEMENT] Store-gateway: Add `stage="wait_max_concurrent"` to `cortex_bucket_store_series_request_stage_duration_seconds` which records how long the query had to wait for its turn for `-blocks-storage.bucket-store.max-concurrent`. #7609
 * [ENHANCEMENT] Querier: add `cortex_querier_federation_upstream_query_wait_duration_seconds` to observe time from when a querier picks up a cross-tenant query to when work begins on its single-tenant counterparts. #7209
+* [ENHANCEMENT] Compactor: Add `cortex_compactor_block_compaction_delay_seconds` metric to track how long it takes to compact blocks. #7635
 * [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
 * [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
 * [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
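The end state of this series is the `cortex_compactor_block_compaction_delay_seconds` histogram from PATCH 7/8: explicit buckets plus native-histogram settings, labelled by compaction level, observed as `compactionBegin.Sub(attrs.LastModified).Seconds()` where the last-modified time comes from the block's meta.json object attributes. The following is a minimal, self-contained Go sketch of that metric outside the compactor, useful for trying the bucket layout in isolation; it assumes a prometheus/client_golang version with native-histogram support (the same API the patches use), and the `lastModified`, `compactionBegin`, and `compactionLevel` values are hypothetical stand-ins for what `block.GetMetaAttributes` and `runCompactionJob` provide in Mimir.

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Same definition as PATCH 7/8: explicit buckets for classic scrapes,
	// native-histogram options for scrapers that support them.
	delay := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Name:                            "cortex_compactor_block_compaction_delay_seconds",
		Help:                            "Delay between a block being uploaded and successfully compacting it.",
		Buckets:                         []float64{60, 300, 600, 1800, 3600, 7200, 10800, 14400, 18000, 36000, 72000},
		NativeHistogramBucketFactor:     1.1,
		NativeHistogramMaxBucketNumber:  100,
		NativeHistogramMinResetDuration: 1 * time.Hour,
	}, []string{"level"})

	// Hypothetical inputs: in the compactor, lastModified comes from the
	// meta.json object attributes and compactionBegin from the start of the
	// compaction job.
	lastModified := time.Now().Add(-2 * time.Hour)
	compactionBegin := time.Now()
	compactionLevel := 2

	delay.WithLabelValues(strconv.Itoa(compactionLevel)).Observe(compactionBegin.Sub(lastModified).Seconds())

	// Print the recorded sample count and sum as a quick sanity check.
	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		h := mf.GetMetric()[0].GetHistogram()
		fmt.Printf("%s count=%d sum=%.0fs\n", mf.GetName(), h.GetSampleCount(), h.GetSampleSum())
	}
}

With a block uploaded two hours earlier this observes roughly 7200 seconds, landing at the 7200 bucket boundary of the final layout; per-level delay percentiles can then be derived in PromQL with histogram_quantile over the metric's _bucket series.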