Skip to content

Commit

Permalink
Copy cortex/pkg/querier/astmapper package dependency into Loki (#4982)
Browse files Browse the repository at this point in the history
* Fork cortex `pkg/querier/asmapper` into Loki.

* Move away from cortex `pkg/querier/astmapper`.

* Fix lint issues.

- Use _ in argument of method `pkg/querier/astmapper/subtree_folder.go`
  since it is not being used
- Remove unnecessary cast from `subtree_folder.go`
  • Loading branch information
DylanGuedes committed Jan 3, 2022
1 parent 2fdd240 commit d5783e8
Show file tree
Hide file tree
Showing 21 changed files with 805 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pkg/ingester/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"github.com/prometheus/prometheus/model/labels"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/astmapper"

"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/chunk"
)

Expand Down
3 changes: 2 additions & 1 deletion pkg/ingester/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"testing"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/querier/astmapper"
)

func Test_GetShards(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"syscall"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -28,6 +27,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util"
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/querier/astmapper"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"fmt"
"time"

"github.com/cortexproject/cortex/pkg/querier/astmapper"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/promql"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/util"
)

Expand Down
3 changes: 2 additions & 1 deletion pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package logql
import (
"fmt"

"github.com/cortexproject/cortex/pkg/querier/astmapper"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/pkg/querier/astmapper"
)

// keys used in metrics
Expand Down
3 changes: 2 additions & 1 deletion pkg/logql/shardmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/querier/astmapper"
)

func TestShardedStringer(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"time"

"github.com/cespare/xxhash/v2"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/prometheus/prometheus/model/labels"
promql_parser "github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/querier/astmapper"
)

func NewMockQuerier(shards int, streams []logproto.Stream) MockQuerier {
Expand Down
187 changes: 187 additions & 0 deletions pkg/querier/astmapper/astmapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package astmapper

import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/promql/parser"
)

// ASTMapper is the exported interface for mapping between multiple AST representations
type ASTMapper interface {
Map(node parser.Node) (parser.Node, error)
}

// MapperFunc is a function adapter for ASTMapper
type MapperFunc func(node parser.Node) (parser.Node, error)

// Map applies a mapperfunc as an ASTMapper
func (fn MapperFunc) Map(node parser.Node) (parser.Node, error) {
return fn(node)
}

// MultiMapper can compose multiple ASTMappers
type MultiMapper struct {
mappers []ASTMapper
}

// Map implements ASTMapper
func (m *MultiMapper) Map(node parser.Node) (parser.Node, error) {
var result parser.Node = node
var err error

if len(m.mappers) == 0 {
return nil, errors.New("MultiMapper: No mappers registered")
}

for _, x := range m.mappers {
result, err = x.Map(result)
if err != nil {
return nil, err
}
}
return result, nil

}

// Register adds ASTMappers into a multimapper.
// Since registered functions are applied in the order they're registered, it's advised to register them
// in decreasing priority and only operate on nodes that each function cares about, defaulting to CloneNode.
func (m *MultiMapper) Register(xs ...ASTMapper) {
m.mappers = append(m.mappers, xs...)
}

// NewMultiMapper instaniates an ASTMapper from multiple ASTMappers
func NewMultiMapper(xs ...ASTMapper) *MultiMapper {
m := &MultiMapper{}
m.Register(xs...)
return m
}

// CloneNode is a helper function to clone a node.
func CloneNode(node parser.Node) (parser.Node, error) {
return parser.ParseExpr(node.String())
}

// NodeMapper either maps a single AST node or returns the unaltered node.
// It also returns a bool to signal that no further recursion is necessary.
// This is helpful because it allows mappers to only implement logic for node types they want to change.
// It makes some mappers trivially easy to implement
type NodeMapper interface {
MapNode(node parser.Node) (mapped parser.Node, finished bool, err error)
}

// NodeMapperFunc is an adapter for NodeMapper
type NodeMapperFunc func(node parser.Node) (parser.Node, bool, error)

// MapNode applies a NodeMapperFunc as a NodeMapper
func (f NodeMapperFunc) MapNode(node parser.Node) (parser.Node, bool, error) {
return f(node)
}

// NewASTNodeMapper creates an ASTMapper from a NodeMapper
func NewASTNodeMapper(mapper NodeMapper) ASTNodeMapper {
return ASTNodeMapper{mapper}
}

// ASTNodeMapper is an ASTMapper adapter which uses a NodeMapper internally.
type ASTNodeMapper struct {
NodeMapper
}

