Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support vertical sharding for label_join and label_replace functions #5889

Merged
merged 3 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5839](https://github.com/thanos-io/thanos/pull/5839) Receive: Add parameter `--tsdb.out-of-order.time-window` to set time window for experimental out-of-order samples ingestion. Disabled by default (set to 0s). Please note if you enable this option and you use compactor, make sure you set the `--enable-vertical-compaction` flag, otherwise you might risk compactor halt.
- [#5836](https://github.com/thanos-io/thanos/pull/5836) Receive: Add hidden flag `tsdb.memory-snapshot-on-shutdown` to enable experimental TSDB feature to snapshot on shutdown. This is intended to speed up receiver restart.
- [#5865](https://github.com/thanos-io/thanos/pull/5865) Compact: Retry on sync metas error.
- [#5889](https://github.com/thanos-io/thanos/pull/5889) Query Frontend: Support vertical sharding for `label_replace` and `label_join` functions.

### Changed

Expand Down
76 changes: 54 additions & 22 deletions pkg/querysharding/analyzer.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,40 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querysharding

import (
"fmt"

lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
)

// QueryAnalyzer is an analyzer which determines
// whether a PromQL Query is shardable and using which labels.

// Analyzer is implemented by types that can analyze a query given as a
// string, returning the resulting QueryAnalysis or an error.
type Analyzer interface {
// Analyze takes the query text and returns its QueryAnalysis, or an error.
Analyze(string) (QueryAnalysis, error)
}

// QueryAnalyzer is an analyzer which determines whether a PromQL query is
// shardable and, if so, using which labels. It carries no state of its own.
type QueryAnalyzer struct{}

// CachedQueryAnalyzer pairs a QueryAnalyzer with an LRU cache.
// NOTE(review): presumably the cache holds analysis results keyed by the query
// string so repeated queries are not re-analyzed — confirm against the methods
// of this type, which are not visible here.
type CachedQueryAnalyzer struct {
// analyzer is the underlying QueryAnalyzer.
analyzer *QueryAnalyzer
// cache is the LRU cache associated with the analyzer.
cache *lru.Cache
}

// nonShardableFuncs lists the names of PromQL functions that are matched
// against the function name of call nodes during analysis; a query calling
// one of them is treated as not shardable.
var nonShardableFuncs = []string{
"label_join",
"label_replace",
}

// NewQueryAnalyzer creates a new QueryAnalyzer.
func NewQueryAnalyzer() *CachedQueryAnalyzer {
// Ignore the error check since it throws error
Expand Down Expand Up @@ -80,14 +85,18 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
return nonShardableQuery(), err
}

isShardable := true
var analysis QueryAnalysis
var (
analysis QueryAnalysis
dynamicLabels []string
)
parser.Inspect(expr, func(node parser.Node, nodes []parser.Node) error {
switch n := node.(type) {
case *parser.Call:
if n.Func != nil && contains(n.Func.Name, nonShardableFuncs) {
isShardable = false
return fmt.Errorf("expressions with %s are not shardable", n.Func.Name)
if n.Func != nil {
if n.Func.Name == "label_join" || n.Func.Name == "label_replace" {
dstLabel := stringFromArg(n.Args[1])
dynamicLabels = append(dynamicLabels, dstLabel)
}
}
case *parser.BinaryExpr:
if n.VectorMatching != nil {
Expand All @@ -108,19 +117,42 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
return nil
})

if !isShardable {
return nonShardableQuery(), nil
// If currently it is shard by, it is still shardable if there is
// any label left after removing the dynamic labels.
// If currently it is shard without, it is still shardable if we
// shard without the union of the labels.
// TODO(yeya24): we can still make dynamic labels shardable if we push
// down the label_replace and label_join computation to the store level.
if len(dynamicLabels) > 0 {
analysis = analysis.scopeToLabels(dynamicLabels, false)
}

return analysis, nil
}

func contains(needle string, haystack []string) bool {
for _, item := range haystack {
if needle == item {
return true
// stringFromArg returns the value of the string literal held by e. An outer
// StepInvariantExpr wrapper is removed first, then any enclosing parentheses.
// It panics if what remains is not a *parser.StringLiteral.
//
// Copied from https://github.com/prometheus/prometheus/blob/v2.40.1/promql/functions.go#L1416.
func stringFromArg(e parser.Expr) string {
	inner := unwrapStepInvariantExpr(e)
	// unwrapParenExpr rewrites its argument in place, hence the local copy.
	unwrapParenExpr(&inner)
	literal := inner.(*parser.StringLiteral)
	return literal.Val
}

// unwrapParenExpr does the AST equivalent of removing parentheses around an
// expression: while *e is a *parser.ParenExpr, *e is replaced in place by the
// expression it encloses. Nested parentheses are removed layer by layer.
//
// Copied from https://github.com/prometheus/prometheus/blob/v2.40.1/promql/engine.go#L2642.
func unwrapParenExpr(e *parser.Expr) {
	for {
		paren, isParen := (*e).(*parser.ParenExpr)
		if !isParen {
			return
		}
		*e = paren.Expr
	}
}

return false
// unwrapStepInvariantExpr returns the expression wrapped by e when e is a
// *parser.StepInvariantExpr, and e itself otherwise. Only a single layer of
// wrapping is removed.
//
// Copied from https://github.com/prometheus/prometheus/blob/v2.40.1/promql/engine.go#L2652.
func unwrapStepInvariantExpr(e parser.Expr) parser.Expr {
	switch wrapped := e.(type) {
	case *parser.StepInvariantExpr:
		return wrapped.Expr
	default:
		return e
	}
}
65 changes: 53 additions & 12 deletions pkg/querysharding/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ func TestAnalyzeQuery(t *testing.T) {
name: "outer aggregation with without grouping",
expression: "count(sum without (pod) (http_requests_total))",
},
{
name: "aggregate expression with label_replace",
expression: `sum by (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`,
},
{
name: "aggregate without expression with label_replace",
expression: `sum without (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`,
},
{
name: "binary expression",
expression: `http_requests_total{code="400"} / http_requests_total`,
Expand All @@ -56,10 +48,6 @@ func TestAnalyzeQuery(t *testing.T) {
name: "binary aggregation with different grouping labels",
expression: `sum by (pod) (http_requests_total{code="400"}) / sum by (cluster) (http_requests_total)`,
},
{
name: "binary expression with vector matching and label_replace",
expression: `http_requests_total{code="400"} / on (pod) label_replace(metric, "dst_label", "$1", "src_label", "re")`,
},
{
name: "multiple binary expressions",
expression: `(http_requests_total{code="400"} + http_requests_total{code="500"}) / http_requests_total`,
Expand All @@ -71,6 +59,14 @@ func TestAnalyzeQuery(t *testing.T) {
/ on ()
http_requests_total`,
},
{
name: "aggregate by expression with label_replace, sharding label is dynamic",
expression: `sum by (dst_label) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`,
},
{
name: "aggregate by expression with label_join, sharding label is dynamic",
expression: `sum by (dst_label) (label_join(metric, "dst_label", ",", "src_label"))`,
},
}

shardableByLabels := []testCase{
Expand Down Expand Up @@ -142,6 +138,36 @@ sum by (container) (
expression: `sum(rate(node_cpu_seconds_total[3h])) by (cluster_id, mode) / ignoring(mode) group_left sum(rate(node_cpu_seconds_total[3h])) by (cluster_id)`,
shardingLabels: []string{"cluster_id"},
},
{
name: "aggregate by expression with label_replace, sharding label is not dynamic",
expression: `sum by (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`,
shardingLabels: []string{"pod"},
},
{
name: "aggregate by expression with label_join, sharding label is not dynamic",
expression: `sum by (pod) (label_join(metric, "dst_label", ",", "src_label"))`,
shardingLabels: []string{"pod"},
},
{
name: "label_join and aggregation on multiple labels. Can be sharded by the static one",
expression: `sum by (pod, dst_label) (label_join(metric, "dst_label", ",", "src_label"))`,
shardingLabels: []string{"pod"},
},
{
name: "binary expression with vector matching and label_replace",
expression: `http_requests_total{code="400"} / on (pod) label_replace(metric, "dst_label", "$1", "src_label", "re")`,
shardingLabels: []string{"pod"},
},
{
name: "nested label joins",
expression: `label_join(sum by (pod) (label_join(metric, "dst_label", ",", "src_label")), "dst_label1", ",", "dst_label")`,
shardingLabels: []string{"pod"},
},
{
name: "complex query with label_replace, binary expr and aggregations on dynamic label",
expression: `sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[1d:5m])) by (instance, cluster) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[1d:5m])) by (node, cluster), "instance", "$1", "node", "(.*)")) by (instance, cluster)`,
shardingLabels: []string{"cluster"},
},
}

shardableWithoutLabels := []testCase{
Expand Down Expand Up @@ -178,6 +204,21 @@ http_requests_total`,
expression: "histogram_quantile(0.95, sum(rate(metric[1m])) without (le, cluster))",
shardingLabels: []string{"cluster"},
},
{
name: "aggregate without expression with label_replace, sharding label is not dynamic",
expression: `sum without (dst_label) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`,
shardingLabels: []string{"dst_label"},
},
{
name: "aggregate without expression with label_join, sharding label is not dynamic",
expression: `sum without (dst_label) (label_join(metric, "dst_label", ",", "src_label"))`,
shardingLabels: []string{"dst_label"},
},
{
name: "aggregate without expression with label_replace",
expression: `sum without (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`,
shardingLabels: []string{"pod", "dst_label"},
},
}

for _, test := range nonShardable {
Expand Down