diff --git a/installer/helm/chart/volcano/templates/controllers.yaml b/installer/helm/chart/volcano/templates/controllers.yaml index 3fc6cf35f5..ecc5b11c6a 100644 --- a/installer/helm/chart/volcano/templates/controllers.yaml +++ b/installer/helm/chart/volcano/templates/controllers.yaml @@ -59,7 +59,7 @@ rules: verbs: ["get", "create", "delete", "update"] - apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"] resources: ["podgroups", "queues", "queues/status"] - verbs: ["get", "list", "watch", "create", "delete", "update"] + verbs: ["get", "list", "watch", "create", "delete", "update", "patch"] - apiGroups: ["flow.volcano.sh"] resources: ["jobflows", "jobtemplates"] verbs: ["get", "list", "watch", "create", "delete", "update"] diff --git a/pkg/cli/queue/get.go b/pkg/cli/queue/get.go index 057bffda1e..7de390723e 100644 --- a/pkg/cli/queue/get.go +++ b/pkg/cli/queue/get.go @@ -63,21 +63,37 @@ func GetQueue(ctx context.Context) error { return err } - PrintQueue(queue, os.Stdout) + // Although the featuregate called CustomResourceFieldSelectors is enabled by default after v1.31, there are still + // users using k8s versions lower than v1.31. Therefore we can only get all the podgroups from kube-apiserver + // and then filtering them. + pgList, err := queueClient.SchedulingV1beta1().PodGroups("").List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list podgroup for queue %s with err: %v", getQueueFlags.Name, err) + } + + pgStats := &podGroupStatistics{} + for _, pg := range pgList.Items { + if pg.Spec.Queue == getQueueFlags.Name { + pgStats.statPodGroupCountsForQueue(&pg) + } + } + + PrintQueue(queue, pgStats, os.Stdout) return nil } // PrintQueue prints queue information. -func PrintQueue(queue *v1beta1.Queue, writer io.Writer) { - _, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s\n", - Name, Weight, State, Inqueue, Pending, Running, Unknown) +func PrintQueue(queue *v1beta1.Queue, pgStats *podGroupStatistics, writer io.Writer) { + _, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s%-8s\n", + Name, Weight, State, Inqueue, Pending, Running, Unknown, Completed) if err != nil { fmt.Printf("Failed to print queue command result: %s.\n", err) } - _, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d\n", - queue.Name, queue.Spec.Weight, queue.Status.State, queue.Status.Inqueue, - queue.Status.Pending, queue.Status.Running, queue.Status.Unknown) + + _, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d%-8d\n", + queue.Name, queue.Spec.Weight, queue.Status.State, pgStats.inqueue, + pgStats.pending, pgStats.running, pgStats.unknown, pgStats.completed) if err != nil { fmt.Printf("Failed to print queue command result: %s.\n", err) } diff --git a/pkg/cli/queue/list.go b/pkg/cli/queue/list.go index b6ceba2f0c..10031f4df1 100644 --- a/pkg/cli/queue/list.go +++ b/pkg/cli/queue/list.go @@ -53,6 +53,9 @@ const ( // Inqueue status of queue Inqueue string = "Inqueue" + // Completed status of the queue + Completed string = "Completed" + // State is state of queue State string = "State" ) @@ -81,22 +84,41 @@ func ListQueue(ctx context.Context) error { fmt.Printf("No resources found\n") return nil } - PrintQueues(queues, os.Stdout) + + // Although the featuregate called CustomResourceFieldSelectors is enabled by default after v1.31, there are still + // users using k8s versions lower than v1.31. Therefore we can only get all the podgroups from kube-apiserver + // and then filtering them. + pgList, err := jobClient.SchedulingV1beta1().PodGroups("").List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list podgroups with err: %v", err) + } + + queueStats := make(map[string]*podGroupStatistics, len(queues.Items)) + for _, queue := range queues.Items { + queueStats[queue.Name] = &podGroupStatistics{} + } + + for _, pg := range pgList.Items { + queueStats[pg.Spec.Queue].statPodGroupCountsForQueue(&pg) + } + + PrintQueues(queues, queueStats, os.Stdout) return nil } // PrintQueues prints queue information. -func PrintQueues(queues *v1beta1.QueueList, writer io.Writer) { - _, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s\n", - Name, Weight, State, Inqueue, Pending, Running, Unknown) +func PrintQueues(queues *v1beta1.QueueList, queueStats map[string]*podGroupStatistics, writer io.Writer) { + _, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s%-8s\n", + Name, Weight, State, Inqueue, Pending, Running, Unknown, Completed) if err != nil { fmt.Printf("Failed to print queue command result: %s.\n", err) } + for _, queue := range queues.Items { - _, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d\n", - queue.Name, queue.Spec.Weight, queue.Status.State, queue.Status.Inqueue, - queue.Status.Pending, queue.Status.Running, queue.Status.Unknown) + _, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d%-8d\n", + queue.Name, queue.Spec.Weight, queue.Status.State, queueStats[queue.Name].inqueue, queueStats[queue.Name].pending, + queueStats[queue.Name].running, queueStats[queue.Name].unknown, queueStats[queue.Name].completed) if err != nil { fmt.Printf("Failed to print queue command result: %s.\n", err) } diff --git a/pkg/cli/queue/util.go b/pkg/cli/queue/util.go index 50c6c4aab1..d5921f6798 100644 --- a/pkg/cli/queue/util.go +++ b/pkg/cli/queue/util.go @@ -29,6 +29,7 @@ import ( busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1" "volcano.sh/apis/pkg/apis/helpers" + "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/apis/pkg/client/clientset/versioned" ) @@ -62,3 +63,26 @@ func createQueueCommand(ctx context.Context, config *rest.Config, action busv1al return nil } + +type podGroupStatistics struct { + inqueue int + pending int + running int + unknown int + completed int +} + +func (pgStats *podGroupStatistics) statPodGroupCountsForQueue(pg *v1beta1.PodGroup) { + switch pg.Status.Phase { + case v1beta1.PodGroupInqueue: + pgStats.inqueue++ + case v1beta1.PodGroupPending: + pgStats.pending++ + case v1beta1.PodGroupRunning: + pgStats.running++ + case v1beta1.PodGroupUnknown: + pgStats.unknown++ + case v1beta1.PodGroupCompleted: + pgStats.completed++ + } +} diff --git a/pkg/controllers/metrics/podgroup.go b/pkg/controllers/metrics/podgroup.go new file mode 100644 index 0000000000..d9766a9e0f --- /dev/null +++ b/pkg/controllers/metrics/podgroup.go @@ -0,0 +1,75 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "volcano.sh/volcano/pkg/scheduler/metrics" +) + +var ( + pgPendingPhaseNum = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "podgroup_pending_phase_num", + Help: "Number of podgroup at pending phase", + }, []string{"queue_name"}, + ) + + pgRunningPhaseNum = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "podgroup_running_phase_num", + Help: "Number of podgroup at running phase", + }, []string{"queue_name"}, + ) + + pgUnknownPhaseNum = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "podgroup_unknown_phase_num", + Help: "Number of podgroup at unknown phase", + }, []string{"queue_name"}, + ) + + pgInqueuePhaseNum = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "podgroup_inqueue_phase_num", + Help: "Number of podgroup at inqueue phase", + }, []string{"queue_name"}, + ) + + pgCompletedPhaseNum = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "podgroup_completed_phase_num", + Help: "Number of podgroup at completed phase", + }, []string{"queue_name"}, + ) +) + +// UpdatePgPendingPhaseNum recored the num of podgroups at pending state in queue +func UpdatePgPendingPhaseNum(queueName string, num float64) { + pgPendingPhaseNum.WithLabelValues(queueName).Set(num) +} + +// UpdatePgRunningPhaseNum recored the num of podgroups at running state in queue +func UpdatePgRunningPhaseNum(queueName string, num float64) { + pgRunningPhaseNum.WithLabelValues(queueName).Set(num) +} + +// UpdatePgUnknownPhaseNum recored the num of podgroups at unknown state in queue +func UpdatePgUnknownPhaseNum(queueName string, num float64) { + pgUnknownPhaseNum.WithLabelValues(queueName).Set(num) +} + +// UpdatePgInqueuePhaseNum recored the num of podgroups at inqueue state in queue +func UpdatePgInqueuePhaseNum(queueName string, num float64) { + pgInqueuePhaseNum.WithLabelValues(queueName).Set(num) +} + +// UpdatePgCompletedPhaseNum recored the num of podgroups at completed state in queue +func UpdatePgCompletedPhaseNum(queueName string, num float64) { + pgCompletedPhaseNum.WithLabelValues(queueName).Set(num) +} diff --git a/pkg/controllers/queue/queue_controller_action.go b/pkg/controllers/queue/queue_controller_action.go index 67ff9707cd..1a016519eb 100644 --- a/pkg/controllers/queue/queue_controller_action.go +++ b/pkg/controllers/queue/queue_controller_action.go @@ -29,6 +29,8 @@ import ( "volcano.sh/apis/pkg/apis/bus/v1alpha1" schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + v1beta1apply "volcano.sh/apis/pkg/client/applyconfiguration/scheduling/v1beta1" + "volcano.sh/volcano/pkg/controllers/metrics" "volcano.sh/volcano/pkg/controllers/queue/state" ) @@ -65,9 +67,18 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF queueStatus.Unknown++ case schedulingv1beta1.PodGroupInqueue: queueStatus.Inqueue++ + case schedulingv1beta1.PodGroupCompleted: + queueStatus.Completed++ } } + // Update the metrics + metrics.UpdatePgPendingPhaseNum(queue.Name, float64(queueStatus.Pending)) + metrics.UpdatePgRunningPhaseNum(queue.Name, float64(queueStatus.Running)) + metrics.UpdatePgUnknownPhaseNum(queue.Name, float64(queueStatus.Unknown)) + metrics.UpdatePgInqueuePhaseNum(queue.Name, float64(queueStatus.Inqueue)) + metrics.UpdatePgCompletedPhaseNum(queue.Name, float64(queueStatus.Completed)) + if updateStateFn != nil { updateStateFn(&queueStatus, podGroups) } else { @@ -87,10 +98,10 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF return nil } - newQueue := queue.DeepCopy() - newQueue.Status = queueStatus - if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil { - klog.Errorf("Failed to update status of Queue %s: %v.", newQueue.Name, err) + queueStatusApply := v1beta1apply.QueueStatus().WithState(queueStatus.State).WithAllocated(queueStatus.Allocated) + queueApply := v1beta1apply.Queue(queue.Name).WithStatus(queueStatusApply) + if _, err := c.vcClient.SchedulingV1beta1().Queues().ApplyStatus(context.TODO(), queueApply, metav1.ApplyOptions{FieldManager: controllerName}); err != nil { + klog.Errorf("Failed to apply status of Queue %s: %v.", queue.Name, err) return err } diff --git a/pkg/controllers/queue/queue_controller_util.go b/pkg/controllers/queue/queue_controller_util.go index c14e27162f..71444730db 100644 --- a/pkg/controllers/queue/queue_controller_util.go +++ b/pkg/controllers/queue/queue_controller_util.go @@ -22,6 +22,10 @@ import ( schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" ) +const ( + controllerName = "queue-controller" +) + // IsQueueReference return if ownerReference is Queue Kind. func IsQueueReference(ref *metav1.OwnerReference) bool { if ref == nil { diff --git a/pkg/scheduler/metrics/queue.go b/pkg/scheduler/metrics/queue.go index ce2504d0f5..8d2d4675ff 100644 --- a/pkg/scheduler/metrics/queue.go +++ b/pkg/scheduler/metrics/queue.go @@ -93,38 +93,6 @@ var ( Help: "If one queue is overused", }, []string{"queue_name"}, ) - - queuePodGroupInqueue = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: VolcanoNamespace, - Name: "queue_pod_group_inqueue_count", - Help: "The number of Inqueue PodGroup in this queue", - }, []string{"queue_name"}, - ) - - queuePodGroupPending = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: VolcanoNamespace, - Name: "queue_pod_group_pending_count", - Help: "The number of Pending PodGroup in this queue", - }, []string{"queue_name"}, - ) - - queuePodGroupRunning = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: VolcanoNamespace, - Name: "queue_pod_group_running_count", - Help: "The number of Running PodGroup in this queue", - }, []string{"queue_name"}, - ) - - queuePodGroupUnknown = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: VolcanoNamespace, - Name: "queue_pod_group_unknown_count", - Help: "The number of Unknown PodGroup in this queue", - }, []string{"queue_name"}, - ) ) // UpdateQueueAllocated records allocated resources for one queue @@ -166,26 +134,6 @@ func UpdateQueueOverused(queueName string, overused bool) { queueOverused.WithLabelValues(queueName).Set(value) } -// UpdateQueuePodGroupInqueueCount records the number of Inqueue PodGroup in this queue -func UpdateQueuePodGroupInqueueCount(queueName string, count int32) { - queuePodGroupInqueue.WithLabelValues(queueName).Set(float64(count)) -} - -// UpdateQueuePodGroupPendingCount records the number of Pending PodGroup in this queue -func UpdateQueuePodGroupPendingCount(queueName string, count int32) { - queuePodGroupPending.WithLabelValues(queueName).Set(float64(count)) -} - -// UpdateQueuePodGroupRunningCount records the number of Running PodGroup in this queue -func UpdateQueuePodGroupRunningCount(queueName string, count int32) { - queuePodGroupRunning.WithLabelValues(queueName).Set(float64(count)) -} - -// UpdateQueuePodGroupUnknownCount records the number of Unknown PodGroup in this queue -func UpdateQueuePodGroupUnknownCount(queueName string, count int32) { - queuePodGroupUnknown.WithLabelValues(queueName).Set(float64(count)) -} - // DeleteQueueMetrics delete all metrics related to the queue func DeleteQueueMetrics(queueName string) { queueAllocatedMilliCPU.DeleteLabelValues(queueName) @@ -197,8 +145,4 @@ func DeleteQueueMetrics(queueName string) { queueShare.DeleteLabelValues(queueName) queueWeight.DeleteLabelValues(queueName) queueOverused.DeleteLabelValues(queueName) - queuePodGroupInqueue.DeleteLabelValues(queueName) - queuePodGroupPending.DeleteLabelValues(queueName) - queuePodGroupRunning.DeleteLabelValues(queueName) - queuePodGroupUnknown.DeleteLabelValues(queueName) } diff --git a/pkg/scheduler/plugins/capacity/capacity.go b/pkg/scheduler/plugins/capacity/capacity.go index 3906b2ab5f..6774f099fb 100644 --- a/pkg/scheduler/plugins/capacity/capacity.go +++ b/pkg/scheduler/plugins/capacity/capacity.go @@ -181,10 +181,6 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) { metrics.UpdateQueueDeserved(attr.name, attr.deserved.MilliCPU, attr.deserved.Memory) metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory) metrics.UpdateQueueRequest(attr.name, attr.request.MilliCPU, attr.request.Memory) - metrics.UpdateQueuePodGroupInqueueCount(attr.name, queue.Queue.Status.Inqueue) - metrics.UpdateQueuePodGroupPendingCount(attr.name, queue.Queue.Status.Pending) - metrics.UpdateQueuePodGroupRunningCount(attr.name, queue.Queue.Status.Running) - metrics.UpdateQueuePodGroupUnknownCount(attr.name, queue.Queue.Status.Unknown) continue } deservedCPU, deservedMem := 0.0, 0.0 @@ -195,10 +191,6 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) { metrics.UpdateQueueDeserved(queueInfo.Name, deservedCPU, deservedMem) metrics.UpdateQueueAllocated(queueInfo.Name, 0, 0) metrics.UpdateQueueRequest(queueInfo.Name, 0, 0) - metrics.UpdateQueuePodGroupInqueueCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupPendingCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupRunningCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupUnknownCount(queueInfo.Name, 0) } ssn.AddQueueOrderFn(cp.Name(), func(l, r interface{}) int { diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index 07203c6298..28f97beae3 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -167,19 +167,10 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory) metrics.UpdateQueueRequest(attr.name, attr.request.MilliCPU, attr.request.Memory) metrics.UpdateQueueWeight(attr.name, attr.weight) - queue := ssn.Queues[attr.queueID] - metrics.UpdateQueuePodGroupInqueueCount(attr.name, queue.Queue.Status.Inqueue) - metrics.UpdateQueuePodGroupPendingCount(attr.name, queue.Queue.Status.Pending) - metrics.UpdateQueuePodGroupRunningCount(attr.name, queue.Queue.Status.Running) - metrics.UpdateQueuePodGroupUnknownCount(attr.name, queue.Queue.Status.Unknown) continue } metrics.UpdateQueueAllocated(queueInfo.Name, 0, 0) metrics.UpdateQueueRequest(queueInfo.Name, 0, 0) - metrics.UpdateQueuePodGroupInqueueCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupPendingCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupRunningCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupUnknownCount(queueInfo.Name, 0) } remaining := pp.totalResource.Clone()