Skip to content

Commit

Permalink
Add custom thresholds mechanism in APIResponsivenessPrometheus measur…
Browse files Browse the repository at this point in the history
…ement
  • Loading branch information
tosi3k committed May 28, 2020
1 parent 6a07c2f commit f1d338d
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"
"k8s.io/klog"

"k8s.io/perf-tests/clusterloader2/pkg/errors"
Expand Down Expand Up @@ -70,7 +71,7 @@ const (

latencyWindowSize = 5 * time.Minute

// Number of metrics with highest latency to print. If the latency exceeeds SLO threshold, a metric is printed regardless.
// Number of metrics with highest latency to print. If the latency exceeds SLO threshold, a metric is printed regardless.
topToPrint = 5
)

Expand All @@ -95,6 +96,20 @@ type apiCallMetrics struct {
metrics map[string]*apiCallMetric
}

type customThresholdEntry struct {
Resource string `json:"resource"`
Subresource string `json:"subresource"`
Verb string `json:"verb"`
Scope string `json:"scope"`
Threshold time.Duration `json:"threshold"`
}

type customThresholds map[string]time.Duration

func (cte *customThresholdEntry) getKey() string {
return buildKey(cte.Resource, cte.Subresource, cte.Verb, cte.Scope)
}

type apiResponsivenessGatherer struct{}

