#!/usr/bin/env python3
# ears.py
# additional sources: vosk helper code
import argparse
import json
import os
import queue
import sys

import numpy as np
import simpleaudio as sa
import sounddevice as sd
import vosk
from vosk import KaldiRecognizer, Model, SpkModel
# Queue of raw audio blocks filled by the sounddevice callback.
q = queue.Queue()

# Extra named input devices to prefer if present (exact sounddevice names).
custom_devices = ["Aidan's Airpods"]


def cosine_dist(x, y):
    """Cosine distance between two speaker-embedding vectors."""
    nx = np.array(x)
    ny = np.array(y)
    return 1 - np.dot(nx, ny) / (np.linalg.norm(nx) * np.linalg.norm(ny))


def int_or_str(text):
    """Helper function for argument parsing."""
    try:
        return int(text)
    except ValueError:
        return text


def callback(indata, frames, time, status):
    """This is called (from a separate thread) for each audio block."""
    if status:
        # print(status, file=sys.stderr)
        pass
    q.put(bytes(indata))
def listen():
    """Generator: yields True once the stream is open, then [speaker_vector, text] pairs."""
    try:
        modelfp = "model2"
        if not os.path.exists(modelfp):
            print("Model file path not found")

        # Pick an input device: prefer the USB mic array, then the named custom
        # device, otherwise fall back to the system default input.
        micarraypres = -1
        airpodpres = -1
        for x, dev in enumerate(sd.query_devices()):
            print(dev)
            if dev['name'] == 'USB PnP Audio Device' and dev['max_input_channels'] > 0:
                micarraypres = x
            if dev['name'] == custom_devices[0] and dev['max_input_channels'] > 0:
                airpodpres = x
        if micarraypres >= 0:
            device = micarraypres
        elif airpodpres >= 0:
            device = airpodpres
        else:
            device = None
        device_info = sd.query_devices(device, 'input')
        print(device_info)
        # soundfile expects an int, sounddevice provides a float:
        samplerate = int(device_info['default_samplerate'])
        model = vosk.Model(modelfp)
        spk_model = SpkModel("model-spk")
        with sd.RawInputStream(samplerate=samplerate, blocksize=8000, device=device,
                               dtype='int16', channels=1, callback=callback):
            print("< ------ Ear Initialization Complete ------ >")
            yield True
            # print('#' * 80)
            # print('Press Ctrl+C to stop the recording')
            # print('#' * 80)
            rec = KaldiRecognizer(model, samplerate, spk_model)
            # rec = vosk.KaldiRecognizer(model, samplerate)
            while True:
                data = q.get()
                yieldtriga = False
                yieldtrigb = False
                # print(data)
                # numpydata = np.frombuffer(data, dtype=np.int16)
                # numpydata = independentca.source_segment(numpydata)
                # data = numpydata.tobytes()
                if rec.AcceptWaveform(data):
                    spkfp = None
                    spoken_text = None
                    # print(rec.Result())
                    res = json.loads(rec.Result())
                    if "spk" in res:
                        spkfp = res["spk"]
                        yieldtriga = True
                    if "text" in res:
                        spoken_text = res["text"]
                        yieldtrigb = True
                    if yieldtriga and yieldtrigb:
                        yield [spkfp, spoken_text]
                        # print(str(spkfp) + "\n" + str(spoken_text))
                # print(rec.PartialResult())
    except KeyboardInterrupt:
        print('\n')
    except Exception as e:
        print("< ------ Exception in Audio ------ >")
        print(type(e).__name__ + ': ' + str(e))
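A minimal consumer sketch for the listen() generator, assuming a hypothetical enrolled-speaker embedding (known_speaker) and an assumed distance threshold; only the generator protocol (first yield is True, then [speaker_vector, text] pairs) and cosine_dist come from ears.py, everything else is illustrative:

# example_consumer.py (hypothetical; not part of ears.py)
from ears import listen, cosine_dist

known_speaker = [0.1] * 128   # placeholder embedding; replace with a real enrolled x-vector
THRESHOLD = 0.6               # assumed cosine-distance cutoff, tune for your speaker model

ears = listen()
ready = next(ears)            # first yield is True once the input stream is open
if ready:
    for spk_vector, text in ears:
        dist = cosine_dist(known_speaker, spk_vector)
        print(f"heard: {text!r} (speaker distance {dist:.2f})")
        if dist < THRESHOLD:
            print("-> matches enrolled speaker")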