-
Notifications
You must be signed in to change notification settings - Fork 1
/
kmer.py
174 lines (143 loc) · 5.18 KB
/
kmer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""Enumerate kmers
This script generates all successive kmers, with a controllable offset,
from a given set of sequences.
If we specify k=6, and j=3, the following kmers will be emitted
------
------
------
------
-----
AAAAAAAAAAAAAAAAA
Note that 3' terminal substrings will be emitted if the are > j.
The parallel mode of operation for this script breaks up an input sequence
into "number_of_task" chunks of approximately equal size. Kmers within each
chunk are generated. Each chunk region is extended by j to overlap chunks.
Input sequence can be single or multiline FASTA, and read from stdin.
Sequences obtained are sent to stdout.
Unit tests can be executed by calling the program without arguments.
Author: Daniel McDonald (d3mcdonald@eng.ucsd.edu)
License: BSD-3
"""
import sys
import datetime
import logging
import os
import io
# modified from
# https://stackoverflow.com/a/2135920/19741
def split(a, n):
"""get n roughly even start/stop boundaries for size a"""
assert a > n
# k -> quotient
# m -> remainder
k, m = divmod(a, n)
# (start, stop)
# for start, move to the opening position of the ith
# quotient bin. we right shift by one, until we exhaust our remainder
# the effect being that the first m bins contain an additional element
# for stop, repeat but assuming the next ith position
return [((i * k) + min(i, m),
((i + 1) * k) + min(i + 1, m))
for i in range(n)]
def check_write(id_, rec, k, j, task, ntask, buf, max_n=100):
if id_ is None:
return
rec = ''.join(rec)
idfmt = id_ + '_%d\n'
# j -> jump
# k -> length
# >>> a
# 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
# >>> k = 15
# >>> j = 8
# >>> for i in range(0, len(a) - j + 1, j):
# ... print(i, a[i:i+k])
# ...
# 0 abcdefghijklmno
# 8 ijklmnopqrstuvw
# 16 qrstuvwxyzABCDE
# 24 yzABCDEFGHIJKLM
# 32 GHIJKLMNOPQRSTU
# 40 OPQRSTUVWXYZ
for idx, (rec_start, rec_end) in enumerate(split(len(rec), ntask)):
if idx % ntask == task:
task_substring = rec[rec_start:rec_end + j] # overlap to cover edges
for start in range(0, len(task_substring) - j + 1, j):
s = task_substring[start:start + k]
if s.count('N') >= max_n:
continue
if len(s) <= j:
continue
buf.write(idfmt % (start + rec_start))
buf.write(s)
buf.write('\n')
def test_check_write():
tests = [(('>foo', 'AATTGGCC', 2, 1, 0, 3),
['>foo_0', 'AA', '>foo_1', 'AT', '>foo_2', 'TT']),
(('>bar', 'ATNNNTTGC', 2, 1, 0, 3),
['>bar_0', 'AT', '>bar_1', 'TN']),
(('>bar', 'ATNNNTTGC', 2, 1, 0, 1),
['>bar_0', 'AT', '>bar_1', 'TN', '>bar_4', 'NT',
'>bar_5', 'TT', '>bar_6', 'TG', '>bar_7', 'GC'])]
for args, exp in tests:
buf = io.StringIO()
check_write(*args, buf, max_n=2)
buf.seek(0)
obs = [l.strip() for l in buf.readlines()]
assert obs == exp
def test_split():
tests = [((10, 5), [(0, 2), (2, 4), (4, 6), (6, 8), (8, 10)]),
((11, 5), [(0, 3), (3, 5), (5, 7), (7, 9), (9, 11)]),
((11, 2), [(0, 6), (6, 11)])]
for (a, k), exp in tests:
obs = split(a, k)
assert obs == exp
if __name__ == '__main__':
if len(sys.argv) == 1:
print("testing...")
test_check_write()
test_split()
sys.exit(0)
elif len(sys.argv) != 5:
print("usage: python kmer.py <k> <j> <task> <number_of_tasks>")
print("")
print("\tk: kmer size to emit")
print("\tj: size of jump to perform")
print("\ttask: the task index (e.g., SLURM_ARRAY_TASK_ID)")
print("\tnumber_of_tasks: the total number of tasks (e.g., SLURM_ARRAY_TASK_COUNT)")
sys.exit(1)
n = datetime.datetime.now()
nfmt = n.strftime("%Y.%m.%d")
sjob = os.environ.get('SLURM_ARRAY_JOB_ID', 'no-slurm-id')
stask = os.environ.get('SLURM_ARRAY_TASK_ID', 'no-slurm-array')
if os.environ.get('SLURM_ARRAY_TASK_ID') is None:
disable_logging = True
else:
disable_logging = False
logging.basicConfig(filename=f'kmer-{nfmt}-{sjob}_{stask}.log',
encoding='utf-8',
level=logging.DEBUG,
format='%(asctime)s:::%(message)s',
datefmt="%Y.%m.%d %H:%M:%S")
logger = logging.getLogger()
if disable_logging:
logger.disabled = True
k = int(sys.argv[1])
j = int(sys.argv[2])
task = int(sys.argv[3])
ntask = int(sys.argv[4])
id_ = None
rec = []
buf = sys.stdout
for line in sys.stdin:
if id_ is None and line.startswith('>'):
id_ = line.strip().split(' ')[0]
elif line.startswith('>'):
logger.info(f"Emitting on: {id_}")
check_write(id_, rec, k, j, task, ntask, buf)
id_ = line.strip().split(' ')[0]
rec = []
else:
rec.append(line.strip())
logger.info(f"Emitting on: {id_}")
check_write(id_, rec, k, j, task, ntask, buf)