Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dataProcess): Add parallel processing functionality to dataProcess #116

Merged
merged 2 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 81 additions & 5 deletions R/dataProcess.R
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
#' which have more than 50\% missing values. FALSE is default.
#' @param maxQuantileforCensored Maximum quantile for deciding censored missing values, default is 0.999
#' @param fix_missing Optional, same as the `fix_missing` parameter in MSstatsConvert::MSstatsBalancedDesign function
#' @param numberOfCores Number of cores for parallel processing. When > 1,
#' a logfile named `MSstats_dataProcess_log_progress.log` is created to
#' track progress. Only works for Linux & Mac OS. Default is 1.
#' @inheritParams .documentFunction
#'
#' @importFrom utils sessionInfo
Expand Down Expand Up @@ -78,7 +81,8 @@ dataProcess = function(
min_feature_count = 2, n_top_feature = 3, summaryMethod = "TMP",
equalFeatureVar = TRUE, censoredInt = "NA", MBimpute = TRUE,
remove50missing = FALSE, fix_missing = NULL, maxQuantileforCensored = 0.999,
use_log_file = TRUE, append = FALSE, verbose = TRUE, log_file_path = NULL
use_log_file = TRUE, append = FALSE, verbose = TRUE, log_file_path = NULL,
numberOfCores = 1
) {
MSstatsConvert::MSstatsLogsSettings(use_log_file, append, verbose,
log_file_path,
Expand Down Expand Up @@ -108,10 +112,10 @@ dataProcess = function(
processed = getProcessed(input)
input = MSstatsPrepareForSummarization(input, summaryMethod, MBimpute, censoredInt,
remove_uninformative_feature_outlier)
input_split = split(input, input$PROTEIN)
summarized = tryCatch(MSstatsSummarize(input_split, summaryMethod,
summarized = tryCatch(MSstatsSummarizeWithMultipleCores(input, summaryMethod,
MBimpute, censoredInt,
remove50missing, equalFeatureVar),
remove50missing, equalFeatureVar,
numberOfCores),
error = function(e) {
print(e)
NULL
Expand All @@ -125,6 +129,79 @@ dataProcess = function(
output
}

#' Feature-level data summarization with multiple cores
#'
#' @param input feature-level data processed by dataProcess subfunctions
#' @param method summarization method: "linear" or "TMP"
#' @param equal_variance only for summaryMethod = "linear". Default is TRUE.
#' Logical variable for whether the model should account for heterogeneous variation
#' among intensities from different features. Default is TRUE, which assume equal
#' variance among intensities from features. FALSE means that we cannot assume
#' equal variance among intensities from features, then we will account for
#' heterogeneous variation from different features.
#' @param censored_symbol Missing values are censored or at random.
#' 'NA' (default) assumes that all 'NA's in 'Intensity' column are censored.
#' '0' uses zero intensities as censored intensity.
#' In this case, NA intensities are missing at random.
#' The output from Skyline should use '0'.
#' Null assumes that all NA intensites are randomly missing.
#' @param remove50missing only for summaryMethod = "TMP". TRUE removes the runs
#' which have more than 50\% missing values. FALSE is default.
#' @param impute only for summaryMethod = "TMP" and censoredInt = 'NA' or '0'.
#' TRUE (default) imputes 'NA' or '0' (depending on censoredInt option) by Accelated failure model.
#' FALSE uses the values assigned by cutoffCensored
#' @param numberOfCores Number of cores for parallel processing. When > 1,
#' a logfile named `MSstats_dataProcess_log_progress.log` is created to
#' track progress. Only works for Linux & Mac OS. Default is 1.
#'
#' @importFrom parallel makeCluster parLapply stopCluster clusterExport
#'
#' @return list of length one with run-level data.
#'
MSstatsSummarizeWithMultipleCores = function(input, method, impute, censored_symbol,
remove50missing, equal_variance, numberOfCores = 1) {
if (numberOfCores > 1) {
protein_indices = split(seq_len(nrow(input)), list(input$PROTEIN))
num_proteins = length(protein_indices)
function_environment = environment()
cl = parallel::makeCluster(numberOfCores)
parallel::clusterExport(cl, c("MSstatsSummarizeSingleTMP",
"MSstatsSummarizeSingleLinear",
"input", "impute", "censored_symbol",
"remove50missing", "protein_indices",
"equal_variance"),
envir = function_environment)
cat(paste0("Number of proteins to process: ", num_proteins),
sep = "\n", file = "MSstats_dataProcess_log_progress.log")
if (method == "TMP") {
summarized_results = parallel::parLapply(cl, seq_len(num_proteins), function(i) {
if (i %% 100 == 0) {
cat("Finished processing an additional 100 proteins",
sep = "\n", file = "MSstats_dataProcess_log_progress.log", append = TRUE)
}
single_protein = input[protein_indices[[i]],]
MSstatsSummarizeSingleTMP(
single_protein, impute, censored_symbol, remove50missing)
})
} else {
summarized_results = parallel::parLapply(cl, seq_len(num_proteins), function(i) {
if (i %% 100 == 0) {
cat("Finished processing an additional 100 proteins",
sep = "\n", file = "MSstats_dataProcess_log_progress.log", append = TRUE)
}
single_protein = input[protein_indices[[i]],]
MSstatsSummarizeSingleLinear(single_protein, equal_variance)
})
}
parallel::stopCluster(cl)
return(summarized_results)
} else {
input_split = split(input, input$PROTEIN)
return(MSstatsSummarize(input_split, method, impute, censored_symbol,
remove50missing, equal_variance))
}
}


#' Feature-level data summarization
#'
Expand Down Expand Up @@ -175,7 +252,6 @@ dataProcess = function(
#'
MSstatsSummarize = function(proteins_list, method, impute, censored_symbol,
tonywu1999 marked this conversation as resolved.
Show resolved Hide resolved
remove50missing, equal_variance) {

num_proteins = length(proteins_list)
summarized_results = vector("list", num_proteins)
if (method == "TMP") {
Expand Down
6 changes: 2 additions & 4 deletions R/groupComparison.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#' @param log_base base of the logarithm used in dataProcess.
#' @param numberOfCores Number of cores for parallel processing. When > 1,
#' a logfile named `MSstats_groupComparison_log_progress.log` is created to
#' track progress. Only works for Linux & Mac OS.
#' track progress. Only works for Linux & Mac OS. Default is 1.
#' @inheritParams .documentFunction
#'
#' @details
Expand Down Expand Up @@ -115,9 +115,7 @@ MSstatsPrepareForGroupComparison = function(summarization_output) {
#' @param numberOfCores Number of cores for parallel processing. When > 1,
#' a logfile named `MSstats_groupComparison_log_progress.log` is created to
#' track progress. Only works for Linux & Mac OS.
#'
#' @importFrom utils txtProgressBar setTxtProgressBar
#' @importFrom parallel makeCluster clusterExport parLapply stopCluster
#'
#'
#' @export
#'
Expand Down
2 changes: 2 additions & 0 deletions R/utils_groupcomparison.R
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ getSamplesInfo = function(summarization_output) {
#' @param numberOfCores Number of cores for parallel processing.
#' A logfile named `MSstats_groupComparison_log_progress.log` is created to
#' track progress. Only works for Linux & Mac OS.
#' @importFrom parallel makeCluster clusterExport parLapply stopCluster
#' @keywords internal
.groupComparisonWithMultipleCores = function(summarized_list, contrast_matrix,
save_fitted_models, repeated, samples_info,
Expand Down Expand Up @@ -483,6 +484,7 @@ getSamplesInfo = function(summarization_output) {
#' @param save_fitted_models if TRUE, fitted models will be included in the output
#' @param repeated logical, output of checkRepeatedDesign function
#' @param samples_info data.table, output of getSamplesInfo function
#' @importFrom utils txtProgressBar setTxtProgressBar
#' @keywords internal
.groupComparisonWithSingleCore = function(summarized_list, contrast_matrix,
save_fitted_models, repeated,
Expand Down
16 changes: 16 additions & 0 deletions inst/tinytest/test_dataProcess.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Test dataProcess with default parameters ------------------------------------
QuantDataDefault = dataProcess(SRMRawData, use_log_file = FALSE)
QuantDataDefaultLinear = dataProcess(DDARawData, use_log_file = FALSE,
summaryMethod = "linear")

# Test dataProcess with numberOfCores parameter ----------------------
QuantDataParallel = dataProcess(SRMRawData, use_log_file = FALSE,
numberOfCores = 2)
QuantDataParallelLinear = dataProcess(DDARawData, use_log_file = FALSE,
summaryMethod = "linear", numberOfCores = 2)

expect_equal(nrow(QuantDataDefault$FeatureLevelData),
nrow(QuantDataParallel$FeatureLevelData))

expect_equal(nrow(QuantDataDefaultLinear$FeatureLevelData),
nrow(QuantDataParallelLinear$FeatureLevelData))
52 changes: 52 additions & 0 deletions man/MSstatsSummarizeV2.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion man/dataProcess.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/groupComparison.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading