diff --git a/.gitignore b/.gitignore
index e8fc92e..9eb0e48 100644
--- a/.gitignore
+++ b/.gitignore
@@ -132,3 +132,4 @@ dmypy.json
 local/
 testscripts/
 .idea/*
+/benchmarking/
diff --git a/docs/README.methods.md b/docs/README.methods.md
index 0c2c977..5fb42cd 100644
--- a/docs/README.methods.md
+++ b/docs/README.methods.md
@@ -53,75 +53,4 @@ A typical workflow written using `fsspark` can be divided roughly in four major
 
 ### 5. Feature selection pipeline example
 
-Check it out [here](fsspark/pipeline/fs_corr_rf.py) a full FS pipeline example.
-
-```python
-"""
-Example of a feature selection pipeline implemented in fsspark.
-
-After data import and pre-processing, the pipeline applies univariate correlation filter,
-multivariate correlation filter and Randon Forest classification.
-
-"""
-
-from fsspark.config.context import init_spark, stop_spark_session
-from fsspark.fs.core import FSDataFrame
-from fsspark.fs.ml import cv_rf_classification, get_accuracy, get_predictions
-from fsspark.fs.multivariate import multivariate_filter
-from fsspark.fs.univariate import univariate_filter
-from fsspark.fs.utils import (remove_features_by_missingness_rate,
-                              impute_missing)
-from fsspark.utils.datasets import get_tnbc_data_path
-from fsspark.utils.io import import_table_as_psdf
-
-# Init spark
-init_spark()
-
-# Import data
-fsdf = import_table_as_psdf(get_tnbc_data_path(),
-                            n_partitions=5)
-
-fsdf = FSDataFrame(fsdf, sample_col='Sample', label_col='label')
-
-# Step 1. Data pre-processing.
-
-# a) Filter missingness rate
-fsdf = remove_features_by_missingness_rate(fsdf, threshold=0.1)
-
-# b) Impute data frame
-fsdf = impute_missing(fsdf)
-
-# c) Scale features
-fsdf = fsdf.scale_features(scaler_method='standard')
-
-# Split dataset in training/testing
-training_df, testing_df = fsdf.split_df(label_type_cat=True,
-                                        split_training_factor=0.8)
-
-# Step 2. Apply univariate correlation filter
-training_df = univariate_filter(training_df,
-                                univariate_method='u_corr',
-                                corr_threshold=0.3)
-
-# Step 3. Apply multivariate correlation filter
-training_df = multivariate_filter(training_df,
-                                  multivariate_method='m_corr',
-                                  corr_threshold=0.7
-                                  )
-
-# Step 4. ML-algorithm with cross-validation
-cv_model = cv_rf_classification(training_df,
-                                binary_classification=False)
-
-# Print out some stats
-
-# Get accuracy from training
-acc = get_accuracy(model=cv_model)
-print(f"Training accuracy: {acc}")
-
-# Get predictions from training
-pred = get_predictions(model=cv_model)
-pred.show()
-
-stop_spark_session()
-```
+[FS pipeline example](../fsspark/pipeline/fs_pipeline_example.py)
diff --git a/fsspark/config/context.py b/fsspark/config/context.py
index 5b8a9cf..2cc5a50 100644
--- a/fsspark/config/context.py
+++ b/fsspark/config/context.py
@@ -6,12 +6,14 @@
                                             PYARROW_SETTINGS,
                                             PANDAS_ON_SPARK_API_SETTINGS)
 
-
 os.environ['PYARROW_IGNORE_TIMEZONE'] = "1"
+
+
 # os.environ['JAVA_HOME'] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home"
 # os.environ['SPARK_HOME'] = "/usr/local/spark-3.3.0-bin-hadoop3"
 
 
-def init_spark(apply_pyarrow_settings: bool = True,
+def init_spark(master: str = "local[8]",
+               apply_pyarrow_settings: bool = True,
                apply_extra_spark_settings: bool = True,
                apply_pandas_settings: bool = True) -> SparkSession:
     """
@@ -24,7 +26,7 @@ def init_spark(apply_pyarrow_settings: bool = True,
 
     # init or get spark session.
     spark = (SparkSession.builder
-             .master("local[8]")
+             .master(master)
              .appName("fs-spark")
              )
diff --git a/fsspark/fs/constants.py b/fsspark/fs/constants.py
new file mode 100644
index 0000000..27eb6e3
--- /dev/null
+++ b/fsspark/fs/constants.py
@@ -0,0 +1,53 @@
+# Define constants for the project
+
+
+# Define univariate feature selection methods constants
+ANOVA = 'anova'
+UNIVARIATE_CORRELATION = 'u_corr'
+F_REGRESSION = 'f_regression'
+
+# Define dict with univariate feature selection methods and brief description
+UNIVARIATE_METHODS = {
+    ANOVA: 'ANOVA univariate feature selection (F-classification)',
+    UNIVARIATE_CORRELATION: 'Univariate Correlation',
+    F_REGRESSION: 'Univariate F-regression'
+}
+
+# Define multivariate feature selection methods constants
+MULTIVARIATE_CORRELATION = 'm_corr'
+MULTIVARIATE_VARIANCE = 'variance'
+
+# Define dict with multivariate feature selection methods and brief description
+MULTIVARIATE_METHODS = {
+    MULTIVARIATE_CORRELATION: 'Multivariate Correlation',
+    MULTIVARIATE_VARIANCE: 'Multivariate Variance'
+}
+
+# Define machine learning wrapper methods constants
+
+# binary classification
+RF_BINARY = 'rf_binary'
+LSVC_BINARY = 'lsvc_binary'
+FM_BINARY = 'fm_binary'  # TODO: implement this method
+
+# multilabel classification
+RF_MULTILABEL = 'rf_multilabel'
+LR_MULTILABEL = 'lr_multilabel'  # TODO: implement this method
+
+# regression
+RF_REGRESSION = 'rf_regression'
+FM_REGRESSION = 'fm_regression'  # TODO: implement this method
+
+
+# Define dict with machine learning wrapper methods and brief description
+ML_METHODS = {
+    RF_BINARY: 'Random Forest Binary Classifier',
+    LSVC_BINARY: 'Linear SVC Binary Classifier',
+    FM_BINARY: 'Factorization Machine Binary Classifier',
+
+    RF_MULTILABEL: 'Random Forest Multi-label Classifier',
+    LR_MULTILABEL: 'Logistic Regression Multi-label Classifier',
+
+    RF_REGRESSION: 'Random Forest Regression',
+    FM_REGRESSION: 'Factorization Machine Regression'
+}
diff --git a/fsspark/fs/core.py b/fsspark/fs/core.py
index b9e86aa..8c59fce 100644
--- a/fsspark/fs/core.py
+++ b/fsspark/fs/core.py
@@ -179,14 +179,6 @@ def get_sample_label(self) -> list:
         """
         return self.__indexed_instances.tolist()
 
-    # def get_samples(self) -> pyspark.pandas.Series:
-    #     """
-    #     Get samples identifiers from DataFrame. Coerce data type to string.
-    #
-    #     :return: Pandas Series
-    #     """
-    #     return self.__df[self.__sample_col].astype("str")
-
     def get_sdf_vector(self, output_column_vector: str = 'features') -> pyspark.sql.DataFrame:
         """
         Return a Spark dataframe with feature columns assembled into a column vector (a.k.a. Dense Vector column).
@@ -204,6 +196,18 @@ def get_sdf_vector(self, output_column_vector: str = 'features') -> pyspark.sql.
 
         return sdf_vector
 
+    def get_sdf_and_label(self,
+                          output_column_vector: str = 'features') -> Tuple[pyspark.sql.dataframe.DataFrame, str, str]:
+        """
+        Extracts the Spark DataFrame and label column name from FSDataFrame.
+
+        :param output_column_vector: Name of the output column vector.
+        :return: A tuple containing the Spark DataFrame, the label column name
+                 and the output column vector name.
+        """
+        sdf = self.get_sdf_vector(output_column_vector=output_column_vector)
+        label_col = self.get_label_col_name()
+        return sdf, label_col, output_column_vector
+
     def _collect_features_as_array(self) -> np.array:
         """
         Collect features from FSDataFrame as an array.
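The configurable `master` argument added to `init_spark` above and the new `FSDataFrame.get_sdf_and_label` helper are meant to be used together when preparing data for the ML wrappers introduced below. A minimal sketch (not part of the patch), assuming the bundled TNBC example dataset and the `import_table_as_psdf` helper used elsewhere in this diff:

```python
from fsspark.config.context import init_spark, stop_spark_session
from fsspark.fs.core import FSDataFrame
from fsspark.utils.datasets import get_tnbc_data_path
from fsspark.utils.io import import_table_as_psdf

# The Spark master URL is now configurable instead of being hard-coded to local[8].
init_spark(master="local[4]")

df = import_table_as_psdf(get_tnbc_data_path(), n_partitions=5)
fsdf = FSDataFrame(df, sample_col='Sample', label_col='label')

# get_sdf_and_label returns the assembled Spark DataFrame, the label column
# name and the vector column name in a single call.
sdf, label_col, features_col = fsdf.get_sdf_and_label(output_column_vector='features')
print(label_col, features_col)

stop_spark_session()
```

Returning the three values together replaces the repeated `get_sdf_vector()` / `get_label_col_name()` pairs that the removed `cv_*` helpers in `fsspark/fs/ml.py` carried.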
diff --git a/fsspark/fs/methods.py b/fsspark/fs/methods.py
new file mode 100644
index 0000000..d9ce31b
--- /dev/null
+++ b/fsspark/fs/methods.py
@@ -0,0 +1,551 @@
+from abc import ABC, abstractmethod
+from typing import List, Type, Union, Tuple, Optional, Dict, Any
+
+from fsspark.fs.constants import (ML_METHODS, UNIVARIATE_METHODS,
+                                  MULTIVARIATE_METHODS)
+from fsspark.fs.core import FSDataFrame
+from fsspark.fs.ml import MLCVModel
+from fsspark.fs.multivariate import multivariate_filter
+from fsspark.fs.univariate import univariate_filter
+
+
+class FSMethod(ABC):
+    """
+    A general class for feature selection methods.
+    """
+
+    # The valid methods for feature selection. Concrete subclasses override this
+    # class attribute; a same-named property here would be shadowed by the
+    # subclass attribute and recurse on itself, so a plain attribute is used.
+    valid_methods: Tuple[str, ...] = ()
+
+    def __init__(self,
+                 fs_method,
+                 **kwargs):
+        """
+        Initialize the feature selection method with the specified parameters.
+
+        :param fs_method: The feature selection method to be used.
+        :param kwargs: Additional keyword arguments for the feature selection method.
+        """
+        self.fs_method = fs_method
+        self.kwargs = kwargs
+        self.validate_method(fs_method)
+
+    @abstractmethod
+    def validate_method(self, fs_method: str):
+        """
+        Abstract method to validate the feature selection method.
+
+        :param fs_method: The feature selection method to be validated.
+        """
+        pass
+
+    @abstractmethod
+    def select_features(self, fsdf: FSDataFrame):
+        """
+        Abstract method to select features using the feature selection method.
+
+        :param fsdf: The data frame on which feature selection is to be performed.
+        """
+        pass
+
+    @abstractmethod
+    def validate_params(self, **kwargs):
+        """
+        Abstract method to validate the parameters for the feature selection method.
+
+        :param kwargs: The parameters to be validated.
+        """
+        pass
+
+    def get_params(self):
+        """
+        Get the parameters for the feature selection method.
+
+        :return: The parameters as a copy of the kwargs dictionary.
+        """
+        return self.kwargs.copy()
+
+    def set_params(self, **kwargs):
+        """
+        Set the parameters for the feature selection method.
+
+        :param kwargs: The new parameters to be set.
+        """
+        self.kwargs.update(kwargs)
+
+
+class FSUnivariate(FSMethod):
+    """
+    A class for univariate feature selection methods.
+
+    Attributes:
+        fs_method (str): The univariate method to be used for feature selection.
+        kwargs (dict): Additional keyword arguments for the feature selection method.
+    """
+
+    valid_methods = list(UNIVARIATE_METHODS.keys())
+
+    def __init__(self, fs_method: str, **kwargs):
+        """
+        Initialize the univariate feature selection method with the specified parameters.
+
+        Parameters:
+        fs_method: The univariate method to be used for feature selection.
+        kwargs: Additional keyword arguments for the feature selection method.
+        """
+
+        super().__init__(fs_method, **kwargs)
+        self.validate_method(fs_method)
+
+    def validate_method(self, fs_method: str):
+        """
+        Validate the univariate method.
+
+        Parameters:
+        fs_method: The univariate method to be validated.
+        """
+
+        if fs_method not in self.valid_methods:
+            raise InvalidMethodError(
+                f"Invalid univariate method: {fs_method}. "
+                f"Accepted methods are {', '.join(self.valid_methods)}"
+            )
+
+    def validate_params(self, **kwargs):
+        """
+        Validate the parameters for the univariate method.
+
+        Parameters:
+        kwargs: The parameters to be validated.
+ """ + # Additional validation is done directly in the underlying feature selection method + pass + + def select_features(self, fsdf) -> FSDataFrame: + """ + Select features using the specified univariate method. + + Parameters: + fsdf: The data frame on which feature selection is to be performed. + + Returns: + The selected features. + """ + + return univariate_filter( + fsdf, univariate_method=self.fs_method, **self.kwargs + ) + + def __str__(self): + return f"FSUnivariate(method={self.fs_method}, kwargs={self.kwargs})" + + def __repr__(self): + return self.__str__() + + +class FSMultivariate(FSMethod): + """ + The FSMultivariate class is a subclass of the FSMethod class and is used for multivariate + feature selection methods. It provides a way to select features using different multivariate methods such as + multivariate correlation and variance. + + Example Usage + ------------- + # Create an instance of FSMultivariate with multivariate_method='m_corr' + fs_multivariate = FSMultivariate(multivariate_method='m_corr') + + # Select features using the multivariate method + selected_features = fs_multivariate.select_features(fsdf) + """ + + valid_methods = list(MULTIVARIATE_METHODS.keys()) + + def __init__(self, fs_method: str, **kwargs): + """ + Initialize the multivariate feature selection method with the specified parameters. + + Parameters: + fsdf: The data frame on which feature selection is to be performed. + fs_method: The multivariate method to be used for feature selection. + kwargs: Additional keyword arguments for the feature selection method. + """ + + super().__init__(fs_method, **kwargs) + self.validate_method(fs_method) + + def validate_method(self, multivariate_method: str): + """ + Validate the multivariate method. + + Parameters: + multivariate_method: The multivariate method to be validated. + """ + + if multivariate_method not in self.valid_methods: + raise InvalidMethodError( + f"Invalid multivariate method: " + f"{multivariate_method}. Accepted methods are {', '.join(self.valid_methods)}" + ) + + def validate_params(self, **kwargs): + """ + Validate the parameters for the multivariate method. + + Parameters: + kwargs: The parameters to be validated. + """ + # Additional validation is done directly in the underlying feature selection method + pass + + def select_features(self, fsdf: FSDataFrame): + """ + Select features using the specified multivariate method. + """ + + return multivariate_filter( + fsdf, multivariate_method=self.fs_method, **self.kwargs + ) + + def __str__(self): + return f"FSMultivariate(multivariate_method={self.fs_method}, kwargs={self.kwargs})" + + def __repr__(self): + return self.__str__() + + +class FSMLMethod(FSMethod): + """ + A class for machine learning feature selection methods. + + Attributes: + fs_method (str): The machine learning method to be used for feature selection. + kwargs (dict): Additional keyword arguments for the feature selection method. + """ + + valid_methods = list(ML_METHODS.keys()) + _ml_model: MLCVModel = None + + def __init__(self, + fs_method: str, + rfe: bool = False, + rfe_iterations: int = 3, + percent_to_keep: float = 0.90, + **kwargs): + """ + Initialize the machine learning feature selection method with the specified parameters. + + Parameters: + fs_method: The machine learning method to be used for feature selection. + kwargs: Additional keyword arguments for the feature selection method. 
+ """ + + super().__init__(fs_method, **kwargs) + self.validate_method(fs_method) + + # set the estimator, grid and cv parameters (or none if not provided) + self.estimator_params = kwargs.get('estimator_params', None) # estimator parameters + self.evaluator_params = kwargs.get('evaluator_params', None) # evaluator parameters + self.grid_params = kwargs.get('grid_params', None) # grid parameters + self.cv_params = kwargs.get('cv_params', None) # cross-validation parameters + + # set the machine learning model + self._ml_model = self._set_ml_model() + + # parameters to control the recursive feature elimination process (rfe) + self.rfe = rfe + self.percent_to_keep = percent_to_keep + self.rfe_iterations = rfe_iterations + + # performance metrics + self.rfe_training_metric: list = [] # performance metrics on training for each rfe iteration + self.training_metric = None # performance metrics on training (final model) + self.testing_metric = None # performance metrics on testing (final model) + + # feature importance + self.feature_scores = None + + def validate_method(self, fs_method: str): + """ + Validate the machine learning method. + + Parameters: + fs_method: The machine learning method to be validated. + """ + + if fs_method not in self.valid_methods: + raise InvalidMethodError( + f"Invalid machine learning method: {fs_method}. Accepted methods are {', '.join(self.valid_methods)}" + ) + + def validate_params(self, **kwargs): + """ + Validate the parameters for the machine learning method. + + Parameters: + kwargs: The parameters to be validated. + """ + # Additional validation is done directly in the underlying feature selection method + pass + + def _set_ml_model(self): + """ + Select the machine learning model to be used for feature selection. + + Returns: + The machine learning model. + """ + + model_type = self.fs_method + + self._ml_model = MLCVModel.create_model( + model_type=model_type, + estimator_params=self.estimator_params, + evaluator_params=self.evaluator_params, + grid_params=self.grid_params, + cv_params=self.cv_params + ) + + return self._ml_model + + def _fit_and_filter(self, df: FSDataFrame) -> FSDataFrame: + + # fit the current machine learning model + self._ml_model.fit(df) + + # get feature scores + feature_scores = self._ml_model.get_feature_scores() + + # get feature based on the (percentile) threshold provided + # expected a dataframe sorted by scores in descending order + selected_features = feature_scores.iloc[ + :int(self.percent_to_keep * len(feature_scores)) + ]['feature_index'] + + return df.filter_features_by_index(selected_features, keep=True) + + def select_features(self, fsdf: FSDataFrame) -> FSDataFrame: + """ + Select features using the specified machine learning method. + + Parameters: + fsdf: The data frame on which feature selection is to be performed. + + Returns: + FSDataFrame: The data frame with selected features. 
+ """ + + if fsdf is None or fsdf.count_features() == 0 or fsdf.count_instances() == 0: + raise ValueError("The data frame is empty or does not contain any features.") + + fsdf = self._fit_and_filter(fsdf) + + # Recursive feature elimination + if self.rfe: + for iteration in range(self.rfe_iterations): + print(f"RFE: running {iteration + 1} of {self.rfe_iterations} iterations...") + fsdf = self._fit_and_filter(fsdf) + # collect the performance metrics on training for every rfe iteration + self.rfe_training_metric.append(self._ml_model.get_eval_metric_on_training()) + + # get the final performance metric on training + self.training_metric = self._ml_model.get_eval_metric_on_training() + + # get the feature scores after feature selection + self.feature_scores = self._ml_model.get_feature_scores() + + return fsdf + + def get_eval_metric_name(self): + """ + Get the evaluation metric name. + + Returns: + The evaluation metric name. + """ + + if self._ml_model is None: + raise ValueError("No machine learning model is available.") + + return self._ml_model.get_eval_metric_name() + + def get_eval_metric_on_training_rfe(self): + """ + Get the evaluation metric on the training data for each RFE iteration. + + Returns: + The evaluation metric on the training data for each RFE iteration. + """ + if self.rfe_training_metric is None: + raise ValueError("No training metric is available. Run the select_features method first.") + return self.rfe_training_metric + + def get_eval_metric_on_training(self): + """ + Get the evaluation metric on the training data. + + Returns: + The evaluation metric on the training data. + """ + if self.training_metric is None: + raise ValueError("No training metric is available. Run the select_features method first.") + return self.training_metric + + def get_eval_metric_on_testing(self, fsdf: FSDataFrame): + """ + Evaluate the machine learning method on the testing data. + + Parameters: + fsdf: The testing data frame on which the machine learning method is to be evaluated. + + Returns: + The evaluation metric on the testing data. + """ + + if fsdf is None or fsdf.count_features() == 0 or fsdf.count_instances() == 0: + raise ValueError("The testing data frame is empty or does not contain any features.") + + # evaluate the model on the testing data + eval_metric = self._ml_model.get_eval_metric_on_testing(fsdf) + self.testing_metric = eval_metric + + return eval_metric + + def get_feature_scores(self): + """ + Get the feature scores after feature selection. + + Returns: + The feature scores as a pandas DataFrame. + """ + + if self.feature_scores is None: + raise ValueError("Feature scores are not available. Run the feature selection method first.") + + return self.feature_scores + + def __str__(self): + return f"FSMLMethod(method={self.fs_method}, kwargs={self.kwargs})" + + def __repr__(self): + return self.__str__() + + +class FSPipeline: + """ + The FSPipeline class creates a pipeline of feature selection methods. It provides a way to + chain multiple feature selection methods together to create a pipeline of feature selection methods. 
+
+    Example Usage
+    -------------
+    # Create an instance of FSPipeline with the specified feature selection methods
+    fs_pipeline = FSPipeline(df_training=training_data,
+                             df_testing=testing_data,
+                             fs_stages=[FSUnivariate('anova'), FSMultivariate('m_corr')])
+
+    # Run the pipeline
+    results = fs_pipeline.run()
+    """
+
+    _valid_methods: List[Type[Union[FSUnivariate, FSMultivariate, FSMLMethod]]] = [FSUnivariate,
+                                                                                   FSMultivariate,
+                                                                                   FSMLMethod]
+
+    def __init__(self,
+                 df_training: FSDataFrame,
+                 df_testing: Optional[FSDataFrame],
+                 fs_stages: List[Union[FSUnivariate, FSMultivariate, FSMLMethod]]):
+        """
+        Initialize the feature selection pipeline with the specified feature selection methods.
+
+        Parameters:
+        df_training: The training data frame on which the feature selection pipeline is to be run.
+        df_testing: The testing data frame on which the ML wrapper method (if any) is to be evaluated.
+        fs_stages: A list of feature selection methods to be used in the pipeline.
+        """
+
+        self.df_training = df_training
+        self.df_testing = df_testing
+        self.fs_stages = fs_stages
+        self.validate_methods()
+
+        self.pipeline_results = {}
+
+    def validate_methods(self):
+        """
+        Validate the feature selection methods in the pipeline.
+        """
+        # check if the pipeline contains at least one feature selection method
+        if len(self.fs_stages) == 0:
+            raise ValueError("The pipeline must contain at least one feature selection method.")
+
+        # check if the feature selection methods are valid
+        if not all(isinstance(method, tuple(self._valid_methods)) for method in self.fs_stages):
+            raise InvalidMethodError(f"Invalid feature selection method. "
+                                     f"Accepted methods are {', '.join([str(m) for m in self._valid_methods])}")
+
+        # check if only one ML method is used in the pipeline
+        ml_methods = [method for method in self.fs_stages if isinstance(method, FSMLMethod)]
+        if len(ml_methods) > 1:
+            raise ValueError("Only one ML method is allowed in the pipeline.")
+
+    def run(self) -> Dict[str, Any]:
+        """
+        Run the feature selection pipeline.
+
+        Returns:
+        A dictionary with the results of the feature selection pipeline.
+        """
+
+        # apply each feature selection method in the pipeline sequentially
+        n_stages = len(self.fs_stages)
+        fsdf_tmp = self.df_training
+
+        self.pipeline_results.update(n_stages=n_stages)
+
+        for i, method in enumerate(self.fs_stages):
+            print(f"Running stage {i + 1} of {n_stages} of the feature selection pipeline: {method}")
+            if isinstance(method, FSMLMethod):
+
+                fsdf_tmp = method.select_features(fsdf_tmp)
+
+                # collect the results during the feature selection process (rfe iterations, feature scores, etc.)
+                self.pipeline_results.update(rfe_iterations=method.rfe_iterations)
+                self.pipeline_results.update(feature_scores=method.get_feature_scores())
+                self.pipeline_results.update(eval_metric=method.get_eval_metric_name())
+                self.pipeline_results.update(rfe_training_metric=method.get_eval_metric_on_training_rfe())
+                self.pipeline_results.update(training_metric=method.get_eval_metric_on_training())
+
+                if self.df_testing is not None:
+
+                    # evaluate the final model on the testing data (if available)
+                    testing_metric = method.get_eval_metric_on_testing(self.df_testing)
+                    self.pipeline_results.update(testing_metric=testing_metric)
+
+            else:
+                fsdf_tmp = method.select_features(fsdf_tmp)
+
+        self.pipeline_results.update(n_initial_features=self.df_training.count_features())
+        self.pipeline_results.update(n_selected_features=fsdf_tmp.count_features())
+
+        return self.pipeline_results
+
+    def __str__(self):
+        return f"FSPipeline(fs_methods={self.fs_stages})"
+
+    def __repr__(self):
+        return self.__str__()
+
+
+class InvalidMethodError(Exception):
+    """
+    Error raised when an invalid feature selection method is used.
+    """
+
+    def __init__(self, message):
+        super().__init__(message)
diff --git a/fsspark/fs/ml.py b/fsspark/fs/ml.py
index 9f372de..42b51cc 100644
--- a/fsspark/fs/ml.py
+++ b/fsspark/fs/ml.py
@@ -4,284 +4,385 @@
 for feature selection (e.g., rank by feature importance) and prediction.
 
 """
-
-import pyspark.sql
-import pyspark.pandas as pd
-from pyspark.ml.classification import (
-    RandomForestClassifier,
-    LinearSVC,
-    RandomForestClassificationModel, LinearSVCModel,
-)
-from pyspark.ml.evaluation import (
-    MulticlassClassificationEvaluator,
-    RegressionEvaluator,
-    BinaryClassificationEvaluator,
-)
-from pyspark.ml.regression import RandomForestRegressor, FMRegressor
-from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
-from pyspark.pandas import DataFrame
-
+import warnings
+from typing import List, Any, Dict, Optional, Union
+
+import pandas as pd
+from pyspark.ml import Estimator, Model
+from pyspark.ml.classification import (RandomForestClassificationModel,
+                                       LinearSVCModel,
+                                       RandomForestClassifier,
+                                       LinearSVC, LogisticRegression, LogisticRegressionModel)
+from pyspark.ml.evaluation import (Evaluator,
+                                   BinaryClassificationEvaluator,
+                                   MulticlassClassificationEvaluator,
+                                   RegressionEvaluator)
+from pyspark.ml.regression import RandomForestRegressionModel, RandomForestRegressor
+from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel, Param
+
+from fsspark.fs.constants import (RF_BINARY,
+                                  LSVC_BINARY,
+                                  FM_BINARY,
+                                  RF_MULTILABEL,
+                                  LR_MULTILABEL,
+                                  RF_REGRESSION,
+                                  FM_REGRESSION,
+                                  ML_METHODS)
 from fsspark.fs.core import FSDataFrame
-from fsspark.utils.generic import tag
-
-
-@tag("spark implementation")
-def cv_rf_classification(
-    fsdf: FSDataFrame, binary_classification: bool = True
-) -> CrossValidatorModel:
-    """
-    Cross-validation with Random Forest classifier as estimator.
-
-    :param fsdf: FSDataFrame
-    :param binary_classification: If true (default), current problem is considered of type binary classification.
-                                  Otherwise, implement a multi-class classification problem.
-    :return: CrossValidatorModel
-    """
-    features_col = "features"
-    sdf = fsdf.get_sdf_vector(output_column_vector=features_col)
-    label_col = fsdf.get_label_col_name()
-
-    # set the estimator (a.k.a. the ML algorithm)
-    rf = RandomForestClassifier(
-        featuresCol=features_col,
-        labelCol=label_col,
-        numTrees=3,
-        maxDepth=2,
-        seed=42,
-        leafCol="leafId",
-    )
-    grid = ParamGridBuilder().addGrid(rf.maxDepth, [2, 3]).build()
-
-    # set the evaluator.
-    if binary_classification:
-        evaluator = BinaryClassificationEvaluator()
-    else:
-        evaluator = MulticlassClassificationEvaluator()
-
-    cv = CrossValidator(
-        estimator=rf,
-        estimatorParamMaps=grid,
-        evaluator=evaluator,
-        parallelism=2,
-        numFolds=3,
-        collectSubModels=False,
-    )
-    cv_model = cv.fit(sdf)
-    return cv_model
-
-
-@tag("spark implementation")
-def cv_svc_classification(
-    fsdf: FSDataFrame,
-) -> CrossValidatorModel:
-    """
-    Cross-validation with Linear Support Vector classifier as estimator.
-    Support only binary classification.
+ESTIMATORS_CLASSES = [RandomForestClassifier, RandomForestRegressor, LinearSVC, LogisticRegression]
+EVALUATORS_CLASSES = [BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator]
 
-    :param fsdf: FSDataFrame
-    :return: CrossValidatorModel
+
+# Define a factory class that allows creating models
+# with the same interface
+# This class allows to perform the following operations:
+# - Define an Estimator
+# - Define an Evaluator
+# - Define a grid of parameters (model tuning)
+# - Define a cross-validator (model fitting)
+class MLCVModel:
     """
-
-    features_col = "features"
-    sdf = fsdf.get_sdf_vector(output_column_vector=features_col)
-    label_col = fsdf.get_label_col_name()
-
-    # set the estimator (a.k.a. the ML algorithm)
-    svm = LinearSVC(
-        featuresCol=features_col, labelCol=label_col, maxIter=10, regParam=0.1
-    )
-
-    grid = ParamGridBuilder().addGrid(svm.maxIter, [10]).build()
-
-    # set the evaluator.
-    evaluator = BinaryClassificationEvaluator()
-
-    cv = CrossValidator(
-        estimator=svm,
-        estimatorParamMaps=grid,
-        evaluator=evaluator,
-        parallelism=2,
-        numFolds=3,
-        collectSubModels=False,
-    )
-    cv_model = cv.fit(sdf)
-    return cv_model
-
-
-@tag("spark implementation")
-def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
+    A factory class for creating various machine learning models with Spark MLlib.
+    ML models are created using a cross-validator approach for hyperparameter tuning.
     """
-    Cross-validation with Random Forest regressor as estimator.
-    Optimised for regression problems.
-    :param fsdf: FSDataFrame
-
-    :return: CrossValidatorModel
-    """
-
-    features_col = "features"
-    sdf = fsdf.get_sdf_vector(output_column_vector=features_col)
-    label_col = fsdf.get_label_col_name()
-
-    rf = RandomForestRegressor(
-        featuresCol=features_col,
-        labelCol=label_col,
-        numTrees=3,
-        maxDepth=2,
-        seed=42,
-        leafCol="leafId",
-    )
-    grid = ParamGridBuilder().addGrid(rf.maxDepth, [2, 3]).build()
-
-    # set the evaluator.
-    evaluator = RegressionEvaluator()
-
-    cv = CrossValidator(
-        estimator=rf,
-        estimatorParamMaps=grid,
-        evaluator=evaluator,
-        parallelism=2,
-        numFolds=3,
-        collectSubModels=False,
-    )
-    cv_model = cv.fit(sdf)
-    return cv_model
-
-
-@tag("spark implementation")
-def cv_fm_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
-    """
-    Cross-validation with Factorization Machines as estimator.
-    Optimised for regression problems.
-
-    :param fsdf: FSDataFrame
+    _cross_validator: CrossValidator = None
+    _fitted_cv_model: CrossValidatorModel = None
+    _best_model: Model = None
+    _fsdf: FSDataFrame = None
+
+    def __init__(self,
+                 estimator: Union[RandomForestClassifier,
+                                  RandomForestRegressor,
+                                  LinearSVC,
+                                  LogisticRegression],
+                 evaluator: Union[BinaryClassificationEvaluator,
+                                  MulticlassClassificationEvaluator,
+                                  RegressionEvaluator],
+                 estimator_params: Optional[Dict[str, Any]] = None,
+                 evaluator_params: Optional[Dict[str, Any]] = None,
+                 grid_params: Optional[Dict[str, List[Any]]] = None,
+                 cv_params: Optional[Dict[str, Any]] = None):
+        """
+        Initializes the MLCVModel with optional estimator, evaluator, and parameter specifications.
+        """
+        self.estimator = estimator
+        self.evaluator = evaluator
+        self.estimator_params = estimator_params
+        self.evaluator_params = evaluator_params
+        self.grid_params = grid_params
+        self.cv_params = cv_params
+
+        self._initialize_model()
+
+    def _initialize_model(self):
+        # Validate and set estimator parameters
+        if self.estimator:
+            self._validate_estimator(self.estimator)
+            self._validate_estimator_params(self.estimator_params)
+            self._set_estimator_params()
+
+        # Validate and set evaluator parameters
+        if self.evaluator:
+            self._validate_evaluator(self.evaluator)
+            self._validate_evaluator_params(self.evaluator_params)
+            self._set_evaluator_params()
+
+        # Parse and set grid parameters
+        if self.grid_params:
+            self.grid_params = self._parse_grid_params(self.grid_params)
+
+        # Initialize and set cross-validator parameters
+        self._set_cross_validator()
+
+    def _parse_grid_params(self, grid_params: Dict[str, List[Any]]) -> List[Dict[Param, Any]]:
+        """
+        Parse the grid parameters to create a list of dictionaries.
+
+        :param grid_params: A dictionary containing the parameter names as keys and a list of values as values.
+        :return: A list of dictionaries, where each dictionary represents a set of parameter values.
+        """
+        grid = ParamGridBuilder()
+        for param, values in grid_params.items():
+            if hasattr(self.estimator, param):
+                grid = grid.addGrid(getattr(self.estimator, param), values)
+            else:
+                raise AttributeError(f"{self.estimator.__class__.__name__} does not have attribute {param}")
+        return grid.build()
+
+    def _validate_estimator(self, estimator: Estimator) -> 'MLCVModel':
+        """
+        Validate the estimator.
+
+        :param estimator: The estimator to validate.
+        :return: This MLCVModel instance (the estimator is validated in place).
+        """
+        # check estimator is an instance of ESTIMATORS_CLASSES
+        if not isinstance(estimator, tuple(ESTIMATORS_CLASSES)):
+            raise ValueError(f"Estimator must be an instance of {ESTIMATORS_CLASSES}")
+        return self
+
+    def _validate_evaluator(self, evaluator: Evaluator) -> 'MLCVModel':
+        """
+        Validate the evaluator.
+
+        :param evaluator: The evaluator to validate.
+        :return: This MLCVModel instance (the evaluator is validated in place).
+        """
+        # check evaluator is an instance of EVALUATORS_CLASSES
+        if not isinstance(evaluator, tuple(EVALUATORS_CLASSES)):
+            raise ValueError(f"Evaluator must be an instance of {EVALUATORS_CLASSES}")
+        return self
+
+    def _validate_estimator_params(self, estimator_params: Dict[str, Any]) -> None:
+        """
+        Validate the estimator parameters.
+
+        :param estimator_params: A dictionary containing the parameter names as keys and values as values.
+ """ + if estimator_params is None: + return + for param, _ in estimator_params.items(): + if not self.estimator.hasParam(param): + raise AttributeError(f"{self.estimator.__class__.__name__} does not have attribute {param}") + + def _validate_evaluator_params(self, evaluator_params: Dict[str, Any]) -> None: + """ + Validate the evaluator parameters. + + :param evaluator_params: A dictionary containing the parameter names as keys and values as values. + """ + if evaluator_params is None: + return + for param, _ in evaluator_params.items(): + if not self.evaluator.hasParam(param): + raise AttributeError(f"{self.evaluator.__class__.__name__} does not have attribute {param}") + + def _set_evaluator_params(self) -> 'MLCVModel': + """ + Set evaluator parameters. + """ + if self.evaluator_params is not None: + self.evaluator = self.evaluator.setParams(**self.evaluator_params) + return self + + def _set_estimator_params(self) -> 'MLCVModel': + """ + Set estimator parameters. + """ + if self.estimator_params is not None: + self.estimator = self.estimator.setParams(**self.estimator_params) + return self + + def _set_cv_params(self, cv_params: Dict[str, Any]) -> 'MLCVModel': + """ + Parse the cross-validator parameters to create an instance of CrossValidator. + + :param cv_params: A dictionary containing the parameter names as keys and values as values. + :return: An instance of CrossValidator. + """ + + for param, value in cv_params.items(): + if hasattr(self._cross_validator, param): + setattr(self._cross_validator, param, value) + else: + raise AttributeError(f"{self._cross_validator.__class__.__name__} does not have attribute {param}") + return self + + def _set_cross_validator(self) -> 'MLCVModel': + """ + Build the model using the cross-validator. + + :return: The CrossValidator model. + """ + try: + self._cross_validator = CrossValidator( + estimator=self.estimator, + estimatorParamMaps=self.grid_params, + evaluator=self.evaluator, + ) + if self.cv_params is not None: + self._cross_validator = self._cross_validator.setParams(**self.cv_params) + return self + except Exception as e: + print(f"An error occurred while creating the CrossValidator: {str(e)}") + # Handle the exception or raise it to be handled by the caller + raise + + def fit(self, fsdf: FSDataFrame) -> 'MLCVModel': + """ + Fit the model using the cross-validator. + + :return: The CrossValidatorModel after fitting. + """ + # Extract the Spark DataFrame and label column name from FSDataFrame + self._fsdf = fsdf + + if self._cross_validator is None or self.estimator is None or self.evaluator is None: + raise ValueError("Cross-validator, estimator, or evaluator not set properly.") + + self._fitted_cv_model = self._cross_validator.fit(self._fsdf.get_sdf_vector()) + return self + + def _get_best_model(self) -> Model: + """ + Get the best model from the fitted CrossValidatorModel. + + :return: The best model. + """ + if self._fitted_cv_model is None: + raise ValueError("CrossValidatorModel not fitted. Use fit() to fit the model.") + self._best_model = self._fitted_cv_model.bestModel + return self._best_model + + # define a static method that allows to set a ml model based on the model type + @staticmethod + def create_model(model_type: str, + estimator_params: Dict[str, Any] = None, + evaluator_params: Dict[str, Any] = None, + grid_params: Dict[str, List[Any]] = None, + cv_params: Dict[str, Any] = None) -> 'MLCVModel': + """ + Set a machine learning model based on the model type. + + :param model_type: The type of model to set. 
+        :param estimator_params: Parameters for the estimator.
+        :param evaluator_params: Parameters for the evaluator.
+        :param grid_params: A dictionary containing the parameter names as keys and a list of values as values.
+        :param cv_params: Parameters for the cross-validator.
+
+        :return: An instance of MLCVModel.
+        """
+        if model_type == RF_BINARY:
+            estimator = RandomForestClassifier()
+            evaluator = BinaryClassificationEvaluator()
+        elif model_type == LSVC_BINARY:
+            estimator = LinearSVC()
+            evaluator = BinaryClassificationEvaluator()
+        elif model_type == RF_MULTILABEL:
+            estimator = RandomForestClassifier()
+            evaluator = MulticlassClassificationEvaluator()
+        elif model_type == LR_MULTILABEL:
+            estimator = LogisticRegression()
+            evaluator = MulticlassClassificationEvaluator()
+        elif model_type == RF_REGRESSION:
+            estimator = RandomForestRegressor()
+            evaluator = RegressionEvaluator()
+        else:
+            raise ValueError(f"Unsupported model type: {model_type}. "
+                             f"Supported model types are: {list(ML_METHODS.keys())}")
+
+        ml_method = MLCVModel(
+            estimator=estimator,
+            evaluator=evaluator,
+            estimator_params=estimator_params,
+            evaluator_params=evaluator_params,
+            grid_params=grid_params,
+            cv_params=cv_params
+        )
-    :return: CrossValidatorModel
-    """
+        return ml_method
 
-    features_col = "features"
-    sdf = fsdf.get_sdf_vector(output_column_vector=features_col)
-    label_col = fsdf.get_label_col_name()
+    def get_eval_metric_name(self) -> str:
+        """
+        Get the evaluation metric name.
 
-    fmr = FMRegressor(featuresCol=features_col, labelCol=label_col)
+        :return: The evaluation metric name.
+        """
+        return self.evaluator.getMetricName()
 
-    grid = ParamGridBuilder().addGrid(fmr.factorSize, [4, 8]).build()
+    def get_feature_scores(self) -> pd.DataFrame:
 
-    # set the evaluator.
-    evaluator = RegressionEvaluator()
+        # TODO: This function should be able to parse all available models.
 
-    cv = CrossValidator(
-        estimator=fmr,
-        estimatorParamMaps=grid,
-        evaluator=evaluator,
-        parallelism=2,
-        numFolds=3,
-        collectSubModels=False,
-    )
-    cv_model = cv.fit(sdf)
-    return cv_model
+        indexed_features = self._fsdf.get_features_indexed()
+        best_model = self._get_best_model()
 
+        # raise an exception if no model has been fitted
+        if best_model is None:
+            raise ValueError("No ML model has been fitted. Use fit() to fit the model.")
 
-def get_accuracy(model: CrossValidatorModel) -> float:
-    """
-    Get accuracy from a trained CrossValidatorModel (best model).
-    # TODO: This function should be able to parse all available models.
-    Currently only support RandomForestClassificationModel.
+        df_features = pd.DataFrame(indexed_features.to_numpy(),
+                                   columns=["features"])
 
-    :param model: Trained CrossValidatorModel
-    :return: accuracy
-    """
+        if isinstance(best_model, (RandomForestClassificationModel, RandomForestRegressionModel)):
+            df_scores = pd.DataFrame(
+                data=best_model.featureImportances.toArray(),
+                columns=["scores"]
+            )
 
-    best_model = model.bestModel
-    if isinstance(best_model, RandomForestClassificationModel):
-        acc = best_model.summary.accuracy
-    elif isinstance(best_model, LinearSVCModel):
-        acc = best_model.summary().accuracy
-    else:
-        acc = None
-    return acc
+            df_scores = df_scores.reset_index(level=0).rename(columns={"index": "feature_index"})
 
+            # merge the feature scores with the feature names
+            df = df_features.merge(
+                df_scores, how="right", left_index=True, right_index=True
+            )  # index-to-index merging
 
-def get_predictions(model: CrossValidatorModel) -> pyspark.pandas.DataFrame:
-    """
-    # TODO: This function should be able to parse all available models.
-    Currently only support RandomForestClassificationModel.
+            # sort the dataframe by scores in descending order
+            df = df.sort_values(by="scores", ascending=False)
 
-    :param model: Trained CrossValidatorModel
+            # add feature percentile rank to the features_scores dataframe
+            df['percentile_rank'] = df['scores'].rank(pct=True)
 
-    :return: DataFrame with sample label predictions
-    """
-    best_model = model.bestModel
-    if isinstance(best_model, RandomForestClassificationModel):
-        pred = best_model.summary.predictions.drop(
-            best_model.getFeaturesCol(),
-            best_model.getLeafCol(),
-            best_model.getRawPredictionCol(),
-            best_model.getProbabilityCol(),
-        )
-    else:
-        pred = None
-    return pred.pandas_api()
+            return df
 
+        else:
+            raise ValueError("Unsupported model type. "
+                             "Only RandomForestClassificationModel and "
+                             "RandomForestRegressionModel are supported.")
 
-def get_feature_scores(model: CrossValidatorModel,
-                       indexed_features: pyspark.pandas.series.Series = None) -> pyspark.pandas.DataFrame:
-    """
-    Extract features scores (e.g. importance or coefficients) from a trained CrossValidatorModel.
+    def get_eval_metric_on_training(self) -> float:
+        """
+        Get the evaluation metric on training data from a trained CrossValidatorModel (best model).
 
-    # TODO: This function should be able to parse all available models.
-    Currently only support RandomForestClassificationModel and LinearSVCModel.
+        :return: The evaluation metric value on the training data.
+        """
 
-    :param model: Trained CrossValidatorModel
-    :param indexed_features: If provided, report features names rather than features indices.
-                             Usually, the output from `training_data.get_features_indexed()`.
+        # TODO: This function should be able to parse all available models.
-
-    :return: Pandas DataFrame with feature importance
-    """
+
+        # get the best model from the fitted cross-validator model
+        best_model = self._get_best_model()
 
-    df_features = (None if indexed_features is None
-                   else indexed_features.to_dataframe(name='features')
-                   )
+        # get the eval metric name from the evaluator
+        eval_metric_name = self.get_eval_metric_name()
 
-    best_model = model.bestModel
+        if isinstance(best_model, (RandomForestClassificationModel, LogisticRegressionModel)):
+            metric_value = getattr(best_model.summary, eval_metric_name)
 
-    if isinstance(best_model, RandomForestClassificationModel):
+        elif isinstance(best_model, LinearSVCModel):
+            metric_value = getattr(best_model.summary(), eval_metric_name)
 
-        importance = pd.DataFrame(data=best_model.featureImportances.toArray(),
-                                  columns=['importance'])
+        else:
+            warnings.warn("Unsupported model type. Unable to get evaluation metric.")
+            metric_value = None
 
-        df = (importance
-              .reset_index(level=0)
-              .rename(columns={"index": "feature_index"})
-              )
+        return metric_value
 
-        if df_features is not None:
-            # if available, get feature names rather than reporting feature index.
-            df = (df_features
-                  .merge(importance, how='right', left_index=True, right_index=True)  # index-to-index merging
-                  )
+    def get_eval_metric_on_testing(self, test_data: FSDataFrame) -> float:
+        """
+        Get the evaluation metric on test data from a trained CrossValidatorModel (best model).
 
-        return df.sort_values(by="importance", ascending=False)
+        :param test_data: The test data as a FSDataFrame object.
+        :return: The evaluation metric value on the test data.
 
-    elif isinstance(best_model, LinearSVCModel):
+        """
 
-        coefficients = pd.DataFrame(data=best_model.coefficients,
-                                    columns=['coefficients'])
+        # TODO: This function should be able to parse all available models.
 
-        df = (coefficients
-              .reset_index(level=0)
-              .rename(columns={"index": "feature_index"})
-              )
+        # get the best model from the fitted cross-validator model
+        best_model = self._get_best_model()
 
-        if indexed_features is not None:
-            df = (df_features
-                  .merge(coefficients, how='right', left_index=True, right_index=True)  # index-to-index merging
-                  )
+        # get test data features harmonized with training features
+        training_features = self._fsdf.get_features_names()
+        test_data = test_data.filter_features(training_features, keep=True)
 
-        return df.sort_values(by="coefficients", ascending=False)
+        # predict the test data
+        predictions = None
+        if isinstance(best_model, (RandomForestClassificationModel, LinearSVCModel, LogisticRegressionModel)):
+            predictions = best_model.transform(test_data.get_sdf_vector())
 
-    else:
-        # TODO: here we should support other models.
-        pass
+        metric_value = None
+        if predictions is not None:
+            metric_value = self.evaluator.evaluate(predictions)
+
+        return metric_value
diff --git a/fsspark/fs/multivariate.py b/fsspark/fs/multivariate.py
index c82f67b..f4af43e 100644
--- a/fsspark/fs/multivariate.py
+++ b/fsspark/fs/multivariate.py
@@ -6,6 +6,8 @@
 from pyspark.ml.feature import (VarianceThresholdSelector)
 from pyspark.ml.stat import Correlation
 
+from fsspark.fs.constants import MULTIVARIATE_METHODS, MULTIVARIATE_CORRELATION, MULTIVARIATE_VARIANCE
+
 from fsspark.fs.core import FSDataFrame
 from fsspark.fs.utils import find_maximal_independent_set
 from fsspark.utils.generic import tag
@@ -18,23 +20,23 @@
 @tag("experimental")
 def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame,
                                 features_col: str = 'features',
-                                method: str = "pearson") -> np.ndarray:
+                                corr_method: str = "pearson") -> np.ndarray:
     """
     Compute features Matrix Correlation.
     :param sdf: Spark DataFrame
     :param features_col: Name of the feature column vector name.
-    :param method: One of `pearson` (default) or `spearman`.
+    :param corr_method: One of `pearson` (default) or `spearman`.
 
     :return: Numpy array.
     """
 
     logger.warning("Warning: Computed matrix correlation will be collected into the drive with this implementation.\n"
                    "This may cause memory issues. Use it preferably with small datasets.")
-    logger.info(f"Computing correlation matrix using {method} method.")
+    logger.info(f"Computing correlation matrix using {corr_method} method.")
 
     mcorr = (Correlation
-             .corr(sdf, features_col, method)
+             .corr(sdf, features_col, corr_method)
             .collect()[0][0]
             .toArray()
             )
@@ -45,7 +47,7 @@
 def multivariate_correlation_selector(fsdf: FSDataFrame,
                                       strict: bool = True,
                                       corr_threshold: float = 0.75,
-                                      method: str = "pearson") -> List[str]:
+                                      corr_method: str = "pearson") -> List[str]:
     """
     Compute the correlation matrix (Pearson) among input features and select those below a specified threshold.
 
@@ -54,7 +56,7 @@
                    Otherwise, find the maximal independent set of highly correlated features (approximate method).
                    `Warning`: The approximate method is experimental.
     :param corr_threshold: Minimal correlation threshold to consider two features correlated.
-    :param method: One of `pearson` (default) or `spearman`.
+    :param corr_method: One of `pearson` (default) or `spearman`.
 
     :return: List of selected features names
     """
@@ -65,7 +67,7 @@
     # compute correlation matrix
     mcorr = _compute_correlation_matrix(sdf,
                                         features_col=colum_vector_features,
-                                        method=method)
+                                        corr_method=corr_method)
 
     mcorr = np.abs(mcorr)  # get absolute correlation value
     combs_above_cutoff = np.triu(mcorr, k=1) > corr_threshold  # create bool matrix that meet criteria
@@ -132,12 +134,13 @@
 
     :return: Filtered FSDataFrame
     """
-    if multivariate_method == 'm_corr':
+    if multivariate_method == MULTIVARIATE_CORRELATION:
         selected_features = multivariate_correlation_selector(fsdf, **kwargs)
-    elif multivariate_method == 'variance':
+    elif multivariate_method == MULTIVARIATE_VARIANCE:
         selected_features = multivariate_variance_selector(fsdf, **kwargs)
     else:
-        raise ValueError("`method` must be one of m_corr or variance.")
+        raise ValueError(f"Invalid multivariate method: {multivariate_method}. "
+                         f"Choose one of {list(MULTIVARIATE_METHODS.keys())}.")
 
     logger.info(f"Applying multivariate filter {multivariate_method}.")
 
diff --git a/fsspark/fs/univariate.py b/fsspark/fs/univariate.py
index 56a21df..1103762 100644
--- a/fsspark/fs/univariate.py
+++ b/fsspark/fs/univariate.py
@@ -4,6 +4,8 @@
 import pyspark.sql.functions as f
 from pyspark.ml.feature import UnivariateFeatureSelector
 
+from fsspark.fs.constants import ANOVA, UNIVARIATE_CORRELATION, F_REGRESSION, UNIVARIATE_METHODS
+
 from fsspark.fs.core import FSDataFrame
 from fsspark.utils.generic import tag
 
@@ -51,6 +53,8 @@ def univariate_correlation_selector(fsdf: FSDataFrame,
 @tag("spark implementation")
 def univariate_selector(fsdf: FSDataFrame,
                         label_type: str = 'categorical',
+                        selection_mode: str = 'percentile',
+                        selection_threshold: float = 0.8,
                         **kwargs) -> List[str]:
     """
     Wrapper for `UnivariateFeatureSelector`.
@@ -61,6 +65,8 @@
 
     :param fsdf: Input FSDataFrame
     :param label_type: Type of label. Possible values are 'categorical' or 'continuous'.
+    :param selection_mode: Mode for feature selection. Possible values are 'numTopFeatures' or 'percentile'.
+    :param selection_threshold: Number of features to select or the percentage of features to select.
 
     :return: List of selected features names
     """
@@ -86,6 +92,8 @@
                 .setFeatureType("continuous")
                 .setOutputCol("selectedFeatures")
                 .setLabelCol(label)
+                .setSelectionMode(selection_mode)
+                .setSelectionThreshold(selection_threshold)
                 )
 
     model = selector.fit(sdf)
@@ -110,14 +118,15 @@
 
     :return: Filtered FSDataFrame
     """
-    if univariate_method == 'anova':
+    if univariate_method == ANOVA:
         selected_features = univariate_selector(fsdf, label_type='categorical', **kwargs)
-    elif univariate_method == 'f_regression':
+    elif univariate_method == F_REGRESSION:
         selected_features = univariate_selector(fsdf, label_type='continuous', **kwargs)
-    elif univariate_method == 'u_corr':
+    elif univariate_method == UNIVARIATE_CORRELATION:
         selected_features = univariate_correlation_selector(fsdf, **kwargs)
     else:
-        raise ValueError("`method` must be one of anova, f_regression or u_corr.")
+        raise ValueError(f"Univariate method {univariate_method} not supported. "
+                         f"Expected one of {list(UNIVARIATE_METHODS.keys())}")
 
     logger.info(f"Applying univariate filter {univariate_method}.")
 
diff --git a/fsspark/pipeline/fs_corr_rf.py b/fsspark/pipeline/fs_corr_rf.py
deleted file mode 100644
index feea1b8..0000000
--- a/fsspark/pipeline/fs_corr_rf.py
+++ /dev/null
@@ -1,69 +0,0 @@
-"""
-Example of a feature selection pipeline implemented in fsspark.
-
-After data import and pre-processing, the pipeline applies univariate correlation filter,
-multivariate correlation filter and Randon Forest classification.
-
-"""
-
-from fsspark.config.context import init_spark, stop_spark_session
-from fsspark.fs.core import FSDataFrame
-from fsspark.fs.ml import cv_rf_classification, get_accuracy, get_predictions, get_feature_scores
-from fsspark.fs.multivariate import multivariate_filter
-from fsspark.fs.univariate import univariate_filter
-from fsspark.fs.utils import remove_features_by_missingness_rate, impute_missing
-from fsspark.utils.datasets import get_tnbc_data_path
-from fsspark.utils.io import import_table_as_psdf
-
-# Init spark
-init_spark()
-
-# Import data
-fsdf = import_table_as_psdf(get_tnbc_data_path(), n_partitions=5)
-
-fsdf = FSDataFrame(fsdf, sample_col="Sample", label_col="label")
-
-# Step 1. Data pre-processing.
-
-# a) Filter missingness rate
-fsdf = remove_features_by_missingness_rate(fsdf, threshold=0.1)
-
-# b) Impute data frame
-fsdf = impute_missing(fsdf)
-
-# c) Scale features
-fsdf = fsdf.scale_features(scaler_method="standard")
-
-# Split dataset in training/testing
-training_df, testing_df = fsdf.split_df(label_type_cat=True, split_training_factor=0.8)
-
-# Step 2. Apply univariate correlation filter
-training_df = univariate_filter(
-    training_df, univariate_method="u_corr", corr_threshold=0.3
-)
-
-# Step 3. Apply multivariate correlation filter
-training_df = multivariate_filter(
-    training_df, multivariate_method="m_corr", corr_threshold=0.7
-)
-
-# Step 4. ML-algorithm with cross-validation
-cv_model = cv_rf_classification(training_df, binary_classification=False)
-
-# Print out some stats
-
-# Get accuracy from training
-acc = get_accuracy(model=cv_model)
-print(f"Training accuracy: {acc}")
-
-# Get predictions from training
-pred = get_predictions(model=cv_model)
-pred.show()
-
-# Get feature importance
-imp = get_feature_scores(model=cv_model,
-                         indexed_features=training_df.get_features_indexed()
-                         )
-print(imp.head(10))
-
-stop_spark_session()
diff --git a/fsspark/pipeline/fs_pipeline_example.py b/fsspark/pipeline/fs_pipeline_example.py
new file mode 100644
index 0000000..c7b9f75
--- /dev/null
+++ b/fsspark/pipeline/fs_pipeline_example.py
@@ -0,0 +1,67 @@
+"""
+Example of a feature selection pipeline implemented in fsspark.
+
+After data import and pre-processing, the pipeline applies an ANOVA univariate filter,
+a multivariate correlation filter and Random Forest classification.
+
+"""
+
+from fsspark.config.context import init_spark, stop_spark_session
+from fsspark.fs.core import FSDataFrame
+from fsspark.fs.methods import FSPipeline, FSUnivariate, FSMultivariate, FSMLMethod
+from fsspark.utils.datasets import get_tnbc_data_path
+from fsspark.utils.io import import_table_as_psdf
+
+# Init spark
+init_spark(apply_pyarrow_settings=True,
+           apply_extra_spark_settings=True,
+           apply_pandas_settings=True)
+
+# 1. Import data
+df = import_table_as_psdf(get_tnbc_data_path(), n_partitions=5)
+fsdf = FSDataFrame(df, sample_col='Sample', label_col='label')
+
+# 2. Split data
+training_data, testing_data = fsdf.split_df(split_training_factor=0.6)
+
+# 3. Set feature selection methods
+# create a Univariate object
+univariate = FSUnivariate(
+    fs_method='anova',
+    selection_mode='percentile',
+    selection_threshold=0.8)
+
+# create a Multivariate object
+multivariate = FSMultivariate(
+    fs_method='m_corr',
+    corr_threshold=0.75,
+    corr_method="pearson"
+)
+
+# create a MLMethod object
+rf_classifier = FSMLMethod(
+    fs_method='rf_multilabel',
+    rfe=True,
+    rfe_iterations=2,
+    percent_to_keep=0.9,
+    estimator_params={'labelCol': 'label'},
+    evaluator_params={'metricName': 'accuracy'},
+    grid_params={'numTrees': [10, 15], 'maxDepth': [5, 10]},
+    cv_params={'parallelism': 2, 'numFolds': 5}
+)
+
+# 4. Create a pipeline object
+fs_pipeline = FSPipeline(df_training=training_data,
+                         df_testing=testing_data,
+                         fs_stages=[univariate, multivariate, rf_classifier])
+
+# 5. Run the pipeline
+results = fs_pipeline.run()
+
+# Print results
+print(f"Accuracy on training: {results['training_metric']}")
+print(f"Accuracy on testing: {results['testing_metric']}")
+print(results.get('feature_scores'))
+
+
+stop_spark_session()
diff --git a/fsspark/tests/test_fs_pipeline.py b/fsspark/tests/test_fs_pipeline.py
new file mode 100644
index 0000000..a1f6b5e
--- /dev/null
+++ b/fsspark/tests/test_fs_pipeline.py
@@ -0,0 +1,71 @@
+import unittest
+
+from fsspark.config.context import init_spark, stop_spark_session
+from fsspark.fs.core import FSDataFrame
+from fsspark.fs.methods import FSPipeline, FSUnivariate, FSMultivariate, FSMLMethod
+from fsspark.utils.datasets import get_tnbc_data_path
+from fsspark.utils.io import import_table_as_psdf
+
+
+class FeatureSelectionPipelineTest(unittest.TestCase):
+
+    def setUp(self) -> None:
+        init_spark(apply_pyarrow_settings=True,
+                   apply_extra_spark_settings=True,
+                   apply_pandas_settings=True)
+
+    def tearDown(self) -> None:
+        stop_spark_session()
+
+    @staticmethod
+    def import_FSDataFrame():
+        df = import_table_as_psdf(get_tnbc_data_path(), n_partitions=5)
+        fsdf = FSDataFrame(df, sample_col='Sample', label_col='label')
+        return fsdf
+
+    def test_feature_selection_pipeline(self):
+        fsdf = self.import_FSDataFrame()
+
+        training_data, testing_data = fsdf.split_df(split_training_factor=0.6)
+
+        # create a Univariate object
+        univariate = FSUnivariate(
+            fs_method='anova',
+            selection_mode='percentile',
+            selection_threshold=0.8)
+
+        # create a Multivariate object
+        multivariate = FSMultivariate(
+            fs_method='m_corr',
+            corr_threshold=0.75,
+            corr_method="pearson"
+        )
+
+        # create a MLMethod object
+        rf_classifier = FSMLMethod(
+            fs_method='rf_multilabel',
+            rfe=True,
+            rfe_iterations=2,
+            percent_to_keep=0.9,
+            estimator_params={'labelCol': 'label'},
+            evaluator_params={'metricName': 'accuracy'},
+            grid_params={'numTrees': [10, 15], 'maxDepth': [5, 10]},
+            cv_params={'parallelism': 2, 'numFolds': 5}
+        )
+
+        # create a pipeline object
+        fs_pipeline = FSPipeline(df_training=training_data,
+                                 df_testing=testing_data,
+                                 fs_stages=[univariate, multivariate, rf_classifier])
+
+        # run the pipeline
+        results = fs_pipeline.run()
+
+        # print results
+        print(results)
+
+        assert results.get('training_metric') > 0.9
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/fsspark/tests/test_ml_methods.py b/fsspark/tests/test_ml_methods.py
new file mode 100644
index 0000000..3dc4bda
--- /dev/null
+++ b/fsspark/tests/test_ml_methods.py
@@ -0,0 +1,180 @@
+import unittest
+
+from pyspark.ml.classification import (RandomForestClassifier,
+                                       LogisticRegression)
+from pyspark.ml.evaluation import (BinaryClassificationEvaluator,
+                                   MulticlassClassificationEvaluator)
+
+from fsspark.config.context import init_spark, stop_spark_session
+from fsspark.fs.core import FSDataFrame
+from fsspark.fs.ml import MLCVModel
+from fsspark.utils.datasets import get_tnbc_data_path
+from fsspark.utils.io import import_table_as_psdf
+
+
+class MLMethodTest(unittest.TestCase):
+
+    def setUp(self) -> None:
+        init_spark(apply_pyarrow_settings=True,
+                   apply_extra_spark_settings=True,
+                   apply_pandas_settings=True)
+
+    def tearDown(self) -> None:
+        stop_spark_session()
+
+    @staticmethod
+    def import_FSDataFrame():
+        df = import_table_as_psdf(get_tnbc_data_path(), n_partitions=5)
+        fsdf = FSDataFrame(df, sample_col='Sample', label_col='label')
+        return fsdf
+
+    def test_build_model_using_cross_validator(self):
+        fsdf = self.import_FSDataFrame()
+        estimator = RandomForestClassifier()
+        evaluator = BinaryClassificationEvaluator()
+        grid_params = {'numTrees': [10, 20, 30], 'maxDepth': [5, 10, 15]}
+        ml_method = MLCVModel(
+            estimator=estimator,
+            evaluator=evaluator,
+            estimator_params=None,
+            grid_params=grid_params,
+            cv_params=None
+        )
+
+        print(ml_method._cross_validator.__str__())
+        assert ml_method._cross_validator is not None
+
+    def test_get_feature_scores_random_forest_classifier(self):
+        # Create a sample FSDataFrame
+        fsdf = self.import_FSDataFrame()
+
+        # Create a RandomForestClassifier model
+        estimator = RandomForestClassifier()
+        evaluator = MulticlassClassificationEvaluator()
+        estimator_params = {'labelCol': 'label'}
+        grid_params = {'numTrees': [10, 20, 30], 'maxDepth': [5, 10, 15]}
+        cv_params = {'parallelism': 2, 'numFolds': 5, 'collectSubModels': False}
+
+        ml_method = MLCVModel(
+            estimator=estimator,
+            evaluator=evaluator,
+            estimator_params=estimator_params,
+            grid_params=grid_params,
+            cv_params=cv_params
+        )
+
+        (ml_method
+         .fit(fsdf)
+         )
+
+        # Get the feature scores
+        feature_scores = ml_method.get_feature_scores()
+
+        # Assert that the feature scores DataFrame is not empty
+        assert not feature_scores.empty
+
+        # Assert that the feature scores DataFrame has the expected columns
+        expected_columns = ['features', 'feature_index', 'scores', 'percentile_rank']
+        assert list(feature_scores.columns) == expected_columns
+
+        # check if dataframe is sorted by scores (descending)
+        assert feature_scores['scores'].is_monotonic_decreasing
+
+        print(feature_scores)
+
+    def test_multilabel_rf_model(self):
+        fsdf = self.import_FSDataFrame()
+        training_data, testing_data = fsdf.split_df(split_training_factor=0.8)
+
+        estimator = RandomForestClassifier()
+        evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
+        estimator_params = {'labelCol': 'label'}
+        grid_params = {'numTrees': [5, 10], 'maxDepth': [3, 5]}
+        cv_params = {'parallelism': 2, 'numFolds': 3}
+
+        ml_method = MLCVModel(
+            estimator=estimator,
+            evaluator=evaluator,
+            estimator_params=estimator_params,
+            grid_params=grid_params,
+            cv_params=cv_params
+        )
+
+        (ml_method
+         .fit(training_data)
+         )
+
+        # get the accuracy on training
+        eval_training = ml_method.get_eval_metric_on_training()
+        print(f"Accuracy on training data: {eval_training}")
+
+        # get the accuracy on testing
+        testing_acc = ml_method.get_eval_metric_on_testing(testing_data)
+        print(f"Accuracy on test data: {testing_acc}")
+        assert testing_acc > 0.7
+
+    def test_multilabel_lr_model(self):
+        fsdf = self.import_FSDataFrame()
+        training_data, testing_data = fsdf.split_df(split_training_factor=0.6)
+
+        estimator = LogisticRegression()
+        evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
+        estimator_params = {'labelCol': 'label'}
+        grid_params = {'regParam': [0.1, 0.01]}
+        cv_params = {'parallelism': 2, 'numFolds': 3}
+
+        ml_method = MLCVModel(
+            estimator=estimator,
+            evaluator=evaluator,
+            estimator_params=estimator_params,
+            grid_params=grid_params,
+            cv_params=cv_params
+        )
+
+        (ml_method
+         .fit(training_data)
+         )
+
+        # get the accuracy on training
+        eval_training = ml_method.get_eval_metric_on_training()
+        print(f"Accuracy on training data: {eval_training}")
+
+        # get the accuracy on testing
+        testing_acc = ml_method.get_eval_metric_on_testing(testing_data)
+        print(f"Accuracy on test data: {testing_acc}")
+        assert testing_acc > 0.7
+
+    def test_FSMLMethod(self):
+        from fsspark.fs.methods import FSMLMethod
+
+        fsdf = self.import_FSDataFrame()
+        training_data, testing_data = fsdf.split_df(split_training_factor=0.7)
+
+        estimator_params = {'labelCol': 'label'}
+        grid_params = {'numTrees': [5, 10], 'maxDepth': [3, 5]}
+        cv_params = {'parallelism': 2, 'numFolds': 3}
+
+        ml_method = FSMLMethod(
+            fs_method='rf_multilabel',
+            rfe=True,
+            rfe_iterations=2,
+            percent_to_keep=0.9,
+            estimator_params=estimator_params,
+            evaluator_params={'metricName': 'accuracy'},
+            grid_params=grid_params,
+            cv_params=cv_params
+        )
+
+        filtered_fsdf = ml_method.select_features(training_data)
+
+        training_acc = ml_method.get_eval_metric_on_training()
+        print(f"Training accuracy: {training_acc}")
+        assert training_acc > 0.8
+
+        testing_acc = ml_method.get_eval_metric_on_testing(testing_data)
+        print(f"Testing accuracy: {testing_acc}")
+        assert testing_acc > 0.7
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/fsspark/utils/io.py b/fsspark/utils/io.py
index 59321a5..951c5cb 100644
--- a/fsspark/utils/io.py
+++ b/fsspark/utils/io.py
@@ -16,7 +16,7 @@
     Import tsv file as Spark DataFrame.
 
     :param path: File path
-    :param header:
+    :param header: True if the first row is a header.
     :param sep: Column separator
     :param n_partitions: Minimal number of partitions
 
@@ -39,6 +39,31 @@
     return sdf
 
 
+def import_parquet(path: str) -> pyspark.sql.DataFrame:
+    """
+    Import parquet file as Spark DataFrame.
+
+    :param path: File path
+
+    :return: Spark DataFrame
+    """
+
+    _sc = pyspark.sql.SparkSession.getActiveSession()
+
+    if _sc is None:
+        raise ValueError("Active Spark Session not found...")
+
+    sdf = (_sc
+           .read
+           # parquet files are self-describing, so no header or
+           # schema-inference options are needed
+           .parquet(path)
+           )
+    return sdf
+
+
 def import_table_as_psdf(path: str,
                          sep: str = "\t",
                          n_partitions: int = 5) -> pyspark.pandas.DataFrame:
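For completeness, a usage sketch for the new `import_parquet` helper (not part of the patch; the parquet path is a placeholder). Like `import_table`, it requires an active Spark session created beforehand with `init_spark`:

```python
from fsspark.config.context import init_spark, stop_spark_session
from fsspark.utils.io import import_parquet

init_spark()

# Parquet files are self-describing, so only the path is needed;
# "/path/to/features.parquet" is a placeholder for illustration.
sdf = import_parquet("/path/to/features.parquet")
sdf.printSchema()

stop_spark_session()
```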