Skip to content

Commit

Permalink
Support series Matchers in LabelValues call.
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
  • Loading branch information
pstibrany committed Apr 26, 2021
1 parent 8f6c8e3 commit a8d7380
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 118 deletions.
79 changes: 63 additions & 16 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ type BucketStore struct {

// chunksLimiterFactory creates a new limiter used to limit the number of chunks fetched by each Series() call.
chunksLimiterFactory ChunksLimiterFactory
// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call.
// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call,
// or LabelName and LabelValues calls when used with matchers.
seriesLimiterFactory SeriesLimiterFactory
partitioner Partitioner

Expand Down Expand Up @@ -1280,16 +1281,16 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq

// LabelValues implements the storepb.StoreServer interface.
func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
reqSeriesMatchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
if err != nil {
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error())
}

resHints := &hintspb.LabelValuesResponseHints{}

g, gctx := errgroup.WithContext(ctx)

s.mtx.RLock()

var mtx sync.Mutex
var sets [][]string
var reqBlockMatchers []*labels.Matcher

if req.Hints != nil {
reqHints := &hintspb.LabelValuesRequestHints{}
err := types.UnmarshalAny(req.Hints, reqHints)
Expand All @@ -1303,7 +1304,25 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
}
}

// If we have series matchers, add <labelName> != "" matcher, to only select series that have given label name.
if len(reqSeriesMatchers) > 0 {
m, err := labels.NewMatcher(labels.MatchNotEqual, req.Label, "")
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

reqSeriesMatchers = append(reqSeriesMatchers, m)
}

s.mtx.RLock()

var mtx sync.Mutex
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))

for _, b := range s.blocks {
b := b

if !b.overlapsClosedInterval(req.Start, req.End) {
continue
}
Expand All @@ -1314,24 +1333,52 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
resHints.AddQueriedBlock(b.meta.ULID)

indexr := b.indexReader(gctx)
extLabels := b.meta.Thanos.Labels

g.Go(func() error {
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values")

// Do it via index reader to have pending reader registered correctly.
res, err := indexr.block.indexHeaderReader.LabelValues(req.Label)
if err != nil {
return errors.Wrap(err, "index header label values")
}
var result []string
if len(reqSeriesMatchers) == 0 {
// Do it via index reader to have pending reader registered correctly.
res, err := indexr.block.indexHeaderReader.LabelValues(req.Label)
if err != nil {
return errors.Wrapf(err, "index header label values for block %s", b.meta.ULID)
}

// Add the external label value as well.
if extLabelValue := b.extLset.Get(req.Label); extLabelValue != "" {
res = strutil.MergeSlices(res, []string{extLabelValue})
}
result = res
} else {
seriesSet, _, err := blockSeries(b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
}

// Add the external label value as well.
if extLabelValue, ok := extLabels[req.Label]; ok {
res = strutil.MergeSlices(res, []string{extLabelValue})
// Extract given label's value from all series and deduplicate them.
// We don't need to deal with external labels, since they are already added by blockSeries.
values := map[string]struct{}{}
for seriesSet.Next() {
ls, _ := seriesSet.At()
val := ls.Get(req.Label)
if val != "" { // Should never be empty since we added labelName!="" matcher to the list of matchers.
values[val] = struct{}{}
}
}
if seriesSet.Err() != nil {
return errors.Wrapf(seriesSet.Err(), "iterate series for block %s", b.meta.ULID)
}

result = make([]string, 0, len(values))
for n := range values {
result = append(result, n)
}
sort.Strings(result)
}

mtx.Lock()
sets = append(sets, res)
sets = append(sets, result)
mtx.Unlock()

return nil
Expand Down
Loading

0 comments on commit a8d7380

Please sign in to comment.