Skip to content

Commit

Permalink
feature: Add podgroups statistics
Browse files Browse the repository at this point in the history
Signed-off-by: jessestutler <jesseincomparable@hotmail.com>
  • Loading branch information
JesseStutler committed Sep 26, 2024
1 parent 0843c0d commit a91a112
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 20 deletions.
5 changes: 5 additions & 0 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
defaultMaxRequeueNum = 15
defaultSchedulerName = "volcano"
defaultHealthzAddress = ":11251"
defaultListenAddress = ":8080"
defaultLockObjectNamespace = "volcano-system"
defaultPodGroupWorkers = 5
defaultGCWorkers = 1
Expand Down Expand Up @@ -70,6 +71,8 @@ type ServerOption struct {
// defaulting to 0.0.0.0:11251
HealthzBindAddress string
EnableHealthz bool
EnableMetrics bool
ListenAddress string
// To determine whether inherit owner's annotations for pods when create podgroup
InheritOwnerAnnotations bool
// WorkerThreadsForPG is the number of threads syncing podgroup operations
Expand Down Expand Up @@ -114,6 +117,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet, knownControllers []string) {
fs.IntVar(&s.MaxRequeueNum, "max-requeue-num", defaultMaxRequeueNum, "The number of times a job, queue or command will be requeued before it is dropped out of the queue")
fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.")
fs.BoolVar(&s.EnableHealthz, "enable-healthz", false, "Enable the health check; it is false by default")
fs.BoolVar(&s.EnableMetrics, "enable-metrics", false, "Enable the metrics function; it is false by default")
fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.")
fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inherit owner annotations for pods when create podgroup; it is enabled by default")
fs.Uint32Var(&s.WorkerThreadsForPG, "worker-threads-for-podgroup", defaultPodGroupWorkers, "The number of threads syncing podgroup operations. The larger the number, the faster the podgroup processing, but requires more CPU load.")
fs.Uint32Var(&s.WorkerThreadsForGC, "worker-threads-for-gc", defaultGCWorkers, "The number of threads for recycling jobs. The larger the number, the faster the job recycling, but requires more CPU load.")
Expand Down
20 changes: 19 additions & 1 deletion cmd/controller-manager/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ package app
import (
"context"
"fmt"
"net/http"
"os"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/informers"
Expand All @@ -31,6 +35,7 @@ import (
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"

"volcano.sh/apis/pkg/apis/helpers"
Expand All @@ -48,13 +53,19 @@ func Run(opt *options.ServerOption) error {
if err != nil {
return err
}

if opt.EnableHealthz {
if err := helpers.StartHealthz(opt.HealthzBindAddress, "volcano-controller", opt.CaCertData, opt.CertData, opt.KeyData); err != nil {
return err
}
}

if opt.EnableMetrics {
go func() {
http.Handle("/metrics", promHandler())
klog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil))
}()
}

run := startControllers(config, opt)

ctx := signals.SetupSignalContext()
Expand Down Expand Up @@ -177,3 +188,10 @@ func isControllerEnabled(name string, controllers []string) bool {
// if we get here, there was no explicit inclusion or exclusion
return hasStar
}

// promHandler builds the handler served on /metrics. It exposes the union of
// the Prometheus default gatherer and the Kubernetes legacy registry gatherer,
// and instruments the handler itself against the default registerer.
func promHandler() http.Handler {
	registerer := prometheus.DefaultRegisterer

	// Drop the process and Go runtime collectors from the default registry:
	// per the original author's note, legacyregistry.DefaultGatherer registers
	// them as well, so keeping both would report the same metrics twice.
	registerer.Unregister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
	registerer.Unregister(collectors.NewGoCollector())

	gatherers := prometheus.Gatherers{
		prometheus.DefaultGatherer,
		legacyregistry.DefaultGatherer,
	}
	metricsHandler := promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{})
	return promhttp.InstrumentMetricHandler(registerer, metricsHandler)
}
28 changes: 27 additions & 1 deletion installer/helm/chart/volcano/templates/controllers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ rules:
verbs: ["get", "create", "delete", "update"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["podgroups", "queues", "queues/status"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
verbs: ["get", "list", "watch", "create", "delete", "update", "patch"]
- apiGroups: ["flow.volcano.sh"]
resources: ["jobflows", "jobtemplates"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
Expand Down Expand Up @@ -155,6 +155,7 @@ spec:
args:
- --logtostderr
- --enable-healthz=true
- --enable-metrics=true
- --leader-elect={{ .Values.custom.leader_elect_enable }}
{{- if .Values.custom.leader_elect_enable }}
- --leader-elect-resource-namespace={{ .Release.Namespace }}
Expand All @@ -167,3 +168,28 @@ spec:
{{- toYaml .Values.custom.controller_default_csc | nindent 14 }}
{{- end }}
{{- end }}
---
# Service placed in front of the pods labelled app: volcano-controller, exposing
# a single TCP port named "metrics".
# Port 8080 matches the controller-manager's default --listen-address (":8080").
# NOTE(review): if that flag is overridden, port/targetPort and the
# prometheus.io/port annotation below presumably need to be changed as well.
apiVersion: v1
kind: Service
metadata:
annotations:
# Scrape hints: metrics are advertised at /metrics on port 8080.
prometheus.io/path: /metrics
prometheus.io/port: "8080"
prometheus.io/scrape: "true"
name: {{ .Release.Name }}-controllers-service
namespace: {{ .Release.Namespace }}
labels:
app: volcano-controller
{{- if .Values.custom.common_labels }}
{{- toYaml .Values.custom.common_labels | nindent 4 }}
{{- end }}
spec:
ports:
- port: 8080
protocol: TCP
targetPort: 8080
name: "metrics"
selector:
app: volcano-controller
type: ClusterIP
{{- end }}
31 changes: 24 additions & 7 deletions pkg/cli/queue/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,38 @@ func GetQueue(ctx context.Context) error {
return err
}

PrintQueue(queue, os.Stdout)
// Custom resources currently cannot be filtered by kube-apiserver using a fieldSelector:
// the feature gate called CustomResourceFieldSelectors is turned off by default in
// kube-apiserver and is still alpha in v1.30. Therefore we can only get all the podgroups
// from kube-apiserver and then filter them here.
pgList, err := queueClient.SchedulingV1beta1().PodGroups("").List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list podgroup for queue %s with err: %v", getQueueFlags.Name, err)
}

pgStats := &podGroupStatistics{}
for _, pg := range pgList.Items {
if pg.Spec.Queue == getQueueFlags.Name {
pgStats.statPodGroupCountsForQueue(&pg)
}
}

PrintQueue(queue, pgStats, os.Stdout)

return nil
}

// PrintQueue prints queue information.
func PrintQueue(queue *v1beta1.Queue, writer io.Writer) {
_, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s\n",
Name, Weight, State, Inqueue, Pending, Running, Unknown)
func PrintQueue(queue *v1beta1.Queue, pgStats *podGroupStatistics, writer io.Writer) {
_, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s%-8s\n",
Name, Weight, State, Inqueue, Pending, Running, Unknown, Completed)
if err != nil {
fmt.Printf("Failed to print queue command result: %s.\n", err)
}
_, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d\n",
queue.Name, queue.Spec.Weight, queue.Status.State, queue.Status.Inqueue,
queue.Status.Pending, queue.Status.Running, queue.Status.Unknown)

_, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d%-8d\n",
queue.Name, queue.Spec.Weight, queue.Status.State, pgStats.inqueue,
pgStats.pending, pgStats.running, pgStats.unknown, pgStats.completed)
if err != nil {
fmt.Printf("Failed to print queue command result: %s.\n", err)
}
Expand Down
37 changes: 30 additions & 7 deletions pkg/cli/queue/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ const (
// Inqueue status of queue
Inqueue string = "Inqueue"

// Completed status of the queue
Completed string = "Completed"

// State is state of queue
State string = "State"
)
Expand Down Expand Up @@ -81,22 +84,42 @@ func ListQueue(ctx context.Context) error {
fmt.Printf("No resources found\n")
return nil
}
PrintQueues(queues, os.Stdout)

