diff --git a/watchdog_pipeline/watchdog_pipeline.py b/watchdog_pipeline/watchdog_pipeline.py
index d580aa8e..598380f0 100644
--- a/watchdog_pipeline/watchdog_pipeline.py
+++ b/watchdog_pipeline/watchdog_pipeline.py
@@ -31,32 +31,39 @@

 # Set the path you want to watch
-main_path_to_watch = sys.argv[1]
-dry_run = sys.argv[2]
-report_only = sys.argv[3]
-panoptes = sys.argv[4]
+# main_path_to_watch = sys.argv[1]
+
+paths_to_watch = [
+    # "/g/korbel/shared/data/others/StrandSeq/runs",
+    "/g/korbel/shared/genecore",
+    # "/g/korbel/STOCKS/Data/Assay/sequencing",
+]
+
+dry_run = sys.argv[1]
+report_only = sys.argv[2]
+panoptes = sys.argv[3]

 data_location = "/scratch/tweber/DATA/MC_DATA/STOCKS"
 # publishdir_location = "/g/korbel/weber/TMP/WORKFLOW_RESULTS_DEV"
 publishdir_location = "/g/korbel/WORKFLOW_RESULTS"

-genecore_prefix = main_path_to_watch
+genecore_prefix = paths_to_watch[0]
 # profile_slurm = [
 #     "--profile",
 #     "/g/korbel2/weber/workspace/snakemake_profiles/HPC/dev/slurm_legacy_conda/",
 # ]
-profile_slurm = [
-    "--profile",
-    "/g/korbel2/weber/workspace/snakemake_profiles/local/conda_singularity/",
-    "--cores",
-    "64",
-    "--singularity-args",
-    '"-B /scratch,/g"',
-]
 # profile_slurm = [
 #     "--profile",
-#     "/g/korbel2/weber/workspace/snakemake_profiles/HPC/dev/slurm_EMBL/",
+#     "/g/korbel2/weber/workspace/snakemake_profiles/local/conda_singularity/",
+#     "--cores",
+#     "64",
+#     "--singularity-args",
+#     '"-B /scratch,/g"',
 # ]
+profile_slurm = [
+    "--profile",
+    "/g/korbel2/weber/workspace/mosaicatcher-update/workflow/snakemake_profiles/HPC/dev/slurm_EMBL/",
+]

 profile_dry_run = [
     "--profile",
     "workflow/snakemake_profiles/local/conda/",
@@ -70,7 +77,7 @@
     "/g/korbel2/weber/miniconda3/envs/snakemake_panoptesfix/bin/snakemake"
 )  # Panoptes

