diff --git a/metrics/api/v1/api.go b/metrics/api/v1/api.go index ba31d02afe..efce466bc0 100644 --- a/metrics/api/v1/api.go +++ b/metrics/api/v1/api.go @@ -15,11 +15,13 @@ package v1 import ( + "time" + restful "github.com/emicklei/go-restful" "k8s.io/heapster/metrics/api/v1/types" "k8s.io/heapster/metrics/core" - "k8s.io/heapster/metrics/sinks/metric" + metricsink "k8s.io/heapster/metrics/sinks/metric" ) type Api struct { @@ -131,7 +133,7 @@ func convertMetricDescriptor(md core.MetricDescriptor) types.MetricDescriptor { return result } -func (a *Api) exportMetricsSchema(request *restful.Request, response *restful.Response) { +func (a *Api) exportMetricsSchema(_ *restful.Request, response *restful.Response) { result := types.TimeseriesSchema{ Metrics: make([]types.MetricDescriptor, 0), CommonLabels: make([]types.LabelDescriptor, 0), @@ -160,78 +162,96 @@ func (a *Api) exportMetricsSchema(request *restful.Request, response *restful.Re response.WriteEntity(result) } -func (a *Api) exportMetrics(request *restful.Request, response *restful.Response) { - shortStorage := a.metricSink.GetShortStore() - tsmap := make(map[string]*types.Timeseries) +func (a *Api) exportMetrics(_ *restful.Request, response *restful.Response) { + response.WriteEntity(a.processMetricsRequest(a.metricSink.GetShortStore())) +} - var newestBatch *core.DataBatch = nil +func (a *Api) processMetricsRequest(shortStorage []*core.DataBatch) []*types.Timeseries { + tsmap := make(map[string]*types.Timeseries) + var newestBatch *core.DataBatch for _, batch := range shortStorage { if newestBatch == nil || newestBatch.Timestamp.Before(batch.Timestamp) { newestBatch = batch } } - if newestBatch != nil { - for key, ms := range newestBatch.MetricSets { - ts := tsmap[key] + var timeseries []*types.Timeseries + if newestBatch == nil { + return timeseries + } + for key, ms := range newestBatch.MetricSets { + ts := tsmap[key] - msType := ms.Labels[core.LabelMetricSetType.Key] + msType := ms.Labels[core.LabelMetricSetType.Key] - if msType != core.MetricSetTypeNode && - msType != core.MetricSetTypePodContainer && - msType != core.MetricSetTypeSystemContainer { - continue - } + switch msType { + case core.MetricSetTypeNode, core.MetricSetTypePodContainer, core.MetricSetTypeSystemContainer: + default: + continue + } - if ts == nil { - ts = &types.Timeseries{ - Metrics: make(map[string][]types.Point), - Labels: make(map[string]string), - } - for labelName, labelValue := range ms.Labels { - if _, ok := a.gkeLabels[labelName]; ok { - ts.Labels[labelName] = labelValue - } - } - if msType == core.MetricSetTypeNode { - ts.Labels[core.LabelContainerName.Key] = "machine" - } - tsmap[key] = ts + if ts == nil { + ts = &types.Timeseries{ + Metrics: make(map[string][]types.Point), + Labels: make(map[string]string), } - for metricName, metricVal := range ms.MetricValues { - if _, ok := a.gkeMetrics[metricName]; ok { - points := ts.Metrics[metricName] - if points == nil { - points = make([]types.Point, 0, 1) - } - point := types.Point{ - Start: newestBatch.Timestamp, - End: newestBatch.Timestamp, - } - // For cumulative metric use the provided start time. - if metricVal.MetricType == core.MetricCumulative { - point.Start = ms.CreateTime - } - var value interface{} - if metricVal.ValueType == core.ValueInt64 { - value = metricVal.IntValue - } else if metricVal.ValueType == core.ValueFloat { - value = metricVal.FloatValue - } else { - continue - } - point.Value = value - points = append(points, point) - ts.Metrics[metricName] = points + for labelName, labelValue := range ms.Labels { + if _, ok := a.gkeLabels[labelName]; ok { + ts.Labels[labelName] = labelValue } } + if msType == core.MetricSetTypeNode { + ts.Labels[core.LabelContainerName.Key] = "machine" + } + tsmap[key] = ts + } + for metricName, metricVal := range ms.MetricValues { + if _, ok := a.gkeMetrics[metricName]; ok { + processPoint(ts, newestBatch, metricName, &metricVal, nil, ms.CreateTime) + } + } + for _, metric := range ms.LabeledMetrics { + if _, ok := a.gkeMetrics[metric.Name]; ok { + processPoint(ts, newestBatch, metric.Name, &metric.MetricValue, metric.Labels, ms.CreateTime) + } } } - timeseries := make([]*types.Timeseries, 0, len(tsmap)) + timeseries = make([]*types.Timeseries, 0, len(tsmap)) for _, ts := range tsmap { timeseries = append(timeseries, ts) } + return timeseries +} - response.WriteEntity(timeseries) +func processPoint(ts *types.Timeseries, db *core.DataBatch, metricName string, metricVal *core.MetricValue, labels map[string]string, creationTime time.Time) { + points := ts.Metrics[metricName] + if points == nil { + points = make([]types.Point, 0, 1) + } + point := types.Point{ + Start: db.Timestamp, + End: db.Timestamp, + } + // For cumulative metric use the provided start time. + if metricVal.MetricType == core.MetricCumulative { + point.Start = creationTime + } + var value interface{} + if metricVal.ValueType == core.ValueInt64 { + value = metricVal.IntValue + } else if metricVal.ValueType == core.ValueFloat { + value = metricVal.FloatValue + } else { + return + } + point.Value = value + if labels != nil { + point.Labels = make(map[string]string) + for key, value := range labels { + point.Labels[key] = value + } + } + points = append(points, point) + ts.Metrics[metricName] = points } diff --git a/metrics/api/v1/api_test.go b/metrics/api/v1/api_test.go new file mode 100644 index 0000000000..1933aed4d2 --- /dev/null +++ b/metrics/api/v1/api_test.go @@ -0,0 +1,177 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1 + +import ( + "testing" + "time" + + fuzz "github.com/google/gofuzz" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "k8s.io/heapster/metrics/core" + metricsink "k8s.io/heapster/metrics/sinks/metric" +) + +func TestApiFactory(t *testing.T) { + metricSink := metricsink.MetricSink{} + api := NewApi(false, &metricSink) + as := assert.New(t) + for _, metric := range core.StandardMetrics { + val, exists := api.gkeMetrics[metric.Name] + as.True(exists) + as.Equal(val, metric.MetricDescriptor) + } + for _, metric := range core.LabeledMetrics { + val, exists := api.gkeMetrics[metric.Name] + as.True(exists) + as.Equal(val, metric.MetricDescriptor) + } + + for _, metric := range core.LabeledMetrics { + val, exists := api.gkeMetrics[metric.Name] + as.True(exists) + as.Equal(val, metric.MetricDescriptor) + } + labels := append(core.CommonLabels(), core.ContainerLabels()...) + labels = append(labels, core.PodLabels()...) + for _, label := range labels { + val, exists := api.gkeLabels[label.Key] + as.True(exists) + as.Equal(val, label) + } +} + +func TestFuzzInput(t *testing.T) { + api := NewApi(false, nil) + data := []*core.DataBatch{} + fuzz.New().NilChance(0).Fuzz(&data) + _ = api.processMetricsRequest(data) +} + +func generateMetricSet(objectType string, labels []core.LabelDescriptor) *core.MetricSet { + ms := &core.MetricSet{ + CreateTime: time.Now().Add(-time.Hour), + ScrapeTime: time.Now(), + Labels: make(map[string]string), + MetricValues: make(map[string]core.MetricValue), + LabeledMetrics: make([]core.LabeledMetric, len(labels)), + } + // Add all necessary labels + for _, label := range labels { + ms.Labels[label.Key] = "test-value" + } + ms.Labels[core.LabelMetricSetType.Key] = objectType + // Add all standard metrics + for _, metric := range core.StandardMetrics { + ms.MetricValues[metric.Name] = core.MetricValue{ + MetricType: core.MetricCumulative, + ValueType: core.ValueInt64, + IntValue: -1, + } + } + // Add all labeled metrics + for _, metric := range core.LabeledMetrics { + lm := core.LabeledMetric{ + Name: metric.Name, + MetricValue: core.MetricValue{ + MetricType: core.MetricCumulative, + ValueType: core.ValueInt64, + IntValue: -1, + }, + Labels: make(map[string]string), + } + for _, label := range core.MetricLabels() { + lm.Labels[label.Key] = "test-value" + } + ms.LabeledMetrics = append(ms.LabeledMetrics, lm) + } + return ms +} + +func TestRealInput(t *testing.T) { + api := NewApi(false, nil) + dataBatch := []*core.DataBatch{ + { + Timestamp: time.Now(), + MetricSets: map[string]*core.MetricSet{}, + }, + { + Timestamp: time.Now().Add(-time.Minute), + MetricSets: map[string]*core.MetricSet{}, + }, + } + labels := append(core.CommonLabels(), core.ContainerLabels()...) + labels = append(labels, core.PodLabels()...) + for _, entry := range dataBatch { + // Add a pod, container, node, systemcontainer + entry.MetricSets[core.MetricSetTypePod] = generateMetricSet(core.MetricSetTypePod, labels) + entry.MetricSets[core.MetricSetTypeNode] = generateMetricSet(core.MetricSetTypeNode, labels) + entry.MetricSets[core.MetricSetTypePodContainer] = generateMetricSet(core.MetricSetTypePodContainer, labels) + entry.MetricSets[core.MetricSetTypeSystemContainer] = generateMetricSet(core.MetricSetTypeSystemContainer, labels) + } + ts := api.processMetricsRequest(dataBatch) + type expectation struct { + count int + extraLabels bool + } + expectedMetrics := make(map[string]*expectation) + for _, metric := range core.StandardMetrics { + expectedMetrics[metric.Name] = &expectation{ + count: 4, + extraLabels: false, + } + } + for _, metric := range core.LabeledMetrics { + expectedMetrics[metric.Name] = &expectation{ + count: 4, + extraLabels: true, + } + } + as := assert.New(t) + for _, elem := range ts { + // validate labels + for _, label := range labels { + val, exists := elem.Labels[label.Key] + as.True(exists, "%q label does not exist", label.Key) + if label.Key == core.LabelMetricSetType.Key { + continue + } + if label.Key == core.LabelContainerName.Key && val != "machine" { + as.Equal(val, "test-value", "%q label's value is %q, expected 'test-value'", label.Key, val) + } + } + for mname, points := range elem.Metrics { + ex := expectedMetrics[mname] + require.NotNil(t, ex) + as.NotEqual(ex, 0) + ex.count-- + for _, point := range points { + as.Equal(point.Value, -1) + if !ex.extraLabels { + continue + } + as.Equal(len(core.MetricLabels()), len(point.Labels)) + for _, label := range core.MetricLabels() { + val, exists := point.Labels[label.Key] + as.True(exists, "expected label %q to be found - %+v", label.Key, point.Labels) + as.Equal(val, "test-value") + } + } + } + + } +} diff --git a/metrics/core/metrics.go b/metrics/core/metrics.go index f828197986..0ecf4216e7 100644 --- a/metrics/core/metrics.go +++ b/metrics/core/metrics.go @@ -444,9 +444,11 @@ var MetricFilesystemUsage = Metric{ Labels: map[string]string{ LabelResourceID.Key: fs.Device, }, - ValueType: ValueInt64, - MetricType: MetricCumulative, - IntValue: int64(fs.Usage), + MetricValue: MetricValue{ + ValueType: ValueInt64, + MetricType: MetricCumulative, + IntValue: int64(fs.Usage), + }, }) } return result @@ -473,9 +475,11 @@ var MetricFilesystemLimit = Metric{ Labels: map[string]string{ LabelResourceID.Key: fs.Device, }, - ValueType: ValueInt64, - MetricType: MetricCumulative, - IntValue: int64(fs.Limit), + MetricValue: MetricValue{ + ValueType: ValueInt64, + MetricType: MetricCumulative, + IntValue: int64(fs.Limit), + }, }) } return result diff --git a/metrics/core/types.go b/metrics/core/types.go index 4f88169375..727f6d537b 100644 --- a/metrics/core/types.go +++ b/metrics/core/types.go @@ -102,12 +102,9 @@ func (this *MetricValue) GetValue() interface{} { } type LabeledMetric struct { - Name string - Labels map[string]string - IntValue int64 - FloatValue float32 - MetricType MetricType - ValueType ValueType + Name string + Labels map[string]string + MetricValue } func (this *LabeledMetric) GetValue() interface{} { diff --git a/metrics/sinks/log/log_sink_test.go b/metrics/sinks/log/log_sink_test.go index d911a922b0..0eef2bca6f 100644 --- a/metrics/sinks/log/log_sink_test.go +++ b/metrics/sinks/log/log_sink_test.go @@ -42,10 +42,12 @@ func TestSimpleWrite(t *testing.T) { }, LabeledMetrics: []core.LabeledMetric{ { - Name: "lm", - ValueType: core.ValueInt64, - MetricType: core.MetricGauge, - IntValue: 279, + Name: "lm", + MetricValue: core.MetricValue{ + MetricType: core.MetricGauge, + ValueType: core.ValueInt64, + IntValue: 279, + }, Labels: map[string]string{ "disk": "hard", }, diff --git a/metrics/sources/summary/summary.go b/metrics/sources/summary/summary.go index 75c286f234..71c078b1be 100644 --- a/metrics/sources/summary/summary.go +++ b/metrics/sources/summary/summary.go @@ -350,11 +350,13 @@ func (this *summaryMetricsSource) addLabeledIntMetric(metrics *MetricSet, metric } val := LabeledMetric{ - Name: metric.Name, - Labels: labels, - ValueType: ValueInt64, - MetricType: metric.Type, - IntValue: int64(*value), + Name: metric.Name, + Labels: labels, + MetricValue: MetricValue{ + ValueType: ValueInt64, + MetricType: metric.Type, + IntValue: int64(*value), + }, } metrics.LabeledMetrics = append(metrics.LabeledMetrics, val) }