// Custom resources currently cannot be filtered by kube-apiserver using a fieldSelector:
// the feature gate called CustomResourceFieldSelectors is turned off by default in
// kube-apiserver and is still alpha in v1.30. Therefore we can only get all the podgroups
// from kube-apiserver and then filter them here.
pgList, err := jobClient.SchedulingV1beta1().PodGroups("").List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list podgroups with err: %v", err)
}

queueStats := make(map[string]*podGroupStatistics, len(queues.Items))
for _, queue := range queues.Items {
queueStats[queue.Name] = &podGroupStatistics{}
}

for _, pg := range pgList.Items {
queueStats[pg.Spec.Queue].statPodGroupCountsForQueue(&pg)
}

PrintQueues(queues, queueStats, os.Stdout)

return nil
}

// PrintQueues prints queue information.
func PrintQueues(queues *v1beta1.QueueList, writer io.Writer) {
_, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s\n",
Name, Weight, State, Inqueue, Pending, Running, Unknown)
func PrintQueues(queues *v1beta1.QueueList, queueStats map[string]*podGroupStatistics, writer io.Writer) {
_, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s%-8s\n",
Name, Weight, State, Inqueue, Pending, Running, Unknown, Completed)
if err != nil {
fmt.Printf("Failed to print queue command result: %s.\n", err)
}