func (a *apiResponsivenessGatherer) Gather(executor QueryExecutor, startTime time.Time, config *measurement.Config) ([]measurement.Summary, error) {
Expand All @@ -120,7 +135,12 @@ func (a *apiResponsivenessGatherer) Gather(executor QueryExecutor, startTime tim
return nil, err
}

badMetrics := a.validateAPICalls(config.Identifier, allowedSlowCalls, apiCalls)
customThresholds, err := getCustomThresholds(config, apiCalls)
if err != nil {
return nil, err
}

badMetrics := a.validateAPICalls(config.Identifier, allowedSlowCalls, apiCalls, customThresholds)
if len(badMetrics) > 0 {
err = errors.NewMetricViolationError("top latency metric", fmt.Sprintf("there should be no high-latency requests, but: %v", badMetrics))
}
Expand Down Expand Up @@ -204,13 +224,44 @@ func (a *apiResponsivenessGatherer) gatherAPICalls(executor QueryExecutor, start
return newFromSamples(latencySamples, countSamples, countSlowSamples)
}

func (a *apiResponsivenessGatherer) validateAPICalls(identifier string, allowedSlowCalls int, metrics *apiCallMetrics) []error {
func getCustomThresholds(config *measurement.Config, metrics *apiCallMetrics) (customThresholds, error) {
thresholdsString, err := util.GetStringOrDefault(config.Params, "customThresholds", "")
if err != nil {
return nil, err
}
var thresholds []customThresholdEntry
if err := yaml.Unmarshal([]byte(thresholdsString), &thresholds); err != nil {
return nil, err
}

customThresholds := customThresholds{}
for _, entry := range thresholds {
if entry.Threshold == 0 {
return nil, fmt.Errorf("custom threshold must be set to a positive time duration")
}
key := entry.getKey()
if _, ok := metrics.metrics[key]; !ok {
klog.Infof("WARNING: unrecognized custom threshold API call key: %v", key)
} else {
customThresholds[key] = entry.Threshold
}
}
return customThresholds, nil
}

func (a *apiResponsivenessGatherer) validateAPICalls(identifier string, allowedSlowCalls int, metrics *apiCallMetrics, customThresholds customThresholds) []error {
badMetrics := make([]error, 0)
top := topToPrint

for _, apiCall := range metrics.sorted() {
var threshold time.Duration
if customThreshold, ok := customThresholds[apiCall.getKey()]; ok {
threshold = customThreshold
} else {
threshold = apiCall.getSLOThreshold()
}
var err error
if err = apiCall.Validate(allowedSlowCalls); err != nil {
if err = apiCall.Validate(allowedSlowCalls, threshold); err != nil {
badMetrics = append(badMetrics, err)
}
if top > 0 || err != nil {
Expand All @@ -219,7 +270,7 @@ func (a *apiResponsivenessGatherer) validateAPICalls(identifier string, allowedS
if err != nil {
prefix = "WARNING "
}
klog.Infof("%s: %vTop latency metric: %v", identifier, prefix, apiCall)
klog.Infof("%s: %vTop latency metric: %+v; threshold: %v", identifier, prefix, *apiCall, threshold)
}
}
return badMetrics
Expand Down Expand Up @@ -259,7 +310,7 @@ func newFromSamples(latencySamples, countSamples, countSlowSamples []*model.Samp
}

func (m *apiCallMetrics) getAPICall(resource, subresource, verb, scope string) *apiCallMetric {
key := m.buildKey(resource, subresource, verb, scope)
key := buildKey(resource, subresource, verb, scope)
call, exists := m.metrics[key]
if !exists {
call = &apiCallMetric{
Expand Down Expand Up @@ -329,12 +380,15 @@ func (m *apiCallMetrics) sorted() []*apiCallMetric {
return all
}

func (m *apiCallMetrics) buildKey(resource, subresource, verb, scope string) string {
func buildKey(resource, subresource, verb, scope string) string {
return fmt.Sprintf("%s|%s|%s|%s", resource, subresource, verb, scope)
}

func (ap *apiCallMetric) Validate(allowedSlowCalls int) error {
threshold := ap.getSLOThreshold()
func (ap *apiCallMetric) getKey() string {
return buildKey(ap.Resource, ap.Subresource, ap.Verb, ap.Scope)
}

func (ap *apiCallMetric) Validate(allowedSlowCalls int, threshold time.Duration) error {
if err := ap.Latency.VerifyThreshold(threshold); err != nil {
// TODO(oxddr): remove allowedSlowCalls guard once it's stable
if allowedSlowCalls > 0 && ap.SlowCount <= allowedSlowCalls {
Expand All @@ -354,7 +408,3 @@ func (ap *apiCallMetric) getSLOThreshold() time.Duration {
}
return namespaceThreshold
}

func (ap *apiCallMetric) String() string {
return fmt.Sprintf("%+v; threshold: %v", *ap, ap.getSLOThreshold())
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@ import (
measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
)

var (
// klogv1 allows users to turn on/off logging to stderr only through
// the use of flag. This prevents us from having control over which
// of the test functions have that mechanism turned off when we run
// go test command.
// TODO(#1286): refactor api_responsiveness_prometheus.go to make
// testing of logging easier and remove this hack in the end.
klogLogToStderr = true
)

func turnOffLoggingToStderrInKlog() {
if klogLogToStderr {
klog.InitFlags(nil)
flag.Set("logtostderr", "false")
flag.Parse()
klogLogToStderr = false
}
}

type sample struct {
resource string
subresource string
Expand Down Expand Up @@ -64,7 +83,7 @@ func (ex *fakeQueryExecutor) Query(query string, queryTime time.Time) ([]*model.
sample := &model.Sample{
Metric: model.Metric{
"resource": model.LabelValue(s.resource),
"subresoruce": model.LabelValue(s.subresource),
"subresource": model.LabelValue(s.subresource),
"verb": model.LabelValue(s.verb),
"scope": model.LabelValue(s.scope),
},
Expand Down Expand Up @@ -532,9 +551,7 @@ func TestLogging(t *testing.T) {
},
}

klog.InitFlags(nil)
flag.Set("logtostderr", "false")
flag.Parse()
turnOffLoggingToStderrInKlog()

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -557,3 +574,167 @@ func TestLogging(t *testing.T) {
})
}
}

func TestAPIResponsivenessCustomThresholds(t *testing.T) {
splitter := func(yamlLines []string) string {
return strings.Join(yamlLines, "\n")
}

cases := []struct {
name string
config *measurement.Config
samples []*sample
hasError bool
expectedMessages []string
}{
{
name: "simple_slo_threshold_override_success",
config: &measurement.Config{
Params: map[string]interface{}{
"customThresholds": splitter([]string{
"- verb: PUT",
" resource: leases",
" scope: namespace",
" threshold: 600ms",
}),
},
},
samples: []*sample{
{
resource: "leases",
verb: "PUT",
scope: "namespace",
latency: 0.5,
},
},
hasError: false,
},
{
name: "simple_slo_threshold_override_failure",
config: &measurement.Config{
Params: map[string]interface{}{
"customThresholds": splitter([]string{
"- verb: PUT",
" resource: leases",
" scope: namespace",
" threshold: 400ms",
}),
},
},
samples: []*sample{
{
resource: "leases",
verb: "PUT",
scope: "namespace",
latency: 0.5,
},
},
hasError: true,
expectedMessages: []string{
"WARNING Top latency metric",
},
},
{
name: "empty_custom_thresholds_field",
config: &measurement.Config{
Params: map[string]interface{}{
"customThresholds": "",
},
},
samples: []*sample{
{
resource: "leases",
verb: "PUT",
scope: "namespace",
latency: 0.5,
},
},
hasError: false,
},
{
name: "no_custom_thresholds_field",
config: &measurement.Config{
Params: map[string]interface{}{},
},
samples: []*sample{
{
resource: "leases",
verb: "PUT",
scope: "namespace",
latency: 0.5,
},
},
hasError: false,
},
{
name: "unrecognized_metric",
config: &measurement.Config{
Params: map[string]interface{}{
"customThresholds": splitter([]string{
"- verb: POST",
" resource: pod",
" scope: namespace",
" threshold: 500ms",
}),
},
},
samples: []*sample{
{
resource: "leases",
verb: "PUT",
scope: "namespace",
latency: 0.2,
},
},
hasError: false,
expectedMessages: []string{
"unrecognized custom threshold API call key",
},
},
{
name: "non_unmarshallable_custom_thresholds",
config: &measurement.Config{
Params: map[string]interface{}{
"customThresholds": splitter([]string{
"im: not",
"a: good",
"yaml: array",
}),
},
},
samples: []*sample{
{
resource: "pod",
verb: "POST",
scope: "namespace",
latency: 0.2,
},
},
hasError: true,
},
}

turnOffLoggingToStderrInKlog()

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
buf := bytes.NewBuffer(nil)
klog.SetOutput(buf)

executor := &fakeQueryExecutor{samples: tc.samples}
gatherer := &apiResponsivenessGatherer{}

_, err := gatherer.Gather(executor, time.Now(), tc.config)
klog.Flush()
if tc.hasError {
assert.NotNil(t, err, "expected an error, but got none")
} else {
assert.Nil(t, err, "expected no error, but got %v", err)
}

for _, msg := range tc.expectedMessages {
assert.Contains(t, buf.String(), msg)
}
})
}
}
2 changes: 2 additions & 0 deletions clusterloader2/testing/load/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
{{$ENABLE_RESTART_COUNT_CHECK := DefaultParam .ENABLE_RESTART_COUNT_CHECK false}}
{{$RESTART_COUNT_THRESHOLD_OVERRIDES:= DefaultParam .RESTART_COUNT_THRESHOLD_OVERRIDES ""}}
{{$ALLOWED_SLOW_API_CALLS := DefaultParam .CL2_ALLOWED_SLOW_API_CALLS 0}}
{{$CUSTOM_API_CALL_THRESHOLDS := DefaultParam .CUSTOM_API_CALL_THRESHOLDS ""}}
#Variables
{{$namespaces := DivideInt .Nodes $NODES_PER_NAMESPACE}}
{{$totalPods := MultiplyInt $namespaces $NODES_PER_NAMESPACE $PODS_PER_NODE}}
Expand Down Expand Up @@ -671,6 +672,7 @@ steps:
Params:
action: gather
allowedSlowCalls: {{$ALLOWED_SLOW_API_CALLS}}
customThresholds: {{YamlQuote $CUSTOM_API_CALL_THRESHOLDS 4}}
{{end}}
- Identifier: PodStartupLatency
Method: PodStartupLatency
Expand Down
11 changes: 11 additions & 0 deletions clusterloader2/testing/load/golang/custom_api_call_thresholds.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CUSTOM_API_CALL_THRESHOLDS: |
- verb: PUT
resource: leases
subresource: ''
scope: namespace
threshold: 500ms
- verb: DELETE
resource: pods
subresource: ''
scope: namespace
threshold: 700ms

0 comments on commit f1d338d

Please sign in to comment.