Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(api): updates for handling unavailable metrics server #421

Merged
merged 6 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/api/resources/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func NewCache(ctx context.Context, clients *client.Clients) (*Cache, error) {
}

// Start metrics collection
go c.StartMetricsCollection(ctx, clients.MetricsClient)
go c.StartMetricsCollection(ctx, clients.MetricsClient.MetricsV1beta1())

// Stop the informer when the context is done
go func() {
Expand Down
85 changes: 53 additions & 32 deletions pkg/api/resources/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ package resources

import (
"context"
"fmt"
"log"
"sync"
"time"

metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
"k8s.io/metrics/pkg/client/clientset/versioned"
metricsv1beta1 "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
)

const MAX_HISTORY_LENGTH = 200
Expand Down Expand Up @@ -97,7 +97,7 @@ func (pm *PodMetrics) Delete(podUID string) {
}

// StartMetricsCollection starts a goroutine to collect metrics for all pods in the cache
func (c *Cache) StartMetricsCollection(ctx context.Context, metricsClient *versioned.Clientset) {
func (c *Cache) StartMetricsCollection(ctx context.Context, metricsClient metricsv1beta1.MetricsV1beta1Interface) {
// Collect metrics immediately
c.collectMetrics(ctx, metricsClient)

Expand Down Expand Up @@ -127,46 +127,67 @@ func (c *Cache) CalculateUsage(metrics *v1beta1.PodMetrics) (float64, float64) {
return totalCPU, totalMemory
}

func (c *Cache) collectMetrics(ctx context.Context, metricsClient *versioned.Clientset) {
// Fetch all pods
pods := c.Pods.GetSparseResources("", "")

func (c *Cache) collectMetrics(ctx context.Context, metricsClient metricsv1beta1.MetricsV1beta1Interface) {
var totalCPU, totalMemory float64

// Fetch metrics for each pod
for _, pod := range pods {
// Only collect metrics for running pods
phase, _, _ := unstructured.NestedString(pod.Object, "status", "phase")
if phase != "Running" {
continue
}
// Check for metrics server availability
metricsServerAvailable := true
_, err := metricsClient.NodeMetricses().List(ctx, metaV1.ListOptions{})
if err != nil {
metricsServerAvailable = false
log.Printf("Metrics server is not available: %v", err)
}

// Fetch metrics for the pod
metrics, err := metricsClient.MetricsV1beta1().PodMetricses(pod.GetNamespace()).Get(ctx, pod.GetName(), metaV1.GetOptions{})
if err != nil {
fmt.Printf("Error fetching metrics for pod %s/%s: %v\n", pod.GetNamespace(), pod.GetName(), err)
continue
}
if metricsServerAvailable {
// Fetch all pods
pods := c.Pods.GetSparseResources("", "")

// Calculate the total CPU and memory usage
cpu, mem := c.CalculateUsage(metrics)
totalCPU += cpu
totalMemory += mem
// Fetch metrics for each pod
for _, pod := range pods {
// Only collect metrics for running pods
phase, _, _ := unstructured.NestedString(pod.Object, "status", "phase")
if phase != "Running" {
continue
}

// Convert the metrics to unstructured
converted, err := ToUnstructured(metrics)
if err != nil {
fmt.Printf("Error converting metrics for pod %s/%s: %v\n", pod.GetNamespace(), pod.GetName(), err)
continue
}
// Fetch metrics for the pod
metrics, err := metricsClient.PodMetricses(pod.GetNamespace()).Get(ctx, pod.GetName(), metaV1.GetOptions{})
if err != nil {
log.Printf("Error fetching metrics for pod %s/%s: %v\n", pod.GetNamespace(), pod.GetName(), err)
continue
}

// Update the cache with the new metrics
c.PodMetrics.Update(string(pod.GetUID()), converted)
// Calculate the total CPU and memory usage
cpu, mem := c.CalculateUsage(metrics)
totalCPU += cpu
totalMemory += mem

// Convert the metrics to unstructured
converted, err := ToUnstructured(metrics)
if err != nil {
log.Printf("Error converting metrics for pod %s/%s: %v\n", pod.GetNamespace(), pod.GetName(), err)
continue
}

// Update the cache with the new metrics
c.PodMetrics.Update(string(pod.GetUID()), converted)
}
} else {
// Set totalCPU and totalMemory to -1 to indicate metrics server is not available
totalCPU = -1
totalMemory = -1
}

// Add the metrics to the cache and historical usage
c.PodMetrics.current.CPU = totalCPU
c.PodMetrics.current.Memory = totalMemory

// Historical usage must never be negative, so record 0 instead of the -1 sentinel when the metrics server is unavailable
if !metricsServerAvailable {
UncleGedd marked this conversation as resolved.
Show resolved Hide resolved
totalCPU = 0
totalMemory = 0
}

c.PodMetrics.historical = append(c.PodMetrics.historical, Usage{
Timestamp: time.Now(),
CPU: totalCPU,
Expand Down
157 changes: 125 additions & 32 deletions pkg/api/resources/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,149 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: 2024-Present The UDS Authors

//go:build unit

package resources

import (
"context"
"fmt"
"log"
"strings"
"testing"

"github.com/stretchr/testify/require"
coreV1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
metricsv1beta1 "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
)

// TestPodMetrics exercises a PodMetrics store created by NewPodMetrics: its
// initial count/usage/history, entries inserted directly into pm.metrics, and
// GetAll with and without a namespace filter. The subtests share pm and run in
// order, so "GetAll Metrics" relies on the entries added by "Add Metrics".
func TestPodMetrics(t *testing.T) {
pm := NewPodMetrics()

// Test GetCount
require.Equal(t, 0, pm.GetCount())
// A freshly created store reports no metrics, zero usage and no history
t.Run("Initial State", func(t *testing.T) {
require.Equal(t, 0, pm.GetCount())
cpu, mem := pm.GetUsage()
require.Equal(t, 0.0, cpu)
require.Equal(t, 0.0, mem)
require.Empty(t, pm.GetHistoricalUsage())
})

// Insert two entries in different namespaces by writing to the map directly
t.Run("Add Metrics", func(t *testing.T) {
metric1 := &unstructured.Unstructured{}
metric1.SetNamespace("default")
metric1.SetName("metric1")
pm.metrics["metric1"] = metric1

metric2 := &unstructured.Unstructured{}
metric2.SetNamespace("kube-system")
metric2.SetName("metric2")
pm.metrics["metric2"] = metric2

require.Equal(t, 2, pm.GetCount())
})

// An empty namespace argument returns both entries; a namespace returns only its own entry
t.Run("GetAll Metrics", func(t *testing.T) {
allMetrics := pm.GetAll("", "")
require.Len(t, allMetrics, 2)

defaultMetrics := pm.GetAll("default", "")
require.Len(t, defaultMetrics, 1)
require.Equal(t, "metric1", defaultMetrics[0].GetName())

kubeSystemMetrics := pm.GetAll("kube-system", "")
require.Len(t, kubeSystemMetrics, 1)
require.Equal(t, "metric2", kubeSystemMetrics[0].GetName())
})
}

// CustomFakeNodeMetricsInterface is a test double for
// metricsv1beta1.NodeMetricsInterface whose List result is controlled by Err.
type CustomFakeNodeMetricsInterface struct {
// Err is returned as-is by List; a nil value makes List report success.
Err error
}

// List ignores ctx and opts and always returns a nil list together with the
// configured f.Err.
func (f *CustomFakeNodeMetricsInterface) List(ctx context.Context, opts metav1.ListOptions) (*v1beta1.NodeMetricsList, error) {
return nil, f.Err
}

// Get is a stub that exists only to satisfy the interface; it ignores its
// arguments and returns (nil, nil).
func (f *CustomFakeNodeMetricsInterface) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1beta1.NodeMetrics, error) {
return nil, nil
}

// Watch is a stub; it ignores its arguments and returns a nil watch.Interface
// and a nil error.
func (f *CustomFakeNodeMetricsInterface) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
return nil, nil
}

// CustomFakeMetricsV1beta1Client is a test double for
// metricsv1beta1.MetricsV1beta1Interface that serves a caller-supplied fake
// node-metrics client.
type CustomFakeMetricsV1beta1Client struct {
// FakeNodeMetrics is the value handed out by NodeMetricses.
FakeNodeMetrics *CustomFakeNodeMetricsInterface
}

// NodeMetricses returns the configured FakeNodeMetrics as a
// metricsv1beta1.NodeMetricsInterface.
func (f *CustomFakeMetricsV1beta1Client) NodeMetricses() metricsv1beta1.NodeMetricsInterface {
return f.FakeNodeMetrics
}

// PodMetricses is a stub that ignores namespace and returns a nil interface.
// NOTE(review): calling any method on the returned value would panic, so this
// fake is only suitable for paths that never fetch per-pod metrics.
func (f *CustomFakeMetricsV1beta1Client) PodMetricses(namespace string) metricsv1beta1.PodMetricsInterface {
return nil
}

// RESTClient is a stub that always returns a nil rest.Interface.
func (f *CustomFakeMetricsV1beta1Client) RESTClient() rest.Interface {
return nil
}

// TestCollectMetrics runs Cache.collectMetrics against a fake metrics client
// whose node-metrics List call fails, then asserts that current usage is set to
// -1, the first historical entry is recorded as 0, and the List error text was
// written to the standard logger.
func TestCollectMetrics(t *testing.T) {

expectedError := fmt.Errorf("custom error: unable to list node metrics")

// Fake client whose NodeMetricses().List always returns expectedError
fakeNodeMetrics := &CustomFakeNodeMetricsInterface{Err: expectedError}
fakeMetricsClient := &CustomFakeMetricsV1beta1Client{FakeNodeMetrics: fakeNodeMetrics}

// Create an empty Pod ResourceList (no resources are added to its maps)
podGVK := coreV1.SchemeGroupVersion.WithKind("Pod")
pods := &ResourceList{
Resources: make(map[string]*unstructured.Unstructured),
SparseResources: make(map[string]*unstructured.Unstructured),
Changes: make(chan struct{}, 1),
HasSynced: nil,
gvk: podGVK,
CRDExists: true,
}
podMetrics := NewPodMetrics()

// Test GetUsage
cpu, mem := pm.GetUsage()
require.Equal(t, 0.0, cpu)
require.Equal(t, 0.0, mem)
// Create a Cache instance with the mock Pods
cache := &Cache{
Pods: pods,
PodMetrics: podMetrics,
}

// Test GetHistoricalUsage
historical := pm.GetHistoricalUsage()
require.Empty(t, historical)
ctx := context.TODO()

// Add some metrics
metric1 := &unstructured.Unstructured{}
metric1.SetNamespace("default")
metric1.SetName("metric1")
pm.metrics["metric1"] = metric1
// Redirect the standard logger into an in-memory buffer so the logged error can be inspected.
// NOTE(review): the previous log output is not restored in this function — confirm this
// cannot leak into other tests in the package.
logOutput := &logCapture{}
log.SetOutput(logOutput)

metric2 := &unstructured.Unstructured{}
metric2.SetNamespace("kube-system")
metric2.SetName("metric2")
pm.metrics["metric2"] = metric2
cache.collectMetrics(ctx, fakeMetricsClient)

// Test GetCount after adding metrics
require.Equal(t, 2, pm.GetCount())
// Current usage carries the -1 sentinel while the historical entry is recorded as 0.
// NOTE(review): testify's Equal takes (t, expected, actual); these calls pass the actual
// value first, which should only affect how a failure is reported — consider swapping.
require.Equal(t, cache.PodMetrics.current.CPU, float64(-1))
require.Equal(t, cache.PodMetrics.current.Memory, float64(-1))
require.Equal(t, cache.PodMetrics.historical[0].CPU, float64(0))
require.Equal(t, cache.PodMetrics.historical[0].Memory, float64(0))

// Test GetAll without namespace filter
allMetrics := pm.GetAll("", "")
require.Len(t, allMetrics, 2)
// The List error must have been written to the captured log output
require.Contains(t, logOutput.String(), expectedError.Error())
}

// logCapture is an in-memory sink for log output: Write appends each chunk to
// logs and String concatenates them.
type logCapture struct {
// logs holds one entry per Write call, in call order.
logs []string
}

// Test GetAll with namespace filter
defaultMetrics := pm.GetAll("default", "")
require.Len(t, defaultMetrics, 1)
require.Equal(t, "metric1", defaultMetrics[0].GetName())
// Write records p as a new string entry and reports the whole slice as
// written; it never returns an error.
func (lc *logCapture) Write(p []byte) (n int, err error) {
	chunk := string(p)
	lc.logs = append(lc.logs, chunk)
	n = len(p)
	return n, nil
}

kubeSystemMetrics := pm.GetAll("kube-system", "")
require.Len(t, kubeSystemMetrics, 1)
require.Equal(t, "metric2", kubeSystemMetrics[0].GetName())
// String returns every captured entry concatenated in write order, with no
// separator between entries.
func (lc *logCapture) String() string {
	var joined strings.Builder
	for _, entry := range lc.logs {
		joined.WriteString(entry)
	}
	return joined.String()
}
Loading