From dde2cae91808adcaa31391c58f5db5a2b9f0cc01 Mon Sep 17 00:00:00 2001 From: aymericDD Date: Fri, 8 Oct 2021 19:32:08 +0200 Subject: [PATCH 1/3] store: validate --block-sync-concurrency parameter (#4753) * store: valide block sync concurrency parameter Must be equal or greater than 1 to avoid blocked program. Signed-off-by: Aymeric * docs: update docs (#4753) Signed-off-by: Aymeric Co-authored-by: Aymeric --- CHANGELOG.md | 1 + cmd/thanos/store.go | 2 +- docs/components/store.md | 1 + pkg/store/bucket.go | 17 +++++++++++++++++ pkg/store/bucket_test.go | 26 ++++++++++++++++++++++++++ 5 files changed, 46 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d8d67abc40..a2695830e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed - [#4663](https://github.com/thanos-io/thanos/pull/4663) Fetcher: Fix discovered data races +- [#4753](https://github.com/thanos-io/thanos/pull/4753) Store: valide block sync concurrency parameter ### Added diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 19b6dbf837..25310f9ef3 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -113,7 +113,7 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view."). Default("3m").DurationVar(&sc.syncInterval) - cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage."). + cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1."). Default("20").IntVar(&sc.blockSyncConcurrency) cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). diff --git a/docs/components/store.md b/docs/components/store.md index 3ae037beb9..cdc8605052 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -34,6 +34,7 @@ Flags: --block-sync-concurrency=20 Number of goroutines to use when constructing index-cache.json blocks from object storage. + Must be equal or greater than 1. --chunk-pool-size=2GB Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory. diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 39d78b257f..3af0c179ec 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -92,6 +92,12 @@ const ( // Labels for metrics. labelEncode = "encode" labelDecode = "decode" + + minBlockSyncConcurrency = 1 +) + +var ( + errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.") ) type bucketStoreMetrics struct { @@ -298,6 +304,13 @@ type BucketStore struct { enableSeriesResponseHints bool } +func (b *BucketStore) validate() error { + if b.blockSyncConcurrency < minBlockSyncConcurrency { + return errBlockSyncConcurrencyNotValid + } + return nil +} + type noopCache struct{} func (noopCache) StorePostings(context.Context, ulid.ULID, labels.Label, []byte) {} @@ -407,6 +420,10 @@ func NewBucketStore( s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics) s.metrics = newBucketStoreMetrics(s.reg) // TODO(metalmatze): Might be possible via Option too + if err := s.validate(); err != nil { + return nil, errors.Wrap(err, "validate config") + } + if err := os.MkdirAll(dir, 0750); err != nil { return nil, errors.Wrap(err, "create dir") } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e635ab22c1..adc5701740 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -556,6 +556,32 @@ func TestGapBasedPartitioner_Partition(t *testing.T) { } } +func TestBucketStoreConfig_validate(t *testing.T) { + tests := map[string]struct { + config *BucketStore + expected error + }{ + "should pass on valid config": { + config: &BucketStore{ + blockSyncConcurrency: 1, + }, + expected: nil, + }, + "should fail on blockSyncConcurrency < 1": { + config: &BucketStore{ + blockSyncConcurrency: 0, + }, + expected: errBlockSyncConcurrencyNotValid, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + testutil.Equals(t, testData.expected, testData.config.validate()) + }) + } +} + func TestBucketStore_Info(t *testing.T) { defer testutil.TolerantVerifyLeak(t) From d5156d8e10f8f6bfc880ccf98b0cbf37dd0b1304 Mon Sep 17 00:00:00 2001 From: ian woolf Date: Sun, 10 Oct 2021 17:27:32 +0800 Subject: [PATCH 2/3] pkg/block: childSources in addNodeBySources do not need to be assigned every time (#4758) Signed-off-by: ian woolf --- pkg/block/fetcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index f9f45202d9..77091ecbff 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -660,9 +660,9 @@ func (f *DeduplicateFilter) DuplicateIDs() []ulid.ULID { func addNodeBySources(root, add *Node) bool { var rootNode *Node + childSources := add.Compaction.Sources for _, node := range root.Children { parentSources := node.Compaction.Sources - childSources := add.Compaction.Sources // Block exists with same sources, add as child. if contains(parentSources, childSources) && contains(childSources, parentSources) { From 7dee6fa1568256e282b1f07a9c90ef092d70ffcb Mon Sep 17 00:00:00 2001 From: Matej Gera <38492574+matej-g@users.noreply.github.com> Date: Tue, 12 Oct 2021 11:28:57 +0200 Subject: [PATCH 3/3] Tests: Attempt to fix flaky test in reloader on directories changes (#4765) * Fix by catching up on missed steps Signed-off-by: Matej Gera * Review feedback - simplify mutex unlock Signed-off-by: Matej Gera --- pkg/reloader/reloader_test.go | 165 ++++++++++++++++++---------------- 1 file changed, 87 insertions(+), 78 deletions(-) diff --git a/pkg/reloader/reloader_test.go b/pkg/reloader/reloader_test.go index 6659d20cc7..25a0af5ae9 100644 --- a/pkg/reloader/reloader_test.go +++ b/pkg/reloader/reloader_test.go @@ -247,6 +247,84 @@ func TestReloader_DirectoriesApply(t *testing.T) { testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-001.yaml"))) testutil.Ok(t, ioutil.WriteFile(path.Join(dir2, "rule-dir", "rule4.yaml"), []byte("rule4"), os.ModePerm)) + stepFunc := func(rel int) { + t.Log("Performing step number", rel) + switch rel { + case 0: + // Create rule2.yaml. + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // └─ rule2.yaml (*) + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "rule2.yaml"), []byte("rule2"), os.ModePerm)) + case 1: + // Update rule1.yaml. + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml (*) + // └─ rule2.yaml + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + testutil.Ok(t, os.Rename(tempRule1File, path.Join(dir, "rule1.yaml"))) + case 2: + // Create dir/rule3.yaml (symlink to rule3-001.yaml). + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> dir2/rule3-001.yaml (*) + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-001.yaml"), path.Join(dir2, "rule3.yaml"))) + testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml"))) + case 3: + // Update the symlinked file and replace the symlink file to trigger fsnotify. + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> dir2/rule3-002.yaml (*) + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-002.yaml -> rule3-source.yaml (*) + // └─ rule3-source.yaml (*) + testutil.Ok(t, os.Rename(tempRule3File, path.Join(dir2, "rule3-source.yaml"))) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-002.yaml"))) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-002.yaml"), path.Join(dir2, "rule3.yaml"))) + testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml"))) + testutil.Ok(t, os.Remove(path.Join(dir2, "rule3-001.yaml"))) + case 4: + // Update rule4.yaml in the symlinked directory. + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> rule3-source.yaml + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml (*) + // └─ rule3-source.yaml + testutil.Ok(t, os.Rename(tempRule4File, path.Join(dir2, "rule-dir", "rule4.yaml"))) + } + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) g := sync.WaitGroup{} g.Add(1) @@ -267,90 +345,21 @@ func TestReloader_DirectoriesApply(t *testing.T) { reloadsMtx.Lock() rel := reloads + reloadsMtx.Unlock() if init && rel <= reloadsSeen { - reloadsMtx.Unlock() continue } - reloadsMtx.Unlock() - init = true - reloadsSeen = rel - t.Log("Performing step number", rel) - switch rel { - case 0: - // Create rule2.yaml. - // - // dir - // ├─ rule-dir -> dir2/rule-dir - // ├─ rule1.yaml - // └─ rule2.yaml (*) - // dir2 - // ├─ rule-dir - // │ └─ rule4.yaml - // ├─ rule3-001.yaml -> rule3-source.yaml - // └─ rule3-source.yaml - testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "rule2.yaml"), []byte("rule2"), os.ModePerm)) - case 1: - // Update rule1.yaml. - // - // dir - // ├─ rule-dir -> dir2/rule-dir - // ├─ rule1.yaml (*) - // └─ rule2.yaml - // dir2 - // ├─ rule-dir - // │ └─ rule4.yaml - // ├─ rule3-001.yaml -> rule3-source.yaml - // └─ rule3-source.yaml - testutil.Ok(t, os.Rename(tempRule1File, path.Join(dir, "rule1.yaml"))) - case 2: - // Create dir/rule3.yaml (symlink to rule3-001.yaml). - // - // dir - // ├─ rule-dir -> dir2/rule-dir - // ├─ rule1.yaml - // ├─ rule2.yaml - // └─ rule3.yaml -> dir2/rule3-001.yaml (*) - // dir2 - // ├─ rule-dir - // │ └─ rule4.yaml - // ├─ rule3-001.yaml -> rule3-source.yaml - // └─ rule3-source.yaml - testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-001.yaml"), path.Join(dir2, "rule3.yaml"))) - testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml"))) - case 3: - // Update the symlinked file and replace the symlink file to trigger fsnotify. - // - // dir - // ├─ rule-dir -> dir2/rule-dir - // ├─ rule1.yaml - // ├─ rule2.yaml - // └─ rule3.yaml -> dir2/rule3-002.yaml (*) - // dir2 - // ├─ rule-dir - // │ └─ rule4.yaml - // ├─ rule3-002.yaml -> rule3-source.yaml (*) - // └─ rule3-source.yaml (*) - testutil.Ok(t, os.Rename(tempRule3File, path.Join(dir2, "rule3-source.yaml"))) - testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-002.yaml"))) - testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-002.yaml"), path.Join(dir2, "rule3.yaml"))) - testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml"))) - testutil.Ok(t, os.Remove(path.Join(dir2, "rule3-001.yaml"))) - case 4: - // Update rule4.yaml in the symlinked directory. - // - // dir - // ├─ rule-dir -> dir2/rule-dir - // ├─ rule1.yaml - // ├─ rule2.yaml - // └─ rule3.yaml -> rule3-source.yaml - // dir2 - // ├─ rule-dir - // │ └─ rule4.yaml (*) - // └─ rule3-source.yaml - testutil.Ok(t, os.Rename(tempRule4File, path.Join(dir2, "rule-dir", "rule4.yaml"))) + // Catch up if reloader is step(s) ahead. + for skipped := rel - reloadsSeen - 1; skipped > 0; skipped-- { + stepFunc(rel - skipped) } + stepFunc(rel) + + init = true + reloadsSeen = rel + if rel > 4 { // All good. return