diff --git a/src/pkg/api/monitor/cluster.go b/src/pkg/api/monitor/cluster.go index fe116976d..52124cced 100644 --- a/src/pkg/api/monitor/cluster.go +++ b/src/pkg/api/monitor/cluster.go @@ -16,15 +16,20 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) -type ClusterData struct { - TotalPods int `json:"totalPods"` - TotalNodes int `json:"totalNodes"` - CPUCapacity float64 `json:"cpuCapacity"` - MemoryCapacity float64 `json:"memoryCapacity"` - CurrentUsage resources.Usage `json:"currentUsage"` - HistoricalUsage []resources.Usage `json:"historicalUsage"` +type ClusterOverviewData struct { + TotalPods int `json:"totalPods"` + TotalNodes int `json:"totalNodes"` + CPUCapacity float64 `json:"cpuCapacity"` + MemoryCapacity float64 `json:"memoryCapacity"` + CurrentUsage resources.Usage `json:"currentUsage"` + HistoricalUsage []resources.Usage `json:"historicalUsage"` + PackagesData []map[string]interface{} `json:"packagesData"` + PodsData []map[string]interface{} `json:"podsData"` } +var udsPackagesFieldsList = []string{".metadata.name", ".status.phase", ".status.endpoints"} +var podsFieldsList = []string{".metadata.name", ".metadata.namespace", ".status.phase"} + func BindClusterOverviewHandler(cache *resources.Cache) func(w http.ResponseWriter, r *http.Request) { // Return a function that sends the data to the client return func(w http.ResponseWriter, r *http.Request) { @@ -36,76 +41,10 @@ func BindClusterOverviewHandler(cache *resources.Cache) func(w http.ResponseWrit return } - // Get the current data - getUsage := func(cache *resources.Cache) { - var clusterData ClusterData - - // Timestamp the data - clusterData.CurrentUsage.Timestamp = time.Now() - // Get all available pod metrics - clusterData.TotalPods = len(cache.Pods.Resources) - // Get the current usage - clusterData.CurrentUsage.CPU, clusterData.CurrentUsage.Memory = cache.PodMetrics.GetUsage() - // Get the historical usage - clusterData.HistoricalUsage = cache.PodMetrics.GetHistoricalUsage() - - // Load node data - nodes := cache.Nodes.GetSparseResources("", "") - - // Calculate the total number of nodes - clusterData.TotalNodes = len(nodes) - - // Calculate the total capacity of the cluster - clusterData.CPUCapacity = 0.0 - clusterData.MemoryCapacity = 0.0 - - // Get the capacity of all nodes in the cluster - for _, node := range nodes { - // Get the CPU capacity for the node - cpu, found, err := unstructured.NestedString(node.Object, "status", "capacity", "cpu") - if !found || err != nil { - continue - } - parsed, err := strconv.ParseFloat(cpu, 64) - if err != nil { - continue - } - - // Convert from cores to milli-cores - clusterData.CPUCapacity += (parsed * 1000) - - // Get the memory capacity for the node - mem, found, err := unstructured.NestedString(node.Object, "status", "capacity", "memory") - if !found || err != nil { - continue - } - parsed, err = parseMemory(mem) - if err != nil { - continue - } - - clusterData.MemoryCapacity += parsed - } - - // Flush the headers at the end - defer flusher.Flush() - - // Convert the data to JSON - data, err := json.Marshal(clusterData) - if err != nil { - fmt.Fprintf(w, "data: Error: %v\n\n", err) - return - } - - // Write the data to the response - fmt.Fprintf(w, "data: %s\n\n", data) - } - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - // Send the initial data - getUsage(cache) + getUsage(cache, w, flusher) + getPackagesData(cache, w, flusher) + getPodsData(cache, w, flusher) for { select { @@ -115,16 +54,140 @@ func BindClusterOverviewHandler(cache *resources.Cache) func(w http.ResponseWrit // If there is a pending update, send the data immediately case <-cache.MetricsChanges: - getUsage(cache) + getUsage(cache, w, flusher) - // Respond to node changes case <-cache.Nodes.Changes: - getUsage(cache) + getUsage(cache, w, flusher) + + case <-cache.UDSPackages.Changes: + getPackagesData(cache, w, flusher) + + case <-cache.Pods.Changes: + getPodsData(cache, w, flusher) } } } } +func initializeClusterData(cache *resources.Cache) ClusterOverviewData { + var clusterData ClusterOverviewData + + // Timestamp the data + clusterData.CurrentUsage.Timestamp = time.Now() + // Get all available pod metrics + clusterData.TotalPods = len(cache.Pods.Resources) + // Get the current usage + clusterData.CurrentUsage.CPU, clusterData.CurrentUsage.Memory = cache.PodMetrics.GetUsage() + // Get the historical usage + clusterData.HistoricalUsage = cache.PodMetrics.GetHistoricalUsage() + + return clusterData +} + +func getUsage(cache *resources.Cache, w http.ResponseWriter, flusher http.Flusher) { + clusterData := initializeClusterData(cache) + + // Load node data + nodes := cache.Nodes.GetSparseResources("", "") + + // Calculate the total number of nodes + clusterData.TotalNodes = len(nodes) + + // Calculate the total capacity of the cluster + clusterData.CPUCapacity = 0.0 + clusterData.MemoryCapacity = 0.0 + + // Get the capacity of all nodes in the cluster + for _, node := range nodes { + // Get the CPU capacity for the node + cpu, found, err := unstructured.NestedString(node.Object, "status", "capacity", "cpu") + if !found || err != nil { + continue + } + parsed, err := strconv.ParseFloat(cpu, 64) + if err != nil { + continue + } + + // Convert from cores to milli-cores + clusterData.CPUCapacity += (parsed * 1000) + + // Get the memory capacity for the node + mem, found, err := unstructured.NestedString(node.Object, "status", "capacity", "memory") + if !found || err != nil { + continue + } + parsed, err = parseMemory(mem) + if err != nil { + continue + } + + clusterData.MemoryCapacity += parsed + } + + defer flusher.Flush() + + // Convert the data to JSON + data, err := json.Marshal(clusterData) + if err != nil { + fmt.Fprintf(w, "event: usageDataUpdate\ndata: Error: %v\n\n", err) + return + } + + // Write the data to the response + fmt.Fprintf(w, "event: usageDataUpdate\ndata: %s\n\n", data) +} + +func getPackagesData(cache *resources.Cache, w http.ResponseWriter, flusher http.Flusher) { + var clusterData ClusterOverviewData + if cache.UDSPackages == nil { + return + } + var data []unstructured.Unstructured = cache.UDSPackages.GetResources("", "") + var fieldsList = udsPackagesFieldsList + + var filteredData = rest.FilterItemsByFields(data, fieldsList) + + clusterData.PackagesData = filteredData + + defer flusher.Flush() + + // Convert the data to JSON + payload, err := json.Marshal(clusterData) + if err != nil { + fmt.Fprintf(w, "event: packagesDataUpdate\ndata: Error: %v\n\n", err) + return + } + + // Write the data to the response + fmt.Fprintf(w, "event: packagesDataUpdate\ndata: %s\n\n", payload) +} + +func getPodsData(cache *resources.Cache, w http.ResponseWriter, flusher http.Flusher) { + var clusterData ClusterOverviewData + if cache.Pods == nil { + return + } + var data []unstructured.Unstructured = cache.Pods.GetResources("", "") + var fieldsList = podsFieldsList + + var filteredData = rest.FilterItemsByFields(data, fieldsList) + + clusterData.PodsData = filteredData + + defer flusher.Flush() + + // Convert the data to JSON + payload, err := json.Marshal(clusterData) + if err != nil { + fmt.Fprintf(w, "event: podsDataUpdate\ndata: Error: %v\n\n", err) + return + } + + // Write the data to the response + fmt.Fprintf(w, "event: podsDataUpdate\ndata: %s\n\n", payload) +} + func parseMemory(memoryString string) (float64, error) { // Remove the 'i' suffix if present memoryString = strings.TrimSuffix(memoryString, "i") diff --git a/src/pkg/api/monitor/cluster_test.go b/src/pkg/api/monitor/cluster_test.go index 11a24a217..8ae2f60a8 100644 --- a/src/pkg/api/monitor/cluster_test.go +++ b/src/pkg/api/monitor/cluster_test.go @@ -58,11 +58,42 @@ func TestBindClusterOverviewHandler(t *testing.T) { Resources: pods, } + // create resource list for UDS Packages + udsPackages := map[string]*unstructured.Unstructured{ + "package1": { + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "package1", + }, + "status": map[string]interface{}{ + "phase": "Running", + "endpoints": "1", + }, + }, + }, + "package2": { + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "package2", + }, + "status": map[string]interface{}{ + "phase": "Running", + "endpoints": "1", + }, + }, + }, + } + + udsPackagesResourceList := resources.ResourceList{ + Resources: udsPackages, + } + // Create a test cache cache := &resources.Cache{} cache.PodMetrics = podMetrics cache.Pods = &podResourceList cache.Nodes = &nodeResourceList + cache.UDSPackages = &udsPackagesResourceList // Create a request to pass to our handler req, err := http.NewRequest("GET", "/cluster-overview", nil) @@ -92,4 +123,8 @@ func TestBindClusterOverviewHandler(t *testing.T) { // Check if the response body contains expected data expectedSubstring := `"totalPods":2` require.Contains(t, rr.Body.String(), expectedSubstring) + + // Check UDS Package data + expectedSubstring = "{\"name\":\"package1\"}" + require.Contains(t, rr.Body.String(), expectedSubstring) } diff --git a/src/pkg/api/rest/filter.go b/src/pkg/api/rest/filter.go index d12c2f6ce..07144325c 100644 --- a/src/pkg/api/rest/filter.go +++ b/src/pkg/api/rest/filter.go @@ -22,12 +22,12 @@ func jsonMarshal(payload any, fieldsList []string) ([]byte, error) { switch payload := payload.(type) { // Handle single resource case unstructured.Unstructured: - filteredItem := filterItemsByFields([]unstructured.Unstructured{payload}, fieldsList) + filteredItem := FilterItemsByFields([]unstructured.Unstructured{payload}, fieldsList) data, err = json.Marshal(filteredItem[0]) // Handle multiple resources case []unstructured.Unstructured: - filteredItems := filterItemsByFields(payload, fieldsList) + filteredItems := FilterItemsByFields(payload, fieldsList) data, err = json.Marshal(filteredItems) default: @@ -45,10 +45,10 @@ func jsonMarshal(payload any, fieldsList []string) ([]byte, error) { return data, nil } -// filterItemsByFields filters the given items based on the specified field paths +// FilterItemsByFields filters the given items based on the specified field paths // - It takes a slice of unstructured.Unstructured and a slice of field paths. // - It returns a slice of maps, each representing a filtered item. -func filterItemsByFields(items []unstructured.Unstructured, fieldPaths []string) []map[string]interface{} { +func FilterItemsByFields(items []unstructured.Unstructured, fieldPaths []string) []map[string]interface{} { var filtered []map[string]interface{} // Iterate over each item and filter the fields diff --git a/src/pkg/api/rest/filter_test.go b/src/pkg/api/rest/filter_test.go index 43dbdbab8..feaa81a7a 100644 --- a/src/pkg/api/rest/filter_test.go +++ b/src/pkg/api/rest/filter_test.go @@ -251,7 +251,7 @@ func TestFilterItemsByFields(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := filterItemsByFields(tt.items, tt.fieldPaths) + got := FilterItemsByFields(tt.items, tt.fieldPaths) if !reflect.DeepEqual(got, tt.want) { t.Errorf("filterItemsByFields() = %v, want %v", got, tt.want) } diff --git a/ui/src/lib/components/ClusterOverviewWidgets/Applications/component.svelte b/ui/src/lib/components/ClusterOverviewWidgets/Applications/component.svelte index f19a9811a..97eb6b49e 100644 --- a/ui/src/lib/components/ClusterOverviewWidgets/Applications/component.svelte +++ b/ui/src/lib/components/ClusterOverviewWidgets/Applications/component.svelte @@ -12,20 +12,22 @@ $: { transformedPackagesList = [] - udsPackages.forEach((service) => { - let name = service.metadata.name - let endpoint = service.status.endpoints ? service.status.endpoints[0] : '' + if (udsPackages) { + udsPackages.forEach((service) => { + let name = service.metadata.name + let endpoint = service.status && service.status.endpoints ? service.status.endpoints[0] : '' - if (endpoint) { - transformedPackagesList = [ - ...transformedPackagesList, - { - name, - endpoint, - }, - ] - } - }) + if (endpoint) { + transformedPackagesList = [ + ...transformedPackagesList, + { + name, + endpoint, + }, + ] + } + }) + } } function openLink(endpoint: string) { diff --git a/ui/src/lib/components/ClusterOverviewWidgets/CoreServices/component.svelte b/ui/src/lib/components/ClusterOverviewWidgets/CoreServices/component.svelte index 934965f9d..0f6360878 100644 --- a/ui/src/lib/components/ClusterOverviewWidgets/CoreServices/component.svelte +++ b/ui/src/lib/components/ClusterOverviewWidgets/CoreServices/component.svelte @@ -38,20 +38,22 @@ .filter((pod: V1Pod) => pod?.metadata?.name?.match(/^istiod/)) .filter((pod) => pod.status && pod.status?.phase === 'Running').length === 1 - coreServices.forEach((service) => { - let name = coreServicesMapping[service.metadata.name] - let status = service.status ? service.status.phase : 'Pending' + if (coreServices) { + coreServices.forEach((service) => { + let name = coreServicesMapping[service.metadata.name] + let status = service.status ? service.status.phase : 'Pending' - if (Object.keys(coreServicesMapping).includes(service.metadata.name)) { - transformedCoreServiceList = [ - ...transformedCoreServiceList, - { - name, - status, - }, - ] - } - }) + if (Object.keys(coreServicesMapping).includes(service.metadata.name)) { + transformedCoreServiceList = [ + ...transformedCoreServiceList, + { + name, + status, + }, + ] + } + }) + } // If we have pepr uds core pods then we have a Policy Engine & Operator in the cluster if (hasPolicyEngineOperator) { diff --git a/ui/src/lib/features/k8s/cluster-overview/component.svelte b/ui/src/lib/features/k8s/cluster-overview/component.svelte index 431a5140c..60a6a563e 100644 --- a/ui/src/lib/features/k8s/cluster-overview/component.svelte +++ b/ui/src/lib/features/k8s/cluster-overview/component.svelte @@ -36,115 +36,109 @@ Timestamp: new Date().toISOString(), }, historicalUsage: [], + packagesData: [], + podsData: [], } - let cpuPercentage = 0 - let memoryPercentage = 0 - let gbUsed = 0 - let gbCapacity = 0 - let cpuUsed = 0 - let formattedCpuCapacity = 0 - let onMessageCount = 0 - let myChart: Chart - const description = resourceDescriptions['Events'] let udsPackages: ClusterOverviewUDSPackageType[] = [] let pods: V1Pod[] = [] + const description = resourceDescriptions['Events'] + + let myChart: Chart | null = null + let canvasElement: HTMLCanvasElement + let metricsServerAvailable = true let metricsServerNewlyAvailable = false + let previousMetricsServerState = metricsServerAvailable - onMount(() => { - let ctx = document.getElementById('chartjs-el') as HTMLCanvasElement - const overviewPath: string = '/api/v1/monitor/cluster-overview' - const udsPackagesPath: string = - '/api/v1/resources/configs/uds-packages?fields=.metadata.name,.status.phase,.status.endpoints' - const podsPath: string = '/api/v1/resources/workloads/pods?fields=.metadata.name,.metadata.namespace,.status.phase' + $: metricsServerAvailable = !(clusterData.currentUsage.CPU === -1 && clusterData.currentUsage.Memory === -1) + $: { + metricsServerNewlyAvailable = !previousMetricsServerState && metricsServerAvailable + previousMetricsServerState = metricsServerAvailable + } - const overview = new EventSource(overviewPath) - const udsPackagesEvent = new EventSource(udsPackagesPath) - const podsEvent = new EventSource(podsPath) + $: cpuPercentage = calculatePercentage(clusterData.currentUsage.CPU, clusterData.cpuCapacity) + $: memoryPercentage = calculatePercentage(clusterData.currentUsage.Memory, clusterData.memoryCapacity) + $: gbUsed = mebibytesToGigabytes(clusterData.currentUsage.Memory) + $: gbCapacity = mebibytesToGigabytes(clusterData.memoryCapacity) + $: cpuUsed = millicoresToCores(clusterData.currentUsage.CPU) + $: formattedCpuCapacity = millicoresToCores(clusterData.cpuCapacity) + + $: if (canvasElement && !myChart) { + myChart = new Chart(canvasElement, { + type: 'line', + data: getChartData(metricsServerAvailable), + options: getChartOptions(metricsServerAvailable), + }) + } - udsPackagesEvent.onmessage = (event) => { - let response = JSON.parse(event.data) as ClusterOverviewUDSPackageType[] + $: if (myChart && clusterData.historicalUsage) { + if (metricsServerNewlyAvailable) { + myChart.data = getChartData(metricsServerAvailable) + myChart.options = getChartOptions(metricsServerAvailable) + } - // If we have an error i.e. response.error === 'crd not found', set to an empty array - if (Array.isArray(response)) { - udsPackages = response - } else { - udsPackages = [] - } + myChart.data.labels = clusterData.historicalUsage.map((point) => [formatTime(point.Timestamp)]) + + if (metricsServerAvailable) { + myChart.data.datasets[0].data = clusterData.historicalUsage.map((point) => point.Memory / (1024 * 1024 * 1024)) + myChart.data.datasets[1].data = clusterData.historicalUsage.map((point) => point.CPU / 1000) + } else { + myChart.data = getChartData(false) + myChart.options = getChartOptions(false) } - podsEvent.onmessage = (event) => { - pods = JSON.parse(event.data) as V1Pod[] + myChart.update() + } + + function updateClusterData(newData: Partial) { + clusterData = { + ...clusterData, + ...newData, } + } - overview.onmessage = (event) => { - clusterData = JSON.parse(event.data) as ClusterData - - if (clusterData && Object.keys(clusterData).length > 0) { - const { cpuCapacity, currentUsage, historicalUsage, memoryCapacity } = clusterData - let { CPU, Memory } = currentUsage - - if (CPU === -1 && Memory === -1) { - metricsServerAvailable = false - } else { - if (!metricsServerAvailable) { - metricsServerNewlyAvailable = true - } - metricsServerAvailable = true - } - - cpuPercentage = calculatePercentage(CPU, cpuCapacity) - memoryPercentage = calculatePercentage(Memory, memoryCapacity) - gbUsed = mebibytesToGigabytes(Memory) - gbCapacity = mebibytesToGigabytes(memoryCapacity) - cpuUsed = millicoresToCores(CPU) - formattedCpuCapacity = millicoresToCores(cpuCapacity) - - if (onMessageCount === 0) { - myChart = new Chart(ctx, { - type: 'line', - data: getChartData(metricsServerAvailable), - options: getChartOptions(metricsServerAvailable), - }) - } - - // handle case when metrics server becomes available while page is up - if (metricsServerNewlyAvailable) { - metricsServerNewlyAvailable = false - myChart.data = getChartData(metricsServerAvailable) - myChart.options = getChartOptions(metricsServerAvailable) - } - - // on each message manually update the graph - myChart.data.labels = historicalUsage.map((point) => [formatTime(point.Timestamp)]) - - // If metrics server is not available, dont update the graph - if (metricsServerAvailable) { - myChart.data.datasets[0].data = historicalUsage.map((point) => point.Memory / (1024 * 1024 * 1024)) - myChart.data.datasets[1].data = historicalUsage.map((point) => point.CPU / 1000) - } else { - // handle case where metrics server was available and now is not - myChart.data = getChartData(false) - myChart.options = getChartOptions(false) - } - myChart.update() - onMessageCount++ + onMount(() => { + const overviewPath: string = '/api/v1/monitor/cluster-overview' + const overview = new EventSource(overviewPath) + + overview.addEventListener('usageDataUpdate', function (event) { + const newData = JSON.parse(event.data) as ClusterData + if (newData && Object.keys(newData).length > 0) { + updateClusterData({ + totalNodes: newData.totalNodes, + totalPods: newData.totalPods, + cpuCapacity: newData.cpuCapacity, + memoryCapacity: newData.memoryCapacity, + currentUsage: newData.currentUsage, + historicalUsage: newData.historicalUsage, + }) } - } + }) + + overview.addEventListener('packagesDataUpdate', function (event) { + const newData = JSON.parse(event.data) as ClusterData + udsPackages = [...newData.packagesData] + updateClusterData({ packagesData: newData.packagesData }) + }) + + overview.addEventListener('podsDataUpdate', function (event) { + const newData = JSON.parse(event.data) as ClusterData + pods = [...newData.podsData] + updateClusterData({ podsData: newData.podsData }) + }) Chart.register({}) return () => { - onMessageCount = 0 overview.close() - udsPackagesEvent.close() - podsEvent.close() - myChart.destroy() + if (myChart) { + myChart.destroy() + myChart = null + } } }) - // Chart.js settings Chart.defaults.datasets.line.tension = 0.4 @@ -212,7 +206,7 @@
- +
diff --git a/ui/src/lib/features/k8s/types.ts b/ui/src/lib/features/k8s/types.ts index 4a7892d46..c6494a449 100644 --- a/ui/src/lib/features/k8s/types.ts +++ b/ui/src/lib/features/k8s/types.ts @@ -102,4 +102,22 @@ export type ClusterData = { Memory: number Timestamp: string }[] + packagesData: { + metadata: { + name: string + } + status: { + phase: UDSPackageStatus + endpoints: string[] + } + }[] + podsData: { + metadata: { + name: string + namespace: string + } + status: { + phase: string + } + }[] } diff --git a/ui/src/lib/types.ts b/ui/src/lib/types.ts index 9e2487f1c..3dcaf3f0e 100644 --- a/ui/src/lib/types.ts +++ b/ui/src/lib/types.ts @@ -33,7 +33,6 @@ export type PeprEvent = { export type ClusterOverviewUDSPackageType = { metadata: { name: string - namespace: string } status: { phase: UDSPackageStatus