diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5b258d6673..ca5e72064d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,20 +10,20 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 
 ## Unreleased
 
-### Fixed
-
-- [#4663](https://github.com/thanos-io/thanos/pull/4663) Fetcher: Fix discovered data races
-
 ### Added
 
 - [#4680](https://github.com/thanos-io/thanos/pull/4680) Query: add `exemplar.partial-response` flag to control partial response.
 - [#4679](https://github.com/thanos-io/thanos/pull/4679) Added `enable-feature` flag to enable negative offsets and @ modifier, similar to Prometheus.
 - [#4696](https://github.com/thanos-io/thanos/pull/4696) Query: add cache name to tracing spans.
 - [#4736](https://github.com/thanos-io/thanos/pull/4736) S3: Add capability to use custom AWS STS Endpoint
+- [#4764](https://github.com/thanos-io/thanos/pull/4764) Compactor: add `block-viewer.global.sync-block-timeout` flag to set the timeout for syncing block metas.
 
 ### Fixed
 
 - [#4508](https://github.com/thanos-io/thanos/pull/4508) Adjust and rename `ThanosSidecarUnhealthy` to `ThanosSidecarNoConnectionToStartedPrometheus`; Remove `ThanosSidecarPrometheusDown` alert; Remove unused `thanos_sidecar_last_heartbeat_success_time_seconds` metrics.
+- [#4663](https://github.com/thanos-io/thanos/pull/4663) Fetcher: Fix discovered data races.
+- [#4754](https://github.com/thanos-io/thanos/pull/4754) Query: Fix possible panic on stores endpoint.
+- [#4753](https://github.com/thanos-io/thanos/pull/4753) Store: validate block sync concurrency parameter.
 
 ## [v0.23.1](https://github.com/thanos-io/thanos/tree/release-0.23) - 2021.10.1
diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 2ae488de6d..b4ff8f5690 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -537,14 +537,14 @@ func runCompact(
 	}
 
 	g.Add(func() error {
-		iterCtx, iterCancel := context.WithTimeout(ctx, conf.waitInterval)
+		iterCtx, iterCancel := context.WithTimeout(ctx, conf.blockViewerSyncBlockTimeout)
 		_, _, _ = f.Fetch(iterCtx)
 		iterCancel()
 
 		// For /global state make sure to fetch periodically.
 		return runutil.Repeat(conf.blockViewerSyncBlockInterval, ctx.Done(), func() error {
 			return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
-				iterCtx, iterCancel := context.WithTimeout(ctx, conf.waitInterval)
+				iterCtx, iterCancel := context.WithTimeout(ctx, conf.blockViewerSyncBlockTimeout)
 				defer iterCancel()
 
 				_, _, err := f.Fetch(iterCtx)
@@ -576,6 +576,7 @@ type compactConfig struct {
 	blockSyncConcurrency         int
 	blockMetaFetchConcurrency    int
 	blockViewerSyncBlockInterval time.Duration
+	blockViewerSyncBlockTimeout  time.Duration
 	cleanupBlocksInterval        time.Duration
 	compactionConcurrency        int
 	downsampleConcurrency        int
@@ -634,6 +635,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
 		Default("32").IntVar(&cc.blockMetaFetchConcurrency)
 	cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI.").
 		Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval)
+	cmd.Flag("block-viewer.global.sync-block-timeout", "Maximum time for syncing the blocks between local and remote view for /global Block Viewer UI.").
+		Default("5m").DurationVar(&cc.blockViewerSyncBlockTimeout)
 	cmd.Flag("compact.cleanup-interval", "How often we should clean up partially uploaded blocks and blocks with deletion mark in the background when --wait has been enabled. Setting it to \"0s\" disables it - the cleaning will only happen at the end of an iteration.").
 		Default("5m").DurationVar(&cc.cleanupBlocksInterval)
diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go
index 8b280229b2..373cd57913 100644
--- a/cmd/thanos/query.go
+++ b/cmd/thanos/query.go
@@ -569,7 +569,7 @@ func runQuery(
 	api := v1.NewQueryAPI(
 		logger,
-		endpoints,
+		endpoints.GetEndpointStatus,
 		engineFactory(promql.NewEngine, engineOpts, dynamicLookbackDelta),
 		queryableCreator,
 		// NOTE: Will share the same replica label as the query for now.
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 19b6dbf837..25310f9ef3 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -113,7 +113,7 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
 	cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
 		Default("3m").DurationVar(&sc.syncInterval)
 
-	cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage.").
+	cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal to or greater than 1.").
 		Default("20").IntVar(&sc.blockSyncConcurrency)
 
 	cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
diff --git a/docs/components/compact.md b/docs/components/compact.md
index ce1a796b36..8b662a88e7 100644
--- a/docs/components/compact.md
+++ b/docs/components/compact.md
@@ -284,6 +284,10 @@ Flags:
                                  Repeat interval for syncing the blocks between
                                  local and remote view for /global Block Viewer
                                  UI.
+      --block-viewer.global.sync-block-timeout=5m
+                                 Maximum time for syncing the blocks between
+                                 local and remote view for /global Block Viewer
+                                 UI.
       --bucket-web-label=BUCKET-WEB-LABEL
                                  Prometheus label to use as timeline title in
                                  the bucket web UI
diff --git a/docs/components/store.md b/docs/components/store.md
index 3ae037beb9..cdc8605052 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -34,6 +34,7 @@ Flags:
       --block-sync-concurrency=20
                                  Number of goroutines to use when constructing
                                  index-cache.json blocks from object storage.
+                                 Must be equal to or greater than 1.
       --chunk-pool-size=2GB      Maximum size of concurrently allocatable bytes
                                  reserved strictly to reuse for chunks in
                                  memory.
diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go
index 4f3866b62b..fd9ee99b68 100644
--- a/pkg/api/query/v1.go
+++ b/pkg/api/query/v1.go
@@ -93,8 +93,8 @@ type QueryAPI struct {
 	enableExemplarPartialResponse bool
 	disableCORS                   bool
 
-	replicaLabels []string
-	endpointSet   *query.EndpointSet
+	replicaLabels  []string
+	endpointStatus func() []query.EndpointStatus
 
 	defaultRangeQueryStep                  time.Duration
 	defaultInstantQueryMaxSourceResolution time.Duration
@@ -106,7 +106,7 @@ type QueryAPI struct {
 
 // NewQueryAPI returns an initialized QueryAPI type.
 func NewQueryAPI(
 	logger log.Logger,
-	endpointSet *query.EndpointSet,
+	endpointStatus func() []query.EndpointStatus,
 	qe func(int64) *promql.Engine,
 	c query.QueryableCreator,
 	ruleGroups rules.UnaryClient,
@@ -146,7 +146,7 @@ func NewQueryAPI(
 		enableMetricMetadataPartialResponse:    enableMetricMetadataPartialResponse,
 		enableExemplarPartialResponse:          enableExemplarPartialResponse,
 		replicaLabels:                          replicaLabels,
-		endpointSet:                            endpointSet,
+		endpointStatus:                         endpointStatus,
 		defaultRangeQueryStep:                  defaultRangeQueryStep,
 		defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution,
 		defaultMetadataTimeRange:               defaultMetadataTimeRange,
@@ -715,7 +715,11 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
 func (qapi *QueryAPI) stores(_ *http.Request) (interface{}, []error, *api.ApiError) {
 	statuses := make(map[string][]query.EndpointStatus)
-	for _, status := range qapi.endpointSet.GetEndpointStatus() {
+	for _, status := range qapi.endpointStatus() {
+		// Don't consider an endpoint if we cannot retrieve its component type.
+		if status.ComponentType == nil {
+			continue
+		}
 		statuses[status.ComponentType.String()] = append(statuses[status.ComponentType.String()], status)
 	}
 	return statuses, nil, nil
diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go
index a9f0648f42..218498fe81 100644
--- a/pkg/api/query/v1_test.go
+++ b/pkg/api/query/v1_test.go
@@ -1201,6 +1201,93 @@ func TestMetadataEndpoints(t *testing.T) {
 	}
 }
 
+func TestStoresEndpoint(t *testing.T) {
+	apiWithNoEndpoints := &QueryAPI{
+		endpointStatus: func() []query.EndpointStatus {
+			return []query.EndpointStatus{}
+		},
+	}
+	apiWithValidEndpoints := &QueryAPI{
+		endpointStatus: func() []query.EndpointStatus {
+			return []query.EndpointStatus{
+				{
+					Name:          "endpoint-1",
+					ComponentType: component.Store,
+				},
+				{
+					Name:          "endpoint-2",
+					ComponentType: component.Store,
+				},
+				{
+					Name:          "endpoint-3",
+					ComponentType: component.Sidecar,
+				},
+			}
+		},
+	}
+	apiWithInvalidEndpoint := &QueryAPI{
+		endpointStatus: func() []query.EndpointStatus {
+			return []query.EndpointStatus{
+				{
+					Name:          "endpoint-1",
+					ComponentType: component.Store,
+				},
+				{
+					Name: "endpoint-2",
+				},
+			}
+		},
+	}
+
+	testCases := []endpointTestCase{
+		{
+			endpoint: apiWithNoEndpoints.stores,
+			method:   http.MethodGet,
+			response: map[string][]query.EndpointStatus{},
+		},
+		{
+			endpoint: apiWithValidEndpoints.stores,
+			method:   http.MethodGet,
+			response: map[string][]query.EndpointStatus{
+				"store": {
+					{
+						Name:          "endpoint-1",
+						ComponentType: component.Store,
+					},
+					{
+						Name:          "endpoint-2",
+						ComponentType: component.Store,
+					},
+				},
+				"sidecar": {
+					{
+						Name:          "endpoint-3",
+						ComponentType: component.Sidecar,
+					},
+				},
+			},
+		},
+		{
+			endpoint: apiWithInvalidEndpoint.stores,
+			method:   http.MethodGet,
+			response: map[string][]query.EndpointStatus{
+				"store": {
+					{
+						Name:          "endpoint-1",
+						ComponentType: component.Store,
+					},
+				},
+			},
+		},
+	}
+
+	for i, test := range testCases {
+		if ok := testEndpoint(t, test, strings.TrimSpace(fmt.Sprintf("#%d %s", i, test.query.Encode())), reflect.DeepEqual); !ok {
+			return
+		}
+	}
+}
+
 func TestParseTime(t *testing.T) {
 	ts, err := time.Parse(time.RFC3339Nano, "2015-06-03T13:21:58.555Z")
 	if err != nil {
diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go
index f9f45202d9..77091ecbff 100644
--- a/pkg/block/fetcher.go
+++ b/pkg/block/fetcher.go
@@ -660,9 +660,9 @@ func (f *DeduplicateFilter) DuplicateIDs() []ulid.ULID {
 func addNodeBySources(root, add *Node) bool {
 	var rootNode *Node
+	childSources := add.Compaction.Sources
 	for _, node := range root.Children {
 		parentSources := node.Compaction.Sources
-		childSources := add.Compaction.Sources
 
 		// Block exists with same sources, add as child.
 		if contains(parentSources, childSources) && contains(childSources, parentSources) {
diff --git a/pkg/reloader/reloader_test.go b/pkg/reloader/reloader_test.go
index 6659d20cc7..25a0af5ae9 100644
--- a/pkg/reloader/reloader_test.go
+++ b/pkg/reloader/reloader_test.go
@@ -247,6 +247,84 @@ func TestReloader_DirectoriesApply(t *testing.T) {
 	testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-001.yaml")))
 	testutil.Ok(t, ioutil.WriteFile(path.Join(dir2, "rule-dir", "rule4.yaml"), []byte("rule4"), os.ModePerm))
 
+	stepFunc := func(rel int) {
+		t.Log("Performing step number", rel)
+		switch rel {
+		case 0:
+			// Create rule2.yaml.
+			//
+			// dir
+			// ├─ rule-dir -> dir2/rule-dir
+			// ├─ rule1.yaml
+			// └─ rule2.yaml (*)
+			// dir2
+			// ├─ rule-dir
+			// │  └─ rule4.yaml
+			// ├─ rule3-001.yaml -> rule3-source.yaml
+			// └─ rule3-source.yaml
+			testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "rule2.yaml"), []byte("rule2"), os.ModePerm))
+		case 1:
+			// Update rule1.yaml.
+			//
+			// dir
+			// ├─ rule-dir -> dir2/rule-dir
+			// ├─ rule1.yaml (*)
+			// └─ rule2.yaml
+			// dir2
+			// ├─ rule-dir
+			// │  └─ rule4.yaml
+			// ├─ rule3-001.yaml -> rule3-source.yaml
+			// └─ rule3-source.yaml
+			testutil.Ok(t, os.Rename(tempRule1File, path.Join(dir, "rule1.yaml")))
+		case 2:
+			// Create dir/rule3.yaml (symlink to rule3-001.yaml).
+			//
+			// dir
+			// ├─ rule-dir -> dir2/rule-dir
+			// ├─ rule1.yaml
+			// ├─ rule2.yaml
+			// └─ rule3.yaml -> dir2/rule3-001.yaml (*)
+			// dir2
+			// ├─ rule-dir
+			// │  └─ rule4.yaml
+			// ├─ rule3-001.yaml -> rule3-source.yaml
+			// └─ rule3-source.yaml
+			testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-001.yaml"), path.Join(dir2, "rule3.yaml")))
+			testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml")))
+		case 3:
+			// Update the symlinked file and replace the symlink file to trigger fsnotify.
+			//
+			// dir
+			// ├─ rule-dir -> dir2/rule-dir
+			// ├─ rule1.yaml
+			// ├─ rule2.yaml
+			// └─ rule3.yaml -> dir2/rule3-002.yaml (*)
+			// dir2
+			// ├─ rule-dir
+			// │  └─ rule4.yaml
+			// ├─ rule3-002.yaml -> rule3-source.yaml (*)
+			// └─ rule3-source.yaml (*)
+			testutil.Ok(t, os.Rename(tempRule3File, path.Join(dir2, "rule3-source.yaml")))
+			testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-002.yaml")))
+			testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-002.yaml"), path.Join(dir2, "rule3.yaml")))
+			testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml")))
+			testutil.Ok(t, os.Remove(path.Join(dir2, "rule3-001.yaml")))
+		case 4:
+			// Update rule4.yaml in the symlinked directory.
+			//
+			// dir
+			// ├─ rule-dir -> dir2/rule-dir
+			// ├─ rule1.yaml
+			// ├─ rule2.yaml
+			// └─ rule3.yaml -> rule3-source.yaml
+			// dir2
+			// ├─ rule-dir
+			// │  └─ rule4.yaml (*)
+			// └─ rule3-source.yaml
+			testutil.Ok(t, os.Rename(tempRule4File, path.Join(dir2, "rule-dir", "rule4.yaml")))
+		}
+	}
+
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 
 	g := sync.WaitGroup{}
 	g.Add(1)
@@ -267,90 +345,21 @@ func TestReloader_DirectoriesApply(t *testing.T) {
 			reloadsMtx.Lock()
 			rel := reloads
+			reloadsMtx.Unlock()
 			if init && rel <= reloadsSeen {
-				reloadsMtx.Unlock()
 				continue
 			}
-			reloadsMtx.Unlock()
-
-			init = true
-			reloadsSeen = rel
-
-			t.Log("Performing step number", rel)
-			switch rel {
-			case 0:
-				// Create rule2.yaml.
-				//
-				// dir
-				// ├─ rule-dir -> dir2/rule-dir
-				// ├─ rule1.yaml
-				// └─ rule2.yaml (*)
-				// dir2
-				// ├─ rule-dir
-				// │  └─ rule4.yaml
-				// ├─ rule3-001.yaml -> rule3-source.yaml
-				// └─ rule3-source.yaml
-				testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "rule2.yaml"), []byte("rule2"), os.ModePerm))
-			case 1:
-				// Update rule1.yaml.
-				//
-				// dir
-				// ├─ rule-dir -> dir2/rule-dir
-				// ├─ rule1.yaml (*)
-				// └─ rule2.yaml
-				// dir2
-				// ├─ rule-dir
-				// │  └─ rule4.yaml
-				// ├─ rule3-001.yaml -> rule3-source.yaml
-				// └─ rule3-source.yaml
-				testutil.Ok(t, os.Rename(tempRule1File, path.Join(dir, "rule1.yaml")))
-			case 2:
-				// Create dir/rule3.yaml (symlink to rule3-001.yaml).
-				//
-				// dir
-				// ├─ rule-dir -> dir2/rule-dir
-				// ├─ rule1.yaml
-				// ├─ rule2.yaml
-				// └─ rule3.yaml -> dir2/rule3-001.yaml (*)
-				// dir2
-				// ├─ rule-dir
-				// │  └─ rule4.yaml
-				// ├─ rule3-001.yaml -> rule3-source.yaml
-				// └─ rule3-source.yaml
-				testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-001.yaml"), path.Join(dir2, "rule3.yaml")))
-				testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml")))
-			case 3:
-				// Update the symlinked file and replace the symlink file to trigger fsnotify.
-				//
-				// dir
-				// ├─ rule-dir -> dir2/rule-dir
-				// ├─ rule1.yaml
-				// ├─ rule2.yaml
-				// └─ rule3.yaml -> dir2/rule3-002.yaml (*)
-				// dir2
-				// ├─ rule-dir
-				// │  └─ rule4.yaml
-				// ├─ rule3-002.yaml -> rule3-source.yaml (*)
-				// └─ rule3-source.yaml (*)
-				testutil.Ok(t, os.Rename(tempRule3File, path.Join(dir2, "rule3-source.yaml")))
-				testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-002.yaml")))
-				testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-002.yaml"), path.Join(dir2, "rule3.yaml")))
-				testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml")))
-				testutil.Ok(t, os.Remove(path.Join(dir2, "rule3-001.yaml")))
-			case 4:
-				// Update rule4.yaml in the symlinked directory.
-				//
-				// dir
-				// ├─ rule-dir -> dir2/rule-dir
-				// ├─ rule1.yaml
-				// ├─ rule2.yaml
-				// └─ rule3.yaml -> rule3-source.yaml
-				// dir2
-				// ├─ rule-dir
-				// │  └─ rule4.yaml (*)
-				// └─ rule3-source.yaml
-				testutil.Ok(t, os.Rename(tempRule4File, path.Join(dir2, "rule-dir", "rule4.yaml")))
+			// Catch up if reloader is step(s) ahead.
+			for skipped := rel - reloadsSeen - 1; skipped > 0; skipped-- {
+				stepFunc(rel - skipped)
 			}
+			stepFunc(rel)
+
+			init = true
+			reloadsSeen = rel
 
 			if rel > 4 {
 				// All good.
 				return
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index d181f04731..be0e4bec97 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -92,6 +92,12 @@ const (
 
 	// Labels for metrics.
 	labelEncode = "encode"
 	labelDecode = "decode"
+
+	minBlockSyncConcurrency = 1
+)
+
+var (
+	errBlockSyncConcurrencyNotValid = errors.New("block sync concurrency must be equal to or greater than 1")
 )
 
 type bucketStoreMetrics struct {
@@ -298,6 +304,13 @@ type BucketStore struct {
 	enableSeriesResponseHints bool
 }
 
+func (b *BucketStore) validate() error {
+	if b.blockSyncConcurrency < minBlockSyncConcurrency {
+		return errBlockSyncConcurrencyNotValid
+	}
+	return nil
+}
+
 type noopCache struct{}
 
 func (noopCache) StorePostings(context.Context, ulid.ULID, labels.Label, []byte) {}
@@ -407,6 +420,10 @@ func NewBucketStore(
 	s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics)
 	s.metrics = newBucketStoreMetrics(s.reg) // TODO(metalmatze): Might be possible via Option too
 
+	if err := s.validate(); err != nil {
+		return nil, errors.Wrap(err, "validate config")
+	}
+
 	if err := os.MkdirAll(dir, 0750); err != nil {
 		return nil, errors.Wrap(err, "create dir")
 	}
@@ -2491,24 +2508,32 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
 	r.stats.chunksFetchedSizeSum += int(part.End - part.Start)
 
 	var (
-		buf        = make([]byte, EstimatedMaxChunkSize)
+		buf        []byte
 		readOffset = int(pIdxs[0].offset)
 
 		// Save a few allocations.
-		written int64
+		written  int
 		diff     uint32
 		chunkLen int
 		n        int
 	)
 
+	bufPooled, err := r.block.chunkPool.Get(EstimatedMaxChunkSize)
+	if err == nil {
+		buf = *bufPooled
+	} else {
+		buf = make([]byte, EstimatedMaxChunkSize)
+	}
+	defer r.block.chunkPool.Put(&buf)
+
 	for i, pIdx := range pIdxs {
 		// Fast forward range reader to the next chunk start in case of sparse (for our purposes) byte range.
 		for readOffset < int(pIdx.offset) {
-			written, err = io.CopyN(ioutil.Discard, bufReader, int64(pIdx.offset)-int64(readOffset))
+			written, err = bufReader.Discard(int(pIdx.offset) - int(readOffset))
 			if err != nil {
 				return errors.Wrap(err, "fast forward range reader")
 			}
-			readOffset += int(written)
+			readOffset += written
 		}
 		// Presume chunk length to be reasonably large for common use cases.
 		// However, declaration for EstimatedMaxChunkSize warns us some chunks could be larger in some rare cases.
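The loadChunks change above replaces a fresh per-call allocation with a buffer borrowed from the block's chunk pool, falling back to make only when the pool cannot serve the request. A minimal sketch of that pooled-buffer pattern, assuming a pool with the same Get/Put shape as chunkPool in the diff (the bytesPool interface and withPooledBuffer helper are illustrative, not Thanos APIs):

package sketch

// bytesPool mirrors the Get/Put calls made on r.block.chunkPool above;
// the interface and helper names here are illustrative, not Thanos APIs.
type bytesPool interface {
	Get(sz int) (*[]byte, error) // may fail, e.g. when pool capacity is exhausted
	Put(b *[]byte)
}

// withPooledBuffer borrows a buffer from the pool when possible and always
// returns it afterwards, matching the defer chunkPool.Put(&buf) in the diff.
func withPooledBuffer(p bytesPool, size int, fn func(buf []byte)) {
	var buf []byte
	if pooled, err := p.Get(size); err == nil {
		buf = *pooled // reuse pooled memory, saving an allocation per call
	} else {
		buf = make([]byte, size) // fall back to a fresh allocation
	}
	defer p.Put(&buf) // return the buffer (pooled or not) once fn is done
	fn(buf)
}

Putting the fallback buffer into the pool as well lets later calls reuse it, which appears to be why the diff defers Put unconditionally.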
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index e635ab22c1..adc5701740 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -556,6 +556,32 @@ func TestGapBasedPartitioner_Partition(t *testing.T) {
 	}
 }
 
+func TestBucketStoreConfig_validate(t *testing.T) {
+	tests := map[string]struct {
+		config   *BucketStore
+		expected error
+	}{
+		"should pass on valid config": {
+			config: &BucketStore{
+				blockSyncConcurrency: 1,
+			},
+			expected: nil,
+		},
+		"should fail on blockSyncConcurrency < 1": {
+			config: &BucketStore{
+				blockSyncConcurrency: 0,
+			},
+			expected: errBlockSyncConcurrencyNotValid,
+		},
+	}
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			testutil.Equals(t, testData.expected, testData.config.validate())
+		})
+	}
+}
+
 func TestBucketStore_Info(t *testing.T) {
 	defer testutil.TolerantVerifyLeak(t)
 
diff --git a/website/data/adopters.yml b/website/data/adopters.yml
index 4274e443df..86762b0843 100644
--- a/website/data/adopters.yml
+++ b/website/data/adopters.yml
@@ -158,4 +158,7 @@ adopters:
   logo: itau-unibanco.png
 - name: LabyrinthLabs
   url: https://lablabs.io
-  logo: lablabs.png
\ No newline at end of file
+  logo: lablabs.png
+- name: Darwinbox Digital Solutions
+  url: https://darwinbox.com
+  logo: darwinbox.png
\ No newline at end of file
diff --git a/website/static/logos/darwinbox.png b/website/static/logos/darwinbox.png
new file mode 100644
index 0000000000..02be75ac9e
Binary files /dev/null and b/website/static/logos/darwinbox.png differ
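As a usage note for the compactor change at the top of this patch, the new flag pairs with the existing interval, e.g. `thanos compact --wait --block-viewer.global.sync-block-interval=1m --block-viewer.global.sync-block-timeout=5m` (both values shown are the defaults). The sketch below shows the loop shape that runCompact now follows; the fetch callback stands in for (*MetaFetcher).Fetch and the plain ticker for runutil.Repeat/RetryWithLog, so both are illustrative simplifications rather than the Thanos APIs:

package sketch

import (
	"context"
	"time"
)

// syncBlockMetas bounds every periodic metadata fetch with the configured
// timeout, so one slow object-storage listing cannot stall the /global
// Block Viewer sync loop indefinitely. The ticker and callback are
// simplified stand-ins for runutil.Repeat/RetryWithLog and Fetch.
func syncBlockMetas(ctx context.Context, interval, timeout time.Duration, fetch func(context.Context) error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		iterCtx, cancel := context.WithTimeout(ctx, timeout)
		err := fetch(iterCtx)
		cancel() // release the timeout context promptly, as runCompact does
		if err != nil {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}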