Skip to content

Commit

Permalink
Merge branch 'main' into fix-sampler-defaults
Browse files Browse the repository at this point in the history
  • Loading branch information
matej-g authored Nov 14, 2022
2 parents 68fd25e + a48584e commit 6e89d4b
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 18 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
## Unreleased

### Fixed

- [#5844](https://github.com/thanos-io/thanos/pull/5844) Query Frontend: Fixes @ modifier time range when splitting queries by interval.
- [#5854](https://github.com/thanos-io/thanos/pull/5854) Query Frontend: Handles `lookback_delta` param in query frontend.
- [#5887](https://github.com/thanos-io/thanos/pull/5887) Tracing: Make sure rate limiting sampler is the default, as was the case in version pre-0.29.0.
- [#5230](https://github.com/thanos-io/thanos/pull/5230) Rule: Stateless ruler supports restoring `for` state from query API servers. The query API servers should be able to access the remote write storage.
- [#5880](https://github.com/thanos-io/thanos/pull/5880) Query Frontend: Fixes some edge cases of query sharding analysis.
- [#5893](https://github.com/thanos-io/thanos/pull/5893) Cache: fixed redis client not respecting `SetMultiBatchSize` config value.

### Added

- [#5814](https://github.com/thanos-io/thanos/pull/5814) Store: Add metric `thanos_bucket_store_postings_size_bytes` that shows the distribution of how many postings (in bytes) were needed for each Series() call in Thanos Store. Useful for determining limits.
- [#5801](https://github.com/thanos-io/thanos/pull/5801) Store: add a new limiter `--store.grpc.downloaded-bytes-limit` that limits the number of bytes downloaded in each Series/LabelNames/LabelValues call. Use `thanos_bucket_store_postings_size_bytes` for determining the limits.
- [#5839](https://github.com/thanos-io/thanos/pull/5839) Receive: Add parameter `--tsdb.out-of-order.time-window` to set time window for experimental out-of-order samples ingestion. Disabled by default (set to 0s). Please note if you enable this option and you use compactor, make sure you set the `--enable-vertical-compaction` flag, otherwise you might risk compactor halt.
- [#5836](https://github.com/thanos-io/thanos/pull/5836) Receive: Add hidden flag `tsdb.memory-snapshot-on-shutdown` to enable experimental TSDB feature to snapshot on shutdown. This is intended to speed up receiver restart.
- [#5865](https://github.com/thanos-io/thanos/pull/5865) Compact: Retry on sync metas error.

### Changed

Expand Down
9 changes: 9 additions & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,15 @@ func runCompact(
return runutil.Repeat(conf.progressCalculateInterval, ctx.Done(), func() error {

if err := sy.SyncMetas(ctx); err != nil {
// The RetryError signals that we hit a retriable error (transient error, no connection).
// You should alert on this being triggered too frequently.
if compact.IsRetryError(err) {
level.Error(logger).Log("msg", "retriable error", "err", err)
compactMetrics.retried.Inc()

return nil
}

return errors.Wrapf(err, "could not sync metas")
}

Expand Down
6 changes: 3 additions & 3 deletions internal/cortex/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream {
stream.Samples = stream.Samples[1:]
} else if existingEndTs > stream.Samples[0].TimestampMs {
// Overlap might be big, use heavier algorithm to remove overlap.
stream.Samples = sliceSamples(stream.Samples, existingEndTs)
stream.Samples = SliceSamples(stream.Samples, existingEndTs)
} // else there is no overlap, yay!
}
existing.Samples = append(existing.Samples, stream.Samples...)
Expand All @@ -676,11 +676,11 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream {
return result
}

// sliceSamples assumes given samples are sorted by timestamp in ascending order and
// SliceSamples assumes given samples are sorted by timestamp in ascending order and
// returns a sub slice whose first element's timestamp is the smallest one that is strictly
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
// timestamps in samples.
func sliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample {
func SliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample {
if len(samples) <= 0 || minTs < samples[0].TimestampMs {
return samples
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cacheutil/redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,9 @@ func (c *RedisClient) SetMulti(ctx context.Context, data map[string][]byte, ttl
keys = append(keys, k)
}
err := doWithBatch(ctx, len(data), c.config.SetMultiBatchSize, c.setMultiGate, func(startIndex, endIndex int) error {
currentKeys := keys[startIndex:endIndex]
_, err := c.Pipelined(ctx, func(p redis.Pipeliner) error {
for _, key := range keys {
for _, key := range currentKeys {
p.SetEX(ctx, key, data[key], ttl)
}
return nil
Expand Down
93 changes: 82 additions & 11 deletions pkg/queryfrontend/queryinstant_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func NewThanosQueryInstantCodec(partialResponse bool) *queryInstantCodec {
}
}

// MergeResponse merges multiple responses into a single response.
// For instant query only vector responses will be merged because other types of queries
// MergeResponse merges multiple responses into a single response. For instant query
// only vector and matrix responses will be merged because other types of queries
// are not shardable like number literal, string literal, scalar, etc.
func (c queryInstantCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
if len(responses) == 0 {
Expand All @@ -52,18 +52,36 @@ func (c queryInstantCodec) MergeResponse(responses ...queryrange.Response) (quer
for _, resp := range responses {
promResponses = append(promResponses, resp.(*queryrange.PrometheusInstantQueryResponse))
}
res := &queryrange.PrometheusInstantQueryResponse{
Status: queryrange.StatusSuccess,
Data: queryrange.PrometheusInstantQueryData{
ResultType: model.ValVector.String(),
Result: queryrange.PrometheusInstantQueryResult{
Result: &queryrange.PrometheusInstantQueryResult_Vector{
Vector: vectorMerge(promResponses),
var res queryrange.Response
switch promResponses[0].Data.ResultType {
case model.ValMatrix.String():
res = &queryrange.PrometheusInstantQueryResponse{
Status: queryrange.StatusSuccess,
Data: queryrange.PrometheusInstantQueryData{
ResultType: model.ValMatrix.String(),
Result: queryrange.PrometheusInstantQueryResult{
Result: &queryrange.PrometheusInstantQueryResult_Matrix{
Matrix: matrixMerge(promResponses),
},
},
Stats: queryrange.StatsMerge(responses),
},
Stats: queryrange.StatsMerge(responses),
},
}
default:
res = &queryrange.PrometheusInstantQueryResponse{
Status: queryrange.StatusSuccess,
Data: queryrange.PrometheusInstantQueryData{
ResultType: model.ValVector.String(),
Result: queryrange.PrometheusInstantQueryResult{
Result: &queryrange.PrometheusInstantQueryResult_Vector{
Vector: vectorMerge(promResponses),
},
},
Stats: queryrange.StatsMerge(responses),
},
}
}

return res, nil
}

Expand Down Expand Up @@ -282,3 +300,56 @@ func vectorMerge(resps []*queryrange.PrometheusInstantQueryResponse) *queryrange
}
return result
}

// matrixMerge merges the matrix results of multiple instant-query responses
// into a single Matrix. Streams are keyed by their stringified label set;
// samples from later responses are appended after removing any timestamp
// overlap with samples already collected for the same series. The output
// streams are sorted by label-set string so the result is deterministic.
func matrixMerge(resps []*queryrange.PrometheusInstantQueryResponse) *queryrange.Matrix {
	output := map[string]*queryrange.SampleStream{}
	for _, resp := range resps {
		if resp == nil {
			continue
		}
		// Merge matrix result samples only. Skip other types such as
		// string, scalar as those are not shardable.
		if resp.Data.Result.GetMatrix() == nil {
			continue
		}
		for _, stream := range resp.Data.Result.GetMatrix().SampleStreams {
			// The stringified label set uniquely identifies a series across responses.
			metric := cortexpb.FromLabelAdaptersToLabels(stream.Labels).String()
			existing, ok := output[metric]
			if !ok {
				existing = &queryrange.SampleStream{
					Labels: stream.Labels,
				}
			}
			// We need to make sure we don't repeat samples. This causes some visualizations to be broken in Grafana.
			// The Prometheus API is inclusive of start and end timestamps.
			if len(existing.Samples) > 0 && len(stream.Samples) > 0 {
				existingEndTs := existing.Samples[len(existing.Samples)-1].TimestampMs
				if existingEndTs == stream.Samples[0].TimestampMs {
					// Typically this is the case where only 1 sample point overlaps,
					// so optimize with simple code.
					stream.Samples = stream.Samples[1:]
				} else if existingEndTs > stream.Samples[0].TimestampMs {
					// Overlap might be big, use heavier algorithm to remove overlap.
					stream.Samples = queryrange.SliceSamples(stream.Samples, existingEndTs)
				} // else there is no overlap, yay!
			}
			existing.Samples = append(existing.Samples, stream.Samples...)
			output[metric] = existing
		}
	}

	// Sort series by label-set string for a deterministic output order.
	keys := make([]string, 0, len(output))
	for key := range output {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	result := &queryrange.Matrix{
		SampleStreams: make([]*queryrange.SampleStream, 0, len(output)),
	}
	for _, key := range keys {
		result.SampleStreams = append(result.SampleStreams, output[key])
	}

	return result
}
178 changes: 178 additions & 0 deletions pkg/queryfrontend/queryinstant_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,184 @@ func TestMergeResponse(t *testing.T) {
},
},
},
{
name: "merge two matrix responses with non-duplicate samples",
resps: []queryrange.Response{
&queryrange.PrometheusInstantQueryResponse{
Status: queryrange.StatusSuccess,
Data: queryrange.PrometheusInstantQueryData{
ResultType: model.ValMatrix.String(),
Result: queryrange.PrometheusInstantQueryResult{
Result: &queryrange.PrometheusInstantQueryResult_Matrix{
Matrix: &queryrange.Matrix{
SampleStreams: []*queryrange.SampleStream{
{
Samples: []cortexpb.Sample{{TimestampMs: 1, Value: 2}},
Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{
"__name__": "up",
"job": "bar",
})),
},
{
Samples: []cortexpb.Sample{{TimestampMs: 1, Value: 2}},
Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{
"__name__": "up",
"job": "foo",
})),
},
},
},
},
},
},
},
&queryrange.PrometheusInstantQueryResponse{
Status: queryrange.StatusSuccess,
Data: queryrange.PrometheusInstantQueryData{
ResultType: model.ValMatrix.String(),
Result: queryrange.PrometheusInstantQueryResult{
Result: &queryrange.PrometheusInstantQueryResult_Matrix{
Matrix: &queryrange.Matrix{
SampleStreams: []*queryrange.SampleStream{
{
Samples: []cortexpb.Sample{{TimestampMs: 2, Value: 3}},
Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{
"__name__": "up",
"job": "bar",
})),
},
{
Samples: []cortexpb.Sample{{TimestampMs: 2, Value: 3}},
Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{
"__name__": "up",
"job": "foo",
})),
},
},
},
},
},
},
},
},
expectedResp: &queryrange.PrometheusInstantQueryResponse{
Status: queryrange.StatusSuccess,
Data: queryrange.PrometheusInstantQueryData{
ResultType: model.ValMatrix.String(),
Result: queryrange.PrometheusInstantQueryResult{
Result: &queryrange.PrometheusInstantQueryResult_Matrix{
Matrix: &queryrange.Matrix{
SampleStreams: []*queryrange.SampleStream{
{
Samples: []cortexpb.Sample{{TimestampMs: 1, Value: 2}, {TimestampMs: 2, Value: 3}},
Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{
"__name__": "up",
"job": "bar",
})),
},
{
Samples: []cortexpb.Sample{{TimestampMs: 1, Value: 2}, {TimestampMs: 2, Value: 3}},
Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{
"__name__": "up",
"job": "foo",
})),
},
},
},
},
},
},
},
},
{
name: "merge two matrix responses with duplicate samples",
resps: []queryrange.Response{
&queryrange.PrometheusInstantQueryResponse{
Status: queryrange.StatusSuccess,
Data: queryrange.PrometheusInstantQueryData{
ResultType: model.ValMatrix.String(),
Result: queryrange.PrometheusInstantQueryResult{
Result: &queryrange.PrometheusInstantQueryResult_Matrix{
Matrix: &queryrange.Matrix{
SampleStreams: []*queryrange.SampleStream{
{
Samples: []cortexpb.Sample{{TimestampMs: 1, Value: 2}},
Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{
"__name__": "up",
"job": "bar",
})),
},
{
Samples: []cortexpb.Sample{{TimestampMs: 1, Value: 2}},
Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{
"__name__": "up",
"job": "foo",
})),
},
},
},
},
},
},
},
&queryrange.PrometheusInstantQueryResponse{
Status: queryrange.StatusSuccess,
Data: queryrange.PrometheusInstantQueryData{
ResultType: model.ValMatrix.String(),
Result: queryrange.PrometheusInstantQueryResult{
Result: &queryrange.PrometheusInstantQueryResult_Matrix{
Matrix: &queryrange.Matrix{
SampleStreams: []*queryrange.SampleStream{
{
Samples: []cortexpb.Sample{{TimestampMs: 1, Value: 2}},
Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{
"__name__": "up",
"job": "bar",
})),
},
{
Samples: []cortexpb.Sample{{TimestampMs: 1, Value: 2}},
Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{
"__name__": "up",
"job": "foo",
})),
},
},
},
},
},
},
},
},
expectedResp: &queryrange.PrometheusInstantQueryResponse{
Status: queryrange.StatusSuccess,
Data: queryrange.PrometheusInstantQueryData{
ResultType: model.ValMatrix.String(),
Result: queryrange.PrometheusInstantQueryResult{
Result: &queryrange.PrometheusInstantQueryResult_Matrix{
Matrix: &queryrange.Matrix{
SampleStreams: []*queryrange.SampleStream{
{
Samples: []cortexpb.Sample{{TimestampMs: 1, Value: 2}},
Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{
"__name__": "up",
"job": "bar",
})),
},
{
Samples: []cortexpb.Sample{{TimestampMs: 1, Value: 2}},
Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{
"__name__": "up",
"job": "foo",
})),
},
},
},
},
},
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
resp, err := codec.MergeResponse(tc.resps...)
Expand Down
4 changes: 2 additions & 2 deletions pkg/querysharding/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
// Analyze analyzes a query and returns a QueryAnalysis.

// Analyze uses the following algorithm:
// - if a query has subqueries, such as label_join or label_replace,
// or has functions which cannot be sharded, then treat the query as non shardable.
// - if a query has functions which cannot be sharded such as
// label_join or label_replace, then treat the query as non shardable.
// - Walk the query and find the least common labelset
// used in grouping expressions. If non-empty, treat the query
// as shardable by those labels.
Expand Down

0 comments on commit 6e89d4b

Please sign in to comment.