From 6bb9728d2e01de73b6462b420c384c64580524e7 Mon Sep 17 00:00:00 2001 From: decleaver <85503726+decleaver@users.noreply.github.com> Date: Wed, 9 Oct 2024 19:35:11 -0600 Subject: [PATCH] chore(api): updates for handling unavailable metrics server (#421) Co-authored-by: UncleGedd <42304551+UncleGedd@users.noreply.github.com> --- pkg/api/resources/cache.go | 2 +- pkg/api/resources/metrics.go | 85 ++++++---- pkg/api/resources/metrics_test.go | 155 ++++++++++++++---- .../k8s/cluster-overview/component.svelte | 10 +- 4 files changed, 188 insertions(+), 64 deletions(-) diff --git a/pkg/api/resources/cache.go b/pkg/api/resources/cache.go index 0ba3f503..7a5cab80 100644 --- a/pkg/api/resources/cache.go +++ b/pkg/api/resources/cache.go @@ -120,7 +120,7 @@ func NewCache(ctx context.Context, clients *client.Clients) (*Cache, error) { } // Start metrics collection - go c.StartMetricsCollection(ctx, clients.MetricsClient) + go c.StartMetricsCollection(ctx, clients.MetricsClient.MetricsV1beta1()) // Stop the informer when the context is done go func() { diff --git a/pkg/api/resources/metrics.go b/pkg/api/resources/metrics.go index 15dd62ad..88efbb97 100644 --- a/pkg/api/resources/metrics.go +++ b/pkg/api/resources/metrics.go @@ -5,14 +5,14 @@ package resources import ( "context" - "fmt" + "log" "sync" "time" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/metrics/pkg/apis/metrics/v1beta1" - "k8s.io/metrics/pkg/client/clientset/versioned" + metricsv1beta1 "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1" ) const MAX_HISTORY_LENGTH = 200 @@ -97,7 +97,7 @@ func (pm *PodMetrics) Delete(podUID string) { } // StartMetricsCollection starts a goroutine to collect metrics for all pods in the cache -func (c *Cache) StartMetricsCollection(ctx context.Context, metricsClient *versioned.Clientset) { +func (c *Cache) StartMetricsCollection(ctx context.Context, metricsClient metricsv1beta1.MetricsV1beta1Interface) { // Collect metrics immediately c.collectMetrics(ctx, metricsClient) @@ -127,46 +127,67 @@ func (c *Cache) CalculateUsage(metrics *v1beta1.PodMetrics) (float64, float64) { return totalCPU, totalMemory } -func (c *Cache) collectMetrics(ctx context.Context, metricsClient *versioned.Clientset) { - // Fetch all pods - pods := c.Pods.GetSparseResources("", "") - +func (c *Cache) collectMetrics(ctx context.Context, metricsClient metricsv1beta1.MetricsV1beta1Interface) { var totalCPU, totalMemory float64 - // Fetch metrics for each pod - for _, pod := range pods { - // Only collect metrics for running pods - phase, _, _ := unstructured.NestedString(pod.Object, "status", "phase") - if phase != "Running" { - continue - } + // Check for metrics server availability + metricsServerAvailable := true + _, err := metricsClient.NodeMetricses().List(ctx, metaV1.ListOptions{}) + if err != nil { + metricsServerAvailable = false + log.Printf("Metrics server is not available: %v", err) + } - // Fetch metrics for the pod - metrics, err := metricsClient.MetricsV1beta1().PodMetricses(pod.GetNamespace()).Get(ctx, pod.GetName(), metaV1.GetOptions{}) - if err != nil { - fmt.Printf("Error fetching metrics for pod %s/%s: %v\n", pod.GetNamespace(), pod.GetName(), err) - continue - } + if metricsServerAvailable { + // Fetch all pods + pods := c.Pods.GetSparseResources("", "") - // Calculate the total CPU and memory usage - cpu, mem := c.CalculateUsage(metrics) - totalCPU += cpu - totalMemory += mem + // Fetch metrics for each pod + for _, pod := range pods { + // Only collect metrics for running pods + phase, _, _ := unstructured.NestedString(pod.Object, "status", "phase") + if phase != "Running" { + continue + } - // Convert the metrics to unstructured - converted, err := ToUnstructured(metrics) - if err != nil { - fmt.Printf("Error converting metrics for pod %s/%s: %v\n", pod.GetNamespace(), pod.GetName(), err) - continue - } + // Fetch metrics for the pod + metrics, err := metricsClient.PodMetricses(pod.GetNamespace()).Get(ctx, pod.GetName(), metaV1.GetOptions{}) + if err != nil { + log.Printf("Error fetching metrics for pod %s/%s: %v\n", pod.GetNamespace(), pod.GetName(), err) + continue + } - // Update the cache with the new metrics - c.PodMetrics.Update(string(pod.GetUID()), converted) + // Calculate the total CPU and memory usage + cpu, mem := c.CalculateUsage(metrics) + totalCPU += cpu + totalMemory += mem + + // Convert the metrics to unstructured + converted, err := ToUnstructured(metrics) + if err != nil { + log.Printf("Error converting metrics for pod %s/%s: %v\n", pod.GetNamespace(), pod.GetName(), err) + continue + } + + // Update the cache with the new metrics + c.PodMetrics.Update(string(pod.GetUID()), converted) + } + } else { + // Set totalCPU and totalMemory to -1 to indicate metrics server is not available + totalCPU = -1 + totalMemory = -1 } // Add the metrics to the cache and historical usage c.PodMetrics.current.CPU = totalCPU c.PodMetrics.current.Memory = totalMemory + + // Make sure CPU and Memory data for historical usage is not negative for historical data + if !metricsServerAvailable { + totalCPU = 0 + totalMemory = 0 + } + c.PodMetrics.historical = append(c.PodMetrics.historical, Usage{ Timestamp: time.Now(), CPU: totalCPU, diff --git a/pkg/api/resources/metrics_test.go b/pkg/api/resources/metrics_test.go index c52b107f..8838615f 100644 --- a/pkg/api/resources/metrics_test.go +++ b/pkg/api/resources/metrics_test.go @@ -6,51 +6,146 @@ package resources import ( + "context" + "fmt" + "log" + "strings" "testing" "github.com/stretchr/testify/require" + coreV1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/rest" + "k8s.io/metrics/pkg/apis/metrics/v1beta1" + metricsv1beta1 "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1" ) func TestPodMetrics(t *testing.T) { pm := NewPodMetrics() - // Test GetCount - require.Equal(t, 0, pm.GetCount()) + t.Run("Initial State", func(t *testing.T) { + require.Equal(t, 0, pm.GetCount()) + cpu, mem := pm.GetUsage() + require.Equal(t, 0.0, cpu) + require.Equal(t, 0.0, mem) + require.Empty(t, pm.GetHistoricalUsage()) + }) - // Test GetUsage - cpu, mem := pm.GetUsage() - require.Equal(t, 0.0, cpu) - require.Equal(t, 0.0, mem) + t.Run("Add Metrics", func(t *testing.T) { + metric1 := &unstructured.Unstructured{} + metric1.SetNamespace("default") + metric1.SetName("metric1") + pm.metrics["metric1"] = metric1 - // Test GetHistoricalUsage - historical := pm.GetHistoricalUsage() - require.Empty(t, historical) + metric2 := &unstructured.Unstructured{} + metric2.SetNamespace("kube-system") + metric2.SetName("metric2") + pm.metrics["metric2"] = metric2 - // Add some metrics - metric1 := &unstructured.Unstructured{} - metric1.SetNamespace("default") - metric1.SetName("metric1") - pm.metrics["metric1"] = metric1 + require.Equal(t, 2, pm.GetCount()) + }) - metric2 := &unstructured.Unstructured{} - metric2.SetNamespace("kube-system") - metric2.SetName("metric2") - pm.metrics["metric2"] = metric2 + t.Run("GetAll Metrics", func(t *testing.T) { + allMetrics := pm.GetAll("", "") + require.Len(t, allMetrics, 2) - // Test GetCount after adding metrics - require.Equal(t, 2, pm.GetCount()) + defaultMetrics := pm.GetAll("default", "") + require.Len(t, defaultMetrics, 1) + require.Equal(t, "metric1", defaultMetrics[0].GetName()) - // Test GetAll without namespace filter - allMetrics := pm.GetAll("", "") - require.Len(t, allMetrics, 2) + kubeSystemMetrics := pm.GetAll("kube-system", "") + require.Len(t, kubeSystemMetrics, 1) + require.Equal(t, "metric2", kubeSystemMetrics[0].GetName()) + }) +} + +// CustomFakeNodeMetricsInterface implements a fake NodeMetricsInterface +type CustomFakeNodeMetricsInterface struct { + Err error +} + +func (f *CustomFakeNodeMetricsInterface) List(ctx context.Context, opts metav1.ListOptions) (*v1beta1.NodeMetricsList, error) { + return nil, f.Err +} + +// We don't need these methods for this test, so we'll leave them unimplemented +func (f *CustomFakeNodeMetricsInterface) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1beta1.NodeMetrics, error) { + return nil, nil +} + +func (f *CustomFakeNodeMetricsInterface) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return nil, nil +} + +// CustomFakeMetricsV1beta1Client implements a fake MetricsV1beta1Interface +type CustomFakeMetricsV1beta1Client struct { + FakeNodeMetrics *CustomFakeNodeMetricsInterface +} + +func (f *CustomFakeMetricsV1beta1Client) NodeMetricses() metricsv1beta1.NodeMetricsInterface { + return f.FakeNodeMetrics +} + +// We don't need these methods for this test, so we'll leave them unimplemented +func (f *CustomFakeMetricsV1beta1Client) PodMetricses(namespace string) metricsv1beta1.PodMetricsInterface { + return nil +} - // Test GetAll with namespace filter - defaultMetrics := pm.GetAll("default", "") - require.Len(t, defaultMetrics, 1) - require.Equal(t, "metric1", defaultMetrics[0].GetName()) +func (f *CustomFakeMetricsV1beta1Client) RESTClient() rest.Interface { + return nil +} + +func TestCollectMetrics(t *testing.T) { + + expectedError := fmt.Errorf("custom error: unable to list node metrics") + + fakeNodeMetrics := &CustomFakeNodeMetricsInterface{Err: expectedError} + fakeMetricsClient := &CustomFakeMetricsV1beta1Client{FakeNodeMetrics: fakeNodeMetrics} + + // Create a test Pods + podGVK := coreV1.SchemeGroupVersion.WithKind("Pod") + pods := &ResourceList{ + Resources: make(map[string]*unstructured.Unstructured), + SparseResources: make(map[string]*unstructured.Unstructured), + Changes: make(chan struct{}, 1), + HasSynced: nil, + gvk: podGVK, + CRDExists: true, + } + podMetrics := NewPodMetrics() + + // Create a Cache instance with the mock Pods + cache := &Cache{ + Pods: pods, + PodMetrics: podMetrics, + } + + ctx := context.TODO() + + logOutput := &logCapture{} + log.SetOutput(logOutput) + + cache.collectMetrics(ctx, fakeMetricsClient) + + require.Equal(t, cache.PodMetrics.current.CPU, float64(-1)) + require.Equal(t, cache.PodMetrics.current.Memory, float64(-1)) + require.Equal(t, cache.PodMetrics.historical[0].CPU, float64(0)) + require.Equal(t, cache.PodMetrics.historical[0].Memory, float64(0)) + + require.Contains(t, logOutput.String(), expectedError.Error()) +} + +type logCapture struct { + logs []string +} + +func (lc *logCapture) Write(p []byte) (n int, err error) { + lc.logs = append(lc.logs, string(p)) + return len(p), nil +} - kubeSystemMetrics := pm.GetAll("kube-system", "") - require.Len(t, kubeSystemMetrics, 1) - require.Equal(t, "metric2", kubeSystemMetrics[0].GetName()) +func (lc *logCapture) String() string { + return strings.Join(lc.logs, "") } diff --git a/ui/src/lib/features/k8s/cluster-overview/component.svelte b/ui/src/lib/features/k8s/cluster-overview/component.svelte index a56ead0c..38a933d0 100644 --- a/ui/src/lib/features/k8s/cluster-overview/component.svelte +++ b/ui/src/lib/features/k8s/cluster-overview/component.svelte @@ -50,7 +50,15 @@ if (clusterData && Object.keys(clusterData).length > 0) { const { cpuCapacity, currentUsage, historicalUsage, memoryCapacity } = clusterData - const { CPU, Memory } = currentUsage + let { CPU, Memory } = currentUsage + + // Handle case where CPU or Memory is -1 indicating metrics server is not available. Don't want to display negative values + if (CPU == -1) { + CPU = 0 + } + if (Memory == -1) { + Memory = 0 + } cpuPercentage = calculatePercentage(CPU, cpuCapacity) memoryPercentage = calculatePercentage(Memory, memoryCapacity)