Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adding cubi-tk snappy itransfer_sv_calling #213

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cubi_tk/snappy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
Transfer results and logs from ``output`` directory of ``ngs_mapping``.
``itransfer-variant-calling``
Transfer results and logs from ``output`` directory of ``variant_calling``.
``itransfer-sv-calling``
Transfer results and logs from ``output`` directory of ``sv_calling`` or ``sv_calling_targeted``.
``itransfer-step``
Transfer results and logs from ``output`` directory of any snappy pipeline step.
``pull-sheet``
Expand Down Expand Up @@ -45,6 +47,7 @@
)
from .itransfer_raw_data import setup_argparse as setup_argparse_itransfer_raw_data
from .itransfer_step import setup_argparse as setup_argparse_itransfer_step
from .itransfer_sv_calling import setup_argparse as setup_argparse_itransfer_sv_calling
from .itransfer_variant_calling import (
setup_argparse as setup_argparse_itransfer_variant_calling,
)
Expand Down Expand Up @@ -90,6 +93,13 @@ def setup_argparse(parser: argparse.ArgumentParser) -> None:
)
)

setup_argparse_itransfer_sv_calling(
subparsers.add_parser(
"itransfer-sv-calling",
help="Transfer sv_calling or sv_calling_targeted results into iRODS landing zone",
)
)

