Add option to use simple latency query for API latency.
oxddr committed Sep 12, 2019
1 parent e6e303d commit 6d4554d
Showing 7 changed files with 69 additions and 27 deletions.
@@ -39,11 +39,19 @@ import (
const (
apiResponsivenessPrometheusMeasurementName = "APIResponsivenessPrometheus"

// TODO(krzysied): figure out why we're getting non-capitalized proxy and fix this
filters = `resource!="events", verb!~"WATCH|WATCHLIST|PROXY|proxy|CONNECT"`

// latencyQuery: %v should be replaced with (1) filters and (2) query window size.
// TODO(krzysied): figure out why we're getting non-capitalized proxy and fix this.
latencyQuery = "quantile_over_time(0.99, apiserver:apiserver_request_latency:histogram_quantile{%v}[%v])"
// latencyQuery matches the description of the API call latency SLI and measures the 99th percentile over 5m windows.
//
// latencyQuery: %v should be replaced with (1) filters and (2) query window size.
latencyQuery = "quantile_over_time(0.99, apiserver:apiserver_request_latency:histogram_quantile{%v}[%v])"

// simpleLatencyQuery measures a given percentile of API call latency over a given period of time.
// It doesn't match the SLI, but is useful in shorter tests, where there aren't enough windows to use latencyQuery meaningfully.
//
// simpleLatencyQuery: placeholders should be replaced with (1) quantile, (2) filters and (3) query window size.
simpleLatencyQuery = "histogram_quantile(%.2f, sum(rate(apiserver_request_duration_seconds_bucket{%v}[%v])) by (resource, subresource, verb, scope, le))"

// countQuery %v should be replaced with (1) filters and (2) query window size.
countQuery = "sum(increase(apiserver_request_duration_seconds_count{%v}[%v])) by (resource, subresource, scope, verb)"
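
For illustration, a minimal sketch of how these placeholders expand into concrete PromQL. The constants are copied from the diff above; the 0.99 quantile and the "1h" window are arbitrary example inputs (at runtime the gatherer derives the window from the measurement duration):

```go
package main

import "fmt"

const (
	filters = `resource!="events", verb!~"WATCH|WATCHLIST|PROXY|proxy|CONNECT"`

	simpleLatencyQuery = "histogram_quantile(%.2f, sum(rate(apiserver_request_duration_seconds_bucket{%v}[%v])) by (resource, subresource, verb, scope, le))"
	countQuery         = "sum(increase(apiserver_request_duration_seconds_count{%v}[%v])) by (resource, subresource, scope, verb)"
)

func main() {
	// simpleLatencyQuery placeholders: (1) quantile, (2) filters, (3) window size.
	fmt.Println(fmt.Sprintf(simpleLatencyQuery, 0.99, filters, "1h"))
	// countQuery placeholders: (1) filters, (2) window size.
	fmt.Println(fmt.Sprintf(countQuery, filters, "1h"))
}
```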
@@ -63,8 +71,8 @@ func init() {

type apiResponsivenessGatherer struct{}

func (a *apiResponsivenessGatherer) Gather(executor QueryExecutor, startTime time.Time) (measurement.Summary, error) {
apiCalls, err := a.gatherApiCalls(executor, startTime)
func (a *apiResponsivenessGatherer) Gather(executor QueryExecutor, startTime time.Time, config *measurement.MeasurementConfig) (measurement.Summary, error) {
apiCalls, err := a.gatherAPICalls(executor, startTime, config)
if err != nil {
klog.Errorf("%s: samples gathering error: %v", apiResponsivenessMeasurementName, err)
return nil, err
@@ -77,9 +85,9 @@ func (a *apiResponsivenessGatherer) Gather(executor QueryExecutor, startTime tim
for _, apiCall := range metrics.ApiCalls {
isBad := false
sloThreshold := getSLOThreshold(apiCall.Verb, apiCall.Scope)
if apiCall.Latency.Perc99 > sloThreshold {
if err := apiCall.Latency.VerifyThreshold(sloThreshold); err != nil {
isBad = true
badMetrics = append(badMetrics, fmt.Sprintf("got: %+v; expected perc99 <= %v", apiCall, sloThreshold))
badMetrics = append(badMetrics, err.Error())
}
if top > 0 || isBad {
top--
@@ -110,29 +118,56 @@ func (a *apiResponsivenessGatherer) IsEnabled(config *measurement.MeasurementCon
return true
}

func (a *apiResponsivenessGatherer) gatherApiCalls(executor QueryExecutor, startTime time.Time) ([]apiCall, error) {
func (a *apiResponsivenessGatherer) gatherAPICalls(executor QueryExecutor, startTime time.Time, config *measurement.MeasurementConfig) ([]apiCall, error) {
measurementEnd := time.Now()
measurementDuration := measurementEnd.Sub(startTime)
// Latency measurement is based on 5m window aggregation,
// therefore first 5 minutes of the test should be skipped.
latencyMeasurementDuration := measurementDuration - latencyWindowSize
if latencyMeasurementDuration < time.Minute {
latencyMeasurementDuration = time.Minute
}
timeBoundedLatencyQuery := fmt.Sprintf(latencyQuery, filters, measurementutil.ToPrometheusTime(latencyMeasurementDuration))
latencySamples, err := executor.Query(timeBoundedLatencyQuery, measurementEnd)

useSimple, err := util.GetBoolOrDefault(config.Params, "useSimpleLatencyQuery", false)
if err != nil {
return nil, err
}

var latencySamples []*model.Sample
if useSimple {
promDuration := measurementutil.ToPrometheusTime(measurementDuration)
quantiles := []float64{0.5, 0.9, 0.99}
for _, q := range quantiles {
query := fmt.Sprintf(simpleLatencyQuery, q, filters, promDuration)
samples, err := executor.Query(query, measurementEnd)
if err != nil {
return nil, err
}
// The underlying code assumes the presence of the 'quantile' label, so add it manually (see the standalone sketch after this function).
for _, sample := range samples {
sample.Metric["quantile"] = model.LabelValue(fmt.Sprintf("%.2f", q))
}
latencySamples = append(latencySamples, samples...)
}
} else {
// Latency measurement is based on 5m window aggregation,
// therefore first 5 minutes of the test should be skipped.
latencyMeasurementDuration := measurementDuration - latencyWindowSize
if latencyMeasurementDuration < time.Minute {
latencyMeasurementDuration = time.Minute
}
promDuration := measurementutil.ToPrometheusTime(latencyMeasurementDuration)

query := fmt.Sprintf(latencyQuery, filters, promDuration)
latencySamples, err = executor.Query(query, measurementEnd)
if err != nil {
return nil, err
}
}

timeBoundedCountQuery := fmt.Sprintf(countQuery, filters, measurementutil.ToPrometheusTime(measurementDuration))
countSamples, err := executor.Query(timeBoundedCountQuery, measurementEnd)
if err != nil {
return nil, err
}
return a.convertToApiCalls(latencySamples, countSamples)
return a.convertToAPICalls(latencySamples, countSamples)
}
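
The label injection above is what lets the simple-query samples flow through the shared conversion code, which keys on the 'quantile' label. A minimal standalone sketch of that step (the series labels and the sample value are invented for illustration), using the same github.com/prometheus/common/model types as the gatherer:

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

func main() {
	// simpleLatencyQuery yields one series per API call signature but carries
	// no "quantile" label, so the gatherer injects it by hand.
	sample := &model.Sample{
		Metric: model.Metric{"resource": "pods", "verb": "LIST", "scope": "cluster"},
		Value:  0.42, // latency in seconds, illustrative
	}
	q := 0.99
	sample.Metric["quantile"] = model.LabelValue(fmt.Sprintf("%.2f", q))
	fmt.Println(sample.Metric) // {quantile="0.99", resource="pods", scope="cluster", verb="LIST"}
}
```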

func (a *apiResponsivenessGatherer) convertToApiCalls(latencySamples, countSamples []*model.Sample) ([]apiCall, error) {
func (a *apiResponsivenessGatherer) convertToAPICalls(latencySamples, countSamples []*model.Sample) ([]apiCall, error) {
apiCalls := make(map[string]*apiCall)

for _, sample := range latencySamples {
@@ -166,7 +201,7 @@ func (a *apiResponsivenessGatherer) convertToApiCalls(latencySamples, countSampl
return result, nil
}

func getApiCall(apiCalls map[string]*apiCall, resource, subresource, verb, scope string) *apiCall {
func getAPICall(apiCalls map[string]*apiCall, resource, subresource, verb, scope string) *apiCall {
key := getMetricKey(resource, subresource, verb, scope)
call, exists := apiCalls[key]
if !exists {
@@ -182,15 +217,15 @@ func getApiCall(apiCalls map[string]*apiCall, resource, subresource, verb, scope
}

func addLatency(apiCalls map[string]*apiCall, resource, subresource, verb, scope string, quantile float64, latency time.Duration) {
call := getApiCall(apiCalls, resource, subresource, verb, scope)
call := getAPICall(apiCalls, resource, subresource, verb, scope)
call.Latency.SetQuantile(quantile, latency)
}

func addCount(apiCalls map[string]*apiCall, resource, subresource, verb, scope string, count int) {
if count == 0 {
return
}
call := getApiCall(apiCalls, resource, subresource, verb, scope)
call := getAPICall(apiCalls, resource, subresource, verb, scope)
call.Count = count
}

@@ -51,7 +51,7 @@ func (n *netProgGatherer) IsEnabled(config *measurement.MeasurementConfig) bool
return config.CloudProvider != "kubemark"
}

func (n *netProgGatherer) Gather(executor QueryExecutor, startTime time.Time) (measurement.Summary, error) {
func (n *netProgGatherer) Gather(executor QueryExecutor, startTime time.Time, config *measurement.MeasurementConfig) (measurement.Summary, error) {
latency, err := n.query(executor, startTime)
if err != nil {
return nil, err
@@ -52,7 +52,7 @@ func TestGather(t *testing.T) {

func testGatherer(t *testing.T, executor QueryExecutor, wantData *measurementutil.PerfData, wantError error) {
g := &netProgGatherer{}
summary, err := g.Gather(executor, time.Now())
summary, err := g.Gather(executor, time.Now(), nil)
if err != nil {
if wantError != nil {
assert.Equal(t, wantError, err)
@@ -43,13 +43,12 @@ type QueryExecutor interface {
// It's assumed Prometheus is up, running and instructed to scrape required metrics in the test cluster
// (please see clusterloader2/pkg/prometheus/manifests).
type Gatherer interface {
Gather(executor QueryExecutor, startTime time.Time) (measurement.Summary, error)
Gather(executor QueryExecutor, startTime time.Time, config *measurement.MeasurementConfig) (measurement.Summary, error)
IsEnabled(config *measurement.MeasurementConfig) bool
String() string
}
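
Because this interface change touches every implementation, here is a hypothetical, minimal Gatherer written against the new signature; the demoGatherer name, the package placement, and the import paths are assumptions, while util.GetBoolOrDefault mirrors its use in gatherAPICalls above:

```go
package common

import (
	"time"

	"k8s.io/perf-tests/clusterloader2/pkg/measurement"
	"k8s.io/perf-tests/clusterloader2/pkg/util"
)

// demoGatherer is a hypothetical Gatherer used only to illustrate the new signature.
type demoGatherer struct{}

// Gather now receives the measurement config, so implementations can read
// per-measurement params such as "useSimpleLatencyQuery".
func (d *demoGatherer) Gather(executor QueryExecutor, startTime time.Time, config *measurement.MeasurementConfig) (measurement.Summary, error) {
	useSimple, err := util.GetBoolOrDefault(config.Params, "useSimpleLatencyQuery", false)
	if err != nil {
		return nil, err
	}
	_ = useSimple // choose a query strategy based on the param
	return nil, nil
}

func (d *demoGatherer) IsEnabled(config *measurement.MeasurementConfig) bool { return true }

func (d *demoGatherer) String() string { return "DemoGatherer" }
```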

type prometheusMeasurement struct {
name string
gatherer Gatherer

startTime time.Time
@@ -86,7 +85,7 @@ func (m *prometheusMeasurement) Execute(config *measurement.MeasurementConfig) (
c := config.PrometheusFramework.GetClientSets().GetClient()
executor := measurementutil.NewQueryExecutor(c)

summary, err := m.gatherer.Gather(executor, m.startTime)
summary, err := m.gatherer.Gather(executor, m.startTime, config)
if err != nil {
if !errors.IsMetricViolationError(err) {
return nil, err
1 change: 1 addition & 0 deletions clusterloader2/pkg/measurement/util/prometheus.go
@@ -133,6 +133,7 @@ func (e *PrometheusQueryExecutor) Query(query string, queryTime time.Time) ([]*m
resultSamples = append(resultSamples, sample)
}
}
klog.V(4).Infof("Got %d samples", len(resultSamples))
return resultSamples, nil
}

5 changes: 4 additions & 1 deletion clusterloader2/testing/density/config.yaml
@@ -18,7 +18,7 @@
{{$MIN_SATURATION_PODS_TIMEOUT := 180}}
{{$ENABLE_CHAOSMONKEY := DefaultParam .ENABLE_CHAOSMONKEY false}}
{{$ENABLE_PROMETHEUS_API_RESPONSIVENESS := DefaultParam .ENABLE_PROMETHEUS_API_RESPONSIVENESS false}}
{{$ENABLE_PROBES := DefaultParam .ENABLE_PROBES false}}
{{$USE_SIMPLE_LATENCY_QUERY := DefaultParam .USE_SIMPLE_LATENCY_QUERY false}}
#Variables
{{$namespaces := DivideInt .Nodes $NODES_PER_NAMESPACE}}
{{$podsPerNamespace := MultiplyInt $PODS_PER_NODE $NODES_PER_NAMESPACE}}
@@ -223,6 +223,9 @@ steps:
{{if $ENABLE_PROMETHEUS_API_RESPONSIVENESS}}
enableViolations: true
{{end}}
{{if $USE_SIMPLE_LATENCY_QUERY}}
useSimpleLatencyQuery: true
{{end}}
- Identifier: InClusterNetworkLatency
Method: InClusterNetworkLatency
Params:
4 changes: 4 additions & 0 deletions clusterloader2/testing/load/config.yaml
@@ -19,6 +19,7 @@
{{$ENABLE_CONFIGMAPS := DefaultParam .ENABLE_CONFIGMAPS false}}
{{$ENABLE_SECRETS := DefaultParam .ENABLE_SECRETS false}}
{{$ENABLE_STATEFULSETS := DefaultParam .ENABLE_STATEFULSETS false}}
{{$USE_SIMPLE_LATENCY_QUERY := DefaultParam .USE_SIMPLE_LATENCY_QUERY false}}
#Variables
{{$namespaces := DivideInt .Nodes $NODES_PER_NAMESPACE}}
{{$totalPods := MultiplyInt $namespaces $NODES_PER_NAMESPACE $PODS_PER_NODE}}
@@ -449,6 +450,9 @@ steps:
{{if $ENABLE_PROMETHEUS_API_RESPONSIVENESS}}
enableViolations: true
{{end}}
{{if $USE_SIMPLE_LATENCY_QUERY}}
useSimpleLatencyQuery: true
{{end}}
- Identifier: PodStartupLatency
Method: PodStartupLatency
Params:
