Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: improve end to end queue processing latency #3772

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
defaultHealthzAddress = ":11251"
defaultLockObjectNamespace = "volcano-system"
defaultPodGroupWorkers = 5
defaultQueueWorkers = 5
defaultGCWorkers = 1
defaultControllers = "*"
)
Expand Down Expand Up @@ -75,6 +76,9 @@ type ServerOption struct {
// WorkerThreadsForPG is the number of threads syncing podgroup operations
// The larger the number, the faster the podgroup processing, but requires more CPU load.
WorkerThreadsForPG uint32
// WorkerThreadsForQueue is the number of threads syncing queue operations
// The larger the number, the faster the queue processing, but requires more CPU load.
WorkerThreadsForQueue uint32
// WorkerThreadsForGC is the number of threads for recycling jobs
// The larger the number, the faster the job recycling, but requires more CPU load.
WorkerThreadsForGC uint32
Expand Down Expand Up @@ -117,6 +121,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet, knownControllers []string) {
fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inherit owner annotations for pods when create podgroup; it is enabled by default")
fs.Uint32Var(&s.WorkerThreadsForPG, "worker-threads-for-podgroup", defaultPodGroupWorkers, "The number of threads syncing podgroup operations. The larger the number, the faster the podgroup processing, but requires more CPU load.")
fs.Uint32Var(&s.WorkerThreadsForGC, "worker-threads-for-gc", defaultGCWorkers, "The number of threads for recycling jobs. The larger the number, the faster the job recycling, but requires more CPU load.")
fs.Uint32Var(&s.WorkerThreadsForQueue, "worker-threads-for-queue", defaultQueueWorkers, "The number of threads syncing queue operations. The larger the number, the faster the queue processing, but requires more CPU load.")
fs.StringSliceVar(&s.Controllers, "controllers", []string{defaultControllers}, fmt.Sprintf("Specify controller gates. Use '*' for all controllers, all knownController: %s ,and we can use "+
"'-' to disable controllers, e.g. \"-job-controller,-queue-controller\" to disable job and queue controllers.", knownControllers))
}
Expand Down
9 changes: 5 additions & 4 deletions cmd/controller-manager/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,11 @@ func TestAddFlags(t *testing.T) {
ResourceNamespace: defaultLockObjectNamespace,
ResourceName: "vc-controller-manager",
},
LockObjectNamespace: defaultLockObjectNamespace,
WorkerThreadsForPG: 5,
WorkerThreadsForGC: 1,
Controllers: []string{"*"},
LockObjectNamespace: defaultLockObjectNamespace,
WorkerThreadsForPG: 5,
WorkerThreadsForQueue: 5,
WorkerThreadsForGC: 1,
Controllers: []string{"*"},
}
expectedFeatureGates := map[featuregate.Feature]bool{features.ResourceTopology: false}

Expand Down
1 change: 1 addition & 0 deletions cmd/controller-manager/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx c
controllerOpt.VCSharedInformerFactory = informerfactory.NewSharedInformerFactory(controllerOpt.VolcanoClient, 0)
controllerOpt.InheritOwnerAnnotations = opt.InheritOwnerAnnotations
controllerOpt.WorkerThreadsForPG = opt.WorkerThreadsForPG
controllerOpt.WorkerThreadsForQueue = opt.WorkerThreadsForQueue
controllerOpt.WorkerThreadsForGC = opt.WorkerThreadsForGC
controllerOpt.Config = config

Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type ControllerOption struct {

InheritOwnerAnnotations bool
WorkerThreadsForPG uint32
WorkerThreadsForQueue uint32
WorkerThreadsForGC uint32

// Config holds the common attributes that can be passed to a Kubernetes client
Expand Down
8 changes: 6 additions & 2 deletions pkg/controllers/queue/queue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type queuecontroller struct {
enqueueQueue func(req *apis.Request)

recorder record.EventRecorder
workers uint32
maxRequeueNum int
}

Expand Down Expand Up @@ -125,6 +126,7 @@ func (c *queuecontroller) Initialize(opt *framework.ControllerOption) error {
if c.maxRequeueNum < 0 {
c.maxRequeueNum = -1
}
c.workers = opt.WorkerThreadsForQueue

queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addQueue,
Expand Down Expand Up @@ -187,8 +189,10 @@ func (c *queuecontroller) Run(stopCh <-chan struct{}) {
}
}

go wait.Until(c.worker, 0, stopCh)
go wait.Until(c.commandWorker, 0, stopCh)
for i := 0; i < int(c.workers); i++ {
go wait.Until(c.worker, 0, stopCh)
go wait.Until(c.commandWorker, 0, stopCh)
}

<-stopCh
}
Expand Down
Loading