Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local random object and Skipping malformed denovo variants #61

Merged
merged 4 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions make_prg/update/denovo_variants.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class DenovoError(Exception):
pass


class NonACGTError(Exception):
pass


class TooLongDeletion(Exception):
pass

Expand Down Expand Up @@ -82,7 +86,7 @@ def _param_checking(
def _check_sequence_is_composed_of_ACGT_only(seq: str):
sequence_is_composed_of_ACGT_only = all([base in "ACGT" for base in seq])
if not sequence_is_composed_of_ACGT_only:
raise DenovoError(f"Found a non-ACGT seq ({seq}) in a denovo variant")
raise NonACGTError(f"Found a non-ACGT seq ({seq}) in a denovo variant")

def __eq__(self, other):
if isinstance(other, self.__class__):
Expand Down Expand Up @@ -496,7 +500,7 @@ def _read_variants(
filehandler, long_deletion_threshold
)
variants.append(denovo_variant)
except TooLongDeletion as error:
except (TooLongDeletion, NonACGTError) as error:
logger.warning(f"Ignoring variant: {error}")
return variants

Expand Down
13 changes: 8 additions & 5 deletions make_prg/utils/seq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ def generate_random_seed(sequences: List[str]) -> bytes:
return hashlib.sha256("".join(sequences).encode()).digest()


def get_consensus_residue(position: int, sequences: List[str]) -> str:
def get_consensus_residue(
position: int, sequences: List[str], local_random: random.Random
) -> str:
# Count the residues at this position, ignoring gaps and Ns
pos_counts = Counter(
seq[position]
Expand All @@ -257,14 +259,14 @@ def get_consensus_residue(position: int, sequences: List[str]) -> str:

# If there are no residues other than gaps and Ns at this position, use a random base
if len(pos_counts) == 0:
return random.choice("ACGT")
return local_random.choice("ACGT")

# Find the residue(s) with the highest count
max_count = pos_counts.most_common(1)[0][1]
max_residues = [res for res, count in pos_counts.items() if count == max_count]

# Randomly select a residue from the residues with the highest count
return random.choice(max_residues)
return local_random.choice(max_residues)


def get_majority_consensus_from_MSA(alignment: MSA) -> str:
Expand All @@ -274,14 +276,15 @@ def get_majority_consensus_from_MSA(alignment: MSA) -> str:
all_seqs = get_alignment_seqs(alignment)
all_seqs = list(convert_to_upper(all_seqs))
random_seed_for_this_alignment = generate_random_seed(all_seqs)
random.seed(random_seed_for_this_alignment)
local_random = random.Random()
local_random.seed(random_seed_for_this_alignment)

# Initialize the consensus sequence as an empty string
consensus = ""

# Loop over the positions in the alignment
for i in range(alignment.get_alignment_length()):
# Add the residue to the consensus sequence
consensus += get_consensus_residue(i, all_seqs)
consensus += get_consensus_residue(i, all_seqs, local_random)

return consensus
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
>a_column_full_of_Ns
5 AATCGTGGTT 6 AATCGCGGTT 5
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
H VN:Z:1.0 bn:Z:--linear --singlearr
S 0 * RC:i:0
L 0 + 1 + 0M
S 1 AATCGTGGTT RC:i:0
L 0 + 2 + 0M
S 2 AATCGCGGTT RC:i:0
S 3 * RC:i:0
L 1 + 3 + 0M
L 2 + 3 + 0M
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
12 changes: 10 additions & 2 deletions tests/integration_tests/test_from_msa.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,21 @@ def test___contains_RYKMSW(self):
)
)

def test___fails___a_column_full_of_Ns(self):
options = self.prepare_options("fails")
def test___a_column_full_of_Ns(self):
options = self.prepare_options("a_column_full_of_Ns")
from_msa.run(options)
self.assertTrue(
are_dir_trees_equal(
data_dir / "truth_output/a_column_full_of_Ns",
data_dir / "output/a_column_full_of_Ns",
)
)

def test___fails___unexpected_char_in_MSA(self):
options = self.prepare_options("fails_2")
from_msa.run(options)
output_dir_is_empty = len(os.listdir(data_dir / "output/fails_2")) == 0
self.assertTrue(output_dir_is_empty)

def test___nested_snp_backgrounds(self):
options = self.prepare_options("nested_snps_seq_backgrounds")
Expand Down
15 changes: 10 additions & 5 deletions tests/update/test_DenovoVariant.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
from unittest import TestCase
from unittest.mock import Mock

from make_prg.update.denovo_variants import DenovoError, DenovoVariant, TooLongDeletion
from make_prg.update.denovo_variants import (
DenovoError,
DenovoVariant,
NonACGTError,
TooLongDeletion,
)
from make_prg.update.MLPath import MLPathNode


class DenovoVariantTest(TestCase):
def setUp(self) -> None:
self.sample_denovo_variant = DenovoVariant(5, "AACC", "GT")

def test___ref_not_composed_of_ACGT_only___DenovoError_raised(self):
with self.assertRaises(DenovoError):
def test___ref_not_composed_of_ACGT_only___NonACGTError_raised(self):
with self.assertRaises(NonACGTError):
DenovoVariant(0, ref="ACGTB", alt="")

def test___alt_not_composed_of_ACGT_only___DenovoError_raised(self):
with self.assertRaises(DenovoError):
def test___alt_not_composed_of_ACGT_only___NonACGTError_raised(self):
with self.assertRaises(NonACGTError):
DenovoVariant(0, ref="", alt="ACGTB")

def test___ref_and_alt_are_identical___DenovoError_raised(self):
Expand Down
Loading