Skip to content

Commit

Permalink
Implement query pushdown for a subset of aggregations (#4917)
Browse files Browse the repository at this point in the history
* Implement query pushdown for a subset of aggregations

Certain aggregations can be executed safely on leaf nodes without
worrying about data duplication or overlap. One such example is the max
function which can be computed on local data by the leaves before it is
computed globally by the querier.

This commit implements local aggregation in the Prometheus sidecar for
all functions which are safe to execute locally. The feature can be
enabled by passing the `--enable-feature=query-pushdown` flag to Thanos Query.

Signed-off-by: fpetkovski <filip.petkovsky@gmail.com>

* CHANGELOG: fix entry

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

Co-authored-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
fpetkovski and GiedriusS authored Dec 16, 2021
1 parent d08da88 commit 79e70da
Show file tree
Hide file tree
Showing 16 changed files with 1,610 additions and 133 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4903](https://github.com/thanos-io/thanos/pull/4903) Compactor: Added tracing support for compaction.
- [#4909](https://github.com/thanos-io/thanos/pull/4909) Compactor: Add flag --max-time / --min-time to filter blocks that are ready to be compacted.
- [#4942](https://github.com/thanos-io/thanos/pull/4942) Tracing: add `traceid_128bit` support for jaeger.
- [#4917](https://github.com/thanos-io/thanos/pull/4917) Query: add initial query pushdown for a subset of aggregations. Can be enabled with `--enable-feature=query-pushdown` on Thanos Query.
- [#4888](https://github.com/thanos-io/thanos/pull/4888) Cache: support redis cache backend.

### Fixed
Expand Down
11 changes: 9 additions & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
// Names of the experimental features recognised by the query command's
// --enable-feature flag. Each value listed in that flag is compared against
// these constants to switch the corresponding behaviour on.
const (
// promqlNegativeOffset is the feature name that sets enableNegativeOffset.
promqlNegativeOffset = "promql-negative-offset"
// promqlAtModifier is the feature name that sets enableAtModifier.
promqlAtModifier = "promql-at-modifier"
// queryPushdown is the feature name that sets enableQueryPushdown.
queryPushdown = "query-pushdown"
)

// registerQuery registers a query command.
Expand Down Expand Up @@ -160,7 +161,7 @@ func registerQuery(app *extkingpin.App) {
enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling.").
Hidden().Default("true").Bool()

featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+promqlNegativeOffset+" and "+promqlAtModifier+".").Default("").Strings()
featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+promqlNegativeOffset+", "+promqlAtModifier+" and "+queryPushdown+".").Default("").Strings()

enableExemplarPartialResponse := cmd.Flag("exemplar.partial-response", "Enable partial response for exemplar endpoint. --no-exemplar.partial-response for disabling.").
Hidden().Default("true").Bool()
Expand All @@ -181,14 +182,17 @@ func registerQuery(app *extkingpin.App) {
return errors.Wrap(err, "parse federation labels")
}

var enableNegativeOffset, enableAtModifier bool
var enableNegativeOffset, enableAtModifier, enableQueryPushdown bool
for _, feature := range *featureList {
if feature == promqlNegativeOffset {
enableNegativeOffset = true
}
if feature == promqlAtModifier {
enableAtModifier = true
}
if feature == queryPushdown {
enableQueryPushdown = true
}
}

httpLogOpts, err := logging.ParseHTTPOptions(*reqLogDecision, reqLogConfig)
Expand Down Expand Up @@ -278,6 +282,7 @@ func registerQuery(app *extkingpin.App) {
*webDisableCORS,
enableAtModifier,
enableNegativeOffset,
enableQueryPushdown,
*alertQueryURL,
component.Query,
)
Expand Down Expand Up @@ -346,6 +351,7 @@ func runQuery(
disableCORS bool,
enableAtModifier bool,
enableNegativeOffset bool,
enableQueryPushdown bool,
alertQueryURL string,
comp component.Component,
) error {
Expand Down Expand Up @@ -614,6 +620,7 @@ func runQuery(
enableTargetPartialResponse,
enableMetricMetadataPartialResponse,
enableExemplarPartialResponse,
enableQueryPushdown,
queryReplicaLabels,
flagsMap,
defaultRangeQueryStep,
Expand Down
3 changes: 2 additions & 1 deletion docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ Flags:
in all alerts 'Source' field.
--enable-feature= ... Comma separated experimental feature names to
enable.The current list of features is
promql-negative-offset and promql-at-modifier.
promql-negative-offset, promql-at-modifier and
query-pushdown.
--endpoint=<endpoint> ... Addresses of statically configured Thanos API
servers (repeatable). The scheme may be
prefixed with 'dns+' or 'dnssrv+' to detect
Expand Down
13 changes: 8 additions & 5 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type QueryAPI struct {
enableTargetPartialResponse bool
enableMetricMetadataPartialResponse bool
enableExemplarPartialResponse bool
enableQueryPushdown bool
disableCORS bool

replicaLabels []string
Expand Down Expand Up @@ -120,6 +121,7 @@ func NewQueryAPI(
enableTargetPartialResponse bool,
enableMetricMetadataPartialResponse bool,
enableExemplarPartialResponse bool,
enableQueryPushdown bool,
replicaLabels []string,
flagsMap map[string]string,
defaultRangeQueryStep time.Duration,
Expand All @@ -146,6 +148,7 @@ func NewQueryAPI(
enableTargetPartialResponse: enableTargetPartialResponse,
enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse,
enableExemplarPartialResponse: enableExemplarPartialResponse,
enableQueryPushdown: enableQueryPushdown,
replicaLabels: replicaLabels,
endpointStatus: endpointStatus,
defaultRangeQueryStep: defaultRangeQueryStep,
Expand Down Expand Up @@ -342,7 +345,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts)
qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
Expand Down Expand Up @@ -459,7 +462,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
defer span.Finish()

qry, err := qe.NewRangeQuery(
qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false),
qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false),
r.FormValue("query"),
start,
end,
Expand Down Expand Up @@ -532,7 +535,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
matcherSets = append(matcherSets, matchers)
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, true).
q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down Expand Up @@ -619,7 +622,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, true).
q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, qapi.enableQueryPushdown, true).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down Expand Up @@ -669,7 +672,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
matcherSets = append(matcherSets, matchers)
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, true).
q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down
31 changes: 27 additions & 4 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ import (
// replicaLabels at query time.
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds).
// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behavior of proxy.
type QueryableCreator func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable
type QueryableCreator func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, enableQueryPushdown, skipChunks bool) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator {
duration := promauto.With(
extprom.WrapRegistererWithPrefix("concurrent_selects_", reg),
).NewHistogram(gate.DurationHistogramOpts)

return func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
return func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, enableQueryPushdown, skipChunks bool) storage.Queryable {
return &queryable{
logger: logger,
replicaLabels: replicaLabels,
Expand All @@ -56,6 +56,7 @@ func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy sto
},
maxConcurrentSelects: maxConcurrentSelects,
selectTimeout: selectTimeout,
enableQueryPushdown: enableQueryPushdown,
}
}
}
Expand All @@ -72,11 +73,12 @@ type queryable struct {
gateProviderFn func() gate.Gate
maxConcurrentSelects int
selectTimeout time.Duration
enableQueryPushdown bool
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout), nil
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.enableQueryPushdown, q.skipChunks, q.gateProviderFn(), q.selectTimeout), nil
}

