From d1d860fc705c08cd2bb79b3557f0676bf9066f89 Mon Sep 17 00:00:00 2001 From: lowang-bh Date: Sun, 21 Apr 2024 23:05:47 +0800 Subject: [PATCH 1/3] improve: add predicate result code in task's fitError, so that it can be used in preempting Signed-off-by: lowang-bh --- pkg/scheduler/actions/allocate/allocate.go | 4 +- pkg/scheduler/actions/backfill/backfill.go | 2 +- pkg/scheduler/actions/preempt/preempt.go | 5 +- pkg/scheduler/actions/reclaim/reclaim.go | 3 +- pkg/scheduler/api/job_info_test.go | 6 +- pkg/scheduler/api/types.go | 70 ++++++++++++++++++++++ pkg/scheduler/api/unschedule_info.go | 29 +++++++-- pkg/scheduler/util/predicate_helper.go | 69 --------------------- 8 files changed, 105 insertions(+), 83 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 6e86bc8f25..bb23520997 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -295,7 +295,7 @@ func (alloc *Action) predicate(task *api.TaskInfo, node *api.NodeInfo) ([]*api.S if ok, resources := task.InitResreq.LessEqualWithResourcesName(node.FutureIdle(), api.Zero); !ok { return nil, api.NewFitError(task, node, api.WrapInsufficientResourceReason(resources)) } - var statusSets util.StatusSets + var statusSets api.StatusSets statusSets, err := alloc.session.PredicateFn(task, node) if err != nil { return nil, api.NewFitError(task, node, err.Error()) @@ -303,7 +303,7 @@ func (alloc *Action) predicate(task *api.TaskInfo, node *api.NodeInfo) ([]*api.S if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { - return nil, api.NewFitError(task, node, statusSets.Message()) + return nil, api.NewFitStatus(task, node, statusSets) } return nil, nil } diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 847f557396..8daa720031 100644 --- 
a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -45,7 +45,7 @@ func (backfill *Action) Execute(ssn *framework.Session) { defer klog.V(5).Infof("Leaving Backfill ...") predicateFunc := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { - var statusSets util.StatusSets + var statusSets api.StatusSets statusSets, err := ssn.PredicateFn(task, node) if err != nil { return nil, err diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 2bc8cba2a3..9aadaf6502 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -209,18 +209,19 @@ func preempt( ) (bool, error) { assigned := false allNodes := ssn.NodeList + // TODO: we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action if err := ssn.PrePredicateFn(preemptor); err != nil { return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err) } predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { - var statusSets util.StatusSets + var statusSets api.StatusSets statusSets, _ = ssn.PredicateFn(task, node) // When filtering candidate nodes, need to consider the node statusSets instead of the err information. 
// refer to kube-scheduler preemption code: https://github.com/kubernetes/kubernetes/blob/9d87fa215d9e8020abdc17132d1252536cd752d2/pkg/scheduler/framework/preemption/preemption.go#L422 if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { - return nil, api.NewFitError(task, node, statusSets.Message()) + return nil, api.NewFitStatus(task, node, statusSets) } return nil, nil } diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 4d8b4d5a4c..1224f69f75 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -125,8 +125,9 @@ func (ra *Action) Execute(ssn *framework.Session) { } assigned := false + // TODO: we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action for _, n := range ssn.Nodes { - var statusSets util.StatusSets + var statusSets api.StatusSets statusSets, _ = ssn.PredicateFn(task, n) // When filtering candidate nodes, need to consider the node statusSets instead of the err information. 
diff --git a/pkg/scheduler/api/job_info_test.go b/pkg/scheduler/api/job_info_test.go index fc2ba232e0..8baac278bb 100644 --- a/pkg/scheduler/api/job_info_test.go +++ b/pkg/scheduler/api/job_info_test.go @@ -224,9 +224,9 @@ func TestTaskSchedulingReason(t *testing.T) { nodefes: map[TaskID]*FitErrors{ TaskID(t6.UID): { nodes: map[string]*FitError{ - "node1": {Reasons: []string{NodePodNumberExceeded}}, - "node2": {Reasons: []string{NodeResourceFitFailed}}, - "node3": {Reasons: []string{NodeResourceFitFailed}}, + "node1": {Status: []*Status{{Reason: NodePodNumberExceeded}}}, + "node2": {Status: []*Status{{Reason: NodeResourceFitFailed}}}, + "node3": {Status: []*Status{{Reason: NodeResourceFitFailed}}}, }, }, }, diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index aa96f8dc4a..7c52481ce5 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -17,6 +17,8 @@ limitations under the License. package api import ( + "strings" + k8sframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -157,6 +159,74 @@ func (s Status) String() string { return s.Reason } +type StatusSets []*Status + +func (s StatusSets) ContainsUnschedulable() bool { + for _, status := range s { + if status == nil { + continue + } + if status.Code == Unschedulable { + return true + } + } + return false +} + +func (s StatusSets) ContainsUnschedulableAndUnresolvable() bool { + for _, status := range s { + if status == nil { + continue + } + if status.Code == UnschedulableAndUnresolvable { + return true + } + } + return false +} + +func (s StatusSets) ContainsErrorSkipOrWait() bool { + for _, status := range s { + if status == nil { + continue + } + if status.Code == Error || status.Code == Skip || status.Code == Wait { + return true + } + } + return false +} + +// Message return the message generated from StatusSets +func (s StatusSets) Message() string { + if s == nil { + return "" + } + all := make([]string, 0, len(s)) + for _, status := range s { + if status.Reason 
== "" { + continue + } + all = append(all, status.Reason) + } + return strings.Join(all, ",") +} + +// Reasons return the reasons list +func (s StatusSets) Reasons() []string { + if s == nil { + return nil + } + all := make([]string, 0, len(s)) + for _, status := range s { + if status.Reason == "" { + continue + } + all = append(all, status.Reason) + } + return all +} + // ValidateExFn is the func declaration used to validate the result. type ValidateExFn func(interface{}) *ValidateResult diff --git a/pkg/scheduler/api/unschedule_info.go b/pkg/scheduler/api/unschedule_info.go index 4dc08d1d12..07cd55f5fa 100644 --- a/pkg/scheduler/api/unschedule_info.go +++ b/pkg/scheduler/api/unschedule_info.go @@ -56,7 +56,7 @@ func (f *FitErrors) SetNodeError(nodeName string, err error) { default: fe = &FitError{ NodeName: nodeName, - Reasons: []string{obj.Error()}, + Status: []*Status{{Code: Error, Reason: obj.Error()}}, } } @@ -74,7 +74,7 @@ func (f *FitErrors) Error() string { reasons := make(map[string]int) for _, node := range f.nodes { - for _, reason := range node.Reasons { + for _, reason := range node.Reasons() { reasons[reason]++ } } @@ -96,7 +96,7 @@ type FitError struct { taskNamespace string taskName string NodeName string - Reasons []string + Status StatusSets } // NewFitError return FitError by message @@ -105,14 +105,33 @@ func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError { taskName: task.Name, taskNamespace: task.Namespace, NodeName: node.Name, - Reasons: message, } + sts := make([]*Status, 0, len(message)) + for _, msg := range message { + sts = append(sts, &Status{Reason: msg, Code: Error}) + } + fe.Status = StatusSets(sts) return fe } +func NewFitStatus(task *TaskInfo, node *NodeInfo, sts StatusSets) *FitError { + fe := &FitError{ + taskName: task.Name, + taskNamespace: task.Namespace, + NodeName: node.Name, + Status: sts, + } + return fe +} + +// Reasons returns the reasons +func (fe *FitError) Reasons() []string { + return 
fe.Status.Reasons() +} + // Error returns the final error message func (f *FitError) Error() string { - return fmt.Sprintf("task %s/%s on node %s fit failed: %s", f.taskNamespace, f.taskName, f.NodeName, strings.Join(f.Reasons, ", ")) + return fmt.Sprintf("task %s/%s on node %s fit failed: %s", f.taskNamespace, f.taskName, f.NodeName, strings.Join(f.Reasons(), ", ")) } // WrapInsufficientResourceReason wrap insufficient resource reason. diff --git a/pkg/scheduler/util/predicate_helper.go b/pkg/scheduler/util/predicate_helper.go index 2ce5164c66..7a1a62c78e 100644 --- a/pkg/scheduler/util/predicate_helper.go +++ b/pkg/scheduler/util/predicate_helper.go @@ -3,7 +3,6 @@ package util import ( "context" "fmt" - "strings" "sync" "sync/atomic" @@ -107,71 +106,3 @@ func taskGroupID(task *api.TaskInfo) string { func NewPredicateHelper() PredicateHelper { return &predicateHelper{taskPredicateErrorCache: map[string]map[string]error{}} } - -type StatusSets []*api.Status - -func (s StatusSets) ContainsUnschedulable() bool { - for _, status := range s { - if status == nil { - continue - } - if status.Code == api.Unschedulable { - return true - } - } - return false -} - -func (s StatusSets) ContainsUnschedulableAndUnresolvable() bool { - for _, status := range s { - if status == nil { - continue - } - if status.Code == api.UnschedulableAndUnresolvable { - return true - } - } - return false -} - -func (s StatusSets) ContainsErrorSkipOrWait() bool { - for _, status := range s { - if status == nil { - continue - } - if status.Code == api.Error || status.Code == api.Skip || status.Code == api.Wait { - return true - } - } - return false -} - -// Message return the message generated from StatusSets -func (s StatusSets) Message() string { - if s == nil { - return "" - } - all := make([]string, 0, len(s)) - for _, status := range s { - if status.Reason == "" { - continue - } - all = append(all, status.Reason) - } - return strings.Join(all, ",") -} - -// Reasons return the reasons list 
-func (s StatusSets) Reasons() []string { - if s == nil { - return nil - } - all := make([]string, 0, len(s)) - for _, status := range s { - if status.Reason == "" { - continue - } - all = append(all, status.Reason) - } - return all -} From d57c6b2069d0cb90820c9f488b6fb539e5f9c7d3 Mon Sep 17 00:00:00 2001 From: lowang-bh Date: Fri, 17 May 2024 22:50:37 +0800 Subject: [PATCH 2/3] add testcase about FitErrors Signed-off-by: lowang-bh --- pkg/scheduler/actions/allocate/allocate.go | 2 +- pkg/scheduler/actions/preempt/preempt.go | 2 +- pkg/scheduler/api/unschedule_info.go | 6 +- pkg/scheduler/api/unschedule_info_test.go | 117 +++++++++++++++++++++ 4 files changed, 124 insertions(+), 3 deletions(-) create mode 100644 pkg/scheduler/api/unschedule_info_test.go diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index bb23520997..28aed3aec9 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -303,7 +303,7 @@ func (alloc *Action) predicate(task *api.TaskInfo, node *api.NodeInfo) ([]*api.S if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { - return nil, api.NewFitStatus(task, node, statusSets) + return nil, api.NewFitStatus(task, node, statusSets...) } return nil, nil } diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 9aadaf6502..bbcf0a339e 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -221,7 +221,7 @@ func preempt( // When filtering candidate nodes, need to consider the node statusSets instead of the err information. 
// refer to kube-scheduler preemption code: https://github.com/kubernetes/kubernetes/blob/9d87fa215d9e8020abdc17132d1252536cd752d2/pkg/scheduler/framework/preemption/preemption.go#L422 if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { - return nil, api.NewFitStatus(task, node, statusSets) + return nil, api.NewFitStatus(task, node, statusSets...) } return nil, nil } diff --git a/pkg/scheduler/api/unschedule_info.go b/pkg/scheduler/api/unschedule_info.go index 07cd55f5fa..f7149222a6 100644 --- a/pkg/scheduler/api/unschedule_info.go +++ b/pkg/scheduler/api/unschedule_info.go @@ -114,7 +114,8 @@ func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError { return fe } -func NewFitStatus(task *TaskInfo, node *NodeInfo, sts StatusSets) *FitError { +// NewFitStatus returns a fit error with code and reason in it +func NewFitStatus(task *TaskInfo, node *NodeInfo, sts ...*Status) *FitError { fe := &FitError{ taskName: task.Name, taskNamespace: task.Namespace, @@ -126,6 +127,9 @@ func NewFitStatus(task *TaskInfo, node *NodeInfo, sts StatusSets) *FitError { // Reasons returns the reasons func (fe *FitError) Reasons() []string { + if fe == nil { + return []string{} + } return fe.Status.Reasons() } diff --git a/pkg/scheduler/api/unschedule_info_test.go b/pkg/scheduler/api/unschedule_info_test.go new file mode 100644 index 0000000000..37ab23a4e3 --- /dev/null +++ b/pkg/scheduler/api/unschedule_info_test.go @@ -0,0 +1,117 @@ +/* +Copyright 2024 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ +package api + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +const ( + affinityRulesNotMatch = "node(s) didn't match pod affinity rules" + existingAntiAffinityNotMatch = "node(s) didn't satisfy existing pods anti-affinity rules" + nodeAffinity = "node(s) didn't match Pod's node affinity/selector" +) + +func TestFitError(t *testing.T) { + tests := []struct { + task *TaskInfo + node *NodeInfo + reason []string + status []*Status + want *FitError + errStr string + }{ + { + task: &TaskInfo{Name: "pod1", Namespace: "ns1"}, + node: &NodeInfo{Name: "node1"}, + reason: []string{affinityRulesNotMatch, nodeAffinity}, + want: &FitError{ + NodeName: "node1", taskNamespace: "ns1", taskName: "pod1", + Status: []*Status{{Reason: affinityRulesNotMatch, Code: Error}, {Reason: nodeAffinity, Code: Error}}, + }, + errStr: "task ns1/pod1 on node node1 fit failed: " + affinityRulesNotMatch + ", " + nodeAffinity, + }, + { + task: &TaskInfo{Name: "pod2", Namespace: "ns2"}, + node: &NodeInfo{Name: "node2"}, + status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}, {Reason: existingAntiAffinityNotMatch, Code: Error}}, + reason: []string{nodeAffinity, existingAntiAffinityNotMatch}, + want: &FitError{ + NodeName: "node2", taskNamespace: "ns2", taskName: "pod2", + Status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}, {Reason: existingAntiAffinityNotMatch, Code: Error}}, + }, + errStr: "task ns2/pod2 on node node2 fit failed: " + nodeAffinity + ", " + existingAntiAffinityNotMatch, + }, + } + + var got *FitError + for _, test := range tests { + if len(test.status) != 0 { + got = NewFitStatus(test.task, test.node, test.status...) + } else if len(test.reason) != 0 { + got = NewFitError(test.task, test.node, test.reason...) 
+ } + + assert.Equal(t, test.want, got) + assert.Equal(t, test.reason, got.Reasons()) + assert.Equal(t, test.errStr, got.Error()) + } +} + +func TestFitErrors(t *testing.T) { + tests := []struct { + node string + fitStr string + err error + fiterr *FitError + want string // expected error string + }{ + { + want: "0/0 nodes are unavailable", // base fit err string is empty, set as the default + }, + { + node: "node1", + fitStr: "fit failed", + err: fmt.Errorf(NodePodNumberExceeded), + want: "fit failed: 1 node(s) pod number exceeded.", + }, + { + node: "node1", + fitStr: "NodeResourceFitFailed", + err: fmt.Errorf(NodePodNumberExceeded), + fiterr: &FitError{ + taskNamespace: "ns1", taskName: "task1", NodeName: "node2", + Status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}}, + }, + want: "NodeResourceFitFailed: 1 node(s) didn't match Pod's node affinity/selector, 1 node(s) pod number exceeded.", + }, + } + for _, test := range tests { + fitErrs := NewFitErrors() + fitErrs.SetError(test.fitStr) + if test.err != nil { + fitErrs.SetNodeError(test.node, test.err) + } + if test.fiterr != nil { + fitErrs.SetNodeError(test.fiterr.NodeName, test.fiterr) + } + got := fitErrs.Error() + assert.Equal(t, test.want, got) + } +} From c9a9f6d2c12cd1d7bf54e886eec1b8191c4667e5 Mon Sep 17 00:00:00 2001 From: lowang-bh Date: Sat, 18 May 2024 15:37:12 +0800 Subject: [PATCH 3/3] filter out those nodes are not helpful for preemption Signed-off-by: lowang-bh --- pkg/scheduler/actions/allocate/allocate.go | 7 +- pkg/scheduler/actions/preempt/preempt.go | 8 +- pkg/scheduler/actions/reclaim/reclaim.go | 5 +- pkg/scheduler/api/unschedule_info.go | 19 ++- pkg/scheduler/api/unschedule_info_test.go | 26 +++- pkg/scheduler/framework/session.go | 26 ++++ .../framework/session_plugins_test.go | 138 ++++++++++++++++++ 7 files changed, 210 insertions(+), 19 deletions(-) create mode 100644 pkg/scheduler/framework/session_plugins_test.go diff --git 
a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 28aed3aec9..0e20241624 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -292,10 +292,11 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a func (alloc *Action) predicate(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { // Check for Resource Predicate + var statusSets api.StatusSets if ok, resources := task.InitResreq.LessEqualWithResourcesName(node.FutureIdle(), api.Zero); !ok { - return nil, api.NewFitError(task, node, api.WrapInsufficientResourceReason(resources)) + statusSets = append(statusSets, &api.Status{Code: api.Unschedulable, Reason: api.WrapInsufficientResourceReason(resources)}) + return nil, api.NewFitErrWithStatus(task, node, statusSets...) } - var statusSets api.StatusSets statusSets, err := alloc.session.PredicateFn(task, node) if err != nil { return nil, api.NewFitError(task, node, err.Error()) @@ -303,7 +304,7 @@ func (alloc *Action) predicate(task *api.TaskInfo, node *api.NodeInfo) ([]*api.S if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { - return nil, api.NewFitStatus(task, node, statusSets...) + return nil, api.NewFitErrWithStatus(task, node, statusSets...) 
} return nil, nil } diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index bbcf0a339e..0b51adfa8b 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -208,8 +208,7 @@ func preempt( predicateHelper util.PredicateHelper, ) (bool, error) { assigned := false - allNodes := ssn.NodeList - // TODO: we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action + if err := ssn.PrePredicateFn(preemptor); err != nil { return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err) } @@ -221,11 +220,12 @@ func preempt( // When filtering candidate nodes, need to consider the node statusSets instead of the err information. // refer to kube-scheduler preemption code: https://github.com/kubernetes/kubernetes/blob/9d87fa215d9e8020abdc17132d1252536cd752d2/pkg/scheduler/framework/preemption/preemption.go#L422 if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { - return nil, api.NewFitStatus(task, node, statusSets...) + return nil, api.NewFitErrWithStatus(task, node, statusSets...) 
} return nil, nil } - + // we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action + allNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(preemptor) predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true) nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 1224f69f75..a028f5726f 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -125,8 +125,9 @@ func (ra *Action) Execute(ssn *framework.Session) { } assigned := false - // TODO: we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action - for _, n := range ssn.Nodes { + // we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action + totalNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(task) + for _, n := range totalNodes { var statusSets api.StatusSets statusSets, _ = ssn.PredicateFn(task, n) diff --git a/pkg/scheduler/api/unschedule_info.go b/pkg/scheduler/api/unschedule_info.go index f7149222a6..60b7b012cc 100644 --- a/pkg/scheduler/api/unschedule_info.go +++ b/pkg/scheduler/api/unschedule_info.go @@ -4,6 +4,8 @@ import ( "fmt" "sort" "strings" + + "k8s.io/apimachinery/pkg/util/sets" ) const ( @@ -63,6 +65,17 @@ func (f *FitErrors) SetNodeError(nodeName string, err error) { f.nodes[nodeName] = fe } +// GetUnschedulableAndUnresolvableNodes returns the set of nodes that has no help from preempting pods from it +func (f *FitErrors) GetUnschedulableAndUnresolvableNodes() map[string]sets.Empty { + ret := make(map[string]sets.Empty) + for _, node := range f.nodes { + if node.Status.ContainsUnschedulableAndUnresolvable() { + ret[node.NodeName] = sets.Empty{} + } + } + return ret +} + // Error returns the final error 
message func (f *FitErrors) Error() string { if f.err == "" { @@ -99,7 +112,7 @@ type FitError struct { Status StatusSets } -// NewFitError return FitError by message +// NewFitError return FitError by message, setting default code to Error func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError { fe := &FitError{ taskName: task.Name, @@ -114,8 +127,8 @@ func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError { return fe } -// NewFitStatus returns a fit error with code and reason in it -func NewFitStatus(task *TaskInfo, node *NodeInfo, sts ...*Status) *FitError { +// NewFitErrWithStatus returns a fit error with code and reason in it +func NewFitErrWithStatus(task *TaskInfo, node *NodeInfo, sts ...*Status) *FitError { fe := &FitError{ taskName: task.Name, taskNamespace: task.Namespace, diff --git a/pkg/scheduler/api/unschedule_info_test.go b/pkg/scheduler/api/unschedule_info_test.go index 37ab23a4e3..56c9273ddf 100644 --- a/pkg/scheduler/api/unschedule_info_test.go +++ b/pkg/scheduler/api/unschedule_info_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/sets" ) const ( @@ -32,16 +33,19 @@ func TestFitError(t *testing.T) { tests := []struct { task *TaskInfo node *NodeInfo - reason []string status []*Status - want *FitError + // the wanted reason from fitError + reason []string + // the wanted fitError + wantErr *FitError + // string of fitError errStr string }{ { task: &TaskInfo{Name: "pod1", Namespace: "ns1"}, node: &NodeInfo{Name: "node1"}, reason: []string{affinityRulesNotMatch, nodeAffinity}, - want: &FitError{ + wantErr: &FitError{ NodeName: "node1", taskNamespace: "ns1", taskName: "pod1", Status: []*Status{{Reason: affinityRulesNotMatch, Code: Error}, {Reason: nodeAffinity, Code: Error}}, }, @@ -52,7 +56,7 @@ func TestFitError(t *testing.T) { node: &NodeInfo{Name: "node2"}, status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}, 
{Reason: existingAntiAffinityNotMatch, Code: Error}}, reason: []string{nodeAffinity, existingAntiAffinityNotMatch}, - want: &FitError{ + wantErr: &FitError{ NodeName: "node2", taskNamespace: "ns2", taskName: "pod2", Status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}, {Reason: existingAntiAffinityNotMatch, Code: Error}}, }, @@ -63,12 +67,12 @@ func TestFitError(t *testing.T) { var got *FitError for _, test := range tests { if len(test.status) != 0 { - got = NewFitStatus(test.task, test.node, test.status...) + got = NewFitErrWithStatus(test.task, test.node, test.status...) } else if len(test.reason) != 0 { got = NewFitError(test.task, test.node, test.reason...) } - assert.Equal(t, test.want, got) + assert.Equal(t, test.wantErr, got) assert.Equal(t, test.reason, got.Reasons()) assert.Equal(t, test.errStr, got.Error()) } @@ -81,15 +85,20 @@ func TestFitErrors(t *testing.T) { err error fiterr *FitError want string // expected error string + // nodes that are not helpful for preempting, which has a code of UnschedulableAndUnresolvable + filterNodes map[string]sets.Empty }{ { - want: "0/0 nodes are unavailable", // base fit err string is empty, set as the default + want: "0/0 nodes are unavailable", // base fit err string is empty, set as the default + filterNodes: map[string]sets.Empty{}, }, { node: "node1", fitStr: "fit failed", err: fmt.Errorf(NodePodNumberExceeded), want: "fit failed: 1 node(s) pod number exceeded.", + // no node has UnschedulableAndUnresolvable + filterNodes: map[string]sets.Empty{}, }, { node: "node1", @@ -100,6 +109,8 @@ func TestFitErrors(t *testing.T) { Status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}}, }, want: "NodeResourceFitFailed: 1 node(s) didn't match Pod's node affinity/selector, 1 node(s) pod number exceeded.", + // only node2 has UnschedulableAndUnresolvable + filterNodes: map[string]sets.Empty{"node2": {}}, }, } for _, test := range tests { @@ -113,5 +124,6 @@ func TestFitErrors(t 
*testing.T) { } got := fitErrs.Error() assert.Equal(t, test.want, got) + assert.Equal(t, test.filterNodes, fitErrs.GetUnschedulableAndUnresolvableNodes()) } } diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index eccfbe62f0..34ada7c541 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -287,6 +287,32 @@ func jobStatus(ssn *Session, jobInfo *api.JobInfo) scheduling.PodGroupStatus { return status } +// GetUnschedulableAndUnresolvableNodesForTask returns the nodes usable for scheduling the task, i.e. all nodes minus those recorded with UnschedulableAndUnresolvable status in the task's fit errors +func (ssn *Session) GetUnschedulableAndUnresolvableNodesForTask(task *api.TaskInfo) []*api.NodeInfo { + fitErrors, ok1 := ssn.Jobs[task.Job] + if !ok1 { + return ssn.NodeList + } + fitErr, ok2 := fitErrors.NodesFitErrors[task.UID] + if !ok2 { + return ssn.NodeList + } + + skipNodes := fitErr.GetUnschedulableAndUnresolvableNodes() + if len(skipNodes) == 0 { + return ssn.NodeList + } + + ret := make([]*api.NodeInfo, 0, len(ssn.Nodes)) + for _, node := range ssn.Nodes { + if _, ok := skipNodes[node.Name]; !ok { + ret = append(ret, node) + } + } + + return ret +} + // Statement returns new statement object func (ssn *Session) Statement() *Statement { return &Statement{ diff --git a/pkg/scheduler/framework/session_plugins_test.go b/pkg/scheduler/framework/session_plugins_test.go new file mode 100644 index 0000000000..be8e5b695b --- /dev/null +++ b/pkg/scheduler/framework/session_plugins_test.go @@ -0,0 +1,138 @@ +/* +Copyright 2024 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + + schedulingv1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/cache" + "volcano.sh/volcano/pkg/scheduler/util" +) + +func newFitErr(taskName, nodeName string, sts ...*api.Status) *api.FitError { + return api.NewFitErrWithStatus(&api.TaskInfo{Name: taskName}, &api.NodeInfo{Name: nodeName}, sts...) +} + +func TestFilterOutPreemptMayNotHelpNodes(t *testing.T) { + tests := []struct { + Name string + PodGroups []*schedulingv1.PodGroup + Pods []*v1.Pod + Nodes []*v1.Node + Queues []*schedulingv1.Queue + status map[api.TaskID]*api.FitError + want map[api.TaskID][]string // task's nodes name list which is helpful for preemption + }{ + { + Name: "all are helpful for preemption", + PodGroups: []*schedulingv1.PodGroup{util.BuildPodGroup("pg1", "c1", "c1", 1, nil, schedulingv1.PodGroupInqueue)}, + Pods: []*v1.Pod{ + util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil), + util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil), + }, + Nodes: []*v1.Node{ + util.BuildNode("n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}), + util.BuildNode("n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}), + }, + Queues: []*schedulingv1.Queue{util.BuildQueue("c1", 1, nil)}, + status: map[api.TaskID]*api.FitError{}, + want: map[api.TaskID][]string{"c1-p2": {"n1", "n2"}, "c1-p1": {"n1", "n2"}}, + }, + { + Name: "master predicate failed: node selector does 
not match", + PodGroups: []*schedulingv1.PodGroup{util.BuildPodGroup("pg1", "c1", "c1", 1, nil, schedulingv1.PodGroupInqueue)}, + Pods: []*v1.Pod{ + util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}), + util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}), + }, + Nodes: []*v1.Node{util.BuildNode("n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"})}, + Queues: []*schedulingv1.Queue{util.BuildQueue("c1", 1, nil)}, + status: map[api.TaskID]*api.FitError{"c1-p1": newFitErr("c1-p1", "n1", &api.Status{Reason: "node(s) didn't match Pod's node selector", Code: api.UnschedulableAndUnresolvable})}, + want: map[api.TaskID][]string{"c1-p2": {"n1"}, "c1-p1": {}}, + }, + { + Name: "p1,p3 has node fit error", + PodGroups: []*schedulingv1.PodGroup{util.BuildPodGroup("pg1", "c1", "c1", 2, map[string]int32{"master": 1, "worker": 1}, schedulingv1.PodGroupInqueue)}, + Pods: []*v1.Pod{ + util.BuildPod("c1", "p0", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}), + util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}), + util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}), + util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}), + }, + Nodes: []*v1.Node{ + util.BuildNode("n1", 
api.BuildResourceList("1", "2Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "master"}), + util.BuildNode("n2", api.BuildResourceList("1", "2Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}), + }, + Queues: []*schedulingv1.Queue{util.BuildQueue("c1", 1, nil)}, + status: map[api.TaskID]*api.FitError{ + "c1-p1": newFitErr("c1-p1", "n2", &api.Status{Reason: "node(s) didn't match Pod's node selector", Code: api.UnschedulableAndUnresolvable}), + "c1-p3": newFitErr("c1-p3", "n1", &api.Status{Reason: "node(s) didn't match Pod's node selector", Code: api.UnschedulableAndUnresolvable}), + }, + // nodes that are useful for preemption + want: map[api.TaskID][]string{ + "c1-p0": {"n1", "n2"}, + "c1-p1": {"n1"}, + "c1-p2": {"n1", "n2"}, + "c1-p3": {"n2"}, + }, + }, + } + + for i, test := range tests { + t.Run(test.Name, func(t *testing.T) { + scherCache := cache.NewDefaultMockSchedulerCache("test-scheduler") + for _, node := range test.Nodes { + scherCache.AddOrUpdateNode(node) + } + for _, pod := range test.Pods { + scherCache.AddPod(pod) + } + for _, pg := range test.PodGroups { + scherCache.AddPodGroupV1beta1(pg) + } + for _, queue := range test.Queues { + scherCache.AddQueueV1beta1(queue) + } + ssn := OpenSession(scherCache, nil, nil) + defer CloseSession(ssn) + for _, job := range ssn.Jobs { + for _, task := range job.TaskStatusIndex[api.Pending] { + if fitErr, exist := test.status[task.UID]; exist { + fe := api.NewFitErrors() + fe.SetNodeError(fitErr.NodeName, fitErr) + job.NodesFitErrors[task.UID] = fe + } + + // check potential nodes + potentialNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(task) + want := test.want[task.UID] + got := make([]string, 0, len(potentialNodes)) + for _, node := range potentialNodes { + got = append(got, node.Name) + } + assert.Equal(t, want, got, fmt.Sprintf("case %d: task %s", i, task.UID)) + } + } + }) + } +}