// Map implements ASTMapper from a NodeMapper
func (nm ASTNodeMapper) Map(node parser.Node) (parser.Node, error) {
node, fin, err := nm.MapNode(node)

if err != nil {
return nil, err
}

if fin {
return node, nil
}

switch n := node.(type) {
case nil:
// nil handles cases where we check optional fields that are not set
return nil, nil

case parser.Expressions:
for i, e := range n {
mapped, err := nm.Map(e)
if err != nil {
return nil, err
}
n[i] = mapped.(parser.Expr)
}
return n, nil

case *parser.AggregateExpr:
expr, err := nm.Map(n.Expr)
if err != nil {
return nil, err
}
n.Expr = expr.(parser.Expr)
return n, nil

case *parser.BinaryExpr:
lhs, err := nm.Map(n.LHS)
if err != nil {
return nil, err
}
n.LHS = lhs.(parser.Expr)

rhs, err := nm.Map(n.RHS)
if err != nil {
return nil, err
}
n.RHS = rhs.(parser.Expr)
return n, nil

case *parser.Call:
for i, e := range n.Args {
mapped, err := nm.Map(e)
if err != nil {
return nil, err
}
n.Args[i] = mapped.(parser.Expr)
}
return n, nil

case *parser.SubqueryExpr:
mapped, err := nm.Map(n.Expr)
if err != nil {
return nil, err
}
n.Expr = mapped.(parser.Expr)
return n, nil

case *parser.ParenExpr:
mapped, err := nm.Map(n.Expr)
if err != nil {
return nil, err
}
n.Expr = mapped.(parser.Expr)
return n, nil

case *parser.UnaryExpr:
mapped, err := nm.Map(n.Expr)
if err != nil {
return nil, err
}
n.Expr = mapped.(parser.Expr)
return n, nil

case *parser.EvalStmt:
mapped, err := nm.Map(n.Expr)
if err != nil {
return nil, err
}
n.Expr = mapped.(parser.Expr)
return n, nil

case *parser.NumberLiteral, *parser.StringLiteral, *parser.VectorSelector, *parser.MatrixSelector:
return n, nil

default:
panic(errors.Errorf("nodeMapper: unhandled node type %T", node))
}
}
82 changes: 82 additions & 0 deletions pkg/querier/astmapper/embedded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package astmapper

import (
"encoding/json"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
)

/*
Design:
The prometheus api package enforces a (*promql.Engine argument), making it infeasible to do lazy AST
evaluation and substitution from within this package.
This leaves the (storage.Queryable) interface as the remaining target for conducting application level sharding.
The main idea is to analyze the AST and determine which subtrees can be parallelized. With those in hand, the queries may
be remapped into vector or matrix selectors utilizing a reserved label containing the original query. These may then be parallelized in the storage implementation.
*/

const (
// QueryLabel is a reserved label containing an embedded query
QueryLabel = "__cortex_queries__"
// EmbeddedQueriesMetricName is a reserved label (metric name) denoting an embedded query
EmbeddedQueriesMetricName = "__embedded_queries__"
)

// EmbeddedQueries is a wrapper type for encoding queries
type EmbeddedQueries struct {
Concat []string `json:"Concat"`
}

// JSONCodec is a Codec that uses JSON representations of EmbeddedQueries structs
var JSONCodec jsonCodec

type jsonCodec struct{}

func (c jsonCodec) Encode(queries []string) (string, error) {
embedded := EmbeddedQueries{
Concat: queries,
}
b, err := json.Marshal(embedded)
return string(b), err
}

func (c jsonCodec) Decode(encoded string) (queries []string, err error) {
var embedded EmbeddedQueries
err = json.Unmarshal([]byte(encoded), &embedded)
if err != nil {
return nil, err
}

return embedded.Concat, nil
}

// VectorSquash reduces an AST into a single vector query which can be hijacked by a Queryable impl.
// It always uses a VectorSelector as the substitution node.
// This is important because logical/set binops can only be applied against vectors and not matrices.
func VectorSquasher(nodes ...parser.Node) (parser.Expr, error) {

// concat OR legs
strs := make([]string, 0, len(nodes))
for _, node := range nodes {
strs = append(strs, node.String())
}

encoded, err := JSONCodec.Encode(strs)
if err != nil {
return nil, err
}

embeddedQuery, err := labels.NewMatcher(labels.MatchEqual, QueryLabel, encoded)
if err != nil {
return nil, err
}

return &parser.VectorSelector{
Name: EmbeddedQueriesMetricName,
LabelMatchers: []*labels.Matcher{embeddedQuery},
}, nil

}
Loading

0 comments on commit d5783e8

Please sign in to comment.