diff --git a/pkg/common/v1beta1/katib_manager_util.go b/pkg/common/v1beta1/katib_manager_util.go index 9e6dd819e4c..3345f45b124 100644 --- a/pkg/common/v1beta1/katib_manager_util.go +++ b/pkg/common/v1beta1/katib_manager_util.go @@ -72,6 +72,17 @@ func GetObservationLog(request *api_pb.GetObservationLogRequest) (*api_pb.GetObs return kc.GetObservationLog(ctx, request) } +func ReportObservationLog(request *api_pb.ReportObservationLogRequest) (*api_pb.ReportObservationLogReply, error) { + ctx := context.Background() + kcc, err := getKatibDBManagerClientAndConn() + if err != nil { + return nil, err + } + defer closeKatibDBManagerConnection(kcc) + kc := kcc.KatibDBManagerClient + return kc.ReportObservationLog(ctx, request) +} + func DeleteObservationLog(request *api_pb.DeleteObservationLogRequest) (*api_pb.DeleteObservationLogReply, error) { ctx := context.Background() kcc, err := getKatibDBManagerClientAndConn() diff --git a/pkg/controller.v1beta1/trial/managerclient/managerclient.go b/pkg/controller.v1beta1/trial/managerclient/managerclient.go index 656ed8248d8..8250aa5d6aa 100644 --- a/pkg/controller.v1beta1/trial/managerclient/managerclient.go +++ b/pkg/controller.v1beta1/trial/managerclient/managerclient.go @@ -28,6 +28,9 @@ type ManagerClient interface { instance *trialsv1beta1.Trial) (*api_pb.GetObservationLogReply, error) DeleteTrialObservationLog( instance *trialsv1beta1.Trial) (*api_pb.DeleteObservationLogReply, error) + ReportTrialObservationLog( + instance *trialsv1beta1.Trial, + observationLogs *api_pb.ObservationLog) (*api_pb.ReportObservationLogReply, error) } // DefaultClient implements the Client interface. @@ -88,3 +91,17 @@ func (d *DefaultClient) DeleteTrialObservationLog( } return reply, nil } + +func (d *DefaultClient) ReportTrialObservationLog( + instance *trialsv1beta1.Trial, + observationLog *api_pb.ObservationLog) (*api_pb.ReportObservationLogReply, error) { + request := &api_pb.ReportObservationLogRequest{ + TrialName: instance.Name, + ObservationLog: observationLog, + } + reply, err := common.ReportObservationLog(request) + if err != nil { + return nil, err + } + return reply, nil +} diff --git a/pkg/controller.v1beta1/trial/trial_controller.go b/pkg/controller.v1beta1/trial/trial_controller.go index 77652331f56..1a8b5084dcf 100644 --- a/pkg/controller.v1beta1/trial/trial_controller.go +++ b/pkg/controller.v1beta1/trial/trial_controller.go @@ -18,13 +18,14 @@ package trial import ( "context" + "errors" "fmt" "time" "github.com/spf13/viper" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -42,6 +43,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + commonapiv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1" "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" "github.com/kubeflow/katib/pkg/controller.v1beta1/trial/managerclient" @@ -57,6 +59,8 @@ var ( log = logf.Log.WithName(ControllerName) // errMetricsNotReported is the error when Trial job is succeeded but metrics are not reported yet errMetricsNotReported = fmt.Errorf("metrics are not reported yet") + // errReportMetricsFailed is the error when `unavailable` metrics value can't be inserted to the Katib DB. + errReportMetricsFailed = fmt.Errorf("failed to report unavailable metrics") ) // Add creates a new Trial Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller @@ -150,7 +154,7 @@ func (r *ReconcileTrial) Reconcile(ctx context.Context, request reconcile.Reques original := &trialsv1beta1.Trial{} err := r.Get(ctx, request.NamespacedName, original) if err != nil { - if errors.IsNotFound(err) { + if apierrors.IsNotFound(err) { // Object not found, return. Created objects are automatically garbage collected. // For additional cleanup logic use finalizers. return reconcile.Result{}, nil @@ -179,7 +183,7 @@ func (r *ReconcileTrial) Reconcile(ctx context.Context, request reconcile.Reques } else { err := r.reconcileTrial(instance) if err != nil { - if err == errMetricsNotReported { + if errors.Is(err, errMetricsNotReported) || errors.Is(err, errReportMetricsFailed) { return reconcile.Result{ RequeueAfter: time.Second * 1, }, nil @@ -244,9 +248,12 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1beta1.Trial) error { } } - // If observation is empty metrics collector doesn't finish. - // For early stopping metrics collector are reported logs before Trial status is changed to EarlyStopped. - if jobStatus.Condition == trialutil.JobSucceeded && instance.Status.Observation == nil { + // If observation is empty, metrics collector doesn't finish. + // For early stopping scenario, metrics collector will report logs before Trial status is changed to EarlyStopped. + // We need to requeue reconcile when the Trial is succeeded, metrics collector's type is not `Push`, and metrics are not reported. + if jobStatus.Condition == trialutil.JobSucceeded && + instance.Status.Observation == nil && + instance.Spec.MetricsCollector.Collector.Kind != commonapiv1beta1.PushCollector { logger.Info("Trial job is succeeded but metrics are not reported, reconcile requeued") return errMetricsNotReported } @@ -255,7 +262,7 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1beta1.Trial) error { // if job has succeeded and if observation field is available. // if job has failed // This will ensure that trial is set to be complete only if metric is collected at least once - r.UpdateTrialStatusCondition(instance, deployedJob.GetName(), jobStatus) + return r.UpdateTrialStatusCondition(instance, deployedJob.GetName(), jobStatus) } return nil } @@ -271,7 +278,7 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1beta1.Trial, desiredJob deployedJob.SetGroupVersionKind(gvk) err = r.Get(context.TODO(), types.NamespacedName{Name: desiredJob.GetName(), Namespace: desiredJob.GetNamespace()}, deployedJob) if err != nil { - if errors.IsNotFound(err) { + if apierrors.IsNotFound(err) { if instance.IsCompleted() { return nil, nil } diff --git a/pkg/controller.v1beta1/trial/trial_controller_test.go b/pkg/controller.v1beta1/trial/trial_controller_test.go index 558cdba563e..00ac7764b5e 100644 --- a/pkg/controller.v1beta1/trial/trial_controller_test.go +++ b/pkg/controller.v1beta1/trial/trial_controller_test.go @@ -17,7 +17,7 @@ limitations under the License. package trial import ( - "sync" + "context" "testing" "time" @@ -48,14 +48,47 @@ import ( const ( namespace = "default" - trialName = "test-trial" batchJobName = "test-job" objectiveMetric = "accuracy" - timeout = time.Second * 80 + timeout = time.Second * 10 ) -var trialKey = types.NamespacedName{Name: trialName, Namespace: namespace} -var batchJobKey = types.NamespacedName{Name: batchJobName, Namespace: namespace} +var ( + batchJobKey = types.NamespacedName{Name: batchJobName, Namespace: namespace} + observationLogAvailable = &api_pb.GetObservationLogReply{ + ObservationLog: &api_pb.ObservationLog{ + MetricLogs: []*api_pb.MetricLog{ + { + TimeStamp: "2020-08-10T14:47:38+08:00", + Metric: &api_pb.Metric{ + Name: objectiveMetric, + Value: "0.99", + }, + }, + { + TimeStamp: "2020-08-10T14:50:38+08:00", + Metric: &api_pb.Metric{ + Name: objectiveMetric, + Value: "0.11", + }, + }, + }, + }, + } + observationLogUnavailable = &api_pb.GetObservationLogReply{ + ObservationLog: &api_pb.ObservationLog{ + MetricLogs: []*api_pb.MetricLog{ + { + Metric: &api_pb.Metric{ + Name: objectiveMetric, + Value: consts.UnavailableMetricValue, + }, + TimeStamp: time.Time{}.UTC().Format(time.RFC3339), + }, + }, + }, + } +) func init() { logf.SetLogger(zap.New(zap.UseDevMode(true))) @@ -112,6 +145,7 @@ func TestReconcileBatchJob(t *testing.T) { // Try to update status until it be succeeded for err != nil { updatedInstance := &trialsv1beta1.Trial{} + trialKey := types.NamespacedName{Name: instance.Name, Namespace: namespace} if err = c.Get(ctx, trialKey, updatedInstance); err != nil { continue } @@ -134,59 +168,22 @@ func TestReconcileBatchJob(t *testing.T) { viper.Set(consts.ConfigTrialResources, trialResources) g.Expect(add(mgr, recFn)).NotTo(gomega.HaveOccurred()) - // Start test manager. - wg := &sync.WaitGroup{} - wg.Add(1) + // Start test manager + mgrCtx, cancel := context.WithCancel(context.TODO()) + t.Cleanup(cancel) go func() { - defer wg.Done() - g.Expect(mgr.Start(ctx)).NotTo(gomega.HaveOccurred()) + g.Expect(mgr.Start(mgrCtx)).NotTo(gomega.HaveOccurred()) }() - // Result for GetTrialObservationLog with some metrics. - observationLogAvailable := &api_pb.GetObservationLogReply{ - ObservationLog: &api_pb.ObservationLog{ - MetricLogs: []*api_pb.MetricLog{ - { - TimeStamp: "2020-08-10T14:47:38+08:00", - Metric: &api_pb.Metric{ - Name: objectiveMetric, - Value: "0.99", - }, - }, - { - TimeStamp: "2020-08-10T14:50:38+08:00", - Metric: &api_pb.Metric{ - Name: objectiveMetric, - Value: "0.11", - }, - }, - }, - }, - } - // Empty result for GetTrialObservationLog. - // If objective metrics are not parsed, metrics collector reports "unavailable" value to DB. - observationLogUnavailable := &api_pb.GetObservationLogReply{ - ObservationLog: &api_pb.ObservationLog{ - MetricLogs: []*api_pb.MetricLog{ - { - Metric: &api_pb.Metric{ - Name: objectiveMetric, - Value: consts.UnavailableMetricValue, - }, - TimeStamp: time.Time{}.UTC().Format(time.RFC3339), - }, - }, - }, - } - t.Run(`Trial run with "Failed" BatchJob.`, func(t *testing.T) { g := gomega.NewGomegaWithT(t) mockManagerClient.EXPECT().DeleteTrialObservationLog(gomock.Any()).Return(nil, nil) - trial := newFakeTrialBatchJob() + trial := newFakeTrialBatchJob(commonv1beta1.StdOutCollector, "test-failed-batch-job") + trialKey := types.NamespacedName{Name: "test-failed-batch-job", Namespace: namespace} batchJob := &batchv1.Job{} - // Create the Trial + // Create the Trial with StdOut MC g.Expect(c.Create(ctx, trial)).NotTo(gomega.HaveOccurred()) // Expect that BatchJob with appropriate name is created @@ -239,7 +236,7 @@ func TestReconcileBatchJob(t *testing.T) { }, timeout).Should(gomega.BeTrue()) }) - t.Run(`Trail with "Complete" BatchJob and Available metrics.`, func(t *testing.T) { + t.Run(`Trial with "Complete" BatchJob and Available metrics.`, func(t *testing.T) { g := gomega.NewGomegaWithT(t) gomock.InOrder( mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogAvailable, nil).MinTimes(1), @@ -262,8 +259,9 @@ func TestReconcileBatchJob(t *testing.T) { } g.Expect(c.Status().Update(ctx, batchJob)).NotTo(gomega.HaveOccurred()) - // Create the Trial - trial := newFakeTrialBatchJob() + // Create the Trial with StdOut MC + trial := newFakeTrialBatchJob(commonv1beta1.StdOutCollector, "test-available-stdout") + trialKey := types.NamespacedName{Name: "test-available-stdout", Namespace: namespace} g.Expect(c.Create(ctx, trial)).NotTo(gomega.HaveOccurred()) // Expect that Trial status is succeeded and metrics are properly populated @@ -290,28 +288,71 @@ func TestReconcileBatchJob(t *testing.T) { }, timeout).Should(gomega.BeTrue()) }) - t.Run(`Trail with "Complete" BatchJob and Unavailable metrics.`, func(t *testing.T) { + t.Run(`Trial with "Complete" BatchJob and Unavailable metrics(StdOut MC).`, func(t *testing.T) { g := gomega.NewGomegaWithT(t) gomock.InOrder( mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogUnavailable, nil).MinTimes(1), mockManagerClient.EXPECT().DeleteTrialObservationLog(gomock.Any()).Return(nil, nil), ) - // Create the Trial - trial := newFakeTrialBatchJob() + // Create the Trial with StdOut MC + trial := newFakeTrialBatchJob(commonv1beta1.StdOutCollector, "test-unavailable-stdout") + trialKey := types.NamespacedName{Name: "test-unavailable-stdout", Namespace: namespace} g.Expect(c.Create(ctx, trial)).NotTo(gomega.HaveOccurred()) // Expect that Trial status is succeeded with "false" status and "metrics unavailable" reason. // Metrics unavailable because GetTrialObservationLog returns "unavailable". + g.Eventually(func(g gomega.Gomega) { + g.Expect(c.Get(ctx, trialKey, trial)).Should(gomega.Succeed()) + g.Expect(trial.IsMetricsUnavailable()).Should(gomega.BeTrue()) + g.Expect(trial.Status.Observation.Metrics).ShouldNot(gomega.HaveLen(0)) + g.Expect(trial.Status.Observation.Metrics[0]).Should(gomega.BeComparableTo(commonv1beta1.Metric{ + Name: objectiveMetric, + Min: consts.UnavailableMetricValue, + Max: consts.UnavailableMetricValue, + Latest: consts.UnavailableMetricValue, + })) + }, timeout).Should(gomega.Succeed()) + + // Delete the Trial + g.Expect(c.Delete(ctx, trial)).NotTo(gomega.HaveOccurred()) + + // Expect that Trial is deleted g.Eventually(func() bool { - if err = c.Get(ctx, trialKey, trial); err != nil { - return false - } - return trial.IsMetricsUnavailable() && - len(trial.Status.Observation.Metrics) > 0 && - trial.Status.Observation.Metrics[0].Min == consts.UnavailableMetricValue && - trial.Status.Observation.Metrics[0].Max == consts.UnavailableMetricValue && - trial.Status.Observation.Metrics[0].Latest == consts.UnavailableMetricValue + return errors.IsNotFound(c.Get(ctx, trialKey, &trialsv1beta1.Trial{})) }, timeout).Should(gomega.BeTrue()) + }) + + t.Run(`Trial with "Complete" BatchJob and Unavailable metrics(Push MC, failed once).`, func(t *testing.T) { + mockCtrl.Finish() + g := gomega.NewGomegaWithT(t) + gomock.InOrder( + mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogUnavailable, nil), + mockManagerClient.EXPECT().ReportTrialObservationLog(gomock.Any(), gomock.Any()).Return(nil, errReportMetricsFailed), + mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogUnavailable, nil), + mockManagerClient.EXPECT().ReportTrialObservationLog(gomock.Any(), gomock.Any()).Return(nil, nil), + mockManagerClient.EXPECT().DeleteTrialObservationLog(gomock.Any()).Return(nil, nil), + ) + mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogUnavailable, nil).AnyTimes() + mockManagerClient.EXPECT().ReportTrialObservationLog(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + + // Create the Trial with Push MC + trial := newFakeTrialBatchJob(commonv1beta1.PushCollector, "test-unavailable-push-failed-once") + trialKey := types.NamespacedName{Name: "test-unavailable-push-failed-once", Namespace: namespace} + g.Expect(c.Create(ctx, trial)).NotTo(gomega.HaveOccurred()) + + // Expect that Trial status is succeeded with "false" status and "metrics unavailable" reason. + // Metrics unavailable because GetTrialObservationLog returns "unavailable". + g.Eventually(func(g gomega.Gomega) { + g.Expect(c.Get(ctx, trialKey, trial)).Should(gomega.Succeed()) + g.Expect(trial.IsMetricsUnavailable()).Should(gomega.BeTrue()) + g.Expect(trial.Status.Observation.Metrics).ShouldNot(gomega.HaveLen(0)) + g.Expect(trial.Status.Observation.Metrics[0]).Should(gomega.BeComparableTo(commonv1beta1.Metric{ + Name: objectiveMetric, + Min: consts.UnavailableMetricValue, + Max: consts.UnavailableMetricValue, + Latest: consts.UnavailableMetricValue, + })) + }, timeout).Should(gomega.Succeed()) // Delete the Trial g.Expect(c.Delete(ctx, trial)).NotTo(gomega.HaveOccurred()) @@ -386,7 +427,7 @@ func TestGetObjectiveMetricValue(t *testing.T) { g.Expect(err).To(gomega.HaveOccurred()) } -func newFakeTrialBatchJob() *trialsv1beta1.Trial { +func newFakeTrialBatchJob(mcType commonv1beta1.CollectorKind, trialName string) *trialsv1beta1.Trial { primaryContainer := "training-container" job := &batchv1.Job{ @@ -429,8 +470,13 @@ func newFakeTrialBatchJob() *trialsv1beta1.Trial { }, Spec: trialsv1beta1.TrialSpec{ PrimaryContainerName: primaryContainer, - SuccessCondition: experimentsv1beta1.DefaultJobSuccessCondition, - FailureCondition: experimentsv1beta1.DefaultJobFailureCondition, + MetricsCollector: commonv1beta1.MetricsCollectorSpec{ + Collector: &commonv1beta1.CollectorSpec{ + Kind: mcType, + }, + }, + SuccessCondition: experimentsv1beta1.DefaultJobSuccessCondition, + FailureCondition: experimentsv1beta1.DefaultJobFailureCondition, Objective: &commonv1beta1.ObjectiveSpec{ ObjectiveMetricName: objectiveMetric, MetricStrategies: []commonv1beta1.MetricStrategy{ diff --git a/pkg/controller.v1beta1/trial/trial_controller_util.go b/pkg/controller.v1beta1/trial/trial_controller_util.go index 96341275a0e..ddfc0b11628 100644 --- a/pkg/controller.v1beta1/trial/trial_controller_util.go +++ b/pkg/controller.v1beta1/trial/trial_controller_util.go @@ -39,7 +39,7 @@ const ( ) // UpdateTrialStatusCondition updates Trial status from current deployed Job status -func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Trial, deployedJobName string, jobStatus *trialutil.TrialJobStatus) { +func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Trial, deployedJobName string, jobStatus *trialutil.TrialJobStatus) error { logger := log.WithValues("Trial", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) timeNow := metav1.Now() @@ -70,6 +70,15 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Tria msg := "Metrics are not available" reason := TrialMetricsUnavailableReason + // If the type of metrics collector is Push, We should insert an unavailable value to Katib DB. + // We would retry reconciliation if some error occurs while we report unavailable metrics. + if instance.Spec.MetricsCollector.Collector.Kind == commonv1beta1.PushCollector { + if err := r.reportUnavailableMetrics(instance); err != nil { + logger.Error(err, "Failed to insert unavailable value to Katib DB") + return fmt.Errorf("%w: %w", errReportMetricsFailed, err) + } + } + // Get message and reason from deployed job if jobStatus.Message != "" { msg = fmt.Sprintf("%v. Job message: %v", msg, jobStatus.Message) @@ -119,6 +128,7 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Tria // TODO(gaocegege): Should we maintain a TrialsRunningCount? } // else nothing to do + return nil } func (r *ReconcileTrial) UpdateTrialStatusObservation(instance *trialsv1beta1.Trial) error { @@ -162,6 +172,23 @@ func (r *ReconcileTrial) updateFinalizers(instance *trialsv1beta1.Trial, finaliz } } +func (r *ReconcileTrial) reportUnavailableMetrics(instance *trialsv1beta1.Trial) error { + observationLog := &api_pb.ObservationLog{ + MetricLogs: []*api_pb.MetricLog{ + { + TimeStamp: time.Time{}.UTC().Format(time.RFC3339), + Metric: &api_pb.Metric{ + Name: instance.Spec.Objective.ObjectiveMetricName, + Value: consts.UnavailableMetricValue, + }, + }, + }, + } + _, err := r.ReportTrialObservationLog(instance, observationLog) + + return err +} + func getMetrics(metricLogs []*api_pb.MetricLog, strategies []commonv1beta1.MetricStrategy) (*commonv1beta1.Observation, error) { metrics := make(map[string]*commonv1beta1.Metric) timestamps := make(map[string]*time.Time) diff --git a/pkg/mock/v1beta1/trial/managerclient/katibmanager.go b/pkg/mock/v1beta1/trial/managerclient/katibmanager.go index 4706cc1d877..159a5ade746 100644 --- a/pkg/mock/v1beta1/trial/managerclient/katibmanager.go +++ b/pkg/mock/v1beta1/trial/managerclient/katibmanager.go @@ -69,3 +69,18 @@ func (mr *MockManagerClientMockRecorder) GetTrialObservationLog(arg0 any) *gomoc mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTrialObservationLog", reflect.TypeOf((*MockManagerClient)(nil).GetTrialObservationLog), arg0) } + +// ReportTrialObservationLog mocks base method. +func (m *MockManagerClient) ReportTrialObservationLog(arg0 *v1beta1.Trial, arg1 *api_v1_beta1.ObservationLog) (*api_v1_beta1.ReportObservationLogReply, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReportTrialObservationLog", arg0, arg1) + ret0, _ := ret[0].(*api_v1_beta1.ReportObservationLogReply) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReportTrialObservationLog indicated an expected call of ReportTrialObservationLog. +func (mr *MockManagerClientMockRecorder) ReportTrialObservationLog(arg0 any, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportTrialObservationLog", reflect.TypeOf((*MockManagerClient)(nil).ReportTrialObservationLog), arg0, arg1) +}