Commit: Fix broken gCNV Annotation pipeline (#960)
* Add the get-gencode script

* use the pre-downloaded gencode data in gCNV and SV

* Bump version: 1.29.4 → 1.29.5
MattWellie authored Oct 28, 2024
1 parent e679675 commit 8240817
Showing 11 changed files with 154 additions and 115 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 1.29.4
+current_version = 1.29.5
 commit = True
 tag = False

2 changes: 1 addition & 1 deletion .github/workflows/docker.yaml
@@ -15,7 +15,7 @@ permissions:
   contents: read

 env:
-  VERSION: 1.29.4
+  VERSION: 1.29.5

 jobs:
   docker:
3 changes: 3 additions & 0 deletions configs/defaults/gatk_sv_multisample.toml
@@ -3,6 +3,9 @@ name = 'gatk_sv'
 ref_fasta = 'gs://cpg-common-main/references/hg38/v0/dragen_reference/Homo_sapiens_assembly38_masked.fasta'
 status_reporter = 'metamist'

+# update this when cpg_workflows.scripts.get_gencode_gtf.sh is re-run
+gencode_gtf_file = 'gs://cpg-common-main/references/hg38/v0/gencode_47.gtf.gz'
+
 # Write dataset MTs for these datasets. Required for the AnnotateDatasetSv stage.
 # write_mt_for_datasets = []

3 changes: 3 additions & 0 deletions configs/defaults/gcnv.toml
@@ -13,6 +13,9 @@ num_samples_per_scatter_block = 10
 gncv_max_events = 320
 gncv_max_pass_events = 30

+# update this when cpg_workflows.scripts.get_gencode_gtf.sh is re-run
+gencode_gtf_file = 'gs://cpg-common-main/references/hg38/v0/gencode_47.gtf.gz'
+
 [images]
 gatk_docker = "australia-southeast1-docker.pkg.dev/cpg-common/images/sv/gatk:2023-07-13-4.4.0.0-43-gd79823f9c-NIGHTLY-SNAPSHOT"
 gatk_gcnv = 'australia-southeast1-docker.pkg.dev/cpg-common/images/sv/gatk:4.2.6.1-57-g9e03432'
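The same key is added to both defaults files. A minimal sketch of how a consuming stage is expected to pick it up, using the cpg_utils helpers that appear elsewhere in this diff (the surrounding job wiring is illustrative, not part of this commit):

    from cpg_utils.config import config_retrieve
    from cpg_utils.hail_batch import get_batch

    # the key lives under [workflow] in the defaults TOMLs
    gencode_gtf = config_retrieve(['workflow', 'gencode_gtf_file'])

    # read_input localises the gs:// object into the job container and returns the local path
    local_gtf = get_batch().read_input(gencode_gtf)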
21 changes: 10 additions & 11 deletions cpg_workflows/jobs/gcnv.py
@@ -11,7 +11,7 @@
 from cpg_utils.config import config_retrieve, get_config, image_path
 from cpg_utils.hail_batch import command, fasta_res_group, get_batch, query_command
 from cpg_workflows.filetypes import CramPath
-from cpg_workflows.query_modules import seqr_loader, seqr_loader_cnv
+from cpg_workflows.query_modules import seqr_loader
 from cpg_workflows.resources import HIGHMEM
 from cpg_workflows.scripts import upgrade_ped_with_inferred
 from cpg_workflows.utils import can_reuse, chunks
@@ -670,7 +670,7 @@ def update_vcf_attributes(input_tmp: str, output_file: str):
 def annotate_dataset_jobs_cnv(
     mt_path: Path,
     sgids: list[str],
-    out_mt_path: Path,
+    out_mt_path: str,
     tmp_prefix: Path,
     job_attrs: dict | None = None,
 ) -> list[Job]:
@@ -688,7 +688,7 @@ def annotate_dataset_jobs_cnv(
     subset_j: Job | None = None
     if not subset_mt_path.exists():
         subset_j = get_batch().new_job('subset cohort to dataset', (job_attrs or {}) | {'tool': 'hail query'})
-        subset_j.image(image_path('cpg_workflows'))
+        subset_j.image(config_retrieve(['workflow', 'driver_image']))
         subset_j.command(
             query_command(
                 seqr_loader,
@@ -701,16 +701,15 @@ def annotate_dataset_jobs_cnv(
         )

     annotate_j = get_batch().new_job('annotate dataset', (job_attrs or {}) | {'tool': 'hail query'})
-    annotate_j.image(image_path('cpg_workflows'))
+    annotate_j.image(config_retrieve(['workflow', 'driver_image']))
     annotate_j.command(
-        query_command(
-            seqr_loader_cnv,
-            seqr_loader_cnv.annotate_dataset_gcnv.__name__,
-            str(subset_mt_path),
-            str(out_mt_path),
-            setup_gcp=True,
-        ),
+        'seqr_loader_cnv '
+        f'--mt_out {out_mt_path} '
+        f'--checkpoint {str(tmp_prefix / "checkpoints")} '
+        'dataset '  # use the annotate_DATASET functionality
+        f'--mt_in {str(subset_mt_path)} ',
     )

     if subset_j:
         annotate_j.depends_on(subset_j)
     return [subset_j, annotate_j]
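For reference, a hypothetical call site for the updated helper (all paths and attrs here are invented; the one real signature change is that out_mt_path is now a plain str rather than a Path, matching the CLI string it is interpolated into):

    from cpg_utils import to_path

    jobs = annotate_dataset_jobs_cnv(
        mt_path=to_path('gs://cpg-dataset-main/cnv/cohort.mt'),
        sgids=['CPGAAAAA', 'CPGBBBBB'],
        out_mt_path='gs://cpg-dataset-main/cnv/dataset.mt',  # now a str, not a Path
        tmp_prefix=to_path('gs://cpg-dataset-main-tmp/cnv'),  # checkpoints land under .../checkpoints
        job_attrs={'stage': 'AnnotateDatasetCNV'},
    )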
3 changes: 3 additions & 0 deletions cpg_workflows/jobs/seqr_loader_sv.py
@@ -23,12 +23,15 @@ def annotate_cohort_jobs_sv(

     j = get_batch().new_job('Annotate cohort', job_attrs)
     j.image(get_config()['workflow']['driver_image'])
+    gencode_gz = get_config()['workflow']['gencode_gtf_file']
+    gencode_gtf_local = get_batch().read_input(str(gencode_gz))
     j.command(
         query_command(
             seqr_loader_sv,
             seqr_loader_sv.annotate_cohort_sv.__name__,
             str(vcf_path),
             str(out_mt_path),
+            str(gencode_gtf_local),
             str(checkpoint_prefix),
             setup_gcp=True,
         ),
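Unrolled, the job assembled above amounts to one call into the query module with four positional string arguments; a sketch of the equivalent direct call (paths invented, and a Hail context is assumed):

    from cpg_workflows.query_modules.seqr_loader_sv import annotate_cohort_sv

    annotate_cohort_sv(
        'gs://cpg-dataset-main/sv/cohort.vcf.bgz',   # vcf_path
        'gs://cpg-dataset-main/sv/cohort.mt',        # out_mt_path
        '/io/batch/abc123/gencode_47.gtf.gz',        # gencode_gz, as localised by read_input
        'gs://cpg-dataset-main-tmp/sv/checkpoints',  # checkpoint prefix
    )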
77 changes: 28 additions & 49 deletions cpg_workflows/query_modules/seqr_loader_sv.py
@@ -4,15 +4,13 @@

 import gzip
 import logging
-
-import requests
+from os.path import join

 import hail as hl

-from cpg_utils import to_path
 from cpg_utils.config import get_config, reference_path
 from cpg_utils.hail_batch import genome_build
-from cpg_workflows.utils import checkpoint_hail, read_hail
+from cpg_workflows.utils import generator_chunks, read_hail

 # I'm just going to go ahead and steal these constants from their seqr loader
 BOTHSIDES_SUPPORT = 'BOTHSIDES_SUPPORT'
@@ -103,48 +101,16 @@ def get_cpx_interval(x):
     return hl.struct(type=type_chr[0], chrom=chr_pos[0], start=hl.int32(pos[0]), end=hl.int32(pos[1]))


-def download_gencode_gene_id_mapping(gencode_release: str) -> str:
-    """
-    This is an inefficient stand-in for now. Swap this out with a more permanent
-    location for this file in the resources bucket
-    Not suuuuuuper keen on storing a pickled representation, but a minimised JSON
-    would be a good middle ground. Parsed ~= 62000 {str: str}
-    Args:
-        gencode_release (str | int): Which gencode release do you want?
-    Returns:
-        str - path to localised GTF file
-    """
-    # this is the thing
-    local_gtf = 'localfile.gtf.gz'
-
-    # check for presence in config
-    if (config_gtf := get_config().get('gencode_gtf_path')) is not None:
-        # treat as GCP
-        if config_gtf.startswith('gs://'):
-            to_path(config_gtf).copy(local_gtf)
-            gtf_path = local_gtf
-        else:
-            gtf_path = config_gtf
-    else:
-        gtf_path = GENCODE_GTF_URL.format(gencode_release=gencode_release)
-
-    # if it wasn't a local file, download it
-    if gtf_path.startswith(('http://', 'https://')):
-        gz_stream = requests.get(gtf_path, stream=True)
-        with open(local_gtf, 'wb') as f:
-            f.writelines(gz_stream)
-        gz_stream.close()
-
-    # OK - I think that covers all the viable situations for now
-    return local_gtf
-
-
-def parse_gtf_from_local(gtf_path: str) -> hl.dict:
+def parse_gtf_from_local(gtf_path: str, chunk_size: int | None = None) -> list[hl.expr.DictExpression]:
     """
-    Read over the localised file and read into a dict
+    Read over the localised GTF file and read into a dict
+    n.b. due to a limit in Spark of 20MB per String length, the dictionary here is actually too large to be used
+    in annotation expressions. To remedy this, the dictionary is returned as a list of fragments, and we can use each
+    one in turn, then create a checkpoint between them.
     Args:
         gtf_path ():
+        chunk_size (int): if specified, returns this dict as a list of dicts
     Returns:
         the gene lookup dictionary as a Hail DictExpression
     """
@@ -165,12 +131,24 @@ def parse_gtf_from_local(gtf_path: str) -> hl.dict:
             # parse info field
             info_fields_list = [x.strip().split() for x in record['info'].split(';') if x != '']
             info_fields = {k: v.strip('"') for k, v in info_fields_list}
+
+            # skip an ENSG: ENSG mapping, redundant...
+            if info_fields['gene_name'].startswith('ENSG'):
+                continue
             gene_id_mapping[info_fields['gene_name']] = info_fields['gene_id'].split('.')[0]
-    logging.info('Completed ingestion of gene-ID mapping')
-    return hl.literal(gene_id_mapping)
+
+    all_keys = list(gene_id_mapping.keys())
+    logging.info(f'Completed ingestion of gene-ID mapping, {len(all_keys)} entries')
+    if chunk_size is None:
+        return [hl.literal(gene_id_mapping)]
+
+    # hail can't impute the type of a generator, so do this in baby steps
+    sub_dictionaries = [{key: gene_id_mapping[key] for key in keys} for keys in generator_chunks(all_keys, chunk_size)]
+
+    return [hl.literal(each_dict) for each_dict in sub_dictionaries]


-def annotate_cohort_sv(vcf_path: str, out_mt_path: str, checkpoint_prefix: str | None = None):
+def annotate_cohort_sv(vcf_path: str, out_mt_path: str, gencode_gz: str, checkpoint: str | None = None):
     """
     Translate an annotated SV VCF into a Seqr-ready format
     Relevant gCNV specific schema
@@ -180,7 +158,8 @@ def annotate_cohort_sv(vcf_path: str, out_mt_path: str, checkpoint_prefix: str |
     Args:
         vcf_path (str): Where is the VCF??
         out_mt_path (str): And where do you need output!?
-        checkpoint_prefix (str): CHECKPOINT!@!!
+        gencode_gz (str): The path to a compressed GENCODE GTF file
+        checkpoint (str): CHECKPOINT!@!!
"""

logger = logging.getLogger('annotate_cohort_sv')
@@ -267,11 +246,11 @@ def annotate_cohort_sv(vcf_path: str, out_mt_path: str, checkpoint_prefix: str |
     mt = mt.annotate_rows(info=mt.info.annotate(CPX_TYPE=mt.sv_types[0]))

     # save those changes
-    mt = checkpoint_hail(mt, 'initial_annotation_round.mt', checkpoint_prefix)
+    if checkpoint:
+        mt = mt.checkpoint(join(checkpoint, 'initial_annotation_round.mt'))

     # get the Gene-Symbol mapping dict
-    gene_id_mapping_file = download_gencode_gene_id_mapping(get_config().get('gencode_release', '46'))
-    gene_id_mapping = parse_gtf_from_local(gene_id_mapping_file)
+    gene_id_mapping = parse_gtf_from_local(gencode_gz)[0]

     # OK, NOW IT'S BUSINESS TIME
     conseq_predicted_gene_cols = [
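generator_chunks, imported at the top of this module, is what carves the ~62,000-entry mapping into Spark-safe fragments. An illustrative stand-in for it (the packaged cpg_workflows.utils implementation may differ in detail):

    from itertools import islice
    from typing import Iterable, Iterator, TypeVar

    T = TypeVar('T')

    def generator_chunks(generator: Iterable[T], size: int) -> Iterator[list[T]]:
        # yield successive lists of at most `size` items, so each resulting dict
        # stays well under Spark's ~20MB string limit once passed through hl.literal
        iterator = iter(generator)
        while chunk := list(islice(iterator, size)):
            yield chunk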
15 changes: 15 additions & 0 deletions cpg_workflows/scripts/get_gencode_gtf.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+# use this get_gencode_gtf.sh script to download the latest GENCODE GTF file
+# and place it in the appropriate place in the CPG bucket
+
+set -ex
+
+GENCODE_VERSION=${1:-"47"}
+
+# download the GTF file
+wget -O gencode.gtf.gz "https://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_${GENCODE_VERSION}/gencode.v${GENCODE_VERSION}.annotation.gtf.gz"
+
+# don't unzip the file - unzipping inflates from ~50MB to ~1.8GB, and transfer is probably the bottleneck
+# copy the still-compressed file into place in the CPG bucket
+gsutil cp gencode.gtf.gz "gs://cpg-common-main/references/hg38/v0/gencode_${GENCODE_VERSION}.gtf.gz"
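To refresh the reference, run the script with the desired release, e.g. bash cpg_workflows/scripts/get_gencode_gtf.sh 47, from an environment with write access to gs://cpg-common-main, then update gencode_gtf_file in both defaults configs to match, per the comments added above.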