Skip to content

Commit

Permalink
feat(abr_teting): Enabled module testing functions
Browse files Browse the repository at this point in the history
  • Loading branch information
AnthonyNASC20 committed Nov 18, 2024
1 parent efc734a commit db4fbef
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 0 deletions.
87 changes: 87 additions & 0 deletions abr-testing/abr_testing/protocols/test_protocols/test_modules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import asyncio
import time
import traceback
import os
from opentrons.hardware_control.scripts import module_control


async def hs_test_1(module, path_to_file):
duration = int(input("How long to run this test for? (in seconds): "))
rpm = input("Target RPM (200-3000): ")
start = time.time()
while time.time() - start < duration:
if not await (hs_test_home(module, path_to_file)):
return
time.sleep(5)
if not await (hs_test_set_shake(module, rpm, path_to_file)):
return
time.sleep(10)
if not await (hs_test_set_shake(module, '0', path_to_file)):
return
time.sleep(10)

async def input_codes(module, path_to_file):
await module_control._main(module, path_to_file)

hs_tests = {"Test 1": (hs_test_1, "Repeatedly home heater shaker then set shake speed"),
"Input GCodes": (input_codes, "Input g codes"),
}
td_tests = {}

tc_tests = {}

global modules
modules = {
"heatershaker": hs_tests,
"tempdeck": td_tests,
"thermocycler": tc_tests,
}


async def main(module):
# Select test to run
# Set directory for tests
BASE_DIRECTORY = "\\userfs\\data\\testing_data\\"
if not os.path.exists(BASE_DIRECTORY):
os.makedirs(BASE_DIRECTORY)
tests = modules[module]
for i, test in enumerate(tests.keys()):
function, description = tests[test]
print(f"{i}) {test} : {description}")
selected_test = int(input("Please select a test: "))
try:
function, description = tests[list(tests.keys())[selected_test]]
test_dir = BASE_DIRECTORY + f'{module}\\test{tests[selected_test]}'
print(f"{i}, {description}")
output_file = os.path.join(test_dir, 'results.txt')
print(f'PATH: {output_file} ')
await (function(module, output_file))
except:
print("Failed to run test")
traceback.print_exc()


async def hs_test_home(module, path_to_file):
hs_gcodes = module_control.hs_gcode_shortcuts
home_gcode = hs_gcodes["home"]
await (module_control._main(module, [home_gcode, 'done'], path_to_file))


async def hs_test_set_shake(module, rpm, path_to_file):
hs_gcodes = module_control.hs_gcode_shortcuts
set_shake_gcode = hs_gcodes["srpm"].format(rpm=rpm)
await (module_control._main(module, [set_shake_gcode, 'done'], path_to_file))

async def hs_deactivate(module, path_to_file):
hs_gcodes = module_control.hs_gcode_shortcuts
deactivate_gcode = hs_gcodes["deactivate"]
await (module_control._main(module, [deactivate_gcode, 'done'], path_to_file))


if __name__ == "__main__":
print("Modules:")
for i, module in enumerate(modules):
print(f"{i}) {module}")
module_int = int(input("Please select a module: "))
module = list(modules.keys())[module_int]
asyncio.run(main(module))
131 changes: 131 additions & 0 deletions api/src/opentrons/hardware_control/scripts/module_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
from serial import Serial # type: ignore[import-untyped]
import asyncio
import subprocess
import time
from typing import Any

# Generic
_READ_ALL = "readall"
_READ_LINE = "read"
_DONE = "done"

# TC commands
_MOVE_SEAL = "ms"
_MOVE_LID = "ml"
tc_gcode_shortcuts = {
"status": "M119",
_MOVE_SEAL: "M241.D", # move seal motor
_MOVE_LID: "M240.D", # move lid stepper motor
"ol": "M126", # open lid
"cl": "M127", # close lid
"sw": "M901.D", # status of all switches
"lt": "M141.D", # get lid temperature
"pt": "M105.D", # get plate temperature
}

# HS Commands
hs_gcode_shortcuts = {
"srpm": "M3 S{rpm}", # Set RPM
"grpm": "M123", # Get RPM
"home": "G28", # Home
"deactivate": "M106", # Deactivate
}

gcode_shortcuts = tc_gcode_shortcuts | hs_gcode_shortcuts

async def message_read(dev: Serial) -> Any:
response = dev.readline().decode()
while not response:
await asyncio.sleep(1)
response = dev.readline().decode()
return response


async def message_return(dev: Serial) -> Any:
try:
response = await asyncio.wait_for(message_read(dev), timeout=30)
return response
except asyncio.exceptions.TimeoutError:
print("response timed out.")
return ""


async def handle_module_gcode_shortcut(dev: Serial, command: str, in_commands:bool, output: str = "") -> None:
# handle debugging commands that require followup
if in_commands:
if command == _MOVE_SEAL:
distance = input("enter distance in steps => ")
dev.write(
f"{gcode_shortcuts[command]} {distance}\n".encode()
) # (+) -> retract, (-) -> engage
# print(await message_return(dev))
elif command == _MOVE_LID:
distance = input(
"enter angular distance in degrees => "
) # (+) -> open, (-) -> close
dev.write(f"{gcode_shortcuts[command]} {distance}\n".encode())
# print(await message_return(dev))
# everything else
else:
dev.write(f"{gcode_shortcuts[command]}\n".encode())
else:
dev.write(f"{command}\n".encode())
try:
mr = await message_return(dev)
print(mr)
except TypeError:
print('Invalid input')
return

if output:
try:
with open(output, 'w') as result_file:
if 'OK' in mr:
status = command + ': SUCCESS'
else:
status = command + ': FAILURE'
result_file.write(status)
result_file.write(f' {mr}')
result_file.close()
except:
print(f'cannot open file: {output}')


async def comms_loop(dev: Serial, commands: list, output:str ="") -> bool:
_exit = False
try:
command = commands.pop(0)
except IndexError:
command = input("\n>>> ")
if command == _READ_ALL:
print(dev.readlines())
elif command == _READ_LINE:
print(dev.readline())
elif command == _DONE:
_exit = True
elif command in gcode_shortcuts:
await handle_module_gcode_shortcut(dev, command, True, output)
else:
await handle_module_gcode_shortcut(dev, command, False, output)
return _exit


async def _main(module, commands:list = [], output:str = "") -> bool:
module_name = (
subprocess.check_output(["find", "/dev/", "-name", f"*{module}*"])
.decode()
.strip()
)
if not module_name:
print(f"{module} not found. Exiting.")
return(False)
dev = Serial(f"{module_name}", 9600, timeout=2)
_exit = False
while not _exit:
_exit = await comms_loop(dev, commands, output)
dev.close()
return(True)


if __name__ == "__main__":
asyncio.run(_main())

0 comments on commit db4fbef

Please sign in to comment.