Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GSoC] Compatibility Changes in Trial Controller #2394

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4410c1b
chore: add condition branch in requeue logic.
Electronic-Waste Jul 23, 2024
8a92977
chore: add ReportObservationLog in katib_manager_util.go.
Electronic-Waste Jul 23, 2024
4fb810c
chore: add ReportTrialUnavailableMetrics func.
Electronic-Waste Jul 23, 2024
712af68
chore: insert unavailable value into Katib DB.
Electronic-Waste Jul 23, 2024
76e3c95
fix: fix lint error.
Electronic-Waste Jul 23, 2024
7f6271b
fix: add nil condition judgement.
Electronic-Waste Jul 26, 2024
46a5afd
fix: add nil condition judgement in trial_controller_util.go
Electronic-Waste Jul 26, 2024
59e98a3
chore(trial): delete nil check of MC kind in the Trial controller.
Electronic-Waste Aug 2, 2024
bc9106e
chore(trial): init MC in newFakeTrialBatchJob to avoid nil condition …
Electronic-Waste Aug 2, 2024
a8c9d90
fix(trial): fix lint error.
Electronic-Waste Aug 2, 2024
4d1d010
fix(trial): fix lint error in controller.
Electronic-Waste Aug 2, 2024
45e4446
test(trial): add integration test for Push MC.
Electronic-Waste Aug 2, 2024
e0bd76a
chore(trial): retry reconciliation when reporting unavailable metrics …
Electronic-Waste Aug 25, 2024
7971594
test(trial): fix EXPECT order.
Electronic-Waste Aug 25, 2024
2b6b4c1
test(trial): fix typo error.
Electronic-Waste Aug 25, 2024
b2872ec
chore(trial): add errReportMetricsFailed.
Electronic-Waste Aug 26, 2024
17d5610
Update pkg/controller.v1beta1/trial/trial_controller.go
Electronic-Waste Aug 27, 2024
11d71af
Update pkg/controller.v1beta1/trial/trial_controller_util.go
Electronic-Waste Aug 28, 2024
303938b
Update pkg/controller.v1beta1/trial/trial_controller.go
Electronic-Waste Aug 28, 2024
541ec26
fix(trial): rename errors pkg.
Electronic-Waste Aug 28, 2024
2fe8c33
test(trial): update the order of UT.
Electronic-Waste Sep 1, 2024
8495589
test(trial): use different names for UTs.
Electronic-Waste Sep 3, 2024
af40d1f
test(trial): separate Push MC UTs with original UTs.
Electronic-Waste Sep 9, 2024
60214c8
test(trial): fix line error with gofmt.
Electronic-Waste Sep 9, 2024
008a574
test(trial): reserve one UT for Push MC.
Electronic-Waste Sep 9, 2024
ffe6089
test(trial): fix typo error.
Electronic-Waste Sep 10, 2024
ddd2cb3
test(trial): make some tiny changes.
Electronic-Waste Sep 11, 2024
6a7a528
fix(trial): move cancel func to t.Cleanup.
Electronic-Waste Sep 19, 2024
7cb4e3e
fix(trial): use the propagated gomega instance to improve debuggability.
Electronic-Waste Sep 19, 2024
9604ed4
fix(trial): use gofmt to reformat code.
Electronic-Waste Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/common/v1beta1/katib_manager_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ func GetObservationLog(request *api_pb.GetObservationLogRequest) (*api_pb.GetObs
return kc.GetObservationLog(ctx, request)
}

// ReportObservationLog forwards request to the Katib DB manager client's
// ReportObservationLog call using a background context.
//
// A client/connection pair is obtained for this call and the connection is
// closed when the function returns. If the client cannot be obtained, the
// error is returned with a nil reply.
func ReportObservationLog(request *api_pb.ReportObservationLogRequest) (*api_pb.ReportObservationLogReply, error) {
	clientAndConn, err := getKatibDBManagerClientAndConn()
	if err != nil {
		return nil, err
	}
	// Release the connection once the RPC below has completed.
	defer closeKatibDBManagerConnection(clientAndConn)

	return clientAndConn.KatibDBManagerClient.ReportObservationLog(context.Background(), request)
}