type querier struct {
Expand All @@ -90,6 +92,7 @@ type querier struct {
deduplicate bool
maxResolutionMillis int64
partialResponse bool
enableQueryPushdown bool
skipChunks bool
selectGate gate.Gate
selectTimeout time.Duration
Expand All @@ -106,7 +109,7 @@ func newQuerier(
proxy storepb.StoreServer,
deduplicate bool,
maxResolutionMillis int64,
partialResponse, skipChunks bool,
partialResponse, enableQueryPushdown bool, skipChunks bool,
selectGate gate.Gate,
selectTimeout time.Duration,
) *querier {
Expand Down Expand Up @@ -135,6 +138,7 @@ func newQuerier(
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
enableQueryPushdown: enableQueryPushdown,
}
}

Expand Down Expand Up @@ -193,6 +197,20 @@ func aggrsFromFunc(f string) []storepb.Aggr {
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}
}

// storeHintsFromPromHints translates PromQL select hints into the
// storepb.QueryHints message that is attached to a store Series request.
//
// The step, function name, grouping (the By flag together with the grouping
// labels) and range are copied over verbatim. hints must be non-nil because
// its fields are read unconditionally.
func storeHintsFromPromHints(hints *storage.SelectHints) *storepb.QueryHints {
	fn := &storepb.Func{Name: hints.Func}

	grouping := &storepb.Grouping{}
	grouping.By = hints.By
	grouping.Labels = hints.Grouping

	window := &storepb.Range{Millis: hints.Range}

	queryHints := new(storepb.QueryHints)
	queryHints.StepMillis = hints.Step
	queryHints.Func = fn
	queryHints.Grouping = grouping
	queryHints.Range = window
	return queryHints
}

func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
if hints == nil {
hints = &storage.SelectHints{
Expand Down Expand Up @@ -268,12 +286,17 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .

// TODO(bwplotka): Use inprocess gRPC.
resp := &seriesServer{ctx: ctx}
var queryHints *storepb.QueryHints
if q.enableQueryPushdown {
queryHints = storeHintsFromPromHints(hints)
}
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: hints.Start,
MaxTime: hints.End,
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: aggrs,
QueryHints: queryHints,
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
Step: hints.Step,
Expand Down
12 changes: 6 additions & 6 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {
queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2, 5*time.Second)

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false)
queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false, false)

q, err := queryable.Querier(context.Background(), 0, 42)
testutil.Ok(t, err)
Expand All @@ -71,7 +71,7 @@ func TestQuerier_DownsampledData(t *testing.T) {
}

timeout := 10 * time.Second
q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false)
q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false, false)
engine := promql.NewEngine(
promql.EngineOpts{
MaxSamples: math.MaxInt32,
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) {
g := gate.New(2)
mq := &mockedQueryable{
Creator: func(mint, maxt int64) storage.Querier {
return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout)
return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout)
},
}
t.Cleanup(func() {
Expand Down Expand Up @@ -606,7 +606,7 @@ func TestQuerier_Select(t *testing.T) {
{dedup: true, expected: []series{tcase.expectedAfterDedup}},
} {
g := gate.New(2)
q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout)
q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout)
t.Cleanup(func() { testutil.Ok(t, q.Close()) })

t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) {
Expand Down Expand Up @@ -835,7 +835,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {

timeout := 100 * time.Second
g := gate.New(2)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, g, timeout)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, false, g, timeout)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})
Expand Down Expand Up @@ -905,7 +905,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {

timeout := 5 * time.Second
g := gate.New(2)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, g, timeout)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, false, g, timeout)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestQuerier_Proxy(t *testing.T) {
name: fmt.Sprintf("store number %v", i),
})
}
return q(true, nil, nil, 0, false, false)
return q(true, nil, nil, 0, false, false, false)
}

for _, fn := range files {
Expand Down
Loading

0 comments on commit 79e70da

Please sign in to comment.