# read_uart.py
"""
* @author Myrthe Tilleman
* @email mtillerman@ossur.com
* @create date 2023-02-12 16:52:16
* @desc script to read serial uart EMG data from Panda
"""
import gc
import struct
import board
import busio
from preprocessing import PreprocessEMG
class ReadUart():
    """ Read framed EMG samples from a UART port and decode them.

    A frame is located by a fixed start marker (``starting_byte``). The EMG
    values begin ``start_num_bytes`` bytes after the first byte of that
    marker and are ``data_num_bytes`` wide each.

    Args:
        array_length (int): number of bytes read at a time; also the length
            a returned frame is topped up to.
        baud_rate (int): baud rate passed to the UART port.
        data_num_bytes (int): width of one EMG value, 2 (integer) or
            4 (float).
        start_num_bytes (int): number of bytes in a frame before the EMG
            data.

    Raises:
        ValueError: if data_num_bytes is neither 2 nor 4.
    """

    def __init__(self, array_length=14, baud_rate=921600, data_num_bytes=2,
                 start_num_bytes=8):
        self.num_variables = 2  # EMG_Flex, EMG_Extend
        self.array_length = array_length  # num bytes to read at a time
        self.baud_rate = baud_rate
        self.data_num_bytes = data_num_bytes
        self.start_num_bytes = start_num_bytes  # bytes before EMG data
        self.starting_byte = b'\xaa\n\xb1'

        if self.data_num_bytes == 2:
            self.data_type = 'h'  # 2 byte integer
        elif self.data_num_bytes == 4:
            self.data_type = 'f'  # 4 byte float
        else:
            # Reject the configuration before the port is opened; otherwise
            # data_type would be missing and decoding would fail much later.
            raise ValueError(
                'data_num_bytes must be 2 or 4, got {}'.format(
                    data_num_bytes))

        self.initialise_uart()

    def initialise_uart(self):
        """ Initialise UART port and set baud rate.

        Read up to array_length bytes at a time, or until timeout (s).
        Also allocates the reusable receive buffer ``raw_data``.
        """
        self.uart = busio.UART(board.TX, board.RX, baudrate=self.baud_rate,
                               timeout=0.00001)
        self.raw_data = bytearray(self.array_length)

    def get_serial_data(self):
        """ Read one chunk from the UART and align it on the start marker.

        Reads up to array_length bytes, looks for the start marker inside
        the bytes that were actually received and, when it is found, reads
        the bytes still missing so the frame can reach array_length.

        Returns:
            bytearray: bytes starting with the start marker. May be shorter
                than array_length when the remaining bytes did not arrive
                before the timeout. Empty when nothing was received or the
                marker is not in the received bytes.
        """
        num_read = self.uart.readinto(self.raw_data)
        if not num_read:
            # Nothing arrived (None or 0): raw_data still holds the previous
            # read, which must not be parsed again.
            return bytearray()

        # Search only the freshly received bytes; the rest of the buffer is
        # left over from an earlier read.
        start_index = self.raw_data.find(self.starting_byte, 0, num_read)
        if start_index < 0:
            return bytearray()

        private_data = self.raw_data[start_index:num_read]
        missing = self.array_length - len(private_data)
        if missing > 0:
            read_next = self.uart.read(missing)  # read extra bytes
            if read_next is not None:
                private_data += bytearray(read_next)  # add to the end
        return private_data

    def extract_emg_data(self, private_data):
        """ Extract the EMG values from a frame and convert them to numbers.

        Args:
            private_data (bytearray): frame starting with the start marker;
                must hold at least
                start_num_bytes + data_num_bytes * num_variables bytes.

        Returns:
            list: num_variables decoded values, in the order they appear in
                the frame (flex, extend).

        Raises:
            struct.error: if the frame is too short to hold every value.
        """
        size = self.data_num_bytes
        first = self.start_num_bytes
        emg = private_data[first:first + size * self.num_variables]
        # Decode each value from its own slice, one value per variable.
        return [
            struct.unpack(self.data_type,
                          emg[index * size:(index + 1) * size])[0]
            for index in range(self.num_variables)
        ]
if __name__ == '__main__':
    # Demo run: decode a fixed number of EMG frames from the UART and print
    # the free heap before, after set-up and at the end.
    # NOTE(review): gc.mem_free() is presumably the MicroPython/CircuitPython
    # API; it is not available in CPython - confirm the target runtime.
    gc.collect()
    start_mem = gc.mem_free()
    print('start memory', start_mem)

    read_uart = ReadUart()
    user = "me"
    date = '2023_02_24'
    process_EMG = PreprocessEMG(user, date, extend=1, flex=0)

    # Shortest frame that still contains every EMG value. Derived from the
    # reader's settings so it stays correct if they change (12 bytes with
    # the defaults).
    min_frame_length = (read_uart.start_num_bytes
                        + read_uart.data_num_bytes * read_uart.num_variables)
    max_frames = 100  # stop after this many processed frames
    x = 0  # number of frames processed so far

    gc.collect()
    initialised_mem = gc.mem_free()
    print('initialised memory', initialised_mem)

    while x < max_frames:
        data = read_uart.get_serial_data()
        if len(data) < min_frame_length:
            continue  # no complete frame available yet

        # update level when data is available
        x += 1
        emg_value = read_uart.extract_emg_data(data)
        print(emg_value)
        normal = process_EMG.normalise_data_MVC(emg_value)
        level = process_EMG.define_dominant_muscle(normal)
        print(normal)
        print(level)
        gc.collect()
        # loop_mem = gc.mem_free()
        # print('loop memory', x, loop_mem)

    gc.collect()
    end_mem = gc.mem_free()
    print('end memory', end_mem)