setup_argparse_itransfer_step(
subparsers.add_parser(
"itransfer-step", help="Transfer snappy step results into iRODS landing zone"
Expand Down
103 changes: 103 additions & 0 deletions cubi_tk/snappy/itransfer_sv_calling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""``cubi-tk snappy itransfer-sv-calling``: transfer sv_calling or sv_calling_targeted results into iRODS landing zone."""

import argparse
import os
import typing

from logzero import logger
import yaml

from . import common
from .itransfer_common import IndexLibrariesOnlyMixin, SnappyItransferCommandBase

#: Template string for sv_calling / sv_calling_targeted results files.
TPL_INPUT_DIR = "%(step_name)s/output/%(mapper)s.%(caller)s.%(library_name)s"


class SnappyStepNotFoundException(Exception):
    """Raised when the snappy-pipeline config does not define the expected step(s).

    When the exception is created with a message, that message is shown;
    otherwise a generic description is used.
    """

    def __str__(self):
        # Prefer the specific message given at the raise site; previously it was
        # always replaced by the generic text below.
        if self.args:
            return super().__str__()
        return "snappy-pipeline config does not define the expected steps this function needs."


class SnappyItransferSvCallingCommand(IndexLibrariesOnlyMixin, SnappyItransferCommandBase):
    """Implementation of snappy itransfer command for SV calling results.

    Exactly one of the steps listed in ``step_names`` must be configured in the
    project's ``.snappy_pipeline/config.yaml``; its ``tools`` list provides the
    callers used when ``--caller`` is left at ``all-defined``.
    """

    fix_md5_files = True
    #: Name of the sub-command on the command line.
    command_name = "itransfer-sv-calling"
    #: Candidate snappy step names; exactly one must be present in the config.
    step_names = ("sv_calling", "sv_calling_targeted")
    start_batch_in_family = True

    def __init__(self, args):
        """Read the snappy config and determine the SV calling step and its callers.

        :param args: Parsed command line arguments.
        :raises SnappyStepNotFoundException: If none or more than one of
            ``step_names`` is defined in the ``step_config`` section.
        """
        super().__init__(args)

        path = common.find_snappy_root_dir(self.args.base_path or os.getcwd())
        with open(path / ".snappy_pipeline/config.yaml", "rt") as f:
            config = yaml.safe_load(f)

        # A missing or empty ``step_config`` section is treated as "no step defined".
        step_config = (config or {}).get("step_config") or {}
        configured_steps = [name for name in self.step_names if name in step_config]
        if len(configured_steps) > 1:
            raise SnappyStepNotFoundException(
                f"Found multiple sv-calling step names in config.yaml. Only one of {', '.join(self.__class__.step_names)} is allowed."
            )
        if not configured_steps:
            raise SnappyStepNotFoundException(
                f"Could not find any sv-calling step name in 'config.yaml'. Was looking for one of: {', '.join(self.__class__.step_names)}"
            )

        #: Name of the one SV calling step found in the config.
        self.step_name = configured_steps[0]
        #: Callers listed under ``tools`` of that step.
        self.defined_callers = step_config[self.step_name]["tools"]

    @classmethod
    def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
        """Add the ``--mapper`` and ``--caller`` options on top of the base arguments."""
        super().setup_argparse(parser)
        parser.add_argument(
            "--mapper",
            help="Name of the mapper to transfer for, defaults to bwa_mem2.",
            default="bwa_mem2",
        )
        parser.add_argument(
            "--caller",
            help="Name of the variant caller to transfer for. Defaults to all callers defined in config",
            default="all-defined",
        )

    @classmethod
    def run(
        cls, args, _parser: argparse.ArgumentParser, _subparser: argparse.ArgumentParser
    ) -> typing.Optional[int]:
        """Entry point into the command."""
        return cls(args).execute_multi()

    def execute_multi(self) -> typing.Optional[int]:
        """Execute the transfer, once per caller.

        With ``--caller all-defined`` the transfer runs for every caller in
        ``self.defined_callers``; ``self.args.caller`` is overwritten for each run.

        :return: ``0`` if every run returned a falsy result, otherwise the last
            truthy result converted to ``int``.
        """
        ret = 0
        if self.args.caller == "all-defined":
            logger.info("Starting cubi-tk snappy sv-calling for multiple callers")
            for caller in self.defined_callers:
                self.args.caller = caller
                ret = self.execute() or ret
        else:
            # ``execute()`` may return ``None``; map it to 0 so that ``int()`` below
            # does not raise ``TypeError``.
            ret = self.execute() or 0

        return int(ret)

    def build_base_dir_glob_pattern(self, library_name: str) -> typing.Tuple[str, str]:
        """Return ``(base directory, glob pattern)`` for the given library.

        The directory is composed from the base path, the detected step name and
        the currently selected mapper and caller.
        """
        input_dir = TPL_INPUT_DIR % {
            "step_name": self.step_name,
            "mapper": self.args.mapper,
            "caller": self.args.caller,
            "library_name": library_name,
        }
        return (os.path.join(self.args.base_path, input_dir), "**")


def setup_argparse(parser: argparse.ArgumentParser) -> None:
    """Setup argument parser for ``cubi-tk snappy itransfer-sv-calling``."""
    command_cls = SnappyItransferSvCallingCommand
    return command_cls.setup_argparse(parser)
264 changes: 264 additions & 0 deletions tests/test_snappy_itransfer_sv_calling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
"""Tests for ``cubi_tk.snappy.itransfer_sv_calling``.

We only run some smoke tests here.
"""

import os
import textwrap
from unittest import mock
from unittest.mock import ANY

from pyfakefs import fake_filesystem
import pytest

from cubi_tk.__main__ import main, setup_argparse
from cubi_tk.snappy.itransfer_sv_calling import (
SnappyItransferSvCallingCommand,
SnappyStepNotFoundException,
)

from .conftest import my_exists, my_get_sodar_info


def fake_config(n_tools=1):
    """Return snappy-pipeline configuration text for the tests.

    :param n_tools: Number of SV calling steps to define under ``step_config``:
        ``0`` for none (``step_config: {}``), ``1`` for ``sv_calling_targeted``
        only, ``2`` for both ``sv_calling_targeted`` and ``sv_calling``.
    :return: The configuration as YAML text.
    :raises ValueError: If ``n_tools`` is not 0, 1 or 2.
    """
    if n_tools not in (0, 1, 2):
        raise ValueError(f"n_tools must be 0, 1 or 2, got {n_tools!r}")

    head = textwrap.dedent(
        r"""
        static_data_config: {}

        step_config:
        """
    ).lstrip()

    def step_block(step_name):
        """Return the YAML for one step, nested one level below ``step_config``.

        The block starts with a newline and has no trailing newline, so blocks
        can be concatenated directly with each other and with ``tail``.
        """
        lines = ["", f"{step_name}:", "  tools:", "    - gcnv", "    - manta"]
        # ``textwrap.indent`` leaves the empty first line untouched.
        return textwrap.indent("\n".join(lines), "  ")

    tail = textwrap.dedent(
        r"""
        data_sets:
          first_batch:
            sodar_uuid: 466ab946-ce6a-4c78-9981-19b79e7bbe86
            file: sheet.tsv
            search_patterns:
            - {'left': '*/*/*_R1.fastq.gz', 'right': '*/*/*_R2.fastq.gz'}
            search_paths: ['/path']
            type: germline_variants
            naming_scheme: only_secondary_id
        """
    )

    if n_tools == 0:
        return head.rstrip() + " {}\n" + tail

    step_names = ("sv_calling_targeted", "sv_calling")[:n_tools]
    return head + "".join(step_block(name) for name in step_names) + tail


def test_run_snappy_itransfer_sv_calling_help(capsys):
    """``--help`` exits with code 0 and writes only to stdout."""
    parser, _ = setup_argparse()
    help_argv = ["snappy", "itransfer-sv-calling", "--help"]
    with pytest.raises(SystemExit) as exc_info:
        parser.parse_args(help_argv)

    assert exc_info.value.code == 0

    captured = capsys.readouterr()
    assert captured.out
    assert not captured.err


def test_run_snappy_itransfer_sv_calling_nothing(capsys):
    """Calling the sub-command without arguments exits with code 2 and writes only to stderr."""
    parser, _ = setup_argparse()
    bare_argv = ["snappy", "itransfer-sv-calling"]

    with pytest.raises(SystemExit) as exc_info:
        parser.parse_args(bare_argv)

    assert exc_info.value.code == 2

    captured = capsys.readouterr()
    assert not captured.out
    assert captured.err


def test_run_snappy_itransfer_sv_calling_no_sv_step(fs):
    """Creating the command raises when the config defines no SV calling step."""
    base_dir = "/base/path"
    project_uuid = "466ab946-ce6a-4c78-9981-19b79e7bbe86"
    cli_args = ["--verbose", "snappy", "itransfer-sv-calling"]
    cli_args += ["--base-path", base_dir]
    cli_args += ["--sodar-api-token", "XXXX"]
    cli_args.append(project_uuid)

    # Config with an empty ``step_config`` section.
    config_text = fake_config(0)
    print(config_text)
    config_file = os.path.join(base_dir, ".snappy_pipeline/config.yaml")
    fs.create_file(config_file, contents=config_text, create_missing_dirs=True)

    parser, _ = setup_argparse()
    parsed = parser.parse_args(cli_args)
    with pytest.raises(SnappyStepNotFoundException):
        SnappyItransferSvCallingCommand(parsed)


def test_run_snappy_itransfer_sv_calling_two_sv_steps(fs):
    """Creating the command raises when the config defines both SV calling steps."""
    base_dir = "/base/path"
    project_uuid = "466ab946-ce6a-4c78-9981-19b79e7bbe86"
    cli_args = ["--verbose", "snappy", "itransfer-sv-calling"]
    cli_args += ["--base-path", base_dir]
    cli_args += ["--sodar-api-token", "XXXX"]
    cli_args.append(project_uuid)

    # Config that defines ``sv_calling_targeted`` as well as ``sv_calling``.
    two_steps_config = fake_config(2)
    print(two_steps_config)
    config_file = os.path.join(base_dir, ".snappy_pipeline/config.yaml")
    fs.create_file(config_file, contents=two_steps_config, create_missing_dirs=True)

    parser, _ = setup_argparse()
    parsed = parser.parse_args(cli_args)
    with pytest.raises(SnappyStepNotFoundException):
        SnappyItransferSvCallingCommand(parsed)


def test_run_snappy_itransfer_sv_calling_smoke_test(mocker, germline_trio_sheet_tsv):
    """Run the whole command against a fake file system and check the external calls.

    The config defines ``sv_calling_targeted`` with callers ``gcnv`` and ``manta``,
    so the command runs once per caller. One ``.md5`` file is removed beforehand to
    check that it gets recreated.
    """
    fake_base_path = "/base/path"
    # NOTE(review): presumably the landing zone path reported by ``my_get_sodar_info``
    # in conftest -- confirm there.
    dest_path = "/irods/dest"
    sodar_uuid = "466ab946-ce6a-4c78-9981-19b79e7bbe86"
    argv = [
        "--verbose",
        "snappy",
        "itransfer-sv-calling",
        "--base-path",
        fake_base_path,
        "--sodar-api-token",
        "XXXX",
        sodar_uuid,
    ]

    # Setup fake file system but only patch selected modules. We cannot use the Patcher approach here as this would
    # break both biomedsheets and multiprocessing.
    fs = fake_filesystem.FakeFilesystem()

    # Resulting order: [0] gcnv vcf, [1] manta vcf, [2] gcnv log, then the same three with ".md5" appended.
    fake_file_paths = []
    for member in ("index",):
        for ext in ("", ".md5"):
            fake_file_paths.append(
                "%s/sv_calling_targeted/output/bwa_mem2.gcnv.%s-N1-DNA1-WES1/out/bwa_mem2.gcnv.%s-N1-DNA1-WES1.vcf.gz%s"
                % (fake_base_path, member, member, ext)
            )
            fs.create_file(fake_file_paths[-1])
            fake_file_paths.append(
                "%s/sv_calling_targeted/output/bwa_mem2.manta.%s-N1-DNA1-WES1/out/bwa_mem2.manta.%s-N1-DNA1-WES1.vcf.gz%s"
                % (fake_base_path, member, member, ext)
            )
            fs.create_file(fake_file_paths[-1])
            fake_file_paths.append(
                "%s/sv_calling_targeted/output/bwa_mem2.gcnv.%s-N1-DNA1-WES1/log/bwa_mem2.gcnv.%s-N1-DNA1-WES1.log%s"
                % (fake_base_path, member, member, ext)
            )
            fs.create_file(fake_file_paths[-1])
    # Create sample sheet in fake file system
    sample_sheet_path = fake_base_path + "/.snappy_pipeline/sheet.tsv"
    fs.create_file(sample_sheet_path, contents=germline_trio_sheet_tsv, create_missing_dirs=True)
    # Create config in fake file system
    config_path = fake_base_path + "/.snappy_pipeline/config.yaml"
    fs.create_file(config_path, contents=fake_config(), create_missing_dirs=True)

    # Print path to all created files
    print(fake_config())
    print("\n".join(fake_file_paths + [sample_sheet_path, config_path]))

    # Remove the MD5 file of index's gcnv VCF again so it is recreated.
    fs.remove(fake_file_paths[3])

    # Set Mocker
    mocker.patch("pathlib.Path.exists", my_exists)
    mocker.patch(
        "cubi_tk.snappy.itransfer_common.SnappyItransferCommandBase.get_sodar_info",
        my_get_sodar_info,
    )

    fake_os = fake_filesystem.FakeOsModule(fs)
    mocker.patch("glob.os", fake_os)
    mocker.patch("cubi_tk.snappy.itransfer_common.os", fake_os)
    # Patch ``os`` in the module under test (was: itransfer_variant_calling).
    mocker.patch("cubi_tk.snappy.itransfer_sv_calling.os", fake_os)

    mock_check_output = mock.mock_open()
    mocker.patch("cubi_tk.snappy.itransfer_common.check_output", mock_check_output)

    fake_open = fake_filesystem.FakeFileOpen(fs)
    mocker.patch("cubi_tk.snappy.itransfer_sv_calling.open", fake_open)
    mocker.patch("cubi_tk.snappy.itransfer_common.open", fake_open)
    mocker.patch("cubi_tk.snappy.common.open", fake_open)

    mock_check_call = mock.mock_open()
    mocker.patch("cubi_tk.snappy.itransfer_common.check_call", mock_check_call)

    # Actually exercise code and perform test.
    parser, _subparsers = setup_argparse()
    # Parsed separately only to obtain ``remote_dir_date`` for the expected remote paths.
    args = parser.parse_args(argv)
    res = main(argv)

    assert not res

    # We do not care about call order but simply test call count and then assert that all files are there which would
    # be equivalent of comparing sets of files.

    assert fs.exists(fake_file_paths[3])

    assert mock_check_call.call_count == 1
    mock_check_call.assert_called_once_with(
        ["md5sum", "bwa_mem2.gcnv.index-N1-DNA1-WES1.vcf.gz"],
        cwd=os.path.dirname(fake_file_paths[3]),
        stdout=ANY,
    )

    # Three calls (imkdir, irsync, ils) are expected per file.
    assert mock_check_output.call_count == len(fake_file_paths) * 3
    for path in fake_file_paths:
        mapper_index, rel_path = os.path.relpath(
            path, os.path.join(fake_base_path, "sv_calling_targeted/output")
        ).split("/", 1)
        _mapper, index = mapper_index.rsplit(".", 1)
        remote_path = os.path.join(
            dest_path, index, "sv_calling_targeted", args.remote_dir_date, rel_path
        )
        expected_mkdir_argv = ["imkdir", "-p", os.path.dirname(remote_path)]
        expected_irsync_argv = ["irsync", "-a", "-K", path, "i:%s" % remote_path]
        expected_ils_argv = ["ils", os.path.dirname(remote_path)]
        mock_check_output.assert_any_call(expected_mkdir_argv)
        mock_check_output.assert_any_call(expected_irsync_argv)
        # -2 is the value of ``subprocess.STDOUT``.
        mock_check_output.assert_any_call(expected_ils_argv, stderr=-2)
Loading