-
Notifications
You must be signed in to change notification settings - Fork 524
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added cluster seed support in preparation of anonymous usage reporter #2643
Changes from all commits
6c867be
7138223
4875ee8
c9fbe7c
8bf83ad
07768cc
93c2277
54574e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,10 @@ const ( | |
|
||
// validPrefixCharactersRegex allows only alphanumeric characters to prevent subtle bugs and simplify validation | ||
validPrefixCharactersRegex = `^[\da-zA-Z]+$` | ||
|
||
// MimirInternalsPrefix is the bucket prefix under which all Mimir internal cluster-wide objects are stored. | ||
// The object storage path delimiter (/) is appended to this prefix when building the full object path. | ||
MimirInternalsPrefix = "__mimir_cluster" | ||
) | ||
|
||
var ( | ||
|
@@ -119,7 +123,7 @@ type Config struct { | |
|
||
// Not used internally, meant to allow callers to wrap Buckets | ||
// created using this config | ||
Middlewares []func(objstore.Bucket) (objstore.Bucket, error) `yaml:"-"` | ||
Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to reviewers: I will take care to re-vendor Mimir in GEM and fix Middlewares there. |
||
} | ||
|
||
// RegisterFlags registers the backend storage config. | ||
|
@@ -148,18 +152,23 @@ func (cfg *Config) Validate() error { | |
} | ||
|
||
// NewClient creates a new bucket client based on the configured backend | ||
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (client objstore.Bucket, err error) { | ||
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { | ||
var ( | ||
backendClient objstore.Bucket | ||
err error | ||
) | ||
|
||
switch cfg.Backend { | ||
case S3: | ||
client, err = s3.NewBucketClient(cfg.S3, name, logger) | ||
backendClient, err = s3.NewBucketClient(cfg.S3, name, logger) | ||
case GCS: | ||
client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger) | ||
backendClient, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger) | ||
case Azure: | ||
client, err = azure.NewBucketClient(cfg.Azure, name, logger) | ||
backendClient, err = azure.NewBucketClient(cfg.Azure, name, logger) | ||
case Swift: | ||
client, err = swift.NewBucketClient(cfg.Swift, name, logger) | ||
backendClient, err = swift.NewBucketClient(cfg.Swift, name, logger) | ||
case Filesystem: | ||
client, err = filesystem.NewBucketClient(cfg.Filesystem) | ||
backendClient, err = filesystem.NewBucketClient(cfg.Filesystem) | ||
default: | ||
return nil, ErrUnsupportedStorageBackend | ||
} | ||
|
@@ -169,20 +178,20 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, | |
} | ||
|
||
if cfg.StoragePrefix != "" { | ||
client = NewPrefixedBucketClient(client, cfg.StoragePrefix) | ||
backendClient = NewPrefixedBucketClient(backendClient, cfg.StoragePrefix) | ||
} | ||
|
||
client = objstore.NewTracingBucket(bucketWithMetrics(client, name, reg)) | ||
instrumentedClient := objstore.NewTracingBucket(bucketWithMetrics(backendClient, name, reg)) | ||
|
||
// Wrap the client with any provided middleware | ||
for _, wrap := range cfg.Middlewares { | ||
client, err = wrap(client) | ||
instrumentedClient, err = wrap(instrumentedClient) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
return client, nil | ||
return instrumentedClient, nil | ||
} | ||
|
||
func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus.Registerer) objstore.Bucket { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
// SPDX-License-Identifier: AGPL-3.0-only | ||
|
||
package bucket | ||
|
||
import ( | ||
"context" | ||
"io" | ||
"math/rand" | ||
"time" | ||
|
||
"github.com/thanos-io/thanos/pkg/objstore" | ||
) | ||
|
||
// DelayedBucketClient wraps objstore.InstrumentedBucket and add a random delay to each API call. | ||
// This client is intended to be used only for testing purposes. | ||
type DelayedBucketClient struct { | ||
wrapped objstore.Bucket | ||
minDelay time.Duration | ||
maxDelay time.Duration | ||
} | ||
|
||
func NewDelayedBucketClient(wrapped objstore.Bucket, minDelay, maxDelay time.Duration) objstore.Bucket { | ||
if minDelay < 0 || maxDelay < 0 || maxDelay < minDelay { | ||
// We're fine just panicking, because we expect this client to be used only for testing purposes. | ||
panic("invalid delay configuration") | ||
} | ||
|
||
return &DelayedBucketClient{ | ||
wrapped: wrapped, | ||
minDelay: minDelay, | ||
maxDelay: maxDelay, | ||
} | ||
} | ||
|
||
func (m *DelayedBucketClient) Upload(ctx context.Context, name string, r io.Reader) error { | ||
m.delay() | ||
defer m.delay() | ||
|
||
return m.wrapped.Upload(ctx, name, r) | ||
} | ||
|
||
func (m *DelayedBucketClient) Delete(ctx context.Context, name string) error { | ||
m.delay() | ||
defer m.delay() | ||
|
||
return m.wrapped.Delete(ctx, name) | ||
} | ||
|
||
func (m *DelayedBucketClient) Name() string { | ||
return m.wrapped.Name() | ||
} | ||
|
||
func (m *DelayedBucketClient) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { | ||
m.delay() | ||
defer m.delay() | ||
|
||
return m.wrapped.Iter(ctx, dir, f, options...) | ||
} | ||
|
||
func (m *DelayedBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) { | ||
m.delay() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The delay should be deferred too, which IMO is more important, as this way we simulate the delay of observation of the real world (API told us that there was no item, but while we got the response, one could be created). I think the best option is to do both: m.delay()
defer m.delay() |
||
defer m.delay() | ||
|
||
return m.wrapped.Get(ctx, name) | ||
} | ||
func (m *DelayedBucketClient) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { | ||
m.delay() | ||
defer m.delay() | ||
|
||
return m.wrapped.GetRange(ctx, name, off, length) | ||
} | ||
|
||
func (m *DelayedBucketClient) Exists(ctx context.Context, name string) (bool, error) { | ||
m.delay() | ||
defer m.delay() | ||
|
||
return m.wrapped.Exists(ctx, name) | ||
} | ||
|
||
func (m *DelayedBucketClient) IsObjNotFoundErr(err error) bool { | ||
return m.wrapped.IsObjNotFoundErr(err) | ||
} | ||
|
||
func (m *DelayedBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { | ||
m.delay() | ||
defer m.delay() | ||
|
||
return m.wrapped.Attributes(ctx, name) | ||
} | ||
|
||
func (m *DelayedBucketClient) Close() error { | ||
return m.wrapped.Close() | ||
} | ||
|
||
func (m *DelayedBucketClient) delay() { | ||
time.Sleep(m.minDelay + time.Duration(rand.Int63n(m.maxDelay.Nanoseconds()-m.minDelay.Nanoseconds()))) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to reviewers: define
bucketClient objstore.Bucket
because nowbucket.NewClient()
returns aobjstore.InstrumentedBucket
.