Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into fix-panic-query-store…
Browse files Browse the repository at this point in the history
…s-invalid-endpoint

Signed-off-by: Matej Gera <matejgera@gmail.com>
  • Loading branch information
matej-g committed Oct 12, 2021
2 parents efa4cd5 + 7dee6fa commit 6ab9b1e
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 80 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4508](https://github.com/thanos-io/thanos/pull/4508) Adjust and rename `ThanosSidecarUnhealthy` to `ThanosSidecarNoConnectionToStartedPrometheus`; Remove `ThanosSidecarPrometheusDown` alert; Remove unused `thanos_sidecar_last_heartbeat_success_time_seconds` metrics.
- [#4663](https://github.com/thanos-io/thanos/pull/4663) Fetcher: Fix discovered data races.
- [#4754](https://github.com/thanos-io/thanos/pull/4754) Query: Fix possible panic on stores endpoint.
- [#4753](https://github.com/thanos-io/thanos/pull/4753) Store: validate block sync concurrency parameter

## [v0.23.1](https://github.com/thanos-io/thanos/tree/release-0.23) - 2021.10.1

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
Default("3m").DurationVar(&sc.syncInterval)

cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage.").
cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1.").
Default("20").IntVar(&sc.blockSyncConcurrency)

cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Expand Down
1 change: 1 addition & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Flags:
--block-sync-concurrency=20
Number of goroutines to use when constructing
index-cache.json blocks from object storage.
Must be equal or greater than 1.
--chunk-pool-size=2GB Maximum size of concurrently allocatable bytes
reserved strictly to reuse for chunks in
memory.
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,9 +660,9 @@ func (f *DeduplicateFilter) DuplicateIDs() []ulid.ULID {

func addNodeBySources(root, add *Node) bool {
var rootNode *Node
childSources := add.Compaction.Sources
for _, node := range root.Children {
parentSources := node.Compaction.Sources
childSources := add.Compaction.Sources

// Block exists with same sources, add as child.
if contains(parentSources, childSources) && contains(childSources, parentSources) {
Expand Down
165 changes: 87 additions & 78 deletions pkg/reloader/reloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,84 @@ func TestReloader_DirectoriesApply(t *testing.T) {
testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-001.yaml")))
testutil.Ok(t, ioutil.WriteFile(path.Join(dir2, "rule-dir", "rule4.yaml"), []byte("rule4"), os.ModePerm))

stepFunc := func(rel int) {
t.Log("Performing step number", rel)
switch rel {
case 0:
// Create rule2.yaml.
//
// dir
// ├─ rule-dir -> dir2/rule-dir
// ├─ rule1.yaml
// └─ rule2.yaml (*)
// dir2
// ├─ rule-dir
// │ └─ rule4.yaml
// ├─ rule3-001.yaml -> rule3-source.yaml
// └─ rule3-source.yaml
testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "rule2.yaml"), []byte("rule2"), os.ModePerm))
case 1:
// Update rule1.yaml.
//
// dir
// ├─ rule-dir -> dir2/rule-dir
// ├─ rule1.yaml (*)
// └─ rule2.yaml
// dir2
// ├─ rule-dir
// │ └─ rule4.yaml
// ├─ rule3-001.yaml -> rule3-source.yaml
// └─ rule3-source.yaml
testutil.Ok(t, os.Rename(tempRule1File, path.Join(dir, "rule1.yaml")))
case 2:
// Create dir/rule3.yaml (symlink to rule3-001.yaml).
//
// dir
// ├─ rule-dir -> dir2/rule-dir
// ├─ rule1.yaml
// ├─ rule2.yaml
// └─ rule3.yaml -> dir2/rule3-001.yaml (*)
// dir2
// ├─ rule-dir
// │ └─ rule4.yaml
// ├─ rule3-001.yaml -> rule3-source.yaml
// └─ rule3-source.yaml
testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-001.yaml"), path.Join(dir2, "rule3.yaml")))
testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml")))
case 3:
// Update the symlinked file and replace the symlink file to trigger fsnotify.
//
// dir
// ├─ rule-dir -> dir2/rule-dir
// ├─ rule1.yaml
// ├─ rule2.yaml
// └─ rule3.yaml -> dir2/rule3-002.yaml (*)
// dir2
// ├─ rule-dir
// │ └─ rule4.yaml
// ├─ rule3-002.yaml -> rule3-source.yaml (*)
// └─ rule3-source.yaml (*)
testutil.Ok(t, os.Rename(tempRule3File, path.Join(dir2, "rule3-source.yaml")))
testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-002.yaml")))
testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-002.yaml"), path.Join(dir2, "rule3.yaml")))
testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml")))
testutil.Ok(t, os.Remove(path.Join(dir2, "rule3-001.yaml")))
case 4:
// Update rule4.yaml in the symlinked directory.
//
// dir
// ├─ rule-dir -> dir2/rule-dir
// ├─ rule1.yaml
// ├─ rule2.yaml
// └─ rule3.yaml -> rule3-source.yaml
// dir2
// ├─ rule-dir
// │ └─ rule4.yaml (*)
// └─ rule3-source.yaml
testutil.Ok(t, os.Rename(tempRule4File, path.Join(dir2, "rule-dir", "rule4.yaml")))
}
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
g := sync.WaitGroup{}
g.Add(1)
Expand All @@ -267,90 +345,21 @@ func TestReloader_DirectoriesApply(t *testing.T) {

reloadsMtx.Lock()
rel := reloads
reloadsMtx.Unlock()
if init && rel <= reloadsSeen {
reloadsMtx.Unlock()
continue
}
reloadsMtx.Unlock()
init = true
reloadsSeen = rel

t.Log("Performing step number", rel)
switch rel {
case 0:
// Create rule2.yaml.
//
// dir
// ├─ rule-dir -> dir2/rule-dir
// ├─ rule1.yaml
// └─ rule2.yaml (*)
// dir2
// ├─ rule-dir
// │ └─ rule4.yaml
// ├─ rule3-001.yaml -> rule3-source.yaml
// └─ rule3-source.yaml
testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "rule2.yaml"), []byte("rule2"), os.ModePerm))
case 1:
// Update rule1.yaml.
//
// dir
// ├─ rule-dir -> dir2/rule-dir
// ├─ rule1.yaml (*)
// └─ rule2.yaml
// dir2
// ├─ rule-dir
// │ └─ rule4.yaml
// ├─ rule3-001.yaml -> rule3-source.yaml
// └─ rule3-source.yaml
testutil.Ok(t, os.Rename(tempRule1File, path.Join(dir, "rule1.yaml")))
case 2:
// Create dir/rule3.yaml (symlink to rule3-001.yaml).
//
// dir
// ├─ rule-dir -> dir2/rule-dir
// ├─ rule1.yaml
// ├─ rule2.yaml
// └─ rule3.yaml -> dir2/rule3-001.yaml (*)
// dir2
// ├─ rule-dir
// │ └─ rule4.yaml
// ├─ rule3-001.yaml -> rule3-source.yaml
// └─ rule3-source.yaml
testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-001.yaml"), path.Join(dir2, "rule3.yaml")))
testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml")))
case 3:
// Update the symlinked file and replace the symlink file to trigger fsnotify.
//
// dir
// ├─ rule-dir -> dir2/rule-dir
// ├─ rule1.yaml
// ├─ rule2.yaml
// └─ rule3.yaml -> dir2/rule3-002.yaml (*)
// dir2
// ├─ rule-dir
// │ └─ rule4.yaml
// ├─ rule3-002.yaml -> rule3-source.yaml (*)
// └─ rule3-source.yaml (*)
testutil.Ok(t, os.Rename(tempRule3File, path.Join(dir2, "rule3-source.yaml")))
testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-002.yaml")))
testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-002.yaml"), path.Join(dir2, "rule3.yaml")))
testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml")))
testutil.Ok(t, os.Remove(path.Join(dir2, "rule3-001.yaml")))
case 4:
// Update rule4.yaml in the symlinked directory.
//
// dir
// ├─ rule-dir -> dir2/rule-dir
// ├─ rule1.yaml
// ├─ rule2.yaml
// └─ rule3.yaml -> rule3-source.yaml
// dir2
// ├─ rule-dir
// │ └─ rule4.yaml (*)
// └─ rule3-source.yaml
testutil.Ok(t, os.Rename(tempRule4File, path.Join(dir2, "rule-dir", "rule4.yaml")))
// Catch up if reloader is step(s) ahead.
for skipped := rel - reloadsSeen - 1; skipped > 0; skipped-- {
stepFunc(rel - skipped)
}

stepFunc(rel)

init = true
reloadsSeen = rel

if rel > 4 {
// All good.
return
Expand Down
17 changes: 17 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ const (
// Labels for metrics.
labelEncode = "encode"
labelDecode = "decode"

minBlockSyncConcurrency = 1
)

var (
errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.")
)

type bucketStoreMetrics struct {
Expand Down Expand Up @@ -298,6 +304,13 @@ type BucketStore struct {
enableSeriesResponseHints bool
}

func (b *BucketStore) validate() error {
if b.blockSyncConcurrency < minBlockSyncConcurrency {
return errBlockSyncConcurrencyNotValid
}
return nil
}

type noopCache struct{}

func (noopCache) StorePostings(context.Context, ulid.ULID, labels.Label, []byte) {}
Expand Down Expand Up @@ -407,6 +420,10 @@ func NewBucketStore(
s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics)
s.metrics = newBucketStoreMetrics(s.reg) // TODO(metalmatze): Might be possible via Option too

if err := s.validate(); err != nil {
return nil, errors.Wrap(err, "validate config")
}

if err := os.MkdirAll(dir, 0750); err != nil {
return nil, errors.Wrap(err, "create dir")
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,32 @@ func TestGapBasedPartitioner_Partition(t *testing.T) {
}
}

func TestBucketStoreConfig_validate(t *testing.T) {
tests := map[string]struct {
config *BucketStore
expected error
}{
"should pass on valid config": {
config: &BucketStore{
blockSyncConcurrency: 1,
},
expected: nil,
},
"should fail on blockSyncConcurrency < 1": {
config: &BucketStore{
blockSyncConcurrency: 0,
},
expected: errBlockSyncConcurrencyNotValid,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
testutil.Equals(t, testData.expected, testData.config.validate())
})
}
}

func TestBucketStore_Info(t *testing.T) {
defer testutil.TolerantVerifyLeak(t)

Expand Down

0 comments on commit 6ab9b1e

Please sign in to comment.