Skip to content

Commit

Permalink
chore(api): updates for handling unavailable metrics server (#421)
Browse files Browse the repository at this point in the history
Co-authored-by: UncleGedd <42304551+UncleGedd@users.noreply.github.com>
  • Loading branch information
decleaver and UncleGedd authored Oct 10, 2024
1 parent 782d93b commit 6bb9728
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 64 deletions.
2 changes: 1 addition & 1 deletion pkg/api/resources/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func NewCache(ctx context.Context, clients *client.Clients) (*Cache, error) {
}

// Start metrics collection
go c.StartMetricsCollection(ctx, clients.MetricsClient)
go c.StartMetricsCollection(ctx, clients.MetricsClient.MetricsV1beta1())

// Stop the informer when the context is done
go func() {
Expand Down
85 changes: 53 additions & 32 deletions pkg/api/resources/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ package resources

import (
"context"
"fmt"
"log"
"sync"
"time"

metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
"k8s.io/metrics/pkg/client/clientset/versioned"
metricsv1beta1 "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
)

const MAX_HISTORY_LENGTH = 200
Expand Down Expand Up @@ -97,7 +97,7 @@ func (pm *PodMetrics) Delete(podUID string) {
}

// StartMetricsCollection starts a goroutine to collect metrics for all pods in the cache
func (c *Cache) StartMetricsCollection(ctx context.Context, metricsClient *versioned.Clientset) {
func (c *Cache) StartMetricsCollection(ctx context.Context, metricsClient metricsv1beta1.MetricsV1beta1Interface) {
// Collect metrics immediately
c.collectMetrics(ctx, metricsClient)

Expand Down Expand Up @@ -127,46 +127,67 @@ func (c *Cache) CalculateUsage(metrics *v1beta1.PodMetrics) (float64, float64) {
return totalCPU, totalMemory
}

func (c *Cache) collectMetrics(ctx context.Context, metricsClient *versioned.Clientset) {
// Fetch all pods
pods := c.Pods.GetSparseResources("", "")

func (c *Cache) collectMetrics(ctx context.Context, metricsClient metricsv1beta1.MetricsV1beta1Interface) {
var totalCPU, totalMemory float64

// Fetch metrics for each pod
for _, pod := range pods {
// Only collect metrics for running pods
phase, _, _ := unstructured.NestedString(pod.Object, "status", "phase")
if phase != "Running" {
continue
}
// Check for metrics server availability
metricsServerAvailable := true
_, err := metricsClient.NodeMetricses().List(ctx, metaV1.ListOptions{})
if err != nil {
metricsServerAvailable = false
log.Printf("Metrics server is not available: %v", err)
}

// Fetch metrics for the pod
metrics, err := metricsClient.MetricsV1beta1().PodMetricses(pod.GetNamespace()).Get(ctx, pod.GetName(), metaV1.GetOptions{})
if err != nil {
fmt.Printf("Error fetching metrics for pod %s/%s: %v\n", pod.GetNamespace(), pod.GetName(), err)
continue
}
if metricsServerAvailable {
// Fetch all pods
pods := c.Pods.GetSparseResources("", "")

// Calculate the total CPU and memory usage
cpu, mem := c.CalculateUsage(metrics)
totalCPU += cpu
totalMemory += mem
// Fetch metrics for each pod
for _, pod := range pods {
// Only collect metrics for running pods
phase, _, _ := unstructured.NestedString(pod.Object, "status", "phase")
if phase != "Running" {
continue
}

// Convert the metrics to unstructured
converted, err := ToUnstructured(metrics)
if err != nil {
fmt.Printf("Error converting metrics for pod %s/%s: %v\n", pod.GetNamespace(), pod.GetName(), err)
continue
}
// Fetch metrics for the pod
metrics, err := metricsClient.PodMetricses(pod.GetNamespace()).Get(ctx, pod.GetName(), metaV1.GetOptions{})
if err != nil {
log.Printf("Error fetching metrics for pod %s/%s: %v\n", pod.GetNamespace(), pod.GetName(), err)
continue
}

// Update the cache with the new metrics
c.PodMetrics.Update(string(pod.GetUID()), converted)
// Calculate the total CPU and memory usage
cpu, mem := c.CalculateUsage(metrics)
totalCPU += cpu
totalMemory += mem

// Convert the metrics to unstructured
converted, err := ToUnstructured(metrics)
if err != nil {
log.Printf("Error converting metrics for pod %s/%s: %v\n", pod.GetNamespace(), pod.GetName(), err)
continue
}

// Update the cache with the new metrics
c.PodMetrics.Update(string(pod.GetUID()), converted)
}
} else {
// Set totalCPU and totalMemory to -1 to indicate metrics server is not available
totalCPU = -1
totalMemory = -1
}

// Add the metrics to the cache and historical usage
c.PodMetrics.current.CPU = totalCPU
c.PodMetrics.current.Memory = totalMemory

// Make sure CPU and Memory data for historical usage is not negative for historical data
if !metricsServerAvailable {
totalCPU = 0
totalMemory = 0
}

c.PodMetrics.historical = append(c.PodMetrics.historical, Usage{
Timestamp: time.Now(),
CPU: totalCPU,
Expand Down
155 changes: 125 additions & 30 deletions pkg/api/resources/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,146 @@
package resources

import (
"context"
"fmt"
"log"
"strings"
"testing"

"github.com/stretchr/testify/require"
coreV1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
metricsv1beta1 "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
)

func TestPodMetrics(t *testing.T) {
pm := NewPodMetrics()

// Test GetCount
require.Equal(t, 0, pm.GetCount())
t.Run("Initial State", func(t *testing.T) {
require.Equal(t, 0, pm.GetCount())
cpu, mem := pm.GetUsage()
require.Equal(t, 0.0, cpu)
require.Equal(t, 0.0, mem)
require.Empty(t, pm.GetHistoricalUsage())
})

// Test GetUsage
cpu, mem := pm.GetUsage()
require.Equal(t, 0.0, cpu)
require.Equal(t, 0.0, mem)
t.Run("Add Metrics", func(t *testing.T) {
metric1 := &unstructured.Unstructured{}
metric1.SetNamespace("default")
metric1.SetName("metric1")
pm.metrics["metric1"] = metric1

// Test GetHistoricalUsage
historical := pm.GetHistoricalUsage()
require.Empty(t, historical)
metric2 := &unstructured.Unstructured{}
metric2.SetNamespace("kube-system")
metric2.SetName("metric2")
pm.metrics["metric2"] = metric2

// Add some metrics
metric1 := &unstructured.Unstructured{}
metric1.SetNamespace("default")
metric1.SetName("metric1")
pm.metrics["metric1"] = metric1
require.Equal(t, 2, pm.GetCount())
})

metric2 := &unstructured.Unstructured{}
metric2.SetNamespace("kube-system")
metric2.SetName("metric2")
pm.metrics["metric2"] = metric2
t.Run("GetAll Metrics", func(t *testing.T) {
allMetrics := pm.GetAll("", "")
require.Len(t, allMetrics, 2)

// Test GetCount after adding metrics
require.Equal(t, 2, pm.GetCount())
defaultMetrics := pm.GetAll("default", "")
require.Len(t, defaultMetrics, 1)
require.Equal(t, "metric1", defaultMetrics[0].GetName())

// Test GetAll without namespace filter
allMetrics := pm.GetAll("", "")
require.Len(t, allMetrics, 2)
kubeSystemMetrics := pm.GetAll("kube-system", "")
require.Len(t, kubeSystemMetrics, 1)
require.Equal(t, "metric2", kubeSystemMetrics[0].GetName())
})
}

// CustomFakeNodeMetricsInterface implements a fake NodeMetricsInterface
type CustomFakeNodeMetricsInterface struct {
Err error
}

func (f *CustomFakeNodeMetricsInterface) List(ctx context.Context, opts metav1.ListOptions) (*v1beta1.NodeMetricsList, error) {
return nil, f.Err
}

// We don't need these methods for this test, so we'll leave them unimplemented
func (f *CustomFakeNodeMetricsInterface) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1beta1.NodeMetrics, error) {
return nil, nil
}

func (f *CustomFakeNodeMetricsInterface) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
return nil, nil
}

// CustomFakeMetricsV1beta1Client implements a fake MetricsV1beta1Interface
type CustomFakeMetricsV1beta1Client struct {
FakeNodeMetrics *CustomFakeNodeMetricsInterface
}

func (f *CustomFakeMetricsV1beta1Client) NodeMetricses() metricsv1beta1.NodeMetricsInterface {
return f.FakeNodeMetrics
}

// We don't need these methods for this test, so we'll leave them unimplemented
func (f *CustomFakeMetricsV1beta1Client) PodMetricses(namespace string) metricsv1beta1.PodMetricsInterface {
return nil
}

// Test GetAll with namespace filter
defaultMetrics := pm.GetAll("default", "")
require.Len(t, defaultMetrics, 1)
require.Equal(t, "metric1", defaultMetrics[0].GetName())
func (f *CustomFakeMetricsV1beta1Client) RESTClient() rest.Interface {
return nil
}

func TestCollectMetrics(t *testing.T) {

expectedError := fmt.Errorf("custom error: unable to list node metrics")

fakeNodeMetrics := &CustomFakeNodeMetricsInterface{Err: expectedError}
fakeMetricsClient := &CustomFakeMetricsV1beta1Client{FakeNodeMetrics: fakeNodeMetrics}

// Create a test Pods
podGVK := coreV1.SchemeGroupVersion.WithKind("Pod")
pods := &ResourceList{
Resources: make(map[string]*unstructured.Unstructured),
SparseResources: make(map[string]*unstructured.Unstructured),
Changes: make(chan struct{}, 1),
HasSynced: nil,
gvk: podGVK,
CRDExists: true,
}
podMetrics := NewPodMetrics()

// Create a Cache instance with the mock Pods
cache := &Cache{
Pods: pods,
PodMetrics: podMetrics,
}

ctx := context.TODO()

logOutput := &logCapture{}
log.SetOutput(logOutput)

cache.collectMetrics(ctx, fakeMetricsClient)

require.Equal(t, cache.PodMetrics.current.CPU, float64(-1))
require.Equal(t, cache.PodMetrics.current.Memory, float64(-1))
require.Equal(t, cache.PodMetrics.historical[0].CPU, float64(0))
require.Equal(t, cache.PodMetrics.historical[0].Memory, float64(0))

require.Contains(t, logOutput.String(), expectedError.Error())
}

type logCapture struct {
logs []string
}

func (lc *logCapture) Write(p []byte) (n int, err error) {
lc.logs = append(lc.logs, string(p))
return len(p), nil
}

kubeSystemMetrics := pm.GetAll("kube-system", "")
require.Len(t, kubeSystemMetrics, 1)
require.Equal(t, "metric2", kubeSystemMetrics[0].GetName())
func (lc *logCapture) String() string {
return strings.Join(lc.logs, "")
}
10 changes: 9 additions & 1 deletion ui/src/lib/features/k8s/cluster-overview/component.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,15 @@
if (clusterData && Object.keys(clusterData).length > 0) {
const { cpuCapacity, currentUsage, historicalUsage, memoryCapacity } = clusterData
const { CPU, Memory } = currentUsage
let { CPU, Memory } = currentUsage
// Handle case where CPU or Memory is -1 indicating metrics server is not available. Don't want to display negative values
if (CPU == -1) {
CPU = 0
}
if (Memory == -1) {
Memory = 0
}
cpuPercentage = calculatePercentage(CPU, cpuCapacity)
memoryPercentage = calculatePercentage(Memory, memoryCapacity)
Expand Down

0 comments on commit 6bb9728

Please sign in to comment.