From 1c21a17cd99d4ad67f6022817ac12fa3d3c6a334 Mon Sep 17 00:00:00 2001 From: MattWellie Date: Fri, 25 Oct 2024 15:29:31 +1000 Subject: [PATCH] Cache the Hash, but better --- cpg_workflows/targets.py | 35 +++++++++++++++++++++++++---------- cpg_workflows/utils.py | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 10 deletions(-) diff --git a/cpg_workflows/targets.py b/cpg_workflows/targets.py index d737db071..13b96ebcf 100644 --- a/cpg_workflows/targets.py +++ b/cpg_workflows/targets.py @@ -6,16 +6,15 @@ import logging from dataclasses import dataclass from enum import Enum -from functools import cache from typing import Optional import pandas as pd from cpg_utils import Path, to_path from cpg_utils.config import dataset_path, get_config, reference_path, web_url - -from .filetypes import AlignmentInput, BamPath, CramPath, FastqPairs, GvcfPath -from .metamist import Assay +from cpg_workflows.filetypes import AlignmentInput, BamPath, CramPath, FastqPairs, GvcfPath +from cpg_workflows.metamist import Assay +from cpg_workflows.utils import alignment_inputs_hash_mc, cohort_inputs_hash_by_id, dataset_inputs_hash_by_id class Target: @@ -41,17 +40,12 @@ def get_sequencing_group_ids(self, only_active: bool = True) -> list[str]: """ return [s.id for s in self.get_sequencing_groups(only_active=only_active)] - @cache def alignment_inputs_hash(self) -> str: """ Unique hash string of sample alignment inputs. Useful to decide whether the analysis on the target needs to be rerun. """ - s = ' '.join( - sorted(' '.join(str(s.alignment_input)) for s in self.get_sequencing_groups() if s.alignment_input), - ) - h = hashlib.sha256(s.encode()).hexdigest()[:38] - return f'{h}_{len(self.get_sequencing_group_ids())}' + raise NotImplementedError('This method should not be called directly') @property def target_id(self) -> str: @@ -122,6 +116,13 @@ def target_id(self) -> str: """Unique target ID""" return self.name + def alignment_inputs_hash(self) -> str: + """ + Unique hash string of sample alignment inputs. Useful to decide + whether the analysis on the target needs to be rerun. + """ + return alignment_inputs_hash_mc() + def get_cohorts(self, only_active: bool = True) -> list['Cohort']: """ Gets list of all cohorts. @@ -256,6 +257,13 @@ def target_id(self) -> str: """Unique target ID""" return self.name + def alignment_inputs_hash(self) -> str: + """ + Unique hash string of sample alignment inputs. Useful to decide + whether the analysis on the target needs to be rerun. + """ + return cohort_inputs_hash_by_id(self.name) + def write_ped_file(self, out_path: Path | None = None, use_participant_id: bool = False) -> Path: """ Create a PED file for all samples in the whole cohort @@ -406,6 +414,13 @@ def __repr__(self): def __str__(self): return f'{self.name} ({len(self.get_sequencing_groups())} sequencing groups)' + def alignment_inputs_hash(self) -> str: + """ + Unique hash string of sample alignment inputs. Useful to decide + whether the analysis on the target needs to be rerun. + """ + return dataset_inputs_hash_by_id(self.name) + def _seq_type_subdir(self) -> str: """ Subdirectory parametrised by sequencing type. For genomes, we don't diff --git a/cpg_workflows/utils.py b/cpg_workflows/utils.py index 6bdfcd100..a579d7942 100644 --- a/cpg_workflows/utils.py +++ b/cpg_workflows/utils.py @@ -2,6 +2,7 @@ Utility functions and constants. """ +import hashlib import logging import re import string @@ -20,10 +21,48 @@ from cpg_utils import Path, to_path from cpg_utils.config import config_retrieve, get_config +from cpg_workflows.workflow import get_multicohort LOGGER: logging.Logger | None = None +@lru_cache(1) +def alignment_inputs_hash_mc() -> str: + """ + Unique hash string of sample alignment inputs. Useful to decide + whether the analysis on the target needs to be rerun. + """ + all_sgs = get_multicohort().get_sequencing_groups() + s = ' '.join(sorted(' '.join(str(s.alignment_input)) for s in all_sgs if s.alignment_input)) + return f'{hashlib.sha256(s.encode()).hexdigest()[:38]}_{len(all_sgs)}' + + +@lru_cache +def cohort_inputs_hash_by_id(cohort_id: str) -> str: + """ + Unique hash string of sample alignment inputs. Useful to decide + whether the analysis on the target needs to be rerun. + """ + if (cohort := get_multicohort().get_cohort_by_name(cohort_id)) is None: + raise ValueError(f'Cohort {cohort_id} not found') + all_sgs = cohort.get_sequencing_groups() + s = ' '.join(sorted(' '.join(str(s.alignment_input)) for s in all_sgs if s.alignment_input)) + return f'{hashlib.sha256(s.encode()).hexdigest()[:38]}_{len(all_sgs)}' + + +@lru_cache +def dataset_inputs_hash_by_id(dataset_id: str) -> str: + """ + Unique hash string of sample alignment inputs. Useful to decide + whether the analysis on the target needs to be rerun. + """ + if (dataset := get_multicohort().get_dataset_by_name(dataset_id)) is None: + raise ValueError(f'Dataset {dataset_id} not found') + all_sgs = dataset.get_sequencing_groups() + s = ' '.join(sorted(' '.join(str(s.alignment_input)) for s in all_sgs if s.alignment_input)) + return f'{hashlib.sha256(s.encode()).hexdigest()[:38]}_{len(all_sgs)}' + + def get_logger(logger_name: str | None = None, log_level: int = logging.INFO) -> logging.Logger: """ creates a logger instance (so as not to use the root logger)