Skip to content

Commit

Permalink
Export labeled metrics for real from /metric-export API.
Browse files Browse the repository at this point in the history
Signed-off-by: Vishnu kannan <vishnuk@google.com>
  • Loading branch information
vishh committed Mar 6, 2016
1 parent 119181c commit 735c745
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 77 deletions.
132 changes: 76 additions & 56 deletions metrics/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package v1

import (
"time"

restful "github.com/emicklei/go-restful"

"k8s.io/heapster/metrics/api/v1/types"
"k8s.io/heapster/metrics/core"
"k8s.io/heapster/metrics/sinks/metric"
metricsink "k8s.io/heapster/metrics/sinks/metric"
)

type Api struct {
Expand Down Expand Up @@ -131,7 +133,7 @@ func convertMetricDescriptor(md core.MetricDescriptor) types.MetricDescriptor {
return result
}

func (a *Api) exportMetricsSchema(request *restful.Request, response *restful.Response) {
func (a *Api) exportMetricsSchema(_ *restful.Request, response *restful.Response) {
result := types.TimeseriesSchema{
Metrics: make([]types.MetricDescriptor, 0),
CommonLabels: make([]types.LabelDescriptor, 0),
Expand Down Expand Up @@ -160,78 +162,96 @@ func (a *Api) exportMetricsSchema(request *restful.Request, response *restful.Re
response.WriteEntity(result)
}

func (a *Api) exportMetrics(request *restful.Request, response *restful.Response) {
shortStorage := a.metricSink.GetShortStore()
tsmap := make(map[string]*types.Timeseries)
func (a *Api) exportMetrics(_ *restful.Request, response *restful.Response) {
response.WriteEntity(a.processMetricsRequest(a.metricSink.GetShortStore()))
}

var newestBatch *core.DataBatch = nil
func (a *Api) processMetricsRequest(shortStorage []*core.DataBatch) []*types.Timeseries {
tsmap := make(map[string]*types.Timeseries)

var newestBatch *core.DataBatch
for _, batch := range shortStorage {
if newestBatch == nil || newestBatch.Timestamp.Before(batch.Timestamp) {
newestBatch = batch
}
}

if newestBatch != nil {
for key, ms := range newestBatch.MetricSets {
ts := tsmap[key]
var timeseries []*types.Timeseries
if newestBatch == nil {
return timeseries
}
for key, ms := range newestBatch.MetricSets {
ts := tsmap[key]

msType := ms.Labels[core.LabelMetricSetType.Key]
msType := ms.Labels[core.LabelMetricSetType.Key]

if msType != core.MetricSetTypeNode &&
msType != core.MetricSetTypePodContainer &&
msType != core.MetricSetTypeSystemContainer {
continue
}
switch msType {
case core.MetricSetTypeNode, core.MetricSetTypePodContainer, core.MetricSetTypeSystemContainer:
default:
continue
}

if ts == nil {
ts = &types.Timeseries{
Metrics: make(map[string][]types.Point),
Labels: make(map[string]string),
}
for labelName, labelValue := range ms.Labels {
if _, ok := a.gkeLabels[labelName]; ok {
ts.Labels[labelName] = labelValue
}
}
if msType == core.MetricSetTypeNode {
ts.Labels[core.LabelContainerName.Key] = "machine"
}
tsmap[key] = ts
if ts == nil {
ts = &types.Timeseries{
Metrics: make(map[string][]types.Point),
Labels: make(map[string]string),
}
for metricName, metricVal := range ms.MetricValues {
if _, ok := a.gkeMetrics[metricName]; ok {
points := ts.Metrics[metricName]
if points == nil {
points = make([]types.Point, 0, 1)
}
point := types.Point{
Start: newestBatch.Timestamp,
End: newestBatch.Timestamp,
}
// For cumulative metric use the provided start time.
if metricVal.MetricType == core.MetricCumulative {
point.Start = ms.CreateTime
}
var value interface{}
if metricVal.ValueType == core.ValueInt64 {
value = metricVal.IntValue
} else if metricVal.ValueType == core.ValueFloat {
value = metricVal.FloatValue
} else {
continue
}
point.Value = value
points = append(points, point)
ts.Metrics[metricName] = points
for labelName, labelValue := range ms.Labels {
if _, ok := a.gkeLabels[labelName]; ok {
ts.Labels[labelName] = labelValue
}
}
if msType == core.MetricSetTypeNode {
ts.Labels[core.LabelContainerName.Key] = "machine"
}
tsmap[key] = ts
}
for metricName, metricVal := range ms.MetricValues {
if _, ok := a.gkeMetrics[metricName]; ok {
processPoint(ts, newestBatch, metricName, &metricVal, nil, ms.CreateTime)
}
}
for _, metric := range ms.LabeledMetrics {
if _, ok := a.gkeMetrics[metric.Name]; ok {
processPoint(ts, newestBatch, metric.Name, &metric.MetricValue, metric.Labels, ms.CreateTime)
}
}
}
timeseries := make([]*types.Timeseries, 0, len(tsmap))
timeseries = make([]*types.Timeseries, 0, len(tsmap))
for _, ts := range tsmap {
timeseries = append(timeseries, ts)
}
return timeseries
}

response.WriteEntity(timeseries)
func processPoint(ts *types.Timeseries, db *core.DataBatch, metricName string, metricVal *core.MetricValue, labels map[string]string, creationTime time.Time) {
points := ts.Metrics[metricName]
if points == nil {
points = make([]types.Point, 0, 1)
}
point := types.Point{
Start: db.Timestamp,
End: db.Timestamp,
}
// For cumulative metric use the provided start time.
if metricVal.MetricType == core.MetricCumulative {
point.Start = creationTime
}
var value interface{}
if metricVal.ValueType == core.ValueInt64 {
value = metricVal.IntValue
} else if metricVal.ValueType == core.ValueFloat {
value = metricVal.FloatValue
} else {
return
}
point.Value = value
if labels != nil {
point.Labels = make(map[string]string)
for key, value := range labels {
point.Labels[key] = value
}
}
points = append(points, point)
ts.Metrics[metricName] = points
}
177 changes: 177 additions & 0 deletions metrics/api/v1/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v1

import (
"testing"
"time"

fuzz "github.com/google/gofuzz"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"k8s.io/heapster/metrics/core"
metricsink "k8s.io/heapster/metrics/sinks/metric"
)

func TestApiFactory(t *testing.T) {
metricSink := metricsink.MetricSink{}
api := NewApi(false, &metricSink)
as := assert.New(t)
for _, metric := range core.StandardMetrics {
val, exists := api.gkeMetrics[metric.Name]
as.True(exists)
as.Equal(val, metric.MetricDescriptor)
}
for _, metric := range core.LabeledMetrics {
val, exists := api.gkeMetrics[metric.Name]
as.True(exists)
as.Equal(val, metric.MetricDescriptor)
}

for _, metric := range core.LabeledMetrics {
val, exists := api.gkeMetrics[metric.Name]
as.True(exists)
as.Equal(val, metric.MetricDescriptor)
}
labels := append(core.CommonLabels(), core.ContainerLabels()...)
labels = append(labels, core.PodLabels()...)
for _, label := range labels {
val, exists := api.gkeLabels[label.Key]
as.True(exists)
as.Equal(val, label)
}
}

func TestFuzzInput(t *testing.T) {
api := NewApi(false, nil)
data := []*core.DataBatch{}
fuzz.New().NilChance(0).Fuzz(&data)
_ = api.processMetricsRequest(data)
}

func generateMetricSet(objectType string, labels []core.LabelDescriptor) *core.MetricSet {
ms := &core.MetricSet{
CreateTime: time.Now().Add(-time.Hour),
ScrapeTime: time.Now(),
Labels: make(map[string]string),
MetricValues: make(map[string]core.MetricValue),
LabeledMetrics: make([]core.LabeledMetric, len(labels)),
}
// Add all necessary labels
for _, label := range labels {
ms.Labels[label.Key] = "test-value"
}
ms.Labels[core.LabelMetricSetType.Key] = objectType
// Add all standard metrics
for _, metric := range core.StandardMetrics {
ms.MetricValues[metric.Name] = core.MetricValue{
MetricType: core.MetricCumulative,
ValueType: core.ValueInt64,
IntValue: -1,
}
}
// Add all labeled metrics
for _, metric := range core.LabeledMetrics {
lm := core.LabeledMetric{
Name: metric.Name,
MetricValue: core.MetricValue{
MetricType: core.MetricCumulative,
ValueType: core.ValueInt64,
IntValue: -1,
},
Labels: make(map[string]string),
}
for _, label := range core.MetricLabels() {
lm.Labels[label.Key] = "test-value"
}
ms.LabeledMetrics = append(ms.LabeledMetrics, lm)
}
return ms
}

func TestRealInput(t *testing.T) {
api := NewApi(false, nil)
dataBatch := []*core.DataBatch{
{
Timestamp: time.Now(),
MetricSets: map[string]*core.MetricSet{},
},
{
Timestamp: time.Now().Add(-time.Minute),
MetricSets: map[string]*core.MetricSet{},
},
}
labels := append(core.CommonLabels(), core.ContainerLabels()...)
labels = append(labels, core.PodLabels()...)
for _, entry := range dataBatch {
// Add a pod, container, node, systemcontainer
entry.MetricSets[core.MetricSetTypePod] = generateMetricSet(core.MetricSetTypePod, labels)
entry.MetricSets[core.MetricSetTypeNode] = generateMetricSet(core.MetricSetTypeNode, labels)
entry.MetricSets[core.MetricSetTypePodContainer] = generateMetricSet(core.MetricSetTypePodContainer, labels)
entry.MetricSets[core.MetricSetTypeSystemContainer] = generateMetricSet(core.MetricSetTypeSystemContainer, labels)
}
ts := api.processMetricsRequest(dataBatch)
type expectation struct {
count int
extraLabels bool
}
expectedMetrics := make(map[string]*expectation)
for _, metric := range core.StandardMetrics {
expectedMetrics[metric.Name] = &expectation{
count: 4,
extraLabels: false,
}
}
for _, metric := range core.LabeledMetrics {
expectedMetrics[metric.Name] = &expectation{
count: 4,
extraLabels: true,
}
}
as := assert.New(t)
for _, elem := range ts {
// validate labels
for _, label := range labels {
val, exists := elem.Labels[label.Key]
as.True(exists, "%q label does not exist", label.Key)
if label.Key == core.LabelMetricSetType.Key {
continue
}
if label.Key == core.LabelContainerName.Key && val != "machine" {
as.Equal(val, "test-value", "%q label's value is %q, expected 'test-value'", label.Key, val)
}
}
for mname, points := range elem.Metrics {
ex := expectedMetrics[mname]
require.NotNil(t, ex)
as.NotEqual(ex, 0)
ex.count--
for _, point := range points {
as.Equal(point.Value, -1)
if !ex.extraLabels {
continue
}
as.Equal(len(core.MetricLabels()), len(point.Labels))
for _, label := range core.MetricLabels() {
val, exists := point.Labels[label.Key]
as.True(exists, "expected label %q to be found - %+v", label.Key, point.Labels)
as.Equal(val, "test-value")
}
}
}

}
}
16 changes: 10 additions & 6 deletions metrics/core/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,11 @@ var MetricFilesystemUsage = Metric{
Labels: map[string]string{
LabelResourceID.Key: fs.Device,
},
ValueType: ValueInt64,
MetricType: MetricCumulative,
IntValue: int64(fs.Usage),
MetricValue: MetricValue{
ValueType: ValueInt64,
MetricType: MetricCumulative,
IntValue: int64(fs.Usage),
},
})
}
return result
Expand All @@ -473,9 +475,11 @@ var MetricFilesystemLimit = Metric{
Labels: map[string]string{
LabelResourceID.Key: fs.Device,
},
ValueType: ValueInt64,
MetricType: MetricCumulative,
IntValue: int64(fs.Limit),
MetricValue: MetricValue{
ValueType: ValueInt64,
MetricType: MetricCumulative,
IntValue: int64(fs.Limit),
},
})
}
return result
Expand Down
9 changes: 3 additions & 6 deletions metrics/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,9 @@ func (this *MetricValue) GetValue() interface{} {
}

type LabeledMetric struct {
Name string
Labels map[string]string
IntValue int64
FloatValue float32
MetricType MetricType
ValueType ValueType
Name string
Labels map[string]string
MetricValue
}

func (this *LabeledMetric) GetValue() interface{} {
Expand Down
Loading

0 comments on commit 735c745

Please sign in to comment.