diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e15d6c10c2..5d76c79f9ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ * [CHANGE] Compactor: changed `-compactor.max-compaction-time` default from `0s` (disabled) to `1h`. When compacting blocks for a tenant, the compactor will move to compact blocks of another tenant or re-plan blocks to compact at least every 1h. #2514 * [CHANGE] Distributor: removed previously deprecated `extend_writes` (see #1856) YAML key and `-distributor.extend-writes` CLI flag from the distributor config. #2551 * [CHANGE] Ingester: removed previously deprecated `active_series_custom_trackers` (see #1188) YAML key from the ingester config. #2552 +* [CHANGE] The tenant ID `__mimir_cluster` is reserved by Mimir and not allowed to store metrics. #2643 * [FEATURE] Compactor: Adds the ability to delete partial blocks after a configurable delay. This option can be configured per tenant. #2285 - `-compactor.partial-block-deletion-delay`, as a duration string, allows you to set the delay since a partial block has been modified before marking it for deletion. A value of `0`, the default, disables this feature. - The metric `cortex_compactor_blocks_marked_for_deletion_total` has a new value for the `reason` label `reason="partial"`, when a block deletion marker is triggered by the partial block deletion delay. diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 8c4e01fee8c..9351f16e5f0 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -10244,6 +10244,27 @@ "fieldValue": null, "fieldDefaultValue": null }, + { + "kind": "block", + "name": "usage_stats", + "required": false, + "desc": "", + "blockEntries": [ + { + "kind": "field", + "name": "enabled", + "required": false, + "desc": "Enable anonymous usage reporting.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "usage-stats.enabled", + "fieldType": "boolean", + "fieldCategory": "experimental" + } + ], + "fieldValue": null, + "fieldDefaultValue": null + }, { "kind": "block", "name": "common", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 9713f607bc5..e9f7f080ca3 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1801,6 +1801,8 @@ Usage of ./cmd/mimir/mimir: Comma-separated list of components to include in the instantiated process. The default value 'all' includes all components that are required to form a functional Grafana Mimir instance in single-binary mode. Use the '-modules' command line flag to get a list of available components, and to see which components are included with 'all'. (default all) -tenant-federation.enabled If enabled on all services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a '|' character in the 'X-Scope-OrgID' header. + -usage-stats.enabled + [experimental] Enable anonymous usage reporting. -validation.create-grace-period value Controls how far into the future incoming samples are accepted compared to the wall clock. Any sample with timestamp `t` will be rejected if `t > (now + validation.create-grace-period)`. 
(default 10m) -validation.enforce-metadata-metric-name diff --git a/docs/sources/operators-guide/configure/about-tenant-ids.md b/docs/sources/operators-guide/configure/about-tenant-ids.md index e2384935925..b75e79ce01b 100644 --- a/docs/sources/operators-guide/configure/about-tenant-ids.md +++ b/docs/sources/operators-guide/configure/about-tenant-ids.md @@ -31,5 +31,6 @@ Tenant IDs must be less-than or equal-to 150 bytes or characters in length and c - Close parenthesis (`)`) > **Note:** For security reasons, `.` and `..` are not valid tenant IDs. +> **Note:** The tenant ID `__mimir_cluster` is unsupported because its name is used internally by Mimir. All other characters, including slashes and whitespace, are not supported. diff --git a/docs/sources/operators-guide/configure/reference-configuration-parameters/index.md b/docs/sources/operators-guide/configure/reference-configuration-parameters/index.md index f5693deea0e..f68fccdacd3 100644 --- a/docs/sources/operators-guide/configure/reference-configuration-parameters/index.md +++ b/docs/sources/operators-guide/configure/reference-configuration-parameters/index.md @@ -217,6 +217,11 @@ runtime_config: # The query_scheduler block configures the query-scheduler. [query_scheduler: ] +usage_stats: + # (experimental) Enable anonymous usage reporting. + # CLI flag: -usage-stats.enabled + [enabled: | default = false] + # The common block holds configurations that configure multiple components at a # time. [common: ] diff --git a/go.mod b/go.mod index 5c581af8804..3c10a3805fb 100644 --- a/go.mod +++ b/go.mod @@ -54,6 +54,7 @@ require ( require ( github.com/alecthomas/chroma v0.10.0 github.com/google/go-github/v32 v32.1.0 + github.com/google/uuid v1.3.0 github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9 github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db @@ -139,7 +140,6 @@ require ( github.com/google/go-cmp v0.5.8 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/pprof v0.0.0-20220608213341-c488b8fa1db3 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect github.com/googleapis/gax-go/v2 v2.4.0 // indirect github.com/googleapis/go-type-adapters v1.0.0 // indirect diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index c70947db3bd..5d5b6eb1fca 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -726,14 +726,7 @@ func (c *MultitenantCompactor) discoverUsersWithRetries(ctx context.Context) ([] } func (c *MultitenantCompactor) discoverUsers(ctx context.Context) ([]string, error) { - var users []string - - err := c.bucketClient.Iter(ctx, "", func(entry string) error { - users = append(users, strings.TrimSuffix(entry, "/")) - return nil - }) - - return users, err + return mimir_tsdb.ListUsers(ctx, c.bucketClient) } // shardingStrategy describes whether compactor "owns" given user or job. 
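The compactor change above is the first of several call sites in this diff that swap an inline `Iter` loop for the shared `mimir_tsdb.ListUsers` helper (defined in `pkg/storage/tsdb/users_scanner.go` further down). As a rough standalone sketch of the idea, with illustrative names that are not part of the PR, the helper scans the bucket root and drops the reserved `__mimir_cluster` entry:

```go
package bucketscan // illustrative only; the real helper lives in pkg/storage/tsdb

import (
	"context"
	"strings"

	"github.com/thanos-io/thanos/pkg/objstore"
)

// mimirInternalsPrefix mirrors bucket.MimirInternalsPrefix introduced by this PR.
const mimirInternalsPrefix = "__mimir_cluster"

// listUsers scans the bucket root for tenant directories and skips the prefix
// reserved for Mimir-internal objects (such as the cluster seed file).
func listUsers(ctx context.Context, bkt objstore.Bucket) ([]string, error) {
	var users []string
	err := bkt.Iter(ctx, "", func(entry string) error {
		userID := strings.TrimSuffix(entry, "/")
		if userID == mimirInternalsPrefix {
			return nil
		}
		users = append(users, userID)
		return nil
	})
	return users, err
}
```

Centralizing the scan in a single helper keeps the reserved-prefix rule from drifting between the compactor, store-gateway, and users scanner.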
diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 82048b394ba..edbbb1589a2 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -57,6 +57,7 @@ import ( "github.com/grafana/mimir/pkg/storage/bucket" "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storegateway" + "github.com/grafana/mimir/pkg/usagestats" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/activitytracker" util_log "github.com/grafana/mimir/pkg/util/log" @@ -115,6 +116,7 @@ type Config struct { RuntimeConfig runtimeconfig.Config `yaml:"runtime_config"` MemberlistKV memberlist.KVConfig `yaml:"memberlist"` QueryScheduler scheduler.Config `yaml:"query_scheduler"` + UsageStats usagestats.Config `yaml:"usage_stats"` Common CommonConfig `yaml:"common"` } @@ -160,6 +162,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { c.MemberlistKV.RegisterFlags(f) c.ActivityTracker.RegisterFlags(f) c.QueryScheduler.RegisterFlags(f) + c.UsageStats.RegisterFlags(f) c.Common.RegisterFlags(f) } @@ -477,6 +480,7 @@ type Mimir struct { StoreGateway *storegateway.StoreGateway MemberlistKV *memberlist.KVInitService ActivityTracker *activitytracker.ActivityTracker + UsageStatsReporter *usagestats.Reporter BuildInfoHandler http.Handler // Queryables that the querier should use to query the long term storage. diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 1b63011299f..1bd39044bbc 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -49,7 +49,9 @@ import ( querier_worker "github.com/grafana/mimir/pkg/querier/worker" "github.com/grafana/mimir/pkg/ruler" "github.com/grafana/mimir/pkg/scheduler" + "github.com/grafana/mimir/pkg/storage/bucket" "github.com/grafana/mimir/pkg/storegateway" + "github.com/grafana/mimir/pkg/usagestats" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/activitytracker" util_log "github.com/grafana/mimir/pkg/util/log" @@ -87,6 +89,7 @@ const ( Purger string = "purger" QueryScheduler string = "query-scheduler" TenantFederation string = "tenant-federation" + UsageStats string = "usage-stats" All string = "all" ) @@ -755,6 +758,26 @@ func (t *Mimir) initQueryScheduler() (services.Service, error) { return s, nil } +func (t *Mimir) initUsageStats() (services.Service, error) { + if !t.Cfg.UsageStats.Enabled { + return nil, nil + } + + // Since it requires access to the blocks storage, we enable it only for components + // that access the blocks storage.
+ if !t.Cfg.isAnyModuleEnabled(All, Ingester, Querier, StoreGateway, Compactor) { + return nil, nil + } + + bucketClient, err := bucket.NewClient(context.Background(), t.Cfg.BlocksStorage.Bucket, "usage-stats", util_log.Logger, prometheus.DefaultRegisterer) + if err != nil { + return nil, err + } + + t.UsageStatsReporter = usagestats.NewReporter(bucketClient, util_log.Logger) + return t.UsageStatsReporter, nil +} + func (t *Mimir) setupModuleManager() error { mm := modules.NewManager(util_log.Logger) @@ -788,11 +811,12 @@ func (t *Mimir) setupModuleManager() error { mm.RegisterModule(Purger, nil) mm.RegisterModule(QueryScheduler, t.initQueryScheduler) mm.RegisterModule(TenantFederation, t.initTenantFederation, modules.UserInvisibleModule) + mm.RegisterModule(UsageStats, t.initUsageStats, modules.UserInvisibleModule) mm.RegisterModule(All, nil) // Add dependencies deps := map[string][]string{ - Server: {ActivityTracker, SanityCheck}, + Server: {ActivityTracker, SanityCheck, UsageStats}, API: {Server}, MemberlistKV: {API}, RuntimeConfig: {API}, diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index e22519d4b7b..3e9391c033e 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" @@ -197,7 +198,10 @@ func NewBlocksStoreQueryable( } func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegateway.Config, storageCfg mimir_tsdb.BlocksStorageConfig, limits BlocksStoreLimits, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) { - var stores BlocksStoreSet + var ( + stores BlocksStoreSet + bucketClient objstore.Bucket + ) bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, "querier", logger, reg) if err != nil { diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 4fe92635837..f9d921b9daa 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -44,6 +44,10 @@ const ( // validPrefixCharactersRegex allows only alphanumeric characters to prevent subtle bugs and simplify validation validPrefixCharactersRegex = `^[\da-zA-Z]+$` + + // MimirInternalsPrefix is the bucket prefix under which all Mimir internal cluster-wide objects are stored. + // The object storage path delimiter (/) is appended to this prefix when building the full object path. + MimirInternalsPrefix = "__mimir_cluster" ) var ( @@ -119,7 +123,7 @@ type Config struct { // Not used internally, meant to allow callers to wrap Buckets // created using this config - Middlewares []func(objstore.Bucket) (objstore.Bucket, error) `yaml:"-"` + Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"` } // RegisterFlags registers the backend storage config. 
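The hunk above widens the `Middlewares` hooks from `objstore.Bucket` to `objstore.InstrumentedBucket`, matching the new `NewClient` return type in the next hunk. Keeping the instrumented interface means wrappers still expose `WithExpectedErrs`, which the usage-stats code below relies on so that expected "not found" lookups are not counted as failures. A minimal sketch of a middleware compatible with the new signature (`loggingBucket` and `LoggingMiddleware` are hypothetical names, not part of this PR):

```go
package bucketmw // hypothetical helper package, sketch only

import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// loggingBucket embeds the wrapped objstore.InstrumentedBucket, so it satisfies
// the whole interface while overriding only the calls it wants to observe.
type loggingBucket struct {
	objstore.InstrumentedBucket
	logger log.Logger
}

func (b loggingBucket) Delete(ctx context.Context, name string) error {
	level.Debug(b.logger).Log("msg", "deleting object", "name", name)
	return b.InstrumentedBucket.Delete(ctx, name)
}

// LoggingMiddleware adapts the wrapper to the new Middlewares signature.
func LoggingMiddleware(logger log.Logger) func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) {
	return func(b objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) {
		return loggingBucket{InstrumentedBucket: b, logger: logger}, nil
	}
}
```

Embedding the wrapped `objstore.InstrumentedBucket` satisfies the whole interface, so a middleware only needs to override the methods it cares about.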
@@ -148,18 +152,23 @@ func (cfg *Config) Validate() error { } // NewClient creates a new bucket client based on the configured backend -func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (client objstore.Bucket, err error) { +func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { + var ( + backendClient objstore.Bucket + err error + ) + switch cfg.Backend { case S3: - client, err = s3.NewBucketClient(cfg.S3, name, logger) + backendClient, err = s3.NewBucketClient(cfg.S3, name, logger) case GCS: - client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger) + backendClient, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger) case Azure: - client, err = azure.NewBucketClient(cfg.Azure, name, logger) + backendClient, err = azure.NewBucketClient(cfg.Azure, name, logger) case Swift: - client, err = swift.NewBucketClient(cfg.Swift, name, logger) + backendClient, err = swift.NewBucketClient(cfg.Swift, name, logger) case Filesystem: - client, err = filesystem.NewBucketClient(cfg.Filesystem) + backendClient, err = filesystem.NewBucketClient(cfg.Filesystem) default: return nil, ErrUnsupportedStorageBackend } @@ -169,20 +178,20 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, } if cfg.StoragePrefix != "" { - client = NewPrefixedBucketClient(client, cfg.StoragePrefix) + backendClient = NewPrefixedBucketClient(backendClient, cfg.StoragePrefix) } - client = objstore.NewTracingBucket(bucketWithMetrics(client, name, reg)) + instrumentedClient := objstore.NewTracingBucket(bucketWithMetrics(backendClient, name, reg)) // Wrap the client with any provided middleware for _, wrap := range cfg.Middlewares { - client, err = wrap(client) + instrumentedClient, err = wrap(instrumentedClient) if err != nil { return nil, err } } - return client, nil + return instrumentedClient, nil } func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus.Registerer) objstore.Bucket { diff --git a/pkg/storage/bucket/delayed_bucket_client.go b/pkg/storage/bucket/delayed_bucket_client.go new file mode 100644 index 00000000000..528b2ea633c --- /dev/null +++ b/pkg/storage/bucket/delayed_bucket_client.go @@ -0,0 +1,97 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package bucket + +import ( + "context" + "io" + "math/rand" + "time" + + "github.com/thanos-io/thanos/pkg/objstore" +) + +// DelayedBucketClient wraps an objstore.Bucket and adds a random delay to each API call. +// This client is intended to be used only for testing purposes. +type DelayedBucketClient struct { + wrapped objstore.Bucket + minDelay time.Duration + maxDelay time.Duration +} + +func NewDelayedBucketClient(wrapped objstore.Bucket, minDelay, maxDelay time.Duration) objstore.Bucket { + if minDelay < 0 || maxDelay < 0 || maxDelay < minDelay { + // We're fine just panicking, because we expect this client to be used only for testing purposes.
+ panic("invalid delay configuration") + } + + return &DelayedBucketClient{ + wrapped: wrapped, + minDelay: minDelay, + maxDelay: maxDelay, + } +} + +func (m *DelayedBucketClient) Upload(ctx context.Context, name string, r io.Reader) error { + m.delay() + defer m.delay() + + return m.wrapped.Upload(ctx, name, r) +} + +func (m *DelayedBucketClient) Delete(ctx context.Context, name string) error { + m.delay() + defer m.delay() + + return m.wrapped.Delete(ctx, name) +} + +func (m *DelayedBucketClient) Name() string { + return m.wrapped.Name() +} + +func (m *DelayedBucketClient) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { + m.delay() + defer m.delay() + + return m.wrapped.Iter(ctx, dir, f, options...) +} + +func (m *DelayedBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) { + m.delay() + defer m.delay() + + return m.wrapped.Get(ctx, name) +} +func (m *DelayedBucketClient) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + m.delay() + defer m.delay() + + return m.wrapped.GetRange(ctx, name, off, length) +} + +func (m *DelayedBucketClient) Exists(ctx context.Context, name string) (bool, error) { + m.delay() + defer m.delay() + + return m.wrapped.Exists(ctx, name) +} + +func (m *DelayedBucketClient) IsObjNotFoundErr(err error) bool { + return m.wrapped.IsObjNotFoundErr(err) +} + +func (m *DelayedBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + m.delay() + defer m.delay() + + return m.wrapped.Attributes(ctx, name) +} + +func (m *DelayedBucketClient) Close() error { + return m.wrapped.Close() +} + +func (m *DelayedBucketClient) delay() { + time.Sleep(m.minDelay + time.Duration(rand.Int63n(m.maxDelay.Nanoseconds()-m.minDelay.Nanoseconds()))) +} diff --git a/pkg/storage/tsdb/users_scanner.go b/pkg/storage/tsdb/users_scanner.go index 7a8cdf82e9e..8b3b86fdbd7 100644 --- a/pkg/storage/tsdb/users_scanner.go +++ b/pkg/storage/tsdb/users_scanner.go @@ -12,6 +12,8 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/thanos-io/thanos/pkg/objstore" + + "github.com/grafana/mimir/pkg/storage/bucket" ) // AllUsers returns true to each call and should be used whenever the UsersScanner should not filter out @@ -39,10 +41,7 @@ func NewUsersScanner(bucketClient objstore.Bucket, isOwned func(userID string) ( // // If sharding is enabled, returned lists contains only the users owned by this instance. func (s *UsersScanner) ScanUsers(ctx context.Context) (users, markedForDeletion []string, err error) { - err = s.bucketClient.Iter(ctx, "", func(entry string) error { - users = append(users, strings.TrimSuffix(entry, "/")) - return nil - }) + users, err = ListUsers(ctx, s.bucketClient) if err != nil { return nil, nil, err } @@ -75,3 +74,26 @@ func (s *UsersScanner) ScanUsers(ctx context.Context) (users, markedForDeletion return users, markedForDeletion, nil } + +// ListUsers returns all user IDs found scanning the root of the bucket. +func ListUsers(ctx context.Context, bucketClient objstore.Bucket) (users []string, err error) { + // Iterate the bucket to find all users in the bucket. Due to how the bucket listing + // caching works, it's more likely to have a cache hit if there's no delay while + // iterating the bucket, so we do load all users in memory and later process them. 
+ err = bucketClient.Iter(ctx, "", func(entry string) error { + userID := strings.TrimSuffix(entry, "/") + if isUserIDReserved(userID) { + return nil + } + + users = append(users, userID) + return nil + }) + + return users, err +} + +// isUserIDReserved returns whether the provided user ID is reserved and can't be used for storing metrics. +func isUserIDReserved(name string) bool { + return name == bucket.MimirInternalsPrefix +} diff --git a/pkg/storage/tsdb/users_scanner_test.go b/pkg/storage/tsdb/users_scanner_test.go index 9ecaf79e192..cc00a5ec87d 100644 --- a/pkg/storage/tsdb/users_scanner_test.go +++ b/pkg/storage/tsdb/users_scanner_test.go @@ -33,7 +33,6 @@ func TestUsersScanner_ScanUsers_ShouldReturnedOwnedUsersOnly(t *testing.T) { require.NoError(t, err) assert.Equal(t, []string{"user-1"}, actual) assert.Equal(t, []string{"user-3"}, deleted) - } func TestUsersScanner_ScanUsers_ShouldReturnUsersForWhichOwnerCheckOrTenantDeletionCheckFailed(t *testing.T) { @@ -54,3 +53,15 @@ func TestUsersScanner_ScanUsers_ShouldReturnUsersForWhichOwnerCheckOrTenantDelet assert.Equal(t, expected, actual) assert.Empty(t, deleted) } + +func TestUsersScanner_ScanUsers_ShouldNotReturnPrefixedUsedByMimirInternals(t *testing.T) { + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", []string{"user-1", "user-2", bucket.MimirInternalsPrefix}, nil) + bucketClient.MockExists(path.Join("user-1", TenantDeletionMarkPath), false, nil) + bucketClient.MockExists(path.Join("user-2", TenantDeletionMarkPath), false, nil) + + s := NewUsersScanner(bucketClient, AllUsers, log.NewNopLogger()) + actual, _, err := s.ScanUsers(context.Background()) + require.NoError(t, err) + assert.Equal(t, []string{"user-1", "user-2"}, actual) +} diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index ecdc05e8f94..bfab453dbcc 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -12,7 +12,6 @@ import ( "net/http" "os" "path/filepath" - "strings" "sync" "time" @@ -354,17 +353,7 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues // scanUsers in the bucket and return the list of found users. If an error occurs while // iterating the bucket, it may return both an error and a subset of the users in the bucket. func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) { - var users []string - - // Iterate the bucket to find all users in the bucket. Due to how the bucket listing - // caching works, it's more likely to have a cache hit if there's no delay while - // iterating the bucket, so we do load all users in memory and later process them. 
- err := u.bucket.Iter(ctx, "", func(s string) error { - users = append(users, strings.TrimSuffix(s, "/")) - return nil - }) - - return users, err + return tsdb.ListUsers(ctx, u.bucket) } func (u *BucketStores) getStore(userID string) *BucketStore { diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go new file mode 100644 index 00000000000..6aa9e085dfc --- /dev/null +++ b/pkg/usagestats/reporter.go @@ -0,0 +1,61 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package usagestats + +import ( + "context" + "flag" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/services" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/objstore" + + "github.com/grafana/mimir/pkg/storage/bucket" +) + +type Config struct { + Enabled bool `yaml:"enabled" category:"experimental"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.BoolVar(&cfg.Enabled, "usage-stats.enabled", false, "Enable anonymous usage reporting.") +} + +type Reporter struct { + logger log.Logger + bucket objstore.InstrumentedBucket + + services.Service +} + +func NewReporter(bucketClient objstore.InstrumentedBucket, logger log.Logger) *Reporter { + // The cluster seed file is stored in a prefix dedicated to Mimir internals. + bucketClient = bucket.NewPrefixedBucketClient(bucketClient, bucket.MimirInternalsPrefix) + + r := &Reporter{ + logger: logger, + bucket: bucketClient, + } + r.Service = services.NewBasicService(nil, r.running, nil) + return r +} + +func (r *Reporter) running(ctx context.Context) error { + // Init or get the cluster seed. + seed, err := initSeedFile(ctx, r.bucket, clusterSeedFileMinStability, r.logger) + if errors.Is(err, context.Canceled) { + return nil + } + if err != nil { + return err + } + + level.Info(r.logger).Log("msg", "usage stats reporter initialized", "cluster_id", seed.UID) + + // TODO Periodically send usage report. + + return nil +} diff --git a/pkg/usagestats/seed.go b/pkg/usagestats/seed.go new file mode 100644 index 00000000000..478a96359c0 --- /dev/null +++ b/pkg/usagestats/seed.go @@ -0,0 +1,183 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package usagestats + +import ( + "bytes" + "context" + "encoding/json" + "io" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/google/uuid" + "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/runutil" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/objstore" + + "github.com/grafana/mimir/pkg/util" +) + +const ( + // ClusterSeedFileName is the file name for the cluster seed file. + ClusterSeedFileName = "mimir_cluster_seed.json" + + // clusterSeedFileMinStability is how long to wait for a cluster seed file creation + // before using it (because multiple replicas could concurrently create it). + clusterSeedFileMinStability = 5 * time.Minute +) + +var ( + errClusterSeedFileCorrupted = errors.New("the cluster seed file is corrupted") +) + +// ClusterSeed is the seed for the usage stats. +type ClusterSeed struct { + // UID is the unique cluster ID. + UID string `json:"UID"` + + // CreatedAt is the timestamp when the seed file was created. + CreatedAt time.Time `json:"created_at"` +} + +func newClusterSeed() ClusterSeed { + return ClusterSeed{ + UID: uuid.NewString(), + CreatedAt: time.Now(), + } +} + +// initSeedFile returns the cluster seed, creating it if required. 
This function returns only after the seed +// file is created and stable, or the context has been canceled. +func initSeedFile(ctx context.Context, bucket objstore.InstrumentedBucket, minStability time.Duration, logger log.Logger) (ClusterSeed, error) { + backoff := backoff.New(ctx, backoff.Config{ + MinBackoff: time.Second, + MaxBackoff: time.Minute, + MaxRetries: 0, + }) + + for backoff.Ongoing() { + // Check if the seed file is already created and wait for its stability. + seed, err := waitSeedFileStability(ctx, bucket, minStability, logger) + if err == nil { + return seed, nil + } + + // Either the seed file does not exist or it's corrupted. Before attempting to write it + // we want to wait some time to reduce the likelihood that multiple Mimir replicas will attempt + // to create it at the same time. + select { + case <-time.After(util.DurationWithJitter(minStability/2, 1)): // Wait between 0s and minStability + case <-ctx.Done(): + return ClusterSeed{}, ctx.Err() + } + + // Ensure the seed file hasn't been created in the meantime. + // If so, we should get back to wait for its stability. + if seed, err := readSeedFile(ctx, bucket, logger); err == nil { + level.Debug(logger).Log("msg", "skipping creation of cluster seed file because found one", "cluster_id", seed.UID, "created_at", seed.CreatedAt.String()) + backoff.Wait() + continue + } + + // Create or re-create the seed file. + seed = newClusterSeed() + level.Info(logger).Log("msg", "creating cluster seed file", "cluster_id", seed.UID) + + if err := writeSeedFile(ctx, bucket, seed); err != nil { + level.Warn(logger).Log("msg", "failed to create cluster seed file", "err", err) + } + + // Either on success or failure we need to loop back to wait for stability. Why? + // If creation succeeded, other replicas may have concurrently tried to create it, so + // we need to wait for its stability. + // If creation failed, other replicas may have succeeded, so we get back to check it. + backoff.Wait() + } + + return ClusterSeed{}, backoff.Err() +} + +// waitSeedFileStability reads the seed file from the object storage and waits until it was created at least minStability +// time ago. Returns an error if the seed file doesn't exist, or it's corrupted, or the context is canceled. +func waitSeedFileStability(ctx context.Context, bucket objstore.InstrumentedBucket, minStability time.Duration, logger log.Logger) (ClusterSeed, error) { + backoff := backoff.New(ctx, backoff.Config{ + MinBackoff: time.Second, + MaxBackoff: time.Minute, + MaxRetries: 0, + }) + + for backoff.Ongoing() { + seed, err := readSeedFile(ctx, bucket, logger) + if bucket.IsObjNotFoundErr(err) { + // The seed file doesn't exist. We'll attempt to create it. + return ClusterSeed{}, err + } + if errors.Is(err, errClusterSeedFileCorrupted) { + // The seed file is corrupted. We'll attempt to re-create it. + return ClusterSeed{}, err + } + if err != nil { + // We assume any other error to either be a transient error (e.g. network error) + // or a permanent error which won't be fixed trying to re-create the seed file (e.g. auth error). + // Since there's no reliable way to detect whether an error is transient or permanent, + // we just keep retrying. + level.Warn(logger).Log("msg", "failed to read cluster seed file from object storage", "err", err) + backoff.Wait() + continue + } + + // Check if stability has been reached. We assume clocks are synchronized between Mimir replicas + // (this assumption is made in other Mimir logic too).
+ createdTimeAgo := time.Since(seed.CreatedAt) + if createdTimeAgo >= minStability { + return seed, nil + } + + // Wait until the min stability should have been reached. We add extra jitter so that, if there are many + // replicas, they will not all look up the seed file at the same time. + level.Info(logger).Log("msg", "found a cluster seed file created recently, waiting until stable", "cluster_id", seed.UID, "created_at", seed.CreatedAt.String()) + select { + case <-time.After(util.DurationWithPositiveJitter(minStability-createdTimeAgo, 0.2)): + case <-ctx.Done(): + return ClusterSeed{}, ctx.Err() + } + } + + return ClusterSeed{}, backoff.Err() +} + +// readSeedFile reads the cluster seed file from the object store. +func readSeedFile(ctx context.Context, bucket objstore.InstrumentedBucket, logger log.Logger) (ClusterSeed, error) { + reader, err := bucket.WithExpectedErrs(bucket.IsObjNotFoundErr).Get(ctx, ClusterSeedFileName) + if err != nil { + return ClusterSeed{}, err + } + + // Ensure the reader will be closed. + defer runutil.CloseWithLogOnErr(logger, reader, "failed to close cluster seed reader") + + data, err := io.ReadAll(reader) + if err != nil { + return ClusterSeed{}, err + } + + var seed ClusterSeed + if err := json.Unmarshal(data, &seed); err != nil { + return ClusterSeed{}, errors.Wrap(errClusterSeedFileCorrupted, err.Error()) + } + + return seed, nil +} + +// writeSeedFile writes the cluster seed to the object store. +func writeSeedFile(ctx context.Context, bucket objstore.InstrumentedBucket, seed ClusterSeed) error { + data, err := json.Marshal(seed) + if err != nil { + return err + } + + return bucket.Upload(ctx, ClusterSeedFileName, bytes.NewReader(data)) +} diff --git a/pkg/usagestats/seed_test.go b/pkg/usagestats/seed_test.go new file mode 100644 index 00000000000..5dd7d688ebb --- /dev/null +++ b/pkg/usagestats/seed_test.go @@ -0,0 +1,326 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package usagestats + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "sync" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/objstore" + "golang.org/x/sync/errgroup" + + "github.com/grafana/mimir/pkg/storage/bucket" + "github.com/grafana/mimir/pkg/storage/bucket/filesystem" +) + +func TestReadSeedFile(t *testing.T) { + tests := map[string]struct { + setup func(bucketClient *bucket.ClientMock) + expectedSeed ClusterSeed + expectedErr error + }{ + "the seed file does not exist": { + setup: func(bucketClient *bucket.ClientMock) { + bucketClient.MockGet(ClusterSeedFileName, "", bucket.ErrObjectDoesNotExist) + }, + expectedErr: bucket.ErrObjectDoesNotExist, + }, + "an error occurred while reading the seed file": { + setup: func(bucketClient *bucket.ClientMock) { + bucketClient.MockGet(ClusterSeedFileName, "{}", errors.New("read failure")) + }, + expectedErr: errors.New("read failure"), + }, + "the seed file is corrupted": { + setup: func(bucketClient *bucket.ClientMock) { + bucketClient.MockGet(ClusterSeedFileName, "xxx", nil) + }, + expectedErr: errClusterSeedFileCorrupted, + }, + "the seed file is read successfully": { + setup: func(bucketClient *bucket.ClientMock) { + bucketClient.MockGet(ClusterSeedFileName, `{"UID":"xxx","created_at":"2006-01-02T15:04:05.999999999Z"}`, nil) + }, + expectedSeed: ClusterSeed{ + UID: "xxx", + CreatedAt: func() time.Time { + ts, err := time.Parse(time.RFC3339Nano, "2006-01-02T15:04:05.999999999Z") + if err != nil { + panic(err) + } + return ts + }(), + }, + }, + } + + for
testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + bucketClient := &bucket.ClientMock{} + testData.setup(bucketClient) + + seed, err := readSeedFile(context.Background(), objstore.BucketWithMetrics("", bucketClient, nil), log.NewNopLogger()) + if testData.expectedErr != nil { + require.Error(t, err) + require.Contains(t, err.Error(), testData.expectedErr.Error()) + } else { + require.NoError(t, err) + require.Equal(t, testData.expectedSeed, seed) + } + }) + } +} + +func TestWriteSeedFile(t *testing.T) { + seed := ClusterSeed{ + UID: "xxx", + CreatedAt: time.Now(), + } + + tests := map[string]struct { + setup func(bucketClient *bucket.ClientMock) + expectedErr error + }{ + "an error occurred while writing the seed file": { + setup: func(bucketClient *bucket.ClientMock) { + bucketClient.MockUpload(ClusterSeedFileName, errors.New("write failure")) + }, + expectedErr: errors.New("write failure"), + }, + "the seed file is written successfully": { + setup: func(bucketClient *bucket.ClientMock) { + bucketClient.MockUpload(ClusterSeedFileName, nil) + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + bucketClient := &bucket.ClientMock{} + testData.setup(bucketClient) + + err := writeSeedFile(context.Background(), objstore.BucketWithMetrics("", bucketClient, nil), seed) + if testData.expectedErr != nil { + require.Error(t, err) + require.Contains(t, err.Error(), testData.expectedErr.Error()) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestWaitSeedFileStability(t *testing.T) { + const minStability = 3 * time.Second + + type testConfig struct { + setup func(bucketClient *bucket.ClientMock) + expectedSeed ClusterSeed + expectedErr error + expectedMinDuration time.Duration + } + + tests := map[string]testConfig{ + "should immediately return if seed file does not exist": { + setup: func(bucketClient *bucket.ClientMock) { + bucketClient.MockGet(ClusterSeedFileName, "", bucket.ErrObjectDoesNotExist) + }, + expectedErr: bucket.ErrObjectDoesNotExist, + }, + "should immediately return if seed file is corrupted": { + setup: func(bucketClient *bucket.ClientMock) { + bucketClient.MockGet(ClusterSeedFileName, "xxx", nil) + }, + expectedErr: errClusterSeedFileCorrupted, + }, + "should immediately return if seed file was created more than 'min stability' time ago": func() testConfig { + oldSeed := ClusterSeed{UID: "old", CreatedAt: time.Now().Add(-2 * minStability)} + + return testConfig{ + setup: func(bucketClient *bucket.ClientMock) { + data, err := json.Marshal(oldSeed) + require.NoError(t, err) + bucketClient.MockGet(ClusterSeedFileName, string(data), nil) + }, + expectedSeed: oldSeed, + expectedMinDuration: 0, + } + }(), + "should wait for 'min stability' and return the seed file if it was created less than 'min stability' time ago": func() testConfig { + newSeed := ClusterSeed{UID: "new", CreatedAt: time.Now()} + + return testConfig{ + setup: func(bucketClient *bucket.ClientMock) { + data, err := json.Marshal(newSeed) + require.NoError(t, err) + bucketClient.MockGet(ClusterSeedFileName, string(data), nil) + }, + expectedSeed: newSeed, + expectedMinDuration: minStability, + } + }(), + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + startTime := time.Now() + + bucketClient := &bucket.ClientMock{} + testData.setup(bucketClient) + + actualSeed, err := waitSeedFileStability(context.Background(), objstore.BucketWithMetrics("", bucketClient, nil),
minStability, log.NewNopLogger()) + if testData.expectedErr != nil { + require.Error(t, err) + require.Contains(t, err.Error(), testData.expectedErr.Error()) + } else { + require.NoError(t, err) + require.Equal(t, testData.expectedSeed.UID, actualSeed.UID) + require.Equal(t, testData.expectedSeed.CreatedAt.Unix(), actualSeed.CreatedAt.Unix()) + } + + require.GreaterOrEqual(t, time.Since(startTime), testData.expectedMinDuration) + }) + } +} + +func TestInitSeedFile(t *testing.T) { + const minStability = 3 * time.Second + + type testConfig struct { + setup func(bucketClient objstore.Bucket) + expectedErr error + expectedMinDuration time.Duration + } + + tests := map[string]testConfig{ + "should immediately return if seed file exists and it was created more than 'min stability' time ago": func() testConfig { + oldSeed := ClusterSeed{UID: "old", CreatedAt: time.Now().Add(-2 * minStability)} + + return testConfig{ + setup: func(bucketClient objstore.Bucket) { + data, err := json.Marshal(oldSeed) + require.NoError(t, err) + require.NoError(t, bucketClient.Upload(context.Background(), ClusterSeedFileName, bytes.NewReader(data))) + }, + expectedMinDuration: 0, + } + }(), + "should wait for 'min stability' and return the seed file if it exists and was created less than 'min stability' time ago": func() testConfig { + newSeed := ClusterSeed{UID: "new", CreatedAt: time.Now()} + + return testConfig{ + setup: func(bucketClient objstore.Bucket) { + data, err := json.Marshal(newSeed) + require.NoError(t, err) + require.NoError(t, bucketClient.Upload(context.Background(), ClusterSeedFileName, bytes.NewReader(data))) + }, + expectedMinDuration: minStability, + } + }(), + "should create the seed file if it doesn't exist and then wait for 'min stability'": { + setup: func(bucketClient objstore.Bucket) {}, + expectedMinDuration: minStability, + }, + "should re-create the seed file if it exists but is corrupted, and then wait for 'min stability'": { + setup: func(bucketClient objstore.Bucket) { + require.NoError(t, bucketClient.Upload(context.Background(), ClusterSeedFileName, bytes.NewReader([]byte("xxx")))) + }, + expectedMinDuration: minStability, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: t.TempDir()}) + require.NoError(t, err) + + testData.setup(bucketClient) + + startTime := time.Now() + actualSeed, err := initSeedFile(context.Background(), objstore.BucketWithMetrics("", bucketClient, nil), minStability, log.NewNopLogger()) + if testData.expectedErr != nil { + require.Error(t, err) + require.Contains(t, err.Error(), testData.expectedErr.Error()) + } else { + require.NoError(t, err) + + // We expect the returned seed to match the one stored in the bucket.
+ expectedSeed, err := readSeedFile(context.Background(), objstore.BucketWithMetrics("", bucketClient, nil), log.NewNopLogger()) + require.NoError(t, err) + require.Equal(t, expectedSeed.UID, actualSeed.UID) + require.Equal(t, expectedSeed.CreatedAt.Unix(), actualSeed.CreatedAt.Unix()) + } + + require.GreaterOrEqual(t, time.Since(startTime), testData.expectedMinDuration) + }) + } +} + +func TestInitSeedFile_CreatingConcurrency(t *testing.T) { + t.Parallel() + + const ( + numReplicas = 100 + minStability = 5 * time.Second + ) + + var ( + start = make(chan struct{}) + seedsMx sync.Mutex + seeds = make([]ClusterSeed, 0, numReplicas) + ) + + bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: t.TempDir()}) + require.NoError(t, err) + + // Add a random delay to each API call to increase the likelihood that multiple replicas attempt to create the seed file concurrently. + bucketClient = bucket.NewDelayedBucketClient(bucketClient, 5*time.Millisecond, 10*time.Millisecond) + + // Run replicas. + group := errgroup.Group{} + + for i := 0; i < numReplicas; i++ { + group.Go(func() error { + // Wait for the start. + <-start + + seed, err := initSeedFile(context.Background(), objstore.BucketWithMetrics("", bucketClient, nil), minStability, log.NewNopLogger()) + if err != nil { + return err + } + + seedsMx.Lock() + seeds = append(seeds, seed) + seedsMx.Unlock() + + return nil + }) + } + + // Notify replicas to call initSeedFile(). + close(start) + + // Wait until all replicas are done. + require.NoError(t, group.Wait()) + + // We expect all replicas to end up with the same seed. + require.Len(t, seeds, numReplicas) + for i := 1; i < numReplicas; i++ { + require.Equal(t, seeds[0], seeds[i]) + } +}
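To make the seed handshake timing concrete: a replica that finds no seed file (or a corrupted one) sleeps a random duration in `[0, minStability)` before re-checking and possibly creating the file, while a replica that reads a seed created `age` ago waits out the remaining `minStability - age` plus up to 20% positive jitter. A small standalone sketch of that arithmetic (not part of the PR, and assuming `util.DurationWithJitter(d, 1)` is uniform in `[0, 2d]` and `util.DurationWithPositiveJitter(d, 0.2)` is uniform in `[d, 1.2d]`, as the inline comments suggest):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const minStability = 5 * time.Minute // clusterSeedFileMinStability in seed.go

	// initSeedFile: before (re)creating a missing or corrupted seed file, each
	// replica sleeps DurationWithJitter(minStability/2, 1), i.e. uniform in
	// [0, minStability), so concurrent writers are spread out.
	fmt.Printf("creation backoff window: 0s to %s\n", 2*(minStability/2))

	// waitSeedFileStability: a replica that reads a seed created 'age' ago waits
	// DurationWithPositiveJitter(minStability-age, 0.2) before trusting it.
	age := 90 * time.Second
	remaining := minStability - age
	fmt.Printf("stability wait window: %s to %s\n", remaining, remaining+remaining/5)
}
```

With the 5-minute `clusterSeedFileMinStability`, concurrently started replicas converge on a single cluster ID within roughly one to two stability windows, which is the behavior `TestInitSeedFile_CreatingConcurrency` exercises.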