diff --git a/cubi_tk/snappy/__init__.py b/cubi_tk/snappy/__init__.py index b7c28ebb..32a8d2d8 100644 --- a/cubi_tk/snappy/__init__.py +++ b/cubi_tk/snappy/__init__.py @@ -13,6 +13,8 @@ Transfer results and logs from ``output`` directory of ``ngs_mapping``. ``itransfer-variant-calling`` Transfer results and logs from ``output`` directory of ``variant_calling``. +``itransfer-sv-calling`` + Transfer results and logs from ``output`` directory of ``sv_calling`` or ``sv_calling_targeted``. ``itransfer-step`` Transfer results and logs from ``output`` directory of any snappy pipeline step. ``pull-sheet`` @@ -45,6 +47,7 @@ ) from .itransfer_raw_data import setup_argparse as setup_argparse_itransfer_raw_data from .itransfer_step import setup_argparse as setup_argparse_itransfer_step +from .itransfer_sv_calling import setup_argparse as setup_argparse_itransfer_sv_calling from .itransfer_variant_calling import ( setup_argparse as setup_argparse_itransfer_variant_calling, ) @@ -90,6 +93,13 @@ def setup_argparse(parser: argparse.ArgumentParser) -> None: ) ) + setup_argparse_itransfer_sv_calling( + subparsers.add_parser( + "itransfer-sv-calling", + help="Transfer sv_calling or sv_calling_targeted results into iRODS landing zone", + ) + ) + setup_argparse_itransfer_step( subparsers.add_parser( "itransfer-step", help="Transfer snappy step results into iRODS landing zone" diff --git a/cubi_tk/snappy/itransfer_sv_calling.py b/cubi_tk/snappy/itransfer_sv_calling.py new file mode 100644 index 00000000..e7cb76da --- /dev/null +++ b/cubi_tk/snappy/itransfer_sv_calling.py @@ -0,0 +1,103 @@ +"""``cubi-tk snappy itransfer-sv-calling``: transfer sv_calling results into iRODS landing zone.""" + +import argparse +import os +import typing + +from logzero import logger +import yaml + +from . import common +from .itransfer_common import IndexLibrariesOnlyMixin, SnappyItransferCommandBase + +#: Template string for sv_calling results files. 
TPL_INPUT_DIR = "%(step_name)s/output/%(mapper)s.%(caller)s.%(library_name)s"


class SnappyStepNotFoundException(Exception):
    """Raised when the snappy-pipeline config does not define exactly one supported step.

    A specific message passed by the raiser is shown as-is; when the exception is raised
    without arguments, a generic default message is used instead.
    """

    #: Fallback text used when no specific message was passed to the constructor.
    default_message = (
        "snappy-pipeline config does not define the expected steps this function needs."
    )

    def __str__(self):
        # Keep the detailed message given at the raise site instead of hiding it behind the
        # generic text; only fall back to the default for argument-less raises.
        if self.args:
            return super().__str__()
        return self.default_message


class SnappyItransferSvCallingCommand(IndexLibrariesOnlyMixin, SnappyItransferCommandBase):
    """Implementation of snappy itransfer command for structural variant (SV) calling results."""

    fix_md5_files = True
    command_name = "itransfer-sv-calling"
    #: Step names accepted in the pipeline config; exactly one of them must be configured.
    step_names = ("sv_calling", "sv_calling_targeted")
    start_batch_in_family = True

    def __init__(self, args):
        """Read the snappy-pipeline config and determine the sv-calling step and its callers.

        :param args: Parsed command line arguments.
        :raises SnappyStepNotFoundException: If none or more than one of ``step_names`` is
            present in the ``step_config`` section of ``.snappy_pipeline/config.yaml``.
        """
        super().__init__(args)

        path = common.find_snappy_root_dir(self.args.base_path or os.getcwd())
        # NOTE: keep using the module-level ``open`` name here; the tests patch it.
        with open(path / ".snappy_pipeline/config.yaml", "rt") as f:
            config = yaml.safe_load(f)

        # An empty file loads as ``None`` and ``step_config`` may be missing or empty; treat
        # all of these as "no step configured" rather than failing with TypeError/KeyError.
        step_config = (config or {}).get("step_config") or {}
        configured = [name for name in self.__class__.step_names if name in step_config]
        if len(configured) > 1:
            raise SnappyStepNotFoundException(
                "Found multiple sv-calling step names in config.yaml. "
                f"Only one of {', '.join(self.__class__.step_names)} is allowed."
            )
        if not configured:
            raise SnappyStepNotFoundException(
                "Could not find any sv-calling step name in 'config.yaml'. "
                f"Was looking for one of: {', '.join(self.__class__.step_names)}"
            )

        self.step_name = configured[0]
        self.defined_callers = step_config[self.step_name]["tools"]

    @classmethod
    def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
        """Add the ``--mapper`` and ``--caller`` options on top of the base class arguments."""
        super().setup_argparse(parser)
        parser.add_argument(
            "--mapper",
            help="Name of the mapper to transfer for, defaults to bwa_mem2.",
            default="bwa_mem2",
        )
        parser.add_argument(
            "--caller",
            help="Name of the variant caller to transfer for. Defaults to all callers defined in config",
            default="all-defined",
        )

    @classmethod
    def run(
        cls, args, _parser: argparse.ArgumentParser, _subparser: argparse.ArgumentParser
    ) -> typing.Optional[int]:
        """Entry point into the command."""
        return cls(args).execute_multi()

    def execute_multi(self) -> typing.Optional[int]:
        """Execute the transfer once per caller and return an integer exit status.

        With ``--caller all-defined`` (the default) the transfer runs for every caller listed
        under ``tools`` of the detected step; ``self.args.caller`` is overwritten for each run.
        Otherwise a single transfer runs for the caller given on the command line.

        :return: ``0`` if every ``execute()`` call returned a falsy value, otherwise the last
            truthy value returned by ``execute()``.
        """
        if self.args.caller != "all-defined":
            # ``execute()`` may return ``None`` (the loop below already guards against that
            # with ``or``); map it to 0 so that ``int()`` does not raise TypeError.
            return int(self.execute() or 0)

        logger.info("Starting cubi-tk snappy sv-calling for multiple callers")
        ret = 0
        for caller in self.defined_callers:
            self.args.caller = caller
            ret = self.execute() or ret
        return int(ret)

    def build_base_dir_glob_pattern(self, library_name: str) -> typing.Tuple[str, str]:
        """Return the base directory and glob pattern of the result files of one library.

        :param library_name: Name of the library to build the path for.
        :return: Tuple of the directory built from ``TPL_INPUT_DIR`` below ``--base-path``
            and the glob pattern ``"**"``.
        """
        relative_dir = TPL_INPUT_DIR % {
            "step_name": self.step_name,
            "mapper": self.args.mapper,
            "caller": self.args.caller,
            "library_name": library_name,
        }
        return os.path.join(self.args.base_path, relative_dir), "**"


def setup_argparse(parser: argparse.ArgumentParser) -> None:
    """Setup argument parser for ``cubi-tk snappy itransfer-sv-calling``."""
    return SnappyItransferSvCallingCommand.setup_argparse(parser)
"""Tests for ``cubi_tk.snappy.itransfer_sv_calling``.

We only run some smoke tests here.
"""

import os
import textwrap
from unittest import mock
from unittest.mock import ANY

from pyfakefs import fake_filesystem
import pytest

from cubi_tk.__main__ import main, setup_argparse
from cubi_tk.snappy.itransfer_sv_calling import (
    SnappyItransferSvCallingCommand,
    SnappyStepNotFoundException,
)

from .conftest import my_exists, my_get_sodar_info

#: Step names in the order in which they are added to the fake configuration.
_FAKE_STEPS = ("sv_calling_targeted", "sv_calling")


def _fake_step_section(step_name):
    """Return the YAML of one sv-calling step, indented to sit below ``step_config:``."""
    section = (
        textwrap.dedent(
            """\
            %s:
              tools:
                - gcnv
                - manta
            """
        )
        % step_name
    )
    return textwrap.indent(section, "  ")


def _itransfer_argv(base_path, sodar_uuid):
    """Return the command line used by all tests that actually parse a full invocation."""
    return [
        "--verbose",
        "snappy",
        "itransfer-sv-calling",
        "--base-path",
        base_path,
        "--sodar-api-token",
        "XXXX",
        sodar_uuid,
    ]


def fake_config(n_tools=1):
    """Return configuration text.

    :param n_tools: Number of sv-calling steps to configure: ``0`` gives an empty
        ``step_config``, ``1`` only ``sv_calling_targeted``, ``2`` additionally ``sv_calling``.
    :raises ValueError: If ``n_tools`` is not 0, 1, or 2.
    """
    if n_tools not in (0, 1, 2):
        raise ValueError("n_tools must be 0, 1, or 2, got %r" % (n_tools,))

    if n_tools == 0:
        step_config = "step_config: {}\n"
    else:
        step_config = "step_config:\n" + "".join(
            _fake_step_section(step_name) for step_name in _FAKE_STEPS[:n_tools]
        )

    # The literal starts with a newline, which yields the blank line before ``data_sets``.
    tail = textwrap.dedent(
        r"""
        data_sets:
          first_batch:
            sodar_uuid: 466ab946-ce6a-4c78-9981-19b79e7bbe86
            file: sheet.tsv
            search_patterns:
            - {'left': '*/*/*_R1.fastq.gz', 'right': '*/*/*_R2.fastq.gz'}
            search_paths: ['/path']
            type: germline_variants
            naming_scheme: only_secondary_id
        """
    )

    return "static_data_config: {}\n\n" + step_config + tail


def test_run_snappy_itransfer_sv_calling_help(capsys):
    """``--help`` prints usage to stdout and exits with code 0."""
    parser, _subparsers = setup_argparse()
    with pytest.raises(SystemExit) as e:
        parser.parse_args(["snappy", "itransfer-sv-calling", "--help"])

    assert e.value.code == 0

    res = capsys.readouterr()
    assert res.out
    assert not res.err


def test_run_snappy_itransfer_sv_calling_nothing(capsys):
    """Calling without the required arguments prints an error and exits with code 2."""
    parser, _subparsers = setup_argparse()

    with pytest.raises(SystemExit) as e:
        parser.parse_args(["snappy", "itransfer-sv-calling"])

    assert e.value.code == 2

    res = capsys.readouterr()
    assert not res.out
    assert res.err


def test_run_snappy_itransfer_sv_calling_no_sv_step(fs):
    """Constructing the command fails if the config defines no sv-calling step."""
    fake_base_path = "/base/path"
    sodar_uuid = "466ab946-ce6a-4c78-9981-19b79e7bbe86"
    argv = _itransfer_argv(fake_base_path, sodar_uuid)

    no_sv_config = fake_config(0)
    print(no_sv_config)
    fs.create_file(
        os.path.join(fake_base_path, ".snappy_pipeline/config.yaml"),
        contents=no_sv_config,
        create_missing_dirs=True,
    )

    parser, _subparsers = setup_argparse()
    args = parser.parse_args(argv)
    with pytest.raises(SnappyStepNotFoundException):
        SnappyItransferSvCallingCommand(args)


def test_run_snappy_itransfer_sv_calling_two_sv_steps(fs):
    """Constructing the command fails if the config defines both sv-calling steps."""
    fake_base_path = "/base/path"
    sodar_uuid = "466ab946-ce6a-4c78-9981-19b79e7bbe86"
    argv = _itransfer_argv(fake_base_path, sodar_uuid)

    two_sv_config = fake_config(2)
    print(two_sv_config)
    fs.create_file(
        os.path.join(fake_base_path, ".snappy_pipeline/config.yaml"),
        contents=two_sv_config,
        create_missing_dirs=True,
    )

    parser, _subparsers = setup_argparse()
    args = parser.parse_args(argv)
    with pytest.raises(SnappyStepNotFoundException):
        SnappyItransferSvCallingCommand(args)


def test_run_snappy_itransfer_sv_calling_smoke_test(mocker, germline_trio_sheet_tsv):
    """Run the full command against a fake file system with all external calls mocked."""
    fake_base_path = "/base/path"
    dest_path = "/irods/dest"
    sodar_uuid = "466ab946-ce6a-4c78-9981-19b79e7bbe86"
    argv = _itransfer_argv(fake_base_path, sodar_uuid)

    # Setup fake file system but only patch selected modules.  We cannot use the Patcher approach here as this would
    # break both biomedsheets and multiprocessing.
    fs = fake_filesystem.FakeFilesystem()

    # Resulting order: [0] gcnv vcf, [1] manta vcf, [2] gcnv log, then the same three with ".md5".
    fake_file_paths = []
    for member in ("index",):
        for ext in ("", ".md5"):
            fake_file_paths.append(
                "%s/sv_calling_targeted/output/bwa_mem2.gcnv.%s-N1-DNA1-WES1/out/bwa_mem2.gcnv.%s-N1-DNA1-WES1.vcf.gz%s"
                % (fake_base_path, member, member, ext)
            )
            fs.create_file(fake_file_paths[-1])
            fake_file_paths.append(
                "%s/sv_calling_targeted/output/bwa_mem2.manta.%s-N1-DNA1-WES1/out/bwa_mem2.manta.%s-N1-DNA1-WES1.vcf.gz%s"
                % (fake_base_path, member, member, ext)
            )
            fs.create_file(fake_file_paths[-1])
            fake_file_paths.append(
                "%s/sv_calling_targeted/output/bwa_mem2.gcnv.%s-N1-DNA1-WES1/log/bwa_mem2.gcnv.%s-N1-DNA1-WES1.log%s"
                % (fake_base_path, member, member, ext)
            )
            fs.create_file(fake_file_paths[-1])
    # Create sample sheet in fake file system
    sample_sheet_path = fake_base_path + "/.snappy_pipeline/sheet.tsv"
    fs.create_file(sample_sheet_path, contents=germline_trio_sheet_tsv, create_missing_dirs=True)
    # Create config in fake file system
    config_path = fake_base_path + "/.snappy_pipeline/config.yaml"
    fs.create_file(config_path, contents=fake_config(), create_missing_dirs=True)

    # Print path to all created files
    print(fake_config())
    print("\n".join(fake_file_paths + [sample_sheet_path, config_path]))

    # Remove the MD5 file of the index's gcnv VCF again so it is recreated.
    fs.remove(fake_file_paths[3])

    # Set Mocker
    mocker.patch("pathlib.Path.exists", my_exists)
    mocker.patch(
        "cubi_tk.snappy.itransfer_common.SnappyItransferCommandBase.get_sodar_info",
        my_get_sodar_info,
    )

    fake_os = fake_filesystem.FakeOsModule(fs)
    mocker.patch("glob.os", fake_os)
    mocker.patch("cubi_tk.snappy.itransfer_common.os", fake_os)
    # Patch ``os`` in the module under test (not in ``itransfer_variant_calling``).
    mocker.patch("cubi_tk.snappy.itransfer_sv_calling.os", fake_os)

    mock_check_output = mock.mock_open()
    mocker.patch("cubi_tk.snappy.itransfer_common.check_output", mock_check_output)

    fake_open = fake_filesystem.FakeFileOpen(fs)
    mocker.patch("cubi_tk.snappy.itransfer_sv_calling.open", fake_open)
    mocker.patch("cubi_tk.snappy.itransfer_common.open", fake_open)
    mocker.patch("cubi_tk.snappy.common.open", fake_open)

    mock_check_call = mock.mock_open()
    mocker.patch("cubi_tk.snappy.itransfer_common.check_call", mock_check_call)

    # Actually exercise code and perform test.
    parser, _subparsers = setup_argparse()
    # Parsed separately only to obtain ``remote_dir_date`` for the expected remote paths.
    args = parser.parse_args(argv)
    res = main(argv)

    assert not res

    # We do not care about call order but simply test call count and then assert that all files are there which would
    # be equivalent of comparing sets of files.

    assert fs.exists(fake_file_paths[3])

    assert mock_check_call.call_count == 1
    mock_check_call.assert_called_once_with(
        ["md5sum", "bwa_mem2.gcnv.index-N1-DNA1-WES1.vcf.gz"],
        cwd=os.path.dirname(fake_file_paths[3]),
        stdout=ANY,
    )

    assert mock_check_output.call_count == len(fake_file_paths) * 3
    for path in fake_file_paths:
        mapper_index, rel_path = os.path.relpath(
            path, os.path.join(fake_base_path, "sv_calling_targeted/output")
        ).split("/", 1)
        _mapper, index = mapper_index.rsplit(".", 1)
        remote_path = os.path.join(
            dest_path, index, "sv_calling_targeted", args.remote_dir_date, rel_path
        )
        expected_mkdir_argv = ["imkdir", "-p", os.path.dirname(remote_path)]
        expected_irsync_argv = ["irsync", "-a", "-K", path, "i:%s" % remote_path]
        expected_ils_argv = ["ils", os.path.dirname(remote_path)]
        mock_check_output.assert_any_call(expected_mkdir_argv)
        mock_check_output.assert_any_call(expected_irsync_argv)
        mock_check_output.assert_any_call(expected_ils_argv, stderr=-2)