-
Notifications
You must be signed in to change notification settings - Fork 2
/
porcupine.py
136 lines (110 loc) · 4.38 KB
/
porcupine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import os
import struct
import wave
from threading import Thread
import pvporcupine
from pvrecorder import PvRecorder
class Porcupine(Thread):
    """Thread that listens on an audio input device for Porcupine wake words.

    ``run()`` creates a Porcupine engine and a ``PvRecorder``, reads audio
    frames in a loop and, on the first detected keyword, passes the keyword
    label to ``self.callback`` and leaves the loop. The engine, the recorder
    and the optional WAV dump are released when ``run()`` exits.

    Attributes set by the constructor:
        callback: callable taking the detected keyword label (a ``str``).
            Defaults to ``None``; assign it before starting the thread.
        recorder: the live ``PvRecorder`` while ``run()`` is active,
            otherwise ``None``.
    """

    def __init__(
            self,
            keywords,
            access_key,
            sensitivities,
            library_path=None,
            model_path=None,
            keyword_paths=None,
            input_device_index=None,
            output_path=None):
        """Store the detection configuration; no audio resource is opened here.

        Args:
            keywords: names of built-in keywords. ``None`` selects every
                entry of ``pvporcupine.KEYWORDS``. Only consulted when
                ``keyword_paths`` is ``None``.
            access_key: AccessKey forwarded to ``pvporcupine.create``.
            sensitivities: sequence of sensitivities forwarded to
                ``pvporcupine.create``; paired positionally with the keywords.
            library_path: optional path forwarded to ``pvporcupine.create``.
            model_path: optional path forwarded to ``pvporcupine.create``.
            keyword_paths: explicit keyword (``.ppn``) file paths. When
                ``None`` they are looked up in ``pvporcupine.KEYWORD_PATHS``.
            input_device_index: index of the audio input device; ``None``
                selects the recorder's default device.
            output_path: when given, the recorded audio is also written to
                this WAV file.
        """
        super().__init__()

        if keywords is None:
            keywords = pvporcupine.KEYWORDS
        if keyword_paths is None:
            keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in keywords]

        self._access_key = access_key
        self._library_path = library_path
        self._model_path = model_path
        self._keyword_paths = keyword_paths
        self._sensitivities = sensitivities
        self._input_device_index = input_device_index
        self._output_path = output_path

        self.callback = None
        self.recorder = None

    def restart(self):
        """Start the recorder again; does nothing if no recorder is active."""
        recorder = self.recorder
        if recorder is not None:
            recorder.start()

    def stop(self):
        """Stop the recorder; does nothing if no recorder is active."""
        recorder = self.recorder
        if recorder is not None:
            recorder.stop()

    @staticmethod
    def _keyword_from_path(keyword_path):
        """Derive a printable keyword label from a ``.ppn`` file path."""
        parts = os.path.basename(keyword_path).replace('.ppn', '').split('_')
        if len(parts) > 6:
            # The last six underscore-separated fields are treated as
            # metadata; whatever precedes them is the phrase itself.
            return ' '.join(parts[0:-6])
        return parts[0]

    def run(self):
        """Listen until the first keyword is detected, then clean up.

        Raises:
            pvporcupine.PorcupineError: (including its more specific
                subclasses handled below) re-raised after a diagnostic
                message has been printed.
        """
        keywords = [self._keyword_from_path(x) for x in self._keyword_paths]

        porcupine = None
        recorder = None
        wav_file = None
        try:
            porcupine = pvporcupine.create(
                access_key=self._access_key,
                library_path=self._library_path,
                model_path=self._model_path,
                keyword_paths=self._keyword_paths,
                sensitivities=self._sensitivities)

            # PvRecorder expects an integer index; -1 selects the default
            # input device, which is what an unset (None) index means here.
            device_index = self._input_device_index
            if device_index is None:
                device_index = -1

            recorder = PvRecorder(device_index=device_index, frame_length=porcupine.frame_length)
            recorder.start()
            self.recorder = recorder

            if self._output_path is not None:
                wav_file = wave.open(self._output_path, "w")
                # Mono, 2 bytes per sample; rate and frame count come from
                # the engine instead of being hard-coded. The frame count in
                # the header is only an initial value.
                wav_file.setparams(
                    (1, 2, porcupine.sample_rate, porcupine.frame_length, "NONE", "NONE"))

            print('Using device: %s' % recorder.selected_device)

            print('Listening {')
            for keyword, sensitivity in zip(keywords, self._sensitivities):
                print(' %s (%.2f)' % (keyword, sensitivity))
            print('}')

            while True:
                pcm = recorder.read()

                if wav_file is not None:
                    wav_file.writeframes(struct.pack("h" * len(pcm), *pcm))

                result = porcupine.process(pcm)
                if result >= 0:
                    # A callback is optional: detection still ends the loop
                    # when none has been assigned.
                    if self.callback is not None:
                        self.callback(keywords[result])
                    break
        except pvporcupine.PorcupineInvalidArgumentError:
            args = (
                self._access_key,
                self._library_path,
                self._model_path,
                self._keyword_paths,
                self._sensitivities,
            )
            print("One or more arguments provided to Porcupine is invalid: ", args)
            print("If all other arguments seem valid, ensure that '%s' is a valid AccessKey" % self._access_key)
            raise
        except pvporcupine.PorcupineActivationError:
            print("AccessKey activation error")
            raise
        except pvporcupine.PorcupineActivationLimitError:
            print("AccessKey '%s' has reached it's temporary device limit" % self._access_key)
            raise
        except pvporcupine.PorcupineActivationRefusedError:
            print("AccessKey '%s' refused" % self._access_key)
            raise
        except pvporcupine.PorcupineActivationThrottledError:
            print("AccessKey '%s' has been throttled" % self._access_key)
            raise
        except pvporcupine.PorcupineError:
            print("Failed to initialize Porcupine")
            raise
        except KeyboardInterrupt:
            print('Stopping ...')
        finally:
            # Drop the public handle first so stop()/restart() never touch a
            # recorder that has already been deleted.
            self.recorder = None
            if porcupine is not None:
                porcupine.delete()
            if recorder is not None:
                recorder.delete()
            if wav_file is not None:
                wav_file.close()

    @classmethod
    def show_audio_devices(cls):
        """Print the index and name of every device PvRecorder reports."""
        devices = PvRecorder.get_audio_devices()
        for index, name in enumerate(devices):
            print('index: %d, device name: %s' % (index, name))