Skip to content

Commit

Permalink
Implement query pushdown for a subset of aggregations
Browse files Browse the repository at this point in the history
Certain aggregations can be executed safely on leaf nodes without
worrying about data duplication or overlap. One such example is the max
function which can be computed on local data by the leaves before it is
computed globally by the querier.

This commit implements local aggregation in the Prometheus sidecar for
all functions which are safe to execute locally. The feature can be
enabled by passing the `--evaluate-queries` flag to the sidecar.

Signed-off-by: fpetkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Dec 6, 2021
1 parent afd23cf commit bf11fbf
Show file tree
Hide file tree
Showing 12 changed files with 1,550 additions and 121 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.)

## Unreleased
- [#4917](https://github.com/thanos-io/thanos/pull/4917) Sidecar: Add flag `--evaluate-queries` to Thanos sidecar to enable local execution of certain queries.

### Added

Expand Down
26 changes: 15 additions & 11 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func registerSidecar(app *extkingpin.App) {
RetryInterval: conf.reloader.retryInterval,
})

return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf, grpcLogOpts, tagOpts)
return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf, grpcLogOpts, tagOpts, conf.evaluateQueries)
})
}

Expand All @@ -84,6 +84,7 @@ func runSidecar(
conf sidecarConfig,
grpcLogOpts []grpc_logging.Option,
tagOpts []tags.Option,
evaluateQueries bool,
) error {
httpConfContentYaml, err := conf.prometheus.httpClient.Content()
if err != nil {
Expand Down Expand Up @@ -244,7 +245,7 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version, evaluateQueries)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -467,15 +468,16 @@ func (s *promMetadata) Version() string {
}

type sidecarConfig struct {
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
tsdb tsdbConfig
reloader reloaderConfig
reqLogConfig *extflag.PathOrContent
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
tsdb tsdbConfig
reloader reloaderConfig
reqLogConfig *extflag.PathOrContent
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
evaluateQueries bool
}

func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand All @@ -489,4 +491,6 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
sc.shipper.registerFlag(cmd)
cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime)
cmd.Flag("evaluate-queries", "When set, Thanos sidecar will evaluate expressions that are safe to execute locally before returning series to the Querier.").
BoolVar(&sc.evaluateQueries)
}
15 changes: 15 additions & 0 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,20 @@ func aggrsFromFunc(f string) []storepb.Aggr {
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}
}

// storeHintsFromPromHints translates the select hints handed over by the
// PromQL engine into the QueryHints message sent along with a Series request.
// It copies the step, the function name, the grouping (labels plus the "by"
// flag) and the range of the selector. hints must not be nil.
func storeHintsFromPromHints(hints *storage.SelectHints) storepb.QueryHints {
	fn := &storepb.Func{Name: hints.Func}
	grouping := &storepb.Grouping{By: hints.By, Labels: hints.Grouping}
	rng := &storepb.Range{Millis: hints.Range}

	return storepb.QueryHints{
		StepMillis: hints.Step,
		Func:       fn,
		Grouping:   grouping,
		Range:      rng,
	}
}

func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
if hints == nil {
hints = &storage.SelectHints{
Expand Down Expand Up @@ -274,6 +288,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: aggrs,
QueryHints: storeHintsFromPromHints(hints),
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
Step: hints.Step,
Expand Down
66 changes: 62 additions & 4 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"path"
"sort"
"strings"
"sync"
"time"

"github.com/blang/semver/v4"
"github.com/go-kit/log"
Expand Down Expand Up @@ -54,6 +56,8 @@ type PrometheusStore struct {
remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType

framesRead prometheus.Histogram

evaluateQueries bool
}

// Label{Values,Names} call with matchers is supported for Prometheus versions >= 2.24.0.
Expand All @@ -74,6 +78,7 @@ func NewPrometheusStore(
externalLabelsFn func() labels.Labels,
timestamps func() (mint int64, maxt int64),
promVersion func() string,
evaluateQueries bool,
) (*PrometheusStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -98,6 +103,7 @@ func NewPrometheusStore(
Buckets: prometheus.ExponentialBuckets(10, 10, 5),
},
),
evaluateQueries: evaluateQueries,
}
return p, nil
}
Expand Down Expand Up @@ -179,6 +185,10 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
return nil
}

if p.evaluateQueries && r.QueryHints.IsSafeToExecute() {
return p.queryPrometheus(s, r)
}

