Merge pull request #49 from MannLabs/add_parquet_compatibility
Extend file compatibility
ammarcsj authored Oct 16, 2024
2 parents f2200ed + caafcfc commit d5183b2
Showing 4 changed files with 80 additions and 281 deletions.
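The substantive change in directlfq/utils.py: direct pd.read_csv calls are swapped for helpers from the new directlfq.utils_fileread module, which extends input handling to Parquet files alongside .tsv/.csv/.txt tables. The bulk of the deletion count comes from two removed blocks, the get_z_from_p_empirical helper and the AcquisitionTable* classes at the end of the file (per the @@ -109,16 +108,6 @@ and @@ -940,259 +930,3 @@ hunks). utils_fileread itself is not part of this diff; below is a minimal sketch of the two helpers, with names and signatures taken from the call sites in this file and extension-based dispatch as an assumption:

import pandas as pd
import pyarrow.parquet as pq

def read_columns_from_file(input_file, sep="\t"):
    # header-only read: a parquet file exposes its schema, a text table is sampled with nrows=1
    if input_file.endswith(".parquet"):
        return pq.read_schema(input_file).names
    return pd.read_csv(input_file, sep=sep, nrows=1, encoding="latin1").columns

def read_file_with_pandas(input_file, sep="\t", decimal=".", usecols=None, chunksize=None):
    # parquet loads in one piece; wrapping it in a list mimics the chunk iterator
    # that pd.read_csv returns when chunksize is set
    if input_file.endswith(".parquet"):
        df = pd.read_parquet(input_file, columns=usecols)
        return [df] if chunksize is not None else df
    return pd.read_csv(input_file, sep=sep, decimal=decimal, usecols=usecols,
                       encoding="latin1", chunksize=chunksize)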
290 changes: 12 additions & 278 deletions directlfq/utils.py
@@ -1,5 +1,4 @@


import os
import pathlib
if "__file__" in globals():#only run in the translated python file, as __file__ is not defined with ipython
@@ -8,11 +7,11 @@

import logging
import directlfq.config as config
import directlfq.utils_fileread as utils_fileread

#config.setup_logging()
LOGGER = logging.getLogger(__name__)

# %% ../nbdev_nbs/04_utils.ipynb 5

def get_samples_used_from_samplemap_file(samplemap_file, cond1, cond2):
samplemap_df = load_samplemap(samplemap_file)
return get_samples_used_from_samplemap_df(samplemap_df, cond1, cond2)
@@ -109,16 +108,6 @@ def invert_dictionary(my_map):
inv_map[v] = inv_map.get(v, []) + [k]
return inv_map

# %% ../nbdev_nbs/04_utils.ipynb 17
import statistics

def get_z_from_p_empirical(p_emp,p2z):
p_rounded = np.format_float_scientific(p_emp, 1)
if p_rounded in p2z:
return p2z.get(p_rounded)
z = statistics.NormalDist().inv_cdf(float(p_rounded))
p2z[p_rounded] = z
return z

# %% ../nbdev_nbs/04_utils.ipynb 18
def count_fraction_outliers_from_expected_fc(result_df, threshold, expected_log2fc):
@@ -261,7 +250,7 @@ def get_standard_columns_for_input_type(input_type):
return []

def filter_columns_to_existing_columns(columns, input_file):
existing_columns = pd.read_csv(input_file, sep='\t', nrows=1).columns
existing_columns = utils_fileread.read_columns_from_file(input_file)
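# header-only column read; utils_fileread is assumed to dispatch on the file extension (.parquet vs. delimited text), see the sketch above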
return [x for x in columns if x in existing_columns]


@@ -586,7 +575,7 @@ def reformat_and_write_longtable_according_to_config(input_file, outfile_name, c
os.remove(outfile_name)

relevant_cols = get_relevant_columns_config_dict(config_dict_for_type)
input_df_it = pd.read_csv(input_file, sep = sep, decimal=decimal, usecols = relevant_cols, encoding ='latin1', chunksize = chunksize)
input_df_it = utils_fileread.read_file_with_pandas(input_file=input_file, sep=sep, decimal=decimal, usecols=relevant_cols, chunksize=chunksize)
input_df_list = []
header = True
for input_df_subset in input_df_it:
@@ -745,7 +734,7 @@ def reformat_and_write_wideformat_table(peptides_tsv, outfile_name, config_dict)

input_df.to_csv(outfile_name, sep = '\t', index = None)

# %% ../nbdev_nbs/04_utils.ipynb 42

import os
def check_for_processed_runs_in_results_folder(results_folder):
contained_condpairs = []
@@ -757,7 +746,10 @@ def check_for_processed_runs_in_results_folder(results_folder):
contained_condpairs.append(res_name)
return contained_condpairs

# %% ../nbdev_nbs/04_utils.ipynb 44




import pandas as pd
import os
import pathlib
@@ -843,13 +835,11 @@ def get_input_type_and_config_dict(input_file, input_type_to_use = None):
sep='\t'
if '.txt' in filename:
sep='\t'

if 'sep' not in locals():
raise TypeError(f"neither of the file extensions (.tsv, .csv, .txt) detected for file {input_file}! Your filename has to contain one of these extensions. Please modify your file name accordingly.")

else:
sep="\t"


uploaded_data_columns = set(pd.read_csv(input_file, sep=sep, nrows=1, encoding ='latin1').columns)
uploaded_data_columns = utils_fileread.read_columns_from_file(input_file, sep=sep)

for input_type in type2relevant_columns.keys():
if (input_type_to_use is not None) and (input_type!=input_type_to_use):
@@ -940,259 +930,3 @@ def __initialize_df_iterator__(self):
def __write_reformatted_df_to_file__(reformatted_df, filepath ,write_header):
reformatted_df.to_csv(filepath, header=write_header, mode='a', sep = "\t", index = None)

# %% ../nbdev_nbs/04_utils.ipynb 51
import os
import re

class AcquisitionTableHandler():
def __init__(self, results_dir, samples):
self._table_infos = AcquisitionTableInfo(results_dir=results_dir)
self._header_infos = AcquisitionTableHeaders(self._table_infos)
self._samples = self.__reformat_samples_if_necessary(samples)

def get_acquisition_info_df(self):
return self.__get_reformated_df__()

def save_dataframe_as_new_acquisition_dataframe(self):
self._output_paths = AcquisitionTableOutputPaths(self._table_infos)
self.__remove_possible_pre_existing_ml_table__(self._output_paths.output_file_name)
df_reformater = AcquisitionTableReformater(table_infos = self._table_infos, header_infos=self._header_infos, samples = self._samples, dataframe_already_preformated=False)
df_reformater.reformat_and_save_acquisition_data_frame(self._output_paths.output_file_name)

def update_ml_file_location_in_method_parameters_yaml(self):
method_params = load_method_parameters(self._table_infos._results_dir)
if self._output_paths == None:
raise Exception("output paths not initialized! This could be because no dataframe was saved before")
method_params[self._output_paths.ml_file_accession_in_yaml] = self._output_paths.output_file_name
save_dict_as_yaml(method_params, self._output_paths.method_parameters_yaml_path)

def __get_reformated_df__(self):
df_reformater = AcquisitionTableReformater(table_infos = self._table_infos, header_infos=self._header_infos, samples = self._samples, dataframe_already_preformated=True)
df = df_reformater.reformat_and_load_acquisition_data_frame()
return df.convert_dtypes()

def __reformat_samples_if_necessary(self, samples):
if "plexDIA" in self._table_infos._input_type:
return self.__get_plexDIA_samplenames__(samples)
else:
return samples

def __get_plexDIA_samplenames__(self, samples):
new_samples = []
for sample in samples:
new_samples.append(self.__get_samplename_without_mtraq_tag__(sample))
return new_samples

@staticmethod
def __get_samplename_without_mtraq_tag__(samplename):
pattern = "(.*)(_\(mTRAQ-n-.\))"
matched = re.match(pattern, samplename)
return matched.group(1)

@staticmethod
def __remove_possible_pre_existing_ml_table__(output_file_name):
if os.path.exists(output_file_name):
os.remove(output_file_name)
LOGGER.info(f"removed pre existing {output_file_name}")


class AcquisitionTableInfo():
def __init__(self, results_dir, sep = "\t", decimal = "."):
self._results_dir = results_dir
self._sep = sep
self._decimal = decimal
self._method_params_dict = load_method_parameters(results_dir)
self._input_file = self.__get_input_file__()
self._file_ending_of_formatted_table = ".ml_info_table.tsv"
self.already_formatted = self.__check_if_input_file_is_already_formatted__()
self._input_type, self._config_dict = self.__get_input_type_and_config_dict__()
self._sample_column = self.__get_sample_column__()
self.last_ion_level_to_use = self.__get_last_ion_level_to_use__()

def __get_input_file__(self):
if self._method_params_dict.get('ml_input_file') is None:
return self.__get_location_of_original_file__()
else:
return self._method_params_dict.get('ml_input_file')

def __check_if_input_file_is_already_formatted__(self):
if self._file_ending_of_formatted_table in self._input_file:
return True
else:
return False

def __get_input_type_and_config_dict__(self):
if self.already_formatted:
original_file = self.__get_location_of_original_file__()
else:
original_file = self._input_file
input_type, config_dict, _ = get_input_type_and_config_dict(original_file)
return input_type, config_dict

def __get_location_of_original_file__(self):
input_file = self._method_params_dict.get('input_file')
return self.__get_original_filename_from_input_file__(input_file)

@staticmethod
def __get_original_filename_from_input_file__(input_file):
pattern = "(.*\.tsv|.*\.csv|.*\.txt)(\..*)(.aq_reformat.tsv)"
m = re.match(pattern=pattern, string=input_file)
if m:
return m.group(1)
else:
return input_file


def __get_sample_column__(self):
return self._config_dict.get("sample_ID")

def __get_last_ion_level_to_use__(self):
return self._config_dict["ml_level"]





class AcquisitionTableHeaders():
def __init__(self, acquisition_table_info):

self._table_info = acquisition_table_info

self._ion_hierarchy = self.__get_ordered_ion_hierarchy__()
self._included_levelnames = self.__get_included_levelnames__()
self._ion_headers_grouped = self.__get_ion_headers_grouped__()
self._ion_headers = self.__get_ion_headers__()
self._numeric_headers = self.__get_numeric_headers__()
self._relevant_headers = self.__get_relevant_headers__()

def __get_ordered_ion_hierarchy__(self):
ion_hierarchy = self._table_info._config_dict.get("ion_hierarchy")
hier_key = 'fragion' if 'fragion' in ion_hierarchy.keys() else list(ion_hierarchy.keys())[0]
ion_hierarchy_on_chosen_key = ion_hierarchy.get(hier_key)
return ion_hierarchy_on_chosen_key

def __get_included_levelnames__(self):
levelnames = self.__get_all_levelnames__(self._ion_hierarchy)
last_ionlevel_idx = levelnames.index(self._table_info.last_ion_level_to_use)
return levelnames[:last_ionlevel_idx+1]

@staticmethod
def __get_all_levelnames__(ion_hierarchy):
return ion_hierarchy.get('order')

def __get_ion_headers_grouped__(self):
mapping_dict = self.__get_levelname_mapping_dict(self._ion_hierarchy)
return [mapping_dict.get(x) for x in self._included_levelnames]#on each level there can be multiple names, so it is a list of lists

@staticmethod
def __get_levelname_mapping_dict(ion_hierarchy):
return ion_hierarchy.get('mapping')

def __get_ion_headers__(self):
return list(itertools.chain(*self._ion_headers_grouped))


def __get_relevant_headers__(self):
relevant_headers = self._numeric_headers+self._ion_headers + [self._table_info._sample_column]
return self.__remove_possible_none_values_from_list__(relevant_headers)

@staticmethod
def __remove_possible_none_values_from_list__(list):
return [x for x in list if x is not None]

def __get_numeric_headers__(self):
df_sample = pd.read_csv(self._table_info._input_file, sep = self._table_info._sep, decimal = self._table_info._decimal, encoding='latin1', nrows=3000) #sample 3000 rows from the df to assess the types of each row
df_sample = df_sample.replace({False: 0, True: 1})
numeric_headers = list(df_sample.select_dtypes(include=np.number).columns)
numeric_headers = AcquisitionTableHeaderFilter().filter_numeric_headers_if_specified(input_type = self._table_info._input_type, numeric_headers = numeric_headers)
return numeric_headers


class AcquisitionTableOutputPaths():
def __init__(self, table_info):
self._table_info = table_info
self.output_file_name = self.__get_output_file_name__()
self.method_parameters_yaml_path = self.__get_method_parameters_yaml_path__()
self.ml_file_accession_in_yaml = "ml_input_file"

def __get_output_file_name__(self):
old_file_name = self._table_info._input_file
new_file_name = old_file_name+self._table_info._file_ending_of_formatted_table
return new_file_name

def __get_method_parameters_yaml_path__(self):
return f"{self._table_info._results_dir}/aq_parameters.yaml"


class AcquisitionTableReformater(LongTableReformater):
def __init__(self, table_infos, header_infos, samples, dataframe_already_preformated = False):

LongTableReformater.__init__(self, table_infos._input_file)
self._table_infos = table_infos
self._header_infos = header_infos
self._samples = samples
self._dataframe_already_preformated = dataframe_already_preformated

#set the two functions that specify the explicit reformatting
self._reformatting_function = self.__reformatting_function__
self._iterator_function = self.__initialize_iterator_with_specified_columns__

def __reformatting_function__(self, input_df_subset):
input_df_subset = input_df_subset.drop_duplicates()
input_df_subset = self.__filter_reformated_df_if_necessary__(input_df_subset)
if not self._dataframe_already_preformated:
input_df_subset = add_merged_ionnames(input_df_subset, self._header_infos._included_levelnames, self._header_infos._ion_headers_grouped, None, None)
return input_df_subset

def __filter_reformated_df_if_necessary__(self, reformatted_df):
if 'spectronaut' in self._table_infos._input_type or 'diann' in self._table_infos._input_type:
return self.__filter_reformatted_dataframe_to_relevant_samples__(reformatted_df)
else:
return reformatted_df

def __filter_reformatted_dataframe_to_relevant_samples__(self, input_df_subset):
return input_df_subset[[x in self._samples for x in input_df_subset[self._table_infos._sample_column]]]

def __initialize_iterator_with_specified_columns__(self):
cols_to_use = self.__get_cols_to_use__()
return pd.read_csv(self._table_infos._input_file, sep = self._table_infos._sep, decimal=self._table_infos._decimal, usecols = cols_to_use, encoding ='latin1', chunksize=1000000)

def __get_cols_to_use__(self):
cols_to_use = self._header_infos._relevant_headers
if self._dataframe_already_preformated:
return cols_to_use+[config.QUANT_ID]
else:
return cols_to_use




class AcquisitionTableHeaderFilter():
def __init__(self):
self._spectronaut_header_filter = lambda x : (("EG." in x) | ("FG." in x)) and ("Global" not in x)
self._maxquant_header_filter = lambda x : ("Intensity" not in x) and ("Experiment" not in x)

def filter_numeric_headers_if_specified(self, input_type, numeric_headers):
if 'spectronaut' in input_type:
return [x for x in numeric_headers if self._spectronaut_header_filter(x)]
elif 'maxquant' in input_type:
return [x for x in numeric_headers if self._maxquant_header_filter(x)]
else:
return numeric_headers




# %% ../nbdev_nbs/04_utils.ipynb 52
def merge_acquisition_df_parameter_df(acquisition_df, parameter_df, groupby_merge_type = 'mean'):
"""acquisition df contains details on the acquisition, parameter df are the parameters derived from the tree
"""
merged_df = parameter_df.merge(acquisition_df, how = 'left', on = config.QUANT_ID)
if groupby_merge_type == 'mean':
merged_df = merged_df.groupby(config.QUANT_ID).mean().reset_index()
if groupby_merge_type == 'min':
merged_df = merged_df.groupby(config.QUANT_ID).min().reset_index()
if groupby_merge_type == 'max':
merged_df = merged_df.groupby(config.QUANT_ID).max().reset_index()
merged_df = merged_df.dropna(axis=1, how='all')
return merged_df
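
With the utils_fileread helpers in place, a Parquet report can be passed anywhere a delimited table was accepted before. A hypothetical call, assuming the run_pipeline entry point shown in the directlfq README (the file name is made up):

import directlfq.lfq_manager as lfq_manager
lfq_manager.run_pipeline(input_file="report.parquet")  # hypothetical parquet input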