diff --git a/tfx/orchestration/experimental/core/pipeline_ops.py b/tfx/orchestration/experimental/core/pipeline_ops.py index 76188665f9e..0e5ed9fc564 100644 --- a/tfx/orchestration/experimental/core/pipeline_ops.py +++ b/tfx/orchestration/experimental/core/pipeline_ops.py @@ -1298,9 +1298,15 @@ def orchestrate( if filter_fn is None: filter_fn = lambda _: True - all_pipeline_states = pstate.PipelineState.load_all_active_and_owned( - mlmd_connection_manager.primary_mlmd_handle - ) + # Try to load active pipelines. Any exception raised by the load is currently + # re-raised to the caller; no return-False/retry path exists here. + try: + all_pipeline_states = pstate.PipelineState.load_all_active_and_owned( + mlmd_connection_manager.primary_mlmd_handle + ) + except Exception as e: # pylint: disable=broad-except + raise e + pipeline_states = [s for s in all_pipeline_states if filter_fn(s)] if not pipeline_states: logging.info('No active pipelines to run.') diff --git a/tfx/orchestration/experimental/core/pipeline_ops_test.py b/tfx/orchestration/experimental/core/pipeline_ops_test.py index 56bb1151871..0d58e8c00b3 100644 --- a/tfx/orchestration/experimental/core/pipeline_ops_test.py +++ b/tfx/orchestration/experimental/core/pipeline_ops_test.py @@ -54,6 +54,7 @@ from tfx.types import standard_artifacts from tfx.utils import status as status_lib +from ml_metadata import errors as mlmd_errors from ml_metadata.proto import metadata_store_pb2 @@ -3590,5 +3591,7 @@ def test_orchestrate_pipelines_with_specified_pipeline_uid( self.assertTrue(task_queue.is_empty()) + + if __name__ == '__main__': tf.test.main()