Commit: Fixing failing tests
duricanikolic committed Mar 10, 2023
1 parent fd9c1e0 commit 1be6b7b
Showing 11 changed files with 39 additions and 36 deletions.
6 changes: 3 additions & 3 deletions CHANGELOG.md
@@ -9,7 +9,7 @@
* `cortex_bucket_store_series_get_all_duration_seconds`
* `cortex_bucket_store_series_merge_duration_seconds`
* [CHANGE] Ingester: changed default value of `-blocks-storage.tsdb.retention-period` from `24h` to `13h`. If you're running Mimir with a custom configuration and you're overriding `-querier.query-store-after` to a value greater than the default `12h` then you should increase `-blocks-storage.tsdb.retention-period` accordingly. #4382
- * [CHANGE] Ingester: the configuration parameter `-blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup` has been deprecated and will be removed in Mimir 2.10.
+ * [CHANGE] Ingester: the configuration parameter `-blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup` has been deprecated and will be removed in Mimir 2.10. #4445
* [FEATURE] Cache: Introduce experimental support for using Redis for results, chunks, index, and metadata caches. #4371
* [ENHANCEMENT] Allow to define service name used for tracing via `JAEGER_SERVICE_NAME` environment variable. #4394
* [ENHANCEMENT] Querier and query-frontend: add experimental, more performant protobuf query result response format enabled with `-query-frontend.query-result-response-format=protobuf`. #4304 #4318 #4375
@@ -19,8 +19,8 @@
* [ENHANCEMENT] Ruler: increased tolerance for missed iterations on alerts, reducing the chances of flapping firing alerts during ruler restarts. #4432
* [ENHANCEMENT] Querier and store-gateway: optimized `.*` and `.+` regular expression label matchers. #4432
* [ENHANCEMENT] Query-frontend: results cache TTL is now configurable by using `-query-frontend.results-cache-ttl` and `-query-frontend.results-cache-ttl-for-out-of-order-time-window` options. These values can also be specified per tenant. Default values are unchanged (7 days and 10 minutes respectively). #4385
- * [BUGFIX] Querier: Streaming remote read will now continue to return multiple chunks per frame after the first frame. #4423
- * [ENHANCEMENT] Ingester: added advanced parameter `-blocks-storage.tsdb.wal-replay-concurrency` representing the maximum number of CPUs used during WAL replay.
+ * [ENHANCEMENT] Ingester: added advanced configuration parameter `-blocks-storage.tsdb.wal-replay-concurrency` representing the maximum number of CPUs used during WAL replay. #4445
+ * [BUGFIX] Querier: Streaming remote read will now continue to return multiple chunks per frame after the first frame. #4423

### Mixin

2 changes: 1 addition & 1 deletion cmd/mimir/config-descriptor.json
@@ -7093,7 +7093,7 @@
"kind": "field",
"name": "wal_replay_concurrency",
"required": false,
"desc": "Maximum number of CPUs that can simultaneously processes WAL replay. 0 means disabled.",
"desc": "Maximum number of CPUs that can simultaneously processes WAL replay. 0 means disabled. If set to a positive value it overrides the deprecated blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup option",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "blocks-storage.tsdb.wal-replay-concurrency",
2 changes: 1 addition & 1 deletion cmd/mimir/help-all.txt.tmpl
@@ -680,7 +680,7 @@ Usage of ./cmd/mimir/mimir:
-blocks-storage.tsdb.wal-compression-enabled
True to enable TSDB WAL compression.
-blocks-storage.tsdb.wal-replay-concurrency int
- Maximum number of CPUs that can simultaneously process WAL replay. 0 means disabled.
+ Maximum number of CPUs that can simultaneously process WAL replay. 0 means disabled. If set to a positive value, it overrides the deprecated blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup option.
-blocks-storage.tsdb.wal-segment-size-bytes int
TSDB WAL segments files max size (bytes). (default 134217728)
-common.storage.azure.account-key string
@@ -3268,7 +3268,9 @@ tsdb:
[wal_segment_size_bytes: <int> | default = 134217728]
# (advanced) Maximum number of CPUs that can simultaneously process WAL
- # replay. 0 means disabled.
+ # replay. 0 means disabled. If set to a positive value, it overrides the
+ # deprecated blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
+ # option.
# CLI flag: -blocks-storage.tsdb.wal-replay-concurrency
[wal_replay_concurrency: <int> | default = 0]
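To make the precedence concrete, here is a minimal Go sketch of the override rule documented above; the helper name and shape are hypothetical, not Mimir's actual code:

// effectiveWALReplayConcurrency is a hypothetical helper illustrating the
// documented precedence: a positive -blocks-storage.tsdb.wal-replay-concurrency
// wins over the deprecated max-tsdb-opening-concurrency-on-startup value.
func effectiveWALReplayConcurrency(walReplayConcurrency, deprecatedOpeningConcurrency int) int {
	if walReplayConcurrency > 0 {
		return walReplayConcurrency
	}
	return deprecatedOpeningConcurrency
}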
13 changes: 6 additions & 7 deletions integration/compactor_test.go
@@ -20,7 +20,6 @@ import (
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
@@ -74,16 +73,16 @@ func TestCompactBlocksContainingNativeHistograms(t *testing.T) {
Labels: labels.FromStrings("case", "native_histogram", "i", strconv.Itoa(i)),
Chunks: []chunks.Meta{
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{
- sample{10, 0, tsdb.GenerateTestHistogram(1), nil},
- sample{20, 0, tsdb.GenerateTestHistogram(2), nil},
+ sample{10, 0, tsdbutil.GenerateTestHistogram(1), nil},
+ sample{20, 0, tsdbutil.GenerateTestHistogram(2), nil},
}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{
- sample{30, 0, tsdb.GenerateTestHistogram(3), nil},
- sample{40, 0, tsdb.GenerateTestHistogram(4), nil},
+ sample{30, 0, tsdbutil.GenerateTestHistogram(3), nil},
+ sample{40, 0, tsdbutil.GenerateTestHistogram(4), nil},
}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{
- sample{50, 0, tsdb.GenerateTestHistogram(5), nil},
- sample{2*time.Hour.Milliseconds() - 1, 0, tsdb.GenerateTestHistogram(6), nil},
+ sample{50, 0, tsdbutil.GenerateTestHistogram(5), nil},
+ sample{2*time.Hour.Milliseconds() - 1, 0, tsdbutil.GenerateTestHistogram(6), nil},
}),
},
}
9 changes: 4 additions & 5 deletions integration/util.go
@@ -15,15 +15,14 @@ import (
"path/filepath"
"time"

"github.com/grafana/e2e"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb"

"github.com/grafana/e2e"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
)

var (
@@ -117,11 +116,11 @@ func getTLSFlagsWithPrefix(prefix string, servername string, http bool) map[stri
}

func GenerateTestHistogram(i int) *histogram.Histogram {
- return tsdb.GenerateTestHistograms(i + 1)[i]
+ return tsdbutil.GenerateTestHistograms(i + 1)[i]
}

func GenerateTestFloatHistogram(i int) *histogram.FloatHistogram {
- return tsdb.GenerateTestFloatHistograms(i + 1)[i]
+ return tsdbutil.GenerateTestFloatHistograms(i + 1)[i]
}

// explicit decoded version of GenerateTestHistogram and GenerateTestFloatHistogram
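For reference, a short usage sketch of the relocated helpers, assuming a Prometheus version where they live in tsdb/tsdbutil rather than tsdb:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/tsdbutil"
)

func main() {
	// GenerateTestHistograms(n) returns n deterministic test histograms;
	// indexing the last element mirrors GenerateTestHistogram(i) above.
	h := tsdbutil.GenerateTestHistograms(3)[2]
	fmt.Println(h.Count, h.Sum)
}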
16 changes: 10 additions & 6 deletions pkg/ingester/ingester.go
@@ -110,7 +110,11 @@ const (
minOutOfOrderTimeWindowSecondsStatName = "ingester_ooo_min_window"
maxOutOfOrderTimeWindowSecondsStatName = "ingester_ooo_max_window"

- // maximum number of TSDBs present on the file system which can be opened in a single process without walReplayConcurrency
+ // Maximum number of TSDB users present on the file system which can be opened in a single process
+ // without concurrency. More precisely, if the actual number of TSDB users is lower than this number,
+ // each TSDB is opened in a single process, while WAL replay is done in WALReplayConcurrency concurrent
+ // processes. Otherwise, TSDBs are opened in WALReplayConcurrency concurrent processes, while WAL replay
+ // is done in a single process.
maxTSDBOpenWithoutConcurrency = 10
)
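A minimal sketch of the strategy that comment describes; the helper name is hypothetical, while maxTSDBOpenWithoutConcurrency is the constant above:

// chooseOpenConcurrency illustrates the trade-off: with few tenants, open
// TSDBs one at a time and let each WAL replay fan out across
// walReplayConcurrency goroutines; with many tenants, open
// walReplayConcurrency TSDBs concurrently and keep each individual WAL
// replay single-threaded.
func chooseOpenConcurrency(numUsers, walReplayConcurrency int) (tsdbOpenConcurrency, walReplayConcurrencyPerTSDB int) {
	if numUsers < maxTSDBOpenWithoutConcurrency {
		return 1, walReplayConcurrency
	}
	return walReplayConcurrency, 1
}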

@@ -1842,7 +1846,7 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
queue := make(chan string)
group, groupCtx := errgroup.WithContext(ctx)

- userIDs, err := i.getAllUsersWithTSDB()
+ userIDs, err := i.getAllTSDBUserIDs()
if err != nil {
level.Error(i.logger).Log("msg", "error while finding existing TSDBs", "err", err)
return err
@@ -1896,7 +1900,7 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
})
}

- // Spawn a goroutine to place on the queue all users with a TSDB found on the filesystem.
+ // Spawn a goroutine to place all users with a TSDB found on the filesystem in the queue.
group.Go(func() error {
defer close(queue)

@@ -1927,8 +1931,8 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
return nil
}

- // getAllUsersWithTSDB finds all users with a TSDB on the filesystem.
- func (i *Ingester) getAllUsersWithTSDB() (map[string]struct{}, error) {
+ // getAllTSDBUserIDs finds all users with a TSDB on the filesystem.
+ func (i *Ingester) getAllTSDBUserIDs() (map[string]struct{}, error) {
userIDs := make(map[string]struct{})
walkErr := filepath.Walk(i.cfg.BlocksStorageConfig.TSDB.Dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
@@ -2242,7 +2246,7 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes
userDB.casState(closing, closed)

// Only remove user from TSDBState when everything is cleaned up
- // This will prevent walReplayConcurrency problems when cortex are trying to open new TSDB - Ie: New request for a given tenant
+ // This will prevent concurrency problems when cortex are trying to open new TSDB - Ie: New request for a given tenant
// came in - while closing the tsdb for the same tenant.
// If this happens now, the request will get reject as the push will not be able to acquire the lock as the tsdb will be
// in closed state
2 changes: 1 addition & 1 deletion pkg/ingester/shipper.go
@@ -112,7 +112,7 @@ func NewShipper(
// Sync performs a single synchronization, which ensures all non-compacted local blocks have been uploaded
// to the object bucket once.
//
- // It is not walReplayConcurrency-safe, however it is compactor-safe (running concurrently with compactor is ok).
+ // It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok).
func (s *Shipper) Sync(ctx context.Context) (shipped int, err error) {
shippedBlocks, err := readShippedBlocks(s.dir)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/mimir/mimir_test.go
@@ -417,7 +417,7 @@ func TestConfigValidation(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
- err := tc.getTestConfig().Validate(nil)
+ err := tc.getTestConfig().Validate(util_log.Logger)
if tc.expectAnyError {
require.Error(t, err)
} else if tc.expectedError != nil {
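Note: the switch from Validate(nil) to Validate(util_log.Logger) lines up with the pkg/storage/tsdb/config.go hunk below, where Validate now logs a deprecation warning; presumably the previously failing tests were tripping over the nil logger there.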
5 changes: 2 additions & 3 deletions pkg/storage/tsdb/config.go
@@ -233,7 +233,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.StripeSize, "blocks-storage.tsdb.stripe-size", 16384, headStripeSizeHelp)
f.BoolVar(&cfg.WALCompressionEnabled, "blocks-storage.tsdb.wal-compression-enabled", false, "True to enable TSDB WAL compression.")
f.IntVar(&cfg.WALSegmentSizeBytes, "blocks-storage.tsdb.wal-segment-size-bytes", wlog.DefaultSegmentSize, "TSDB WAL segments files max size (bytes).")
- f.IntVar(&cfg.WALReplayConcurrency, "blocks-storage.tsdb.wal-replay-concurrency", 0, "Maximum number of CPUs that can simultaneously process WAL replay. 0 means disabled.")
+ f.IntVar(&cfg.WALReplayConcurrency, "blocks-storage.tsdb.wal-replay-concurrency", 0, "Maximum number of CPUs that can simultaneously process WAL replay. 0 means disabled. If set to a positive value, it overrides the deprecated "+maxTSDBOpeningConcurrencyOnStartupFlag+" option.")
f.BoolVar(&cfg.FlushBlocksOnShutdown, "blocks-storage.tsdb.flush-blocks-on-shutdown", false, "True to flush blocks to storage on shutdown. If false, incomplete blocks will be reused after restart.")
f.DurationVar(&cfg.CloseIdleTSDBTimeout, "blocks-storage.tsdb.close-idle-tsdb-timeout", 13*time.Hour, "If TSDB has not received any data for this duration, and all blocks from TSDB have been shipped, TSDB is closed and deleted from local disk. If set to positive value, this value should be equal or higher than -querier.query-ingesters-within flag to make sure that TSDB is not closed prematurely, which could cause partial query results. 0 or negative value disables closing of idle TSDB.")
f.BoolVar(&cfg.MemorySnapshotOnShutdown, "blocks-storage.tsdb.memory-snapshot-on-shutdown", false, "True to enable snapshotting of in-memory TSDB data on disk when shutting down.")
@@ -252,9 +252,8 @@ func (cfg *TSDBConfig) Validate(logger log.Logger) error {

if cfg.DeprecatedMaxTSDBOpeningConcurrencyOnStartup <= 0 {
return errInvalidOpeningConcurrency
- } else {
- util.WarnDeprecatedConfig(maxTSDBOpeningConcurrencyOnStartupFlag, logger)
}
+ util.WarnDeprecatedConfig(maxTSDBOpeningConcurrencyOnStartupFlag, logger)

if cfg.HeadCompactionInterval <= 0 || cfg.HeadCompactionInterval > 15*time.Minute {
return errInvalidCompactionInterval
14 changes: 7 additions & 7 deletions pkg/util/test/histogram.go
@@ -5,32 +5,32 @@ package test
import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/stretchr/testify/require"
)

func GenerateTestHistograms(i int) []*histogram.Histogram {
- return tsdb.GenerateTestHistograms(i)
+ return tsdbutil.GenerateTestHistograms(i)
}

func GenerateTestFloatHistograms(i int) []*histogram.FloatHistogram {
- return tsdb.GenerateTestFloatHistograms(i)
+ return tsdbutil.GenerateTestFloatHistograms(i)
}

func GenerateTestHistogram(i int) *histogram.Histogram {
- return tsdb.GenerateTestHistogram(i)
+ return tsdbutil.GenerateTestHistogram(i)
}

func GenerateTestFloatHistogram(i int) *histogram.FloatHistogram {
- return tsdb.GenerateTestFloatHistogram(i)
+ return tsdbutil.GenerateTestFloatHistogram(i)
}

func GenerateTestGaugeHistogram(i int) *histogram.Histogram {
- return tsdb.GenerateTestGaugeHistogram(i)
+ return tsdbutil.GenerateTestGaugeHistogram(i)
}

func GenerateTestGaugeFloatHistogram(i int) *histogram.FloatHistogram {
- return tsdb.GenerateTestGaugeFloatHistogram(i)
+ return tsdbutil.GenerateTestGaugeFloatHistogram(i)
}

// explicit decoded version of GenerateTestHistogram and GenerateTestFloatHistogram
