Skip to content

Commit

Permalink
feat: add hierarchical queues for capacity plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Rui-Gan <ganrui.cs@gmail.com>
  • Loading branch information
Rui-Gan committed Oct 15, 2024
1 parent 0843c0d commit 549f2be
Show file tree
Hide file tree
Showing 14 changed files with 1,390 additions and 149 deletions.
71 changes: 71 additions & 0 deletions pkg/controllers/queue/queue_controller_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ package queue
import (
"context"
"fmt"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"volcano.sh/apis/pkg/apis/bus/v1alpha1"
busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1"
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/apis"
"volcano.sh/volcano/pkg/controllers/queue/state"
)

Expand Down Expand Up @@ -92,6 +96,13 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF
func (c *queuecontroller) openQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error {
klog.V(4).Infof("Begin to open queue %s.", queue.Name)

if queue.Status.State != schedulingv1beta1.QueueStateOpen {
continued, err := c.openHierarchicalQueue(queue)
if !continued {
return err
}
}

newQueue := queue.DeepCopy()
newQueue.Status.State = schedulingv1beta1.QueueStateOpen

Expand Down Expand Up @@ -134,6 +145,13 @@ func (c *queuecontroller) openQueue(queue *schedulingv1beta1.Queue, updateStateF
func (c *queuecontroller) closeQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error {
klog.V(4).Infof("Begin to close queue %s.", queue.Name)

if queue.Status.State != schedulingv1beta1.QueueStateClosed && queue.Status.State != schedulingv1beta1.QueueStateClosing {
continued, err := c.closeHierarchicalQueue(queue)
if !continued {
return err
}
}

newQueue := queue.DeepCopy()
newQueue.Status.State = schedulingv1beta1.QueueStateClosed

Expand Down Expand Up @@ -173,3 +191,56 @@ func (c *queuecontroller) closeQueue(queue *schedulingv1beta1.Queue, updateState

return nil
}

func (c *queuecontroller) openHierarchicalQueue(queue *schedulingv1beta1.Queue) (bool, error) {
if queue.Spec.Parent == "" || queue.Spec.Parent == "root" {
return true, nil
}

parentQueue, err := c.queueLister.Get(queue.Spec.Parent)
if err != nil {
return false, fmt.Errorf("Failed to get parent queue <%s> of queue <%s>: %v", queue.Spec.Parent, queue.Name, err)
}

if parentQueue.Status.State == schedulingv1beta1.QueueStateClosing || parentQueue.Status.State == schedulingv1beta1.QueueStateClosed {
klog.Errorf("Failed to open queue %s because its parent queue %s is closing or closed. Open the parent queue first.", queue.Name, queue.Spec.Parent)
return false, nil
}

return true, nil
}

func (c *queuecontroller) closeHierarchicalQueue(queue *schedulingv1beta1.Queue) (bool, error) {
if queue.Name == "root" {
klog.Errorf("Root queue cannot be closed")
return false, nil
}

queueList, err := c.queueLister.List(labels.Everything())
if err != nil {
return false, err
}

openChildQueue := make([]string, 0)
for _, childQueue := range queueList {
if childQueue.Spec.Parent != queue.Name {
continue
}
if childQueue.Status.State != schedulingv1beta1.QueueStateClosed && childQueue.Status.State != schedulingv1beta1.QueueStateClosing {
req := &apis.Request{
QueueName: childQueue.Name,
Action: busv1alpha1.CloseQueueAction,
}

c.enqueue(req)
openChildQueue = append(openChildQueue, childQueue.Name)
klog.V(3).Infof("Closing child queue <%s> because its parent queue %s is closing or closed.", childQueue.Name, queue.Name)
}
}

if len(openChildQueue) > 0 {
return false, fmt.Errorf("failed to close queue %s because its child queues %v are still open", queue.Name, strings.Join(openChildQueue, ","))
}

return true, nil
}
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (pmpt *Action) preempt(
continue
}

victimsQueue := ssn.BuildVictimsPriorityQueue(victims)
victimsQueue := ssn.BuildVictimsPriorityQueue(victims, preemptor)
// Preempt victims for tasks, pick lowest priority task first.
preempted := api.EmptyResource()

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (ra *Action) Execute(ssn *framework.Session) {
continue
}

victimsQueue := ssn.BuildVictimsPriorityQueue(victims)
victimsQueue := ssn.BuildVictimsPriorityQueue(victims, task)

resreq := task.InitResreq.Clone()
reclaimed := api.EmptyResource()
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type LessFn func(interface{}, interface{}) bool
// CompareFn is the func declaration used by sort or priority queue.
type CompareFn func(interface{}, interface{}) int

// VictimCompareFn is the func declaration used by sort or priority victims.
type VictimCompareFn func(interface{}, interface{}, interface{}) int

// ValidateFn is the func declaration used to check object's status.
type ValidateFn func(interface{}) bool

Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,11 @@ func (sc *SchedulerCache) Client() kubernetes.Interface {
return sc.kubeClient
}

// VCClient returns the volcano clientSet
func (sc *SchedulerCache) VCClient() vcclient.Interface {
return sc.vcClient
}

// ClientConfig returns the rest config
func (sc *SchedulerCache) ClientConfig() *rest.Config {
return sc.restConfig
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"

vcclient "volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
)
Expand Down Expand Up @@ -74,6 +75,9 @@ type Cache interface {
// Client returns the kubernetes clientSet, which can be used by plugins
Client() kubernetes.Interface

// VCClient returns the volcano clientSet, which can be used by plugins
VCClient() vcclient.Interface

// ClientConfig returns the rest config
ClientConfig() *rest.Config

Expand Down
94 changes: 52 additions & 42 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"volcano.sh/apis/pkg/apis/scheduling"
schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme"
vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
vcclient "volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/conf"
Expand All @@ -46,6 +47,7 @@ type Session struct {
UID types.UID

kubeClient kubernetes.Interface
vcClient vcclient.Interface
recorder record.EventRecorder
cache cache.Cache
restConfig *rest.Config
Expand All @@ -72,22 +74,23 @@ type Session struct {
Configurations []conf.Configuration
NodeList []*api.NodeInfo

plugins map[string]Plugin
eventHandlers []*EventHandler
jobOrderFns map[string]api.CompareFn
queueOrderFns map[string]api.CompareFn
taskOrderFns map[string]api.CompareFn
clusterOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
prePredicateFns map[string]api.PrePredicateFn
bestNodeFns map[string]api.BestNodeFn
nodeOrderFns map[string]api.NodeOrderFn
batchNodeOrderFns map[string]api.BatchNodeOrderFn
nodeMapFns map[string]api.NodeMapFn
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
plugins map[string]Plugin
eventHandlers []*EventHandler
jobOrderFns map[string]api.CompareFn
queueOrderFns map[string]api.CompareFn
victimQueueOrderFns map[string]api.VictimCompareFn
taskOrderFns map[string]api.CompareFn
clusterOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
prePredicateFns map[string]api.PrePredicateFn
bestNodeFns map[string]api.BestNodeFn
nodeOrderFns map[string]api.NodeOrderFn
batchNodeOrderFns map[string]api.BatchNodeOrderFn
nodeMapFns map[string]api.NodeMapFn
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
// preemptiveFns means whether current queue can reclaim from other queue,
// while reclaimableFns means whether current queue's resources can be reclaimed.
preemptiveFns map[string]api.ValidateWithCandidateFn
Expand All @@ -107,6 +110,7 @@ func openSession(cache cache.Cache) *Session {
ssn := &Session{
UID: uuid.NewUUID(),
kubeClient: cache.Client(),
vcClient: cache.VCClient(),
restConfig: cache.ClientConfig(),
recorder: cache.EventRecorder(),
cache: cache,
Expand All @@ -121,32 +125,33 @@ func openSession(cache cache.Cache) *Session {
RevocableNodes: map[string]*api.NodeInfo{},
Queues: map[api.QueueID]*api.QueueInfo{},

plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
taskOrderFns: map[string]api.CompareFn{},
clusterOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
prePredicateFns: map[string]api.PrePredicateFn{},
bestNodeFns: map[string]api.BestNodeFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
nodeMapFns: map[string]api.NodeMapFn{},
nodeReduceFns: map[string]api.NodeReduceFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
preemptiveFns: map[string]api.ValidateWithCandidateFn{},
allocatableFns: map[string]api.AllocatableFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.VoteFn{},
jobValidFns: map[string]api.ValidateExFn{},
jobEnqueueableFns: map[string]api.VoteFn{},
jobEnqueuedFns: map[string]api.JobEnqueuedFn{},
targetJobFns: map[string]api.TargetJobFn{},
reservedNodesFns: map[string]api.ReservedNodesFn{},
victimTasksFns: map[string][]api.VictimTasksFn{},
jobStarvingFns: map[string]api.ValidateFn{},
plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
victimQueueOrderFns: map[string]api.VictimCompareFn{},
taskOrderFns: map[string]api.CompareFn{},
clusterOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
prePredicateFns: map[string]api.PrePredicateFn{},
bestNodeFns: map[string]api.BestNodeFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
nodeMapFns: map[string]api.NodeMapFn{},
nodeReduceFns: map[string]api.NodeReduceFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
preemptiveFns: map[string]api.ValidateWithCandidateFn{},
allocatableFns: map[string]api.AllocatableFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.VoteFn{},
jobValidFns: map[string]api.ValidateExFn{},
jobEnqueueableFns: map[string]api.VoteFn{},
jobEnqueuedFns: map[string]api.JobEnqueuedFn{},
targetJobFns: map[string]api.TargetJobFn{},
reservedNodesFns: map[string]api.ReservedNodesFn{},
victimTasksFns: map[string][]api.VictimTasksFn{},
jobStarvingFns: map[string]api.ValidateFn{},
}

snapshot := cache.Snapshot()
Expand Down Expand Up @@ -590,6 +595,11 @@ func (ssn Session) KubeClient() kubernetes.Interface {
return ssn.kubeClient
}

// VCClient returns the volcano client
func (ssn Session) VCClient() vcclient.Interface {
return ssn.vcClient
}

// ClientConfig returns the rest client
func (ssn Session) ClientConfig() *rest.Config {
return ssn.restConfig
Expand Down
30 changes: 27 additions & 3 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ func (ssn *Session) AddQueueOrderFn(name string, qf api.CompareFn) {
ssn.queueOrderFns[name] = qf
}

// AddVictimQueueOrderFn add victim job order function
func (ssn *Session) AddVictimQueueOrderFn(name string, vcf api.VictimCompareFn) {
ssn.victimQueueOrderFns[name] = vcf
}

// AddClusterOrderFn add queue order function
func (ssn *Session) AddClusterOrderFn(name string, qf api.CompareFn) {
ssn.clusterOrderFns[name] = qf
Expand Down Expand Up @@ -594,6 +599,23 @@ func (ssn *Session) QueueOrderFn(l, r interface{}) bool {
return lv.Queue.CreationTimestamp.Before(&rv.Queue.CreationTimestamp)
}

// VictimQueueOrderFn invoke victimqueueorder function of the plugins
func (ssn *Session) VictimQueueOrderFn(l, r, preemptor interface{}) bool {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
qof, found := ssn.victimQueueOrderFns[plugin.Name]
if !found {
continue
}
if j := qof(l, r, preemptor); j != 0 {
return j < 0
}
}
}

return !ssn.QueueOrderFn(l, r)
}

// TaskCompareFns invoke taskorder function of the plugins
func (ssn *Session) TaskCompareFns(l, r interface{}) int {
for _, tier := range ssn.Tiers {
Expand Down Expand Up @@ -791,7 +813,7 @@ func (ssn *Session) NodeOrderReduceFn(task *api.TaskInfo, pluginNodeScoreMap map
// BuildVictimsPriorityQueue returns a priority queue with victims sorted by:
// if victims has same job id, sorted by !ssn.TaskOrderFn
// if victims has different job id, sorted by !ssn.JobOrderFn
func (ssn *Session) BuildVictimsPriorityQueue(victims []*api.TaskInfo) *util.PriorityQueue {
func (ssn *Session) BuildVictimsPriorityQueue(victims []*api.TaskInfo, preemptor *api.TaskInfo) *util.PriorityQueue {
victimsQueue := util.NewPriorityQueue(func(l, r interface{}) bool {
lv := l.(*api.TaskInfo)
rv := r.(*api.TaskInfo)
Expand All @@ -801,8 +823,10 @@ func (ssn *Session) BuildVictimsPriorityQueue(victims []*api.TaskInfo) *util.Pri

lvJob, lvJobFound := ssn.Jobs[lv.Job]
rvJob, rvJobFound := ssn.Jobs[rv.Job]
if lvJobFound && rvJobFound && lvJob.Queue != rvJob.Queue {
return !ssn.QueueOrderFn(ssn.Queues[lvJob.Queue], ssn.Queues[rvJob.Queue])
preemptorJob, preemptorJobFound := ssn.Jobs[preemptor.Job]

if lvJobFound && rvJobFound && preemptorJobFound && lvJob.Queue != rvJob.Queue {
return ssn.VictimQueueOrderFn(ssn.Queues[lvJob.Queue], ssn.Queues[rvJob.Queue], ssn.Queues[preemptorJob.Queue])
}

return !ssn.JobOrderFn(lvJob, rvJob)
Expand Down
Loading

0 comments on commit 549f2be

Please sign in to comment.