Skip to content

Commit

Permalink
Add linear regression functions (#1063)
Browse files Browse the repository at this point in the history
  • Loading branch information
benraskin92 authored Oct 13, 2018
1 parent 4a10b8e commit 03b0242
Show file tree
Hide file tree
Showing 9 changed files with 404 additions and 7 deletions.
3 changes: 2 additions & 1 deletion src/query/functions/temporal/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package temporal
import (
"fmt"
"math"
"time"

"github.com/m3db/m3/src/query/executor/transform"
"github.com/m3db/m3/src/query/ts"
Expand Down Expand Up @@ -96,7 +97,7 @@ type aggNode struct {
aggFunc func([]float64) float64
}

func (a *aggNode) Process(datapoints ts.Datapoints) float64 {
func (a *aggNode) Process(datapoints ts.Datapoints, _ time.Time) float64 {
return a.aggFunc(datapoints.Values())
}

Expand Down
6 changes: 3 additions & 3 deletions src/query/functions/temporal/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type baseOp struct {
// skipping lint check for a single operator type since we will be adding more
// nolint : unparam
func newBaseOp(args []interface{}, operatorType string, processorFn MakeProcessor) (baseOp, error) {
if operatorType != HoltWintersType {
if operatorType != HoltWintersType && operatorType != PredictLinearType {
if len(args) != 1 {
return emptyOp, fmt.Errorf("invalid number of args for %s: %d", operatorType, len(args))
}
Expand Down Expand Up @@ -320,7 +320,7 @@ func (c *baseNode) processSingleRequest(request processRequest) error {
flattenedValues = append(flattenedValues, dps[idx:]...)
}

newVal = c.processor.Process(flattenedValues)
newVal = c.processor.Process(flattenedValues, alignedTime)
}

builder.AppendValue(i, newVal)
Expand Down Expand Up @@ -360,7 +360,7 @@ func (c *baseNode) sweep(processedKeys []bool, maxBlocks int) {

// Processor is implemented by the underlying transforms
type Processor interface {
Process(values ts.Datapoints) float64
Process(values ts.Datapoints, evaluationTime time.Time) float64
}

// MakeProcessor is a way to create a transform
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/temporal/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (p processor) Init(op baseOp, controller *transform.Controller, opts transf
return &p
}

func (p *processor) Process(dps ts.Datapoints) float64 {
func (p *processor) Process(dps ts.Datapoints, _ time.Time) float64 {
vals := dps.Values()
sum := 0.0
for _, n := range vals {
Expand Down
3 changes: 2 additions & 1 deletion src/query/functions/temporal/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package temporal
import (
"fmt"
"math"
"time"

"github.com/m3db/m3/src/query/executor/transform"
"github.com/m3db/m3/src/query/ts"
Expand Down Expand Up @@ -79,7 +80,7 @@ type functionNode struct {
comparisonFunc comparisonFunc
}

func (f *functionNode) Process(datapoints ts.Datapoints) float64 {
func (f *functionNode) Process(datapoints ts.Datapoints, _ time.Time) float64 {
if len(datapoints) == 0 {
return math.NaN()
}
Expand Down
159 changes: 159 additions & 0 deletions src/query/functions/temporal/linear_regression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package temporal

import (
"fmt"
"math"
"time"

"github.com/m3db/m3/src/query/executor/transform"
"github.com/m3db/m3/src/query/ts"
)

const (
// PredictLinearType predicts the value of time series t seconds from now, based on the input series, using simple linear regression.
// PredictLinearType should only be used with gauges.
PredictLinearType = "predict_linear"

// DerivType calculates the per-second derivative of the time series, using simple linear regression.
// DerivType should only be used with gauges.
DerivType = "deriv"
)

type linearRegressionProcessor struct {
fn linearRegFn
isDeriv bool
}

func (l linearRegressionProcessor) Init(op baseOp, controller *transform.Controller, opts transform.Options) Processor {
return &linearRegressionNode{
op: op,
controller: controller,
timeSpec: opts.TimeSpec,
fn: l.fn,
isDeriv: l.isDeriv,
}
}

type linearRegFn func(float64, float64) float64

// NewLinearRegressionOp creates a new base temporal transform for linear regression functions
func NewLinearRegressionOp(args []interface{}, optype string) (transform.Params, error) {
var (
fn linearRegFn
isDeriv bool
)

switch optype {
case PredictLinearType:
if len(args) != 2 {
return emptyOp, fmt.Errorf("invalid number of args for %s: %d", PredictLinearType, len(args))
}

duration, ok := args[1].(float64)
if !ok {
return emptyOp, fmt.Errorf("unable to cast to scalar argument: %v for %s", args[1], PredictLinearType)
}

fn = func(slope, intercept float64) float64 {
return slope*duration + intercept
}

case DerivType:
fn = func(slope, _ float64) float64 {
return slope
}

isDeriv = true

default:
return nil, fmt.Errorf("unknown linear regression type: %s", optype)
}

l := linearRegressionProcessor{
fn: fn,
isDeriv: isDeriv,
}

return newBaseOp(args, optype, l)
}

type linearRegressionNode struct {
op baseOp
controller *transform.Controller
timeSpec transform.TimeSpec
fn linearRegFn
isDeriv bool
}

func (l linearRegressionNode) Process(dps ts.Datapoints, evaluationTime time.Time) float64 {
if dps.Len() < 2 {
return math.NaN()
}

slope, intercept := linearRegression(dps, evaluationTime, l.isDeriv)
return l.fn(slope, intercept)
}

// linearRegression performs a least-square linear regression analysis on the
// provided datapoints. It returns the slope, and the intercept value at the
// provided time. The algorithm we use comes from https://en.wikipedia.org/wiki/Simple_linear_regression.
func linearRegression(dps ts.Datapoints, interceptTime time.Time, isDeriv bool) (float64, float64) {
var (
n float64
sumTimeDiff, sumVals float64
sumTimeDiffVals, sumTimeDiffSquared float64
valueCount int
)

for _, dp := range dps {
if math.IsNaN(dp.Value) {
continue
}

if valueCount == 0 && isDeriv {
// set interceptTime as timestamp of first non-NaN dp
interceptTime = dp.Timestamp
}

valueCount++
timeDiff := float64(dp.Timestamp.Sub(interceptTime).Seconds())
n += 1.0
sumVals += dp.Value
sumTimeDiff += timeDiff
sumTimeDiffVals += timeDiff * dp.Value
sumTimeDiffSquared += timeDiff * timeDiff
}

// need at least 2 non-NaN values to calculate slope and intercept
if valueCount == 1 {
return math.NaN(), math.NaN()
}

covXY := sumTimeDiffVals - sumTimeDiff*sumVals/n
varX := sumTimeDiffSquared - sumTimeDiff*sumTimeDiff/n

slope := covXY / varX
intercept := sumVals/n - slope*sumTimeDiff/n

return slope, intercept
}
Loading

0 comments on commit 03b0242

Please sign in to comment.