-
Notifications
You must be signed in to change notification settings - Fork 0
/
run.py
102 lines (80 loc) · 3.47 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""
* @author Myrthe Tilleman
* @email mtillerman@ossur.com
* @create date 2023-01-23 08:59:19
* @desc Run this script as code.py on the Seeed board with CircuitPython.
"""
import asyncio
import gc
from activate_vibration_motors import ActivateVibrationMotor
from preprocessing import PreprocessEMG
from read_uart import ReadUart
async def check_serial_input(read_uart, process_EMG, motors):
""" Task 1: Poll for emg signals and process them when it is available.
Args:
read_uart (Class): ReadUart instance.
process_EMG (Class): PreprocessEMG instance.
motors (Class): ActivateVibrationMotor instance.
"""
while True:
data = read_uart.get_serial_data()
if len(data) > 11: # update level when data is available
emg_value = read_uart.extract_emg_data(data)
normal = process_EMG.normalise_data_MVC(emg_value)
motors.vib_emg = process_EMG.threshold_reached(normal)
if motors.vib_emg:
level = process_EMG.define_dominant_muscle(normal)
motors.vibrator_level = motors.level_list[level + 4]
motors.pin_on_index = motors.vibrator_level["PIN_INDEX"]
else:
motors.vib_count = 0
motors.prev_count = 0
motors.prev_level = None
motors.pin_on_index = []
await asyncio.sleep(0)
async def activate_motors(motors):
""" Task 2: Activate and deactivate the vibration motors.
Args:
motors (Class): ActivateVibrationMotor instance.
"""
while True:
pin_off_index = set(motors.pin_index) - set(motors.pin_on_index)
pins_off = [motors.pins[pin] for pin in pin_off_index]
# turn off all others
motors.set_motor_value(pins_off, False)
if motors.vib_emg:
motors.adjust_off_time()
await motors.check_time_to_change()
else:
await asyncio.sleep(0)
async def online_feedback_loop(
user, feedback_folder, emg_folder,
threshold_file='perceptual_threshold.csv', left_leg=True):
""" Online processing of incoming EMG signals and activates the vibration
motors accordingly. Creates two asyncio tasks and runs these alternately.
Args:
user (str): user name or number, folder where all user files are saved.
feedback_folder (str): path to files for thresholds file.
emg_folder (str): path to files for emg calibration.
threshold_file (str, optional): file with perceptual thresholds for
vibration for each level. Defaults to 'perceptual_threshold.csv'.
left_leg (bool, optional): Whether the motors are placed on the left or
right leg. Defaults to True.
"""
read_uart = ReadUart()
motors = ActivateVibrationMotor(user, feedback_folder, left_leg)
motors.set_thresholds(threshold_file)
process_EMG = PreprocessEMG(user, emg_folder, extend=1, flex=0)
emg_collection_task = asyncio.create_task(
check_serial_input(read_uart, process_EMG, motors))
vibration_task = asyncio.create_task(activate_motors(motors))
gc.collect()
await asyncio.gather(emg_collection_task, vibration_task)
if __name__ == '__main__':
user = 'me'
feedback_calibration = '2023_03_28'
emg_calibration = '2023_02_24'
threshold_file = 'perceptual_thresholds - Copy.csv'
left_leg = True
asyncio.run(online_feedback_loop(
user, feedback_calibration, emg_calibration, threshold_file, left_leg))