func DeleteObservationLog(request *api_pb.DeleteObservationLogRequest) (*api_pb.DeleteObservationLogReply, error) {
ctx := context.Background()
kcc, err := getKatibDBManagerClientAndConn()
Expand Down
17 changes: 17 additions & 0 deletions pkg/controller.v1beta1/trial/managerclient/managerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type ManagerClient interface {
instance *trialsv1beta1.Trial) (*api_pb.GetObservationLogReply, error)
DeleteTrialObservationLog(
instance *trialsv1beta1.Trial) (*api_pb.DeleteObservationLogReply, error)
ReportTrialObservationLog(
instance *trialsv1beta1.Trial,
observationLogs *api_pb.ObservationLog) (*api_pb.ReportObservationLogReply, error)
}

// DefaultClient implements the Client interface.
Expand Down Expand Up @@ -88,3 +91,17 @@ func (d *DefaultClient) DeleteTrialObservationLog(
}
return reply, nil
}

// ReportTrialObservationLog reports observationLog for the given Trial.
//
// It builds a ReportObservationLogRequest keyed by the Trial's name and passes
// it to common.ReportObservationLog. On error the reply is discarded and
// (nil, err) is returned; otherwise the reply is returned with a nil error.
func (d *DefaultClient) ReportTrialObservationLog(
	instance *trialsv1beta1.Trial,
	observationLog *api_pb.ObservationLog) (*api_pb.ReportObservationLogReply, error) {
	reply, err := common.ReportObservationLog(&api_pb.ReportObservationLogRequest{
		TrialName:      instance.Name,
		ObservationLog: observationLog,
	})
	if err != nil {
		// Never hand back a reply alongside an error.
		return nil, err
	}
	return reply, nil
}
23 changes: 15 additions & 8 deletions pkg/controller.v1beta1/trial/trial_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package trial

