Skip to content

Commit

Permalink
Cache the Hash, but better
Browse files Browse the repository at this point in the history
  • Loading branch information
MattWellie committed Oct 25, 2024
1 parent 37f8400 commit 1c21a17
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 10 deletions.
35 changes: 25 additions & 10 deletions cpg_workflows/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@
import logging
from dataclasses import dataclass
from enum import Enum
from functools import cache
from typing import Optional

import pandas as pd

from cpg_utils import Path, to_path
from cpg_utils.config import dataset_path, get_config, reference_path, web_url

from .filetypes import AlignmentInput, BamPath, CramPath, FastqPairs, GvcfPath
from .metamist import Assay
from cpg_workflows.filetypes import AlignmentInput, BamPath, CramPath, FastqPairs, GvcfPath
from cpg_workflows.metamist import Assay
from cpg_workflows.utils import alignment_inputs_hash_mc, cohort_inputs_hash_by_id, dataset_inputs_hash_by_id


class Target:
Expand All @@ -41,17 +40,12 @@ def get_sequencing_group_ids(self, only_active: bool = True) -> list[str]:
"""
return [s.id for s in self.get_sequencing_groups(only_active=only_active)]

@cache
def alignment_inputs_hash(self) -> str:
"""
Unique hash string of sample alignment inputs. Useful to decide
whether the analysis on the target needs to be rerun.
"""
s = ' '.join(
sorted(' '.join(str(s.alignment_input)) for s in self.get_sequencing_groups() if s.alignment_input),
)
h = hashlib.sha256(s.encode()).hexdigest()[:38]
return f'{h}_{len(self.get_sequencing_group_ids())}'
raise NotImplementedError('This method should not be called directly')

@property
def target_id(self) -> str:
Expand Down Expand Up @@ -122,6 +116,13 @@ def target_id(self) -> str:
"""Unique target ID"""
return self.name

def alignment_inputs_hash(self) -> str:
    """
    Unique hash string of the alignment inputs for every sequencing group
    in this multicohort. Useful to decide whether the analysis on the
    target needs to be rerun.
    """
    # Delegate to the cached module-level helper so repeat calls are free.
    digest = alignment_inputs_hash_mc()
    return digest

def get_cohorts(self, only_active: bool = True) -> list['Cohort']:
"""
Gets list of all cohorts.
Expand Down Expand Up @@ -256,6 +257,13 @@ def target_id(self) -> str:
"""Unique target ID"""
return self.name

def alignment_inputs_hash(self) -> str:
    """
    Unique hash string of the alignment inputs for every sequencing group
    in this cohort. Useful to decide whether the analysis on the target
    needs to be rerun.
    """
    # The helper is keyed (and cached) by cohort name.
    cohort_name = self.name
    return cohort_inputs_hash_by_id(cohort_name)

def write_ped_file(self, out_path: Path | None = None, use_participant_id: bool = False) -> Path:
"""
Create a PED file for all samples in the whole cohort
Expand Down Expand Up @@ -406,6 +414,13 @@ def __repr__(self):
def __str__(self):
    """Human-readable summary: dataset name plus sequencing-group count."""
    sg_count = len(self.get_sequencing_groups())
    return f'{self.name} ({sg_count} sequencing groups)'

def alignment_inputs_hash(self) -> str:
    """
    Unique hash string of the alignment inputs for every sequencing group
    in this dataset. Useful to decide whether the analysis on the target
    needs to be rerun.
    """
    # The helper is keyed (and cached) by dataset name.
    dataset_name = self.name
    return dataset_inputs_hash_by_id(dataset_name)

def _seq_type_subdir(self) -> str:
"""
Subdirectory parametrised by sequencing type. For genomes, we don't
Expand Down
39 changes: 39 additions & 0 deletions cpg_workflows/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Utility functions and constants.
"""

import hashlib
import logging
import re
import string
Expand All @@ -20,10 +21,48 @@

from cpg_utils import Path, to_path
from cpg_utils.config import config_retrieve, get_config
from cpg_workflows.workflow import get_multicohort

LOGGER: logging.Logger | None = None


@lru_cache(1)
def alignment_inputs_hash_mc() -> str:
    """
    Unique hash string of sample alignment inputs across the whole
    multicohort, suffixed with the sequencing-group count. Useful to
    decide whether the analysis on the target needs to be rerun.
    Cached with lru_cache: inputs don't change within a single run.
    """
    sequencing_groups = get_multicohort().get_sequencing_groups()
    # NOTE(review): ' '.join(str(...)) space-separates the *characters* of
    # each alignment-input string; kept as-is so existing hashes stay stable.
    fragments = sorted(' '.join(str(sg.alignment_input)) for sg in sequencing_groups if sg.alignment_input)
    digest = hashlib.sha256(' '.join(fragments).encode()).hexdigest()[:38]
    return f'{digest}_{len(sequencing_groups)}'


@lru_cache
def cohort_inputs_hash_by_id(cohort_id: str) -> str:
    """
    Unique hash string of sample alignment inputs for the named cohort,
    suffixed with its sequencing-group count. Useful to decide whether
    the analysis on the target needs to be rerun. Cached per cohort name.

    Raises:
        ValueError: if no cohort with this name exists in the multicohort.
    """
    cohort = get_multicohort().get_cohort_by_name(cohort_id)
    if cohort is None:
        raise ValueError(f'Cohort {cohort_id} not found')
    sequencing_groups = cohort.get_sequencing_groups()
    # NOTE(review): ' '.join(str(...)) space-separates the *characters* of
    # each alignment-input string; kept as-is so existing hashes stay stable.
    fragments = sorted(' '.join(str(sg.alignment_input)) for sg in sequencing_groups if sg.alignment_input)
    digest = hashlib.sha256(' '.join(fragments).encode()).hexdigest()[:38]
    return f'{digest}_{len(sequencing_groups)}'


@lru_cache
def dataset_inputs_hash_by_id(dataset_id: str) -> str:
    """
    Unique hash string of sample alignment inputs for the named dataset,
    suffixed with its sequencing-group count. Useful to decide whether
    the analysis on the target needs to be rerun. Cached per dataset name.

    Raises:
        ValueError: if no dataset with this name exists in the multicohort.
    """
    dataset = get_multicohort().get_dataset_by_name(dataset_id)
    if dataset is None:
        raise ValueError(f'Dataset {dataset_id} not found')
    sequencing_groups = dataset.get_sequencing_groups()
    # NOTE(review): ' '.join(str(...)) space-separates the *characters* of
    # each alignment-input string; kept as-is so existing hashes stay stable.
    fragments = sorted(' '.join(str(sg.alignment_input)) for sg in sequencing_groups if sg.alignment_input)
    digest = hashlib.sha256(' '.join(fragments).encode()).hexdigest()[:38]
    return f'{digest}_{len(sequencing_groups)}'


def get_logger(logger_name: str | None = None, log_level: int = logging.INFO) -> logging.Logger:
"""
creates a logger instance (so as not to use the root logger)
Expand Down

0 comments on commit 1c21a17

Please sign in to comment.