Skip to content

Commit

Permalink
combiner setup fixes (#963)
Browse files Browse the repository at this point in the history
  • Loading branch information
MattWellie authored Oct 30, 2024
1 parent 30ac903 commit 9853640
Showing 1 changed file with 12 additions and 14 deletions.
26 changes: 12 additions & 14 deletions cpg_workflows/stages/large_cohort.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import TYPE_CHECKING, Any, Tuple

from cpg_utils import Path
from cpg_utils.config import config_retrieve, get_config, image_path
from cpg_utils.config import config_retrieve, genome_build, get_config, image_path
from cpg_utils.hail_batch import get_batch, query_command
from cpg_workflows.targets import Cohort, SequencingGroup
from cpg_workflows.utils import slugify
Expand Down Expand Up @@ -63,26 +63,24 @@ def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:
f"{self.tmp_prefix}/{workflow_config['cohort']}-{workflow_config['sequencing_type']}-{combiner_config['vds_version']}",
)

vds_paths: list[str] | None = None
sg_ids_in_vds: list[str] | None = None
new_sg_gvcfs: list[str] | None = None
# create these as empty lists instead of None, they have the same truthiness
vds_paths: list[str] = []
sg_ids_in_vds: list[str] = []

if combiner_config.get('vds_analysis_ids', None) is not None:
vds_paths = []
sg_ids_in_vds = []
for vds_id in combiner_config['vds_analysis_ids']:
tmp_query_res, tmp_sg_ids_in_vds = self.get_vds_ids_output(vds_id)
vds_paths.append(tmp_query_res)
sg_ids_in_vds = sg_ids_in_vds + tmp_sg_ids_in_vds

# Get SG IDs from the cohort object itself, rather than call Metamist.
# Get VDS IDs first and filter out from this list
sg_ids: list[SequencingGroup] = cohort.get_sequencing_groups(only_active=True)
new_sg_gvcfs = [str(sg.gvcf) for sg in sg_ids if sg_ids_in_vds and sg.id not in sg_ids_in_vds]
if len(new_sg_gvcfs) == 0:
new_sg_gvcfs = None
if not vds_paths or len(vds_paths) == 1:
return self.make_outputs(cohort, self.expected_outputs(cohort))
cohort_sgs: list[SequencingGroup] = cohort.get_sequencing_groups(only_active=True)

new_sg_gvcfs: list[str] = [str(sg.gvcf) for sg in cohort_sgs if sg.id not in sg_ids_in_vds]

if len(new_sg_gvcfs) == 0 and len(vds_paths) <= 1:
return self.make_outputs(cohort, self.expected_outputs(cohort))

j.image(image_path('cpg_workflows'))
j.memory(combiner_config['memory'])
Expand All @@ -94,8 +92,8 @@ def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:
output_vds_path=str(output_vds_path),
sequencing_type=workflow_config['sequencing_type'],
tmp_prefix=tmp_prefix,
genome_build=workflow_config['workflow'].get('genome_build', 'GRCh38'),
gvcf_paths=new_sg_gvcfs,
genome_build=genome_build(),
gvcf_paths=new_sg_gvcfs, # this is a list or None, and in new_combiner None is made into []
vds_paths=vds_paths,
)

Expand Down

0 comments on commit 9853640

Please sign in to comment.