Skip to content

Commit

Permalink
chore: cluster overview update to use single stream (#517)
Browse files Browse the repository at this point in the history
  • Loading branch information
decleaver authored Nov 6, 2024
1 parent f2e8f54 commit 5e65944
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 198 deletions.
221 changes: 142 additions & 79 deletions src/pkg/api/monitor/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

type ClusterData struct {
TotalPods int `json:"totalPods"`
TotalNodes int `json:"totalNodes"`
CPUCapacity float64 `json:"cpuCapacity"`
MemoryCapacity float64 `json:"memoryCapacity"`
CurrentUsage resources.Usage `json:"currentUsage"`
HistoricalUsage []resources.Usage `json:"historicalUsage"`
type ClusterOverviewData struct {
TotalPods int `json:"totalPods"`
TotalNodes int `json:"totalNodes"`
CPUCapacity float64 `json:"cpuCapacity"`
MemoryCapacity float64 `json:"memoryCapacity"`
CurrentUsage resources.Usage `json:"currentUsage"`
HistoricalUsage []resources.Usage `json:"historicalUsage"`
PackagesData []map[string]interface{} `json:"packagesData"`
PodsData []map[string]interface{} `json:"podsData"`
}

var udsPackagesFieldsList = []string{".metadata.name", ".status.phase", ".status.endpoints"}
var podsFieldsList = []string{".metadata.name", ".metadata.namespace", ".status.phase"}

func BindClusterOverviewHandler(cache *resources.Cache) func(w http.ResponseWriter, r *http.Request) {
// Return a function that sends the data to the client
return func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -36,76 +41,10 @@ func BindClusterOverviewHandler(cache *resources.Cache) func(w http.ResponseWrit
return
}

// Get the current data
getUsage := func(cache *resources.Cache) {
var clusterData ClusterData

// Timestamp the data
clusterData.CurrentUsage.Timestamp = time.Now()
// Get all available pod metrics
clusterData.TotalPods = len(cache.Pods.Resources)
// Get the current usage
clusterData.CurrentUsage.CPU, clusterData.CurrentUsage.Memory = cache.PodMetrics.GetUsage()
// Get the historical usage
clusterData.HistoricalUsage = cache.PodMetrics.GetHistoricalUsage()

// Load node data
nodes := cache.Nodes.GetSparseResources("", "")

// Calculate the total number of nodes
clusterData.TotalNodes = len(nodes)

// Calculate the total capacity of the cluster
clusterData.CPUCapacity = 0.0
clusterData.MemoryCapacity = 0.0

// Get the capacity of all nodes in the cluster
for _, node := range nodes {
// Get the CPU capacity for the node
cpu, found, err := unstructured.NestedString(node.Object, "status", "capacity", "cpu")
if !found || err != nil {
continue
}
parsed, err := strconv.ParseFloat(cpu, 64)
if err != nil {
continue
}

// Convert from cores to milli-cores
clusterData.CPUCapacity += (parsed * 1000)

// Get the memory capacity for the node
mem, found, err := unstructured.NestedString(node.Object, "status", "capacity", "memory")
if !found || err != nil {
continue
}
parsed, err = parseMemory(mem)
if err != nil {
continue
}

clusterData.MemoryCapacity += parsed
}

// Flush the headers at the end
defer flusher.Flush()

// Convert the data to JSON
data, err := json.Marshal(clusterData)
if err != nil {
fmt.Fprintf(w, "data: Error: %v\n\n", err)
return
}

// Write the data to the response
fmt.Fprintf(w, "data: %s\n\n", data)
}

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

// Send the initial data
getUsage(cache)
getUsage(cache, w, flusher)
getPackagesData(cache, w, flusher)
getPodsData(cache, w, flusher)

for {
select {
Expand All @@ -115,16 +54,140 @@ func BindClusterOverviewHandler(cache *resources.Cache) func(w http.ResponseWrit

// If there is a pending update, send the data immediately
case <-cache.MetricsChanges:
getUsage(cache)
getUsage(cache, w, flusher)

// Respond to node changes
case <-cache.Nodes.Changes:
getUsage(cache)
getUsage(cache, w, flusher)

case <-cache.UDSPackages.Changes:
getPackagesData(cache, w, flusher)

case <-cache.Pods.Changes:
getPodsData(cache, w, flusher)
}
}
}
}

func initializeClusterData(cache *resources.Cache) ClusterOverviewData {
var clusterData ClusterOverviewData

// Timestamp the data
clusterData.CurrentUsage.Timestamp = time.Now()
// Get all available pod metrics
clusterData.TotalPods = len(cache.Pods.Resources)
// Get the current usage
clusterData.CurrentUsage.CPU, clusterData.CurrentUsage.Memory = cache.PodMetrics.GetUsage()
// Get the historical usage
clusterData.HistoricalUsage = cache.PodMetrics.GetHistoricalUsage()

return clusterData
}

func getUsage(cache *resources.Cache, w http.ResponseWriter, flusher http.Flusher) {
clusterData := initializeClusterData(cache)

// Load node data
nodes := cache.Nodes.GetSparseResources("", "")

// Calculate the total number of nodes
clusterData.TotalNodes = len(nodes)

// Calculate the total capacity of the cluster
clusterData.CPUCapacity = 0.0
clusterData.MemoryCapacity = 0.0

// Get the capacity of all nodes in the cluster
for _, node := range nodes {
// Get the CPU capacity for the node
cpu, found, err := unstructured.NestedString(node.Object, "status", "capacity", "cpu")
if !found || err != nil {
continue
}
parsed, err := strconv.ParseFloat(cpu, 64)
if err != nil {
continue
}

// Convert from cores to milli-cores
clusterData.CPUCapacity += (parsed * 1000)

// Get the memory capacity for the node
mem, found, err := unstructured.NestedString(node.Object, "status", "capacity", "memory")
if !found || err != nil {
continue
}
parsed, err = parseMemory(mem)
if err != nil {
continue
}

clusterData.MemoryCapacity += parsed
}

defer flusher.Flush()

// Convert the data to JSON
data, err := json.Marshal(clusterData)
if err != nil {
fmt.Fprintf(w, "event: usageDataUpdate\ndata: Error: %v\n\n", err)
return
}

// Write the data to the response
fmt.Fprintf(w, "event: usageDataUpdate\ndata: %s\n\n", data)
}

func getPackagesData(cache *resources.Cache, w http.ResponseWriter, flusher http.Flusher) {
var clusterData ClusterOverviewData
if cache.UDSPackages == nil {
return
}
var data []unstructured.Unstructured = cache.UDSPackages.GetResources("", "")
var fieldsList = udsPackagesFieldsList

var filteredData = rest.FilterItemsByFields(data, fieldsList)

clusterData.PackagesData = filteredData

defer flusher.Flush()

// Convert the data to JSON
payload, err := json.Marshal(clusterData)
if err != nil {
fmt.Fprintf(w, "event: packagesDataUpdate\ndata: Error: %v\n\n", err)
return
}

// Write the data to the response
fmt.Fprintf(w, "event: packagesDataUpdate\ndata: %s\n\n", payload)
}

func getPodsData(cache *resources.Cache, w http.ResponseWriter, flusher http.Flusher) {
var clusterData ClusterOverviewData
if cache.Pods == nil {
return
}
var data []unstructured.Unstructured = cache.Pods.GetResources("", "")
var fieldsList = podsFieldsList

var filteredData = rest.FilterItemsByFields(data, fieldsList)

clusterData.PodsData = filteredData

defer flusher.Flush()

// Convert the data to JSON
payload, err := json.Marshal(clusterData)
if err != nil {
fmt.Fprintf(w, "event: podsDataUpdate\ndata: Error: %v\n\n", err)
return
}

// Write the data to the response
fmt.Fprintf(w, "event: podsDataUpdate\ndata: %s\n\n", payload)
}

func parseMemory(memoryString string) (float64, error) {
// Remove the 'i' suffix if present
memoryString = strings.TrimSuffix(memoryString, "i")
Expand Down
35 changes: 35 additions & 0 deletions src/pkg/api/monitor/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,42 @@ func TestBindClusterOverviewHandler(t *testing.T) {
Resources: pods,
}

// create resource list for UDS Packages
udsPackages := map[string]*unstructured.Unstructured{
"package1": {
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"name": "package1",
},
"status": map[string]interface{}{
"phase": "Running",
"endpoints": "1",
},
},
},
"package2": {
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"name": "package2",
},
"status": map[string]interface{}{
"phase": "Running",
"endpoints": "1",
},
},
},
}