for _, queue := range queues.Items {
_, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d\n",
queue.Name, queue.Spec.Weight, queue.Status.State, queue.Status.Inqueue,
queue.Status.Pending, queue.Status.Running, queue.Status.Unknown)
_, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d%-8d\n",
queue.Name, queue.Spec.Weight, queue.Status.State, queueStats[queue.Name].inqueue, queueStats[queue.Name].pending,
queueStats[queue.Name].running, queueStats[queue.Name].unknown, queueStats[queue.Name].completed)
if err != nil {
fmt.Printf("Failed to print queue command result: %s.\n", err)
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/cli/queue/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1"
"volcano.sh/apis/pkg/apis/helpers"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/apis/pkg/client/clientset/versioned"
)

Expand Down Expand Up @@ -62,3 +63,26 @@ func createQueueCommand(ctx context.Context, config *rest.Config, action busv1al

return nil
}

// podGroupStatistics accumulates how many podgroups were seen in each phase.
// The zero value is an empty tally ready for use.
type podGroupStatistics struct {
	// One counter per podgroup phase, named after the phase it counts.
	inqueue, pending, running, unknown, completed int
}

// statPodGroupCountsForQueue increments the counter that matches pg's current
// phase. Podgroups whose phase is not one of Inqueue, Pending, Running, Unknown
// or Completed are left uncounted.
//
// A nil receiver (or nil pg) is a no-op. Callers that look the statistics up in
// a map keyed by queue name get a nil *podGroupStatistics for a podgroup whose
// queue has no entry; without this guard that call would panic with a nil
// pointer dereference instead of simply skipping the podgroup.
func (pgStats *podGroupStatistics) statPodGroupCountsForQueue(pg *v1beta1.PodGroup) {
	if pgStats == nil || pg == nil {
		return
	}

	switch pg.Status.Phase {
	case v1beta1.PodGroupInqueue:
		pgStats.inqueue++
	case v1beta1.PodGroupPending:
		pgStats.pending++
	case v1beta1.PodGroupRunning:
		pgStats.running++
	case v1beta1.PodGroupUnknown:
		pgStats.unknown++
	case v1beta1.PodGroupCompleted:
		pgStats.completed++
	}
}
73 changes: 73 additions & 0 deletions pkg/controllers/metrics/podgroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
pgPendingPhaseNum = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: VolcanoNamespace,
Name: "podgroup_pending_phase_num",
Help: "Number of podgroup at pending phase",
}, []string{"queue_name"},
)

pgRunningPhaseNum = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: VolcanoNamespace,
Name: "podgroup_running_phase_num",
Help: "Number of podgroup at running phase",
}, []string{"queue_name"},
)

pgUnknownPhaseNum = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: VolcanoNamespace,
Name: "podgroup_unknown_phase_num",
Help: "Number of podgroup at unknown phase",
}, []string{"queue_name"},
)

pgInqueuePhaseNum = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: VolcanoNamespace,
Name: "podgroup_inqueue_phase_num",
Help: "Number of podgroup at inqueue phase",
}, []string{"queue_name"},
)

