Skip to content

Commit

Permalink
Merge pull request bigbio#9 from enriquea/main
Browse files Browse the repository at this point in the history
PR response to issue bigbio#8
  • Loading branch information
ypriverol authored Jan 26, 2024
2 parents 2b2571a + b2aec28 commit 3d3e882
Show file tree
Hide file tree
Showing 12 changed files with 170 additions and 37 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ pip install . -r requirements.txt

### Maintainers
- Enrique Audain (https://github.com/enriquea)
- Yasset Perez-Riverol (https://github.com/ypriverol
- Yasset Perez-Riverol (https://github.com/ypriverol)
12 changes: 6 additions & 6 deletions docs/README.data.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Here we describe the main data structures used in `fsspark` and how to use them.

The current module support as input data a headed Tab-separated values (TSV) file with `S x 2+F` dimensions,
where `S` is the number of samples (rows) and `F` is the number of features (columns). The first column of the file
is expected to contain the `sample IDs`, the second column the `response variable` and the remaining
is expected to contain the `sample IDs`, the second column the `sample label` and the remaining
columns the `features`. The response variable can be either binary, categorical or continuous; and should
be encoded as `0` and `1` for binary variables, as integers for categorical variables and as floats
for continuous variables.
Expand All @@ -20,12 +20,12 @@ The following is an example of a TSV file with a binary response variable:

```
------------------------------------------------------------------------
| sample_id | response | feature_1 | feature_2 | feature_3 | feature_4 |
| sample_id | label | feature_1 | feature_2 | feature_3 | feature_4 |
------------------------------------------------------------------------
| sample_1 | 0 | 0.1 | 0.2 | 0.3 | 0.4 |
| sample_2 | 1 | 0.5 | 0.6 | 0.7 | 0.8 |
| sample_3 | 0 | 0.9 | 0.10 | 0.11 | 0.12 |
| sample_4 | 1 | 0.13 | 0.14 | 0.15 | 0.16 |
| sample_1 | a | 0.1 | 0.2 | 0.3 | 0.4 |
| sample_2 | b | 0.5 | 0.6 | 0.7 | 0.8 |
| sample_3 | b | 0.9 | 0.10 | 0.11 | 0.12 |
| sample_4 | c | 0.13 | 0.14 | 0.15 | 0.16 |
------------------------------------------------------------------------
```
Expand Down
5 changes: 2 additions & 3 deletions docs/README.methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ from fsspark.fs.core import FSDataFrame
from fsspark.fs.ml import cv_rf_classification, get_accuracy, get_predictions
from fsspark.fs.multivariate import multivariate_filter
from fsspark.fs.univariate import univariate_filter
from fsspark.fs.utils import (filter_missingness_rate,
from fsspark.fs.utils import (remove_features_by_missingness_rate,
impute_missing)
from fsspark.utils.datasets import get_tnbc_data_path
from fsspark.utils.io import import_table_as_psdf
Expand All @@ -86,8 +86,7 @@ fsdf = FSDataFrame(fsdf, sample_col='Sample', label_col='label')
# Step 1. Data pre-processing.

# a) Filter missingness rate
fsdf = filter_missingness_rate(fsdf,
threshold=0.1)
fsdf = remove_features_by_missingness_rate(fsdf, threshold=0.1)

# b) Impute data frame
fsdf = impute_missing(fsdf)
Expand Down
13 changes: 13 additions & 0 deletions fsspark/fs/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import numpy as np
from typing import (Union,
Optional,
List,
Expand Down Expand Up @@ -203,6 +204,18 @@ def get_sdf_vector(self, output_column_vector: str = 'features') -> pyspark.sql.

return sdf_vector

def _collect_features_as_array(self) -> np.array:
"""
Collect features from FSDataFrame as an array.
`Warning`: This method will collect the entire DataFrame into the driver.
Uses this method on small datasets only (e.g., after filtering or splitting the data)
:return: Numpy array
"""
sdf = self.get_sdf().select(*self.get_features_names())
a = np.array(sdf.collect())
return a

def to_psdf(self) -> pyspark.pandas.DataFrame:
"""
Convert Spark DataFrame to Pandas on Spark DataFrame
Expand Down
26 changes: 15 additions & 11 deletions fsspark/fs/ml.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
)
from pyspark.ml.regression import RandomForestRegressor, FMRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.pandas import DataFrame

from fsspark.fs.core import FSDataFrame
from fsspark.utils.generic import tag


@tag("spark implementation")
def cv_rf_classification(
fsdf: FSDataFrame, binary_classification: bool = True
) -> CrossValidatorModel:
Expand All @@ -34,7 +37,6 @@ def cv_rf_classification(
Otherwise, implement a multi-class classification problem.
:return: CrossValidatorModel
TODO: Consider here if make sense to return the full CV Model.
"""
features_col = "features"
sdf = fsdf.get_sdf_vector(output_column_vector=features_col)
Expand Down Expand Up @@ -69,6 +71,7 @@ def cv_rf_classification(
return cv_model


@tag("spark implementation")
def cv_svc_classification(
fsdf: FSDataFrame,
) -> CrossValidatorModel:
Expand All @@ -79,7 +82,6 @@ def cv_svc_classification(
:param fsdf: FSDataFrame
:return: CrossValidatorModel
TODO: Consider here if make sense to return the full CV Model.
"""

features_col = "features"
Expand Down Expand Up @@ -108,6 +110,7 @@ def cv_svc_classification(
return cv_model


@tag("spark implementation")
def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
"""
Cross-validation with Random Forest regressor as estimator.
Expand All @@ -116,7 +119,6 @@ def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
:param fsdf: FSDataFrame
:return: CrossValidatorModel
TODO: Consider here if make sense to return the full CV Model.
"""

features_col = "features"
Expand Down Expand Up @@ -148,6 +150,7 @@ def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
return cv_model


@tag("spark implementation")
def cv_fm_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
"""
Cross-validation with Factorization Machines as estimator.
Expand All @@ -156,7 +159,6 @@ def cv_fm_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
:param fsdf: FSDataFrame
:return: CrossValidatorModel
TODO: Do it make sense here to return the full CV Model??
"""

features_col = "features"
Expand Down Expand Up @@ -184,12 +186,14 @@ def cv_fm_regression(fsdf: FSDataFrame) -> CrossValidatorModel:

def get_accuracy(model: CrossValidatorModel) -> float:
"""
Get accuracy from a trained CrossValidatorModel (best model).
# TODO: This function should be able to parse all available models.
Currently only support RandomForestClassificationModel.
:param model: Trained CrossValidatorModel
:return: Training accuracy
:return: accuracy
"""

best_model = model.bestModel
if isinstance(best_model, RandomForestClassificationModel):
acc = best_model.summary.accuracy
Expand All @@ -200,7 +204,7 @@ def get_accuracy(model: CrossValidatorModel) -> float:
return acc


def get_predictions(model: CrossValidatorModel) -> pyspark.sql.DataFrame:
def get_predictions(model: CrossValidatorModel) -> pyspark.pandas.DataFrame:
"""
# TODO: This function should be able to parse all available models.
Currently only support RandomForestClassificationModel.
Expand All @@ -219,11 +223,11 @@ def get_predictions(model: CrossValidatorModel) -> pyspark.sql.DataFrame:
)
else:
pred = None
return pred
return pred.pandas_api()


def get_feature_scores(model: CrossValidatorModel,
indexed_features: pyspark.pandas.series.Series = None) -> pd.DataFrame:
indexed_features: pyspark.pandas.series.Series = None) -> pyspark.pandas.DataFrame:
"""
Extract features scores (e.g. importance or coefficients) from a trained CrossValidatorModel.
Expand All @@ -234,7 +238,7 @@ def get_feature_scores(model: CrossValidatorModel,
:param indexed_features: If provided, report features names rather than features indices.
Usually, the output from `training_data.get_features_indexed()`.
:return: Pandas on DataFrame with feature importance
:return: Pandas DataFrame with feature importance
"""

df_features = (None if indexed_features is None
Expand Down Expand Up @@ -279,5 +283,5 @@ def get_feature_scores(model: CrossValidatorModel,
return df.sort_values(by="coefficients", ascending=False)

else:
df = None # this should follow with parsing options for the different models.
return df
# TODO: here we should support other models.
pass
14 changes: 11 additions & 3 deletions fsspark/fs/multivariate.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,31 @@

from fsspark.fs.core import FSDataFrame
from fsspark.fs.utils import find_maximal_independent_set
from fsspark.utils.generic import tag

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger("FSSPARK:MULTIVARIATE")
logger.setLevel(logging.INFO)


@tag("experimental")
def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame,
features_col: str = 'features',
method: str = "pearson") -> np.ndarray:
"""
Compute features Matrix Correlation.
TODO: Warning: Computed matrix correlation will collected into the drive with this implementation.
:param sdf: Spark DataFrame
:param features_col: Name of the feature column vector name.
:param method: One of `pearson` (default) or `spearman`.
:return: Numpy array.
"""

logger.warning("Warning: Computed matrix correlation will be collected into the drive with this implementation.\n"
"This may cause memory issues. Use it preferably with small datasets.")
logger.info(f"Computing correlation matrix using {method} method.")

mcorr = (Correlation
.corr(sdf, features_col, method)
.collect()[0][0]
Expand All @@ -35,6 +41,7 @@ def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame,
return mcorr


@tag("experimental")
def multivariate_correlation_selector(fsdf: FSDataFrame,
strict: bool = True,
corr_threshold: float = 0.75,
Expand All @@ -43,9 +50,9 @@ def multivariate_correlation_selector(fsdf: FSDataFrame,
Compute the correlation matrix (Pearson) among input features and select those below a specified threshold.
:param fsdf: Input FSDataFrame
:param strict: If True (default), apply hard filtering (strict) to remove highly related features.
:param strict: If True (default), apply hard filtering (strict) to remove highly correlated features.
Otherwise, find the maximal independent set of highly correlated features (approximate method).
The approximate method is experimental.
`Warning`: The approximate method is experimental.
:param corr_threshold: Minimal correlation threshold to consider two features correlated.
:param method: One of `pearson` (default) or `spearman`.
Expand Down Expand Up @@ -84,6 +91,7 @@ def multivariate_correlation_selector(fsdf: FSDataFrame,
return selected_features


@tag("spark implementation")
def multivariate_variance_selector(fsdf: FSDataFrame,
variance_threshold: float = 0.0) -> List[str]:
"""
Expand Down
15 changes: 10 additions & 5 deletions fsspark/fs/univariate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
from pyspark.ml.feature import UnivariateFeatureSelector

from fsspark.fs.core import FSDataFrame
from fsspark.utils.generic import tag

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger("FSSPARK:UNIVARIATE")
logger.setLevel(logging.INFO)


@tag("spark implementation")
def compute_univariate_corr(fsdf: FSDataFrame) -> Dict[str, float]:
"""
Compute the correlation coefficient between every column (features) in the input DataFrame and a defined target.
Compute the correlation coefficient between every column (features) in the input DataFrame and the label (class).
:param fsdf: Input FSDataFrame
Expand All @@ -32,7 +34,7 @@ def compute_univariate_corr(fsdf: FSDataFrame) -> Dict[str, float]:
def univariate_correlation_selector(fsdf: FSDataFrame,
corr_threshold: float = 0.3) -> List[str]:
"""
Select features based on its correlation with a target label, if corr value is less than a specified threshold.
Select features based on its correlation with a label (class), if corr value is less than a specified threshold.
Expected both features and label to be of type numeric.
:param fsdf: FSDataFrame
Expand All @@ -46,6 +48,7 @@ def univariate_correlation_selector(fsdf: FSDataFrame,
return selected_features


@tag("spark implementation")
def univariate_selector(fsdf: FSDataFrame,
label_type: str = 'categorical',
**kwargs) -> List[str]:
Expand All @@ -63,7 +66,7 @@ def univariate_selector(fsdf: FSDataFrame,
"""

vector_col_name = 'features'
vsdf = fsdf.get_sdf_vector(output_column_vector=vector_col_name)
sdf = fsdf.get_sdf_vector(output_column_vector=vector_col_name)
label = fsdf.get_label_col_name()

# set selector
Expand All @@ -85,13 +88,14 @@ def univariate_selector(fsdf: FSDataFrame,
.setLabelCol(label)
)

model = selector.fit(vsdf)
model = selector.fit(sdf)
selected_features_indices = model.selectedFeatures
selected_features = fsdf.get_features_by_index(selected_features_indices)

return selected_features


@tag("spark implementation")
def univariate_filter(fsdf: FSDataFrame,
univariate_method: str = 'u_corr',
**kwargs) -> FSDataFrame:
Expand All @@ -100,7 +104,8 @@ def univariate_filter(fsdf: FSDataFrame,
:param fsdf: Input FSDataFrame
:param univariate_method: Univariate selector method.
Possible values are 'u_corr', 'anova' or 'f_regression'.
Possible values are 'u_corr', 'anova' (categorical label)
or 'f_regression' (continuous label).
:return: Filtered FSDataFrame
"""
Expand Down
Loading

0 comments on commit 3d3e882

Please sign in to comment.