Skip to content

Commit

Permalink
Make populateSeries' evalRange support parallel execution
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksontj committed Sep 21, 2021
1 parent 6dc5069 commit ce9799d
Showing 1 changed file with 31 additions and 7 deletions.
38 changes: 31 additions & 7 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,8 +705,8 @@ func (ng *Engine) findMinMaxTime(s *parser.EvalStmt) (int64, int64) {
var evalRange time.Duration
var l sync.Mutex
parser.Inspect(context.TODO(), s, func(node parser.Node, path []parser.Node) error {
l.Lock()
defer l.Unlock()
l.Lock()
defer l.Unlock()
switch n := node.(type) {
case *parser.VectorSelector:
start, end := ng.getTimeRangesForSelector(s, n, path, evalRange)
Expand Down Expand Up @@ -772,17 +772,41 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
// The evaluation of the VectorSelector inside then evaluates the given range and unsets
// the variable.
var evalRange time.Duration
l := sync.Mutex{}

n, err := parser.Inspect(ctx, s, func(node parser.Node, path []parser.Node) error {
// This has been moved to be a map here to allow for parallel execution. We want the evalRange to apply
// to the subtree under the found MatrixSelector; not arbitrarily across anything.
evalRanges := make(map[parser.Node]time.Duration)
l := sync.RWMutex{}
findEvalRange := func(node parser.Node, path []parser.Node) (parser.Node, time.Duration) {
// If we are the top of the tree; then we don't have to look at the path
if len(path) == 0 {
return node, 0
}
l.RLock()
defer l.RUnlock()
for i := len(path) - 1; i >= 0; i-- {
pNode := path[i]
r, ok := evalRanges[pNode]
if ok {
return pNode, r
}
}
// If there isn't one set for the path; then we set one for ourselves
return node, 0
}
setEvalRange := func(n parser.Node, d time.Duration) {
l.Lock()
defer l.Unlock()
evalRanges[n] = d
}

n, err := parser.Inspect(ctx, s, func(node parser.Node, path []parser.Node) error {
switch n := node.(type) {
case *parser.VectorSelector:
if n.UnexpandedSeriesSet != nil {
return nil
}
pNode, evalRange := findEvalRange(node, path)
start, end := ng.getTimeRangesForSelector(s, n, path, evalRange)
hints := &storage.SelectHints{
Start: start,
Expand All @@ -791,12 +815,12 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s
Range: durationMilliseconds(evalRange),
Func: extractFuncFromPath(path),
}
evalRange = 0
setEvalRange(pNode, 0)
hints.By, hints.Grouping = extractGroupsFromPath(path)
n.UnexpandedSeriesSet = querier.Select(false, hints, n.LabelMatchers...)

case *parser.MatrixSelector:
evalRange = n.Range
setEvalRange(node, n.Range)
}
return nil
}, ng.NodeReplacer)
Expand Down

0 comments on commit ce9799d

Please sign in to comment.