Skip to content

Commit

Permalink
feat: modify mutate queue
Browse files Browse the repository at this point in the history
Signed-off-by: Rui-Gan <ganrui.cs@gmail.com>
  • Loading branch information
Rui-Gan committed Oct 16, 2024
1 parent 4ac4de2 commit 85be62a
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 0 deletions.
33 changes: 33 additions & 0 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,35 @@ func newDefaultQueue(vcClient vcclient.Interface, defaultQueue string) {
}
}

// newRootQueue init root queue
func newRootQueue(vcClient vcclient.Interface) {
reclaimable := false
rootQueue := vcv1beta1.Queue{
ObjectMeta: metav1.ObjectMeta{
Name: "root",
},
Spec: vcv1beta1.QueueSpec{
Reclaimable: &reclaimable,
Weight: 1,
},
}

err := retry.OnError(wait.Backoff{
Steps: 60,
Duration: time.Second,
Factor: 1,
Jitter: 0.1,
}, func(err error) bool {
return !apierrors.IsAlreadyExists(err)
}, func() error {
_, err := vcClient.SchedulingV1beta1().Queues().Create(context.TODO(), &rootQueue, metav1.CreateOptions{})
return err
})
if err != nil && !apierrors.IsAlreadyExists(err) {
panic(fmt.Errorf("failed init root queue, with err: %v", err))
}
}

func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32, ignoredProvisioners []string) *SchedulerCache {
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
Expand All @@ -532,6 +561,10 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
newDefaultQueue(vcClient, defaultQueue)
klog.Infof("Create init queue named default")

// create root queue
newRootQueue(vcClient)
klog.Infof("Create init queue named root")

errTaskRateLimiter := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(100), 1000)},
Expand Down
17 changes: 17 additions & 0 deletions pkg/webhooks/admission/queues/mutate/mutate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func Queues(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
func createQueuePatch(queue *schedulingv1beta1.Queue) ([]byte, error) {
var patch []patchOperation

patchParent := patchRootParent(queue)
if patchParent != nil {
patch = append(patch, *patchParent)
}
// add root node if the root node not specified
hierarchy := queue.Annotations[schedulingv1beta1.KubeHierarchyAnnotationKey]
hierarchicalWeights := queue.Annotations[schedulingv1beta1.KubeHierarchyWeightAnnotationKey]
Expand Down Expand Up @@ -141,3 +145,16 @@ func createQueuePatch(queue *schedulingv1beta1.Queue) ([]byte, error) {

return json.Marshal(patch)
}

func patchRootParent(queue *schedulingv1beta1.Queue) *patchOperation {
if queue.Name == "root" {
return nil
}

// Add root parent if not specified.
if queue.Spec.Parent == "" {
return &patchOperation{Op: "add", Path: "/spec/parent", Value: "root"}
}

return nil
}
58 changes: 58 additions & 0 deletions pkg/webhooks/admission/queues/mutate/mutate_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestMutateQueues(t *testing.T) {
Name: "normal-case-refresh-default-state",
},
Spec: schedulingv1beta1.QueueSpec{
Parent: "root",
Weight: 1,
},
}
Expand Down Expand Up @@ -90,6 +91,7 @@ func TestMutateQueues(t *testing.T) {
Spec: schedulingv1beta1.QueueSpec{
Reclaimable: &trueValue,
Weight: 1,
Parent: "root",
},
Status: schedulingv1beta1.QueueStatus{
State: schedulingv1beta1.QueueStateOpen,
Expand All @@ -116,6 +118,31 @@ func TestMutateQueues(t *testing.T) {
t.Errorf("Marshal appendRootPatch failed for %v", err)
}

stateNotSetParent := schedulingv1beta1.Queue{
ObjectMeta: metav1.ObjectMeta{
Name: "normal-case-append-default-parent",
},
Spec: schedulingv1beta1.QueueSpec{
Weight: 1,
Reclaimable: &trueValue,
},
}

stateNotSetParentJSON, err := json.Marshal(stateNotSetParent)
if err != nil {
t.Errorf("Marshal queue without parent failed for %v.", err)
}
var stateNotSetParentPatch []patchOperation
stateNotSetParentPatch = append(stateNotSetParentPatch, patchOperation{
Op: "add",
Path: "/spec/parent",
Value: "root",
})
stateNotSetParentPatchJSON, err := json.Marshal(stateNotSetParentPatch)
if err != nil {
t.Errorf("Marshal stateNotSetParentPatch failed for %v", err)
}

testCases := []struct {
Name string
AR admissionv1.AdmissionReview
Expand Down Expand Up @@ -211,6 +238,37 @@ func TestMutateQueues(t *testing.T) {
Patch: appendRootPatchJSON,
},
},
{
Name: "Normal Case Append Default Parent For Queue",
AR: admissionv1.AdmissionReview{
TypeMeta: metav1.TypeMeta{
Kind: "AdmissionReview",
APIVersion: "admission.k8s.io/v1",
},
Request: &admissionv1.AdmissionRequest{
Kind: metav1.GroupVersionKind{
Group: "scheduling.volcano.sh",
Version: "v1beta1",
Kind: "Queue",
},
Resource: metav1.GroupVersionResource{
Group: "scheduling.volcano.sh",
Version: "v1beta1",
Resource: "queues",
},
Name: "qeueu-hierarchy-without-root",
Operation: "CREATE",
Object: runtime.RawExtension{
Raw: stateNotSetParentJSON,
},
},
},
reviewResponse: &admissionv1.AdmissionResponse{
Allowed: true,
PatchType: &pt,
Patch: stateNotSetParentPatchJSON,
},
},
}

for _, testCase := range testCases {
Expand Down

0 comments on commit 85be62a

Please sign in to comment.