replace the deprecated RateLimitingInterface with TypedRateLimitingInterface in the pkg/controllers package

Signed-off-by: xovoxy <xilovele@gmail.com>
xovoxy committed Oct 24, 2024
1 parent 1ab6088 commit 9964bd2
Showing 10 changed files with 82 additions and 125 deletions.
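Every file below follows the same recipe: the deprecated workqueue.RateLimitingInterface becomes the generic workqueue.TypedRateLimitingInterface[T], the queue is built with the Typed* constructors, and the interface{} type assertion after Get() goes away because items come back already typed. A minimal, self-contained sketch of the target pattern (the string key and the process helper are illustrative only, not code from this repository):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// process stands in for whatever work a controller does per key.
func process(key string) error {
	fmt.Println("processing", key)
	return nil
}

func main() {
	// The type parameter is inferred from the rate limiter: this is a
	// TypedRateLimitingInterface[string], so Get() returns a string directly.
	queue := workqueue.NewTypedRateLimitingQueue(
		workqueue.DefaultTypedControllerRateLimiter[string](),
	)
	defer queue.ShutDown()

	queue.Add("default/example-job")

	key, shutdown := queue.Get()
	if shutdown {
		return
	}
	defer queue.Done(key)

	// No obj.(string) assertion needed, unlike with the untyped queue being removed.
	if err := process(key); err != nil {
		queue.AddRateLimited(key)
	}
}
```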
22 changes: 8 additions & 14 deletions pkg/controllers/cache/cache.go
@@ -37,7 +37,7 @@ type jobCache struct {
sync.Mutex

jobs map[string]*apis.JobInfo
deletedJobs workqueue.RateLimitingInterface
deletedJobs workqueue.TypedRateLimitingInterface[*apis.JobInfo]
}

func keyFn(ns, name string) string {
@@ -75,15 +75,15 @@ func jobKeyOfPod(pod *v1.Pod) (string, error) {

// New gets the job Cache.
func New() Cache {
queue := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 180*time.Second),
queue := workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[*apis.JobInfo](5*time.Millisecond, 180*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and it's only the overall factor (not per item)
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
&workqueue.TypedBucketRateLimiter[*apis.JobInfo]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)

return &jobCache{
jobs: map[string]*apis.JobInfo{},
deletedJobs: workqueue.NewRateLimitingQueue(queue),
deletedJobs: workqueue.NewTypedRateLimitingQueue(queue),
}
}

@@ -355,23 +355,17 @@ func (jc *jobCache) worker() {
}

func (jc *jobCache) processCleanupJob() bool {
obj, shutdown := jc.deletedJobs.Get()
job, shutdown := jc.deletedJobs.Get()
if shutdown {
return false
}
defer jc.deletedJobs.Done(obj)

job, ok := obj.(*apis.JobInfo)
if !ok {
klog.Errorf("failed to convert %v to *apis.JobInfo", obj)
return true
}
defer jc.deletedJobs.Done(job)

jc.Mutex.Lock()
defer jc.Mutex.Unlock()

if jobTerminated(job) {
jc.deletedJobs.Forget(obj)
jc.deletedJobs.Forget(job)
key := keyFn(job.Namespace, job.Name)
delete(jc.jobs, key)
klog.V(3).Infof("Job <%s> was deleted.", key)
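The deleted-jobs queue keeps the same rate-limiting composition it had before, just via the typed constructors: NewTypedMaxOfRateLimiter returns the larger of its children's delays, so retries of one item back off exponentially from 5ms toward 180s while the shared token bucket caps overall requeue throughput at 10 qps with a burst of 100. A standalone sketch of that behaviour, using a plain string item instead of *apis.JobInfo so it compiles on its own (the key name is made up):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same shape as the cache's limiter: per-item exponential backoff combined
	// with an overall 10 qps / burst-100 bucket; the effective delay is the
	// maximum of the two.
	limiter := workqueue.NewTypedMaxOfRateLimiter[string](
		workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 180*time.Second),
		&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)

	// Each failure of the same key doubles its backoff: ~5ms, 10ms, 20ms, ...
	for i := 0; i < 5; i++ {
		fmt.Printf("requeue %d after %v\n", i+1, limiter.When("volcano-system/deleted-job"))
	}
	// Forget resets the exponential counter once the item finally succeeds.
	limiter.Forget("volcano-system/deleted-job")
}
```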
8 changes: 4 additions & 4 deletions pkg/controllers/garbagecollector/garbagecollector.go
@@ -61,7 +61,7 @@ type gccontroller struct {
jobSynced func() bool

// queues that need to be updated.
queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[string]

workers uint32
}
@@ -81,7 +81,7 @@ func (gc *gccontroller) Initialize(opt *framework.ControllerOption) error {
gc.jobInformer = jobInformer
gc.jobLister = jobInformer.Lister()
gc.jobSynced = jobInformer.Informer().HasSynced
gc.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
gc.queue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
gc.workers = opt.WorkerThreadsForGC

jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -165,13 +165,13 @@ func (gc *gccontroller) processNextWorkItem() bool {
}
defer gc.queue.Done(key)

err := gc.processJob(key.(string))
err := gc.processJob(key)
gc.handleErr(err, key)

return true
}

func (gc *gccontroller) handleErr(err error, key interface{}) {
func (gc *gccontroller) handleErr(err error, key string) {
if err == nil {
gc.queue.Forget(key)
return
18 changes: 8 additions & 10 deletions pkg/controllers/job/job_controller.go
@@ -106,13 +106,13 @@ type jobcontroller struct {
queueSynced func() bool

// queue that need to sync up
queueList []workqueue.RateLimitingInterface
commandQueue workqueue.RateLimitingInterface
queueList []workqueue.TypedRateLimitingInterface[apis.Request]
commandQueue workqueue.TypedRateLimitingInterface[*busv1alpha1.Command]
cache jobcache.Cache
// Job Event recorder
recorder record.EventRecorder

errTasks workqueue.RateLimitingInterface
errTasks workqueue.TypedRateLimitingInterface[*v1.Pod]
workers uint32
maxRequeueNum int
}
@@ -135,8 +135,8 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {
recorder := eventBroadcaster.NewRecorder(vcscheme.Scheme, v1.EventSource{Component: "vc-controller-manager"})

cc.informerFactory = sharedInformers
cc.queueList = make([]workqueue.RateLimitingInterface, workers)
cc.commandQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
cc.queueList = make([]workqueue.TypedRateLimitingInterface[apis.Request], workers)
cc.commandQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[*busv1alpha1.Command]())
cc.cache = jobcache.New()
cc.errTasks = newRateLimitingQueue()
cc.recorder = recorder
@@ -148,7 +148,7 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {

var i uint32
for i = 0; i < workers; i++ {
cc.queueList[i] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
cc.queueList[i] = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[apis.Request]())
}

factory := opt.VCSharedInformerFactory
@@ -292,7 +292,7 @@ func (cc *jobcontroller) belongsToThisRoutine(key string, count uint32) bool {
return val%cc.workers == count
}

func (cc *jobcontroller) getWorkerQueue(key string) workqueue.RateLimitingInterface {
func (cc *jobcontroller) getWorkerQueue(key string) workqueue.TypedRateLimitingInterface[apis.Request] {
var hashVal hash.Hash32
var val uint32

@@ -308,13 +308,11 @@ func (cc *jobcontroller) getWorkerQueue(key string) workqueue.RateLimitingInterf

func (cc *jobcontroller) processNextReq(count uint32) bool {
queue := cc.queueList[count]
obj, shutdown := queue.Get()
req, shutdown := queue.Get()
if shutdown {
klog.Errorf("Fail to pop item from queue")
return false
}

req := obj.(apis.Request)
defer queue.Done(req)

key := jobcache.JobKeyByReq(&req)
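getWorkerQueue still shards jobs across queueList by hashing the job key modulo the worker count; only the element type of queueList changes. A rough sketch of that sharding idea (assuming an FNV-style 32-bit hash, which is what hash.Hash32 suggests here; the helper below is illustrative, not the controller's actual code):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// pickWorker hashes a "namespace/name" key and maps it onto one of the
// worker queues, so every event for the same job lands on the same worker.
func pickWorker(key string, workers uint32) uint32 {
	h := fnv.New32()
	// Hash.Write never returns an error, so it is safe to ignore here.
	h.Write([]byte(key))
	return h.Sum32() % workers
}

func main() {
	for _, key := range []string{"default/tensorflow-job", "default/mpi-job", "team-a/pytorch-job"} {
		fmt.Printf("%s -> worker queue %d\n", key, pickWorker(key, 5))
	}
}
```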
3 changes: 1 addition & 2 deletions pkg/controllers/job/job_controller_handler.go
@@ -389,11 +389,10 @@ func (cc *jobcontroller) handleCommands() {
}

func (cc *jobcontroller) processNextCommand() bool {
obj, shutdown := cc.commandQueue.Get()
cmd, shutdown := cc.commandQueue.Get()
if shutdown {
return false
}
cmd := obj.(*bus.Command)
defer cc.commandQueue.Done(cmd)

if err := cc.vcClient.BusV1alpha1().Commands(cmd.Namespace).Delete(context.TODO(), cmd.Name, metav1.DeleteOptions{}); err != nil {
23 changes: 8 additions & 15 deletions pkg/controllers/job/job_controller_resync.go
@@ -29,33 +29,26 @@ import (
"k8s.io/klog/v2"
)

func newRateLimitingQueue() workqueue.RateLimitingInterface {
return workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 180*time.Second),
func newRateLimitingQueue() workqueue.TypedRateLimitingInterface[*v1.Pod] {
return workqueue.NewTypedRateLimitingQueue(workqueue.NewTypedMaxOfRateLimiter[*v1.Pod](
workqueue.NewTypedItemExponentialFailureRateLimiter[*v1.Pod](5*time.Millisecond, 180*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and it's only the overall factor (not per item)
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
&workqueue.TypedBucketRateLimiter[*v1.Pod]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
))
}

func (cc *jobcontroller) processResyncTask() {
obj, shutdown := cc.errTasks.Get()
task, shutdown := cc.errTasks.Get()
if shutdown {
return
}

// one task only resync 10 times
if cc.errTasks.NumRequeues(obj) > 10 {
cc.errTasks.Forget(obj)
return
}

defer cc.errTasks.Done(obj)

task, ok := obj.(*v1.Pod)
if !ok {
klog.Errorf("failed to convert %v to *v1.Pod", obj)
if cc.errTasks.NumRequeues(task) > 10 {
cc.errTasks.Forget(task)
return
}
defer cc.errTasks.Done(task)

if err := cc.syncTask(task); err != nil {
klog.Errorf("Failed to sync pod <%v/%v>, retry it, err %v", task.Namespace, task.Name, err)
31 changes: 12 additions & 19 deletions pkg/controllers/jobflow/jobflow_controller.go
@@ -74,7 +74,7 @@ type jobflowcontroller struct {
// JobFlow Event recorder
recorder record.EventRecorder

queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[apis.FlowRequest]
enqueueJobFlow func(req apis.FlowRequest)

syncHandler func(req *apis.FlowRequest) error
@@ -122,7 +122,7 @@ func (jf *jobflowcontroller) Initialize(opt *framework.ControllerOption) error {
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: jf.kubeClient.CoreV1().Events("")})

jf.recorder = eventBroadcaster.NewRecorder(versionedscheme.Scheme, v1.EventSource{Component: "vc-controller-manager"})
jf.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
jf.queue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[apis.FlowRequest]())

jf.enqueueJobFlow = jf.enqueue

@@ -156,7 +156,7 @@ func (jf *jobflowcontroller) worker() {
}

func (jf *jobflowcontroller) processNextWorkItem() bool {
obj, shutdown := jf.queue.Get()
req, shutdown := jf.queue.Get()
if shutdown {
// Stop working
return false
@@ -168,16 +168,10 @@
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer jf.queue.Done(obj)

req, ok := obj.(apis.FlowRequest)
if !ok {
klog.Errorf("%v is not a valid queue request struct.", obj)
return true
}
defer jf.queue.Done(req)

err := jf.syncHandler(&req)
jf.handleJobFlowErr(err, obj)
jf.handleJobFlowErr(err, req)

return true
}
@@ -212,23 +206,22 @@ func (jf *jobflowcontroller) handleJobFlow(req *apis.FlowRequest) error {
return nil
}

func (jf *jobflowcontroller) handleJobFlowErr(err error, obj interface{}) {
func (jf *jobflowcontroller) handleJobFlowErr(err error, req apis.FlowRequest) {
if err == nil {
jf.queue.Forget(obj)
jf.queue.Forget(req)
return
}

if jf.maxRequeueNum == -1 || jf.queue.NumRequeues(obj) < jf.maxRequeueNum {
klog.V(4).Infof("Error syncing jobFlow request %v for %v.", obj, err)
jf.queue.AddRateLimited(obj)
if jf.maxRequeueNum == -1 || jf.queue.NumRequeues(req) < jf.maxRequeueNum {
klog.V(4).Infof("Error syncing jobFlow request %v for %v.", req, err)
jf.queue.AddRateLimited(req)
return
}

req, _ := obj.(apis.FlowRequest)
jf.recordEventsForJobFlow(req.Namespace, req.JobFlowName, v1.EventTypeWarning, string(req.Action),
fmt.Sprintf("%v JobFlow failed for %v", req.Action, err))
klog.V(4).Infof("Dropping JobFlow request %v out of the queue for %v.", obj, err)
jf.queue.Forget(obj)
klog.V(4).Infof("Dropping JobFlow request %v out of the queue for %v.", req, err)
jf.queue.Forget(req)
}

func (jf *jobflowcontroller) recordEventsForJobFlow(namespace, name, eventType, reason, message string) {
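With a typed queue, handleJobFlowErr takes the apis.FlowRequest value directly, so the obj.(apis.FlowRequest) assertion and its error path disappear; the bounded-retry logic itself is unchanged: requeue with backoff until NumRequeues passes the cap, then record, log, and Forget. A generic sketch of that handler shape (the request type and maxRequeues constant are placeholders, not Volcano types):

```go
package main

import (
	"errors"
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// request stands in for apis.FlowRequest; any comparable type can be a queue item.
type request struct {
	Namespace, Name string
}

const maxRequeues = 15 // placeholder for the controller's maxRequeueNum option

func handleErr(queue workqueue.TypedRateLimitingInterface[request], req request, err error) {
	if err == nil {
		queue.Forget(req) // success: clear the item's failure history
		return
	}
	if queue.NumRequeues(req) < maxRequeues {
		queue.AddRateLimited(req) // retry later with per-item backoff
		return
	}
	// Give up: drop the item and reset its rate-limiter state.
	fmt.Printf("dropping %v/%v after %d retries: %v\n", req.Namespace, req.Name, maxRequeues, err)
	queue.Forget(req)
}

func main() {
	queue := workqueue.NewTypedRateLimitingQueue(
		workqueue.DefaultTypedControllerRateLimiter[request](),
	)
	defer queue.ShutDown()

	handleErr(queue, request{Namespace: "default", Name: "flow-a"}, errors.New("transient failure"))
}
```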
31 changes: 12 additions & 19 deletions pkg/controllers/jobtemplate/jobtemplate_controller.go
@@ -68,7 +68,7 @@ type jobtemplatecontroller struct {
// JobTemplate Event recorder
recorder record.EventRecorder

queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[apis.FlowRequest]
enqueueJobTemplate func(req apis.FlowRequest)

syncHandler func(req *apis.FlowRequest) error
@@ -111,7 +111,7 @@ func (jt *jobtemplatecontroller) Initialize(opt *framework.ControllerOption) err
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: jt.kubeClient.CoreV1().Events("")})

jt.recorder = eventBroadcaster.NewRecorder(versionedscheme.Scheme, v1.EventSource{Component: "vc-controller-manager"})
jt.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
jt.queue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[apis.FlowRequest]())

jt.enqueueJobTemplate = jt.enqueue

@@ -144,7 +144,7 @@ func (jt *jobtemplatecontroller) worker() {
}

func (jt *jobtemplatecontroller) processNextWorkItem() bool {
obj, shutdown := jt.queue.Get()
req, shutdown := jt.queue.Get()
if shutdown {
// Stop working
return false
@@ -156,16 +156,10 @@
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer jt.queue.Done(obj)

req, ok := obj.(apis.FlowRequest)
if !ok {
klog.Errorf("%v is not a valid queue request struct.", obj)
return true
}
defer jt.queue.Done(req)

err := jt.syncHandler(&req)
jt.handleJobTemplateErr(err, obj)
jt.handleJobTemplateErr(err, req)

return true
}
@@ -195,23 +189,22 @@ func (jt *jobtemplatecontroller) handleJobTemplate(req *apis.FlowRequest) error
return nil
}

func (jt *jobtemplatecontroller) handleJobTemplateErr(err error, obj interface{}) {
func (jt *jobtemplatecontroller) handleJobTemplateErr(err error, req apis.FlowRequest) {
if err == nil {
jt.queue.Forget(obj)
jt.queue.Forget(req)
return
}

if jt.maxRequeueNum == -1 || jt.queue.NumRequeues(obj) < jt.maxRequeueNum {
klog.V(4).Infof("Error syncing jobTemplate request %v for %v.", obj, err)
jt.queue.AddRateLimited(obj)
if jt.maxRequeueNum == -1 || jt.queue.NumRequeues(req) < jt.maxRequeueNum {
klog.V(4).Infof("Error syncing jobTemplate request %v for %v.", req, err)
jt.queue.AddRateLimited(req)
return
}

req, _ := obj.(*apis.FlowRequest)
jt.recordEventsForJobTemplate(req.Namespace, req.JobTemplateName, v1.EventTypeWarning, string(req.Action),
fmt.Sprintf("%v JobTemplate failed for %v", req.Action, err))
klog.V(2).Infof("Dropping JobTemplate request %v out of the queue for %v.", obj, err)
jt.queue.Forget(obj)
klog.V(2).Infof("Dropping JobTemplate request %v out of the queue for %v.", req, err)
jt.queue.Forget(req)
}

func (jt *jobtemplatecontroller) recordEventsForJobTemplate(namespace, name, eventType, reason, message string) {
7 changes: 3 additions & 4 deletions pkg/controllers/podgroup/pg_controller.go
@@ -66,7 +66,7 @@ type pgcontroller struct {
// A store of replicaset
rsSynced func() bool

queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[podRequest]

schedulerNames []string
workers uint32
@@ -85,7 +85,7 @@ func (pg *pgcontroller) Initialize(opt *framework.ControllerOption) error {
pg.vcClient = opt.VolcanoClient
pg.workers = opt.WorkerThreadsForPG

pg.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
pg.queue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[podRequest]())

pg.schedulerNames = make([]string, len(opt.SchedulerNames))
copy(pg.schedulerNames, opt.SchedulerNames)
@@ -146,13 +146,12 @@ func (pg *pgcontroller) worker() {
}

func (pg *pgcontroller) processNextReq() bool {
obj, shutdown := pg.queue.Get()
req, shutdown := pg.queue.Get()
if shutdown {
klog.Errorf("Fail to pop item from queue")
return false
}

req := obj.(podRequest)
defer pg.queue.Done(req)

pod, err := pg.podLister.Pods(req.podNamespace).Get(req.podName)