diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index 1bc2aa66c..0841ca550 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 1.29.4
+current_version = 1.29.5
 commit = True
 tag = False
diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml
index 86a990387..7b1760f9e 100644
--- a/.github/workflows/docker.yaml
+++ b/.github/workflows/docker.yaml
@@ -15,7 +15,7 @@ permissions:
   contents: read

 env:
-  VERSION: 1.29.4
+  VERSION: 1.29.5

 jobs:
   docker:
diff --git a/configs/defaults/gatk_sv_multisample.toml b/configs/defaults/gatk_sv_multisample.toml
index 872afdbb7..bdb37b17c 100644
--- a/configs/defaults/gatk_sv_multisample.toml
+++ b/configs/defaults/gatk_sv_multisample.toml
@@ -3,6 +3,9 @@ name = 'gatk_sv'
 ref_fasta = 'gs://cpg-common-main/references/hg38/v0/dragen_reference/Homo_sapiens_assembly38_masked.fasta'
 status_reporter = 'metamist'

+# update this when cpg_workflows.scripts.get_gencode_gtf.sh is re-run
+gencode_gtf_file = 'gs://cpg-common-main/references/hg38/v0/gencode_47.gtf.gz'
+
 # Write dataset MTs for these datasets. Required for the AnnotateDatasetSv stage.
 # write_mt_for_datasets = []
diff --git a/configs/defaults/gcnv.toml b/configs/defaults/gcnv.toml
index 512a677d0..56dbeb1f2 100644
--- a/configs/defaults/gcnv.toml
+++ b/configs/defaults/gcnv.toml
@@ -13,6 +13,9 @@ num_samples_per_scatter_block = 10
 gncv_max_events = 320
 gncv_max_pass_events = 30

+# update this when cpg_workflows.scripts.get_gencode_gtf.sh is re-run
+gencode_gtf_file = 'gs://cpg-common-main/references/hg38/v0/gencode_47.gtf.gz'
+
 [images]
 gatk_docker = "australia-southeast1-docker.pkg.dev/cpg-common/images/sv/gatk:2023-07-13-4.4.0.0-43-gd79823f9c-NIGHTLY-SNAPSHOT"
 gatk_gcnv = 'australia-southeast1-docker.pkg.dev/cpg-common/images/sv/gatk:4.2.6.1-57-g9e03432'
diff --git a/cpg_workflows/jobs/gcnv.py b/cpg_workflows/jobs/gcnv.py
index 09aa5a72c..8889b727a 100644
--- a/cpg_workflows/jobs/gcnv.py
+++ b/cpg_workflows/jobs/gcnv.py
@@ -11,7 +11,7 @@ from cpg_utils.config import config_retrieve, get_config, image_path
 from cpg_utils.hail_batch import command, fasta_res_group, get_batch, query_command
 from cpg_workflows.filetypes import CramPath
-from cpg_workflows.query_modules import seqr_loader, seqr_loader_cnv
+from cpg_workflows.query_modules import seqr_loader
 from cpg_workflows.resources import HIGHMEM
 from cpg_workflows.scripts import upgrade_ped_with_inferred
 from cpg_workflows.utils import can_reuse, chunks
@@ -670,7 +670,7 @@ def update_vcf_attributes(input_tmp: str, output_file: str):
 def annotate_dataset_jobs_cnv(
     mt_path: Path,
     sgids: list[str],
-    out_mt_path: Path,
+    out_mt_path: str,
     tmp_prefix: Path,
     job_attrs: dict | None = None,
 ) -> list[Job]:
@@ -688,7 +688,7 @@ def annotate_dataset_jobs_cnv(
     subset_j: Job | None = None
     if not subset_mt_path.exists():
         subset_j = get_batch().new_job('subset cohort to dataset', (job_attrs or {}) | {'tool': 'hail query'})
-        subset_j.image(image_path('cpg_workflows'))
+        subset_j.image(config_retrieve(['workflow', 'driver_image']))
         subset_j.command(
             query_command(
                 seqr_loader,
@@ -701,16 +701,15 @@ def annotate_dataset_jobs_cnv(
     )

     annotate_j = get_batch().new_job('annotate dataset', (job_attrs or {}) | {'tool': 'hail query'})
-    annotate_j.image(image_path('cpg_workflows'))
+    annotate_j.image(config_retrieve(['workflow', 'driver_image']))
     annotate_j.command(
-        query_command(
-            seqr_loader_cnv,
-            seqr_loader_cnv.annotate_dataset_gcnv.__name__,
-            str(subset_mt_path),
-            str(out_mt_path),
-            setup_gcp=True,
-        ),
+        'seqr_loader_cnv '
+        f'--mt_out {out_mt_path} '
+        f'--checkpoint {str(tmp_prefix / "checkpoints")} '
+        'dataset '  # use the annotate_DATASET functionality
+        f'--mt_in {str(subset_mt_path)} ',
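+        # (the adjacent strings above concatenate into a single CLI call, roughly:
+        #  `seqr_loader_cnv --mt_out <mt> --checkpoint <tmp>/checkpoints dataset --mt_in <subset mt>`)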
     )
+
     if subset_j:
         annotate_j.depends_on(subset_j)
     return [subset_j, annotate_j]
diff --git a/cpg_workflows/jobs/seqr_loader_sv.py b/cpg_workflows/jobs/seqr_loader_sv.py
index 0a28cc41f..26655b5a1 100644
--- a/cpg_workflows/jobs/seqr_loader_sv.py
+++ b/cpg_workflows/jobs/seqr_loader_sv.py
@@ -23,12 +23,15 @@ def annotate_cohort_jobs_sv(

     j = get_batch().new_job('Annotate cohort', job_attrs)
     j.image(get_config()['workflow']['driver_image'])
+    gencode_gz = get_config()['workflow']['gencode_gtf_file']
+    gencode_gtf_local = get_batch().read_input(str(gencode_gz))
     j.command(
         query_command(
             seqr_loader_sv,
             seqr_loader_sv.annotate_cohort_sv.__name__,
             str(vcf_path),
             str(out_mt_path),
+            str(gencode_gtf_local),
             str(checkpoint_prefix),
             setup_gcp=True,
         ),
diff --git a/cpg_workflows/query_modules/seqr_loader_sv.py b/cpg_workflows/query_modules/seqr_loader_sv.py
index ace6a8531..a293d7545 100644
--- a/cpg_workflows/query_modules/seqr_loader_sv.py
+++ b/cpg_workflows/query_modules/seqr_loader_sv.py
@@ -4,15 +4,13 @@

 import gzip
 import logging
-
-import requests
+from os.path import join

 import hail as hl

-from cpg_utils import to_path
 from cpg_utils.config import get_config, reference_path
 from cpg_utils.hail_batch import genome_build
-from cpg_workflows.utils import checkpoint_hail, read_hail
+from cpg_workflows.utils import generator_chunks, read_hail

 # I'm just going to go ahead and steal these constants from their seqr loader
 BOTHSIDES_SUPPORT = 'BOTHSIDES_SUPPORT'
@@ -103,48 +101,16 @@ def get_cpx_interval(x):
     return hl.struct(type=type_chr[0], chrom=chr_pos[0], start=hl.int32(pos[0]), end=hl.int32(pos[1]))


-def download_gencode_gene_id_mapping(gencode_release: str) -> str:
-    """
-    This is an inefficient stand-in for now. Swap this out with a more permanent
-    location for this file in the resources bucket
-    Not suuuuuuper keen on storing a pickled representation, but a minimised JSON
-    would be a good middle ground. Parsed ~= 62000 {str: str}
-    Args:
-        gencode_release (str | int): Which gencode release do you want?
-    Returns:
-        str - path to localised GTF file
+def parse_gtf_from_local(gtf_path: str, chunk_size: int | None = None) -> list[hl.expr.DictExpression]:
     """
+    Read over the localised GTF file and read into a dict

-    # this is the thing
-    local_gtf = 'localfile.gtf.gz'
-
-    # check for presence in config
-    if (config_gtf := get_config().get('gencode_gtf_path')) is not None:
-        # treat as GCP
-        if config_gtf.startswith('gs://'):
-            to_path(config_gtf).copy(local_gtf)
-            gtf_path = local_gtf
-        else:
-            gtf_path = config_gtf
-    else:
-        gtf_path = GENCODE_GTF_URL.format(gencode_release=gencode_release)
-
-    # if it wasn't a local file, download it
-    if gtf_path.startswith(('http://', 'https://')):
-        gz_stream = requests.get(gtf_path, stream=True)
-        with open(local_gtf, 'wb') as f:
-            f.writelines(gz_stream)
-        gz_stream.close()
-
-    # OK - I think that covers all the viable situations for now
-    return local_gtf
-
-
-def parse_gtf_from_local(gtf_path: str) -> hl.dict:
-    """
-    Read over the localised file and read into a dict
+    n.b. due to a limit in Spark of 20MB per String length, the dictionary here is actually too large to be used
+    in annotation expressions. To remedy this, the dictionary is returned as a list of fragments, and we can use each
+    one in turn, then create a checkpoint between them.
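+
+    e.g. the full mapping is roughly 60k entries, so the chunk_size of 15000 used by
+    cpg_workflows/scripts/seqr_loader_cnv.py gets it back as four or five smaller DictExpressions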
     Args:
         gtf_path ():
+        chunk_size (int): if specified, returns this dict as a list of dicts
     Returns:
-        the gene lookup dictionary as a Hail DictExpression
+        the gene lookup dictionary as a list of Hail DictExpressions
     """
@@ -165,12 +131,24 @@ def parse_gtf_from_local(gtf_path: str):
             # parse info field
             info_fields_list = [x.strip().split() for x in record['info'].split(';') if x != '']
             info_fields = {k: v.strip('"') for k, v in info_fields_list}
+
+            # skip an ENSG: ENSG mapping, redundant...
+            if info_fields['gene_name'].startswith('ENSG'):
+                continue
             gene_id_mapping[info_fields['gene_name']] = info_fields['gene_id'].split('.')[0]

-    logging.info('Completed ingestion of gene-ID mapping')
-    return hl.literal(gene_id_mapping)
+
+    all_keys = list(gene_id_mapping.keys())
+    logging.info(f'Completed ingestion of gene-ID mapping, {len(all_keys)} entries')
+
+    if chunk_size is None:
+        return [hl.literal(gene_id_mapping)]
+
+    # hail can't impute the type of a generator, so do this in baby steps
+    sub_dictionaries = [{key: gene_id_mapping[key] for key in keys} for keys in generator_chunks(all_keys, chunk_size)]
+
+    return [hl.literal(each_dict) for each_dict in sub_dictionaries]


-def annotate_cohort_sv(vcf_path: str, out_mt_path: str, checkpoint_prefix: str | None = None):
+def annotate_cohort_sv(vcf_path: str, out_mt_path: str, gencode_gz: str, checkpoint: str | None = None):
     """
     Translate an annotated SV VCF into a Seqr-ready format
     Relevant gCNV specific schema
@@ -180,7 +158,8 @@ def annotate_cohort_sv(vcf_path: str, out_mt_path: str, checkpoint_prefix: str | None = None):
     Args:
         vcf_path (str): Where is the VCF??
         out_mt_path (str): And where do you need output!?
-        checkpoint_prefix (str): CHECKPOINT!@!!
+        gencode_gz (str): The path to a compressed GENCODE GTF file
+        checkpoint (str): location we can write checkpoints to
     """
     logger = logging.getLogger('annotate_cohort_sv')
@@ -267,11 +246,11 @@ def annotate_cohort_sv(vcf_path: str, out_mt_path: str, checkpoint_prefix: str | None = None):
         mt = mt.annotate_rows(info=mt.info.annotate(CPX_TYPE=mt.sv_types[0]))

     # save those changes
-    mt = checkpoint_hail(mt, 'initial_annotation_round.mt', checkpoint_prefix)
+    if checkpoint:
+        mt = mt.checkpoint(join(checkpoint, 'initial_annotation_round.mt'))

     # get the Gene-Symbol mapping dict
-    gene_id_mapping_file = download_gencode_gene_id_mapping(get_config().get('gencode_release', '46'))
-    gene_id_mapping = parse_gtf_from_local(gene_id_mapping_file)
+    gene_id_mapping = parse_gtf_from_local(gencode_gz)[0]

     # OK, NOW IT'S BUSINESS TIME
     conseq_predicted_gene_cols = [
diff --git a/cpg_workflows/scripts/get_gencode_gtf.sh b/cpg_workflows/scripts/get_gencode_gtf.sh
new file mode 100644
index 000000000..46dd27c0e
--- /dev/null
+++ b/cpg_workflows/scripts/get_gencode_gtf.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+# use this get_gencode_gtf.sh script to download the latest GENCODE GTF file
+# and place it in the appropriate place in the CPG bucket
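+# usage: bash get_gencode_gtf.sh [GENCODE_VERSION]  (the version defaults to 47 below)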
+
+set -ex
+
+GENCODE_VERSION=${1:-"47"}
+
+# download the GTF file
+wget -O gencode.gtf.gz "https://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_${GENCODE_VERSION}/gencode.v${GENCODE_VERSION}.annotation.gtf.gz"
+
+# don't unzip the file - unzipping inflates from ~50MB to ~1.8GB, and transfer is probably the bottleneck
+# place the file in the right place in the CPG bucket
+gsutil cp gencode.gtf.gz "gs://cpg-common-main/references/hg38/v0/gencode_${GENCODE_VERSION}.gtf.gz"
diff --git a/cpg_workflows/query_modules/seqr_loader_cnv.py b/cpg_workflows/scripts/seqr_loader_cnv.py
similarity index 72%
rename from cpg_workflows/query_modules/seqr_loader_cnv.py
rename to cpg_workflows/scripts/seqr_loader_cnv.py
index 462546774..dcf9c9b82 100644
--- a/cpg_workflows/query_modules/seqr_loader_cnv.py
+++ b/cpg_workflows/scripts/seqr_loader_cnv.py
@@ -1,20 +1,16 @@
 """
-Hail Query functions for seqr loader; SV edition.
+Hail Query functions for seqr loader; CNV edition.
 """

 import datetime
 import logging
+from argparse import ArgumentParser
+from os.path import join

 import hail as hl

-from cpg_utils.config import get_config
-from cpg_utils.hail_batch import genome_build
-from cpg_workflows.query_modules.seqr_loader_sv import (
-    download_gencode_gene_id_mapping,
-    get_expr_for_xpos,
-    parse_gtf_from_local,
-)
-from cpg_workflows.utils import checkpoint_hail, read_hail
+from cpg_utils.hail_batch import genome_build, init_batch
+from cpg_workflows.query_modules.seqr_loader_sv import get_expr_for_xpos, parse_gtf_from_local

 # I'm just going to go ahead and steal these constants from their seqr loader
 GENE_SYMBOL = 'gene_symbol'
@@ -30,7 +26,7 @@
 }


-def annotate_cohort_gcnv(vcf_path: str, out_mt_path: str, checkpoint_prefix: str | None = None):
+def annotate_cohort_gcnv(vcf: str, mt_out: str, gencode: str, checkpoint: str, *args, **kwargs):
     """
     Translate an annotated gCNV VCF into a Seqr-ready format
     Relevant gCNV specific schema
@@ -38,17 +34,15 @@ def annotate_cohort_gcnv(vcf_path: str, out_mt_path: str, checkpoint_prefix: str | None = None):
     Relevant gCNV loader script
     https://github.com/populationgenomics/seqr-loading-pipelines/blob/master/luigi_pipeline/seqr_gcnv_loading.py
     Args:
-        vcf_path (str): Where is the VCF??
-        out_mt_path (str): And where do you need output!?
-        checkpoint_prefix (str): CHECKPOINT!@!!
+        vcf (str): Where is the VCF??
+        mt_out (str): And where do you need output!?
+        gencode (str): The path to a compressed GENCODE GTF file
+        checkpoint (str): location we can write checkpoints to
     """
-    logger = logging.getLogger('annotate_cohort_gcnv')
-    logger.setLevel(logging.INFO)
-
-    logger.info(f'Importing SV VCF {vcf_path}')
+    logging.info(f'Importing SV VCF {vcf}')
     mt = hl.import_vcf(
-        vcf_path,
+        vcf,
         array_elements_required=False,
         force_bgz=True,
         reference_genome=genome_build(),
@@ -57,7 +51,7 @@ def annotate_cohort_gcnv(vcf_path: str, out_mt_path: str, checkpoint_prefix: str | None = None):
     # add attributes required for Seqr
     mt = mt.annotate_globals(
-        sourceFilePath=vcf_path,
+        sourceFilePath=vcf,
         genomeVersion=genome_build().replace('GRCh', ''),
         hail_version=hl.version(),
         datasetType='SV',
@@ -89,13 +83,6 @@ def annotate_cohort_gcnv(vcf_path: str, out_mt_path: str, checkpoint_prefix: str | None = None):
         num_exon=hl.agg.max(mt.NP),
     )

-    # save those changes
-    mt = checkpoint_hail(mt, 'initial_annotation_round.mt', checkpoint_prefix)
-
-    # get the Gene-Symbol mapping dict
-    gene_id_mapping_file = download_gencode_gene_id_mapping(get_config().get('gencode_release', '46'))
-    gene_id_mapping = parse_gtf_from_local(gene_id_mapping_file)
-
     # find all the column names which contain Gene symbols
     conseq_predicted_gene_cols = [
         gene_col
@@ -116,15 +103,29 @@ def annotate_cohort_gcnv(vcf_path: str, out_mt_path: str, checkpoint_prefix: str | None = None):
         ),
     )

+    mt = mt.checkpoint(join(checkpoint, 'pre-gene_annotation.mt'), overwrite=True)
+
+    # the dictionary of genes is too large to be used in a single annotation
+    # expression (Spark caps a single String at ~20MB), so fetch the mapping
+    # as a list of smaller fragments and apply one chunk at a time
+
+    # get the Gene-Symbol mapping dict
+    gene_id_mappings = parse_gtf_from_local(gencode, chunk_size=15000)
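+
+    # each fragment only rewrites the symbols it contains - the .get(gene, gene)
+    # fallback below leaves unmatched symbols untouched for a later fragment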
     # overwrite symbols with ENSG IDs in these columns
     # not sure why this is required, I think SV annotation came out
     # with ENSGs from the jump, but this is all symbols
-    for col_name in conseq_predicted_gene_cols:
-        mt = mt.annotate_rows(
-            info=mt.info.annotate(
-                **{col_name: hl.map(lambda gene: gene_id_mapping.get(gene, gene), mt.info[col_name])},
-            ),
-        )
+    for i, gene_id_mapping in enumerate(gene_id_mappings):
+        logging.info(f'Processing gene ID mapping chunk {i}')
+        for col_name in conseq_predicted_gene_cols:
+            mt = mt.annotate_rows(
+                info=mt.info.annotate(
+                    **{col_name: hl.map(lambda gene: gene_id_mapping.get(gene, gene), mt.info[col_name])},
+                ),
+            )
+
+        # checkpoint this chunk
+        mt = mt.checkpoint(join(checkpoint, f'fragment_{i}.mt'))

     mt = mt.annotate_rows(
         # this expected mt.variant_name to be present, and it's not
@@ -173,21 +174,21 @@
     )

     # write this output
-    mt.write(out_mt_path, overwrite=True)
+    mt.write(mt_out, overwrite=True)


-def annotate_dataset_gcnv(mt_path: str, out_mt_path: str):
+def annotate_dataset_gcnv(mt_in: str, mt_out: str, *args, **kwargs):
     """
     process data specific to samples in this dataset
     do this after sub-setting to specific samples
     Args:
-        mt_path (str): path to the annotated MatrixTable
-        out_mt_path (str): and where do you want it to end up?
+        mt_in (str): path to the annotated MatrixTable
+        mt_out (str): and where do you want it to end up?
     """

     logging.info('Annotating genotypes')

-    mt = read_hail(mt_path)
+    mt = hl.read_matrix_table(mt_in)

     # adding in the GT here, that may cause problems later?
     mt = mt.annotate_rows(
@@ -274,5 +275,41 @@ def _filter_num_alt(i, g):
     )
     logging.info('Genotype fields annotated')
     mt.describe()
-    mt.write(out_mt_path, overwrite=True)
-    logging.info(f'Written gCNV MT to {out_mt_path}')
+    mt.write(mt_out, overwrite=True)
+    logging.info(f'Written gCNV MT to {mt_out}')
+
+
+def cli_main():
+    """
+    command line entrypoint
+    """
+    # enable Info-level logging
+    logging.basicConfig(level=logging.INFO)
+
+    init_batch()
+
+    # set up an argument parser to allow two separate entrypoints
+    parser = ArgumentParser()
+    # these arguments are used for both entrypoints
+    parser.add_argument('--mt_out', help='Path to the MatrixTable, input or output', required=True)
+    parser.add_argument('--checkpoint', help='Dir to write checkpoints to', required=True)
+    subparsers = parser.add_subparsers()
+
+    # a parser for the AnnotateCohort method
+    cohort_parser = subparsers.add_parser('cohort')
+    cohort_parser.add_argument('--vcf', help='Path to input VCF file')
+    cohort_parser.add_argument('--gencode', help='Path to input gencode GTF file')
+    cohort_parser.set_defaults(func=annotate_cohort_gcnv)
+
+    # a parser for the AnnotateDataset method
+    dataset_parser = subparsers.add_parser('dataset')
+    dataset_parser.add_argument('--mt_in', help='Path to input MT')
+    dataset_parser.set_defaults(func=annotate_dataset_gcnv)
+
+    args = parser.parse_args()
+
+    args.func(**vars(args))
+
+
+if __name__ == '__main__':
+    cli_main()
diff --git a/cpg_workflows/stages/gcnv.py b/cpg_workflows/stages/gcnv.py
index 1978591f6..d05a99262 100644
--- a/cpg_workflows/stages/gcnv.py
+++ b/cpg_workflows/stages/gcnv.py
@@ -9,9 +9,8 @@
 from cpg_utils import Path, to_path
 from cpg_utils.config import AR_GUID_NAME, config_retrieve, image_path, reference_path, try_get_ar_guid
-from cpg_utils.hail_batch import get_batch, query_command
+from cpg_utils.hail_batch import get_batch
 from cpg_workflows.jobs import gcnv
-from cpg_workflows.query_modules import seqr_loader_cnv
 from cpg_workflows.stages.gatk_sv.gatk_sv_common import get_images, get_references, queue_annotate_sv_jobs
 from cpg_workflows.stages.seqr_loader import es_password
 from cpg_workflows.targets import Cohort, Dataset, MultiCohort, SequencingGroup
@@ -612,20 +611,20 @@ def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutput | None:
         Fire up the job to ingest the cohort VCF as a MT, and rearrange the annotations
         """
-        vcf_path = inputs.as_path(target=multicohort, stage=AnnotateCNVVcfWithStrvctvre, key='strvctvre_vcf')
+        vcf_path = str(inputs.as_path(target=multicohort, stage=AnnotateCNVVcfWithStrvctvre, key='strvctvre_vcf'))
         outputs = self.expected_outputs(multicohort)

         j = get_batch().new_job('annotate gCNV cohort', self.get_job_attrs(multicohort))
-        j.image(image_path('cpg_workflows'))
+        j.image(config_retrieve(['workflow', 'driver_image']))
+        gencode_gz = config_retrieve(['workflow', 'gencode_gtf_file'])
+        gencode_gtf_local = get_batch().read_input(str(gencode_gz))
         j.command(
-            query_command(
-                seqr_loader_cnv,
-                seqr_loader_cnv.annotate_cohort_gcnv.__name__,
-                str(vcf_path),
-                str(outputs['mt']),
-                str(self.tmp_prefix / 'checkpoints'),
-                setup_gcp=True,
-            ),
+            'seqr_loader_cnv '
+            f'--mt_out {str(outputs["mt"])} '
+            f'--checkpoint {str(self.tmp_prefix / "checkpoints")} '
+            f'cohort '  # use the annotate_COHORT functionality
+            f'--vcf {vcf_path} '
+            f'--gencode {gencode_gtf_local} ',
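+            # (expands to roughly:
+            #  `seqr_loader_cnv --mt_out <mt> --checkpoint <tmp>/checkpoints cohort --vcf <vcf> --gencode <local gtf>`)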
         )

         return self.make_outputs(multicohort, data=outputs, jobs=j)
@@ -657,7 +656,7 @@ def queue_jobs(self, dataset: Dataset, inputs: StageInput) -> StageOutput | None:
         jobs = gcnv.annotate_dataset_jobs_cnv(
             mt_path=mt_in,
             sgids=dataset.get_sequencing_group_ids(),
-            out_mt_path=outputs['mt'],
+            out_mt_path=str(outputs['mt']),
             tmp_prefix=self.tmp_prefix / dataset.name / 'checkpoints',
             job_attrs=self.get_job_attrs(dataset),
         )
diff --git a/setup.py b/setup.py
index fa1a3d614..746f0b8e6 100644
--- a/setup.py
+++ b/setup.py
@@ -5,7 +5,7 @@
 setup(
     name='cpg-workflows',
     # This tag is automatically updated by bumpversion
-    version='1.29.4',
+    version='1.29.5',
     description='CPG workflows for Hail Batch',
     long_description=open('README.md').read(),
     long_description_content_type='text/markdown',
@@ -68,6 +68,7 @@
         'mt_to_es = cpg_workflows.scripts.mt_to_es_without_dataproc:main',
         # Generate new intervals from a MatrixTable
         'new_intervals_from_mt = cpg_workflows.scripts.generate_new_intervals:cli_main',
+        'seqr_loader_cnv = cpg_workflows.scripts.seqr_loader_cnv:cli_main',
     ],
 )