Skip to content

Commit

Permalink
Fix 2 problems (Pedigree, Partitions) (#423)
Browse files Browse the repository at this point in the history
* only keep parents truly in the dataset

* read with repartitioning

* Bump version: 5.1.2 → 5.1.3
  • Loading branch information
MattWellie authored Jul 20, 2024
1 parent 28a0811 commit 7915a4e
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 35 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 5.1.2
current_version = 5.1.3
commit = True
tag = False

Expand Down
2 changes: 1 addition & 1 deletion requirements-lint.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
black==22.3.0
black>=24.3.0
pre-commit>=3.5.0
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ cloudpathlib[all]>=0.16.0
cyvcf2>=0.30.18
dill>=0.3.7
google-api-core>=1.34.1 # version pin for faster pip resolution
hail==0.2.131
hail==0.2.132
Jinja2>=3.1.3
networkx>=3
obonet>=1
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def read_reqs(filename: str) -> list[str]:
name='talos',
description='Centre for Population Genomics Variant Prioritisation',
long_description=readme,
version='5.1.2',
version='5.1.3',
author='Matthew Welland, CPG',
author_email='matthew.welland@populationgenomics.org.au, cas.simons@populationgenomics.org.au',
package_data={'talos': ['templates/*.jinja', 'reanalysis_global.toml']},
Expand Down
2 changes: 1 addition & 1 deletion src/talos/CreateTalosHTML.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def main(results: str, panelapp: str, output: str, latest: str | None = None, sp
# build the HTML for latest reports only
latest_html = HTMLBuilder(results=date_filtered_object, panelapp_path=panelapp)
try:
get_logger().info(f'Attempting to create {latest_html}')
get_logger().info(f'Attempting to create {html_base}{latest}')
latest_html.write_html(output_filepath=f'{html_base}{latest}', latest=True)
except NoVariantsFoundError:
get_logger().info('No variants in that latest report, skipping')
Expand Down
14 changes: 5 additions & 9 deletions src/talos/GeneratePED.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,18 @@ def get_data_from_metamist(project: str, seq_type: str, tech: str) -> list[list[
# iterate over the pedigree entities, forming a list from each. List elements:
# Family ID, Individual ID, Paternal, Maternal, Sex, Affection status, Ext ID (or repeat), HPOs (or not if absent)
for entry in result['project']['pedigree']:
if not (int_id := ext_to_int.get(entry['individual_id'], entry['individual_id'])).startswith('CPG'):
continue
ped_row: list[str] = [
entry['family_id'],
ext_to_int.get(entry['individual_id'], entry['individual_id']), # defaults to internal ID
ext_to_int.get(entry['paternal_id'], entry['paternal_id']) or '0',
ext_to_int.get(entry['maternal_id'], entry['maternal_id']) or '0',
int_id,
ext_to_int.get(entry['paternal_id'], '0') or '0',
ext_to_int.get(entry['maternal_id'], '0') or '0',
str(entry['sex']),
str(entry['affected']),
entry['individual_id'],
]

# skip over the rows where we didn't find a linked internal ID
# this will prune the pedigree to remove all the data-only entries in the cohort
assert isinstance(entry['individual_id'], str)
if not ped_row[1].startswith('CPG'):
continue

# if there are recorded HPOs, extend the row with them
if hpos := cpg_to_hpos.get(ext_to_int.get(entry['individual_id'], 'missing')):
ped_row.extend(sorted(hpos))
Expand Down
90 changes: 71 additions & 19 deletions src/talos/RunHailFiltering.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
LOFTEE_HC = hl.str('HC')
PATHOGENIC = hl.str('pathogenic')

# decide whether to repartition the data before processing starts
MAX_PARTITIONS = 10000


def annotate_talos_clinvar(mt: hl.MatrixTable, clinvar: str) -> hl.MatrixTable:
"""
Expand Down Expand Up @@ -303,7 +306,10 @@ def filter_to_population_rare(mt: hl.MatrixTable) -> hl.MatrixTable:
# 'semi-rare' as dominant filters will be more strictly filtered later
rare_af_threshold = config_retrieve(['RunHailFiltering', 'af_semi_rare'])
return mt.filter_rows(
((mt.info.gnomad_ex_af < rare_af_threshold) & (mt.info.gnomad_af < rare_af_threshold))
(
(hl.or_else(mt.gnomad_exomes.AF, MISSING_FLOAT_LO) < rare_af_threshold)
& (hl.or_else(mt.gnomad_genomes.AF, MISSING_FLOAT_LO) < rare_af_threshold)
)
| (mt.info.clinvar_talos == ONE_INT),
)

Expand Down Expand Up @@ -332,6 +338,22 @@ def drop_useless_fields(mt: hl.MatrixTable) -> hl.MatrixTable:
)


def remove_variants_outside_gene_roi(mt: hl.MatrixTable, green_genes: hl.SetExpression) -> hl.MatrixTable:
    """
    Coarse early filter: drop every row that doesn't annotate at least one
    green (PanelApp) gene. The geneIds field itself is left untouched here;
    per-gene splitting/filtering happens later in
    split_rows_by_gene_and_filter_to_green.

    Args:
        mt (): the annotated MatrixTable to reduce
        green_genes (): set expression of green gene IDs

    Returns:
        the same MT, restricted to rows overlapping the gene ROI
    """

    # keep only rows whose geneIds share at least one member with the green set
    green_overlap = green_genes.intersection(mt.geneIds)
    return mt.filter_rows(hl.len(green_overlap) > 0)


def split_rows_by_gene_and_filter_to_green(mt: hl.MatrixTable, green_genes: hl.SetExpression) -> hl.MatrixTable:
"""
splits each GeneId onto a new row, then filters any rows not annotating a Green PanelApp gene
Expand Down Expand Up @@ -799,6 +821,27 @@ def subselect_mt_to_pedigree(mt: hl.MatrixTable, pedigree: str) -> hl.MatrixTabl
return mt


def generate_a_checkpoint(mt: hl.MatrixTable, checkpoint_path: str) -> hl.MatrixTable:
    """
    Write the MatrixTable to a checkpoint, then verify that variants remain.

    Args:
        mt (): the MatrixTable to persist
        checkpoint_path (str): where to write the checkpoint

    Returns:
        the checkpointed MatrixTable

    Raises:
        ValueError: if the checkpointed table contains zero rows
    """
    get_logger().info(f'Checkpointing to {checkpoint_path} after filtering out a ton of variants')
    mt = mt.checkpoint(checkpoint_path)

    # counting rows forces evaluation, so only do it post-checkpoint;
    # an empty table means nothing left to process downstream
    current_rows = mt.count_rows()
    if current_rows == 0:
        raise ValueError('No remaining rows to process!')

    get_logger().info(f'Local checkpoint written, {current_rows} rows remain')
    return mt


def cli_main():
"""
Read MT, filter, and apply category annotation, export as a VCF
Expand Down Expand Up @@ -859,8 +902,8 @@ def main(
# initiate Hail as a local cluster
number_of_cores = config_retrieve(['RunHailFiltering', 'cores', 'small_variants'], 8)
get_logger().info(f'Starting Hail with reference genome GRCh38, as a {number_of_cores} core local cluster')
hl.context.init_spark(master=f'local[{number_of_cores}]', quiet=True)
hl.default_reference('GRCh38')

hl.context.init_spark(master=f'local[{number_of_cores}]', default_reference='GRCh38', quiet=True)

# read the parsed panelapp data
get_logger().info(f'Reading PanelApp data from {panel_data!r}')
Expand All @@ -872,7 +915,16 @@ def main(

# read the matrix table from a localised directory
mt = hl.read_matrix_table(mt_path)
get_logger().info(f'Loaded annotated MT from {mt_path}, size: {mt.count_rows()}')
get_logger().info(f'Loaded annotated MT from {mt_path}, size: {mt.count_rows()}, partitions: {mt.n_partitions()}')

# repartition if required - local Hail with finite resources has struggled with some really high (~120k) partitions
# this creates a local duplicate of the input data with far smaller partition counts, for less processing overhead
if mt.n_partitions() > MAX_PARTITIONS:
get_logger().info('Shrinking partitions way down with a unshuffled repartition')
mt = mt.repartition(shuffle=False, n_partitions=number_of_cores * 10)
if checkpoint:
get_logger().info('Trying to write the result locally, might need more space on disk...')
mt = generate_a_checkpoint(mt, f'{checkpoint}_reparitioned')

# lookups for required fields all delegated to the hail_audit file
if not (
Expand All @@ -885,11 +937,20 @@ def main(
# subset to currently considered samples
mt = subselect_mt_to_pedigree(mt, pedigree=pedigree)

# shrink the time taken to write checkpoints
mt = drop_useless_fields(mt=mt)

# remove any rows which have no genes of interest
mt = remove_variants_outside_gene_roi(mt=mt, green_genes=green_expression)

# swap out the default clinvar annotations with private clinvar
mt = annotate_talos_clinvar(mt=mt, clinvar=clinvar)

# split each gene annotation onto separate rows, filter to green genes (PanelApp ROI)
mt = split_rows_by_gene_and_filter_to_green(mt=mt, green_genes=green_expression)
# remove common-in-gnomad variants (also includes ClinVar annotation)
mt = filter_to_population_rare(mt=mt)

if checkpoint:
mt = generate_a_checkpoint(mt, f'{checkpoint}_data')

# filter out quality failures
mt = filter_on_quality_flags(mt=mt)
Expand All @@ -898,22 +959,13 @@ def main(
mt = filter_to_well_normalised(mt=mt)

# filter variants by frequency
mt = extract_annotations(mt=mt)
mt = filter_matrix_by_ac(mt=mt)
mt = filter_to_population_rare(mt=mt)

# shrink the time taken to write checkpoints
mt = drop_useless_fields(mt=mt)

if checkpoint:
get_logger().info(f'Checkpointing to {checkpoint} after filtering out a ton of variants')
mt = mt.checkpoint(checkpoint)

# die if there are no variants remaining. Only ever count rows after a checkpoint
if not (current_rows := mt.count_rows()):
raise ValueError('No remaining rows to process!')
# rearrange the row annotation to make syntax nicer downstream
mt = extract_annotations(mt=mt)

get_logger().info(f'Local checkpoint written, {current_rows} rows remain')
# split each gene annotation onto separate rows, filter to green genes (PanelApp ROI)
mt = split_rows_by_gene_and_filter_to_green(mt=mt, green_genes=green_expression)

# add Labels to the MT
# current logic is to apply 1, 2, 3, and 5, then 4 (de novo)
Expand Down
2 changes: 1 addition & 1 deletion src/talos/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
"""

# Do not edit this file manually
__version__ = '5.1.2'
__version__ = '5.1.3'
4 changes: 3 additions & 1 deletion test/test_hail_categories.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ def test_filter_rows_for_rare(exomes, genomes, clinvar, length, make_a_mt):
make_a_mt ():
"""
anno_matrix = make_a_mt.annotate_rows(
info=make_a_mt.info.annotate(gnomad_ex_af=exomes, gnomad_af=genomes, clinvar_talos=clinvar),
gnomad_genomes=hl.Struct(AF=genomes),
gnomad_exomes=hl.Struct(AF=exomes),
info=make_a_mt.info.annotate(clinvar_talos=clinvar),
)
matrix = filter_to_population_rare(anno_matrix)
assert matrix.count_rows() == length
Expand Down

0 comments on commit 7915a4e

Please sign in to comment.