diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index 4ed6754f10..2bc66c0155 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -39,6 +39,7 @@ const ( defaultHealthzAddress = ":11251" defaultLockObjectNamespace = "volcano-system" defaultPodGroupWorkers = 5 + defaultQueueWorkers = 5 defaultGCWorkers = 1 defaultControllers = "*" ) @@ -75,6 +76,9 @@ type ServerOption struct { // WorkerThreadsForPG is the number of threads syncing podgroup operations // The larger the number, the faster the podgroup processing, but requires more CPU load. WorkerThreadsForPG uint32 + // WorkerThreadsForQueue is the number of threads syncing queue operations + // The larger the number, the faster the queue processing, but requires more CPU load. + WorkerThreadsForQueue uint32 // WorkerThreadsForGC is the number of threads for recycling jobs // The larger the number, the faster the job recycling, but requires more CPU load. WorkerThreadsForGC uint32 @@ -117,6 +121,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet, knownControllers []string) { fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inherit owner annotations for pods when create podgroup; it is enabled by default") fs.Uint32Var(&s.WorkerThreadsForPG, "worker-threads-for-podgroup", defaultPodGroupWorkers, "The number of threads syncing podgroup operations. The larger the number, the faster the podgroup processing, but requires more CPU load.") fs.Uint32Var(&s.WorkerThreadsForGC, "worker-threads-for-gc", defaultGCWorkers, "The number of threads for recycling jobs. The larger the number, the faster the job recycling, but requires more CPU load.") + fs.Uint32Var(&s.WorkerThreadsForQueue, "worker-threads-for-queue", defaultQueueWorkers, "The number of threads syncing queue operations. The larger the number, the faster the queue processing, but requires more CPU load.") fs.StringSliceVar(&s.Controllers, "controllers", []string{defaultControllers}, fmt.Sprintf("Specify controller gates. Use '*' for all controllers, all knownController: %s ,and we can use "+ "'-' to disable controllers, e.g. \"-job-controller,-queue-controller\" to disable job and queue controllers.", knownControllers)) } diff --git a/cmd/controller-manager/app/options/options_test.go b/cmd/controller-manager/app/options/options_test.go index a17e71f601..a3154497a7 100644 --- a/cmd/controller-manager/app/options/options_test.go +++ b/cmd/controller-manager/app/options/options_test.go @@ -99,10 +99,11 @@ func TestAddFlags(t *testing.T) { ResourceNamespace: defaultLockObjectNamespace, ResourceName: "vc-controller-manager", }, - LockObjectNamespace: defaultLockObjectNamespace, - WorkerThreadsForPG: 5, - WorkerThreadsForGC: 1, - Controllers: []string{"*"}, + LockObjectNamespace: defaultLockObjectNamespace, + WorkerThreadsForPG: 5, + WorkerThreadsForQueue: 5, + WorkerThreadsForGC: 1, + Controllers: []string{"*"}, } expectedFeatureGates := map[featuregate.Feature]bool{features.ResourceTopology: false} diff --git a/cmd/controller-manager/app/server.go b/cmd/controller-manager/app/server.go index 8b80986647..9f98042815 100644 --- a/cmd/controller-manager/app/server.go +++ b/cmd/controller-manager/app/server.go @@ -128,6 +128,7 @@ func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx c controllerOpt.VCSharedInformerFactory = informerfactory.NewSharedInformerFactory(controllerOpt.VolcanoClient, 0) controllerOpt.InheritOwnerAnnotations = opt.InheritOwnerAnnotations controllerOpt.WorkerThreadsForPG = opt.WorkerThreadsForPG + controllerOpt.WorkerThreadsForQueue = opt.WorkerThreadsForQueue controllerOpt.WorkerThreadsForGC = opt.WorkerThreadsForGC controllerOpt.Config = config diff --git a/pkg/controllers/framework/interface.go b/pkg/controllers/framework/interface.go index cf8c258ea8..6dc3bc7126 100644 --- a/pkg/controllers/framework/interface.go +++ b/pkg/controllers/framework/interface.go @@ -37,6 +37,7 @@ type ControllerOption struct { InheritOwnerAnnotations bool WorkerThreadsForPG uint32 + WorkerThreadsForQueue uint32 WorkerThreadsForGC uint32 // Config holds the common attributes that can be passed to a Kubernetes client diff --git a/pkg/controllers/queue/queue_controller.go b/pkg/controllers/queue/queue_controller.go index 8602a07d68..678424c804 100644 --- a/pkg/controllers/queue/queue_controller.go +++ b/pkg/controllers/queue/queue_controller.go @@ -90,6 +90,7 @@ type queuecontroller struct { enqueueQueue func(req *apis.Request) recorder record.EventRecorder + workers uint32 maxRequeueNum int } @@ -125,6 +126,7 @@ func (c *queuecontroller) Initialize(opt *framework.ControllerOption) error { if c.maxRequeueNum < 0 { c.maxRequeueNum = -1 } + c.workers = opt.WorkerThreadsForQueue queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.addQueue, @@ -187,8 +189,10 @@ func (c *queuecontroller) Run(stopCh <-chan struct{}) { } } - go wait.Until(c.worker, 0, stopCh) - go wait.Until(c.commandWorker, 0, stopCh) + for i := 0; i < int(c.workers); i++ { + go wait.Until(c.worker, 0, stopCh) + go wait.Until(c.commandWorker, 0, stopCh) + } <-stopCh }