Skip to content

Commit

Permalink
perf: imporve end to end queue processing latency
Browse files Browse the repository at this point in the history
  • Loading branch information
fengruotj authored and tanjie.master committed Oct 15, 2024
1 parent 7dfa8c6 commit 39e69bb
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 6 deletions.
5 changes: 5 additions & 0 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
defaultHealthzAddress = ":11251"
defaultLockObjectNamespace = "volcano-system"
defaultPodGroupWorkers = 5
defaultQueueWorkers = 5
defaultGCWorkers = 1
defaultControllers = "*"
)
Expand Down Expand Up @@ -75,6 +76,9 @@ type ServerOption struct {
// WorkerThreadsForPG is the number of threads syncing podgroup operations
// The larger the number, the faster the podgroup processing, but requires more CPU load.
WorkerThreadsForPG uint32
// WorkerThreadsForQueue is the number of threads syncing queue operations
// The larger the number, the faster the queue processing, but requires more CPU load.
WorkerThreadsForQueue uint32
// WorkerThreadsForGC is the number of threads for recycling jobs
// The larger the number, the faster the job recycling, but requires more CPU load.
WorkerThreadsForGC uint32
Expand Down Expand Up @@ -117,6 +121,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet, knownControllers []string) {
fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inherit owner annotations for pods when create podgroup; it is enabled by default")
fs.Uint32Var(&s.WorkerThreadsForPG, "worker-threads-for-podgroup", defaultPodGroupWorkers, "The number of threads syncing podgroup operations. The larger the number, the faster the podgroup processing, but requires more CPU load.")
fs.Uint32Var(&s.WorkerThreadsForGC, "worker-threads-for-gc", defaultGCWorkers, "The number of threads for recycling jobs. The larger the number, the faster the job recycling, but requires more CPU load.")
fs.Uint32Var(&s.WorkerThreadsForQueue, "worker-threads-for-queue", defaultQueueWorkers, "The number of threads syncing queue operations. The larger the number, the faster the queue processing, but requires more CPU load.")
fs.StringSliceVar(&s.Controllers, "controllers", []string{defaultControllers}, fmt.Sprintf("Specify controller gates. Use '*' for all controllers, all knownController: %s ,and we can use "+
"'-' to disable controllers, e.g. \"-job-controller,-queue-controller\" to disable job and queue controllers.", knownControllers))
}
Expand Down
9 changes: 5 additions & 4 deletions cmd/controller-manager/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,11 @@ func TestAddFlags(t *testing.T) {
ResourceNamespace: defaultLockObjectNamespace,
ResourceName: "vc-controller-manager",
},
LockObjectNamespace: defaultLockObjectNamespace,
WorkerThreadsForPG: 5,
WorkerThreadsForGC: 1,
Controllers: []string{"*"},
LockObjectNamespace: defaultLockObjectNamespace,
WorkerThreadsForPG: 5,
WorkerThreadsForQueue: 5,
WorkerThreadsForGC: 1,
Controllers: []string{"*"},
}
expectedFeatureGates := map[featuregate.Feature]bool{features.ResourceTopology: false}

Expand Down
1 change: 1 addition & 0 deletions cmd/controller-manager/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx c
controllerOpt.VCSharedInformerFactory = informerfactory.NewSharedInformerFactory(controllerOpt.VolcanoClient, 0)
controllerOpt.InheritOwnerAnnotations = opt.InheritOwnerAnnotations
controllerOpt.WorkerThreadsForPG = opt.WorkerThreadsForPG
controllerOpt.WorkerThreadsForQueue = opt.WorkerThreadsForQueue
controllerOpt.WorkerThreadsForGC = opt.WorkerThreadsForGC
controllerOpt.Config = config

Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type ControllerOption struct {

InheritOwnerAnnotations bool
WorkerThreadsForPG uint32
WorkerThreadsForQueue uint32
WorkerThreadsForGC uint32

// Config holds the common attributes that can be passed to a Kubernetes client
Expand Down
8 changes: 6 additions & 2 deletions pkg/controllers/queue/queue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type queuecontroller struct {
enqueueQueue func(req *apis.Request)

recorder record.EventRecorder
workers uint32
maxRequeueNum int
}

Expand Down Expand Up @@ -125,6 +126,7 @@ func (c *queuecontroller) Initialize(opt *framework.ControllerOption) error {
if c.maxRequeueNum < 0 {
c.maxRequeueNum = -1
}
c.workers = opt.WorkerThreadsForQueue

queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addQueue,
Expand Down Expand Up @@ -187,8 +189,10 @@ func (c *queuecontroller) Run(stopCh <-chan struct{}) {
}
}

go wait.Until(c.worker, 0, stopCh)
go wait.Until(c.commandWorker, 0, stopCh)
for i := 0; i < int(c.workers); i++ {
go wait.Until(c.worker, 0, stopCh)
go wait.Until(c.commandWorker, 0, stopCh)
}

<-stopCh
}
Expand Down

0 comments on commit 39e69bb

Please sign in to comment.