From 08ac3132525e445401f56cfa60c0533c0b4dddd2 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Fri, 14 Jul 2023 14:52:16 -0700 Subject: [PATCH 1/4] Updated Thanos to latest. Fixed Cortex compactor and bucket client code due to Thanos change. Signed-off-by: Alex Le --- go.mod | 4 +- go.sum | 8 +- pkg/compactor/compactor_test.go | 9 +- pkg/compactor/shuffle_sharding_planner.go | 2 +- .../shuffle_sharding_planner_test.go | 2 +- pkg/cortex/tracing.go | 2 +- pkg/storage/bucket/client.go | 10 +- pkg/storage/tsdb/testutil/objstore.go | 2 +- .../thanos-io/objstore/CHANGELOG.md | 7 +- .../github.com/thanos-io/objstore/README.md | 24 +- .../github.com/thanos-io/objstore/objstore.go | 4 +- .../github.com/thanos-io/objstore/tracing.go | 163 ------------- .../tracing/opentracing/opentracing.go | 216 ++++++++++++++++++ .../thanos-io/objstore/tracing/tracing.go | 69 ------ .../thanos-io/thanos/pkg/block/fetcher.go | 24 +- .../thanos/pkg/block/metadata/meta.go | 23 ++ .../thanos-io/thanos/pkg/compact/compact.go | 165 ++++++++++--- .../thanos-io/thanos/pkg/compact/planner.go | 4 +- .../thanos-io/thanos/pkg/store/cache/cache.go | 12 +- .../thanos/pkg/store/cache/inmemory.go | 3 + .../thanos/pkg/store/cache/memcached.go | 22 +- vendor/modules.txt | 6 +- 22 files changed, 464 insertions(+), 317 deletions(-) delete mode 100644 vendor/github.com/thanos-io/objstore/tracing.go create mode 100644 vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go delete mode 100644 vendor/github.com/thanos-io/objstore/tracing/tracing.go diff --git a/go.mod b/go.mod index 3c62b88fcf..599539839e 100644 --- a/go.mod +++ b/go.mod @@ -51,9 +51,9 @@ require ( github.com/sony/gobreaker v0.5.0 github.com/spf13/afero v1.9.5 github.com/stretchr/testify v1.8.4 - github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca + github.com/thanos-io/objstore v0.0.0-20230713070940-eb01c83b89a4 github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea - github.com/thanos-io/thanos v0.31.1-0.20230712154708-a395c5dbd054 + github.com/thanos-io/thanos v0.31.1-0.20230714171248-723dfd08764a github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d go.etcd.io/etcd/api/v3 v3.5.8 diff --git a/go.sum b/go.sum index 3965b06daa..d3eecb3816 100644 --- a/go.sum +++ b/go.sum @@ -1160,12 +1160,12 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM= github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1Zsv7OAU9iQhZwigp50Yl38W10g/vd5NC8Rdk1Jzng= github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= -github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca h1:JRF7i58HovirZQVJGwCClQsMK6CCmK2fvialXjeoSpI= -github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM= +github.com/thanos-io/objstore v0.0.0-20230713070940-eb01c83b89a4 h1:SYs56N3zGaE8wwkU+QAfqeAC9SMjGWQORzrYSs58NAQ= +github.com/thanos-io/objstore v0.0.0-20230713070940-eb01c83b89a4/go.mod h1:Vc+D0zxX8fT7VOe8Gj0J6vzw0kcTrMCEgE140wCz1c0= github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea h1:kzK8sBn2+mo3NAxP+XjAjAqr1hwfxxFUy5CybaBkjAI= github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea/go.mod h1:eIgPaXWgOhNAv6CPPrgu09r0AtT7byBTZy+7WkX0D18= -github.com/thanos-io/thanos v0.31.1-0.20230712154708-a395c5dbd054 h1:kBuXA0B+jXX89JAJTymw7g/v/4jyjCSgfPcWQeFUOoM= -github.com/thanos-io/thanos v0.31.1-0.20230712154708-a395c5dbd054/go.mod h1:C0Cdk0kFFEDS3qkTgScF9ONSjrPxqnScGPoIgah3NJY= +github.com/thanos-io/thanos v0.31.1-0.20230714171248-723dfd08764a h1:6GRazFOeBtPpoDFLO7s8AS5upOWPCzQ96mq/UpH/4QI= +github.com/thanos-io/thanos v0.31.1-0.20230714171248-723dfd08764a/go.mod h1:PGAlwITP7IvWQXra3VbWomRqz8xrSlAR3ee6Z8k4No0= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 516f03e0b8..59fc5e6fbd 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1647,13 +1647,18 @@ func (m *tsdbCompactorMock) Compact(dest string, dirs []string, open []*tsdb.Blo return args.Get(0).(ulid.ULID), args.Error(1) } +func (m *tsdbCompactorMock) CompactWithBlockPopulator(dest string, dirs []string, open []*tsdb.Block, blockPopulator tsdb.BlockPopulator) (uid ulid.ULID, err error) { + args := m.Called(dest, dirs, open, blockPopulator) + return args.Get(0).(ulid.ULID), args.Error(1) +} + type tsdbPlannerMock struct { mock.Mock noCompactMarkFilters []*compact.GatherNoCompactionMarkFilter } -func (m *tsdbPlannerMock) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { - args := m.Called(ctx, metasByMinTime) +func (m *tsdbPlannerMock) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) { + args := m.Called(ctx, metasByMinTime, errChan, extensions) return args.Get(0).([]*metadata.Meta), args.Error(1) } diff --git a/pkg/compactor/shuffle_sharding_planner.go b/pkg/compactor/shuffle_sharding_planner.go index 7265ede93a..5da27b0bec 100644 --- a/pkg/compactor/shuffle_sharding_planner.go +++ b/pkg/compactor/shuffle_sharding_planner.go @@ -52,7 +52,7 @@ func NewShuffleShardingPlanner( } } -func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { +func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) { // Ensure all blocks fits within the largest range. This is a double check // to ensure there's no bug in the previous blocks grouping, given this Plan() // is just a pass-through. diff --git a/pkg/compactor/shuffle_sharding_planner_test.go b/pkg/compactor/shuffle_sharding_planner_test.go index 9ecbae1afb..83de3ad164 100644 --- a/pkg/compactor/shuffle_sharding_planner_test.go +++ b/pkg/compactor/shuffle_sharding_planner_test.go @@ -383,7 +383,7 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) { blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed, ) - actual, err := p.Plan(context.Background(), testData.blocks) + actual, err := p.Plan(context.Background(), testData.blocks, nil, nil) if testData.expectedErr != nil { assert.Equal(t, err, testData.expectedErr) diff --git a/pkg/cortex/tracing.go b/pkg/cortex/tracing.go index 1d0adb9f3b..1cdfa6a819 100644 --- a/pkg/cortex/tracing.go +++ b/pkg/cortex/tracing.go @@ -4,7 +4,7 @@ import ( "context" "github.com/opentracing/opentracing-go" - objstoretracing "github.com/thanos-io/objstore/tracing" + objstoretracing "github.com/thanos-io/objstore/tracing/opentracing" "github.com/thanos-io/thanos/pkg/tracing" "google.golang.org/grpc" ) diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 852b4a7cc6..15133f0028 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" + "github.com/thanos-io/objstore/tracing/opentracing" "github.com/cortexproject/cortex/pkg/storage/bucket/azure" "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" @@ -122,7 +123,7 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, return nil, err } - client = objstore.NewTracingBucket(bucketWithMetrics(client, name, reg)) + client = opentracing.WrapWithTraces(bucketWithMetrics(client, name, reg)) // Wrap the client with any provided middleware for _, wrap := range cfg.Middlewares { @@ -140,8 +141,9 @@ func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus return bucketClient } - return objstore.BucketWithMetrics( - "", // bucket label value + return objstore.WrapWithMetrics( bucketClient, - prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, prometheus.WrapRegistererWithPrefix("thanos_", reg))) + prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, prometheus.WrapRegistererWithPrefix("thanos_", reg)), + "", // bucket label value + ) } diff --git a/pkg/storage/tsdb/testutil/objstore.go b/pkg/storage/tsdb/testutil/objstore.go index 115bba1885..f224b7dec8 100644 --- a/pkg/storage/tsdb/testutil/objstore.go +++ b/pkg/storage/tsdb/testutil/objstore.go @@ -30,7 +30,7 @@ func PrepareFilesystemBucket(t testing.TB) (objstore.Bucket, string) { bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) require.NoError(t, err) - return objstore.BucketWithMetrics("test", bkt, nil), storageDir + return objstore.WrapWithMetrics(bkt, nil, "test"), storageDir } type MockBucketFailure struct { diff --git a/vendor/github.com/thanos-io/objstore/CHANGELOG.md b/vendor/github.com/thanos-io/objstore/CHANGELOG.md index 97428e9eb7..3a43c27899 100644 --- a/vendor/github.com/thanos-io/objstore/CHANGELOG.md +++ b/vendor/github.com/thanos-io/objstore/CHANGELOG.md @@ -13,7 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed - [#33](https://github.com/thanos-io/objstore/pull/33) Tracing: Add `ContextWithTracer()` to inject the tracer into the context. - [#34](https://github.com/thanos-io/objstore/pull/34) Fix ignored options when creating shared credential Azure client. -- [#62](https://github.com/thanos-io/objstore/pull/62) S3: Fix ignored context cancellation in `Iter` method. +- [#62](https://github.com/thanos-io/objstore/pull/62) S3: Fix ignored context cancellation in `Iter` method. ### Added - [#15](https://github.com/thanos-io/objstore/pull/15) Add Oracle Cloud Infrastructure Object Storage Bucket support. @@ -23,7 +23,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#43](https://github.com/thanos-io/objstore/pull/43) filesystem: abort filesystem bucket operations if the context has been cancelled - [#44](https://github.com/thanos-io/objstore/pull/44) Add new metric to count total number of fetched bytes from bucket - [#50](https://github.com/thanos-io/objstore/pull/50) Add Huawei Cloud OBS Object Storage Support -- [#59](https://github.com/thanos-io/objstore/pull/59) Adding method `IsCustomerManagedKeyError` on the bucket interface. +- [#59](https://github.com/thanos-io/objstore/pull/59) Adding method `IsCustomerManagedKeyError` on the bucket interface. +- [#61](https://github.com/thanos-io/objstore/pull/61) Add OpenTelemetry TracingBucket. + > This also changes the behaviour of `client.NewBucket`. Now it returns, uninstrumented and untraced bucket. + You can combine `objstore.WrapWithMetrics` and `tracing/{opentelemetry,opentracing}.WrapWithTraces` to have old behavior. ### Changed - [#38](https://github.com/thanos-io/objstore/pull/38) *: Upgrade minio-go version to `v7.0.45`. diff --git a/vendor/github.com/thanos-io/objstore/README.md b/vendor/github.com/thanos-io/objstore/README.md index 5cf090e3a4..c738889478 100644 --- a/vendor/github.com/thanos-io/objstore/README.md +++ b/vendor/github.com/thanos-io/objstore/README.md @@ -60,7 +60,7 @@ type Bucket interface { Upload(ctx context.Context, name string, r io.Reader) error // Delete removes the object with the given name. - // If object does not exists in the moment of deletion, Delete should throw error. + // If object does not exist in the moment of deletion, Delete should throw error. Delete(ctx context.Context, name string) error ``` @@ -88,7 +88,7 @@ type BucketReader interface { // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. IsObjNotFoundErr(err error) bool - // Attributes returns information about the specified object. + // IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked. ``` Those interfaces represent the object storage operations your code can use from `objstore` clients. @@ -128,7 +128,7 @@ Current object storage client implementations: | [Baidu BOS](#baidu-bos) | Beta | Production Usage | no | @yahaa | | [Local Filesystem](#filesystem) | Stable | Testing and Demo only | yes | @bwplotka | | [Oracle Cloud Infrastructure Object Storage](#oracle-cloud-infrastructure-object-storage) | Beta | Production Usage | yes | @aarontams,@gaurav-05,@ericrrath | -| [HuaweiCloud OBS](#huaweicloud-obs) | Beta | Production Usage | no | @setoru | +| [HuaweiCloud OBS](#huaweicloud-obs) | Beta | Production Usage | no | @setoru | **Missing support to some object storage?** Check out [how to add your client section](#how-to-add-a-new-client-to-thanos) @@ -153,6 +153,7 @@ config: insecure: false signature_version2: false secret_key: "" + session_token: "" put_user_metadata: {} http_config: idle_conn_timeout: 1m30s @@ -643,17 +644,16 @@ You can also include any of the optional configuration just like the example in ##### HuaweiCloud OBS -To use HuaweiCloud OBS as an object store, you should apply for a HuaweiCloud Account to create an object storage bucket at first. -More details: [HuaweiCloud OBS](https://support.huaweicloud.com/obs/index.html) +To use HuaweiCloud OBS as an object store, you should apply for a HuaweiCloud Account to create an object storage bucket at first. More details: [HuaweiCloud OBS](https://support.huaweicloud.com/obs/index.html) To configure HuaweiCloud Account to use OBS as storage store you need to set these parameters in YAML format stored in a file: -```yaml mdox-exec="go run scripts/cfggen/main.go --name=cos.Config" -type: OBS -config: - bucket: "" - endpoint: "" - access_key: "" +```yaml mdox-exec="go run scripts/cfggen/main.go --name=obs.Config" +type: OBS +config: + bucket: "" + endpoint: "" + access_key: "" secret_key: "" http_config: idle_conn_timeout: 1m30s @@ -674,7 +674,7 @@ config: prefix: "" ``` -The `access_key` and `secret_key` field is required. The `http_config` field is optional for optimize HTTP transport settings. +The `access_key` and `secret_key` field is required. The `http_config` field is optional for optimize HTTP transport settings. #### How to add a new client to Thanos? diff --git a/vendor/github.com/thanos-io/objstore/objstore.go b/vendor/github.com/thanos-io/objstore/objstore.go index fb9b0bac8d..d7d0488846 100644 --- a/vendor/github.com/thanos-io/objstore/objstore.go +++ b/vendor/github.com/thanos-io/objstore/objstore.go @@ -400,9 +400,9 @@ type IsOpFailureExpectedFunc func(error) bool var _ InstrumentedBucket = &metricBucket{} -// BucketWithMetrics takes a bucket and registers metrics with the given registry for +// WrapWithMetrics takes a bucket and registers metrics with the given registry for // operations run against the bucket. -func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) *metricBucket { +func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBucket { bkt := &metricBucket{ bkt: b, isOpFailureExpected: func(err error) bool { return false }, diff --git a/vendor/github.com/thanos-io/objstore/tracing.go b/vendor/github.com/thanos-io/objstore/tracing.go deleted file mode 100644 index 9f09df668e..0000000000 --- a/vendor/github.com/thanos-io/objstore/tracing.go +++ /dev/null @@ -1,163 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package objstore - -import ( - "context" - "io" - - "github.com/opentracing/opentracing-go" - - "github.com/thanos-io/objstore/tracing" -) - -// TracingBucket includes bucket operations in the traces. -type TracingBucket struct { - bkt Bucket -} - -func NewTracingBucket(bkt Bucket) InstrumentedBucket { - return TracingBucket{bkt: bkt} -} - -func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) (err error) { - tracing.DoWithSpan(ctx, "bucket_iter", func(spanCtx context.Context, span opentracing.Span) { - span.LogKV("dir", dir) - err = t.bkt.Iter(spanCtx, dir, f, options...) - }) - return -} - -func (t TracingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - span, spanCtx := tracing.StartSpan(ctx, "bucket_get") - span.LogKV("name", name) - - r, err := t.bkt.Get(spanCtx, name) - if err != nil { - span.LogKV("err", err) - span.Finish() - return nil, err - } - - return newTracingReadCloser(r, span), nil -} - -func (t TracingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - span, spanCtx := tracing.StartSpan(ctx, "bucket_getrange") - span.LogKV("name", name, "offset", off, "length", length) - - r, err := t.bkt.GetRange(spanCtx, name, off, length) - if err != nil { - span.LogKV("err", err) - span.Finish() - return nil, err - } - - return newTracingReadCloser(r, span), nil -} - -func (t TracingBucket) Exists(ctx context.Context, name string) (exists bool, err error) { - tracing.DoWithSpan(ctx, "bucket_exists", func(spanCtx context.Context, span opentracing.Span) { - span.LogKV("name", name) - exists, err = t.bkt.Exists(spanCtx, name) - }) - return -} - -func (t TracingBucket) Attributes(ctx context.Context, name string) (attrs ObjectAttributes, err error) { - tracing.DoWithSpan(ctx, "bucket_attributes", func(spanCtx context.Context, span opentracing.Span) { - span.LogKV("name", name) - attrs, err = t.bkt.Attributes(spanCtx, name) - }) - return -} - -func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (err error) { - tracing.DoWithSpan(ctx, "bucket_upload", func(spanCtx context.Context, span opentracing.Span) { - span.LogKV("name", name) - err = t.bkt.Upload(spanCtx, name, r) - }) - return -} - -func (t TracingBucket) Delete(ctx context.Context, name string) (err error) { - tracing.DoWithSpan(ctx, "bucket_delete", func(spanCtx context.Context, span opentracing.Span) { - span.LogKV("name", name) - err = t.bkt.Delete(spanCtx, name) - }) - return -} - -func (t TracingBucket) Name() string { - return "tracing: " + t.bkt.Name() -} - -func (t TracingBucket) Close() error { - return t.bkt.Close() -} - -func (t TracingBucket) IsObjNotFoundErr(err error) bool { - return t.bkt.IsObjNotFoundErr(err) -} - -func (t TracingBucket) IsCustomerManagedKeyError(err error) bool { - return t.bkt.IsCustomerManagedKeyError(err) -} - -func (t TracingBucket) WithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) Bucket { - if ib, ok := t.bkt.(InstrumentedBucket); ok { - return TracingBucket{bkt: ib.WithExpectedErrs(expectedFunc)} - } - return t -} - -func (t TracingBucket) ReaderWithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) BucketReader { - return t.WithExpectedErrs(expectedFunc) -} - -type tracingReadCloser struct { - r io.ReadCloser - s opentracing.Span - - objSize int64 - objSizeErr error - - read int -} - -func newTracingReadCloser(r io.ReadCloser, span opentracing.Span) io.ReadCloser { - // Since TryToGetSize can only reliably return size before doing any read calls, - // we call during "construction" and remember the results. - objSize, objSizeErr := TryToGetSize(r) - - return &tracingReadCloser{r: r, s: span, objSize: objSize, objSizeErr: objSizeErr} -} - -func (t *tracingReadCloser) ObjectSize() (int64, error) { - return t.objSize, t.objSizeErr -} - -func (t *tracingReadCloser) Read(p []byte) (int, error) { - n, err := t.r.Read(p) - if n > 0 { - t.read += n - } - if err != nil && err != io.EOF && t.s != nil { - t.s.LogKV("err", err) - } - return n, err -} - -func (t *tracingReadCloser) Close() error { - err := t.r.Close() - if t.s != nil { - t.s.LogKV("read", t.read) - if err != nil { - t.s.LogKV("close err", err) - } - t.s.Finish() - t.s = nil - } - return err -} diff --git a/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go b/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go new file mode 100644 index 0000000000..8174afb142 --- /dev/null +++ b/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go @@ -0,0 +1,216 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package opentracing + +import ( + "context" + "io" + + "github.com/opentracing/opentracing-go" + + "github.com/thanos-io/objstore" +) + +type contextKey struct{} + +var tracerKey = contextKey{} + +// Tracer interface to provide GetTraceIDFromSpanContext method. +type Tracer interface { + GetTraceIDFromSpanContext(ctx opentracing.SpanContext) (string, bool) +} + +// ContextWithTracer returns a new `context.Context` that holds a reference to given opentracing.Tracer. +func ContextWithTracer(ctx context.Context, tracer opentracing.Tracer) context.Context { + return context.WithValue(ctx, tracerKey, tracer) +} + +// TracerFromContext extracts opentracing.Tracer from the given context. +func TracerFromContext(ctx context.Context) opentracing.Tracer { + val := ctx.Value(tracerKey) + if sp, ok := val.(opentracing.Tracer); ok { + return sp + } + return nil +} + +// TracingBucket includes bucket operations in the traces. +type TracingBucket struct { + bkt objstore.Bucket +} + +func WrapWithTraces(bkt objstore.Bucket) objstore.InstrumentedBucket { + return TracingBucket{bkt: bkt} +} + +func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) (err error) { + doWithSpan(ctx, "bucket_iter", func(spanCtx context.Context, span opentracing.Span) { + span.LogKV("dir", dir) + err = t.bkt.Iter(spanCtx, dir, f, options...) + }) + return +} + +func (t TracingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + span, spanCtx := startSpan(ctx, "bucket_get") + span.LogKV("name", name) + + r, err := t.bkt.Get(spanCtx, name) + if err != nil { + span.LogKV("err", err) + span.Finish() + return nil, err + } + + return newTracingReadCloser(r, span), nil +} + +func (t TracingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + span, spanCtx := startSpan(ctx, "bucket_getrange") + span.LogKV("name", name, "offset", off, "length", length) + + r, err := t.bkt.GetRange(spanCtx, name, off, length) + if err != nil { + span.LogKV("err", err) + span.Finish() + return nil, err + } + + return newTracingReadCloser(r, span), nil +} + +func (t TracingBucket) Exists(ctx context.Context, name string) (exists bool, err error) { + doWithSpan(ctx, "bucket_exists", func(spanCtx context.Context, span opentracing.Span) { + span.LogKV("name", name) + exists, err = t.bkt.Exists(spanCtx, name) + }) + return +} + +func (t TracingBucket) Attributes(ctx context.Context, name string) (attrs objstore.ObjectAttributes, err error) { + doWithSpan(ctx, "bucket_attributes", func(spanCtx context.Context, span opentracing.Span) { + span.LogKV("name", name) + attrs, err = t.bkt.Attributes(spanCtx, name) + }) + return +} + +func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (err error) { + doWithSpan(ctx, "bucket_upload", func(spanCtx context.Context, span opentracing.Span) { + span.LogKV("name", name) + err = t.bkt.Upload(spanCtx, name, r) + }) + return +} + +func (t TracingBucket) Delete(ctx context.Context, name string) (err error) { + doWithSpan(ctx, "bucket_delete", func(spanCtx context.Context, span opentracing.Span) { + span.LogKV("name", name) + err = t.bkt.Delete(spanCtx, name) + }) + return +} + +func (t TracingBucket) Name() string { + return "tracing: " + t.bkt.Name() +} + +func (t TracingBucket) Close() error { + return t.bkt.Close() +} + +func (t TracingBucket) IsObjNotFoundErr(err error) bool { + return t.bkt.IsObjNotFoundErr(err) +} + +func (t TracingBucket) IsCustomerManagedKeyError(err error) bool { + return t.bkt.IsCustomerManagedKeyError(err) +} + +func (t TracingBucket) WithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.Bucket { + if ib, ok := t.bkt.(objstore.InstrumentedBucket); ok { + return TracingBucket{bkt: ib.WithExpectedErrs(expectedFunc)} + } + return t +} + +func (t TracingBucket) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + return t.WithExpectedErrs(expectedFunc) +} + +type tracingReadCloser struct { + r io.ReadCloser + s opentracing.Span + + objSize int64 + objSizeErr error + + read int +} + +func newTracingReadCloser(r io.ReadCloser, span opentracing.Span) io.ReadCloser { + // Since TryToGetSize can only reliably return size before doing any read calls, + // we call during "construction" and remember the results. + objSize, objSizeErr := objstore.TryToGetSize(r) + + return &tracingReadCloser{r: r, s: span, objSize: objSize, objSizeErr: objSizeErr} +} + +func (t *tracingReadCloser) ObjectSize() (int64, error) { + return t.objSize, t.objSizeErr +} + +func (t *tracingReadCloser) Read(p []byte) (int, error) { + n, err := t.r.Read(p) + if n > 0 { + t.read += n + } + if err != nil && err != io.EOF && t.s != nil { + t.s.LogKV("err", err) + } + return n, err +} + +func (t *tracingReadCloser) Close() error { + err := t.r.Close() + if t.s != nil { + t.s.LogKV("read", t.read) + if err != nil { + t.s.LogKV("close err", err) + } + t.s.Finish() + t.s = nil + } + return err +} + +// Aliases to avoid spreading opentracing package to Thanos code. +type Tag = opentracing.Tag +type Tags = opentracing.Tags +type Span = opentracing.Span + +// startSpan starts and returns span with `operationName` and hooking as child to a span found within given context if any. +// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer without notification. +func startSpan(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (Span, context.Context) { + tracer := TracerFromContext(ctx) + if tracer == nil { + // No tracing found, return noop span. + return opentracing.NoopTracer{}.StartSpan(operationName), ctx + } + + var span Span + if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil { + opts = append(opts, opentracing.ChildOf(parentSpan.Context())) + } + span = tracer.StartSpan(operationName, opts...) + return span, opentracing.ContextWithSpan(ctx, span) +} + +// doWithSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any. +// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer notification. +func doWithSpan(ctx context.Context, operationName string, doFn func(context.Context, Span), opts ...opentracing.StartSpanOption) { + span, newCtx := startSpan(ctx, operationName, opts...) + defer span.Finish() + doFn(newCtx, span) +} diff --git a/vendor/github.com/thanos-io/objstore/tracing/tracing.go b/vendor/github.com/thanos-io/objstore/tracing/tracing.go deleted file mode 100644 index 9beb2bd813..0000000000 --- a/vendor/github.com/thanos-io/objstore/tracing/tracing.go +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package tracing - -import ( - "context" - - "github.com/opentracing/opentracing-go" -) - -const ( - // ForceTracingBaggageKey is a request header name that forces tracing sampling. - ForceTracingBaggageKey = "X-Thanos-Force-Tracing" -) - -// Aliases to avoid spreading opentracing package to Thanos code. - -type Tag = opentracing.Tag -type Tags = opentracing.Tags -type Span = opentracing.Span - -type contextKey struct{} - -var tracerKey = contextKey{} - -// Tracer interface to provide GetTraceIDFromSpanContext method. -type Tracer interface { - GetTraceIDFromSpanContext(ctx opentracing.SpanContext) (string, bool) -} - -// ContextWithTracer returns a new `context.Context` that holds a reference to given opentracing.Tracer. -func ContextWithTracer(ctx context.Context, tracer opentracing.Tracer) context.Context { - return context.WithValue(ctx, tracerKey, tracer) -} - -// tracerFromContext extracts opentracing.Tracer from the given context. -func tracerFromContext(ctx context.Context) opentracing.Tracer { - val := ctx.Value(tracerKey) - if sp, ok := val.(opentracing.Tracer); ok { - return sp - } - return nil -} - -// StartSpan starts and returns span with `operationName` and hooking as child to a span found within given context if any. -// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer without notification. -func StartSpan(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (Span, context.Context) { - tracer := tracerFromContext(ctx) - if tracer == nil { - // No tracing found, return noop span. - return opentracing.NoopTracer{}.StartSpan(operationName), ctx - } - - var span Span - if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil { - opts = append(opts, opentracing.ChildOf(parentSpan.Context())) - } - span = tracer.StartSpan(operationName, opts...) - return span, opentracing.ContextWithSpan(ctx, span) -} - -// DoWithSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any. -// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer notification. -func DoWithSpan(ctx context.Context, operationName string, doFn func(context.Context, Span), opts ...opentracing.StartSpanOption) { - span, newCtx := StartSpan(ctx, operationName, opts...) - defer span.Finish() - doFn(newCtx, span) -} diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go b/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go index d2953ae9c5..69e27737e5 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go @@ -586,24 +586,28 @@ func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]* return nil } -var _ MetadataFilter = &DeduplicateFilter{} +var _ MetadataFilter = &DefaultDeduplicateFilter{} -// DeduplicateFilter is a BaseFetcher filter that filters out older blocks that have exactly the same data. +type DeduplicateFilter interface { + DuplicateIDs() []ulid.ULID +} + +// DefaultDeduplicateFilter is a BaseFetcher filter that filters out older blocks that have exactly the same data. // Not go-routine safe. -type DeduplicateFilter struct { +type DefaultDeduplicateFilter struct { duplicateIDs []ulid.ULID concurrency int mu sync.Mutex } -// NewDeduplicateFilter creates DeduplicateFilter. -func NewDeduplicateFilter(concurrency int) *DeduplicateFilter { - return &DeduplicateFilter{concurrency: concurrency} +// NewDeduplicateFilter creates DefaultDeduplicateFilter. +func NewDeduplicateFilter(concurrency int) *DefaultDeduplicateFilter { + return &DefaultDeduplicateFilter{concurrency: concurrency} } // Filter filters out duplicate blocks that can be formed // from two or more overlapping blocks that fully submatches the source blocks of the older blocks. -func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error { +func (f *DefaultDeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error { f.duplicateIDs = f.duplicateIDs[:0] var wg sync.WaitGroup @@ -635,7 +639,7 @@ func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metad return nil } -func (f *DeduplicateFilter) filterGroup(metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) { +func (f *DefaultDeduplicateFilter) filterGroup(metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) { sort.Slice(metaSlice, func(i, j int) bool { ilen := len(metaSlice[i].Compaction.Sources) jlen := len(metaSlice[j].Compaction.Sources) @@ -677,8 +681,8 @@ childLoop: f.mu.Unlock() } -// DuplicateIDs returns slice of block ids that are filtered out by DeduplicateFilter. -func (f *DeduplicateFilter) DuplicateIDs() []ulid.ULID { +// DuplicateIDs returns slice of block ids that are filtered out by DefaultDeduplicateFilter. +func (f *DefaultDeduplicateFilter) DuplicateIDs() []ulid.ULID { return f.duplicateIDs } diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go b/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go index 0f6f7be29d..a390ff7ae3 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go @@ -93,6 +93,9 @@ type Thanos struct { // IndexStats contains stats info related to block index. IndexStats IndexStats `json:"index_stats,omitempty"` + + // Extensions are used for plugin any arbitrary additional information for block. Optional. + Extensions any `json:"extensions,omitempty"` } type IndexStats struct { @@ -100,6 +103,26 @@ type IndexStats struct { ChunkMaxSize int64 `json:"chunk_max_size,omitempty"` } +func (m *Thanos) ParseExtensions(v any) (any, error) { + return ConvertExtensions(m.Extensions, v) +} + +// ConvertExtensions converts extensions with `any` type into specific type `v` +// that the caller expects. +func ConvertExtensions(extensions any, v any) (any, error) { + if extensions == nil { + return nil, nil + } + extensionsContent, err := json.Marshal(extensions) + if err != nil { + return nil, err + } + if err = json.Unmarshal(extensionsContent, v); err != nil { + return nil, err + } + return v, nil +} + type Rewrite struct { // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` diff --git a/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go b/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go index f36e70dc81..2e20ad73db 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go +++ b/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go @@ -58,7 +58,7 @@ type Syncer struct { blocks map[ulid.ULID]*metadata.Meta partial map[ulid.ULID]error metrics *syncerMetrics - duplicateBlocksFilter *block.DeduplicateFilter + duplicateBlocksFilter block.DeduplicateFilter ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter } @@ -95,7 +95,7 @@ func newSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion, garbag // NewMetaSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter *block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter) (*Syncer, error) { +func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } @@ -351,6 +351,7 @@ type Group struct { hashFunc metadata.HashFunc blockFilesConcurrency int compactBlocksFetchConcurrency int + extensions any } // NewGroup returns a new compaction group. @@ -491,6 +492,14 @@ func (cg *Group) Resolution() int64 { return cg.resolution } +func (cg *Group) Extensions() any { + return cg.extensions +} + +func (cg *Group) SetExtensions(extensions any) { + cg.extensions = extensions +} + // CompactProgressMetrics contains Prometheus metrics related to compaction progress. type CompactProgressMetrics struct { NumberOfCompactionRuns prometheus.Gauge @@ -536,7 +545,7 @@ func (ps *CompactionProgressCalculator) ProgressCalculate(ctx context.Context, g if len(g.IDs()) == 1 { continue } - plan, err := ps.planner.Plan(ctx, g.metasByMinTime) + plan, err := ps.planner.Plan(ctx, g.metasByMinTime, nil, g.extensions) if err != nil { return errors.Wrapf(err, "could not plan") } @@ -728,7 +737,50 @@ func (rs *RetentionProgressCalculator) ProgressCalculate(ctx context.Context, gr type Planner interface { // Plan returns a list of blocks that should be compacted into single one. // The blocks can be overlapping. The provided metadata has to be ordered by minTime. - Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) + Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) +} + +type BlockDeletableChecker interface { + CanDelete(group *Group, blockID ulid.ULID) bool +} + +type DefaultBlockDeletableChecker struct { +} + +func (c DefaultBlockDeletableChecker) CanDelete(_ *Group, _ ulid.ULID) bool { + return true +} + +type CompactionLifecycleCallback interface { + PreCompactionCallback(ctx context.Context, logger log.Logger, group *Group, toCompactBlocks []*metadata.Meta) error + PostCompactionCallback(ctx context.Context, logger log.Logger, group *Group, blockID ulid.ULID) error + GetBlockPopulator(ctx context.Context, logger log.Logger, group *Group) (tsdb.BlockPopulator, error) +} + +type DefaultCompactionLifecycleCallback struct { +} + +func (c DefaultCompactionLifecycleCallback) PreCompactionCallback(_ context.Context, _ log.Logger, _ *Group, toCompactBlocks []*metadata.Meta) error { + // Due to #183 we verify that none of the blocks in the plan have overlapping sources. + // This is one potential source of how we could end up with duplicated chunks. + uniqueSources := map[ulid.ULID]struct{}{} + for _, m := range toCompactBlocks { + for _, s := range m.Compaction.Sources { + if _, ok := uniqueSources[s]; ok { + return halt(errors.Errorf("overlapping sources detected for plan %v", toCompactBlocks)) + } + uniqueSources[s] = struct{}{} + } + } + return nil +} + +func (c DefaultCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error { + return nil +} + +func (c DefaultCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) { + return tsdb.DefaultBlockPopulator{}, nil } // Compactor provides compaction against an underlying storage of time series data. @@ -748,11 +800,12 @@ type Compactor interface { // * The source dirs are marked Deletable. // * Returns empty ulid.ULID{}. Compact(dest string, dirs []string, open []*tsdb.Block) (ulid.ULID, error) + CompactWithBlockPopulator(dest string, dirs []string, open []*tsdb.Block, blockPopulator tsdb.BlockPopulator) (ulid.ULID, error) } // Compact plans and runs a single compaction against the group. The compacted result // is uploaded into the bucket the blocks were retrieved from. -func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, rerr error) { +func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback) (shouldRerun bool, compID ulid.ULID, rerr error) { cg.compactionRunsStarted.Inc() subDir := filepath.Join(dir, cg.Key()) @@ -772,10 +825,13 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir") } + errChan := make(chan error, 1) err := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (err error) { - shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp) + shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp, blockDeletableChecker, compactionLifecycleCallback, errChan) return err }, opentracing.Tags{"group.key": cg.Key()}) + errChan <- err + close(errChan) if err != nil { cg.compactionFailures.Inc() return false, ulid.ULID{}, err @@ -975,7 +1031,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, return nil } -func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, _ error) { +func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback, errChan chan error) (shouldRerun bool, compID ulid.ULID, _ error) { cg.mtx.Lock() defer cg.mtx.Unlock() @@ -993,7 +1049,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp var toCompact []*metadata.Meta if err := tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) (e error) { - toCompact, e = planner.Plan(ctx, cg.metasByMinTime) + toCompact, e = planner.Plan(ctx, cg.metasByMinTime, errChan, cg.extensions) return e }); err != nil { return false, ulid.ULID{}, errors.Wrap(err, "plan compaction") @@ -1003,35 +1059,35 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp return false, ulid.ULID{}, nil } - level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", toCompact)) - - // Due to #183 we verify that none of the blocks in the plan have overlapping sources. - // This is one potential source of how we could end up with duplicated chunks. - uniqueSources := map[ulid.ULID]struct{}{} + level.Info(cg.logger).Log("msg", "compaction available and planned", "plan", fmt.Sprintf("%v", toCompact)) // Once we have a plan we need to download the actual data. groupCompactionBegin := time.Now() begin := groupCompactionBegin + + if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil { + return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact)) + } + level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "plan", fmt.Sprintf("%v", toCompact), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) + + begin = time.Now() g, errCtx := errgroup.WithContext(ctx) g.SetLimit(cg.compactBlocksFetchConcurrency) toCompactDirs := make([]string, 0, len(toCompact)) for _, m := range toCompact { bdir := filepath.Join(dir, m.ULID.String()) - for _, s := range m.Compaction.Sources { - if _, ok := uniqueSources[s]; ok { - return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", toCompact)) - } - uniqueSources[s] = struct{}{} - } func(ctx context.Context, meta *metadata.Meta) { g.Go(func() error { + start := time.Now() if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error { return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency)) }, opentracing.Tags{"block.id": meta.ULID}); err != nil { return retry(errors.Wrapf(err, "download block %s", meta.ULID)) } + level.Debug(cg.logger).Log("msg", "downloaded block", "block", meta.ULID.String(), "duration", time.Since(start), "duration_ms", time.Since(start).Milliseconds()) + start = time.Now() // Ensure all input blocks are valid. var stats block.HealthStats if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) { @@ -1057,6 +1113,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp return errors.Wrapf(err, "block id %s, try running with --debug.accept-malformed-index", meta.ULID) } + level.Debug(cg.logger).Log("msg", "verified block", "block", meta.ULID.String(), "duration", time.Since(start), "duration_ms", time.Since(start).Milliseconds()) return nil }) }(errCtx, m) @@ -1073,7 +1130,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp begin = time.Now() if err := tracing.DoInSpanWithErr(ctx, "compaction", func(ctx context.Context) (e error) { - compID, e = comp.Compact(dir, toCompactDirs, nil) + populateBlockFunc, e := compactionLifecycleCallback.GetBlockPopulator(ctx, cg.logger, cg) + if e != nil { + return e + } + compID, e = comp.CompactWithBlockPopulator(dir, toCompactDirs, nil, populateBlockFunc) return e }); err != nil { return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs)) @@ -1083,7 +1144,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", sourceBlockStr) for _, meta := range toCompact { if meta.Stats.NumSamples == 0 { - if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil { + if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()), blockDeletableChecker); err != nil { level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID) } } @@ -1128,6 +1189,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Downsample: metadata.ThanosDownsample{Resolution: cg.resolution}, Source: metadata.CompactorSource, SegmentFiles: block.GetSegmentFiles(bdir), + Extensions: cg.extensions, } if stats.ChunkMaxSize > 0 { thanosMeta.IndexStats.ChunkMaxSize = stats.ChunkMaxSize @@ -1163,7 +1225,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp // Eventually the block we just uploaded should get synced into the group again (including sync-delay). for _, meta := range toCompact { err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error { - return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())) + return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()), blockDeletableChecker) }, opentracing.Tags{"block.id": meta.ULID}) if err != nil { return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket")) @@ -1171,22 +1233,30 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp cg.groupGarbageCollectedBlocks.Inc() } + level.Info(cg.logger).Log("msg", "running post compaction callback", "result_block", compID) + if err := compactionLifecycleCallback.PostCompactionCallback(ctx, cg.logger, cg, compID); err != nil { + return false, ulid.ULID{}, retry(errors.Wrapf(err, "failed to run post compaction callback for result block %s", compID)) + } + level.Info(cg.logger).Log("msg", "finished running post compaction callback", "result_block", compID) + level.Info(cg.logger).Log("msg", "finished compacting blocks", "result_block", compID, "source_blocks", sourceBlockStr, "duration", time.Since(groupCompactionBegin), "duration_ms", time.Since(groupCompactionBegin).Milliseconds()) return true, compID, nil } -func (cg *Group) deleteBlock(id ulid.ULID, bdir string) error { +func (cg *Group) deleteBlock(id ulid.ULID, bdir string, blockDeletableChecker BlockDeletableChecker) error { if err := os.RemoveAll(bdir); err != nil { return errors.Wrapf(err, "remove old block dir %s", id) } - // Spawn a new context so we always mark a block for deletion in full on shutdown. - delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - defer cancel() - level.Info(cg.logger).Log("msg", "marking compacted block for deletion", "old_block", id) - if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id, "source of compacted block", cg.blocksMarkedForDeletion); err != nil { - return errors.Wrapf(err, "mark block %s for deletion from bucket", id) + if blockDeletableChecker.CanDelete(cg, id) { + // Spawn a new context so we always mark a block for deletion in full on shutdown. + delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + level.Info(cg.logger).Log("msg", "marking compacted block for deletion", "old_block", id) + if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id, "source of compacted block", cg.blocksMarkedForDeletion); err != nil { + return errors.Wrapf(err, "mark block %s for deletion from bucket", id) + } } return nil } @@ -1198,6 +1268,8 @@ type BucketCompactor struct { grouper Grouper comp Compactor planner Planner + blockDeletableChecker BlockDeletableChecker + compactionLifecycleCallback CompactionLifecycleCallback compactDir string bkt objstore.Bucket concurrency int @@ -1215,6 +1287,37 @@ func NewBucketCompactor( bkt objstore.Bucket, concurrency int, skipBlocksWithOutOfOrderChunks bool, +) (*BucketCompactor, error) { + if concurrency <= 0 { + return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency) + } + return NewBucketCompactorWithCheckerAndCallback( + logger, + sy, + grouper, + planner, + comp, + DefaultBlockDeletableChecker{}, + DefaultCompactionLifecycleCallback{}, + compactDir, + bkt, + concurrency, + skipBlocksWithOutOfOrderChunks, + ) +} + +func NewBucketCompactorWithCheckerAndCallback( + logger log.Logger, + sy *Syncer, + grouper Grouper, + planner Planner, + comp Compactor, + blockDeletableChecker BlockDeletableChecker, + compactionLifecycleCallback CompactionLifecycleCallback, + compactDir string, + bkt objstore.Bucket, + concurrency int, + skipBlocksWithOutOfOrderChunks bool, ) (*BucketCompactor, error) { if concurrency <= 0 { return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency) @@ -1225,6 +1328,8 @@ func NewBucketCompactor( grouper: grouper, planner: planner, comp: comp, + blockDeletableChecker: blockDeletableChecker, + compactionLifecycleCallback: compactionLifecycleCallback, compactDir: compactDir, bkt: bkt, concurrency: concurrency, @@ -1265,7 +1370,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { go func() { defer wg.Done() for g := range groupChan { - shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp) + shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp, c.blockDeletableChecker, c.compactionLifecycleCallback) if err == nil { if shouldRerunGroup { mtx.Lock() diff --git a/vendor/github.com/thanos-io/thanos/pkg/compact/planner.go b/vendor/github.com/thanos-io/thanos/pkg/compact/planner.go index 5c2a93df8d..783191cacf 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/compact/planner.go +++ b/vendor/github.com/thanos-io/thanos/pkg/compact/planner.go @@ -49,7 +49,7 @@ func NewPlanner(logger log.Logger, ranges []int64, noCompBlocks *GatherNoCompact } // TODO(bwplotka): Consider smarter algorithm, this prefers smaller iterative compactions vs big single one: https://github.com/thanos-io/thanos/issues/3405 -func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { +func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) { return p.plan(p.noCompBlocksFunc(), metasByMinTime) } @@ -243,7 +243,7 @@ func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket, return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact} } -func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { +func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) { noCompactMarked := t.noCompBlocksFunc() copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked)) for k, v := range noCompactMarked { diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/cache/cache.go b/vendor/github.com/thanos-io/thanos/pkg/store/cache/cache.go index bcbb727b30..ad6ecac30e 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/cache/cache.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/cache/cache.go @@ -58,8 +58,9 @@ type IndexCache interface { // Common metrics that should be used by all cache implementations. type commonMetrics struct { - requestTotal *prometheus.CounterVec - hitsTotal *prometheus.CounterVec + requestTotal *prometheus.CounterVec + hitsTotal *prometheus.CounterVec + dataSizeBytes *prometheus.HistogramVec } func newCommonMetrics(reg prometheus.Registerer) *commonMetrics { @@ -72,6 +73,13 @@ func newCommonMetrics(reg prometheus.Registerer) *commonMetrics { Name: "thanos_store_index_cache_hits_total", Help: "Total number of items requests to the cache that were a hit.", }, []string{"item_type"}), + dataSizeBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "thanos_store_index_cache_stored_data_size_bytes", + Help: "Histogram to track item data size stored in index cache", + Buckets: []float64{ + 32, 256, 512, 1024, 32 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024, 32 * 1024 * 1024, 256 * 1024 * 1024, 512 * 1024 * 1024, + }, + }, []string{"item_type"}), } } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/cache/inmemory.go b/vendor/github.com/thanos-io/thanos/pkg/store/cache/inmemory.go index d5227285a2..747199b414 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/cache/inmemory.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/cache/inmemory.go @@ -295,6 +295,7 @@ func copyToKey(l labels.Label) cacheKeyPostings { // StorePostings sets the postings identified by the ulid and label to the value v, // if the postings already exists in the cache it is not mutated. func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { + c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypePostings).Observe(float64(len(v))) c.set(cacheTypePostings, cacheKey{block: blockID.String(), key: copyToKey(l)}, v) } @@ -318,6 +319,7 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid. // StoreExpandedPostings stores expanded postings for a set of label matchers. func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) { + c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings).Observe(float64(len(v))) c.set(cacheTypeExpandedPostings, cacheKey{block: blockID.String(), key: cacheKeyExpandedPostings(labelMatchersToString(matchers))}, v) } @@ -332,6 +334,7 @@ func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ul // StoreSeries sets the series identified by the ulid and id to the value v, // if the series already exists in the cache it is not mutated. func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { + c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeSeries).Observe(float64(len(v))) c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id), ""}, v) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/cache/memcached.go b/vendor/github.com/thanos-io/thanos/pkg/store/cache/memcached.go index f5ab1c4b02..9292f3ed59 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/cache/memcached.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/cache/memcached.go @@ -33,12 +33,15 @@ type RemoteIndexCache struct { compressionScheme string // Metrics. - postingRequests prometheus.Counter - seriesRequests prometheus.Counter - expandedPostingRequests prometheus.Counter - postingHits prometheus.Counter - seriesHits prometheus.Counter - expandedPostingHits prometheus.Counter + postingRequests prometheus.Counter + seriesRequests prometheus.Counter + expandedPostingRequests prometheus.Counter + postingHits prometheus.Counter + seriesHits prometheus.Counter + expandedPostingHits prometheus.Counter + postingDataSizeBytes prometheus.Observer + expandedPostingDataSizeBytes prometheus.Observer + seriesDataSizeBytes prometheus.Observer } // NewRemoteIndexCache makes a new RemoteIndexCache. @@ -61,6 +64,10 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli c.seriesHits = commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries) c.expandedPostingHits = commonMetrics.hitsTotal.WithLabelValues(cacheTypeExpandedPostings) + c.postingDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypePostings) + c.seriesDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeSeries) + c.expandedPostingDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings) + level.Info(logger).Log("msg", "created index cache") return c, nil @@ -70,6 +77,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { + c.postingDataSizeBytes.Observe(float64(len(v))) key := cacheKey{blockID.String(), cacheKeyPostings(l), c.compressionScheme}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err) @@ -118,6 +126,7 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labels.Matcher, v []byte) { + c.expandedPostingDataSizeBytes.Observe(float64(len(v))) key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(keys)), c.compressionScheme}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { @@ -148,6 +157,7 @@ func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ul // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { + c.seriesDataSizeBytes.Observe(float64(len(v))) key := cacheKey{blockID.String(), cacheKeySeries(id), ""}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { diff --git a/vendor/modules.txt b/vendor/modules.txt index 3e751de1b0..a8637a2d59 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -832,7 +832,7 @@ github.com/stretchr/objx github.com/stretchr/testify/assert github.com/stretchr/testify/mock github.com/stretchr/testify/require -# github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca +# github.com/thanos-io/objstore v0.0.0-20230713070940-eb01c83b89a4 ## explicit; go 1.18 github.com/thanos-io/objstore github.com/thanos-io/objstore/exthttp @@ -841,7 +841,7 @@ github.com/thanos-io/objstore/providers/filesystem github.com/thanos-io/objstore/providers/gcs github.com/thanos-io/objstore/providers/s3 github.com/thanos-io/objstore/providers/swift -github.com/thanos-io/objstore/tracing +github.com/thanos-io/objstore/tracing/opentracing # github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea ## explicit; go 1.19 github.com/thanos-io/promql-engine/api @@ -863,7 +863,7 @@ github.com/thanos-io/promql-engine/logicalplan github.com/thanos-io/promql-engine/parser github.com/thanos-io/promql-engine/query github.com/thanos-io/promql-engine/worker -# github.com/thanos-io/thanos v0.31.1-0.20230712154708-a395c5dbd054 +# github.com/thanos-io/thanos v0.31.1-0.20230714171248-723dfd08764a ## explicit; go 1.18 github.com/thanos-io/thanos/pkg/block github.com/thanos-io/thanos/pkg/block/indexheader From 7c70c5c90298f49374369a29d1087c192b6c962a Mon Sep 17 00:00:00 2001 From: Alex Le Date: Fri, 14 Jul 2023 15:31:26 -0700 Subject: [PATCH 2/4] fix tests Signed-off-by: Alex Le --- pkg/compactor/compactor_test.go | 20 +++++++++---------- .../bucketindex/markers_bucket_client_test.go | 2 +- pkg/storage/tsdb/bucketindex/markers_test.go | 2 +- pkg/storage/tsdb/bucketindex/updater_test.go | 4 ++-- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 59fc5e6fbd..a7beb784d2 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -510,7 +510,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil) c, _, tsdbPlannerMock, _, registry := prepare(t, prepareConfig(), bucketClient, nil) - tsdbPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, errors.New("Failed to plan")) + tsdbPlannerMock.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, errors.New("Failed to plan")) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) // Wait until all retry attempts have completed. @@ -585,7 +585,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { // in order to simplify tests (all in all, we just want to // test our logic and not TSDB compactor which we expect to // be already tested). - tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) + tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) @@ -850,7 +850,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) { c, _, tsdbPlanner, _, registry := prepare(t, prepareConfig(), bucketClient, nil) - tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) + tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) @@ -907,7 +907,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) // in order to simplify tests (all in all, we just want to // test our logic and not TSDB compactor which we expect to // be already tested). - tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) + tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) @@ -1004,7 +1004,7 @@ func TestCompactor_ShouldSkipOutOrOrderBlocks(t *testing.T) { tsdbCompac.On("Compact", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(b1, nil) - tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{ + tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{ { BlockMeta: tsdb.BlockMeta{ ULID: b1, @@ -1097,7 +1097,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni // in order to simplify tests (all in all, we just want to // test our logic and not TSDB compactor which we expect to // be already tested). - tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) + tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) @@ -1193,7 +1193,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM // in order to simplify tests (all in all, we just want to // test our logic and not TSDB compactor which we expect to // be already tested). - tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) + tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) } // Start all compactors @@ -1325,7 +1325,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit // in order to simplify tests (all in all, we just want to // test our logic and not TSDB compactor which we expect to // be already tested). - tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) + tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) } // Start all compactors @@ -1814,7 +1814,7 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) { // in order to simplify tests (all in all, we just want to // test our logic and not TSDB compactor which we expect to // be already tested). - tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) + tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) } require.Equal(t, 2, len(compactors)) @@ -1900,7 +1900,7 @@ func TestCompactor_ShouldNotTreatInterruptionsAsErrors(t *testing.T) { tsdbCompactor.On("Compact", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ulid.ULID{}, context.Canceled).Run(func(args mock.Arguments) { cancel() }) - tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{ + tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{ { BlockMeta: tsdb.BlockMeta{ ULID: b1, diff --git a/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go b/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go index 7d6c20a609..faea4d30aa 100644 --- a/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go +++ b/pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go @@ -215,7 +215,7 @@ func TestBucketWithGlobalMarkers_ShouldWorkCorrectlyWithBucketMetrics(t *testing // global markers (intentionally in the middle of the chain) and // user prefix. bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) - bkt = objstore.BucketWithMetrics("", bkt, prometheus.WrapRegistererWithPrefix("thanos_", reg)) + bkt = objstore.WrapWithMetrics(bkt, prometheus.WrapRegistererWithPrefix("thanos_", reg), "") bkt = BucketWithGlobalMarkers(bkt) userBkt := bucket.NewUserBucketClient("user-1", bkt, nil) diff --git a/pkg/storage/tsdb/bucketindex/markers_test.go b/pkg/storage/tsdb/bucketindex/markers_test.go index 087de20a9d..00b025937c 100644 --- a/pkg/storage/tsdb/bucketindex/markers_test.go +++ b/pkg/storage/tsdb/bucketindex/markers_test.go @@ -56,7 +56,7 @@ func TestMigrateBlockDeletionMarksToGlobalLocation(t *testing.T) { t.Run("doesn't increase thanos_objstore_bucket_operation_failures_total for NotFound deletion markers", func(t *testing.T) { reg := prometheus.NewPedanticRegistry() - bkt = objstore.BucketWithMetrics("", bkt, prometheus.WrapRegistererWithPrefix("thanos_", reg)) + bkt = objstore.WrapWithMetrics(bkt, prometheus.WrapRegistererWithPrefix("thanos_", reg), "") require.NoError(t, bkt.Upload(ctx, path.Join("user-1", block2.String(), metadata.MetaFilename), strings.NewReader("{}"))) require.NoError(t, MigrateBlockDeletionMarksToGlobalLocation(ctx, bkt, "user-1", nil)) diff --git a/pkg/storage/tsdb/bucketindex/updater_test.go b/pkg/storage/tsdb/bucketindex/updater_test.go index e9d40f3737..424ef6ecda 100644 --- a/pkg/storage/tsdb/bucketindex/updater_test.go +++ b/pkg/storage/tsdb/bucketindex/updater_test.go @@ -106,7 +106,7 @@ func TestUpdater_UpdateIndex_ShouldNotIncreaseOperationFailureMetric(t *testing. // Mock some blocks in the storage. bkt = BucketWithGlobalMarkers(bkt) - bkt = objstore.BucketWithMetrics("test-bucket", bkt, prometheus.WrapRegistererWithPrefix("thanos_", registry)) + bkt = objstore.WrapWithMetrics(bkt, prometheus.WrapRegistererWithPrefix("thanos_", registry), "test-bucket") block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20) block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30) block3 := testutil.MockStorageBlock(t, bkt, userID, 30, 40) @@ -149,7 +149,7 @@ func TestUpdater_UpdateIndex_ShouldNotIncreaseOperationFailureMetricCustomerKey( // Mock some blocks in the storage. bkt = BucketWithGlobalMarkers(bkt) - bkt = objstore.BucketWithMetrics("test-bucket", bkt, prometheus.WrapRegistererWithPrefix("thanos_", registry)) + bkt = objstore.WrapWithMetrics(bkt, prometheus.WrapRegistererWithPrefix("thanos_", registry), "test-bucket") block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20) block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30) From 59fa635295f5ddc9b7a6e42de85310842f99dd62 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Fri, 14 Jul 2023 16:20:47 -0700 Subject: [PATCH 3/4] fix tests Signed-off-by: Alex Le --- pkg/compactor/compactor_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index a7beb784d2..99c5753e06 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1002,7 +1002,7 @@ func TestCompactor_ShouldSkipOutOrOrderBlocks(t *testing.T) { cfg.SkipBlocksWithOutOfOrderChunksEnabled = true c, tsdbCompac, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient, nil) - tsdbCompac.On("Compact", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(b1, nil) + tsdbCompac.On("CompactWithBlockPopulator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(b1, nil) tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{ { @@ -1897,7 +1897,7 @@ func TestCompactor_ShouldNotTreatInterruptionsAsErrors(t *testing.T) { c, tsdbCompactor, tsdbPlanner, logs, registry := prepare(t, prepareConfig(), bucketClient, nil) ctx, cancel := context.WithCancel(context.Background()) - tsdbCompactor.On("Compact", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ulid.ULID{}, context.Canceled).Run(func(args mock.Arguments) { + tsdbCompactor.On("CompactWithBlockPopulator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ulid.ULID{}, context.Canceled).Run(func(args mock.Arguments) { cancel() }) tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{ From c4ef4034eb5663decf29168e9484d1cd835071a0 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Fri, 14 Jul 2023 16:54:14 -0700 Subject: [PATCH 4/4] trigger workflow Signed-off-by: Alex Le