Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
for more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] committed Oct 18, 2023
1 parent 03871cf commit 07920f3
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 63 deletions.
2 changes: 1 addition & 1 deletion src/scirpy/ir_dist/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def IrNeighbors(*args, **kwargs):
BLOSUM62 matrix. This option is incompatible with nucleotide sequences.
See :class:`~scirpy.ir_dist.metrics.AlignmentDistanceCalculator`.
* `fastalignment` -- Distance based on pairwise sequence alignments using the
BLOSUM62 matrix. Faster implementation of `alignment` with some loss.
BLOSUM62 matrix. Faster implementation of `alignment` with some loss.
This option is incompatible with nucleotide sequences.
See :class:`~scirpy.ir_dist.metrics.FastAlignmentDistanceCalculator`.
* any instance of :class:`~scirpy.ir_dist.metrics.DistanceCalculator`.
Expand Down
130 changes: 68 additions & 62 deletions src/scirpy/ir_dist/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

from scirpy.util import _doc_params, tqdm


_doc_params_parallel_distance_calculator = """\
n_jobs
Number of jobs to use for the pairwise distance calculation.
Expand Down Expand Up @@ -480,18 +479,19 @@ def _self_alignment_scores(self, seqs: Sequence) -> dict:
count=len(seqs),
)


@_doc_params(params=_doc_params_parallel_distance_calculator)
class FastAlignmentDistanceCalculator(ParallelDistanceCalculator):
"""\
Calculates distance between sequences based on pairwise sequence alignment.
This is a variation of the AlignmentDistanceCalculator which pre-filters sequence pairs based on
a) differences in sequence length
b) the number of different characters, based on estimate of the mismatch penalty
Depending on the setup, alignment may be performed significantly faster, but be advised that some sequence pairs may be filtered out incorrectly.
Default values for BLOSUM and PAM matrices are provided, but finding an adequate estimated_penalty for your given setup is encouraged.
The distance between two sequences is defined as :math:`S_{{1,2}}^{{max}} - S_{{1,2}}`,
where :math:`S_{{1,2}}` is the alignment score of sequences 1 and 2 and
:math:`S_{{1,2}}^{{max}}` is the max. achievable alignment score of sequences 1 and 2.
Expand Down Expand Up @@ -547,53 +547,59 @@ def __init__(
subst_mat: str = "blosum62",
gap_open: int = 11,
gap_extend: int = 11,
estimated_penalty: float = None
):
estimated_penalty: float = None,
):
if cutoff is None:
cutoff = 10
super().__init__(cutoff, n_jobs=n_jobs, block_size=block_size)
self.subst_mat = subst_mat
self.gap_open = gap_open
self.gap_extend = gap_extend

penalty_dict = {
"blosum30":4.0,
"blosum35":4.0,
"blosum40":4.0,
"blosum45":4.0,
"blosum50":4.0,
"blosum55":4.0,
"blosum60":4.0,
"blosum62":4.0,
"blosum65":4.0,
"blosum70":4.0,
"blosum75":4.0,
"blosum80":4.0,
"blosum85":4.0,
"blosum90":4.0,
"pam10":8.0,
"pam20":8.0,
"pam30":8.0,
"pam40":8.0,
"pam50":8.0,
"pam60":4.0,
"pam70":4.0,
"pam80":4.0,
"pam90":4.0,
"pam100":4.0,
"pam110":2.0,
"pam120":2.0,
"pam130":2.0,
"pam140":2.0,
"pam150":2.0,
"pam160":2.0,
"pam170":2.0,
"pam180":2.0,
"pam190":2.0,
"pam200":2.0}

self.estimated_penalty = estimated_penalty if estimated_penalty is not None else penalty_dict[subst_mat] if subst_mat in penalty_dict.keys() else 0.0

penalty_dict = {
"blosum30": 4.0,
"blosum35": 4.0,
"blosum40": 4.0,
"blosum45": 4.0,
"blosum50": 4.0,
"blosum55": 4.0,
"blosum60": 4.0,
"blosum62": 4.0,
"blosum65": 4.0,
"blosum70": 4.0,
"blosum75": 4.0,
"blosum80": 4.0,
"blosum85": 4.0,
"blosum90": 4.0,
"pam10": 8.0,
"pam20": 8.0,
"pam30": 8.0,
"pam40": 8.0,
"pam50": 8.0,
"pam60": 4.0,
"pam70": 4.0,
"pam80": 4.0,
"pam90": 4.0,
"pam100": 4.0,
"pam110": 2.0,
"pam120": 2.0,
"pam130": 2.0,
"pam140": 2.0,
"pam150": 2.0,
"pam160": 2.0,
"pam170": 2.0,
"pam180": 2.0,
"pam190": 2.0,
"pam200": 2.0,
}

self.estimated_penalty = (
estimated_penalty
if estimated_penalty is not None
else penalty_dict[subst_mat]
if subst_mat in penalty_dict.keys()
else 0.0
)

def _compute_block(self, seqs1, seqs2, origin):
import parasail
Expand All @@ -611,40 +617,40 @@ def _compute_block(self, seqs1, seqs2, origin):
else:
self_alignment_scores2 = self._self_alignment_scores(seqs2)

max_len_diff = ((self.cutoff - self.gap_open) / self.gap_extend) + 1
max_len_diff = ((self.cutoff - self.gap_open) / self.gap_extend) + 1

result = []
for row, s1 in enumerate(seqs1):
col_start = row if square_matrix else 0
profile = parasail.profile_create_16(s1, subst_mat)
len1 = len(s1)

for col, s2 in enumerate(seqs2[col_start:], start=col_start):
len_diff = abs(len1-len(s2))
len_diff = abs(len1 - len(s2))
# No need to calculate diagonal values
if s1 == s2:
result.append((1, origin_row + row, origin_col + col))
# Dismiss sequences based on length
elif len_diff <= max_len_diff:
elif len_diff <= max_len_diff:
# Dismiss sequences that are too different
if self._num_different_characters(s1, s2, len_diff) * self.estimated_penalty + len_diff * self.gap_extend <= self.cutoff:
r = parasail.nw_scan_profile_16(
profile, s2, self.gap_open, self.gap_extend
)
max_score = np.min(
[self_alignment_scores1[row], self_alignment_scores2[col]]
)
if (
self._num_different_characters(s1, s2, len_diff) * self.estimated_penalty
+ len_diff * self.gap_extend
<= self.cutoff
):
r = parasail.nw_scan_profile_16(profile, s2, self.gap_open, self.gap_extend)
max_score = np.min([self_alignment_scores1[row], self_alignment_scores2[col]])
d = max_score - r.score

if d <= self.cutoff:
result.append((d + 1, origin_row + row, origin_col + col))


return result

def _self_alignment_scores(self, seqs: Sequence) -> dict:
"""Calculate self-alignments. We need them as reference values
to turn scores into dists"""
to turn scores into dists
"""
import parasail

return np.fromiter(
Expand All @@ -661,11 +667,11 @@ def _self_alignment_scores(self, seqs: Sequence) -> dict:
dtype=int,
count=len(seqs),
)

def _num_different_characters(self, s1, s2, len_diff):
longer, shorter = (s1, s2) if len(s1)>=len(s2) else (s2, s1)
longer, shorter = (s1, s2) if len(s1) >= len(s2) else (s2, s1)

for c in shorter:
if c in longer:
longer = longer.replace(c, "", 1)
return len(longer)-len_diff
longer = longer.replace(c, "", 1)
return len(longer) - len_diff

0 comments on commit 07920f3

Please sign in to comment.