udsPackagesResourceList := resources.ResourceList{
Resources: udsPackages,
}

// Create a test cache
cache := &resources.Cache{}
cache.PodMetrics = podMetrics
cache.Pods = &podResourceList
cache.Nodes = &nodeResourceList
cache.UDSPackages = &udsPackagesResourceList

// Create a request to pass to our handler
req, err := http.NewRequest("GET", "/cluster-overview", nil)
Expand Down Expand Up @@ -92,4 +123,8 @@ func TestBindClusterOverviewHandler(t *testing.T) {
// Check if the response body contains expected data
expectedSubstring := `"totalPods":2`
require.Contains(t, rr.Body.String(), expectedSubstring)

// Check UDS Package data
expectedSubstring = "{\"name\":\"package1\"}"
require.Contains(t, rr.Body.String(), expectedSubstring)
}
8 changes: 4 additions & 4 deletions src/pkg/api/rest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ func jsonMarshal(payload any, fieldsList []string) ([]byte, error) {
switch payload := payload.(type) {
// Handle single resource
case unstructured.Unstructured:
filteredItem := filterItemsByFields([]unstructured.Unstructured{payload}, fieldsList)
filteredItem := FilterItemsByFields([]unstructured.Unstructured{payload}, fieldsList)
data, err = json.Marshal(filteredItem[0])

// Handle multiple resources
case []unstructured.Unstructured:
filteredItems := filterItemsByFields(payload, fieldsList)
filteredItems := FilterItemsByFields(payload, fieldsList)
data, err = json.Marshal(filteredItems)

default:
Expand All @@ -45,10 +45,10 @@ func jsonMarshal(payload any, fieldsList []string) ([]byte, error) {
return data, nil
}

// filterItemsByFields filters the given items based on the specified field paths
// FilterItemsByFields filters the given items based on the specified field paths
// - It takes a slice of unstructured.Unstructured and a slice of field paths.
// - It returns a slice of maps, each representing a filtered item.
func filterItemsByFields(items []unstructured.Unstructured, fieldPaths []string) []map[string]interface{} {
func FilterItemsByFields(items []unstructured.Unstructured, fieldPaths []string) []map[string]interface{} {
var filtered []map[string]interface{}

// Iterate over each item and filter the fields
Expand Down
2 changes: 1 addition & 1 deletion src/pkg/api/rest/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func TestFilterItemsByFields(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := filterItemsByFields(tt.items, tt.fieldPaths)
got := FilterItemsByFields(tt.items, tt.fieldPaths)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("filterItemsByFields() = %v, want %v", got, tt.want)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,22 @@
$: {
transformedPackagesList = []
udsPackages.forEach((service) => {
let name = service.metadata.name
let endpoint = service.status.endpoints ? service.status.endpoints[0] : ''
if (udsPackages) {
udsPackages.forEach((service) => {
let name = service.metadata.name
let endpoint = service.status && service.status.endpoints ? service.status.endpoints[0] : ''
if (endpoint) {
transformedPackagesList = [
...transformedPackagesList,
{
name,
endpoint,
},
]
}
})
if (endpoint) {
transformedPackagesList = [
...transformedPackagesList,
{
name,
endpoint,
},
]
}
})
}
}
function openLink(endpoint: string) {
Expand Down
Loading

0 comments on commit 5e65944

Please sign in to comment.