Skip to content

Commit

Permalink
Merge pull request #3786 from QingyaFan/sync_job_compatible
Browse files Browse the repository at this point in the history
Can not sync job status correctly when upgrading from v1.5 #3640
  • Loading branch information
volcano-sh-bot authored Oct 22, 2024
2 parents 26a7538 + f2b42e7 commit 979840c
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 55 deletions.
125 changes: 72 additions & 53 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,29 @@ import (

var calMutex sync.Mutex

// getPodGroupByJob looks up the PodGroup that belongs to the given vcjob.
// The lister is queried first with the current-style name (job name plus
// job UID). Only when that lookup reports NotFound is the legacy name used
// before version 1.5, which is the bare job name, tried instead.
// Any other lister error is returned unchanged together with a nil PodGroup;
// if the legacy lookup fails, its error (including NotFound) is returned.
func (cc *jobcontroller) getPodGroupByJob(job *batch.Job) (*scheduling.PodGroup, error) {
	currentName := cc.generateRelatedPodGroupName(job)
	nsLister := cc.pgLister.PodGroups(job.Namespace)

	current, err := nsLister.Get(currentName)
	switch {
	case err == nil:
		return current, nil
	case !apierrors.IsNotFound(err):
		// Not a simple miss: surface the lister failure to the caller.
		return nil, err
	}

	// The current-style PodGroup is absent; fall back to the legacy name.
	legacy, legacyErr := nsLister.Get(job.Name)
	if legacyErr != nil {
		return nil, legacyErr
	}
	return legacy, nil
}

// generateRelatedPodGroupName builds the name of the PodGroup associated with
// the given job: the job name and the job UID joined by a single hyphen.
func (cc *jobcontroller) generateRelatedPodGroupName(job *batch.Job) string {
	return job.Name + "-" + string(job.UID)
}

func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error {
job := jobInfo.Job
klog.V(3).Infof("Killing Job <%s/%s>, current version %d", job.Namespace, job.Name, job.Status.Version)
Expand Down Expand Up @@ -152,12 +175,17 @@ func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.Pha
}

// Delete PodGroup
pgName := job.Name + "-" + string(job.UID)
if err := cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Delete(context.TODO(), pgName, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Failed to delete PodGroup of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
pg, err := cc.getPodGroupByJob(job)
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to find PodGroup of Job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
return err
}
if pg != nil {
if err := cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Delete(context.TODO(), pg.Name, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Failed to delete PodGroup of Job %s/%s: %v", job.Namespace, job.Name, err)
return err
}
}
}

Expand Down Expand Up @@ -281,8 +309,12 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
}

var syncTask bool
pgName := job.Name + "-" + string(job.UID)
if pg, _ := cc.pgLister.PodGroups(job.Namespace).Get(pgName); pg != nil {
pg, err := cc.getPodGroupByJob(job)
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to find PodGroup of Job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
return err
}
if pg != nil {
if pg.Status.Phase != "" && pg.Status.Phase != scheduling.PodGroupPending {
syncTask = true
}
Expand Down Expand Up @@ -656,63 +688,50 @@ func (cc *jobcontroller) createPVC(job *batch.Job, vcName string, volumeClaim *v

func (cc *jobcontroller) createOrUpdatePodGroup(job *batch.Job) error {
// If PodGroup does not exist, create one for Job.
pgName := job.Name + "-" + string(job.UID)
var pg *scheduling.PodGroup
var err error
pg, err = cc.pgLister.PodGroups(job.Namespace).Get(pgName)
pg, err := cc.getPodGroupByJob(job)
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Failed to get PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
// try to get old pg if new pg not exist
pg, err = cc.pgLister.PodGroups(job.Namespace).Get(job.Name)
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Failed to get PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}

minTaskMember := map[string]int32{}
for _, task := range job.Spec.Tasks {
if task.MinAvailable != nil {
minTaskMember[task.Name] = *task.MinAvailable
} else {
minTaskMember[task.Name] = task.Replicas
}
minTaskMember := map[string]int32{}
for _, task := range job.Spec.Tasks {
if task.MinAvailable != nil {
minTaskMember[task.Name] = *task.MinAvailable
} else {
minTaskMember[task.Name] = task.Replicas
}
}

pg := &scheduling.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
// add job.UID into its name when create new PodGroup
Name: pgName,
Annotations: job.Annotations,
Labels: job.Labels,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
},
Spec: scheduling.PodGroupSpec{
MinMember: job.Spec.MinAvailable,
MinTaskMember: minTaskMember,
Queue: job.Spec.Queue,
MinResources: cc.calcPGMinResources(job),
PriorityClassName: job.Spec.PriorityClassName,
pg := &scheduling.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
// add job.UID into its name when create new PodGroup
Name: cc.generateRelatedPodGroupName(job),
Annotations: job.Annotations,
Labels: job.Labels,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
}
},
Spec: scheduling.PodGroupSpec{
MinMember: job.Spec.MinAvailable,
MinTaskMember: minTaskMember,
Queue: job.Spec.Queue,
MinResources: cc.calcPGMinResources(job),
PriorityClassName: job.Spec.PriorityClassName,
},
}

