Skip to content

Commit

Permalink
feat(hardware-testing): flex stacker EVT diagnostic script for connec…
Browse files Browse the repository at this point in the history
…tivity (#16835)

Setting qc test suite for flex-stacker & adding connectivity script.
  • Loading branch information
ahiuchingau authored Nov 18, 2024
1 parent 1e6df83 commit f433137
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 0 deletions.
4 changes: 4 additions & 0 deletions hardware-testing/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ test-liquid-sense:
.PHONY: test-integration
test-integration: test-production-qc test-examples test-scripts test-gravimetric

.PHONY: test-stacker
test-stacker:
$(python) -m hardware_testing.modules.flex_stacker_evt_qc --simulate

.PHONY: lint
lint:
$(python) -m mypy hardware_testing tests
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""FLEX Stacker QC scripts for EVT."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""FLEX Stacker EVT QC."""
from os import environ

# NOTE: this is required to get WIFI test to work
if "OT_SYSTEM_VERSION" not in environ:
environ["OT_SYSTEM_VERSION"] = "0.0.0"

import argparse
import asyncio
from pathlib import Path
from typing import Tuple

from hardware_testing.data import ui
from hardware_testing.data.csv_report import CSVReport

from .config import TestSection, TestConfig, build_report, TESTS
from .driver import FlexStacker


def build_stacker_report(is_simulating: bool) -> Tuple[CSVReport, FlexStacker]:
"""Report setup for FLEX Stacker qc script."""
test_name = Path(__file__).parent.name.replace("_", "-")
ui.print_title(test_name.upper())

stacker = FlexStacker.build_simulator() if is_simulating else FlexStacker.build()

report = build_report(test_name)
report.set_operator(
"simulating" if is_simulating else input("enter OPERATOR name: ")
)
info = stacker.get_device_info()
if not is_simulating:
barcode = input("SCAN device barcode: ").strip()
else:
barcode = "STACKER-SIMULATOR-SN"
report.set_tag(info.sn)
report.set_device_id(info.sn, barcode)
return report, stacker


async def _main(cfg: TestConfig) -> None:
# BUILD REPORT
report, stacker = build_stacker_report(cfg.simulate)

# RUN TESTS
for section, test_run in cfg.tests.items():
ui.print_title(section.value)
test_run(stacker, report, section.value)

# SAVE REPORT
ui.print_title("DONE")
report.save_to_disk()
report.print_results()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--simulate", action="store_true")
# add each test-section as a skippable argument (eg: --skip-connectivity)
for s in TestSection:
parser.add_argument(f"--skip-{s.value.lower()}", action="store_true")
parser.add_argument(f"--only-{s.value.lower()}", action="store_true")
args = parser.parse_args()
_t_sections = {s: f for s, f in TESTS if getattr(args, f"only_{s.value.lower()}")}
if _t_sections:
assert (
len(list(_t_sections.keys())) < 2
), 'use "--only" for just one test, not multiple tests'
else:
_t_sections = {
s: f for s, f in TESTS if not getattr(args, f"skip_{s.value.lower()}")
}
_config = TestConfig(simulate=args.simulate, tests=_t_sections)
asyncio.run(_main(_config))
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Config."""
from dataclasses import dataclass
import enum
from typing import Dict, Callable

from hardware_testing.data.csv_report import CSVReport, CSVSection

from . import (
test_connectivity,
)


class TestSection(enum.Enum):
"""Test Section."""

CONNECTIVITY = "CONNECTIVITY"


@dataclass
class TestConfig:
"""Test Config."""

simulate: bool
tests: Dict[TestSection, Callable]


TESTS = [
(
TestSection.CONNECTIVITY,
test_connectivity.run,
),
]


def build_report(test_name: str) -> CSVReport:
"""Build report."""
return CSVReport(
test_name=test_name,
sections=[
CSVSection(
title=TestSection.CONNECTIVITY.value,
lines=test_connectivity.build_csv_lines(),
)
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""FLEX Stacker Driver."""
from dataclasses import dataclass
import serial # type: ignore[import]
from serial.tools.list_ports import comports # type: ignore[import]
import re
from enum import Enum

STACKER_VID = 0x483
STACKER_PID = 0xEF24
STACKER_FREQ = 115200


class HardwareRevision(Enum):
"""Hardware Revision."""

NFF = "nff"
EVT = "a1"


@dataclass
class StackerInfo:
"""Stacker Info."""

fw: str
hw: HardwareRevision
sn: str


class FlexStacker:
"""FLEX Stacker Driver."""

@classmethod
def build(cls, port: str = "") -> "FlexStacker":
"""Build FLEX Stacker driver."""
if not port:
for i in comports():
if i.vid == STACKER_VID and i.pid == STACKER_PID:
port = i.device
break
assert port, "could not find connected FLEX Stacker"
return cls(port)

@classmethod
def build_simulator(cls, port: str = "") -> "FlexStacker":
"""Build FLEX Stacker simulator."""
return cls(port, simulating=True)

def __init__(self, port: str, simulating: bool = False) -> None:
"""Constructor."""
self._simulating = simulating
if not self._simulating:
self._serial = serial.Serial(port, baudrate=STACKER_FREQ)

def _send_and_recv(self, msg: str, guard_ret: str = "") -> str:
"""Internal utility to send a command and receive the response."""
assert self._simulating
self._serial.write(msg.encode())
ret = self._serial.readline()
if guard_ret:
if not ret.startswith(guard_ret.encode()):
raise RuntimeError(f"Incorrect Response: {ret}")
if ret.startswith("ERR".encode()):
raise RuntimeError(ret)
return ret.decode()

def get_device_info(self) -> StackerInfo:
"""Get Device Info."""
if self._simulating:
return StackerInfo(
"STACKER-FW", HardwareRevision.EVT, "STACKER-SIMULATOR-SN"
)

_DEV_INFO_RE = re.compile(
"^M115 FW:(?P<fw>.+) HW:Opentrons-flex-stacker-(?P<hw>.+) SerialNo:(?P<sn>.+) OK\n"
)
res = self._send_and_recv("M115\n", "M115 FW:")
m = _DEV_INFO_RE.match(res)
if not m:
raise RuntimeError(f"Incorrect Response: {res}")
return StackerInfo(
m.group("fw"), HardwareRevision(m.group("hw")), m.group("sn")
)

def set_serial_number(self, sn: str) -> None:
"""Set Serial Number."""
if self._simulating:
return
self._send_and_recv(f"M996 {sn}\n", "M996 OK")

def __del__(self) -> None:
"""Close serial port."""
if not self._simulating:
self._serial.close()
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""Test Connectivity."""
from typing import List, Union

from hardware_testing.data import ui
from hardware_testing.data.csv_report import (
CSVReport,
CSVLine,
CSVLineRepeating,
CSVResult,
)

from .driver import FlexStacker, HardwareRevision


def build_csv_lines() -> List[Union[CSVLine, CSVLineRepeating]]:
"""Build CSV Lines."""
return [
CSVLine("usb-get-device-info", [str, str, str, CSVResult]),
CSVLine("eeprom-set-serial-number", [str, str, CSVResult]),
CSVLine("led-blinking", [bool, CSVResult]),
]


def test_gcode(driver: FlexStacker, report: CSVReport) -> None:
"""Send and receive response for GCODE M115."""
success = True
info = driver.get_device_info()
print("Hardware Revision: ", info.hw, "\n")
if info is None or info.hw != HardwareRevision.EVT:
print("Hardware Revision must be EVT")
success = False
report(
"CONNECTIVITY",
"usb-get-device-info",
[info.fw, info.hw, info.sn, CSVResult.from_bool(success)],
)


def test_eeprom(driver: FlexStacker, report: CSVReport) -> None:
"""Set serial number and make sure device info is updated accordingly."""
success = True
if not driver._simulating:
serial = input("enter Serial Number: ")
else:
serial = "STACKER-SIMULATOR-SN"
driver.set_serial_number(serial)
info = driver.get_device_info()
if info.sn != serial:
print("Serial number is not set properly")
success = False
report(
"CONNECTIVITY",
"eeprom-set-serial-number",
[serial, info.sn, CSVResult.from_bool(success)],
)


def test_leds(driver: FlexStacker, report: CSVReport) -> None:
"""Prompt tester to verify the status led is blinking."""
if not driver._simulating:
is_blinking = ui.get_user_answer("Is the status LED blinking?")
else:
is_blinking = True
report(
"CONNECTIVITY", "led-blinking", [is_blinking, CSVResult.from_bool(is_blinking)]
)


def run(driver: FlexStacker, report: CSVReport, section: str) -> None:
"""Run."""
ui.print_header("USB Communication")
test_gcode(driver, report)

ui.print_header("EEPROM Communication")
test_eeprom(driver, report)

ui.print_header("LED Blinking")
test_leds(driver, report)

0 comments on commit f433137

Please sign in to comment.