-
Notifications
You must be signed in to change notification settings - Fork 2
/
kws.py
executable file
·103 lines (75 loc) · 2.34 KB
/
kws.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# -*- coding: utf-8 -*-
"""
Keyword spotting using pocketsphinx
"""
import os
import threading
try:
import Queue as queue
except ImportError:
import queue
from voice_engine.element import Element
from pocketsphinx.pocketsphinx import Decoder
class KWS(Element):
    """Keyword spotter built on a pocketsphinx keyword-search decoder.

    Audio chunks passed to :meth:`put` are buffered in a queue and consumed
    by a background thread (see :meth:`start`).  Each chunk is fed to the
    decoder; when the decoder reports a hypothesis, the registered callback
    is called with the hypothesis string.  Every consumed chunk is then
    handed to ``Element.put``.
    """

    # Seconds the worker waits for audio before re-checking ``self.done``.
    # With an unbounded ``queue.get()`` the worker would stay blocked once
    # the upstream stops producing, so ``stop()`` alone could never end it.
    _POLL_INTERVAL = 0.1

    def __init__(self):
        super(KWS, self).__init__()

        self.queue = queue.Queue()  # audio chunks waiting to be decoded
        self.on_detected = None     # callable invoked with the keyword string
        self.done = False           # set by stop() to end the worker loop

    def put(self, data):
        """Queue one chunk of audio for the worker thread.

        ``data`` is later passed unchanged to ``Decoder.process_raw``.
        NOTE(review): presumably raw 16 kHz PCM bytes -- confirm against the
        element that feeds this one.
        """
        self.queue.put(data)

    def start(self):
        """Clear the stop flag and run :meth:`run` in a daemon thread."""
        self.done = False
        thread = threading.Thread(target=self.run)
        thread.daemon = True
        thread.start()

    def stop(self):
        """Ask the worker thread to exit.

        Returns immediately; the worker notices the flag within about
        ``_POLL_INTERVAL`` seconds, or after finishing the current chunk.
        """
        self.done = True

    def set_callback(self, callback):
        """Register ``callback(keyword)`` to be called on each detection."""
        self.on_detected = callback

    def _create_decoder(self):
        """Build a Decoder configured from the bundled 'pocketsphinx-data'."""
        pocketsphinx_data = os.path.join(os.path.dirname(__file__),
                                         'pocketsphinx-data')
        hmm = os.path.join(pocketsphinx_data, 'hmm')
        dic = os.path.join(pocketsphinx_data, 'dictionary.txt')
        kws_list = os.path.join(pocketsphinx_data, 'keywords.txt')

        config = Decoder.default_config()
        config.set_string('-hmm', hmm)
        config.set_string('-dict', dic)
        config.set_string('-kws', kws_list)
        # NOTE: '-samprate' is left at the decoder default.  If the audio is
        # not 16000 Hz, set it here with config.set_int('-samprate', rate)
        # (the original author notes config.set_float() is needed on ubuntu).
        config.set_int('-nfft', 512)
        config.set_float('-vad_threshold', 2.7)
        config.set_string('-logfn', os.devnull)  # decoder log goes to devnull

        return Decoder(config)

    def run(self):
        """Worker loop: decode queued audio until :meth:`stop` is called.

        For each chunk: feed it to the decoder, fire the callback if a
        keyword hypothesis is available, then pass the chunk on through
        ``Element.put``.
        """
        decoder = self._create_decoder()
        decoder.start_utt()

        while not self.done:
            try:
                # Bounded wait so the stop flag is honoured even when no
                # audio is arriving.
                data = self.queue.get(timeout=self._POLL_INTERVAL)
            except queue.Empty:
                continue

            decoder.process_raw(data, False, False)
            hypothesis = decoder.hyp()
            if hypothesis:
                keyword = hypothesis.hypstr
                if callable(self.on_detected):
                    self.on_detected(keyword)

                # Restart the utterance so the same hypothesis is not
                # reported again for the following chunks.
                decoder.end_utt()
                decoder.start_utt()

            # Hand the chunk to the parent implementation.
            # NOTE(review): presumably this forwards to linked elements --
            # confirm in voice_engine.element.Element.
            super(KWS, self).put(data)

        # Close the utterance that is still open when the loop ends.
        decoder.end_utt()
def main():
    """Demo entry point: print every keyword spotted until interrupted.

    Links a ``Source`` to a ``KWS`` element, starts both, and idles in
    one-second sleeps until a KeyboardInterrupt (Ctrl-C) arrives, then stops
    both elements.
    """
    import time
    from .source import Source

    def report(keyword):
        print('found {}'.format(keyword))

    source = Source()
    spotter = KWS()
    source.link(spotter)
    spotter.set_callback(report)

    # Start the consumer before the producer, as the original does.
    spotter.start()
    source.start()

    running = True
    while running:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            running = False

    spotter.stop()
    source.stop()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()