if _, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Create(context.TODO(), pg, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
klog.Errorf("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
if _, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Create(context.TODO(), pg, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
klog.Errorf("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
return nil
}
return nil
}

pgShouldUpdate := false
Expand Down
80 changes: 78 additions & 2 deletions pkg/controllers/job/job_controller_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,57 @@ func TestKillJobFunc(t *testing.T) {
Plugins: []string{"svc", "ssh", "env"},
ExpectVal: nil,
},
{
Name: "KillJob compatibility with version lt 1.5 success case",
Job: &v1alpha1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job2",
Namespace: namespace,
UID: "e7f18111-1cec-11ea-b688-fa163ec79500",
ResourceVersion: "100",
},
},
PodGroup: &schedulingapi.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "job2",
Namespace: namespace,
},
},
PodRetainPhase: state.PodRetainPhaseNone,
UpdateStatus: nil,
JobInfo: &apis.JobInfo{
Namespace: namespace,
Name: "jobinfo2",
Pods: map[string]map[string]*v1.Pod{
"task1": {
"pod1": buildPod(namespace, "pod1", v1.PodRunning, nil),
"pod2": buildPod(namespace, "pod2", v1.PodRunning, nil),
},
},
},
Services: []v1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Name: "job2",
Namespace: namespace,
},
},
},
Secrets: []v1.Secret{
{
ObjectMeta: metav1.ObjectMeta{
Name: "job2-e7f18111-1cec-11ea-b688-fa163ec79500-ssh",
Namespace: namespace,
},
},
},
Pods: map[string]*v1.Pod{
"pod1": buildPod(namespace, "pod1", v1.PodRunning, nil),
"pod2": buildPod(namespace, "pod2", v1.PodRunning, nil),
},
Plugins: []string{"svc", "ssh", "env"},
ExpectVal: nil,
},
}

for i, testcase := range testcases {
Expand Down Expand Up @@ -588,6 +639,30 @@ func TestUpdatePodGroupIfJobUpdateFunc(t *testing.T) {
},
ExpectVal: nil,
},
{
Name: "UpdatePodGroup compatibility with version lt 1.5 success Case",
Job: &v1alpha1.Job{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: "job2",
ResourceVersion: "100",
UID: "e7f18111-1cec-11ea-b688-fa163ec79500",
},
Spec: v1alpha1.JobSpec{
PriorityClassName: "new",
},
},
PodGroup: &schedulingapi.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: "job2",
},
Spec: schedulingapi.PodGroupSpec{
MinResources: &v1.ResourceList{},
},
},
ExpectVal: nil,
},
}

for _, testcase := range testcases {
Expand All @@ -601,8 +676,9 @@ func TestUpdatePodGroupIfJobUpdateFunc(t *testing.T) {
t.Errorf("Expected return value to be equal to expected: %s, but got: %s", testcase.ExpectVal, err)
}

pgName := testcase.Job.Name + "-" + string(testcase.Job.UID)
pg, err := fakeController.vcClient.SchedulingV1beta1().PodGroups(namespace).Get(context.TODO(), pgName, metav1.GetOptions{})
pgName := testcase.PodGroup.Name
pg, err := fakeController.vcClient.SchedulingV1beta1().PodGroups(namespace).
Get(context.TODO(), pgName, metav1.GetOptions{})
if err != nil {
t.Error("Expected PodGroup to be created, but not created")
}
Expand Down

0 comments on commit 979840c

Please sign in to comment.