From 1be6b7b252521e5f71848d192e6a6ffeeb48f764 Mon Sep 17 00:00:00 2001
From: Yuri Nikolic
Date: Fri, 10 Mar 2023 00:51:36 +0100
Subject: [PATCH] Fixing failing tests

---
 CHANGELOG.md | 6 +++---
 cmd/mimir/config-descriptor.json | 2 +-
 cmd/mimir/help-all.txt.tmpl | 2 +-
 .../references/configuration-parameters/index.md | 4 +++-
 integration/compactor_test.go | 13 ++++++-------
 integration/util.go | 9 ++++-----
 pkg/ingester/ingester.go | 16 ++++++++++------
 pkg/ingester/shipper.go | 2 +-
 pkg/mimir/mimir_test.go | 2 +-
 pkg/storage/tsdb/config.go | 5 ++---
 pkg/util/test/histogram.go | 14 +++++++-------
 11 files changed, 39 insertions(+), 36 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6d26ae23f67..a1fe399be99 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,7 +9,7 @@
   * `cortex_bucket_store_series_get_all_duration_seconds`
   * `cortex_bucket_store_series_merge_duration_seconds`
 * [CHANGE] Ingester: changed default value of `-blocks-storage.tsdb.retention-period` from `24h` to `13h`. If you're running Mimir with a custom configuration and you're overriding `-querier.query-store-after` to a value greater than the default `12h` then you should increase `-blocks-storage.tsdb.retention-period` accordingly. #4382
-* [CHANGE] Ingester: the configuration parameter `-blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup` has been deprecated and will be removed in Mimir 2.10.
+* [CHANGE] Ingester: the configuration parameter `-blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup` has been deprecated and will be removed in Mimir 2.10. #4445
 * [FEATURE] Cache: Introduce experimental support for using Redis for results, chunks, index, and metadata caches. #4371
 * [ENHANCEMENT] Allow to define service name used for tracing via `JAEGER_SERVICE_NAME` environment variable. #4394
 * [ENHANCEMENT] Querier and query-frontend: add experimental, more performant protobuf query result response format enabled with `-query-frontend.query-result-response-format=protobuf`. #4304 #4318 #4375
@@ -19,8 +19,8 @@
 * [ENHANCEMENT] Ruler: increased tolerance for missed iterations on alerts, reducing the chances of flapping firing alerts during ruler restarts. #4432
 * [ENHANCEMENT] Querier and store-gateway: optimized `.*` and `.+` regular expression label matchers. #4432
 * [ENHANCEMENT] Query-frontend: results cache TTL is now configurable by using `-query-frontend.results-cache-ttl` and `-query-frontend.results-cache-ttl-for-out-of-order-time-window` options. These values can also be specified per tenant. Default values are unchanged (7 days and 10 minutes respectively). #4385
-* [BUGFIX] Querier: Streaming remote read will now continue to return multiple chunks per frame after the first frame. #4423
-* [ENHANCEMENT] Ingester: added advanced parameter `-blocks-storage.tsdb.wal-replay-concurrency` representing the maximum number of CPUs used during WAL replay.
+* [ENHANCEMENT] Ingester: added advanced configuration parameter `-blocks-storage.tsdb.wal-replay-concurrency` representing the maximum number of CPUs used during WAL replay. #4445
+* [BUGFIX] Querier: Streaming remote read will now continue to return multiple chunks per frame after the first frame. #4423
 
 ### Mixin
 
diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json
index a872a6f5f1d..f225319ba10 100644
--- a/cmd/mimir/config-descriptor.json
+++ b/cmd/mimir/config-descriptor.json
@@ -7093,7 +7093,7 @@
         "kind": "field",
         "name": "wal_replay_concurrency",
         "required": false,
-        "desc": "Maximum number of CPUs that can simultaneously processes WAL replay. 0 means disabled.",
+        "desc": "Maximum number of CPUs that can simultaneously process WAL replay. 0 means disabled. If set to a positive value it overrides the deprecated blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup option",
         "fieldValue": null,
         "fieldDefaultValue": 0,
         "fieldFlag": "blocks-storage.tsdb.wal-replay-concurrency",
diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl
index 221e807f8f0..09d7d43c6c3 100644
--- a/cmd/mimir/help-all.txt.tmpl
+++ b/cmd/mimir/help-all.txt.tmpl
@@ -680,7 +680,7 @@ Usage of ./cmd/mimir/mimir:
   -blocks-storage.tsdb.wal-compression-enabled
     	True to enable TSDB WAL compression.
   -blocks-storage.tsdb.wal-replay-concurrency int
-    	Maximum number of CPUs that can simultaneously processes WAL replay. 0 means disabled.
+    	Maximum number of CPUs that can simultaneously process WAL replay. 0 means disabled. If set to a positive value it overrides the deprecated blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup option
   -blocks-storage.tsdb.wal-segment-size-bytes int
     	TSDB WAL segments files max size (bytes). (default 134217728)
   -common.storage.azure.account-key string
diff --git a/docs/sources/mimir/references/configuration-parameters/index.md b/docs/sources/mimir/references/configuration-parameters/index.md
index 69ff7cb6f97..46ef2ca2ec9 100644
--- a/docs/sources/mimir/references/configuration-parameters/index.md
+++ b/docs/sources/mimir/references/configuration-parameters/index.md
@@ -3268,7 +3268,9 @@ tsdb:
   [wal_segment_size_bytes: <int> | default = 134217728]
 
   # (advanced) Maximum number of CPUs that can simultaneously processes WAL
-  # replay. 0 means disabled.
+  # replay. 0 means disabled. If set to a positive value it overrides the
+  # deprecated blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
+  # option
   # CLI flag: -blocks-storage.tsdb.wal-replay-concurrency
   [wal_replay_concurrency: <int> | default = 0]
 
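Note: the new help text documents a precedence rule between the two flags. A minimal sketch of that rule, assuming a hypothetical helper; `effectiveWALReplayConcurrency` is illustrative and not part of this patch:

    package tsdbconfig // hypothetical package, for illustration only

    // effectiveWALReplayConcurrency sketches the documented precedence: a positive
    // -blocks-storage.tsdb.wal-replay-concurrency overrides the deprecated
    // -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup value.
    func effectiveWALReplayConcurrency(walReplayConcurrency, deprecatedOpeningConcurrency int) int {
    	if walReplayConcurrency > 0 {
    		return walReplayConcurrency
    	}
    	// 0 means disabled: fall back to the deprecated option until its removal in Mimir 2.10.
    	return deprecatedOpeningConcurrency
    }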
diff --git a/integration/compactor_test.go b/integration/compactor_test.go
index 2decf210067..91042daa0f4 100644
--- a/integration/compactor_test.go
+++ b/integration/compactor_test.go
@@ -20,7 +20,6 @@ import (
 	"github.com/oklog/ulid"
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
-	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/index"
@@ -74,16 +73,16 @@ func TestCompactBlocksContainingNativeHistograms(t *testing.T) {
 			Labels: labels.FromStrings("case", "native_histogram", "i", strconv.Itoa(i)),
 			Chunks: []chunks.Meta{
 				tsdbutil.ChunkFromSamples([]tsdbutil.Sample{
-					sample{10, 0, tsdb.GenerateTestHistogram(1), nil},
-					sample{20, 0, tsdb.GenerateTestHistogram(2), nil},
+					sample{10, 0, tsdbutil.GenerateTestHistogram(1), nil},
+					sample{20, 0, tsdbutil.GenerateTestHistogram(2), nil},
 				}),
 				tsdbutil.ChunkFromSamples([]tsdbutil.Sample{
-					sample{30, 0, tsdb.GenerateTestHistogram(3), nil},
-					sample{40, 0, tsdb.GenerateTestHistogram(4), nil},
+					sample{30, 0, tsdbutil.GenerateTestHistogram(3), nil},
+					sample{40, 0, tsdbutil.GenerateTestHistogram(4), nil},
 				}),
 				tsdbutil.ChunkFromSamples([]tsdbutil.Sample{
-					sample{50, 0, tsdb.GenerateTestHistogram(5), nil},
-					sample{2*time.Hour.Milliseconds() - 1, 0, tsdb.GenerateTestHistogram(6), nil},
+					sample{50, 0, tsdbutil.GenerateTestHistogram(5), nil},
+					sample{2*time.Hour.Milliseconds() - 1, 0, tsdbutil.GenerateTestHistogram(6), nil},
 				}),
 			},
 		}
diff --git a/integration/util.go b/integration/util.go
index 9a1f99b037b..85f2fbb9990 100644
--- a/integration/util.go
+++ b/integration/util.go
@@ -15,15 +15,14 @@ import (
 	"path/filepath"
 	"time"
 
+	"github.com/grafana/e2e"
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/prometheus/prometheus/storage/remote"
-	"github.com/prometheus/prometheus/tsdb"
-
-	"github.com/grafana/e2e"
+	"github.com/prometheus/prometheus/tsdb/tsdbutil"
 )
 
 var (
@@ -117,11 +116,11 @@ func getTLSFlagsWithPrefix(prefix string, servername string, http bool) map[stri
 }
 
 func GenerateTestHistogram(i int) *histogram.Histogram {
-	return tsdb.GenerateTestHistograms(i + 1)[i]
+	return tsdbutil.GenerateTestHistograms(i + 1)[i]
 }
 
 func GenerateTestFloatHistogram(i int) *histogram.FloatHistogram {
-	return tsdb.GenerateTestFloatHistograms(i + 1)[i]
+	return tsdbutil.GenerateTestFloatHistograms(i + 1)[i]
 }
 
 // explicit decoded version of GenerateTestHistogram and GenerateTestFloatHistogram
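Note: the `integration/util.go` hunk above tracks the upstream relocation of the Prometheus histogram test helpers from `github.com/prometheus/prometheus/tsdb` to `github.com/prometheus/prometheus/tsdb/tsdbutil`. A small self-contained sketch of what the wrapper does: `GenerateTestHistograms(i + 1)[i]` generates `i+1` deterministic histograms and keeps the last one. The sample program is illustrative only:

    package main

    import (
    	"fmt"

    	"github.com/prometheus/prometheus/tsdb/tsdbutil"
    )

    func main() {
    	// Mirror the integration helper: generate i+1 deterministic test
    	// histograms and keep the i-th (the last) one.
    	i := 3
    	h := tsdbutil.GenerateTestHistograms(i + 1)[i]
    	fmt.Println(h.Count, h.Sum)
    }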
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 7038b1db43d..190d36379a5 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -110,7 +110,11 @@ const (
 	minOutOfOrderTimeWindowSecondsStatName = "ingester_ooo_min_window"
 	maxOutOfOrderTimeWindowSecondsStatName = "ingester_ooo_max_window"
 
-	// maximum number of TSDBs present on the file system which can be opened in a single process without walReplayConcurrency
+	// Maximum number of TSDB users present on the file system whose TSDBs can be opened
+	// sequentially. More precisely, if the actual number of TSDB users is lower than this
+	// number, TSDBs are opened one at a time, while each WAL is replayed by WALReplayConcurrency
+	// concurrent goroutines. Otherwise, TSDBs are opened by WALReplayConcurrency concurrent
+	// goroutines, while each WAL is replayed by a single goroutine.
 	maxTSDBOpenWithoutConcurrency = 10
 )
 
@@ -1842,7 +1846,7 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
 	queue := make(chan string)
 	group, groupCtx := errgroup.WithContext(ctx)
 
-	userIDs, err := i.getAllUsersWithTSDB()
+	userIDs, err := i.getAllTSDBUserIDs()
 	if err != nil {
 		level.Error(i.logger).Log("msg", "error while finding existing TSDBs", "err", err)
 		return err
@@ -1896,7 +1900,7 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
 		})
 	}
 
-	// Spawn a goroutine to place on the queue all users with a TSDB found on the filesystem.
+	// Spawn a goroutine to place all users with a TSDB found on the filesystem in the queue.
 	group.Go(func() error {
 		defer close(queue)
 
@@ -1927,8 +1931,8 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
 	return nil
 }
 
-// getAllUsersWithTSDB finds all users with a TSDB on the filesystem.
-func (i *Ingester) getAllUsersWithTSDB() (map[string]struct{}, error) {
+// getAllTSDBUserIDs finds all users with a TSDB on the filesystem.
+func (i *Ingester) getAllTSDBUserIDs() (map[string]struct{}, error) {
 	userIDs := make(map[string]struct{})
 	walkErr := filepath.Walk(i.cfg.BlocksStorageConfig.TSDB.Dir, func(path string, info os.FileInfo, err error) error {
 		if err != nil {
@@ -2242,7 +2246,7 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes
 	userDB.casState(closing, closed)
 
 	// Only remove user from TSDBState when everything is cleaned up
-	// This will prevent walReplayConcurrency problems when cortex are trying to open new TSDB - Ie: New request for a given tenant
+	// This will prevent concurrency problems when Cortex is trying to open a new TSDB - Ie: New request for a given tenant
 	// came in - while closing the tsdb for the same tenant.
 	// If this happens now, the request will get reject as the push will not be able to acquire the lock as the tsdb will be
 	// in closed state
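Note: the rewritten constant comment above describes two startup regimes. A hedged sketch of that decision rule, with hypothetical names; this is not the code `openExistingTSDB` actually uses:

    package main

    import "fmt"

    const maxTSDBOpenWithoutConcurrency = 10

    // chooseStartupConcurrency illustrates the regimes described above: with few
    // TSDB users, open TSDBs one at a time and parallelize each WAL replay; with
    // many, open TSDBs in parallel and replay each WAL in a single goroutine.
    func chooseStartupConcurrency(tsdbUsers, walReplayConcurrency int) (openConcurrency, walReplayConcurrencyPerTSDB int) {
    	if tsdbUsers < maxTSDBOpenWithoutConcurrency {
    		return 1, walReplayConcurrency
    	}
    	return walReplayConcurrency, 1
    }

    func main() {
    	fmt.Println(chooseStartupConcurrency(5, 4))  // 1 4
    	fmt.Println(chooseStartupConcurrency(50, 4)) // 4 1
    }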
diff --git a/pkg/ingester/shipper.go b/pkg/ingester/shipper.go
index 4f71b226bdf..46eef1948e2 100644
--- a/pkg/ingester/shipper.go
+++ b/pkg/ingester/shipper.go
@@ -112,7 +112,7 @@ func NewShipper(
 // Sync performs a single synchronization, which ensures all non-compacted local blocks have been uploaded
 // to the object bucket once.
 //
-// It is not walReplayConcurrency-safe, however it is compactor-safe (running concurrently with compactor is ok).
+// It is not concurrency-safe, however it is compactor-safe (running concurrently with the compactor is ok).
 func (s *Shipper) Sync(ctx context.Context) (shipped int, err error) {
 	shippedBlocks, err := readShippedBlocks(s.dir)
 	if err != nil {
diff --git a/pkg/mimir/mimir_test.go b/pkg/mimir/mimir_test.go
index aac804b2bfc..a024c5ab2d7 100644
--- a/pkg/mimir/mimir_test.go
+++ b/pkg/mimir/mimir_test.go
@@ -417,7 +417,7 @@ func TestConfigValidation(t *testing.T) {
 		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
-			err := tc.getTestConfig().Validate(nil)
+			err := tc.getTestConfig().Validate(util_log.Logger)
 			if tc.expectAnyError {
 				require.Error(t, err)
 			} else if tc.expectedError != nil {
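Note: `TestConfigValidation` now passes `util_log.Logger` instead of `nil`, since `Validate` may log a deprecation warning (see the `config.go` hunk below) and calling a nil go-kit logger would panic. A minimal, hedged illustration of the pattern; `warnDeprecated` is a stand-in, not Mimir's `util.WarnDeprecatedConfig`:

    package main

    import (
    	"os"

    	"github.com/go-kit/log"
    	"github.com/go-kit/log/level"
    )

    // warnDeprecated logs instead of failing validation, so configurations
    // that still set a deprecated flag keep working.
    func warnDeprecated(flagName string, logger log.Logger) {
    	level.Warn(logger).Log("msg", "flag is deprecated and will be removed in a future release", "flag", flagName)
    }

    func main() {
    	// A non-nil logger must be supplied: invoking a method on a nil
    	// log.Logger interface value panics.
    	logger := log.NewLogfmtLogger(os.Stderr)
    	warnDeprecated("blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup", logger)
    }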
diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go
index d6aff66f50a..2db835175ff 100644
--- a/pkg/storage/tsdb/config.go
+++ b/pkg/storage/tsdb/config.go
@@ -233,7 +233,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.StripeSize, "blocks-storage.tsdb.stripe-size", 16384, headStripeSizeHelp)
 	f.BoolVar(&cfg.WALCompressionEnabled, "blocks-storage.tsdb.wal-compression-enabled", false, "True to enable TSDB WAL compression.")
 	f.IntVar(&cfg.WALSegmentSizeBytes, "blocks-storage.tsdb.wal-segment-size-bytes", wlog.DefaultSegmentSize, "TSDB WAL segments files max size (bytes).")
-	f.IntVar(&cfg.WALReplayConcurrency, "blocks-storage.tsdb.wal-replay-concurrency", 0, "Maximum number of CPUs that can simultaneously processes WAL replay. 0 means disabled.")
+	f.IntVar(&cfg.WALReplayConcurrency, "blocks-storage.tsdb.wal-replay-concurrency", 0, "Maximum number of CPUs that can simultaneously process WAL replay. 0 means disabled. If set to a positive value it overrides the deprecated "+maxTSDBOpeningConcurrencyOnStartupFlag+" option")
 	f.BoolVar(&cfg.FlushBlocksOnShutdown, "blocks-storage.tsdb.flush-blocks-on-shutdown", false, "True to flush blocks to storage on shutdown. If false, incomplete blocks will be reused after restart.")
 	f.DurationVar(&cfg.CloseIdleTSDBTimeout, "blocks-storage.tsdb.close-idle-tsdb-timeout", 13*time.Hour, "If TSDB has not received any data for this duration, and all blocks from TSDB have been shipped, TSDB is closed and deleted from local disk. If set to positive value, this value should be equal or higher than -querier.query-ingesters-within flag to make sure that TSDB is not closed prematurely, which could cause partial query results. 0 or negative value disables closing of idle TSDB.")
 	f.BoolVar(&cfg.MemorySnapshotOnShutdown, "blocks-storage.tsdb.memory-snapshot-on-shutdown", false, "True to enable snapshotting of in-memory TSDB data on disk when shutting down.")
@@ -252,9 +252,8 @@ func (cfg *TSDBConfig) Validate(logger log.Logger) error {
 
 	if cfg.DeprecatedMaxTSDBOpeningConcurrencyOnStartup <= 0 {
 		return errInvalidOpeningConcurrency
-	} else {
-		util.WarnDeprecatedConfig(maxTSDBOpeningConcurrencyOnStartupFlag, logger)
 	}
+	util.WarnDeprecatedConfig(maxTSDBOpeningConcurrencyOnStartupFlag, logger)
 
 	if cfg.HeadCompactionInterval <= 0 || cfg.HeadCompactionInterval > 15*time.Minute {
 		return errInvalidCompactionInterval
diff --git a/pkg/util/test/histogram.go b/pkg/util/test/histogram.go
index f9abeb409e0..52106d4f790 100644
--- a/pkg/util/test/histogram.go
+++ b/pkg/util/test/histogram.go
@@ -5,32 +5,32 @@ package test
 import (
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/histogram"
-	"github.com/prometheus/prometheus/tsdb"
+	"github.com/prometheus/prometheus/tsdb/tsdbutil"
 	"github.com/stretchr/testify/require"
 )
 
 func GenerateTestHistograms(i int) []*histogram.Histogram {
-	return tsdb.GenerateTestHistograms(i)
+	return tsdbutil.GenerateTestHistograms(i)
 }
 
 func GenerateTestFloatHistograms(i int) []*histogram.FloatHistogram {
-	return tsdb.GenerateTestFloatHistograms(i)
+	return tsdbutil.GenerateTestFloatHistograms(i)
 }
 
 func GenerateTestHistogram(i int) *histogram.Histogram {
-	return tsdb.GenerateTestHistogram(i)
+	return tsdbutil.GenerateTestHistogram(i)
 }
 
 func GenerateTestFloatHistogram(i int) *histogram.FloatHistogram {
-	return tsdb.GenerateTestFloatHistogram(i)
+	return tsdbutil.GenerateTestFloatHistogram(i)
 }
 
 func GenerateTestGaugeHistogram(i int) *histogram.Histogram {
-	return tsdb.GenerateTestGaugeHistogram(i)
+	return tsdbutil.GenerateTestGaugeHistogram(i)
 }
 
 func GenerateTestGaugeFloatHistogram(i int) *histogram.FloatHistogram {
-	return tsdb.GenerateTestGaugeFloatHistogram(i)
+	return tsdbutil.GenerateTestGaugeFloatHistogram(i)
 }
 
 // explicit decoded version of GenerateTestHistogram and GenerateTestFloatHistogram
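Note: the `Validate` refactor in the `pkg/storage/tsdb/config.go` hunk above replaces an `else` branch with a fall-through: reject non-positive values, then warn. A hedged sketch of that control flow under assumed names; `validateOpeningConcurrency` is illustrative, not the actual Mimir function:

    package main

    import (
    	"errors"
    	"fmt"
    )

    var errInvalidOpeningConcurrency = errors.New("invalid TSDB max opening concurrency")

    // validateOpeningConcurrency sketches the refactored flow: return the error
    // early, then emit the deprecation warning without nesting it in an else.
    func validateOpeningConcurrency(deprecatedOpeningConcurrency int) error {
    	if deprecatedOpeningConcurrency <= 0 {
    		return errInvalidOpeningConcurrency
    	}
    	fmt.Println("warning: blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup is deprecated")
    	return nil
    }

    func main() {
    	fmt.Println(validateOpeningConcurrency(0))  // error
    	fmt.Println(validateOpeningConcurrency(10)) // warns, then <nil>
    }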