q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime}
for _, m := range matchers {
pm := &prompb.LabelMatcher{Name: m.Name, Value: m.Value}
Expand Down Expand Up @@ -220,18 +230,66 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
return p.handleStreamedPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset)
}

func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels) error {
ctx := s.Context()
// queryPrometheus answers a Series request by running the PromQL expression
// derived from the request (see SeriesRequest.ToPromQL) as a range query
// against the underlying Prometheus instance, and streams one response per
// resulting series back through s.
//
// The step is the hinted step converted to whole seconds, but never less than
// 30. The start of the evaluated window is the later of r.MinTime and five
// minutes before r.MaxTime.
//
// It returns the first error reported by the range query, by chunk encoding
// or by sending on the stream.
func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *storepb.SeriesRequest) error {
	step := math.Max(float64(r.QueryHints.StepMillis/1000), 30)
	minTime := math.Max(float64(r.MinTime), float64(r.MaxTime-5*time.Minute.Milliseconds()))
	opts := promclient.QueryOptions{}
	matrix, _, err := p.client.QueryRange(s.Context(), p.base, r.ToPromQL(), int64(minTime), r.MaxTime, int64(step), opts)
	if err != nil {
		return err
	}

	// Read the external labels once so that every series of this response is
	// decorated with the same set.
	extLset := p.externalLabelsFn()

	for _, stream := range matrix {
		lbls := make([]labels.Label, 0, len(stream.Metric)+len(extLset))
		for k, v := range stream.Metric {
			lbls = append(lbls, labels.Label{
				Name:  string(k),
				Value: string(v),
			})
		}
		// Attach external labels for compatibility with remote read.
		lbls = append(lbls, extLset...)

		// stream.Metric is a map, so the labels collected above arrive in random
		// order, and the external labels were simply appended at the end. Label
		// sets are expected to be sorted by name, so restore that order here.
		sort.Slice(lbls, func(i, j int) bool {
			return lbls[i].Name < lbls[j].Name
		})

		series := &prompb.TimeSeries{
			Labels:  labelpb.ZLabelsFromPromLabels(lbls),
			Samples: make([]prompb.Sample, len(stream.Values)),
		}

		for i, sample := range stream.Values {
			series.Samples[i] = prompb.Sample{
				Value:     float64(sample.Value),
				Timestamp: int64(sample.Timestamp),
			}
		}

		chks, err := p.chunkSamples(series, MaxSamplesPerChunk)
		if err != nil {
			return err
		}

		if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{
			Labels: series.Labels,
			Chunks: chks,
		})); err != nil {
			return err
		}
	}

	return nil
}

func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels) error {
level.Debug(p.logger).Log("msg", "started handling ReadRequest_SAMPLED response type.")

resp, err := p.fetchSampledResponse(ctx, httpResp)
resp, err := p.fetchSampledResponse(s.Context(), httpResp)
querySpan.Finish()
if err != nil {
return err
}

span, _ := tracing.StartSpan(ctx, "transform_and_respond")
span, _ := tracing.StartSpan(s.Context(), "transform_and_respond")
defer span.Finish()
span.SetTag("series_count", len(resp.Results[0].Timeseries))

Expand Down
1 change: 1 addition & 0 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
Aggregates: r.Aggregates,
MaxResolutionWindow: r.MaxResolutionWindow,
SkipChunks: r.SkipChunks,
QueryHints: r.QueryHints,
PartialResponseDisabled: r.PartialResponseDisabled,
}
wg = &sync.WaitGroup{}
Expand Down
24 changes: 24 additions & 0 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,3 +517,27 @@ func (c *SeriesStatsCounter) Count(series *Series) {
}
}
}

// ToPromQL returns the string produced by the request's query hints for the
// request's matchers. The rendering itself is delegated to QueryHints.toPromQL.
func (m *SeriesRequest) ToPromQL() string {
return m.QueryHints.toPromQL(m.Matchers)
}

// IsSafeToExecute returns true if the function or aggregation from the query hint
// can be safely executed by the underlying Prometheus instance without affecting the
// result of the query.
//
// A nil receiver, or a hint that carries no function, is reported as not safe
// instead of dereferencing a nil pointer.
func (m *QueryHints) IsSafeToExecute() bool {
	if m == nil || m.Func == nil {
		return false
	}

	// Only these operations are treated as safe to evaluate on a leaf node.
	switch m.Func.Name {
	case "max", "max_over_time", "min", "min_over_time", "group":
		return true
	default:
		return false
	}
}
Loading

0 comments on commit bf11fbf

Please sign in to comment.