import (
"context"
"errors"
"fmt"
"time"

"github.com/spf13/viper"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -42,6 +43,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

commonapiv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1"
trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1"
"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
"github.com/kubeflow/katib/pkg/controller.v1beta1/trial/managerclient"
Expand All @@ -57,6 +59,8 @@ var (
log = logf.Log.WithName(ControllerName)
// errMetricsNotReported is the error when Trial job is succeeded but metrics are not reported yet
errMetricsNotReported = fmt.Errorf("metrics are not reported yet")
// errReportMetricsFailed is the error when `unavailable` metrics value can't be inserted to the Katib DB.
errReportMetricsFailed = fmt.Errorf("failed to report unavailable metrics")
)

// Add creates a new Trial Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
Expand Down Expand Up @@ -150,7 +154,7 @@ func (r *ReconcileTrial) Reconcile(ctx context.Context, request reconcile.Reques
original := &trialsv1beta1.Trial{}
err := r.Get(ctx, request.NamespacedName, original)
if err != nil {
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
return reconcile.Result{}, nil
Expand Down Expand Up @@ -179,7 +183,7 @@ func (r *ReconcileTrial) Reconcile(ctx context.Context, request reconcile.Reques
} else {
err := r.reconcileTrial(instance)
if err != nil {
if err == errMetricsNotReported {
if errors.Is(err, errMetricsNotReported) || errors.Is(err, errReportMetricsFailed) {
return reconcile.Result{
RequeueAfter: time.Second * 1,
}, nil
Expand Down Expand Up @@ -244,9 +248,12 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1beta1.Trial) error {
}
}

// If observation is empty metrics collector doesn't finish.
// For early stopping metrics collector are reported logs before Trial status is changed to EarlyStopped.
if jobStatus.Condition == trialutil.JobSucceeded && instance.Status.Observation == nil {
// If observation is empty, metrics collector doesn't finish.
// For early stopping scenario, metrics collector will report logs before Trial status is changed to EarlyStopped.
// We need to requeue reconcile when the Trial is succeeded, metrics collector's type is not `Push`, and metrics are not reported.
if jobStatus.Condition == trialutil.JobSucceeded &&
instance.Status.Observation == nil &&
instance.Spec.MetricsCollector.Collector.Kind != commonapiv1beta1.PushCollector {
logger.Info("Trial job is succeeded but metrics are not reported, reconcile requeued")
return errMetricsNotReported
}
Expand All @@ -255,7 +262,7 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1beta1.Trial) error {
// if job has succeeded and if observation field is available.
// if job has failed
// This will ensure that trial is set to be complete only if metric is collected at least once
r.UpdateTrialStatusCondition(instance, deployedJob.GetName(), jobStatus)
return r.UpdateTrialStatusCondition(instance, deployedJob.GetName(), jobStatus)
}
return nil
}
Expand All @@ -271,7 +278,7 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1beta1.Trial, desiredJob
deployedJob.SetGroupVersionKind(gvk)
err = r.Get(context.TODO(), types.NamespacedName{Name: desiredJob.GetName(), Namespace: desiredJob.GetNamespace()}, deployedJob)
if err != nil {
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
if instance.IsCompleted() {
return nil, nil
}
Expand Down
178 changes: 112 additions & 66 deletions pkg/controller.v1beta1/trial/trial_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package trial

import (
"sync"
"context"
"testing"
"time"

Expand Down Expand Up @@ -48,14 +48,47 @@ import (

const (
namespace = "default"
trialName = "test-trial"
batchJobName = "test-job"
objectiveMetric = "accuracy"
timeout = time.Second * 80
timeout = time.Second * 10
)

var trialKey = types.NamespacedName{Name: trialName, Namespace: namespace}
var batchJobKey = types.NamespacedName{Name: batchJobName, Namespace: namespace}
// Shared fixtures for the Trial controller tests.
var (
// batchJobKey is the namespaced name (batchJobName in namespace) for the test BatchJob.
batchJobKey = types.NamespacedName{Name: batchJobName, Namespace: namespace}
// observationLogAvailable is the result for GetTrialObservationLog with some metrics:
// two logs for the objective metric, with values "0.99" and "0.11".
observationLogAvailable = &api_pb.GetObservationLogReply{
ObservationLog: &api_pb.ObservationLog{
MetricLogs: []*api_pb.MetricLog{
{
TimeStamp: "2020-08-10T14:47:38+08:00",
Metric: &api_pb.Metric{
Name: objectiveMetric,
Value: "0.99",
},
},
{
TimeStamp: "2020-08-10T14:50:38+08:00",
Metric: &api_pb.Metric{
Name: objectiveMetric,
Value: "0.11",
},
},
},
},
}
// observationLogUnavailable is the "empty" result for GetTrialObservationLog.
// If objective metrics are not parsed, metrics collector reports "unavailable" value to DB,
// so the single log carries consts.UnavailableMetricValue for the objective metric.
observationLogUnavailable = &api_pb.GetObservationLogReply{
ObservationLog: &api_pb.ObservationLog{
MetricLogs: []*api_pb.MetricLog{
{
Metric: &api_pb.Metric{
Name: objectiveMetric,
Value: consts.UnavailableMetricValue,
},
// Zero time value rendered in UTC as an RFC3339 string.
TimeStamp: time.Time{}.UTC().Format(time.RFC3339),
},
},
},
}
)

func init() {
logf.SetLogger(zap.New(zap.UseDevMode(true)))
Expand Down Expand Up @@ -112,6 +145,7 @@ func TestReconcileBatchJob(t *testing.T) {
// Try to update status until it be succeeded
for err != nil {
updatedInstance := &trialsv1beta1.Trial{}
trialKey := types.NamespacedName{Name: instance.Name, Namespace: namespace}
if err = c.Get(ctx, trialKey, updatedInstance); err != nil {
continue
}
Expand All @@ -134,59 +168,22 @@ func TestReconcileBatchJob(t *testing.T) {
viper.Set(consts.ConfigTrialResources, trialResources)
g.Expect(add(mgr, recFn)).NotTo(gomega.HaveOccurred())

// Start test manager.
wg := &sync.WaitGroup{}
wg.Add(1)
// Start test manager
mgrCtx, cancel := context.WithCancel(context.TODO())
Electronic-Waste marked this conversation as resolved.
Show resolved Hide resolved
t.Cleanup(cancel)
go func() {
defer wg.Done()
g.Expect(mgr.Start(ctx)).NotTo(gomega.HaveOccurred())
g.Expect(mgr.Start(mgrCtx)).NotTo(gomega.HaveOccurred())
}()

// Result for GetTrialObservationLog with some metrics.
observationLogAvailable := &api_pb.GetObservationLogReply{
ObservationLog: &api_pb.ObservationLog{
MetricLogs: []*api_pb.MetricLog{
{
TimeStamp: "2020-08-10T14:47:38+08:00",
Metric: &api_pb.Metric{
Name: objectiveMetric,
Value: "0.99",
},
},
{
TimeStamp: "2020-08-10T14:50:38+08:00",
Metric: &api_pb.Metric{
Name: objectiveMetric,
Value: "0.11",
},
},
},
},
}
// Empty result for GetTrialObservationLog.
// If objective metrics are not parsed, metrics collector reports "unavailable" value to DB.
observationLogUnavailable := &api_pb.GetObservationLogReply{
ObservationLog: &api_pb.ObservationLog{
MetricLogs: []*api_pb.MetricLog{
{
Metric: &api_pb.Metric{
Name: objectiveMetric,
Value: consts.UnavailableMetricValue,
},
TimeStamp: time.Time{}.UTC().Format(time.RFC3339),
},
},
},
}

t.Run(`Trial run with "Failed" BatchJob.`, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
mockManagerClient.EXPECT().DeleteTrialObservationLog(gomock.Any()).Return(nil, nil)

trial := newFakeTrialBatchJob()
trial := newFakeTrialBatchJob(commonv1beta1.StdOutCollector, "test-failed-batch-job")
trialKey := types.NamespacedName{Name: "test-failed-batch-job", Namespace: namespace}
batchJob := &batchv1.Job{}

// Create the Trial
// Create the Trial with StdOut MC
g.Expect(c.Create(ctx, trial)).NotTo(gomega.HaveOccurred())

// Expect that BatchJob with appropriate name is created
Expand Down Expand Up @@ -239,7 +236,7 @@ func TestReconcileBatchJob(t *testing.T) {
}, timeout).Should(gomega.BeTrue())
})

t.Run(`Trail with "Complete" BatchJob and Available metrics.`, func(t *testing.T) {
t.Run(`Trial with "Complete" BatchJob and Available metrics.`, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
gomock.InOrder(
mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogAvailable, nil).MinTimes(1),
Expand All @@ -262,8 +259,9 @@ func TestReconcileBatchJob(t *testing.T) {
}
g.Expect(c.Status().Update(ctx, batchJob)).NotTo(gomega.HaveOccurred())

// Create the Trial
trial := newFakeTrialBatchJob()
// Create the Trial with StdOut MC
trial := newFakeTrialBatchJob(commonv1beta1.StdOutCollector, "test-available-stdout")
trialKey := types.NamespacedName{Name: "test-available-stdout", Namespace: namespace}
g.Expect(c.Create(ctx, trial)).NotTo(gomega.HaveOccurred())

// Expect that Trial status is succeeded and metrics are properly populated
Expand All @@ -290,28 +288,71 @@ func TestReconcileBatchJob(t *testing.T) {
}, timeout).Should(gomega.BeTrue())
})

t.Run(`Trail with "Complete" BatchJob and Unavailable metrics.`, func(t *testing.T) {
t.Run(`Trial with "Complete" BatchJob and Unavailable metrics(StdOut MC).`, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
gomock.InOrder(
mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogUnavailable, nil).MinTimes(1),
mockManagerClient.EXPECT().DeleteTrialObservationLog(gomock.Any()).Return(nil, nil),
)
// Create the Trial
trial := newFakeTrialBatchJob()
// Create the Trial with StdOut MC
trial := newFakeTrialBatchJob(commonv1beta1.StdOutCollector, "test-unavailable-stdout")
trialKey := types.NamespacedName{Name: "test-unavailable-stdout", Namespace: namespace}
g.Expect(c.Create(ctx, trial)).NotTo(gomega.HaveOccurred())

// Expect that Trial status is succeeded with "false" status and "metrics unavailable" reason.
// Metrics unavailable because GetTrialObservationLog returns "unavailable".
g.Eventually(func(g gomega.Gomega) {
g.Expect(c.Get(ctx, trialKey, trial)).Should(gomega.Succeed())
g.Expect(trial.IsMetricsUnavailable()).Should(gomega.BeTrue())
g.Expect(trial.Status.Observation.Metrics).ShouldNot(gomega.HaveLen(0))
g.Expect(trial.Status.Observation.Metrics[0]).Should(gomega.BeComparableTo(commonv1beta1.Metric{
Name: objectiveMetric,
Min: consts.UnavailableMetricValue,
Max: consts.UnavailableMetricValue,
Latest: consts.UnavailableMetricValue,
}))
}, timeout).Should(gomega.Succeed())

// Delete the Trial
g.Expect(c.Delete(ctx, trial)).NotTo(gomega.HaveOccurred())

// Expect that Trial is deleted
g.Eventually(func() bool {
if err = c.Get(ctx, trialKey, trial); err != nil {
return false
}
return trial.IsMetricsUnavailable() &&
len(trial.Status.Observation.Metrics) > 0 &&
trial.Status.Observation.Metrics[0].Min == consts.UnavailableMetricValue &&
trial.Status.Observation.Metrics[0].Max == consts.UnavailableMetricValue &&
trial.Status.Observation.Metrics[0].Latest == consts.UnavailableMetricValue
return errors.IsNotFound(c.Get(ctx, trialKey, &trialsv1beta1.Trial{}))
}, timeout).Should(gomega.BeTrue())
})

t.Run(`Trial with "Complete" BatchJob and Unavailable metrics(Push MC, failed once).`, func(t *testing.T) {
mockCtrl.Finish()
g := gomega.NewGomegaWithT(t)
gomock.InOrder(
mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogUnavailable, nil),
mockManagerClient.EXPECT().ReportTrialObservationLog(gomock.Any(), gomock.Any()).Return(nil, errReportMetricsFailed),
mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogUnavailable, nil),
mockManagerClient.EXPECT().ReportTrialObservationLog(gomock.Any(), gomock.Any()).Return(nil, nil),
mockManagerClient.EXPECT().DeleteTrialObservationLog(gomock.Any()).Return(nil, nil),
)
mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogUnavailable, nil).AnyTimes()
mockManagerClient.EXPECT().ReportTrialObservationLog(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

// Create the Trial with Push MC
trial := newFakeTrialBatchJob(commonv1beta1.PushCollector, "test-unavailable-push-failed-once")
trialKey := types.NamespacedName{Name: "test-unavailable-push-failed-once", Namespace: namespace}
g.Expect(c.Create(ctx, trial)).NotTo(gomega.HaveOccurred())

// Expect that Trial status is succeeded with "false" status and "metrics unavailable" reason.
// Metrics unavailable because GetTrialObservationLog returns "unavailable".
g.Eventually(func(g gomega.Gomega) {
g.Expect(c.Get(ctx, trialKey, trial)).Should(gomega.Succeed())
g.Expect(trial.IsMetricsUnavailable()).Should(gomega.BeTrue())
g.Expect(trial.Status.Observation.Metrics).ShouldNot(gomega.HaveLen(0))
g.Expect(trial.Status.Observation.Metrics[0]).Should(gomega.BeComparableTo(commonv1beta1.Metric{
Name: objectiveMetric,
Min: consts.UnavailableMetricValue,
Max: consts.UnavailableMetricValue,
Latest: consts.UnavailableMetricValue,
}))
}, timeout).Should(gomega.Succeed())

// Delete the Trial
g.Expect(c.Delete(ctx, trial)).NotTo(gomega.HaveOccurred())
Expand Down Expand Up @@ -386,7 +427,7 @@ func TestGetObjectiveMetricValue(t *testing.T) {
g.Expect(err).To(gomega.HaveOccurred())
}

func newFakeTrialBatchJob() *trialsv1beta1.Trial {
func newFakeTrialBatchJob(mcType commonv1beta1.CollectorKind, trialName string) *trialsv1beta1.Trial {
primaryContainer := "training-container"

job := &batchv1.Job{
Expand Down Expand Up @@ -429,8 +470,13 @@ func newFakeTrialBatchJob() *trialsv1beta1.Trial {
},
Spec: trialsv1beta1.TrialSpec{
PrimaryContainerName: primaryContainer,
SuccessCondition: experimentsv1beta1.DefaultJobSuccessCondition,
FailureCondition: experimentsv1beta1.DefaultJobFailureCondition,
MetricsCollector: commonv1beta1.MetricsCollectorSpec{
Collector: &commonv1beta1.CollectorSpec{
Kind: mcType,
},
},
SuccessCondition: experimentsv1beta1.DefaultJobSuccessCondition,
FailureCondition: experimentsv1beta1.DefaultJobFailureCondition,
Objective: &commonv1beta1.ObjectiveSpec{
ObjectiveMetricName: objectiveMetric,
MetricStrategies: []commonv1beta1.MetricStrategy{
Expand Down
Loading