Fix broken gCNV Annotation pipeline #960

Merged · 8 commits · Oct 28, 2024

Changes from 6 commits
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 1.29.4
+current_version = 1.29.5
 commit = True
 tag = False

2 changes: 1 addition & 1 deletion .github/workflows/docker.yaml
@@ -15,7 +15,7 @@ permissions:
   contents: read
 
 env:
-  VERSION: 1.29.4
+  VERSION: 1.29.5
 
 jobs:
   docker:
3 changes: 3 additions & 0 deletions configs/defaults/gatk_sv_multisample.toml
@@ -3,6 +3,9 @@ name = 'gatk_sv'
 ref_fasta = 'gs://cpg-common-main/references/hg38/v0/dragen_reference/Homo_sapiens_assembly38_masked.fasta'
 status_reporter = 'metamist'
 
+# update this when cpg_workflows.scripts.get_gencode_gtf.sh is re-run
+gencode_gtf_file = 'gs://cpg-common-main/references/hg38/v0/gencode_47.gtf.gz'
+
 # Write dataset MTs for these datasets. Required for the AnnotateDatasetSv stage.
 # write_mt_for_datasets = []
 
3 changes: 3 additions & 0 deletions configs/defaults/gcnv.toml
@@ -13,6 +13,9 @@ num_samples_per_scatter_block = 10
 gncv_max_events = 320
 gncv_max_pass_events = 30
 
+# update this when cpg_workflows.scripts.get_gencode_gtf.sh is re-run
+gencode_gtf_file = 'gs://cpg-common-main/references/hg38/v0/gencode_47.gtf.gz'
+
 [images]
 gatk_docker = "australia-southeast1-docker.pkg.dev/cpg-common/images/sv/gatk:2023-07-13-4.4.0.0-43-gd79823f9c-NIGHTLY-SNAPSHOT"
 gatk_gcnv = 'australia-southeast1-docker.pkg.dev/cpg-common/images/sv/gatk:4.2.6.1-57-g9e03432'
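Note on the new key: both defaults files add gencode_gtf_file alongside the existing workflow settings, so downstream code can fetch it with the same config_retrieve pattern this PR uses for driver_image. A minimal sketch, assuming these TOML defaults are merged into the [workflow] section of the active cpg_utils config (as the job code below implies):

```python
# Minimal sketch: reading the new config key. Assumes these TOML defaults are
# loaded into the [workflow] section of the merged cpg_utils config.
from cpg_utils.config import config_retrieve

gencode_gtf_file = config_retrieve(['workflow', 'gencode_gtf_file'])
# expected value: gs://cpg-common-main/references/hg38/v0/gencode_47.gtf.gz
print(gencode_gtf_file)
```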
23 changes: 11 additions & 12 deletions cpg_workflows/jobs/gcnv.py
@@ -11,9 +11,9 @@
 from cpg_utils.config import config_retrieve, get_config, image_path
 from cpg_utils.hail_batch import command, fasta_res_group, get_batch, query_command
 from cpg_workflows.filetypes import CramPath
-from cpg_workflows.query_modules import seqr_loader, seqr_loader_cnv
+from cpg_workflows.query_modules import seqr_loader
 from cpg_workflows.resources import HIGHMEM
-from cpg_workflows.scripts import upgrade_ped_with_inferred
+from cpg_workflows.scripts import seqr_loader_cnv, upgrade_ped_with_inferred
 from cpg_workflows.utils import can_reuse, chunks


@@ -670,7 +670,7 @@ def update_vcf_attributes(input_tmp: str, output_file: str):
 def annotate_dataset_jobs_cnv(
     mt_path: Path,
     sgids: list[str],
-    out_mt_path: Path,
+    out_mt_path: str,
     tmp_prefix: Path,
     job_attrs: dict | None = None,
 ) -> list[Job]:
@@ -688,7 +688,7 @@ def annotate_dataset_jobs_cnv(
     subset_j: Job | None = None
     if not subset_mt_path.exists():
         subset_j = get_batch().new_job('subset cohort to dataset', (job_attrs or {}) | {'tool': 'hail query'})
-        subset_j.image(image_path('cpg_workflows'))
+        subset_j.image(config_retrieve(['workflow', 'driver_image']))
         subset_j.command(
             query_command(
                 seqr_loader,
@@ -701,16 +701,15 @@ def annotate_dataset_jobs_cnv(
     )
 
     annotate_j = get_batch().new_job('annotate dataset', (job_attrs or {}) | {'tool': 'hail query'})
-    annotate_j.image(image_path('cpg_workflows'))
+    annotate_j.image(config_retrieve(['workflow', 'driver_image']))
     annotate_j.command(
-        query_command(
-            seqr_loader_cnv,
-            seqr_loader_cnv.annotate_dataset_gcnv.__name__,
-            str(subset_mt_path),
-            str(out_mt_path),
-            setup_gcp=True,
-        ),
+        'seqr_loader_cnv '
+        f'--mt_out {out_mt_path} '
+        f'--checkpoint {str(tmp_prefix / "checkpoints")} '
+        'dataset '  # use the annotate_DATASET functionality
+        f'--mt_in {str(subset_mt_path)} ',
     )
 
     if subset_j:
         annotate_j.depends_on(subset_j)
     return [subset_j, annotate_j]
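Since the annotate job now shells out to the seqr_loader_cnv script rather than serialising a query_command, the flags in the command string above define the contract between job and script. The sketch below is a hypothetical reconstruction of that CLI surface, inferred only from the command string; the real parser lives in cpg_workflows.scripts.seqr_loader_cnv and may differ:

```python
# Hypothetical reconstruction of the CLI implied by the command string
# 'seqr_loader_cnv --mt_out ... --checkpoint ... dataset --mt_in ...'.
# Not the actual parser: see cpg_workflows.scripts.seqr_loader_cnv for that.
import argparse


def parse_cli(argv: list[str] | None = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description='gCNV seqr-loader annotation (sketch)')
    parser.add_argument('--mt_out', required=True, help='MatrixTable path to write')
    parser.add_argument('--checkpoint', help='prefix for intermediate checkpoints')

    # a subcommand picks the annotate_DATASET behaviour, per the comment in the job code
    subparsers = parser.add_subparsers(dest='mode', required=True)
    dataset = subparsers.add_parser('dataset', help='annotate a single dataset subset')
    dataset.add_argument('--mt_in', required=True, help='subset MatrixTable to annotate')
    return parser.parse_args(argv)


# mirrors the argument order assembled in annotate_dataset_jobs_cnv above
ns = parse_cli(['--mt_out', 'out.mt', '--checkpoint', 'tmp/checkpoints', 'dataset', '--mt_in', 'subset.mt'])
```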
3 changes: 3 additions & 0 deletions cpg_workflows/jobs/seqr_loader_sv.py
@@ -23,12 +23,15 @@ def annotate_cohort_jobs_sv(

     j = get_batch().new_job('Annotate cohort', job_attrs)
     j.image(get_config()['workflow']['driver_image'])
+    gencode_gz = get_config()['workflow']['gencode_gtf_file']
+    gencode_gtf_local = get_batch().read_input(str(gencode_gz))
     j.command(
         query_command(
             seqr_loader_sv,
             seqr_loader_sv.annotate_cohort_sv.__name__,
             str(vcf_path),
             str(out_mt_path),
+            str(gencode_gtf_local),
             str(checkpoint_prefix),
             setup_gcp=True,
         ),
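The localisation pattern introduced here deserves a callout: get_batch().read_input() registers the gs:// object as a batch input, and the returned handle interpolates to a container-local path when the job command runs, which is why a plain string can then be passed through to annotate_cohort_sv. A small sketch with illustrative paths and job names (not taken from this repo):

```python
# Sketch of the Hail Batch localisation pattern used above; the bucket path,
# job name, and command are illustrative. read_input() registers the remote
# object as an input, and the handle resolves to a local path inside the job.
from cpg_utils.hail_batch import get_batch

b = get_batch()
gencode_local = b.read_input('gs://cpg-common-main/references/hg38/v0/gencode_47.gtf.gz')

j = b.new_job('inspect gencode header')
# at runtime, {gencode_local} is a file path inside the job's container
j.command(f'zcat {gencode_local} | head -5')
```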
77 changes: 28 additions & 49 deletions cpg_workflows/query_modules/seqr_loader_sv.py
@@ -4,15 +4,13 @@

 import gzip
 import logging
-
-import requests
+from os.path import join
 
 import hail as hl
 
 from cpg_utils import to_path
 from cpg_utils.config import get_config, reference_path
 from cpg_utils.hail_batch import genome_build
-from cpg_workflows.utils import checkpoint_hail, read_hail
+from cpg_workflows.utils import generator_chunks, read_hail
 
 # I'm just going to go ahead and steal these constants from their seqr loader
 BOTHSIDES_SUPPORT = 'BOTHSIDES_SUPPORT'
@@ -103,48 +101,16 @@ def get_cpx_interval(x):
     return hl.struct(type=type_chr[0], chrom=chr_pos[0], start=hl.int32(pos[0]), end=hl.int32(pos[1]))
 
 
-def download_gencode_gene_id_mapping(gencode_release: str) -> str:
-    """
-    This is an inefficient stand-in for now. Swap this out with a more permanent
-    location for this file in the resources bucket
-    Not suuuuuuper keen on storing a pickled representation, but a minimised JSON
-    would be a good middle ground. Parsed ~= 62000 {str: str}
-    Args:
-        gencode_release (str | int): Which gencode release do you want?
-    Returns:
-        str - path to localised GTF file
-    """
-
-    # this is the thing
-    local_gtf = 'localfile.gtf.gz'
-
-    # check for presence in config
-    if (config_gtf := get_config().get('gencode_gtf_path')) is not None:
-        # treat as GCP
-        if config_gtf.startswith('gs://'):
-            to_path(config_gtf).copy(local_gtf)
-            gtf_path = local_gtf
-        else:
-            gtf_path = config_gtf
-    else:
-        gtf_path = GENCODE_GTF_URL.format(gencode_release=gencode_release)
-
-    # if it wasn't a local file, download it
-    if gtf_path.startswith(('http://', 'https://')):
-        gz_stream = requests.get(gtf_path, stream=True)
-        with open(local_gtf, 'wb') as f:
-            f.writelines(gz_stream)
-        gz_stream.close()
-
-    # OK - I think that covers all the viable situations for now
-    return local_gtf
-
-
-def parse_gtf_from_local(gtf_path: str) -> hl.dict:
+def parse_gtf_from_local(gtf_path: str, chunk_size: int | None = None) -> hl.dict:
     """
-    Read over the localised file and read into a dict
+    Read over the localised GTF file and read into a dict
+
+    n.b. due to a limit in Spark of 20MB per String length, the dictionary here is actually too large to be used
+    in annotation expressions. To remedy this, the dictionary is returned as a list of fragments, and we can use each
+    one in turn, then create a checkpoint between them.
+
     Args:
         gtf_path ():
+        chunk_size (int): if specified, returns this dict as a list of dicts
     Returns:
         the gene lookup dictionary as a Hail DictExpression
     """
@@ -165,12 +131,24 @@ def parse_gtf_from_local(gtf_path: str) -> hl.dict:
             # parse info field
             info_fields_list = [x.strip().split() for x in record['info'].split(';') if x != '']
             info_fields = {k: v.strip('"') for k, v in info_fields_list}
+
+            # skip an ENSG: ENSG mapping, redundant...
+            if info_fields['gene_name'].startswith('ENSG'):
+                continue
             gene_id_mapping[info_fields['gene_name']] = info_fields['gene_id'].split('.')[0]
-    logging.info('Completed ingestion of gene-ID mapping')
-    return hl.literal(gene_id_mapping)
+
+    all_keys = list(gene_id_mapping.keys())
+    logging.info(f'Completed ingestion of gene-ID mapping, {len(all_keys)} entries')
+    if chunk_size is None:
+        return [hl.literal(gene_id_mapping)]
+
+    # hail can't impute the type of a generator, so do this in baby steps
+    sub_dictionaries = [{key: gene_id_mapping[key] for key in keys} for keys in generator_chunks(all_keys, chunk_size)]
+
+    return [hl.literal(each_dict) for each_dict in sub_dictionaries]
 
 
-def annotate_cohort_sv(vcf_path: str, out_mt_path: str, checkpoint_prefix: str | None = None):
+def annotate_cohort_sv(vcf_path: str, out_mt_path: str, gencode_gz: str, checkpoint: str | None = None):
     """
     Translate an annotated SV VCF into a Seqr-ready format
     Relevant gCNV specific schema
@@ -180,7 +158,8 @@ def annotate_cohort_sv(vcf_path: str, out_mt_path: str, checkpoint_prefix: str | None = None):
     Args:
         vcf_path (str): Where is the VCF??
         out_mt_path (str): And where do you need output!?
-        checkpoint_prefix (str): CHECKPOINT!@!!
+        gencode_gz (str): The path to a compressed GENCODE GTF file
+        checkpoint (str): CHECKPOINT!@!!
     """
 
     logger = logging.getLogger('annotate_cohort_sv')
@@ -267,11 +246,11 @@ def annotate_cohort_sv(vcf_path: str, out_mt_path: str, checkpoint_prefix: str | None = None):
     mt = mt.annotate_rows(info=mt.info.annotate(CPX_TYPE=mt.sv_types[0]))
 
     # save those changes
-    mt = checkpoint_hail(mt, 'initial_annotation_round.mt', checkpoint_prefix)
+    if checkpoint:
+        mt = mt.checkpoint(join(checkpoint, 'initial_annotation_round.mt'))
 
     # get the Gene-Symbol mapping dict
-    gene_id_mapping_file = download_gencode_gene_id_mapping(get_config().get('gencode_release', '46'))
-    gene_id_mapping = parse_gtf_from_local(gene_id_mapping_file)
+    gene_id_mapping = parse_gtf_from_local(gencode_gz)[0]
 
     # OK, NOW IT'S BUSINESS TIME
     conseq_predicted_gene_cols = [
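For the chunking logic above: the fragments exist because a single hl.literal over the full mapping (roughly 62,000 entries, per the removed docstring) can exceed Spark's 20MB-per-string limit once serialised. Below is a toy, self-contained sketch of the consume-and-checkpoint loop the docstring describes; chunks_of stands in for cpg_workflows.utils.generator_chunks, and the gene_symbol/gene_id row fields, chunk size, and paths are all assumptions:

```python
# Toy sketch of annotating with a large dict in fragments, checkpointing between
# passes. Field names, chunk size, and paths are invented for illustration.
from itertools import islice
from os.path import join

import hail as hl


def chunks_of(keys: list[str], size: int):
    """Yield successive chunks of keys; stands in for generator_chunks."""
    iterator = iter(keys)
    while chunk := list(islice(iterator, size)):
        yield chunk


def annotate_in_fragments(mt: hl.MatrixTable, mapping: dict[str, str], tmp: str) -> hl.MatrixTable:
    for i, key_chunk in enumerate(chunks_of(list(mapping), 20_000)):
        fragment = hl.literal({k: mapping[k] for k in key_chunk})
        # rewrite only the gene IDs covered by this fragment, leave the rest as-is
        mt = mt.annotate_rows(gene_id=fragment.get(mt.gene_symbol, mt.gene_id))
        # checkpoint so each fragment's literal is evaluated then discarded
        mt = mt.checkpoint(join(tmp, f'fragment_{i}.mt'), overwrite=True)
    return mt
```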
15 changes: 15 additions & 0 deletions cpg_workflows/scripts/get_gencode_gtf.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+# use this get_gencode_gtf.sh script to download the latest GENCODE GTF file
+# and place it in the appropriate place in the CPG bucket
+
+set -ex
+
+GENCODE_VERSION=${1:-"47"}
+
+# download the GTF file
+wget -O gencode.gtf.gz "https://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_${GENCODE_VERSION}/gencode.v${GENCODE_VERSION}.annotation.gtf.gz"
+
+# don't unzip the file - unzipping inflates from ~50MB to ~1.8GB, and transfer is probably the bottleneck
+# copy the compressed file to its destination in the CPG bucket
+gsutil cp gencode.gtf.gz "gs://cpg-common-main/references/hg38/v0/gencode_${GENCODE_VERSION}.gtf.gz"