diff --git a/hardware-testing/Makefile b/hardware-testing/Makefile index 87edd408aa7..45b50b5a579 100755 --- a/hardware-testing/Makefile +++ b/hardware-testing/Makefile @@ -166,6 +166,10 @@ test-liquid-sense: .PHONY: test-integration test-integration: test-production-qc test-examples test-scripts test-gravimetric +.PHONY: test-stacker +test-stacker: + $(python) -m hardware_testing.modules.flex_stacker_evt_qc --simulate + .PHONY: lint lint: $(python) -m mypy hardware_testing tests diff --git a/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/__init__.py b/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/__init__.py new file mode 100644 index 00000000000..b79a4e5b836 --- /dev/null +++ b/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/__init__.py @@ -0,0 +1 @@ +"""FLEX Stacker QC scripts for EVT.""" diff --git a/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/__main__.py b/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/__main__.py new file mode 100644 index 00000000000..2c4890023d4 --- /dev/null +++ b/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/__main__.py @@ -0,0 +1,74 @@ +"""FLEX Stacker EVT QC.""" +from os import environ + +# NOTE: this is required to get WIFI test to work +if "OT_SYSTEM_VERSION" not in environ: + environ["OT_SYSTEM_VERSION"] = "0.0.0" + +import argparse +import asyncio +from pathlib import Path +from typing import Tuple + +from hardware_testing.data import ui +from hardware_testing.data.csv_report import CSVReport + +from .config import TestSection, TestConfig, build_report, TESTS +from .driver import FlexStacker + + +def build_stacker_report(is_simulating: bool) -> Tuple[CSVReport, FlexStacker]: + """Report setup for FLEX Stacker qc script.""" + test_name = Path(__file__).parent.name.replace("_", "-") + ui.print_title(test_name.upper()) + + stacker = FlexStacker.build_simulator() if is_simulating else FlexStacker.build() + + report = build_report(test_name) + report.set_operator( + "simulating" if is_simulating else input("enter OPERATOR name: ") + ) + info = stacker.get_device_info() + if not is_simulating: + barcode = input("SCAN device barcode: ").strip() + else: + barcode = "STACKER-SIMULATOR-SN" + report.set_tag(info.sn) + report.set_device_id(info.sn, barcode) + return report, stacker + + +async def _main(cfg: TestConfig) -> None: + # BUILD REPORT + report, stacker = build_stacker_report(cfg.simulate) + + # RUN TESTS + for section, test_run in cfg.tests.items(): + ui.print_title(section.value) + test_run(stacker, report, section.value) + + # SAVE REPORT + ui.print_title("DONE") + report.save_to_disk() + report.print_results() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--simulate", action="store_true") + # add each test-section as a skippable argument (eg: --skip-connectivity) + for s in TestSection: + parser.add_argument(f"--skip-{s.value.lower()}", action="store_true") + parser.add_argument(f"--only-{s.value.lower()}", action="store_true") + args = parser.parse_args() + _t_sections = {s: f for s, f in TESTS if getattr(args, f"only_{s.value.lower()}")} + if _t_sections: + assert ( + len(list(_t_sections.keys())) < 2 + ), 'use "--only" for just one test, not multiple tests' + else: + _t_sections = { + s: f for s, f in TESTS if not getattr(args, f"skip_{s.value.lower()}") + } + _config = TestConfig(simulate=args.simulate, tests=_t_sections) + asyncio.run(_main(_config)) diff --git a/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/config.py b/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/config.py new file mode 100644 index 00000000000..e8bc37da959 --- /dev/null +++ b/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/config.py @@ -0,0 +1,45 @@ +"""Config.""" +from dataclasses import dataclass +import enum +from typing import Dict, Callable + +from hardware_testing.data.csv_report import CSVReport, CSVSection + +from . import ( + test_connectivity, +) + + +class TestSection(enum.Enum): + """Test Section.""" + + CONNECTIVITY = "CONNECTIVITY" + + +@dataclass +class TestConfig: + """Test Config.""" + + simulate: bool + tests: Dict[TestSection, Callable] + + +TESTS = [ + ( + TestSection.CONNECTIVITY, + test_connectivity.run, + ), +] + + +def build_report(test_name: str) -> CSVReport: + """Build report.""" + return CSVReport( + test_name=test_name, + sections=[ + CSVSection( + title=TestSection.CONNECTIVITY.value, + lines=test_connectivity.build_csv_lines(), + ) + ], + ) diff --git a/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/driver.py b/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/driver.py new file mode 100644 index 00000000000..04d833fa8a5 --- /dev/null +++ b/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/driver.py @@ -0,0 +1,93 @@ +"""FLEX Stacker Driver.""" +from dataclasses import dataclass +import serial # type: ignore[import] +from serial.tools.list_ports import comports # type: ignore[import] +import re +from enum import Enum + +STACKER_VID = 0x483 +STACKER_PID = 0xEF24 +STACKER_FREQ = 115200 + + +class HardwareRevision(Enum): + """Hardware Revision.""" + + NFF = "nff" + EVT = "a1" + + +@dataclass +class StackerInfo: + """Stacker Info.""" + + fw: str + hw: HardwareRevision + sn: str + + +class FlexStacker: + """FLEX Stacker Driver.""" + + @classmethod + def build(cls, port: str = "") -> "FlexStacker": + """Build FLEX Stacker driver.""" + if not port: + for i in comports(): + if i.vid == STACKER_VID and i.pid == STACKER_PID: + port = i.device + break + assert port, "could not find connected FLEX Stacker" + return cls(port) + + @classmethod + def build_simulator(cls, port: str = "") -> "FlexStacker": + """Build FLEX Stacker simulator.""" + return cls(port, simulating=True) + + def __init__(self, port: str, simulating: bool = False) -> None: + """Constructor.""" + self._simulating = simulating + if not self._simulating: + self._serial = serial.Serial(port, baudrate=STACKER_FREQ) + + def _send_and_recv(self, msg: str, guard_ret: str = "") -> str: + """Internal utility to send a command and receive the response.""" + assert self._simulating + self._serial.write(msg.encode()) + ret = self._serial.readline() + if guard_ret: + if not ret.startswith(guard_ret.encode()): + raise RuntimeError(f"Incorrect Response: {ret}") + if ret.startswith("ERR".encode()): + raise RuntimeError(ret) + return ret.decode() + + def get_device_info(self) -> StackerInfo: + """Get Device Info.""" + if self._simulating: + return StackerInfo( + "STACKER-FW", HardwareRevision.EVT, "STACKER-SIMULATOR-SN" + ) + + _DEV_INFO_RE = re.compile( + "^M115 FW:(?P.+) HW:Opentrons-flex-stacker-(?P.+) SerialNo:(?P.+) OK\n" + ) + res = self._send_and_recv("M115\n", "M115 FW:") + m = _DEV_INFO_RE.match(res) + if not m: + raise RuntimeError(f"Incorrect Response: {res}") + return StackerInfo( + m.group("fw"), HardwareRevision(m.group("hw")), m.group("sn") + ) + + def set_serial_number(self, sn: str) -> None: + """Set Serial Number.""" + if self._simulating: + return + self._send_and_recv(f"M996 {sn}\n", "M996 OK") + + def __del__(self) -> None: + """Close serial port.""" + if not self._simulating: + self._serial.close() diff --git a/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/test_connectivity.py b/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/test_connectivity.py new file mode 100644 index 00000000000..86a0bda991d --- /dev/null +++ b/hardware-testing/hardware_testing/modules/flex_stacker_evt_qc/test_connectivity.py @@ -0,0 +1,78 @@ +"""Test Connectivity.""" +from typing import List, Union + +from hardware_testing.data import ui +from hardware_testing.data.csv_report import ( + CSVReport, + CSVLine, + CSVLineRepeating, + CSVResult, +) + +from .driver import FlexStacker, HardwareRevision + + +def build_csv_lines() -> List[Union[CSVLine, CSVLineRepeating]]: + """Build CSV Lines.""" + return [ + CSVLine("usb-get-device-info", [str, str, str, CSVResult]), + CSVLine("eeprom-set-serial-number", [str, str, CSVResult]), + CSVLine("led-blinking", [bool, CSVResult]), + ] + + +def test_gcode(driver: FlexStacker, report: CSVReport) -> None: + """Send and receive response for GCODE M115.""" + success = True + info = driver.get_device_info() + print("Hardware Revision: ", info.hw, "\n") + if info is None or info.hw != HardwareRevision.EVT: + print("Hardware Revision must be EVT") + success = False + report( + "CONNECTIVITY", + "usb-get-device-info", + [info.fw, info.hw, info.sn, CSVResult.from_bool(success)], + ) + + +def test_eeprom(driver: FlexStacker, report: CSVReport) -> None: + """Set serial number and make sure device info is updated accordingly.""" + success = True + if not driver._simulating: + serial = input("enter Serial Number: ") + else: + serial = "STACKER-SIMULATOR-SN" + driver.set_serial_number(serial) + info = driver.get_device_info() + if info.sn != serial: + print("Serial number is not set properly") + success = False + report( + "CONNECTIVITY", + "eeprom-set-serial-number", + [serial, info.sn, CSVResult.from_bool(success)], + ) + + +def test_leds(driver: FlexStacker, report: CSVReport) -> None: + """Prompt tester to verify the status led is blinking.""" + if not driver._simulating: + is_blinking = ui.get_user_answer("Is the status LED blinking?") + else: + is_blinking = True + report( + "CONNECTIVITY", "led-blinking", [is_blinking, CSVResult.from_bool(is_blinking)] + ) + + +def run(driver: FlexStacker, report: CSVReport, section: str) -> None: + """Run.""" + ui.print_header("USB Communication") + test_gcode(driver, report) + + ui.print_header("EEPROM Communication") + test_eeprom(driver, report) + + ui.print_header("LED Blinking") + test_leds(driver, report)