Merge pull request #3430 from lowang-bh/fixAllocate
improve: continue allocating if remaining tasks are more than job's minMember
volcano-sh-bot authored Jul 15, 2024
2 parents 277f19b + 23c5d4c commit 85494a9
Showing 5 changed files with 227 additions and 54 deletions.
15 changes: 14 additions & 1 deletion pkg/scheduler/actions/allocate/allocate.go
@@ -181,6 +181,12 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
continue
}

// skip the task if its task spec (role) has already failed predicates in this session
if job.TaskHasFitErrors(task) {
klog.V(5).Infof("Task %s with role spec %s has already failed predicates, skip", task.Name, task.TaskRole)
continue
}

klog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(ssn.Nodes), job.Namespace, job.Name)

if err := ssn.PrePredicateFn(task); err != nil {
@@ -196,7 +202,14 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
predicateNodes, fitErrors := ph.PredicateNodes(task, allNodes, alloc.predicate, true)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
// If the remaining tasks can still satisfy the gang-scheduling min member,
// continue and try the other allocatable tasks; otherwise further allocation
// cannot make the job ready, so break out of the loop.
if job.NeedContinueAllocating() {
continue
} else {
break
}
}

// Candidate nodes are divided into two gradients:
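Both helpers used in the hunk above, job.TaskHasFitErrors(task) and job.NeedContinueAllocating(), live in the scheduler API package and are not part of this diff. The sketch below is a simplified reading of their intent, assuming the job tracks per-role predicate failures and compares the tasks that can still be tried against the gang minimum; the names jobState and needContinueAllocating are illustrative only, and the per-role minMember handling exercised by the tests below (e.g. {"master": 2} in the third case) is omitted for brevity.

package main

import "fmt"

// jobState is a simplified stand-in for the scheduler's job view; the real
// api.JobInfo tracks tasks, roles and fit errors in richer structures.
type jobState struct {
	minMember     int32            // gang-scheduling minAvailable for the whole job
	allocated     int32            // tasks already placed in this session
	pendingByRole map[string]int32 // pending tasks per role, still to be tried
	failedRoles   map[string]bool  // roles whose spec already failed predicates (roughly what TaskHasFitErrors reports)
}

// needContinueAllocating illustrates the continue-or-break decision: when one
// task finds no feasible node, keep iterating only if the tasks that can still
// be tried (excluding roles that already failed predicates) are enough to reach
// the job's min member; otherwise the job cannot become ready in this session
// and further allocation attempts are wasted.
func needContinueAllocating(j jobState) bool {
	remaining := j.allocated
	for role, cnt := range j.pendingByRole {
		if j.failedRoles[role] {
			continue // tasks of this role cannot fit anywhere, so don't count them
		}
		remaining += cnt
	}
	return remaining >= j.minMember
}

func main() {
	j := jobState{
		minMember:     2,
		allocated:     1, // one task already bound earlier in the loop
		pendingByRole: map[string]int32{"master": 1, "worker": 2},
		failedRoles:   map[string]bool{"master": true},
	}
	fmt.Println(needContinueAllocating(j)) // true: 1 bound + 2 workers >= minMember 2
}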
76 changes: 76 additions & 0 deletions pkg/scheduler/actions/allocate/allocate_test.go
@@ -59,8 +59,77 @@ func TestAllocate(t *testing.T) {
proportion.PluginName: proportion.New,
predicates.PluginName: predicates.New,
nodeorder.PluginName: nodeorder.New,
gang.PluginName: gang.New,
}
tests := []uthelper.TestCommonStruct{
{
Name: "prepredicate failed: node selector does not match",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroup("pg1", "c1", "c1", 1, nil, schedulingv1.PodGroupInqueue),
},
Pods: []*v1.Pod{
// pods should use different roles, because the allocate action enables the role cache by default when running predicates
util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}),
util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}),
},
Queues: []*schedulingv1.Queue{
util.BuildQueue("c1", 1, nil),
},
ExpectBindMap: map[string]string{
"c1/p2": "n1",
},
ExpectBindsNum: 1,
},
{
Name: "prepredicate failed and tasks are not used up, continue until min member is met",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroup("pg1", "c1", "c1", 2, map[string]int32{"master": 1, "worker": 1}, schedulingv1.PodGroupInqueue),
},
Pods: []*v1.Pod{
// pods should use different roles, because the allocate action enables the role cache by default when running predicates
util.BuildPod("c1", "p0", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}),
util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}),
util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}),
util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("1", "2Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "master"}),
util.BuildNode("n2", api.BuildResourceList("1", "2Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}),
},
Queues: []*schedulingv1.Queue{
util.BuildQueue("c1", 1, nil),
},
ExpectBindMap: map[string]string{
"c1/p0": "n1",
"c1/p2": "n2",
},
ExpectBindsNum: 2,
},
{
Name: "master's min member cannot be allocated, break from allocating",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroup("pg1", "c1", "c1", 2, map[string]int32{"master": 2, "worker": 0}, schedulingv1.PodGroupInqueue),
},
Pods: []*v1.Pod{
// pods should use different roles, because the allocate action enables the role cache by default when running predicates
util.BuildPod("c1", "p0", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}),
util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}),
util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}),
util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("1", "2Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "master"}),
util.BuildNode("n2", api.BuildResourceList("2", "2Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}),
},
Queues: []*schedulingv1.Queue{
util.BuildQueue("c1", 1, nil),
},
ExpectBindMap: map[string]string{},
ExpectBindsNum: 0,
},
{
Name: "one Job with two Pods on one node",
PodGroups: []*schedulingv1.PodGroup{
@@ -145,6 +214,13 @@ func TestAllocate(t *testing.T) {
tiers := []conf.Tier{
{
Plugins: []conf.PluginOption{
{
Name: gang.PluginName,
EnabledJobOrder: &trueValue,
EnabledJobReady: &trueValue,
EnabledJobPipelined: &trueValue,
EnabledJobStarving: &trueValue,
},
{
Name: drf.PluginName,
EnabledPreemptable: &trueValue,