diff --git a/go.mod b/go.mod index 79b9740b97..e2cfd6f4e8 100644 --- a/go.mod +++ b/go.mod @@ -152,6 +152,7 @@ replace ( k8s.io/component-helpers => k8s.io/component-helpers v0.31.1 k8s.io/controller-manager => k8s.io/controller-manager v0.31.1 k8s.io/cri-api => k8s.io/cri-api v0.31.1 + k8s.io/cri-client => k8s.io/cri-client v0.31.1 k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.31.1 k8s.io/dynamic-resource-allocation => k8s.io/dynamic-resource-allocation v0.31.1 k8s.io/endpointslice => k8s.io/endpointslice v0.31.1 diff --git a/installer/helm/chart/volcano/templates/controllers.yaml b/installer/helm/chart/volcano/templates/controllers.yaml index 3fc6cf35f5..72666ae8be 100644 --- a/installer/helm/chart/volcano/templates/controllers.yaml +++ b/installer/helm/chart/volcano/templates/controllers.yaml @@ -58,8 +58,11 @@ rules: resources: ["secrets"] verbs: ["get", "create", "delete", "update"] - apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"] - resources: ["podgroups", "queues", "queues/status"] + resources: ["podgroups", "queues"] verbs: ["get", "list", "watch", "create", "delete", "update"] + - apiGroups: [ "scheduling.incubator.k8s.io", "scheduling.volcano.sh" ] + resources: [ "queues/status" ] + verbs: ["get", "watch", "patch"] - apiGroups: ["flow.volcano.sh"] resources: ["jobflows", "jobtemplates"] verbs: ["get", "list", "watch", "create", "delete", "update"] diff --git a/installer/volcano-development.yaml b/installer/volcano-development.yaml index 87136fdba2..8a3159e396 100644 --- a/installer/volcano-development.yaml +++ b/installer/volcano-development.yaml @@ -4318,8 +4318,11 @@ rules: resources: ["secrets"] verbs: ["get", "create", "delete", "update"] - apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"] - resources: ["podgroups", "queues", "queues/status"] + resources: ["podgroups", "queues"] verbs: ["get", "list", "watch", "create", "delete", "update"] + - apiGroups: [ "scheduling.incubator.k8s.io", "scheduling.volcano.sh" ] + resources: [ "queues/status" ] + verbs: ["get", "watch", "patch"] - apiGroups: ["flow.volcano.sh"] resources: ["jobflows", "jobtemplates"] verbs: ["get", "list", "watch", "create", "delete", "update"] diff --git a/pkg/cli/queue/get.go b/pkg/cli/queue/get.go index 057bffda1e..cd495a10e2 100644 --- a/pkg/cli/queue/get.go +++ b/pkg/cli/queue/get.go @@ -63,21 +63,37 @@ func GetQueue(ctx context.Context) error { return err } - PrintQueue(queue, os.Stdout) + // Although the featuregate called CustomResourceFieldSelectors is enabled by default after v1.31, there are still + // users using k8s versions lower than v1.31. Therefore we can only get all the podgroups from kube-apiserver + // and then filtering them. + pgList, err := queueClient.SchedulingV1beta1().PodGroups("").List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list podgroup for queue %s with err: %v", getQueueFlags.Name, err) + } + + pgStats := &PodGroupStatistics{} + for _, pg := range pgList.Items { + if pg.Spec.Queue == getQueueFlags.Name { + pgStats.StatPodGroupCountsForQueue(&pg) + } + } + + PrintQueue(queue, pgStats, os.Stdout) return nil } // PrintQueue prints queue information. -func PrintQueue(queue *v1beta1.Queue, writer io.Writer) { - _, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s\n", - Name, Weight, State, Inqueue, Pending, Running, Unknown) +func PrintQueue(queue *v1beta1.Queue, pgStats *PodGroupStatistics, writer io.Writer) { + _, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s%-8s\n", + Name, Weight, State, Inqueue, Pending, Running, Unknown, Completed) if err != nil { fmt.Printf("Failed to print queue command result: %s.\n", err) } - _, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d\n", - queue.Name, queue.Spec.Weight, queue.Status.State, queue.Status.Inqueue, - queue.Status.Pending, queue.Status.Running, queue.Status.Unknown) + + _, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d%-8d\n", + queue.Name, queue.Spec.Weight, queue.Status.State, pgStats.Inqueue, + pgStats.Pending, pgStats.Running, pgStats.Unknown, pgStats.Completed) if err != nil { fmt.Printf("Failed to print queue command result: %s.\n", err) } diff --git a/pkg/cli/queue/list.go b/pkg/cli/queue/list.go index b6ceba2f0c..656abce088 100644 --- a/pkg/cli/queue/list.go +++ b/pkg/cli/queue/list.go @@ -53,6 +53,9 @@ const ( // Inqueue status of queue Inqueue string = "Inqueue" + // Completed status of the queue + Completed string = "Completed" + // State is state of queue State string = "State" ) @@ -81,22 +84,41 @@ func ListQueue(ctx context.Context) error { fmt.Printf("No resources found\n") return nil } - PrintQueues(queues, os.Stdout) + + // Although the featuregate called CustomResourceFieldSelectors is enabled by default after v1.31, there are still + // users using k8s versions lower than v1.31. Therefore we can only get all the podgroups from kube-apiserver + // and then filtering them. + pgList, err := jobClient.SchedulingV1beta1().PodGroups("").List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list podgroups with err: %v", err) + } + + queueStats := make(map[string]*PodGroupStatistics, len(queues.Items)) + for _, queue := range queues.Items { + queueStats[queue.Name] = &PodGroupStatistics{} + } + + for _, pg := range pgList.Items { + queueStats[pg.Spec.Queue].StatPodGroupCountsForQueue(&pg) + } + + PrintQueues(queues, queueStats, os.Stdout) return nil } // PrintQueues prints queue information. -func PrintQueues(queues *v1beta1.QueueList, writer io.Writer) { - _, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s\n", - Name, Weight, State, Inqueue, Pending, Running, Unknown) +func PrintQueues(queues *v1beta1.QueueList, queueStats map[string]*PodGroupStatistics, writer io.Writer) { + _, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s%-8s\n", + Name, Weight, State, Inqueue, Pending, Running, Unknown, Completed) if err != nil { fmt.Printf("Failed to print queue command result: %s.\n", err) } + for _, queue := range queues.Items { - _, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d\n", - queue.Name, queue.Spec.Weight, queue.Status.State, queue.Status.Inqueue, - queue.Status.Pending, queue.Status.Running, queue.Status.Unknown) + _, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d%-8d\n", + queue.Name, queue.Spec.Weight, queue.Status.State, queueStats[queue.Name].Inqueue, queueStats[queue.Name].Pending, + queueStats[queue.Name].Running, queueStats[queue.Name].Unknown, queueStats[queue.Name].Completed) if err != nil { fmt.Printf("Failed to print queue command result: %s.\n", err) } diff --git a/pkg/cli/queue/util.go b/pkg/cli/queue/util.go index 50c6c4aab1..e13c08c610 100644 --- a/pkg/cli/queue/util.go +++ b/pkg/cli/queue/util.go @@ -29,6 +29,7 @@ import ( busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1" "volcano.sh/apis/pkg/apis/helpers" + "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/apis/pkg/client/clientset/versioned" ) @@ -62,3 +63,26 @@ func createQueueCommand(ctx context.Context, config *rest.Config, action busv1al return nil } + +type PodGroupStatistics struct { + Inqueue int + Pending int + Running int + Unknown int + Completed int +} + +func (pgStats *PodGroupStatistics) StatPodGroupCountsForQueue(pg *v1beta1.PodGroup) { + switch pg.Status.Phase { + case v1beta1.PodGroupInqueue: + pgStats.Inqueue++ + case v1beta1.PodGroupPending: + pgStats.Pending++ + case v1beta1.PodGroupRunning: + pgStats.Running++ + case v1beta1.PodGroupUnknown: + pgStats.Unknown++ + case v1beta1.PodGroupCompleted: + pgStats.Completed++ + } +} diff --git a/pkg/controllers/metrics/queue.go b/pkg/controllers/metrics/queue.go new file mode 100644 index 0000000000..7f4839538a --- /dev/null +++ b/pkg/controllers/metrics/queue.go @@ -0,0 +1,84 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "volcano.sh/volcano/pkg/scheduler/metrics" +) + +var ( + queuePodGroupInqueue = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "queue_pod_group_inqueue_count", + Help: "The number of Inqueue PodGroup in this queue", + }, []string{"queue_name"}, + ) + + queuePodGroupPending = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "queue_pod_group_pending_count", + Help: "The number of Pending PodGroup in this queue", + }, []string{"queue_name"}, + ) + + queuePodGroupRunning = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "queue_pod_group_running_count", + Help: "The number of Running PodGroup in this queue", + }, []string{"queue_name"}, + ) + + queuePodGroupUnknown = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "queue_pod_group_unknown_count", + Help: "The number of Unknown PodGroup in this queue", + }, []string{"queue_name"}, + ) + + queuePodGroupCompleted = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "queue_pod_group_completed_count", + Help: "The number of Completed PodGroup in this queue", + }, []string{"queue_name"}, + ) +) + +// UpdateQueuePodGroupInqueueCount records the number of Inqueue PodGroup in this queue +func UpdateQueuePodGroupInqueueCount(queueName string, count int32) { + queuePodGroupInqueue.WithLabelValues(queueName).Set(float64(count)) +} + +// UpdateQueuePodGroupPendingCount records the number of Pending PodGroup in this queue +func UpdateQueuePodGroupPendingCount(queueName string, count int32) { + queuePodGroupPending.WithLabelValues(queueName).Set(float64(count)) +} + +// UpdateQueuePodGroupRunningCount records the number of Running PodGroup in this queue +func UpdateQueuePodGroupRunningCount(queueName string, count int32) { + queuePodGroupRunning.WithLabelValues(queueName).Set(float64(count)) +} + +// UpdateQueuePodGroupUnknownCount records the number of Unknown PodGroup in this queue +func UpdateQueuePodGroupUnknownCount(queueName string, count int32) { + queuePodGroupUnknown.WithLabelValues(queueName).Set(float64(count)) +} + +// UpdateQueuePodGroupCompletedCount records the number of Completed PodGroup in this queue +func UpdateQueuePodGroupCompletedCount(queueName string, count int32) { + queuePodGroupCompleted.WithLabelValues(queueName).Set(float64(count)) +} + +// DeleteQueueMetrics delete all metrics related to the queue +func DeleteQueueMetrics(queueName string) { + queuePodGroupInqueue.DeleteLabelValues(queueName) + queuePodGroupPending.DeleteLabelValues(queueName) + queuePodGroupRunning.DeleteLabelValues(queueName) + queuePodGroupUnknown.DeleteLabelValues(queueName) + queuePodGroupCompleted.DeleteLabelValues(queueName) +} diff --git a/pkg/controllers/queue/queue_controller.go b/pkg/controllers/queue/queue_controller.go index 678424c804..dee9cd99ca 100644 --- a/pkg/controllers/queue/queue_controller.go +++ b/pkg/controllers/queue/queue_controller.go @@ -160,8 +160,6 @@ func (c *queuecontroller) Initialize(opt *framework.ControllerOption) error { } queuestate.SyncQueue = c.syncQueue - queuestate.OpenQueue = c.openQueue - queuestate.CloseQueue = c.closeQueue c.syncHandler = c.handleQueue c.syncCommandHandler = c.handleCommand diff --git a/pkg/controllers/queue/queue_controller_action.go b/pkg/controllers/queue/queue_controller_action.go index 67ff9707cd..5e96cf6c73 100644 --- a/pkg/controllers/queue/queue_controller_action.go +++ b/pkg/controllers/queue/queue_controller_action.go @@ -27,17 +27,19 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" - "volcano.sh/apis/pkg/apis/bus/v1alpha1" schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + v1beta1apply "volcano.sh/apis/pkg/client/applyconfiguration/scheduling/v1beta1" + "volcano.sh/volcano/pkg/controllers/metrics" "volcano.sh/volcano/pkg/controllers/queue/state" ) +// syncQueue will record the number of podgroups of each state in the queues as metrics and update the state of the queue if updateStateFn is not nil. func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error { klog.V(4).Infof("Begin to sync queue %s.", queue.Name) defer klog.V(4).Infof("End sync queue %s.", queue.Name) podGroups := c.getPodGroups(queue.Name) - queueStatus := schedulingv1beta1.QueueStatus{} + newQueueStatus := schedulingv1beta1.QueueStatus{} for _, pgKey := range podGroups { // Ignore error here, tt can not occur. @@ -58,126 +60,52 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF switch pg.Status.Phase { case schedulingv1beta1.PodGroupPending: - queueStatus.Pending++ + newQueueStatus.Pending++ case schedulingv1beta1.PodGroupRunning: - queueStatus.Running++ + newQueueStatus.Running++ case schedulingv1beta1.PodGroupUnknown: - queueStatus.Unknown++ + newQueueStatus.Unknown++ case schedulingv1beta1.PodGroupInqueue: - queueStatus.Inqueue++ + newQueueStatus.Inqueue++ + case schedulingv1beta1.PodGroupCompleted: + newQueueStatus.Completed++ } } + // Update the metrics + metrics.UpdateQueuePodGroupPendingCount(queue.Name, newQueueStatus.Pending) + metrics.UpdateQueuePodGroupRunningCount(queue.Name, newQueueStatus.Running) + metrics.UpdateQueuePodGroupUnknownCount(queue.Name, newQueueStatus.Unknown) + metrics.UpdateQueuePodGroupInqueueCount(queue.Name, newQueueStatus.Inqueue) + metrics.UpdateQueuePodGroupCompletedCount(queue.Name, newQueueStatus.Completed) + if updateStateFn != nil { - updateStateFn(&queueStatus, podGroups) + updateStateFn(&newQueueStatus, podGroups) } else { - queueStatus.State = queue.Status.State + newQueueStatus.State = queue.Status.State } - queueStatus.Allocated = queue.Status.Allocated.DeepCopy() + newQueueStatus.Allocated = queue.Status.Allocated.DeepCopy() // queue.status.allocated will be updated after every session close in volcano scheduler, we should not depend on it because session may be time-consuming, // and queue.status.allocated can't be updated timely. We initialize queue.status.allocated and update it here explicitly // to avoid update queue err because update will fail when queue.status.allocated is nil. - if queueStatus.Allocated == nil { - queueStatus.Allocated = v1.ResourceList{} + if newQueueStatus.Allocated == nil { + newQueueStatus.Allocated = v1.ResourceList{} } // ignore update when status does not change - if equality.Semantic.DeepEqual(queueStatus, queue.Status) { - return nil - } - - newQueue := queue.DeepCopy() - newQueue.Status = queueStatus - if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil { - klog.Errorf("Failed to update status of Queue %s: %v.", newQueue.Name, err) - return err - } - - return nil -} - -func (c *queuecontroller) openQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error { - klog.V(4).Infof("Begin to open queue %s.", queue.Name) - - newQueue := queue.DeepCopy() - newQueue.Status.State = schedulingv1beta1.QueueStateOpen - - if queue.Status.State != newQueue.Status.State { - if _, err := c.vcClient.SchedulingV1beta1().Queues().Update(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil { - c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.OpenQueueAction), - fmt.Sprintf("Open queue failed for %v", err)) - return err - } - - c.recorder.Event(newQueue, v1.EventTypeNormal, string(v1alpha1.OpenQueueAction), "Open queue succeed") - } else { + if equality.Semantic.DeepEqual(newQueueStatus, queue.Status) { return nil } - q, err := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), newQueue.Name, metav1.GetOptions{}) - if err != nil { + queueStatusApply := v1beta1apply.QueueStatus().WithState(newQueueStatus.State).WithAllocated(newQueueStatus.Allocated) + queueApply := v1beta1apply.Queue(queue.Name).WithStatus(queueStatusApply) + if _, err := c.vcClient.SchedulingV1beta1().Queues().ApplyStatus(context.TODO(), queueApply, metav1.ApplyOptions{FieldManager: controllerName}); err != nil { + errMsg := fmt.Sprintf("Update queue state from %s to %s failed for %v", queue.Status.State, newQueueStatus.State, err) + c.recorder.Event(queue, v1.EventTypeWarning, state.EventReason, errMsg) + klog.Errorf(errMsg) return err } - newQueue = q.DeepCopy() - if updateStateFn != nil { - updateStateFn(&newQueue.Status, nil) - } else { - return fmt.Errorf("internal error, update state function should be provided") - } - - if queue.Status.State != newQueue.Status.State { - if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil { - c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.OpenQueueAction), - fmt.Sprintf("Update queue status from %s to %s failed for %v", - queue.Status.State, newQueue.Status.State, err)) - return err - } - } - - return nil -} - -func (c *queuecontroller) closeQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error { - klog.V(4).Infof("Begin to close queue %s.", queue.Name) - - newQueue := queue.DeepCopy() - newQueue.Status.State = schedulingv1beta1.QueueStateClosed - - if queue.Status.State != newQueue.Status.State { - if _, err := c.vcClient.SchedulingV1beta1().Queues().Update(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil { - c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.CloseQueueAction), - fmt.Sprintf("Close queue failed for %v", err)) - return err - } - - c.recorder.Event(newQueue, v1.EventTypeNormal, string(v1alpha1.CloseQueueAction), "Close queue succeed") - } else { - return nil - } - - q, err := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), newQueue.Name, metav1.GetOptions{}) - if err != nil { - return err - } - - newQueue = q.DeepCopy() - podGroups := c.getPodGroups(newQueue.Name) - if updateStateFn != nil { - updateStateFn(&newQueue.Status, podGroups) - } else { - return fmt.Errorf("internal error, update state function should be provided") - } - - if queue.Status.State != newQueue.Status.State { - if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil { - c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.CloseQueueAction), - fmt.Sprintf("Update queue status from %s to %s failed for %v", - queue.Status.State, newQueue.Status.State, err)) - return err - } - } - return nil } diff --git a/pkg/controllers/queue/queue_controller_handler.go b/pkg/controllers/queue/queue_controller_handler.go index 85c83225c9..f3560556ee 100644 --- a/pkg/controllers/queue/queue_controller_handler.go +++ b/pkg/controllers/queue/queue_controller_handler.go @@ -19,6 +19,7 @@ package queue import ( "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "volcano.sh/volcano/pkg/controllers/metrics" busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1" schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" @@ -57,6 +58,7 @@ func (c *queuecontroller) deleteQueue(obj interface{}) { } } + metrics.DeleteQueueMetrics(queue.Name) c.pgMutex.Lock() defer c.pgMutex.Unlock() delete(c.podGroups, queue.Name) diff --git a/pkg/controllers/queue/queue_controller_test.go b/pkg/controllers/queue/queue_controller_test.go index b10d8168a7..c0856f6a4b 100644 --- a/pkg/controllers/queue/queue_controller_test.go +++ b/pkg/controllers/queue/queue_controller_test.go @@ -22,7 +22,9 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubeclient "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" @@ -31,6 +33,7 @@ import ( vcclient "volcano.sh/apis/pkg/client/clientset/versioned/fake" informerfactory "volcano.sh/apis/pkg/client/informers/externalversions" "volcano.sh/volcano/pkg/controllers/framework" + "volcano.sh/volcano/pkg/controllers/queue/state" ) func newFakeController() *queuecontroller { @@ -241,113 +244,75 @@ func TestUpdatePodGroup(t *testing.T) { } func TestSyncQueue(t *testing.T) { - namespace := "c1" - testCases := []struct { - Name string - pgsInCache []*schedulingv1beta1.PodGroup - pgsInInformer []*schedulingv1beta1.PodGroup - queue *schedulingv1beta1.Queue - ExpectStatus schedulingv1beta1.QueueStatus + Name string + queue *schedulingv1beta1.Queue + updateStatusFnFactory func(queue *schedulingv1beta1.Queue) state.UpdateQueueStatusFn + ExpectStatus schedulingv1beta1.QueueStatus }{ { - Name: "syncQueue", - pgsInCache: []*schedulingv1beta1.PodGroup{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pg1", - Namespace: namespace, - }, - Spec: schedulingv1beta1.PodGroupSpec{ - Queue: "c1", - }, - Status: schedulingv1beta1.PodGroupStatus{ - Phase: schedulingv1beta1.PodGroupPending, - }, - }, - }, - pgsInInformer: []*schedulingv1beta1.PodGroup{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pg1", - Namespace: namespace, - }, - Spec: schedulingv1beta1.PodGroupSpec{ - Queue: "c1", - }, - Status: schedulingv1beta1.PodGroupStatus{ - Phase: schedulingv1beta1.PodGroupPending, - }, - }, - }, + Name: "From empty state to open", queue: &schedulingv1beta1.Queue{ ObjectMeta: metav1.ObjectMeta{ Name: "c1", }, - Spec: schedulingv1beta1.QueueSpec{ - Weight: 1, + Status: schedulingv1beta1.QueueStatus{ + State: "", }, }, ExpectStatus: schedulingv1beta1.QueueStatus{ - Pending: 1, - Reservation: schedulingv1beta1.Reservation{}, - Allocated: v1.ResourceList{}, + State: schedulingv1beta1.QueueStateOpen, + }, + updateStatusFnFactory: func(queue *schedulingv1beta1.Queue) state.UpdateQueueStatusFn { + return func(status *schedulingv1beta1.QueueStatus, podGroupList []string) { + if len(queue.Status.State) == 0 { + status.State = schedulingv1beta1.QueueStateOpen + } + } }, }, { - Name: "syncQueueHandlingNotFoundPg", - pgsInCache: []*schedulingv1beta1.PodGroup{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pg1", - Namespace: namespace, - }, - Spec: schedulingv1beta1.PodGroupSpec{ - Queue: "c2", - }, - Status: schedulingv1beta1.PodGroupStatus{ - Phase: schedulingv1beta1.PodGroupPending, - }, + Name: "From open to close", + queue: &schedulingv1beta1.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "c2", }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pg2", - Namespace: namespace, - }, - Spec: schedulingv1beta1.PodGroupSpec{ - Queue: "c2", - }, - Status: schedulingv1beta1.PodGroupStatus{ - Phase: schedulingv1beta1.PodGroupPending, - }, + Status: schedulingv1beta1.QueueStatus{ + State: schedulingv1beta1.QueueStateOpen, }, }, - pgsInInformer: []*schedulingv1beta1.PodGroup{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pg2", - Namespace: namespace, - }, - Spec: schedulingv1beta1.PodGroupSpec{ - Queue: "c2", - }, - Status: schedulingv1beta1.PodGroupStatus{ - Phase: schedulingv1beta1.PodGroupPending, - }, - }, + ExpectStatus: schedulingv1beta1.QueueStatus{ + State: schedulingv1beta1.QueueStateClosed, + }, + updateStatusFnFactory: func(queue *schedulingv1beta1.Queue) state.UpdateQueueStatusFn { + return func(status *schedulingv1beta1.QueueStatus, podGroupList []string) { + status.State = schedulingv1beta1.QueueStateClosed + } }, + }, + { + Name: "Updated state succeeded but keep allocated unchanged", queue: &schedulingv1beta1.Queue{ ObjectMeta: metav1.ObjectMeta{ - Name: "c2", + Name: "c3", }, - Spec: schedulingv1beta1.QueueSpec{ - Weight: 1, + Status: schedulingv1beta1.QueueStatus{ + State: schedulingv1beta1.QueueStateUnknown, + Allocated: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1"), + }, }, }, ExpectStatus: schedulingv1beta1.QueueStatus{ - Pending: 1, - Reservation: schedulingv1beta1.Reservation{}, - Allocated: v1.ResourceList{}, + State: schedulingv1beta1.QueueStateOpen, + Allocated: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1"), + }, + }, + updateStatusFnFactory: func(queue *schedulingv1beta1.Queue) state.UpdateQueueStatusFn { + return func(status *schedulingv1beta1.QueueStatus, podGroupList []string) { + status.State = schedulingv1beta1.QueueStateOpen + } }, }, } @@ -355,24 +320,14 @@ func TestSyncQueue(t *testing.T) { for i, testcase := range testCases { c := newFakeController() - for j := range testcase.pgsInCache { - key, _ := cache.MetaNamespaceKeyFunc(testcase.pgsInCache[j]) - if _, ok := c.podGroups[testcase.pgsInCache[j].Spec.Queue]; !ok { - c.podGroups[testcase.pgsInCache[j].Spec.Queue] = make(map[string]struct{}) - } - c.podGroups[testcase.pgsInCache[j].Spec.Queue][key] = struct{}{} - } - - for j := range testcase.pgsInInformer { - c.pgInformer.Informer().GetIndexer().Add(testcase.pgsInInformer[j]) - } - - c.queueInformer.Informer().GetIndexer().Add(testcase.queue) - c.vcClient.SchedulingV1beta1().Queues().Create(context.TODO(), testcase.queue, metav1.CreateOptions{}) + _, err := c.vcClient.SchedulingV1beta1().Queues().Create(context.TODO(), testcase.queue, metav1.CreateOptions{}) + assert.NoError(t, err) - err := c.syncQueue(testcase.queue, nil) + updateStatusFn := testcase.updateStatusFnFactory(testcase.queue) + err = c.syncQueue(testcase.queue, updateStatusFn) + assert.NoError(t, err) - item, _ := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), testcase.queue.Name, metav1.GetOptions{}) + item, err := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), testcase.queue.Name, metav1.GetOptions{}) if err != nil && !reflect.DeepEqual(testcase.ExpectStatus, item.Status) { t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectStatus, item.Status) } diff --git a/pkg/controllers/queue/queue_controller_util.go b/pkg/controllers/queue/queue_controller_util.go index c14e27162f..71444730db 100644 --- a/pkg/controllers/queue/queue_controller_util.go +++ b/pkg/controllers/queue/queue_controller_util.go @@ -22,6 +22,10 @@ import ( schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" ) +const ( + controllerName = "queue-controller" +) + // IsQueueReference return if ownerReference is Queue Kind. func IsQueueReference(ref *metav1.OwnerReference) bool { if ref == nil { diff --git a/pkg/controllers/queue/state/closed.go b/pkg/controllers/queue/state/closed.go index f2da618346..2ccbe2fb50 100644 --- a/pkg/controllers/queue/state/closed.go +++ b/pkg/controllers/queue/state/closed.go @@ -28,7 +28,7 @@ type closedState struct { func (cs *closedState) Execute(action v1alpha1.Action) error { switch action { case v1alpha1.OpenQueueAction: - return OpenQueue(cs.queue, func(status *v1beta1.QueueStatus, podGroupList []string) { + return SyncQueue(cs.queue, func(status *v1beta1.QueueStatus, podGroupList []string) { status.State = v1beta1.QueueStateOpen }) case v1alpha1.CloseQueueAction: diff --git a/pkg/controllers/queue/state/closing.go b/pkg/controllers/queue/state/closing.go index 5d36a2a55f..c107c9b348 100644 --- a/pkg/controllers/queue/state/closing.go +++ b/pkg/controllers/queue/state/closing.go @@ -28,7 +28,7 @@ type closingState struct { func (cs *closingState) Execute(action v1alpha1.Action) error { switch action { case v1alpha1.OpenQueueAction: - return OpenQueue(cs.queue, func(status *v1beta1.QueueStatus, podGroupList []string) { + return SyncQueue(cs.queue, func(status *v1beta1.QueueStatus, podGroupList []string) { status.State = v1beta1.QueueStateOpen }) case v1alpha1.CloseQueueAction: diff --git a/pkg/controllers/queue/state/factory.go b/pkg/controllers/queue/state/factory.go index 030f14dfe3..07be38547d 100644 --- a/pkg/controllers/queue/state/factory.go +++ b/pkg/controllers/queue/state/factory.go @@ -36,10 +36,10 @@ type QueueActionFn func(queue *v1beta1.Queue, fn UpdateQueueStatusFn) error var ( // SyncQueue will sync queue status. SyncQueue QueueActionFn - // OpenQueue will set state of queue to open - OpenQueue QueueActionFn - // CloseQueue will set state of queue to close - CloseQueue QueueActionFn +) + +const ( + EventReason = "StateSwitch" ) // NewState gets the state from queue status. diff --git a/pkg/controllers/queue/state/open.go b/pkg/controllers/queue/state/open.go index 5b8c0f88aa..b69e848d42 100644 --- a/pkg/controllers/queue/state/open.go +++ b/pkg/controllers/queue/state/open.go @@ -32,7 +32,7 @@ func (os *openState) Execute(action v1alpha1.Action) error { status.State = v1beta1.QueueStateOpen }) case v1alpha1.CloseQueueAction: - return CloseQueue(os.queue, func(status *v1beta1.QueueStatus, podGroupList []string) { + return SyncQueue(os.queue, func(status *v1beta1.QueueStatus, podGroupList []string) { if len(podGroupList) == 0 { status.State = v1beta1.QueueStateClosed return diff --git a/pkg/controllers/queue/state/unknown.go b/pkg/controllers/queue/state/unknown.go index 176cdd8ff6..0e79186527 100644 --- a/pkg/controllers/queue/state/unknown.go +++ b/pkg/controllers/queue/state/unknown.go @@ -28,11 +28,11 @@ type unknownState struct { func (us *unknownState) Execute(action v1alpha1.Action) error { switch action { case v1alpha1.OpenQueueAction: - return OpenQueue(us.queue, func(status *v1beta1.QueueStatus, podGroupList []string) { + return SyncQueue(us.queue, func(status *v1beta1.QueueStatus, podGroupList []string) { status.State = v1beta1.QueueStateOpen }) case v1alpha1.CloseQueueAction: - return CloseQueue(us.queue, func(status *v1beta1.QueueStatus, podGroupList []string) { + return SyncQueue(us.queue, func(status *v1beta1.QueueStatus, podGroupList []string) { if len(podGroupList) == 0 { status.State = v1beta1.QueueStateClosed return diff --git a/pkg/scheduler/metrics/queue.go b/pkg/scheduler/metrics/queue.go index ce2504d0f5..8d2d4675ff 100644 --- a/pkg/scheduler/metrics/queue.go +++ b/pkg/scheduler/metrics/queue.go @@ -93,38 +93,6 @@ var ( Help: "If one queue is overused", }, []string{"queue_name"}, ) - - queuePodGroupInqueue = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: VolcanoNamespace, - Name: "queue_pod_group_inqueue_count", - Help: "The number of Inqueue PodGroup in this queue", - }, []string{"queue_name"}, - ) - - queuePodGroupPending = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: VolcanoNamespace, - Name: "queue_pod_group_pending_count", - Help: "The number of Pending PodGroup in this queue", - }, []string{"queue_name"}, - ) - - queuePodGroupRunning = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: VolcanoNamespace, - Name: "queue_pod_group_running_count", - Help: "The number of Running PodGroup in this queue", - }, []string{"queue_name"}, - ) - - queuePodGroupUnknown = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Subsystem: VolcanoNamespace, - Name: "queue_pod_group_unknown_count", - Help: "The number of Unknown PodGroup in this queue", - }, []string{"queue_name"}, - ) ) // UpdateQueueAllocated records allocated resources for one queue @@ -166,26 +134,6 @@ func UpdateQueueOverused(queueName string, overused bool) { queueOverused.WithLabelValues(queueName).Set(value) } -// UpdateQueuePodGroupInqueueCount records the number of Inqueue PodGroup in this queue -func UpdateQueuePodGroupInqueueCount(queueName string, count int32) { - queuePodGroupInqueue.WithLabelValues(queueName).Set(float64(count)) -} - -// UpdateQueuePodGroupPendingCount records the number of Pending PodGroup in this queue -func UpdateQueuePodGroupPendingCount(queueName string, count int32) { - queuePodGroupPending.WithLabelValues(queueName).Set(float64(count)) -} - -// UpdateQueuePodGroupRunningCount records the number of Running PodGroup in this queue -func UpdateQueuePodGroupRunningCount(queueName string, count int32) { - queuePodGroupRunning.WithLabelValues(queueName).Set(float64(count)) -} - -// UpdateQueuePodGroupUnknownCount records the number of Unknown PodGroup in this queue -func UpdateQueuePodGroupUnknownCount(queueName string, count int32) { - queuePodGroupUnknown.WithLabelValues(queueName).Set(float64(count)) -} - // DeleteQueueMetrics delete all metrics related to the queue func DeleteQueueMetrics(queueName string) { queueAllocatedMilliCPU.DeleteLabelValues(queueName) @@ -197,8 +145,4 @@ func DeleteQueueMetrics(queueName string) { queueShare.DeleteLabelValues(queueName) queueWeight.DeleteLabelValues(queueName) queueOverused.DeleteLabelValues(queueName) - queuePodGroupInqueue.DeleteLabelValues(queueName) - queuePodGroupPending.DeleteLabelValues(queueName) - queuePodGroupRunning.DeleteLabelValues(queueName) - queuePodGroupUnknown.DeleteLabelValues(queueName) } diff --git a/pkg/scheduler/plugins/capacity/capacity.go b/pkg/scheduler/plugins/capacity/capacity.go index 3906b2ab5f..6774f099fb 100644 --- a/pkg/scheduler/plugins/capacity/capacity.go +++ b/pkg/scheduler/plugins/capacity/capacity.go @@ -181,10 +181,6 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) { metrics.UpdateQueueDeserved(attr.name, attr.deserved.MilliCPU, attr.deserved.Memory) metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory) metrics.UpdateQueueRequest(attr.name, attr.request.MilliCPU, attr.request.Memory) - metrics.UpdateQueuePodGroupInqueueCount(attr.name, queue.Queue.Status.Inqueue) - metrics.UpdateQueuePodGroupPendingCount(attr.name, queue.Queue.Status.Pending) - metrics.UpdateQueuePodGroupRunningCount(attr.name, queue.Queue.Status.Running) - metrics.UpdateQueuePodGroupUnknownCount(attr.name, queue.Queue.Status.Unknown) continue } deservedCPU, deservedMem := 0.0, 0.0 @@ -195,10 +191,6 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) { metrics.UpdateQueueDeserved(queueInfo.Name, deservedCPU, deservedMem) metrics.UpdateQueueAllocated(queueInfo.Name, 0, 0) metrics.UpdateQueueRequest(queueInfo.Name, 0, 0) - metrics.UpdateQueuePodGroupInqueueCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupPendingCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupRunningCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupUnknownCount(queueInfo.Name, 0) } ssn.AddQueueOrderFn(cp.Name(), func(l, r interface{}) int { diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index 07203c6298..28f97beae3 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -167,19 +167,10 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory) metrics.UpdateQueueRequest(attr.name, attr.request.MilliCPU, attr.request.Memory) metrics.UpdateQueueWeight(attr.name, attr.weight) - queue := ssn.Queues[attr.queueID] - metrics.UpdateQueuePodGroupInqueueCount(attr.name, queue.Queue.Status.Inqueue) - metrics.UpdateQueuePodGroupPendingCount(attr.name, queue.Queue.Status.Pending) - metrics.UpdateQueuePodGroupRunningCount(attr.name, queue.Queue.Status.Running) - metrics.UpdateQueuePodGroupUnknownCount(attr.name, queue.Queue.Status.Unknown) continue } metrics.UpdateQueueAllocated(queueInfo.Name, 0, 0) metrics.UpdateQueueRequest(queueInfo.Name, 0, 0) - metrics.UpdateQueuePodGroupInqueueCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupPendingCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupRunningCount(queueInfo.Name, 0) - metrics.UpdateQueuePodGroupUnknownCount(queueInfo.Name, 0) } remaining := pp.totalResource.Clone() diff --git a/test/e2e/jobseq/queue_job_status.go b/test/e2e/jobseq/queue_job_status.go index 53bdac30c9..534b7608e1 100644 --- a/test/e2e/jobseq/queue_job_status.go +++ b/test/e2e/jobseq/queue_job_status.go @@ -26,7 +26,6 @@ import ( . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" watchtools "k8s.io/client-go/tools/watch" @@ -77,17 +76,15 @@ var _ = Describe("Queue Job Status Transition", func() { By("Verify queue have pod groups inqueue") err := e2eutil.WaitQueueStatus(func() (bool, error) { - queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1) - return queue.Status.Inqueue > 0, nil + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, q1) + return pgStats.Inqueue > 0, nil }) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue inqueue") By("Verify queue have pod groups running") err = e2eutil.WaitQueueStatus(func() (bool, error) { - queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1) - return queue.Status.Running > 0, nil + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, q1) + return pgStats.Running > 0, nil }) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") }) @@ -134,9 +131,8 @@ var _ = Describe("Queue Job Status Transition", func() { By("Verify queue have pod groups running") err := e2eutil.WaitQueueStatus(func() (bool, error) { - queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1) - return queue.Status.Running > 0, nil + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, q1) + return pgStats.Running > 0, nil }) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") @@ -150,9 +146,8 @@ var _ = Describe("Queue Job Status Transition", func() { By("Verify queue have pod groups Pending") err = e2eutil.WaitQueueStatus(func() (bool, error) { - queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1) - return queue.Status.Pending > 0, nil + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, q1) + return pgStats.Pending > 0, nil }) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue Pending") }) @@ -195,9 +190,8 @@ var _ = Describe("Queue Job Status Transition", func() { By("Verify queue have pod groups running") err := e2eutil.WaitQueueStatus(func() (bool, error) { - queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1) - return queue.Status.Running > 0, nil + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, q1) + return pgStats.Running > 0, nil }) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") @@ -220,11 +214,9 @@ var _ = Describe("Queue Job Status Transition", func() { } By("Verify queue have pod groups unknown") - fieldSelector := fields.OneTermEqualSelector("metadata.name", q1).String() w := &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) { - options.FieldSelector = fieldSelector - return ctx.Vcclient.SchedulingV1beta1().Queues().Watch(context.TODO(), options) + return ctx.Vcclient.SchedulingV1beta1().PodGroups(podNamespace).Watch(context.TODO(), options) }, } wctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), 5*time.Minute) @@ -232,8 +224,8 @@ var _ = Describe("Queue Job Status Transition", func() { _, err = watchtools.Until(wctx, clusterPods.ResourceVersion, w, func(event watch.Event) (bool, error) { switch t := event.Object.(type) { - case *v1beta1.Queue: - if t.Status.Unknown > 0 { + case *v1beta1.PodGroup: + if t.Status.Phase == v1beta1.PodGroupUnknown { return true, nil } } diff --git a/test/e2e/schedulingaction/reclaim.go b/test/e2e/schedulingaction/reclaim.go index 0c5dd3009c..43112c6757 100644 --- a/test/e2e/schedulingaction/reclaim.go +++ b/test/e2e/schedulingaction/reclaim.go @@ -70,14 +70,25 @@ var _ = Describe("Reclaim E2E Test", func() { queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), queue, metav1.GetOptions{}) Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", queue) switch status { - case "Running": - return queue.Status.Running == num, nil case "Open": return queue.Status.State == schedulingv1beta1.QueueStateOpen, nil + default: + return false, nil + } + }) + return err + } + + CheckPodGroupStatistics := func(ctx *e2eutil.TestContext, status string, num int, queue string) error { + err := e2eutil.WaitQueueStatus(func() (bool, error) { + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, queue) + switch status { + case "Running": + return pgStats.Running == num, nil case "Pending": - return queue.Status.Pending == num, nil + return pgStats.Pending == num, nil case "Inqueue": - return queue.Status.Inqueue == num, nil + return pgStats.Inqueue == num, nil default: return false, nil } @@ -117,13 +128,13 @@ var _ = Describe("Reclaim E2E Test", func() { By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q3) + err = CheckPodGroupStatistics(ctx, "Running", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") }) @@ -176,10 +187,10 @@ var _ = Describe("Reclaim E2E Test", func() { Expect(err).NotTo(HaveOccurred(), "Get %s pod failed", j3) By("Make sure q1 q2 with job running in it.") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") for _, pod := range job3pods.Items { @@ -188,7 +199,7 @@ var _ = Describe("Reclaim E2E Test", func() { } By("Q3 pending when we delete it.") - err = WaitQueueStatus(ctx, "Pending", 1, q3) + err = CheckPodGroupStatistics(ctx, "Pending", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue pending") }) @@ -223,10 +234,10 @@ var _ = Describe("Reclaim E2E Test", func() { By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") }) @@ -265,10 +276,10 @@ var _ = Describe("Reclaim E2E Test", func() { By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") }) @@ -306,16 +317,16 @@ var _ = Describe("Reclaim E2E Test", func() { time.Sleep(10 * time.Second) By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q3) + err = CheckPodGroupStatistics(ctx, "Running", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Inqueue", 1, q3) + err = CheckPodGroupStatistics(ctx, "Inqueue", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue Inqueue") }) @@ -352,14 +363,14 @@ var _ = Describe("Reclaim E2E Test", func() { time.Sleep(10 * time.Second) By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") // TODO: it is a bug : the job status is pending but podgroup status is running - err = WaitQueueStatus(ctx, "Running", 1, q3) + err = CheckPodGroupStatistics(ctx, "Running", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue Running") }) @@ -412,13 +423,13 @@ var _ = Describe("Reclaim E2E Test", func() { time.Sleep(10 * time.Second) By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Inqueue", 1, q3) + err = CheckPodGroupStatistics(ctx, "Inqueue", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue Inqueue") }) @@ -457,13 +468,13 @@ var _ = Describe("Reclaim E2E Test", func() { By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") - err = WaitQueueStatus(ctx, "Running", 1, q3) + err = CheckPodGroupStatistics(ctx, "Running", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") }) @@ -514,10 +525,10 @@ var _ = Describe("Reclaim E2E Test", func() { err = e2eutil.WaitJobReady(ctx, job2) Expect(err).NotTo(HaveOccurred()) - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue1 running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue2 running") By("Create coming jobs") @@ -530,10 +541,10 @@ var _ = Describe("Reclaim E2E Test", func() { By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q3) + err = CheckPodGroupStatistics(ctx, "Running", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue3 running") - err = WaitQueueStatus(ctx, "Running", 1, q4) + err = CheckPodGroupStatistics(ctx, "Running", 1, q4) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue4 running") }) @@ -619,10 +630,10 @@ var _ = Describe("Reclaim E2E Test", func() { err = e2eutil.WaitJobReady(ctx, job2) Expect(err).NotTo(HaveOccurred()) - err = WaitQueueStatus(ctx, "Running", 1, q1) + err = CheckPodGroupStatistics(ctx, "Running", 1, q1) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue1 running") - err = WaitQueueStatus(ctx, "Running", 1, q2) + err = CheckPodGroupStatistics(ctx, "Running", 1, q2) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue2 running") By("Create coming jobs") @@ -641,10 +652,10 @@ var _ = Describe("Reclaim E2E Test", func() { By("Make sure all job running") - err = WaitQueueStatus(ctx, "Running", 1, q3) + err = CheckPodGroupStatistics(ctx, "Running", 1, q3) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue3 running") - err = WaitQueueStatus(ctx, "Running", 3, q4) + err = CheckPodGroupStatistics(ctx, "Running", 3, q4) Expect(err).NotTo(HaveOccurred(), "Error waiting for queue4 running") }) @@ -684,9 +695,8 @@ var _ = Describe("Reclaim E2E Test", func() { Expect(err).NotTo(HaveOccurred()) err = e2eutil.WaitQueueStatus(func() (bool, error) { - queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred()) - return queue.Status.Running == 1, nil + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, q1) + return pgStats.Running == 1, nil }) Expect(err).NotTo(HaveOccurred()) @@ -726,9 +736,8 @@ var _ = Describe("Reclaim E2E Test", func() { err = e2eutil.WaitJobStatePending(ctx, job3) Expect(err).NotTo(HaveOccurred()) err = e2eutil.WaitQueueStatus(func() (bool, error) { - queue, err := ctx.Vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred()) - return queue.Status.Pending == 1, nil + pgStats := e2eutil.GetPodGroupStatistics(ctx, ctx.Namespace, q1) + return pgStats.Pending == 1, nil }) Expect(err).NotTo(HaveOccurred()) }) diff --git a/test/e2e/util/podgroup.go b/test/e2e/util/podgroup.go index 1055307b55..b8219f04b8 100644 --- a/test/e2e/util/podgroup.go +++ b/test/e2e/util/podgroup.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + queucli "volcano.sh/volcano/pkg/cli/queue" ) // CreatePodGroup creates a PodGroup with the specified name in the namespace @@ -90,3 +91,15 @@ func PodGroupIsReady(ctx *TestContext, namespace string) (bool, error) { return false, fmt.Errorf("pod group phase is Pending") } + +func GetPodGroupStatistics(ctx *TestContext, namespace, queue string) *queucli.PodGroupStatistics { + pgList, err := ctx.Vcclient.SchedulingV1beta1().PodGroups(namespace).List(context.TODO(), metav1.ListOptions{}) + Expect(err).NotTo(HaveOccurred(), "List podgroups failed") + pgStats := &queucli.PodGroupStatistics{} + for _, pg := range pgList.Items { + if pg.Spec.Queue == queue { + pgStats.StatPodGroupCountsForQueue(&pg) + } + } + return pgStats +}