From 23c5d4c3eed61681b50392a89f77782add553b43 Mon Sep 17 00:00:00 2001
From: lowang-bh
Date: Mon, 13 May 2024 21:34:16 +0800
Subject: [PATCH] improve: parse and store task spec in taskInfo; pre-check if
 task has a fit error so as to save predicating time

fix comments: use taskRole to store the value of 'volcano.sh/task-spec' in
taskInfo

Signed-off-by: lowang-bh
---
 pkg/scheduler/actions/allocate/allocate.go | 13 +++-
 pkg/scheduler/api/job_info.go              | 79 +++++++++++++++-------
 pkg/scheduler/util/predicate_helper.go     |  2 +-
 3 files changed, 66 insertions(+), 28 deletions(-)

diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go
index 89680d95c5..508f72b99d 100644
--- a/pkg/scheduler/actions/allocate/allocate.go
+++ b/pkg/scheduler/actions/allocate/allocate.go
@@ -178,6 +178,12 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
 			continue
 		}
 
+		// Skip the task if its role already has a recorded predicate failure
+		if job.TaskHasFitErrors(task) {
+			klog.V(5).Infof("Task %s with task role %s has already failed predicating, skip", task.Name, task.TaskRole)
+			continue
+		}
+
 		klog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(ssn.Nodes), job.Namespace, job.Name)
 
 		if err := ssn.PrePredicateFn(task); err != nil {
@@ -193,9 +199,10 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
 		predicateNodes, fitErrors := ph.PredicateNodes(task, allNodes, alloc.predicate, true)
 		if len(predicateNodes) == 0 {
 			job.NodesFitErrors[task.UID] = fitErrors
-			// Assume that all left tasks is allocable, can not meet gang-scheduling min member, we should break from continuously allocating
-			// otherwise, should continue to find other allocable task
-			if job.CheckJobNeedContinueAllocating() {
+			// If, even assuming all remaining tasks are allocatable, the job cannot meet its
+			// gang-scheduling min member, we should break from continuously allocating;
+			// otherwise, we should continue to look for other allocatable tasks.
+			if job.NeedContinueAllocating() {
 				continue
 			} else {
 				break
diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go
index 4e6fdd22fb..001383f664 100644
--- a/pkg/scheduler/api/job_info.go
+++ b/pkg/scheduler/api/job_info.go
@@ -107,6 +107,7 @@ type TaskInfo struct {
 	Name      string
 	Namespace string
+	TaskRole  string // value of "volcano.sh/task-spec"
 
 	// Resreq is the resource that used when task running.
 	Resreq *Resource
@@ -148,7 +149,10 @@ func getJobID(pod *v1.Pod) JobID {
 	return ""
 }
 
-func getTaskSpec(pod *v1.Pod) string {
+func getTaskRole(pod *v1.Pod) string {
+	if pod == nil {
+		return ""
+	}
 	if ts, found := pod.Annotations[batch.TaskSpecKey]; found && len(ts) != 0 {
 		return ts
 	}
@@ -170,6 +174,7 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
 	preemptable := GetPodPreemptable(pod)
 	revocableZone := GetPodRevocableZone(pod)
 	topologyInfo := GetPodTopologyInfo(pod)
+	role := getTaskRole(pod)
 
 	jobID := getJobID(pod)
 
@@ -178,6 +183,7 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
 		Job:       jobID,
 		Name:      pod.Name,
 		Namespace: pod.Namespace,
+		TaskRole:  role,
 		Priority:  1,
 		Pod:       pod,
 		Resreq:    resReq,
@@ -251,6 +257,7 @@ func (ti *TaskInfo) Clone() *TaskInfo {
 		Job:        ti.Job,
 		Name:       ti.Name,
 		Namespace:  ti.Namespace,
+		TaskRole:   ti.TaskRole,
 		Priority:   ti.Priority,
 		PodVolumes: ti.PodVolumes,
 		Pod:        ti.Pod,
@@ -269,18 +276,11 @@
 	}
 }
 
-func (ti *TaskInfo) GetTaskSpecKey() string {
-	if ti.Pod == nil {
-		return ""
-	}
-	return getTaskSpec(ti.Pod)
-}
-
 // String returns the taskInfo details in a string
 func (ti TaskInfo) String() string {
-	res := fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, pri %v, "+
+	res := fmt.Sprintf("Task (%v:%v/%v): taskSpec %s, job %v, status %v, pri %v, "+
 		"resreq %v, preemptable %v, revocableZone %v",
-		ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.Priority,
+		ti.UID, ti.Namespace, ti.Name, ti.TaskRole, ti.Job, ti.Status, ti.Priority,
 		ti.Resreq, ti.Preemptable, ti.RevocableZone)
 
 	if ti.NumaInfo != nil {
@@ -723,22 +723,53 @@ func (ji *JobInfo) PendingBestEffortTaskNum() int32 {
 	return int32(count)
 }
 
-// CheckJobNeedContinueAllocating checks whether it can continue to allocate for current job
-// there are two cases to continue:
-// 1. job's total allocable number meet its minAvailable
-// 2. each task's allocable number meet its independent minAvailable
-func (ji *JobInfo) CheckJobNeedContinueAllocating() bool {
+// FitFailedRoles returns the set of task roles that already have recorded fit errors
+func (ji *JobInfo) FitFailedRoles() map[string]struct{} {
 	failedRoles := map[string]struct{}{}
 	for tid := range ji.NodesFitErrors {
 		task := ji.Tasks[tid]
-		failedRoles[getTaskSpec(task.Pod)] = struct{}{}
+		failedRoles[task.TaskRole] = struct{}{}
+	}
+	return failedRoles
+}
+
+// TaskHasFitErrors checks whether the task's role already has a recorded fit error,
+// in which case predicating for this task can be skipped
+func (ji *JobInfo) TaskHasFitErrors(task *TaskInfo) bool {
+	// if the task has no task-spec key set, the failed-roles cache must not be used
+	if len(task.TaskRole) == 0 {
+		return false
+	}
+
+	_, exist := ji.FitFailedRoles()[task.TaskRole]
+	return exist
+}
+
+// NeedContinueAllocating checks whether allocating can continue for the current job
+// after one of its pods has failed predicating. There are two cases to continue:
+// 1. the job's total allocatable number meets its minAvailable (no task role has an independent minMember setting):
+//    this covers the case where some pods are not allocatable, but the other, allocatable
+//    pods are still numerous enough to satisfy gang-scheduling
+// 2. each task role's allocatable number meets its own independent minAvailable:
+//    this is for the case where each task role has its own independent minMember.
+//    e.g., the current role's pod has a failed predicating result but the role's allocated
+//    number already meets its minMember, so the other roles' pods with no failed results can continue.
+//
+// performance analysis:
+//
+// Since a role that failed predicating is pre-checked when its task is popped from the queue,
+// this function is called at most as many times as there are roles in this job.
+func (ji *JobInfo) NeedContinueAllocating() bool {
+	// When minAvailable equals the total task count, every task must be allocatable,
+	// so a single predicate failure means the job cannot proceed.
+	if int(ji.MinAvailable) == len(ji.Tasks) {
+		return false
 	}
+	failedRoles := ji.FitFailedRoles()
 
 	pending := map[string]int32{}
 	for _, task := range ji.TaskStatusIndex[Pending] {
-		pending[getTaskSpec(task.Pod)]++
+		pending[task.TaskRole]++
 	}
-	// 1. don't consider each role's min, just consider total allocable number vs job's MinAvailable
+	// 1. don't consider each role's min, just consider the total allocatable number vs the job's MinAvailable
 	if ji.MinAvailable < ji.TaskMinAvailableTotal {
 		left := int32(0)
 		for role, cnt := range pending {
@@ -773,7 +804,7 @@ func (ji *JobInfo) getJobAllocatedRoles() map[string]int32 {
 		if AllocatedStatus(status) ||
 			status == Succeeded {
 			for _, task := range tasks {
-				occupiedMap[getTaskSpec(task.Pod)]++
+				occupiedMap[task.TaskRole]++
 			}
 			continue
 		}
@@ -781,7 +812,7 @@ func (ji *JobInfo) getJobAllocatedRoles() map[string]int32 {
 		if status == Pending {
 			for _, task := range tasks {
 				if task.InitResreq.IsEmpty() {
-					occupiedMap[getTaskSpec(task.Pod)]++
+					occupiedMap[task.TaskRole]++
 				}
 			}
 		}
@@ -803,7 +834,7 @@ func (ji *JobInfo) CheckTaskValid() bool {
 			status == Pipelined ||
 			status == Pending {
 			for _, task := range tasks {
-				actual[getTaskSpec(task.Pod)]++
+				actual[task.TaskRole]++
 			}
 		}
 	}
@@ -847,7 +878,7 @@ func (ji *JobInfo) CheckTaskPipelined() bool {
 			status == Succeeded ||
 			status == Pipelined {
 			for _, task := range tasks {
-				occupiedMap[getTaskSpec(task.Pod)]++
+				occupiedMap[task.TaskRole]++
 			}
 			continue
 		}
@@ -855,7 +886,7 @@ func (ji *JobInfo) CheckTaskPipelined() bool {
 		if status == Pending {
 			for _, task := range tasks {
 				if task.InitResreq.IsEmpty() {
-					occupiedMap[getTaskSpec(task.Pod)]++
+					occupiedMap[task.TaskRole]++
 				}
 			}
 		}
@@ -880,7 +911,7 @@ func (ji *JobInfo) CheckTaskStarving() bool {
 			status == Succeeded ||
 			status == Pipelined {
 			for _, task := range tasks {
-				occupiedMap[getTaskSpec(task.Pod)]++
+				occupiedMap[task.TaskRole]++
 			}
 			continue
 		}
diff --git a/pkg/scheduler/util/predicate_helper.go b/pkg/scheduler/util/predicate_helper.go
index 2ce5164c66..ec7061eaae 100644
--- a/pkg/scheduler/util/predicate_helper.go
+++ b/pkg/scheduler/util/predicate_helper.go
@@ -101,7 +101,7 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI
 }
 
 func taskGroupID(task *api.TaskInfo) string {
-	return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey())
+	return fmt.Sprintf("%s/%s", task.Job, task.TaskRole)
 }
 
 func NewPredicateHelper() PredicateHelper {
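---

Reviewer note: below is a minimal, self-contained sketch (not part of the patch) of how the role-keyed fit-error cache introduced here is expected to behave. All types are simplified stand-ins for the real ones in pkg/scheduler/api (for instance, NodesFitErrors really maps to *api.FitErrors), and the task and role names are hypothetical. Once one pod of a role records a fit error, later pods of the same role are skipped before predicating, while pods of other roles, or pods without the "volcano.sh/task-spec" annotation, still get a full predicate pass.

package main

import "fmt"

// simplified stand-ins for the scheduler API types touched by this patch
type TaskID string

type TaskInfo struct {
	UID      TaskID
	Name     string
	TaskRole string // value of the "volcano.sh/task-spec" annotation; "" if unset
}

type JobInfo struct {
	Tasks          map[TaskID]*TaskInfo
	NodesFitErrors map[TaskID]struct{} // the real map value is *api.FitErrors
}

// FitFailedRoles mirrors JobInfo.FitFailedRoles from the patch: collect the
// roles of all tasks that have already recorded fit errors.
func (ji *JobInfo) FitFailedRoles() map[string]struct{} {
	failedRoles := map[string]struct{}{}
	for tid := range ji.NodesFitErrors {
		failedRoles[ji.Tasks[tid].TaskRole] = struct{}{}
	}
	return failedRoles
}

// TaskHasFitErrors mirrors JobInfo.TaskHasFitErrors from the patch: a task
// without the task-spec key never uses the cache, since it cannot be proven
// equivalent to any previously failed task.
func (ji *JobInfo) TaskHasFitErrors(task *TaskInfo) bool {
	if len(task.TaskRole) == 0 {
		return false
	}
	_, exist := ji.FitFailedRoles()[task.TaskRole]
	return exist
}

func main() {
	worker0 := &TaskInfo{UID: "w0", Name: "worker-0", TaskRole: "worker"}
	worker1 := &TaskInfo{UID: "w1", Name: "worker-1", TaskRole: "worker"}
	ps0 := &TaskInfo{UID: "p0", Name: "ps-0", TaskRole: "ps"}

	ji := &JobInfo{
		Tasks:          map[TaskID]*TaskInfo{"w0": worker0, "w1": worker1, "p0": ps0},
		NodesFitErrors: map[TaskID]struct{}{"w0": {}}, // worker-0 failed predicating earlier
	}

	fmt.Println(ji.TaskHasFitErrors(worker1)) // true:  same role, skip predicating
	fmt.Println(ji.TaskHasFitErrors(ps0))     // false: different role, still predicated
}

Running this prints "true" then "false": worker-1 is skipped without re-running predicates, while ps-0 still goes through the normal predicate pass. This is also why the allocate.go pre-check saves time: PredicateNodes is never invoked again for a role that is already known not to fit.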