Skip to content

Commit

Permalink
expose NewPromSeriesSet (thanos-io#7214)
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 authored and jnyi committed Apr 4, 2024
1 parent 36af2ba commit 5362de9
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
11 changes: 11 additions & 0 deletions pkg/query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ type promSeriesSet struct {
warns annotations.Annotations
}

// NewPromSeriesSet constructs a promSeriesSet.
func NewPromSeriesSet(seriesSet storepb.SeriesSet, mint, maxt int64, aggrs []storepb.Aggr, warns annotations.Annotations) storage.SeriesSet {
return &promSeriesSet{
set: seriesSet,
mint: mint,
maxt: maxt,
aggrs: aggrs,
warns: warns,
}
}

func (s *promSeriesSet) Next() bool {
return s.set.Next()
}
Expand Down
28 changes: 14 additions & 14 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,25 +364,25 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
warns := annotations.New().Merge(resp.warnings)

if !q.isDedupEnabled() {
return &promSeriesSet{
mint: q.mint,
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggrs: aggrs,
warns: warns,
}, resp.seriesSetStats, nil
return NewPromSeriesSet(
newStoreSeriesSet(resp.seriesSet),
q.mint,
q.maxt,
aggrs,
warns,
), resp.seriesSetStats, nil
}

// TODO(bwplotka): Move to deduplication on chunk level inside promSeriesSet, similar to what we have in dedup.NewDedupChunkMerger().
// This however require big refactor, caring about correct AggrChunk to iterator conversion and counter reset apply.
// For now we apply simple logic that splits potential overlapping chunks into separate replica series, so we can split the work.
set := &promSeriesSet{
mint: q.mint,
maxt: q.maxt,
set: dedup.NewOverlapSplit(newStoreSeriesSet(resp.seriesSet)),
aggrs: aggrs,
warns: warns,
}
set := NewPromSeriesSet(
dedup.NewOverlapSplit(newStoreSeriesSet(resp.seriesSet)),
q.mint,
q.maxt,
aggrs,
warns,
)

return dedup.NewSeriesSet(set, hints.Func), resp.seriesSetStats, nil
}
Expand Down

0 comments on commit 5362de9

Please sign in to comment.