Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade objstore #6507

Merged
merged 3 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ import (
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"

blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks"
"github.com/thanos-io/thanos/pkg/block"
Expand Down Expand Up @@ -202,10 +205,11 @@ func runCompact(
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, extprom.WrapRegistererWithPrefix("thanos_", reg), component.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.String())
if err != nil {
return err
}
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

relabelContentYaml, err := conf.selectorRelabelConf.Content()
if err != nil {
Expand All @@ -220,21 +224,21 @@ func runCompact(
// Ensure we close up everything properly.
defer func() {
if err != nil {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
runutil.CloseWithLogOnErr(logger, insBkt, "bucket client")
}
}()

// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, deleteDelay/2, conf.blockMetaFetchConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, deleteDelay/2, conf.blockMetaFetchConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency)
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, bkt, conf.blockMetaFetchConcurrency)
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency)
labelShardedMetaFilter := block.NewLabelShardedMetaFilter(relabelConfig)
consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime)

baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, bkt, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand All @@ -252,7 +256,7 @@ func runCompact(
)
}
var (
api = blocksAPI.NewBlocksAPI(logger, conf.webConf.disableCORS, conf.label, flagsMap, bkt)
api = blocksAPI.NewBlocksAPI(logger, conf.webConf.disableCORS, conf.label, flagsMap, insBkt)
sy *compact.Syncer
)
{
Expand All @@ -274,7 +278,7 @@ func runCompact(
sy, err = compact.NewMetaSyncer(
logger,
reg,
bkt,
insBkt,
cf,
duplicateBlocksFilter,
ignoreDeletionMarkFilter,
Expand Down Expand Up @@ -341,7 +345,7 @@ func runCompact(

grouper := compact.NewDefaultGrouper(
logger,
bkt,
insBkt,
conf.acceptMalformedIndex,
enableVerticalCompaction,
reg,
Expand All @@ -355,19 +359,19 @@ func runCompact(
tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter)
planner := compact.WithLargeTotalIndexSizeFilter(
tsdbPlanner,
bkt,
insBkt,
int64(conf.maxBlockIndexSize),
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(
logger,
sy,
grouper,
planner,
comp,
compactDir,
bkt,
insBkt,
conf.compactionConcurrency,
conf.skipBlockWithOutOfOrderChunks,
)
Expand Down Expand Up @@ -409,7 +413,7 @@ func runCompact(
return errors.Wrap(err, "syncing metas")
}

compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, compactMetrics.partialUploadDeleteAttempts, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt, compactMetrics.partialUploadDeleteAttempts, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "cleaning marked blocks")
}
Expand Down Expand Up @@ -437,15 +441,15 @@ func runCompact(
downsampleMetrics.downsamples.WithLabelValues(groupKey)
downsampleMetrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, insBkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, insBkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil {
return errors.Wrap(err, "second pass of downsampling failed")
}
level.Info(logger).Log("msg", "downsampling iterations done")
Expand All @@ -458,15 +462,15 @@ func runCompact(
return errors.Wrap(err, "sync before retention")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil {
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, insBkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil {
return errors.Wrap(err, "retention failed")
}

return cleanPartialMarked()
}

g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
defer runutil.CloseWithLogOnErr(logger, insBkt, "bucket client")

if !conf.wait {
return compactMainFn()
Expand Down
17 changes: 10 additions & 7 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -80,15 +82,16 @@ func RunDownsample(
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, extprom.WrapRegistererWithPrefix("thanos_", reg), component.Downsample.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Downsample.String())
if err != nil {
return err
}
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

// While fetching blocks, filter out blocks that were marked for no downsample.
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(block.FetcherConcurrency),
downsample.NewGatherNoDownsampleMarkFilter(logger, bkt),
downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt),
})
if err != nil {
return errors.Wrap(err, "create meta fetcher")
Expand All @@ -97,7 +100,7 @@ func RunDownsample(
// Ensure we close up everything properly.
defer func() {
if err != nil {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
runutil.CloseWithLogOnErr(logger, insBkt, "bucket client")
}
}()

Expand All @@ -113,7 +116,7 @@ func RunDownsample(
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
defer runutil.CloseWithLogOnErr(logger, insBkt, "bucket client")
statusProber.Ready()

return runutil.Repeat(waitInterval, ctx.Done(), func() error {
Expand All @@ -128,7 +131,7 @@ func RunDownsample(
metrics.downsamples.WithLabelValues(groupKey)
metrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc, false); err != nil {
if err := downsampleBucket(ctx, logger, metrics, insBkt, metas, dataDir, downsampleConcurrency, hashFunc, false); err != nil {
return errors.Wrap(err, "downsampling failed")
}

Expand All @@ -137,7 +140,7 @@ func RunDownsample(
if err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc, false); err != nil {
if err := downsampleBucket(ctx, logger, metrics, insBkt, metas, dataDir, downsampleConcurrency, hashFunc, false); err != nil {
return errors.Wrap(err, "downsampling failed")
}
return nil
Expand Down
9 changes: 5 additions & 4 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ type erroringBucket struct {
bkt objstore.InstrumentedBucket
}

func (b *erroringBucket) IsCustomerManagedKeyError(err error) bool {
return b.bkt.IsCustomerManagedKeyError(err)
}

func (b *erroringBucket) Close() error {
return b.bkt.Close()
}
Expand Down Expand Up @@ -81,6 +77,11 @@ func (b *erroringBucket) IsObjNotFoundErr(err error) bool {
return b.bkt.IsObjNotFoundErr(err)
}

// IsCustomerManagedKeyError returns true if error means that customer managed key is invalid.
func (b *erroringBucket) IsCustomerManagedKeyError(err error) bool {
return b.bkt.IsCustomerManagedKeyError(err)
}

// Attributes returns information about the specified object.
func (b *erroringBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
return b.bkt.Attributes(ctx, name)
Expand Down
10 changes: 6 additions & 4 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"strings"
"time"

"google.golang.org/grpc"

extflag "github.com/efficientgo/tools/extkingpin"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -26,9 +24,12 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/tsdb"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
"gopkg.in/yaml.v2"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -175,10 +176,11 @@ func runReceive(
}
// The background shipper continuously scans the data directory and uploads
// new blocks to object storage service.
bkt, err = client.NewBucket(logger, confContentYaml, extprom.WrapRegistererWithPrefix("thanos_", reg), comp.String())
bkt, err = client.NewBucket(logger, confContentYaml, comp.String())
if err != nil {
return err
}
bkt = objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))
} else {
level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled")
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ import (
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/agent"
"github.com/prometheus/prometheus/util/strutil"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/alert"
Expand Down Expand Up @@ -716,10 +719,11 @@ func runRule(
if len(confContentYaml) > 0 {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, confContentYaml, extprom.WrapRegistererWithPrefix("thanos_", reg), component.Rule.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Rule.String())
if err != nil {
return err
}
bkt = objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

// Ensure we close up everything properly.
defer func() {
Expand Down
6 changes: 5 additions & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -308,10 +311,11 @@ func runSidecar(
if uploads {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, confContentYaml, extprom.WrapRegistererWithPrefix("thanos_", reg), component.Sidecar.String())
bkt, err := client.NewBucket(logger, confContentYaml, component.Sidecar.String())
if err != nil {
return err
}
bkt = objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

// Ensure we close up everything properly.
defer func() {
Expand Down
Loading
Loading