Skip to content

Commit

Permalink
querier: Dedup series is now replica label agnostic and simpler. Fixe…
Browse files Browse the repository at this point in the history
…d panic seen when using larger number of replicas and small series.

Fixes #2645


Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Jun 5, 2020
1 parent 6f2c3b1 commit 9d048e4
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 255 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2637](https://github.com/thanos-io/thanos/pull/2637) Compact: detect retryable errors that are inside of a wrapped `tsdb.MultiError`
- [#2648](https://github.com/thanos-io/thanos/pull/2648) Store: allow index cache and caching bucket to be configured at the same time.
- [#2705](https://github.com/thanos-io/thanos/pull/2705) minio-go: Added support for `af-south-1` and `eu-south-1` regions.
- []() Query: Fixed panics when using larger number of replica labels with short series label sets.

### Changed

Expand Down
35 changes: 7 additions & 28 deletions pkg/query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,18 +342,17 @@ func (it *chunkSeriesIterator) Err() error {
}

type dedupSeriesSet struct {
set storage.SeriesSet
replicaLabels map[string]struct{}
isCounter bool
set storage.SeriesSet
isCounter bool

replicas []storage.Series
lset labels.Labels
peek storage.Series
ok bool
}

func newDedupSeriesSet(set storage.SeriesSet, replicaLabels map[string]struct{}, isCounter bool) storage.SeriesSet {
s := &dedupSeriesSet{set: set, replicaLabels: replicaLabels, isCounter: isCounter}
func newDedupSeriesSet(set storage.SeriesSet, isCounter bool) storage.SeriesSet {
s := &dedupSeriesSet{set: set, isCounter: isCounter}
s.ok = s.set.Next()
if s.ok {
s.peek = s.set.At()
Expand All @@ -365,31 +364,11 @@ func (s *dedupSeriesSet) Next() bool {
if !s.ok {
return false
}
// Set the label set we are currently gathering to the peek element
// without the replica label if it exists.
s.lset = s.peekLset()
s.lset = s.peek.Labels()
s.replicas = append(s.replicas[:0], s.peek)
return s.next()
}

// peekLset returns the label set of the current peek element stripped from the
// replica label if it exists.
func (s *dedupSeriesSet) peekLset() labels.Labels {
lset := s.peek.Labels()
if len(s.replicaLabels) == 0 {
return lset
}
// Check how many replica labels are present so that these are removed.
var totalToRemove int
for index := 0; index < len(s.replicaLabels); index++ {
if _, ok := s.replicaLabels[lset[len(lset)-index-1].Name]; ok {
totalToRemove++
}
}
// Strip all present replica labels.
return lset[:len(lset)-totalToRemove]
}

func (s *dedupSeriesSet) next() bool {
// Peek the next series to see whether it's a replica for the current series.
s.ok = s.set.Next()
Expand All @@ -398,9 +377,9 @@ func (s *dedupSeriesSet) next() bool {
return len(s.replicas) > 0
}
s.peek = s.set.At()
nextLset := s.peekLset()
nextLset := s.peek.Labels()

// If the label set modulo the replica label is equal to the current label set
// If the label set is equal to the current label set
// look for more replicas, otherwise a series is complete.
if !labels.Equal(s.lset, nextLset) {
return true
Expand Down
41 changes: 27 additions & 14 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match

// TODO(fabxc): this could potentially pushed further down into the store API
// to make true streaming possible.
sortDedupLabels(resp.seriesSet, q.replicaLabels)
trimReplicaLabels(resp.seriesSet, q.replicaLabels)

set := &promSeriesSet{
mint: q.mint,
Expand All @@ -226,28 +226,41 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
}

// The merged series set assembles all potentially-overlapping time ranges
// of the same series into a single one. The series are ordered so that equal series
// from different replicas are sequential. We can now deduplicate those.
return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), warns, nil
// of the same series into a single one.
return newDedupSeriesSet(set, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), warns, nil
}

// sortDedupLabels re-sorts the set so that the same series with different replica
// labels are coming right after each other.
func sortDedupLabels(set []storepb.Series, replicaLabels map[string]struct{}) {
for _, s := range set {
// trimReplicaLabels removed replica labels from all series and re-sorts the set so that the same series are coming right after each other.
func trimReplicaLabels(set []storepb.Series, replicaLabels map[string]struct{}) {
for si := range set {
lset := set[si].Labels
// Move the replica labels to the very end.
sort.Slice(s.Labels, func(i, j int) bool {
if _, ok := replicaLabels[s.Labels[i].Name]; ok {
sort.Slice(lset, func(i, j int) bool {
if _, ok := replicaLabels[lset[i].Name]; ok {
return false
}
if _, ok := replicaLabels[s.Labels[j].Name]; ok {
if _, ok := replicaLabels[lset[j].Name]; ok {
return true
}
return s.Labels[i].Name < s.Labels[j].Name
return lset[i].Name < lset[j].Name
})

// Check how many replica labels are present so that these are removed.
var totalToRemove int
for i := 0; i < len(replicaLabels); i++ {
if len(lset)-i == 0 {
break
}

if _, ok := replicaLabels[lset[len(lset)-i-1].Name]; ok {
totalToRemove++
}
}
// Strip all present replica labels.
set[si].Labels = lset[:len(lset)-totalToRemove]

}
// With the re-ordered label sets, re-sorting all series aligns the same series
// from different replicas sequentially.
// With the removed label sets, re-sorting all series aligns the same series sequentially.
sort.Slice(set, func(i, j int) bool {
return storepb.CompareLabels(set[i].Labels, set[j].Labels) < 0
})
Expand Down
Loading

0 comments on commit 9d048e4

Please sign in to comment.