pgCompletedPhaseNum = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: VolcanoNamespace,
Name: "podgroup_completed_phase_num",
Help: "Number of podgroup at completed phase",
}, []string{"queue_name"},
)
)

// UpdatePgPendingPhaseNum records the number of podgroups in the pending phase
// for the given queue by setting the queue's pending-phase gauge to num.
func UpdatePgPendingPhaseNum(queueName string, num float64) {
	gauge := pgPendingPhaseNum.WithLabelValues(queueName)
	gauge.Set(num)
}

// UpdatePgRunningPhaseNum records the number of podgroups in the running phase
// for the given queue by setting the queue's running-phase gauge to num.
func UpdatePgRunningPhaseNum(queueName string, num float64) {
	gauge := pgRunningPhaseNum.WithLabelValues(queueName)
	gauge.Set(num)
}

// UpdatePgUnknownPhaseNum records the number of podgroups in the unknown phase
// for the given queue by setting the queue's unknown-phase gauge to num.
func UpdatePgUnknownPhaseNum(queueName string, num float64) {
	gauge := pgUnknownPhaseNum.WithLabelValues(queueName)
	gauge.Set(num)
}

// UpdatePgInqueuePhaseNum records the number of podgroups in the inqueue phase
// for the given queue by setting the queue's inqueue-phase gauge to num.
func UpdatePgInqueuePhaseNum(queueName string, num float64) {
	gauge := pgInqueuePhaseNum.WithLabelValues(queueName)
	gauge.Set(num)
}

// UpdatePgCompletedPhaseNum records the number of podgroups in the completed
// phase for the given queue by setting the queue's completed-phase gauge to num.
func UpdatePgCompletedPhaseNum(queueName string, num float64) {
	gauge := pgCompletedPhaseNum.WithLabelValues(queueName)
	gauge.Set(num)
}
6 changes: 6 additions & 0 deletions pkg/controllers/metrics/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package metrics

// VolcanoNamespace is the Prometheus name component used by Volcano's
// controller metrics.
const VolcanoNamespace = "volcano"
19 changes: 15 additions & 4 deletions pkg/controllers/queue/queue_controller_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (

"volcano.sh/apis/pkg/apis/bus/v1alpha1"
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
v1beta1apply "volcano.sh/apis/pkg/client/applyconfiguration/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/metrics"
"volcano.sh/volcano/pkg/controllers/queue/state"
)

Expand Down Expand Up @@ -57,9 +59,18 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF
queueStatus.Unknown++
case schedulingv1beta1.PodGroupInqueue:
queueStatus.Inqueue++
case schedulingv1beta1.PodGroupCompleted:
queueStatus.Completed++
}
}

// Update the metrics
metrics.UpdatePgPendingPhaseNum(queue.Name, float64(queueStatus.Pending))
metrics.UpdatePgRunningPhaseNum(queue.Name, float64(queueStatus.Running))
metrics.UpdatePgUnknownPhaseNum(queue.Name, float64(queueStatus.Unknown))
metrics.UpdatePgInqueuePhaseNum(queue.Name, float64(queueStatus.Inqueue))
metrics.UpdatePgCompletedPhaseNum(queue.Name, float64(queueStatus.Completed))

if updateStateFn != nil {
updateStateFn(&queueStatus, podGroups)
} else {
Expand All @@ -79,10 +90,10 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF
return nil
}

newQueue := queue.DeepCopy()
newQueue.Status = queueStatus
if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {
klog.Errorf("Failed to update status of Queue %s: %v.", newQueue.Name, err)
queueStatusApply := v1beta1apply.QueueStatus().WithState(queueStatus.State).WithAllocated(queueStatus.Allocated)
queueApply := v1beta1apply.Queue(queue.Name).WithStatus(queueStatusApply)
if _, err := c.vcClient.SchedulingV1beta1().Queues().ApplyStatus(context.TODO(), queueApply, metav1.ApplyOptions{FieldManager: controllerName}); err != nil {
klog.Errorf("Failed to apply status of Queue %s: %v.", queue.Name, err)
return err
}

Expand Down
Loading

0 comments on commit a91a112

Please sign in to comment.