Added cluster seed support in preparation of anonymous usage reporter #2643

Merged: 8 commits, Aug 4, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@
* [CHANGE] Compactor: changed `-compactor.max-compaction-time` default from `0s` (disabled) to `1h`. When compacting blocks for a tenant, the compactor will move to compact blocks of another tenant or re-plan blocks to compact at least every 1h. #2514
* [CHANGE] Distributor: removed previously deprecated `extend_writes` (see #1856) YAML key and `-distributor.extend-writes` CLI flag from the distributor config. #2551
* [CHANGE] Ingester: removed previously deprecated `active_series_custom_trackers` (see #1188) YAML key from the ingester config. #2552
* [CHANGE] The tenant ID `__mimir_cluster` is reserved by Mimir and not allowed to store metrics. #2643
* [FEATURE] Compactor: Adds the ability to delete partial blocks after a configurable delay. This option can be configured per tenant. #2285
- `-compactor.partial-block-deletion-delay`, as a duration string, allows you to set the delay since a partial block has been modified before marking it for deletion. A value of `0`, the default, disables this feature.
- The metric `cortex_compactor_blocks_marked_for_deletion_total` has a new value for the `reason` label `reason="partial"`, when a block deletion marker is triggered by the partial block deletion delay.
21 changes: 21 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -10244,6 +10244,27 @@
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "block",
"name": "usage_stats",
"required": false,
"desc": "",
"blockEntries": [
{
"kind": "field",
"name": "enabled",
"required": false,
"desc": "Enable anonymous usage reporting.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "usage-stats.enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "block",
"name": "common",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1801,6 +1801,8 @@ Usage of ./cmd/mimir/mimir:
Comma-separated list of components to include in the instantiated process. The default value 'all' includes all components that are required to form a functional Grafana Mimir instance in single-binary mode. Use the '-modules' command line flag to get a list of available components, and to see which components are included with 'all'. (default all)
-tenant-federation.enabled
If enabled on all services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a '|' character in the 'X-Scope-OrgID' header.
-usage-stats.enabled
[experimental] Enable anonymous usage reporting.
-validation.create-grace-period value
Controls how far into the future incoming samples are accepted compared to the wall clock. Any sample with timestamp `t` will be rejected if `t > (now + validation.create-grace-period)`. (default 10m)
-validation.enforce-metadata-metric-name
@@ -31,5 +31,6 @@ Tenant IDs must be less-than or equal-to 150 bytes or characters in length and c
- Close parenthesis (`)`)

> **Note:** For security reasons, `.` and `..` are not valid tenant IDs.
> **Note:** The tenant ID `__mimir_cluster` is unsupported because its name is used internally by Mimir.

All other characters, including slashes and whitespace, are not supported.
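
The documented rules can be sketched as a small validation function. This is a minimal illustration and not Mimir's actual validation code: `validateTenantID`, the error variables, and the exact set of allowed special characters are assumptions made for the example. Only the 150-byte limit, the `.` and `..` restriction, and the reserved `__mimir_cluster` ID come from the text above.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

const (
	maxTenantIDLength = 150
	reservedTenantID  = "__mimir_cluster"
	// Assumption: the special characters accepted in addition to alphanumerics.
	allowedSpecialChars = "!-_.*'()"
)

// Hypothetical error values, named only for this sketch.
var (
	errTooLong  = errors.New("tenant ID is longer than 150 bytes")
	errUnsafe   = errors.New("tenant ID must not be '.' or '..'")
	errReserved = errors.New("tenant ID is reserved for internal use")
)

// validateTenantID applies the documented restrictions in order:
// length, unsafe path names, reserved name, then the character set.
func validateTenantID(id string) error {
	switch {
	case len(id) > maxTenantIDLength:
		return errTooLong
	case id == "." || id == "..":
		return errUnsafe
	case id == reservedTenantID:
		return errReserved
	}
	for _, c := range id {
		isAlnum := (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9')
		if !isAlnum && !strings.ContainsRune(allowedSpecialChars, c) {
			return fmt.Errorf("tenant ID contains unsupported character %q", c)
		}
	}
	return nil
}

func main() {
	for _, id := range []string{"team-a", "__mimir_cluster", "..", "a/b"} {
		fmt.Printf("%q -> %v\n", id, validateTenantID(id))
	}
}
```

Checking the reserved name before the character set keeps the error message specific, since `__mimir_cluster` would otherwise pass the character check.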
@@ -217,6 +217,11 @@ runtime_config:
# The query_scheduler block configures the query-scheduler.
[query_scheduler: <query_scheduler>]

usage_stats:
# (experimental) Enable anonymous usage reporting.
# CLI flag: -usage-stats.enabled
[enabled: <boolean> | default = false]

# The common block holds configurations that configure multiple components at a
# time.
[common: <common>]
2 changes: 1 addition & 1 deletion go.mod
@@ -54,6 +54,7 @@ require (
require (
github.com/alecthomas/chroma v0.10.0
github.com/google/go-github/v32 v32.1.0
github.com/google/uuid v1.3.0
github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9
github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
@@ -139,7 +140,6 @@ require (
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/pprof v0.0.0-20220608213341-c488b8fa1db3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.4.0 // indirect
github.com/googleapis/go-type-adapters v1.0.0 // indirect
9 changes: 1 addition & 8 deletions pkg/compactor/compactor.go
@@ -726,14 +726,7 @@ func (c *MultitenantCompactor) discoverUsersWithRetries(ctx context.Context) ([]
}

func (c *MultitenantCompactor) discoverUsers(ctx context.Context) ([]string, error) {
var users []string

err := c.bucketClient.Iter(ctx, "", func(entry string) error {
users = append(users, strings.TrimSuffix(entry, "/"))
return nil
})

return users, err
return mimir_tsdb.ListUsers(ctx, c.bucketClient)
}

// shardingStrategy describes whether compactor "owns" given user or job.
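
The `discoverUsers` change above replaces an inline bucket listing with a shared helper. The body of `mimir_tsdb.ListUsers` is not part of this diff, so the following is only a sketch of what such a helper might do. The `lister` interface is a cut-down, hypothetical stand-in for the `Iter` method of `objstore.Bucket`, and skipping the `__mimir_cluster` entry is an assumption based on the reserved prefix introduced in this PR.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// lister is a hypothetical miniature of objstore.Bucket, reduced to Iter
// (the real method also accepts variadic iteration options).
type lister interface {
	Iter(ctx context.Context, dir string, f func(name string) error) error
}

// fakeBucket yields fixed top-level entries, the way an object store
// lists "directories" with a trailing slash.
type fakeBucket struct{ entries []string }

func (b fakeBucket) Iter(_ context.Context, _ string, f func(string) error) error {
	for _, e := range b.entries {
		if err := f(e); err != nil {
			return err
		}
	}
	return nil
}

const internalPrefix = "__mimir_cluster"

// listUsers collects tenant IDs from the top-level bucket entries.
// Assumption: the internal prefix is filtered out so that it is never
// treated as a tenant.
func listUsers(ctx context.Context, b lister) ([]string, error) {
	var users []string
	err := b.Iter(ctx, "", func(entry string) error {
		userID := strings.TrimSuffix(entry, "/")
		if userID == internalPrefix {
			return nil
		}
		users = append(users, userID)
		return nil
	})
	return users, err
}

func main() {
	b := fakeBucket{entries: []string{"tenant-a/", "__mimir_cluster/", "tenant-b/"}}
	users, err := listUsers(context.Background(), b)
	fmt.Println(users, err)
}
```

Centralizing the listing in one helper means every component that enumerates tenants would apply the same filtering, rather than each one trimming and filtering on its own.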
4 changes: 4 additions & 0 deletions pkg/mimir/mimir.go
@@ -57,6 +57,7 @@ import (
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storegateway"
"github.com/grafana/mimir/pkg/usagestats"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/activitytracker"
util_log "github.com/grafana/mimir/pkg/util/log"
@@ -115,6 +116,7 @@ type Config struct {
RuntimeConfig runtimeconfig.Config `yaml:"runtime_config"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
QueryScheduler scheduler.Config `yaml:"query_scheduler"`
UsageStats usagestats.Config `yaml:"usage_stats"`

Common CommonConfig `yaml:"common"`
}
@@ -160,6 +162,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
c.MemberlistKV.RegisterFlags(f)
c.ActivityTracker.RegisterFlags(f)
c.QueryScheduler.RegisterFlags(f)
c.UsageStats.RegisterFlags(f)

c.Common.RegisterFlags(f)
}
@@ -477,6 +480,7 @@ type Mimir struct {
StoreGateway *storegateway.StoreGateway
MemberlistKV *memberlist.KVInitService
ActivityTracker *activitytracker.ActivityTracker
UsageStatsReporter *usagestats.Reporter
BuildInfoHandler http.Handler

// Queryables that the querier should use to query the long term storage.
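
The `usagestats.Config` type wired in above is not shown in this diff. A minimal sketch of how such a config block could register its flag with the standard `flag` package follows. The `Config` struct and the `parseArgs` helper here are hypothetical; only the flag name `usage-stats.enabled`, its `false` default, and its description come from the generated reference files in this PR.

```go
package main

import (
	"flag"
	"fmt"
)

// Config is a hypothetical stand-in for usagestats.Config.
type Config struct {
	Enabled bool `yaml:"enabled"`
}

// RegisterFlags registers the usage-stats flags on the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.Enabled, "usage-stats.enabled", false, "Enable anonymous usage reporting.")
}

// parseArgs is a helper for this sketch only: it builds a FlagSet,
// registers the config, and parses the given arguments.
func parseArgs(args []string) (Config, error) {
	var cfg Config
	fs := flag.NewFlagSet("mimir-sketch", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	err := fs.Parse(args)
	return cfg, err
}

func main() {
	cfg, err := parseArgs([]string{"-usage-stats.enabled=true"})
	fmt.Println(cfg.Enabled, err)
}
```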
26 changes: 25 additions & 1 deletion pkg/mimir/modules.go
@@ -49,7 +49,9 @@ import (
querier_worker "github.com/grafana/mimir/pkg/querier/worker"
"github.com/grafana/mimir/pkg/ruler"
"github.com/grafana/mimir/pkg/scheduler"
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storegateway"
"github.com/grafana/mimir/pkg/usagestats"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/activitytracker"
util_log "github.com/grafana/mimir/pkg/util/log"
@@ -87,6 +89,7 @@ const (
Purger string = "purger"
QueryScheduler string = "query-scheduler"
TenantFederation string = "tenant-federation"
UsageStats string = "usage-stats"
All string = "all"
)

@@ -755,6 +758,26 @@ func (t *Mimir) initQueryScheduler() (services.Service, error) {
return s, nil
}

func (t *Mimir) initUsageStats() (services.Service, error) {
if !t.Cfg.UsageStats.Enabled {
return nil, nil
}

// Since it requires the access to the blocks storage, we enable it only for components
// accessing the blocks storage.
if !t.Cfg.isAnyModuleEnabled(All, Ingester, Querier, StoreGateway, Compactor) {
return nil, nil
}

bucketClient, err := bucket.NewClient(context.Background(), t.Cfg.BlocksStorage.Bucket, "usage-stats", util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}

t.UsageStatsReporter = usagestats.NewReporter(bucketClient, util_log.Logger)
return t.UsageStatsReporter, nil
}

func (t *Mimir) setupModuleManager() error {
mm := modules.NewManager(util_log.Logger)

@@ -788,11 +811,12 @@ func (t *Mimir) setupModuleManager() error {
mm.RegisterModule(Purger, nil)
mm.RegisterModule(QueryScheduler, t.initQueryScheduler)
mm.RegisterModule(TenantFederation, t.initTenantFederation, modules.UserInvisibleModule)
mm.RegisterModule(UsageStats, t.initUsageStats, modules.UserInvisibleModule)
mm.RegisterModule(All, nil)

// Add dependencies
deps := map[string][]string{
Server: {ActivityTracker, SanityCheck},
Server: {ActivityTracker, SanityCheck, UsageStats},
API: {Server},
MemberlistKV: {API},
RuntimeConfig: {API},
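
The gating in `initUsageStats` above combines two conditions: the feature flag must be on, and the process must run at least one component that accesses the blocks storage. A simplified sketch of that decision is shown below. `isAnyModuleEnabled` is reimplemented here as a free function over a target list, which is an assumption about the behaviour of the real `Config` method, and the module name strings other than `all` are assumed values.

```go
package main

import "fmt"

// Module names. Only "all" appears with its string value in the diff;
// the others are assumed for illustration.
const (
	All          = "all"
	Ingester     = "ingester"
	Querier      = "querier"
	StoreGateway = "store-gateway"
	Compactor    = "compactor"
	Distributor  = "distributor"
)

// isAnyModuleEnabled reports whether any of the wanted modules is among
// the configured targets (hypothetical simplification).
func isAnyModuleEnabled(targets []string, wanted ...string) bool {
	for _, t := range targets {
		for _, w := range wanted {
			if t == w {
				return true
			}
		}
	}
	return false
}

// shouldStartUsageStats mirrors the two early returns in initUsageStats.
func shouldStartUsageStats(enabled bool, targets []string) bool {
	if !enabled {
		return false
	}
	return isAnyModuleEnabled(targets, All, Ingester, Querier, StoreGateway, Compactor)
}

func main() {
	fmt.Println(shouldStartUsageStats(true, []string{Distributor}))
	fmt.Println(shouldStartUsageStats(true, []string{Distributor, Querier}))
	fmt.Println(shouldStartUsageStats(false, []string{All}))
}
```

Returning `nil, nil` from the real init function, as the diff does, lets the module stay registered as a dependency of `Server` while doing nothing in processes where it does not apply.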
6 changes: 5 additions & 1 deletion pkg/querier/blocks_store_queryable.go
@@ -29,6 +29,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/store/hintspb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/strutil"
@@ -197,7 +198,10 @@ func NewBlocksStoreQueryable(
}

func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegateway.Config, storageCfg mimir_tsdb.BlocksStorageConfig, limits BlocksStoreLimits, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) {
var stores BlocksStoreSet
var (
stores BlocksStoreSet
bucketClient objstore.Bucket

Review comment (Collaborator, Author):
Note to reviewers: I define `bucketClient` as `objstore.Bucket` because `bucket.NewClient()` now returns an `objstore.InstrumentedBucket`.

)

bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, "querier", logger, reg)
if err != nil {
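
The reviewer note above relies on a detail of Go's short variable declaration: when a variable on the left of `:=` already exists in the same scope, it keeps its declared type and is only assigned. The sketch below shows this with hypothetical miniature interfaces (`bucket`, `instrumentedBucket`) rather than the real `objstore` types. The later reassignment to a plain wrapper is an assumed motivation, chosen to show why the wider declared type can matter.

```go
package main

import "fmt"

// bucket and instrumentedBucket are hypothetical miniatures of
// objstore.Bucket and objstore.InstrumentedBucket.
type bucket interface {
	Name() string
}

type instrumentedBucket interface {
	bucket
	WithExpectedErrs(isExpected func(error) bool) bucket
}

type memBucket struct{ name string }

func (b memBucket) Name() string                             { return b.name }
func (b memBucket) WithExpectedErrs(func(error) bool) bucket { return b }

// prefixed only implements bucket, not instrumentedBucket.
type prefixed struct {
	bucket
	prefix string
}

func (p prefixed) Name() string { return p.prefix + "/" + p.bucket.Name() }

func newClient(name string) (instrumentedBucket, error) {
	return memBucket{name: name}, nil
}

func openAsPlainBucket(name string) (bucket, error) {
	// Declared up front with the plain interface type.
	var client bucket

	// := declares only err here; client already exists in this scope,
	// keeps type bucket, and simply receives the returned value.
	client, err := newClient(name)
	if err != nil {
		return nil, err
	}

	// This assignment would not compile if client had been inferred as
	// instrumentedBucket, because prefixed lacks WithExpectedErrs.
	client = prefixed{bucket: client, prefix: "cache"}
	return client, nil
}

func main() {
	b, err := openAsPlainBucket("querier")
	fmt.Println(b.Name(), err)
}
```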
31 changes: 20 additions & 11 deletions pkg/storage/bucket/client.go
@@ -44,6 +44,10 @@ const (

// validPrefixCharactersRegex allows only alphanumeric characters to prevent subtle bugs and simplify validation
validPrefixCharactersRegex = `^[\da-zA-Z]+$`

// MimirInternalsPrefix is the bucket prefix under which all Mimir internal cluster-wide objects are stored.
// The object storage path delimiter (/) is appended to this prefix when building the full object path.
MimirInternalsPrefix = "__mimir_cluster"
)

var (
@@ -119,7 +123,7 @@ type Config struct {

// Not used internally, meant to allow callers to wrap Buckets
// created using this config
Middlewares []func(objstore.Bucket) (objstore.Bucket, error) `yaml:"-"`
Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"`

Review comment (Collaborator, Author):
Note to reviewers: I will take care of re-vendoring Mimir in GEM and fixing `Middlewares` there.

}

// RegisterFlags registers the backend storage config.
@@ -148,18 +152,23 @@ func (cfg *Config) Validate() error {
}

// NewClient creates a new bucket client based on the configured backend
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (client objstore.Bucket, err error) {
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
var (
backendClient objstore.Bucket
err error
)

switch cfg.Backend {
case S3:
client, err = s3.NewBucketClient(cfg.S3, name, logger)
backendClient, err = s3.NewBucketClient(cfg.S3, name, logger)
case GCS:
client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger)
backendClient, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger)
case Azure:
client, err = azure.NewBucketClient(cfg.Azure, name, logger)
backendClient, err = azure.NewBucketClient(cfg.Azure, name, logger)
case Swift:
client, err = swift.NewBucketClient(cfg.Swift, name, logger)
backendClient, err = swift.NewBucketClient(cfg.Swift, name, logger)
case Filesystem:
client, err = filesystem.NewBucketClient(cfg.Filesystem)
backendClient, err = filesystem.NewBucketClient(cfg.Filesystem)
default:
return nil, ErrUnsupportedStorageBackend
}
@@ -169,20 +178,20 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger,
}

if cfg.StoragePrefix != "" {
client = NewPrefixedBucketClient(client, cfg.StoragePrefix)
backendClient = NewPrefixedBucketClient(backendClient, cfg.StoragePrefix)
}

client = objstore.NewTracingBucket(bucketWithMetrics(client, name, reg))
instrumentedClient := objstore.NewTracingBucket(bucketWithMetrics(backendClient, name, reg))

// Wrap the client with any provided middleware
for _, wrap := range cfg.Middlewares {
client, err = wrap(client)
instrumentedClient, err = wrap(instrumentedClient)
if err != nil {
return nil, err
}
}

return client, nil
return instrumentedClient, nil
}

func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus.Registerer) objstore.Bucket {
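
The middleware loop in `NewClient` above applies each wrapper to the result of the previous one and stops at the first error. A self-contained sketch of that pattern follows. The `bucket` interface, the `middleware` type, and the `withLabel` wrapper are hypothetical and exist only for illustration; the real `Middlewares` field operates on `objstore.InstrumentedBucket`.

```go
package main

import (
	"errors"
	"fmt"
)

// bucket is a hypothetical one-method stand-in for the real bucket interface.
type bucket interface{ Name() string }

// middleware wraps a bucket and may fail.
type middleware func(bucket) (bucket, error)

type base struct{ name string }

func (b base) Name() string { return b.name }

// labelled decorates the wrapped bucket's name so the wrapping order is visible.
type labelled struct {
	bucket
	label string
}

func (l labelled) Name() string { return l.label + "(" + l.bucket.Name() + ")" }

// withLabel returns a middleware that wraps a bucket in a labelled decorator.
func withLabel(label string) middleware {
	return func(b bucket) (bucket, error) {
		if label == "" {
			return nil, errors.New("empty label")
		}
		return labelled{bucket: b, label: label}, nil
	}
}

// applyMiddlewares wraps b with each middleware in order, so the last
// middleware in the slice ends up outermost.
func applyMiddlewares(b bucket, mws []middleware) (bucket, error) {
	var err error
	for _, wrap := range mws {
		b, err = wrap(b)
		if err != nil {
			return nil, err
		}
	}
	return b, nil
}

func main() {
	b, err := applyMiddlewares(base{name: "s3"}, []middleware{withLabel("metrics"), withLabel("tracing")})
	fmt.Println(b.Name(), err)
}
```

Because every middleware has the same input and output type, changing that type (as this PR does for `Middlewares`) requires updating every existing wrapper at once, which is what the reviewer note about re-vendoring refers to.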
97 changes: 97 additions & 0 deletions pkg/storage/bucket/delayed_bucket_client.go
@@ -0,0 +1,97 @@
// SPDX-License-Identifier: AGPL-3.0-only

package bucket

import (
"context"
"io"
"math/rand"
"time"

"github.com/thanos-io/thanos/pkg/objstore"
)

// DelayedBucketClient wraps objstore.Bucket and adds a random delay to each API call.
// This client is intended to be used only for testing purposes.
type DelayedBucketClient struct {
wrapped objstore.Bucket
minDelay time.Duration
maxDelay time.Duration
}

func NewDelayedBucketClient(wrapped objstore.Bucket, minDelay, maxDelay time.Duration) objstore.Bucket {
if minDelay < 0 || maxDelay < 0 || maxDelay < minDelay {
// We're fine just panicking, because we expect this client to be used only for testing purposes.
panic("invalid delay configuration")
}

return &DelayedBucketClient{
wrapped: wrapped,
minDelay: minDelay,
maxDelay: maxDelay,
}
}

func (m *DelayedBucketClient) Upload(ctx context.Context, name string, r io.Reader) error {
m.delay()
defer m.delay()

return m.wrapped.Upload(ctx, name, r)
}

func (m *DelayedBucketClient) Delete(ctx context.Context, name string) error {
m.delay()
defer m.delay()

return m.wrapped.Delete(ctx, name)
}

func (m *DelayedBucketClient) Name() string {
return m.wrapped.Name()
}

func (m *DelayedBucketClient) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
m.delay()
defer m.delay()

return m.wrapped.Iter(ctx, dir, f, options...)
}

func (m *DelayedBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) {
m.delay()

Review comment (Contributor):
The delay should be deferred too. IMO that is the more important case, because it simulates the observation delay of the real world: the API told us there was no item, but one could have been created while the response was in flight.

I think the best option is to do both:

m.delay()
defer m.delay()

defer m.delay()

return m.wrapped.Get(ctx, name)
}
func (m *DelayedBucketClient) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
m.delay()
defer m.delay()

return m.wrapped.GetRange(ctx, name, off, length)
}

func (m *DelayedBucketClient) Exists(ctx context.Context, name string) (bool, error) {
m.delay()
defer m.delay()

return m.wrapped.Exists(ctx, name)
}

func (m *DelayedBucketClient) IsObjNotFoundErr(err error) bool {
return m.wrapped.IsObjNotFoundErr(err)
}

func (m *DelayedBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
m.delay()
defer m.delay()

return m.wrapped.Attributes(ctx, name)
}

func (m *DelayedBucketClient) Close() error {
return m.wrapped.Close()
}

func (m *DelayedBucketClient) delay() {
time.Sleep(m.minDelay + time.Duration(rand.Int63n(m.maxDelay.Nanoseconds()-m.minDelay.Nanoseconds())))
}
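
One edge case in `delay()` above is worth noting: the constructor accepts `minDelay == maxDelay`, but `rand.Int63n` panics when its argument is not positive, so equal bounds would panic on the first call. The sketch below shows one way to compute the same random delay with that case handled, together with the delay-before-and-after pattern from the review discussion. `randomDelay` and `withDelay` are hypothetical helper names and are not part of the PR.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomDelay returns a duration in [minDelay, maxDelay). When the bounds
// are equal it returns minDelay instead of calling rand.Int63n(0), which
// would panic.
func randomDelay(minDelay, maxDelay time.Duration) time.Duration {
	if maxDelay <= minDelay {
		return minDelay
	}
	return minDelay + time.Duration(rand.Int63n(int64(maxDelay-minDelay)))
}

// withDelay sleeps before and after op. The first sleep simulates request
// latency; the deferred sleep simulates the time a response spends in
// flight, during which the observed state may already be stale.
func withDelay(minDelay, maxDelay time.Duration, op func() error) error {
	time.Sleep(randomDelay(minDelay, maxDelay))
	defer time.Sleep(randomDelay(minDelay, maxDelay))

	return op()
}

func main() {
	start := time.Now()
	err := withDelay(time.Millisecond, 2*time.Millisecond, func() error {
		fmt.Println("operation runs between the two delays")
		return nil
	})
	fmt.Println(err, time.Since(start) >= 2*time.Millisecond)
}
```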