-pipeline = sys.argv[5]
+pipeline = sys.argv[4]
 assert pipeline in [
     "ashleys-qc-pipeline",
     "mosaicatcher-pipeline",
@@ -102,6 +109,7 @@ def generate_data_file(directory):
     unwanted = ["._.DS_Store", ".DS_Store", "config"]

     total_list_runs = sorted([e for e in os.listdir(directory) if e not in unwanted])
+    print(total_list_runs)

     l_df = list()

@@ -175,12 +183,12 @@ def extract_samples_names(self, l, directory_path):
             file_count = file_counts_per_sample[sample_name]

             # Determine plate type using modulo 96 operation
-            if file_count % 96 != 0:
-                # raise ValueError(
-                print(
-                    f"Invalid file count for sample {sample_name} with file count {file_count}. Must be a multiple of 96."
-                )
-                continue
+            # if file_count % 96 != 0:
+            #     # raise ValueError(
+            #     print(
+            #         f"Invalid file count for sample {sample_name} with file count {file_count}. Must be a multiple of 96."
+            #     )
+            #     continue
             plate_type = int(file_count / 2)

             if (j + 1) % file_count == 0:
@@ -342,6 +350,7 @@ def process_sample(
         last_message_timestamp,
         prefixes,
         plate_type,
+        path_to_watch,
     ):
         run_id = f"{pipeline}--{plate}--{sample_name}"
         workflow_id = self.find_workflow_id_by_name(workflows_data, run_id)
@@ -455,13 +464,15 @@ def process_sample(
             "completed_at": workflow_id["completed_at"],
             "jobs_done": workflow_id["jobs_done"],
             "jobs_total": workflow_id["jobs_total"],
+            "run_path": f"{path_to_watch}/{plate}",
         }
+        print(tmp_d)
         return tmp_d

     # Main function to process directories
     def process_directories(
         self,
-        main_path_to_watch,
+        paths_to_watch,
         excluded_samples,
         pipeline,
         data_location,
@@ -474,37 +485,56 @@ def process_directories(
         main_df = []

         if len(workflows_data) > 0:
-            for year in os.listdir(main_path_to_watch):
-                if year.startswith("20"):  # Assuming only years are relevant
-                    path_to_watch = f"{main_path_to_watch}/{year}"
-                    total_list_runs = sorted(
-                        [e for e in os.listdir(path_to_watch) if e not in unwanted]
-                    )
-                    for plate in total_list_runs:
-                        if plate.split("-")[0][:2] == "20":
-                            directory_path = f"{path_to_watch}/{plate}"
-                            prefixes, samples, plate_types = self.extract_samples_names(
-                                glob.glob(f"{directory_path}/*.txt.gz"),
-                                directory_path,
-                            )
-                            if len(set(prefixes)) == 1:
-                                for sample_name, plate_type in zip(
-                                    samples, plate_types
+            for path_to_watch in paths_to_watch:
+                total_list_runs = []
+
+                if path_to_watch == "/g/korbel/STOCKS/Data/Assay/sequencing":
+                    for year in os.listdir(path_to_watch):
+                        if year.startswith(
+                            "20"
+                        ):  # Assuming directories starting with "20" are years
+                            year_path = os.path.join(path_to_watch, year)
+                            for folder in os.listdir(year_path):
+                                folder_path = os.path.join(year_path, folder)
+                                if (
+                                    os.path.isdir(folder_path)
+                                    and folder not in unwanted
                                 ):
-                                    if sample_name not in excluded_samples:
-                                        result = self.process_sample(
-                                            sample_name,
-                                            plate,
-                                            pipeline,
-                                            data_location,
-                                            publishdir_location,
-                                            variable,
-                                            workflows_data,
-                                            last_message_timestamp,
-                                            prefixes,
-                                            plate_type,
-                                        )
-                                        main_df.append(result)
+                                    total_list_runs.append(folder_path)
+                else:
+                    total_list_runs = [
+                        os.path.join(path_to_watch, e)
+                        for e in os.listdir(path_to_watch)
+                        if e not in unwanted
+                        and os.path.isdir(os.path.join(path_to_watch, e))
+                    ]
+
+                print(total_list_runs)
+
+                for directory_path in total_list_runs:
+                    print(directory_path)
+                    prefixes, samples, plate_types = self.extract_samples_names(
+                        glob.glob(f"{directory_path}/*.txt.gz"),
+                        directory_path,
+                    )
+
+                    if len(set(prefixes)) == 1:
+                        for sample_name, plate_type in zip(samples, plate_types):
+                            if sample_name not in excluded_samples:
+                                result = self.process_sample(
+                                    sample_name,
+                                    os.path.basename(directory_path),
+                                    pipeline,
+                                    data_location,
+                                    publishdir_location,
+                                    variable,
+                                    workflows_data,
+                                    last_message_timestamp,
+                                    prefixes,
+                                    plate_type,
+                                    path_to_watch,
+                                )
+                                main_df.append(result)
         return pd.DataFrame(main_df)

     def check_unprocessed_folder(self):
@@ -542,7 +572,7 @@ def check_unprocessed_folder(self):
         excluded_samples = config["excluded_samples"]

         main_df = self.process_directories(
-            main_path_to_watch,
+            paths_to_watch,
             excluded_samples,
             pipeline,
             data_location,
@@ -556,6 +586,8 @@ def check_unprocessed_folder(self):

         pd.options.display.max_colwidth = 30
         main_df = pd.DataFrame(main_df)
+        # print(main_df)
+        # exit()
         # main_df.loc[(main_df["labels"] == True) & (main_df["report"] == True), "real_status"] = "Completed"
         main_df.loc[
             (main_df["final_output_scratch"] == True) & (main_df["report"] == False),
@@ -597,10 +629,11 @@ def check_unprocessed_folder(self):

         print("\n")
         print(main_df)
+        print(main_df.run_path.values[0])
         test_json = main_df.to_json(orient="records", date_format="iso")
-        print(test_json)
-        print(pd.read_json(test_json, orient="records"))
-        exit()
+        # print(test_json)
+        # print(pd.read_json(test_json, orient="records"))
+        # exit()

         # pipeline_final_file_variable = (
         #     "ashleys_final_scratch"
@@ -613,77 +646,77 @@ def check_unprocessed_folder(self):

         if dry_run_db is False:
             cursor = connection.cursor()
-            assert (
-                main_df.loc[
-                    (main_df["final_output_scratch"] == False)
-                    & (main_df["report"] == True)
-                ].shape[0]
-                == 0
-            ), "Error in table, samples have report done without the completion of the pipeline"
-
-            logging.info(
-                "Correcting status of plates with report.zip and final_output_scratch"
-            )
-
-            for row in main_df.loc[
-                (main_df["final_output_scratch"] == True)
-                & (main_df["report"] == True)
-                & (main_df["status"] != "Done")
-            ].to_dict("records"):
-                logging.info(row)
-                panoptes_entry = f"{pipeline}--{row['plate']}--{row['sample']}"
-                workflow_id = row["panoptes_id"]
-
-                # if workflow_id != "None":
-                #     command = f'sqlite3 /g/korbel2/weber/workspace/strandscape/.panoptes.db "DELETE FROM workflows WHERE id={workflow_id};"'
-                #     subprocess.run(command, shell=True, check=True)
-
-                panoptes_data = [
-                    e for e in workflows_data["workflows"] if e["id"] == workflow_id
-                ]
-
-                print(panoptes_entry)
-                print(panoptes_data)
-                print(row)
-                if workflow_id and workflow_id != "None":
-                    assert (
-                        len(panoptes_data) > 0
-                    ), f"Data issue between pika & panoptes, {str(panoptes_data)}"
-
-                    if panoptes_data:
-                        panoptes_data = panoptes_data[0]
-                        if "completed_at" not in panoptes_data:
-                            panoptes_data["completed_at"] = last_message_timestamp
+            # assert (
+            #     main_df.loc[
+            #         (main_df["final_output_scratch"] == False)
+            #         & (main_df["report"] == True)
+            #     ].shape[0]
+            #     == 0
+            # ), "Error in table, samples have report done without the completion of the pipeline"

-                        command = f'sqlite3 /g/korbel2/weber/workspace/strandscape/.panoptes.db "DELETE FROM workflows WHERE id={workflow_id};"'
-                        subprocess.run(command, shell=True, check=True)
+            # logging.info(
+            #     "Correcting status of plates with report.zip and final_output_scratch"
+            # )

-                else:
-                    logging.info("Panoptes data not found for workflow entry: %s", row)
-                    panoptes_data = {
-                        "started_at": last_message_timestamp,
-                        "completed_at": last_message_timestamp,
-                        "jobs_done": "1",
-                        "jobs_total": "1",
-                    }
-
-                print(row)
-
-                cursor.execute(
-                    """
-                    INSERT INTO workflows (name, status, done, total, started_at, completed_at)
-                    VALUES (?, ?, ?, ?, ?, ?)
-                """,
-                    (
-                        panoptes_entry,
-                        "Done",
-                        panoptes_data["jobs_done"],
-                        panoptes_data["jobs_total"],
-                        panoptes_data["started_at"],
-                        panoptes_data["completed_at"],
-                    ),
-                )
-                connection.commit()
+            # for row in main_df.loc[
+            #     (main_df["final_output_scratch"] == True)
+            #     & (main_df["report"] == True)
+            #     & (main_df["status"] != "Done")
+            # ].to_dict("records"):
+            #     logging.info(row)
+            #     panoptes_entry = f"{pipeline}--{row['plate']}--{row['sample']}"
+            #     workflow_id = row["panoptes_id"]
+
+            #     # if workflow_id != "None":
+            #     #     command = f'sqlite3 /g/korbel2/weber/workspace/strandscape/.panoptes.db "DELETE FROM workflows WHERE id={workflow_id};"'
+            #     #     subprocess.run(command, shell=True, check=True)
+
+            #     panoptes_data = [
+            #         e for e in workflows_data["workflows"] if e["id"] == workflow_id
+            #     ]
+
+            #     print(panoptes_entry)
+            #     print(panoptes_data)
+            #     print(row)
+            #     if workflow_id and workflow_id != "None":
+            #         assert (
+            #             len(panoptes_data) > 0
+            #         ), f"Data issue between pika & panoptes, {str(panoptes_data)}"
+
+            #         if panoptes_data:
+            #             panoptes_data = panoptes_data[0]
+            #             if "completed_at" not in panoptes_data:
+            #                 panoptes_data["completed_at"] = last_message_timestamp
+
+            #             command = f'sqlite3 /g/korbel2/weber/workspace/strandscape/.panoptes.db "DELETE FROM workflows WHERE id={workflow_id};"'
+            #             subprocess.run(command, shell=True, check=True)
+
+            #     else:
+            #         logging.info("Panoptes data not found for workflow entry: %s", row)
+            #         panoptes_data = {
+            #             "started_at": last_message_timestamp,
+            #             "completed_at": last_message_timestamp,
+            #             "jobs_done": "1",
+            #             "jobs_total": "1",
+            #         }
+
+            #     print(row)
+
+            #     cursor.execute(
+            #         """
+            #         INSERT INTO workflows (name, status, done, total, started_at, completed_at)
+            #         VALUES (?, ?, ?, ?, ?, ?)
+            #     """,
+            #         (
+            #             panoptes_entry,
+            #             "Done",
+            #             panoptes_data["jobs_done"],
+            #             panoptes_data["jobs_total"],
+            #             panoptes_data["started_at"],
+            #             panoptes_data["completed_at"],
+            #         ),
+            #     )
+            #     connection.commit()

             logging.info(
                 "Processing plates without final_output_scratch or outdated without report.zip"
             )
@@ -710,7 +743,8 @@ def check_unprocessed_folder(self):
                     subprocess.run(command, shell=True, check=True)

                 self.process_new_directory(
-                    "/".join([path_to_watch, row["plate"]]),
+                    row["run_path"],
+                    # "/".join([path_to_watch, row["plate"]]),
                     row["prefix"],
                     row["sample"],
                     row["plate_type"],
@@ -906,6 +940,7 @@ def execute_command(
             "--rerun-incomplete",
             "--rerun-triggers",
             "mtime",
+            # "--touch",
         ]

         if cell:
@@ -1095,7 +1130,7 @@ def main():
     observer = Observer()

     # Assign the observer to the path and the event handler
-    observer.schedule(event_handler, main_path_to_watch, recursive=False)
+    observer.schedule(event_handler, paths_to_watch[0], recursive=False)

     # Start the observer
     observer.start()
diff --git a/workflow/Snakefile b/workflow/Snakefile
index 652acaaa..4262bd44 100644
--- a/workflow/Snakefile
+++ b/workflow/Snakefile
@@ -19,12 +19,12 @@ if config["ashleys_pipeline"] is True:

     module ashleys_qc:
         snakefile:
-            # "../../ashleys-qc-pipeline/workflow/Snakefile"
-            github(
-                "friendsofstrandseq/ashleys-qc-pipeline",
-                path="workflow/Snakefile",
-                tag=str(config["ashleys_pipeline_version"]),
-            )
+            "../../ashleys-qc-pipeline/workflow/Snakefile"
+            # github(
+            #     "friendsofstrandseq/ashleys-qc-pipeline",
+            #     path="workflow/Snakefile",
+            #     tag=str(config["ashleys_pipeline_version"]),
+            # )
         config:
             config

diff --git a/workflow/rules/common.smk b/workflow/rules/common.smk
index 8802f433..d6bc5c6b 100644
--- a/workflow/rules/common.smk
+++ b/workflow/rules/common.smk
@@ -124,8 +124,6 @@ if config["scNOVA"] is True:
     ), "chrY is not handled by scNOVA yet, please remove it for config['chromosomes'] and add it in config['chomosomes_to_exclude']"


-
-
 if config["strandscape_labels_path"]:
     folder_location = config["abs_path"].join(
         config["strandscape_labels_path"].split("/")[:-1]
@@ -732,6 +730,15 @@ def get_all_plots(wildcards):
         ]
         l_outputs.extend([sub_e for e in tmp_l_divide_count_plots for sub_e in e])

+    # CONDA ENVS
+
+    conda_envs = ["mc_base", "rtools", "mc_bioinfo_tools"]
+    conda_envs += (
+        ["scNOVA_bioinfo_tools", "scNOVA_base", "scNOVA_rtools"]
+        if config["scNOVA"] is True
+        else []
+    )
+
     if config["split_qc_plot"] is True:
         # print("OK")

@@ -759,6 +766,25 @@ def get_all_plots(wildcards):
                 )
             )

+        # Config section
+
+        l_outputs.extend(
+            expand(
+                "{folder}/{sample}/config/config.yaml",
+                folder=config["data_location"],
+                sample=wildcards.sample,
+            ),
+        )
+
+        l_outputs.extend(
+            expand(
+                "{folder}/{sample}/config/{conda}.yaml",
+                folder=config["data_location"],
+                sample=wildcards.sample,
+                conda=conda_envs,
+            ),
+        )
+
     else:
         # SV_consistency section

@@ -901,6 +927,23 @@ def get_all_plots(wildcards):
             ),
         )

+        l_outputs.extend(
+            expand(
+                "{folder}/{sample}/config/conda_export/{conda}.yaml",
+                folder=config["data_location"],
+                sample=wildcards.sample,
+                conda=conda_envs,
+            ),
+        )
+
+        if config["publishdir"] != "":
+            l_outputs.extend(
+                expand(
+                    "{folder}/{sample}/config/publishdir_outputs_mc.ok",
+                    folder=config["data_location"],
+                    sample=wildcards.sample,
+                )
+            )

     # Run summary section
     # l_outputs.extend(
@@ -914,3 +957,84 @@ def get_all_plots(wildcards):
     # from pprint import pprint
     # pprint(l_outputs)
     return l_outputs
+
+
+def publishdir_fct_mc(wildcards):
+    """
+    Restricted for ASHLEYS at the moment
+    Backup files on a secondary location
+    """
+    list_files_to_copy = [
+        # ASHLEYS
+        "{folder}/{sample}/cell_selection/labels_raw.tsv",
+        "{folder}/{sample}/cell_selection/labels.tsv",
+        "{folder}/{sample}/counts/{sample}.info_raw",
+        "{folder}/{sample}/counts/{sample}.txt.raw.gz",
+        # "{folder}/{sample}/config/config.yaml",
+        # MC
+        "{folder}/{sample}/config/config.yaml",
+    ]
+
+    list_files_to_copy += [
+        e for e in get_all_plots(wildcards) if "publishdir_outputs_mc.ok" not in e
+    ]
+
+    final_list = [
+        expand(e, folder=config["data_location"], sample=wildcards.sample)
+        for e in list_files_to_copy
+    ]
+    final_list = [sub_e for e in final_list for sub_e in e]
+    final_list.extend(
+        expand(
+            "{folder}/{sample}/plots/counts/CountComplete.{plottype_counts}.pdf",
+            folder=config["data_location"],
+            sample=wildcards.sample,
+            plottype_counts=plottype_counts,
+        )
+    )
+
+    if config["use_light_data"] is False:
+        final_list.extend(
+            expand(
+                "{folder}/{sample}/plots/plate/ashleys_plate_{plate_plot}.pdf",
+                folder=config["data_location"],
+                sample=wildcards.sample,
+                plate_plot=["predictions", "probabilities"],
+            )
+        )
+        final_list.extend(
+            expand(
+                "{folder}/{sample}/cell_selection/labels_positive_control_corrected.tsv",
+                folder=config["data_location"],
+                sample=wildcards.sample,
+            )
+        )
+        final_list.extend(
+            expand(
+                "{folder}/{sample}/config/bypass_cell.txt",
+                folder=config["data_location"],
+                sample=wildcards.sample,
+            )
+        )
+
+    # folders_to_keep = [
+    #     "plots",
+    #     "snv_calls",
+    #     "segmentation",
+    #     "haplotag",
+    #     "strandphaser",
+    #     "ploidy",
+    #     "stats",
+    #     "mosaiclassifier",
+    # ]
+
+    # final_list += expand(
+    #     "{folder}/{sample}/{folder_to_keep}/",
+    #     folder=config["data_location"],
+    #     sample=wildcards.sample,
+    #     folder_to_keep=folders_to_keep,
+    # )
+
+    # print(final_list)
+
+    return final_list
diff --git a/workflow/rules/utils.smk b/workflow/rules/utils.smk
index 4eaf8464..5eeff593 100644
--- a/workflow/rules/utils.smk
+++ b/workflow/rules/utils.smk
@@ -11,8 +11,8 @@ rule check_sm_tag:
         """
         sample_name="{wildcards.sample}"
         sm_tag=$(samtools view -H {input} | grep '^@RG' | sed "s/.*SM:\([^\\t]*\).*/\\1/g")
-
-        if [[ $sample_name == $sm_tag ]]; then 
+
+        if [[ $sample_name == $sm_tag ]]; then
             echo "{input}: $sm_tag $sample_name OK" > {output}
             echo "{input}: $sm_tag $sample_name OK" > {log}
         else
@@ -141,6 +141,9 @@ rule samtools_faindex:
         "samtools faidx {input}"


+########################################
+
+
 rule save_config:
     input:
         "config/config.yaml",
@@ -154,3 +157,74 @@
         mem_mb=get_mem_mb,
     script:
         "../scripts/utils/dump_config.py"
+
+
+rule save_conda_versions_mc_base:
+    output:
+        "{folder}/{sample}/config/conda_export/mc_base.yaml",
+    conda:
+        "../envs/mc_base.yaml"
+    shell:
+        "conda env export > {output}"
+
+
+rule save_conda_versions_mc_bioinfo_tools:
+    output:
+        "{folder}/{sample}/config/conda_export/mc_bioinfo_tools.yaml",
+    conda:
+        "../envs/mc_bioinfo_tools.yaml"
+    shell:
+        "conda env export > {output}"
+
+
+rule save_conda_versions_rtools:
+    output:
+        "{folder}/{sample}/config/conda_export/rtools.yaml",
+    conda:
+        "../envs/rtools.yaml"
+    shell:
+        "conda env export > {output}"
+
+
+if config["scNOVA"] == True:
+
+    rule save_conda_versions_scNOVA_bioinfo_tools:
+        output:
+            "{folder}/{sample}/config/conda_export/scNOVA.yaml",
+        conda:
+            "../envs/scNOVA/scNOVA_bioinfo_tools.yaml"
+        shell:
+            "conda env export > {output}"
+
+    rule save_conda_versions_scNOVA_DL:
+        output:
+            "{folder}/{sample}/config/conda_export/scNOVA_DL.yaml",
+        conda:
+            "../envs/scNOVA/scNOVA_DL.yaml"
+        shell:
+            "conda env export > {output}"
+
+    rule save_conda_versions_scNOVA_R:
+        output:
+            "{folder}/{sample}/config/conda_export/scNOVA_R.yaml",
+        conda:
+            "../envs/scNOVA/scNOVA_R.yaml"
+        shell:
+            "conda env export > {output}"
+
+
+# PUBLISHDIR
+
+if config["publishdir"] != "":
+
+    rule publishdir_outputs_mc:
+        input:
+            list_publishdir=publishdir_fct_mc,
+        output:
+            touch("{folder}/{sample}/config/publishdir_outputs_mc.ok"),
+        log:
+            "{folder}/log/publishdir_outputs/{sample}.log",
+        conda:
+            "../envs/mc_base.yaml"
+        script:
+            "../scripts/utils/publishdir.py"
diff --git a/workflow/scripts/utils/publishdir.py b/workflow/scripts/utils/publishdir.py
new file mode 100644
index 00000000..58dd36ad
--- /dev/null
+++ b/workflow/scripts/utils/publishdir.py
@@ -0,0 +1,46 @@
+import subprocess
+import os
+
+
+def run_command(command):
+    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
+    while True:
+        output = process.stdout.readline()
+        if not output and process.poll() is not None:
+            break
+        if output:
+            print(output.strip())
+    rc = process.poll()
+    return rc
+
+
+data_location = snakemake.config["data_location"]
+publishdir = snakemake.config["publishdir"]
+run = snakemake.wildcards.folder.split("/")[-1]
+print(snakemake.wildcards.folder)
+print(run)
+
+# Create base directory to maintain the structure
+os.makedirs(f"{publishdir}/{run}", exist_ok=True)
+
+for item in list(snakemake.input.list_publishdir):
+    print(item)
+    # Replace the base path with the destination path
+    destination_path = item.replace(data_location, f"{publishdir}/{run}")
+
+    if os.path.isdir(item):
+        # Ensure the destination directory exists
+        os.makedirs(destination_path, exist_ok=True)
+        # Copy the entire directory recursively
+        rsync_command = (
+            f"rsync --ignore-existing -avzh --progress {item}/ {destination_path}/"
+        )
+    else:
+        # Copy the file (including parent directory structure)
+        os.makedirs(os.path.dirname(destination_path), exist_ok=True)
+        rsync_command = (
+            f"rsync --ignore-existing -avzh --progress {item} {destination_path}"
+        )
+
+    print(rsync_command)
+    run_command(rsync_command)
diff --git a/workflow/snakemake_profiles b/workflow/snakemake_profiles
index a9afa0e1..406538fe 160000
--- a/workflow/snakemake_profiles
+++ b/workflow/snakemake_profiles
@@ -1 +1 @@
-Subproject commit a9afa0e1fb167b068ec5b597bfefe90c5625dc4a
+Subproject commit 406538fe7174de8478a23f9f93e32e9b6e99f1fc