diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 7fa16f3..2a01289 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -1,9 +1,23 @@ +# .readthedocs.yaml +# Read the Docs configuration file +# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details + +# Required version: 2 + +# Set the OS, Python version and other tools you might need build: os: "ubuntu-22.04" tools: - python: "3.8" - + python: "3.9" + +# Build documentation in the "docs/" directory with Sphinx +sphinx: + configuration: docs/conf.py + +# Declare the Python requirements required +# to build your documentation +# See https://docs.readthedocs.io/en/stable/guides/reproducible-builds.html python: install: - - requirements: requirements.txt + - requirements: docs/requirements.txt diff --git a/README.md b/README.md index d7dc1bd..330d033 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ -[![Documentation Status](https://readthedocs.org/projects/pystages/badge/?version=latest)](https://pystages.readthedocs.io/en/latest/?badge=latest) - # PyStages +[![Documentation Status](https://readthedocs.org/projects/pystages/badge/?version=latest)](https://pystages.readthedocs.io/en/latest/?badge=latest) + PyStages is a Python 3 library for controlling motorized stages which have a motion controller. It has been designed for microscopy test benches automation. @@ -14,16 +14,28 @@ are implemented): - Tic Stepper Motor controller (USB only) - CNC Router with GRBL/GCode instructions (CNC 3018-PRO) -The library also provides helper classes for basic vector manipulation and -autofocus calculation +The library also provides helper classes for basic vector manipulation and +autofocus computation. ## Documentation Documentation is available on [Read the Docs](https://pystages.readthedocs.io). +## PyStages GUI + +A user interface has been implemented to control the stages. + +You can run it with the following command: + +```bash +python -m pystages.gui +``` + ## Requirements This library requires the following packages: + - [pyserial](https://pypi.org/project/pyserial/) - [numpy](https://pypi.org/project/numpy/) -- [pyusb](https://pypi.org/project/pyusb/) \ No newline at end of file +- [pyusb](https://pypi.org/project/pyusb/) +- [PyQt6](https://pypi.org/project/PyQt6/) diff --git a/docs/api.rst b/docs/api.rst index 408b4df..9c90831 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -17,10 +17,10 @@ Classes .. automodule:: pystages :members: -.. autoclass:: Autofocus - :members: +.. .. autoclass:: Autofocus +.. :members: -.. autoclass:: Vector - :special-members: __init__ - :members: +.. .. autoclass:: Vector +.. :special-members: __init__ +.. :members: diff --git a/docs/conf.py b/docs/conf.py index 89ed0d5..5fc4dcf 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -10,19 +10,22 @@ # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. # -# import os -# import sys -# sys.path.insert(0, os.path.abspath('.')) + +import sys +import os + +# Add path for autodoc +sys.path.insert(0, os.path.abspath("..")) # -- Project information ----------------------------------------------------- project = "pystages" -copyright = "2022, Olivier Hériveaux, Manuel San Pedro, Michaël Mouchous" +copyright = "2024, Olivier Hériveaux, Manuel San Pedro, Michaël Mouchous" author = "Olivier Hériveaux, Manuel San Pedro, Michaël Mouchous" # The full version, including alpha/beta/rc tags -release = "1.1" +release = "1.2" # -- General configuration --------------------------------------------------- @@ -30,7 +33,7 @@ # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. -extensions = ["sphinx.ext.autodoc"] +extensions = ["sphinx.ext.autodoc", "myst_parser"] # Add any paths that contain templates here, relative to this directory. templates_path = ["_templates"] @@ -54,3 +57,8 @@ html_static_path = [] html_logo = "logo.png" + +source_suffix = { + ".rst": "restructuredtext", + ".md": "markdown", +} diff --git a/docs/gui.md b/docs/gui.md new file mode 100644 index 0000000..10103fd --- /dev/null +++ b/docs/gui.md @@ -0,0 +1,41 @@ +# PyStages GUI + +A user interface has been implemented to control the stages. + +You can run it with the following command: + +```bash +python -m pystages.gui +``` + +This tool presents basic controls. After connection to the stage, it polls the position and prints out +at the bottom of the window. + +## Stage selection and connection + +Select the type of stage that you want to control and chose either a serial port to connect to, or select `Auto detect` +to make pystage to select the correct one according to the device description. + +Then click to `Connect`. If the connection success, the control buttons are activated. + +## Relative move + +You can click on buttons to move for a positive of negative relative distance. +The direction correspond to the axis number of the stage (`X` for first axe, `Y` for the second one, +`Z` for the third one). The moving distance is selected throug the `Step` dropdown menu. + +## Absolute move + +You can enter absolute coordinates in the 3 entry boxes and trigger the move by clicking the `Go To Position` button. + +## Z offset + +The `Z offset` checkbox and dropdown menu permit for each horizontal move (X or Y) +to make first a vertical-up displacement (Z+) followed by the +actual move, and then a final vertical-down displacement (Z-). + +## Homing + +The `Home` button permits to trigger the calibration process of the stage. +Be careful that your setup is clear before using it. + diff --git a/docs/index.rst b/docs/index.rst index b223c76..859a8fa 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,8 +1,3 @@ -.. pystages documentation master file, created by - sphinx-quickstart on Tue Jul 7 15:03:18 2020. - You can adapt this file completely to your liking, but it should at least - contain the root `toctree` directive. - Welcome to pystages's documentation! ==================================== @@ -24,13 +19,14 @@ The library also provides helper classes for basic vector manipulation (:class:`pystages.Autofocus`). .. toctree:: - :maxdepth: 2 - :caption: Contents: - - Vectors - Autofocus - Python API - Troubleshooting + :maxdepth: 2 + :caption: Contents: + + Vectors + Autofocus + Python API + GUI + Troubleshooting Indices and tables diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..ab4aa53 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,6 @@ +pyserial +pyusb +numpy +sphinx +sphinx-rtd-theme +myst-parser diff --git a/pyproject.toml b/pyproject.toml index 11943d0..777eba8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,26 +4,22 @@ build-backend = "hatchling.build" [project] name = "pystages" -version = "1.1.0" +version = "1.2" authors = [ - { name="Olivier Hérivaux", email="olivier.heriveaux@ledger.fr" }, - { name="Michaël Mouchous", email="michael.mouchous@ledger.fr" }, + { name = "Olivier Hérivaux", email = "olivier.heriveaux@ledger.fr" }, + { name = "Michaël Mouchous", email = "michael.mouchous@ledger.fr" }, ] description = "Motorized stage control library for scientific applications" readme = "README.md" -requires-python = ">=3.8" +requires-python = ">=3.9" classifiers = [ - "Programming Language :: Python :: 3", - "License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)", - "Operating System :: OS Independent", -] -dependencies = [ - "pyserial", - "pyusb", - "numpy" + "Programming Language :: Python :: 3", + "License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)", + "Operating System :: OS Independent", ] +dependencies = ["pyserial", "pyusb", "numpy", "pyqt6"] [project.urls] "Homepage" = "https://github.com/Ledger-Donjon/pystages" "Bug Tracker" = "https://github.com/Ledger-Donjon/pystages/issues" -"Documentation" = "https://pystages.readthedocs.io/en/latest" \ No newline at end of file +"Documentation" = "https://pystages.readthedocs.io/en/latest" diff --git a/pystages/__init__.py b/pystages/__init__.py index 713b0b9..1a80f2c 100644 --- a/pystages/__init__.py +++ b/pystages/__init__.py @@ -24,3 +24,16 @@ from .vector import Vector from .tic import Tic, TicDirection from .cncrouter import CNCRouter, CNCStatus + +__all__ = [ + "Stage", + "Corvus", + "M3FS", + "SMC100", + "Autofocus", + "Vector", + "Tic", + "TicDirection", + "CNCRouter", + "CNCStatus", +] diff --git a/pystages/cncrouter.py b/pystages/cncrouter.py index 4f0451f..cf33399 100644 --- a/pystages/cncrouter.py +++ b/pystages/cncrouter.py @@ -7,20 +7,20 @@ # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License -# along with this program. If not, see . +# along with this program. If not, see . # # Values and descriptions of commands and error codes has been taken from GRBL Github repository # https://github.com/grbl/grbl # and the instruction manual of the CNC 3018 PRO # https://drive.google.com/file/d/1yQH9gtO8lWbE-K0dff8g9zq_1xOB57x7 # -# Copyright 2018-2022 Ledger SAS, written by Michaël Mouchous +# Copyright 2018-2023 Ledger SAS, written by Michaël Mouchous -import serial +import serial.serialutil import time from typing import Optional, Tuple, List, Union from .exceptions import ConnectionFailure @@ -30,7 +30,11 @@ from .stage import Stage -class CNCStatus(Enum): +class CNCStatus(str, Enum): + """ + Possible statuses that the CNC can report. + """ + IDLE = "Idle" RUN = "Run" HOLD = "Hold" @@ -40,23 +44,34 @@ class CNCStatus(Enum): CHECK = "Check" +class CNCError(Exception): + """Exception raised when a specific error is detected by the CNC""" + + def __init__(self, message: str, cncstatus: CNCStatus): + super().__init__(message) + self.cncstatus = cncstatus + + class CNCRouter(Stage): """ Class to command CNC routers. """ - def __init__(self, dev: str, reset_wait_time=2.0): + def __init__(self, dev: Optional[str] = None, reset_wait_time=2.0): """ Open serial device to connect to the CNC routers. Raise a ConnectionFailure exception if the serial device could not be open. - :param dev: Serial device. For instance `'/dev/ttyUSB0'`. + :param dev: Serial device. For instance `'/dev/ttyUSB0'`. If not provided, try + to discover a suitable device according to device vendor and product IDs :param reset_wait_time: Depending on the state of the stage, it can take some time for - GRBL to reset. This parameter makes the wait time to be tuned, by giving a time in seconds. + GRBL to reset. This parameter makes the wait time to be tuned, by giving a time in seconds. """ + super().__init__(num_axis=3) self.reset_wait_time = reset_wait_time try: + dev = dev or self.find_device(pid=0x7523, vid=0x1A86) self.serial = serial.Serial(dev, 115200, timeout=1) except serial.serialutil.SerialException as e: raise ConnectionFailure() from e @@ -68,7 +83,7 @@ def reset_grbl(self, wait_time: Optional[float] = None) -> bool: :return: True if the GRBL sent the correct prompt at the end of the reset :param wait_time: Depending on the state of the stage, it can take some time for GRBL to - reset. This parameter makes the wait time to be tuned, by giving a time in seconds. + reset. This parameter makes the wait time to be tuned, by giving a time in seconds. """ if wait_time is None: wait_time = self.reset_wait_time @@ -163,11 +178,17 @@ def get_current_status(self) -> Optional[Tuple[CNCStatus, dict]]: router :return: A tuple containing the status and a dictionary of all other parameters in the - output of the command. + output of the command. """ + self.send("?", eol="") status = self.receive() + # Retry, sometimes it does not respond + if status == "": + self.send("?", eol="") + status = self.receive() + # Sometimes, the CNC returns 'ok' and the actual response is following. while status == "ok": status = self.receive() @@ -176,8 +197,15 @@ def get_current_status(self) -> Optional[Tuple[CNCStatus, dict]]: if status is None: return None - # The output + # The possible outputs # '' + # 'ALARM:1' + + if status.startswith("ALARM:1"): + # The ALARM message is followed by something like + # '[MSG:Reset to continue]' + next = self.receive() + raise CNCError(next, CNCStatus.ALARM) # Discard any unwanted format if not (status.startswith("<") and status.endswith(">")): @@ -219,12 +247,12 @@ def receive_lines(self, until: str = "ok") -> List[str]: :param until: The expected response indicating the end of received lines. :return: The list of all received lines. Note that the expected line is not included in the - list. + list. """ lines = [] - while (l := self.serial.readline().strip().decode()) != until: - if len(l): - lines.append(l) + while (line := self.serial.readline().strip().decode()) != until: + if len(line): + lines.append(line) return lines def receive(self) -> str: @@ -234,10 +262,17 @@ def receive(self) -> str: :return: Received response string, CR-LF removed. """ + tries = 10 # Read at least 2 bytes for CR-LF. response = self.serial.read(2) while response[-2:] != b"\r\n": - response += self.serial.read(1) + part = self.serial.read(1) + response += part + # Give a chance to get out of this + if part == b"": + tries -= 1 + if tries == 0: + return "" # Remove CR-LF and return as string return response[:-2].decode() @@ -274,7 +309,9 @@ def position(self) -> Vector: @position.setter def position(self, value: Vector): # To check dimension and range of the given value - super(__class__, self.__class__).position.fset(self, value) + pos_setter = Stage.position.fset + assert pos_setter is not None + pos_setter(self, value) command = f"G0 X{value.x}" if len(value) > 1: @@ -289,7 +326,7 @@ def is_moving(self) -> bool: Queries the current status of the CNC in order to determine if the CNC is moving :return: True if the CNC reports that a cycle is running (Run) or - if it is in a middle of a homing cycle. + if it is in a middle of a homing cycle (Home). """ while (status := self.get_current_status()) is None: pass @@ -302,3 +339,14 @@ def set_origin(self) -> str: :return: The response of the CNC ('ok' if command has been submitted correctly). """ return self.send_receive("G92 X0 Y0 Z0") + + def home(self, wait=False): + """ + Sends a `$H` command. The stage responds a message `[MSG:Sleeping]` after `ok`. + Take caution for collisions before calling this method ! + + :param wait: Optionally waits for move operation to be done. + """ + self.send("$H") + if wait: + self.wait_move_finished() diff --git a/pystages/corvus.py b/pystages/corvus.py index 1ebfb09..e3fb3b7 100644 --- a/pystages/corvus.py +++ b/pystages/corvus.py @@ -17,11 +17,12 @@ # Copyright 2018-2022 Ledger SAS, written by Olivier Hériveaux -import serial +import serial.serialutil import time from .exceptions import ConnectionFailure from .vector import Vector from .stage import Stage +from typing import Optional class Corvus(Stage): @@ -29,15 +30,17 @@ class Corvus(Stage): Class to command Corvus Eco XYZ stage controller. """ - def __init__(self, dev): + def __init__(self, dev: Optional[str] = None, serial_number: Optional[str] = None): """ Open serial device to connect to the Corvus controller. Raise a ConnectionFailure exception if the serial device could not be open. - :param dev: Serial device. For instance '/dev/ttyUSB0'. + :param dev: Serial device path. For instance '/dev/ttyUSB0'. + :param serial_number: Device Serial Number. For instance 'A600AAAA'. """ super().__init__(num_axis=3) try: + dev = dev or self.find_device(serial_number=serial_number) self.serial = serial.Serial(dev, 57600) except serial.serialutil.SerialException as e: raise ConnectionFailure() from e @@ -123,6 +126,18 @@ def calibrate(self): while int(self.send_receive("{0} getcaldone".format(i + 1))) != 3: time.sleep(0.1) + def home(self, wait=False): + """ + Execute limit-switch move. + Take caution for collisions before calling this method ! + + :param wait: Optionally waits for move operation to be done. + """ + # Call for calibration + self.send("cal") + if wait: + self.wait_move_finished() + def move_relative(self, x, y, z): """ Move stage relative to current position. @@ -166,7 +181,10 @@ def position(self): @position.setter def position(self, value: Vector): # To check dimension and range of the given value - super(__class__, self.__class__).position.fset(self, value) + pos_setter = Stage.position.fset + assert pos_setter is not None + pos_setter(self, value) + self.send("3 setdim") self.send("{0} {1} {2} move".format(value.x, value.y, value.z)) diff --git a/pystages/gui/__init__.py b/pystages/gui/__init__.py new file mode 100644 index 0000000..287320a --- /dev/null +++ b/pystages/gui/__init__.py @@ -0,0 +1,4 @@ +from . import gui + + +__all__ = ["gui"] diff --git a/pystages/gui/__main__.py b/pystages/gui/__main__.py new file mode 100644 index 0000000..6dbad8a --- /dev/null +++ b/pystages/gui/__main__.py @@ -0,0 +1,36 @@ +from PyQt6.QtWidgets import QApplication, QStyleFactory +import sys +from PyQt6.QtGui import QIcon, QPalette, QColor +from .util import resource_path +from .gui import StageWindow + +app = QApplication(sys.argv) + +app.setApplicationName("PyStages") +app.setWindowIcon(QIcon(resource_path(":/icons/logo.png"))) +app.setStyle(QStyleFactory.create("Fusion")) +palette = QPalette() +palette.setColor(QPalette.ColorRole.Window, QColor(25, 25, 25)) +palette.setColor(QPalette.ColorRole.WindowText, QColor(240, 240, 240)) +palette.setColor(QPalette.ColorRole.Base, QColor(40, 40, 40)) +palette.setColor(QPalette.ColorRole.AlternateBase, QColor(255, 0, 0)) +palette.setColor(QPalette.ColorRole.ToolTipBase, QColor(25, 25, 25)) +palette.setColor(QPalette.ColorRole.ToolTipText, QColor(255, 255, 255)) +palette.setColor(QPalette.ColorRole.Text, QColor(200, 200, 200)) +palette.setColor(QPalette.ColorRole.Button, QColor(40, 40, 40)) +palette.setColor( + QPalette.ColorGroup.Disabled, QPalette.ColorRole.Button, QColor(30, 30, 30) +) +palette.setColor(QPalette.ColorRole.ButtonText, QColor(200, 200, 200)) +palette.setColor( + QPalette.ColorGroup.Disabled, QPalette.ColorRole.ButtonText, QColor(100, 100, 100) +) +palette.setColor(QPalette.ColorRole.BrightText, QColor(255, 0, 0)) +palette.setColor(QPalette.ColorRole.Link, QColor(255, 0, 0)) +palette.setColor(QPalette.ColorRole.Highlight, QColor(40, 120, 233)) +palette.setColor(QPalette.ColorRole.HighlightedText, QColor(255, 255, 255)) +app.setPalette(palette) + +win = StageWindow() +win.show() +sys.exit(app.exec()) diff --git a/pystages/gui/gui.py b/pystages/gui/gui.py new file mode 100644 index 0000000..b426f80 --- /dev/null +++ b/pystages/gui/gui.py @@ -0,0 +1,241 @@ +#!/bin/python3 +from typing import Optional +from PyQt6.QtWidgets import ( + QWidget, + QHBoxLayout, + QVBoxLayout, + QGridLayout, + QLabel, + QComboBox, + QPushButton, + QCheckBox, + QLineEdit, +) +from PyQt6.QtCore import QObject, QTimer, QLocale, QCoreApplication +from PyQt6.QtGui import QDoubleValidator +from ..cncrouter import CNCRouter +from ..corvus import Corvus +from ..smc100 import SMC100 +from ..stage import Stage +from ..m3fs import M3FS +from enum import Enum +from serial.tools.list_ports import comports +from serial.tools.list_ports_common import ListPortInfo + + +class StageType(str, Enum): + CNC = "CNC" + Corvus = "Corvus" + SMC = "SMC100" + M3FS = "M3FS" + + +class StageWindow(QWidget): + def set_controls_enabled(self, enabled: bool): + for c in self.controls: + c.setEnabled(enabled) + + def connect(self, on_off): + if on_off: + selected = self.stage_selection.currentText() + port = self.port_selection.currentData() + dev = port.device if isinstance(port, ListPortInfo) else None + + # Instanciate stage according to current stage selection + if selected == StageType.CNC: + self.stage = CNCRouter(dev) + elif selected == StageType.Corvus: + self.stage = Corvus(dev) + elif selected == StageType.SMC: + self.stage = SMC100(dev, [1, 2]) + elif selected == StageType.M3FS: + self.stage = M3FS(dev, baudrate=115200) + self.position_timer.start(100) + + else: + del self.stage + self.stage = None + + self.position_timer.stop() + + self.stage_selection.setDisabled(on_off) + self.port_selection.setDisabled(on_off) + + self.set_controls_enabled(on_off) + + if self.stage is not None: + self.stage.wait_routine = lambda: QCoreApplication.processEvents() + + def __init__(self): + super().__init__() + + # Current stage + self.stage: Optional[Stage] = None + + # This flag is used to limit the communication + # with the stage by not making updates of the position + self.in_motion = False + + # Moving controls + self.controls = [] + + vbox = QVBoxLayout() + self.setLayout(vbox) + + box = QHBoxLayout() + vbox.addLayout(box) + w = QLabel("Stage Selection") + box.addWidget(w) + self.stage_selection = w = QComboBox() + w.addItems([StageType.CNC, StageType.Corvus, StageType.SMC, StageType.M3FS]) + box.addWidget(w) + self.port_selection = w = QComboBox() + w.addItem("Auto detection", None) + for port in comports(): + d = port.device + if port.vid and port.pid: + d += f" -- {port.vid:04x}:{port.pid:04x}" + if port.product: + d += f" -- {port.product}" + if port.serial_number: + d += f" -- {port.serial_number}" + w.addItem(d, userData=port) + box.addWidget(w) + self.connect_button = w = QPushButton("Connect") + w.setCheckable(True) + w.clicked.connect(self.connect) + box.addWidget(w) + + box = QHBoxLayout() + vbox.addLayout(box) + box.addWidget(QLabel("Step")) + self.step_selection = w = QComboBox() + for step in [50, 10, 5, 1, 0.5, 0.1]: + w.addItem(f"{step} mm", step) + w.setCurrentIndex(3) + box.addWidget(w) + self.controls.append(w) + + self.moving_grid = grid = QGridLayout() + w = QPushButton("Y+") + w.clicked.connect(self.bouton_moved) + self.controls.append(w) + grid.addWidget(w, 0, 1) + w = QPushButton("X-") + w.clicked.connect(self.bouton_moved) + self.controls.append(w) + grid.addWidget(w, 1, 0) + w = QPushButton("Y-") + w.clicked.connect(self.bouton_moved) + self.controls.append(w) + grid.addWidget(w, 2, 1) + w = QPushButton("X+") + w.clicked.connect(self.bouton_moved) + self.controls.append(w) + grid.addWidget(w, 1, 2) + w = QPushButton("Z+") + w.clicked.connect(self.bouton_moved) + self.controls.append(w) + grid.addWidget(w, 0, 3) + w = QPushButton("Z-") + w.clicked.connect(self.bouton_moved) + self.controls.append(w) + grid.addWidget(w, 2, 3) + w = QPushButton("Home") + w.clicked.connect(self.home) + self.controls.append(w) + grid.addWidget(w, 1, 1) + vbox.addLayout(grid) + + box = QHBoxLayout() + vbox.addLayout(box) + self.z_offset = w = QCheckBox("Z offset (mm)") + w.setChecked(True) + box.addWidget(w) + self.controls.append(w) + self.z_offset_sel = w = QComboBox() + for value in [10, 5, 1, 0.5, 0.1]: + w.addItem(f"{value} mm", value) + w.setCurrentIndex(2) + box.addWidget(w) + self.controls.append(w) + + v = QDoubleValidator() + v.setLocale(QLocale.c()) + box = QHBoxLayout() + vbox.addLayout(box) + self.go_x = w = QLineEdit() + w.setValidator(v) + box.addWidget(w) + self.go_y = w = QLineEdit() + w.setValidator(v) + box.addWidget(w) + self.go_z = w = QLineEdit() + w.setValidator(v) + box.addWidget(w) + w = QPushButton("Go to position") + w.clicked.connect(self.go_to_position) + self.controls.append(w) + box.addWidget(w) + + # Disable all controls + self.set_controls_enabled(False) + + self.position_label = w = QLabel("Pos") + vbox.addWidget(w) + self.position_timer = QTimer() + self.position_timer.timeout.connect(self.update_position) + + def update_position(self): + if self.stage is None or self.in_motion: + return + position = self.stage.position + self.position_label.setText(",".join([f"{i:.02f}" for i in position.data])) + + def bouton_moved(self): + if self.stage is None: + return + self.in_motion = True + button = QObject().sender() + assert isinstance(button, QPushButton) + axe, direction = button.text() + axe = {"X": 0, "Y": 1, "Z": 2}[axe] + direction = {"+": 1.0, "-": -1.0}[direction] + step = direction * self.step_selection.currentData() + pos = self.stage.position + z_offset = ( + self.z_offset_sel.currentData() + if axe != 2 and self.z_offset.isChecked() + else None + ) + + if z_offset is not None: + pos[2] += z_offset + self.stage.move_to(pos) + + pos[axe] += step + self.stage.move_to(pos) + + if z_offset is not None: + pos[2] -= z_offset + self.stage.move_to(pos) + self.in_motion = False + + def home(self): + if self.stage is None: + return + self.stage.home(wait=True) + + def go_to_position(self): + if self.stage is None: + return + x, y, z = ( + QLocale.c().toDouble(self.go_x.text())[0], + QLocale.c().toDouble(self.go_y.text())[0], + QLocale.c().toDouble(self.go_z.text())[0], + ) + pos = self.stage.position + pos.x = x + pos.y = y + pos.z = z + self.stage.move_to(pos) diff --git a/pystages/gui/util.py b/pystages/gui/util.py new file mode 100644 index 0000000..aef8c8d --- /dev/null +++ b/pystages/gui/util.py @@ -0,0 +1,12 @@ +import os + +__dirname = os.path.dirname(os.path.realpath(__file__)) + + +def resource_path(path: str) -> str: + """ + Transforms a .":/path/to/file" path to the relative path from the main script + """ + if not path.startswith(":/"): + return path + return os.path.join(__dirname, path[2:]) diff --git a/pystages/m3fs.py b/pystages/m3fs.py index a2d3bb9..981424c 100644 --- a/pystages/m3fs.py +++ b/pystages/m3fs.py @@ -17,11 +17,24 @@ # Copyright 2018-2020 Ledger SAS, written by Olivier Hériveaux -import serial +from typing import Optional +import serial.serialutil from binascii import hexlify from .exceptions import ConnectionFailure, ProtocolError, VersionNotSupported from .stage import Stage from .vector import Vector +from enum import Enum + + +class M3FSCommand(Enum): + """ + Command integer ID for New Scale Technologies M3-FS. + https://www.newscaletech.com/downloads/please-register/ + """ + + READ_FIRM_VERSION = 1 + MOVE_TO_TARGET = 8 + VIEW_CLOSED_LOOP_STATUS_POS = 10 class M3FS(Stage): @@ -33,13 +46,15 @@ class M3FS(Stage): correct VID/DID). """ - def __init__(self, dev, baudrate=250000): + def __init__(self, dev: Optional[str] = None, baudrate=250000): """ Connect to the device. If the serial device cannot be opened, a ConnectionFailure exception is thrown. If the device version is not supported, a VersionNotSupported error is thrown. - :param dev: Serial device. For instance 'COM0'. + :param dev: Serial device. For instance `'/dev/ttyUSB0'`. + If not provided, a suitable device is searched according to + according to vendor and product IDs :param baudrate: Serial baudrate. """ super().__init__() @@ -55,14 +70,14 @@ def __init__(self, dev, baudrate=250000): # in the future... self.serial.timeout = 1 try: - res = self.command(1) + res = self.command(M3FSCommand.READ_FIRM_VERSION) except ProtocolError as e: raise ConnectionFailure() from e self.serial.timeout = None if res != "1 VER 4.7.3 M3-FS": raise VersionNotSupported(res) - def __send(self, command, data=None): + def __send(self, command: M3FSCommand, data: Optional[str] = None): """ Send a command to the controller. @@ -71,9 +86,9 @@ def __send(self, command, data=None): """ if data is not None: assert ("<" not in data) and (">" not in data) and ("\r" not in data) - if command not in range(100): + if command.value not in range(100): raise ValueError("Invalid command ID.") - full_command = "<{0:02d}".format(command) + full_command = "<{0:02d}".format(command.value) if data is not None: full_command += " " + data full_command += ">\r" @@ -112,7 +127,7 @@ def __receive(self): raise ProtocolError() return result.decode() - def command(self, command, data=None): + def command(self, command: M3FSCommand, data=None): """ Send a command to the controller and get the response. @@ -124,13 +139,17 @@ def command(self, command, data=None): res = self.__receive() # Check that the command id in the response is the same as the command # id in the request. - if (len(res) < 2) or (int(res[0:2]) != command): - raise ProtocolError() + if (len(res) < 2) or (int(res[0:2]) != command.value): + raise ProtocolError( + f"Unexpected response after sending command {command}:", res + ) if len(res) == 2: return None else: if res[2] != " ": - raise ProtocolError() + raise ProtocolError( + f"Unexpected response after sending command {command}:", res + ) return res[3:] def __get_closed_loop_status(self): @@ -139,7 +158,17 @@ def __get_closed_loop_status(self): :return: Tuple of 3 int. """ - res = list(bytes.fromhex(x) for x in self.command(10).split(" ")) + command = M3FSCommand.VIEW_CLOSED_LOOP_STATUS_POS + res = self.command(command) + if res is None: + raise ProtocolError( + f"Unexpected response after sending command {command}: Got response without data." + ) + res = list(bytes.fromhex(x) for x in res.split(" ")) + if len(res) != 3: + raise ProtocolError( + f"Unexpected response after sending command {command}: Expecting 3 values, got {res}." + ) motor_status = int.from_bytes(res[0], "big", signed=False) position = int.from_bytes(res[1], "big", signed=True) error = int.from_bytes(res[2], "big", signed=True) @@ -153,16 +182,18 @@ def position(self) -> Vector: :getter: Query and return current stage position. :setter: Move stage. Wait until position is reached. """ - motor_status, position, error = self.__get_closed_loop_status() + _, position, _ = self.__get_closed_loop_status() return Vector(position * self.resolution_um) @position.setter def position(self, value: Vector): # To check dimension and range of the given value - super(__class__, self.__class__).position.fset(self, value) + pos_setter = Stage.position.fset + assert pos_setter is not None + pos_setter(self, value) val = round(value.x / self.resolution_um).to_bytes(4, "big", signed=True) - self.command(8, hexlify(val).decode()) + self.command(M3FSCommand.MOVE_TO_TARGET, hexlify(val).decode()) # Now wait until motor is not moving anymore while self.is_moving: pass diff --git a/pystages/smc100.py b/pystages/smc100.py index 1deeb56..bd8f1db 100644 --- a/pystages/smc100.py +++ b/pystages/smc100.py @@ -18,12 +18,12 @@ # written by Olivier Hériveaux, Manuel San Pedro and Michaël Mouchous -import serial +import serial.serialutil import time from .vector import Vector from .exceptions import ProtocolError, ConnectionFailure -from enum import Enum, IntFlag -from typing import Optional, List +from enum import Enum, Flag +from typing import Optional, List, Union from .stage import Stage @@ -42,9 +42,11 @@ class Link: controller in the daisy chain is always zero. """ - def __init__(self, dev: str): + def __init__(self, dev: Optional[str] = None): """ - :param dev: Serial device. For instance '/dev/ttyUSB0' or 'COM0'. + :param dev: Serial device. For instance `'/dev/ttyUSB0'`. + If not provided, a suitable device is searched according to + according to vendor and product IDs """ try: self.serial = serial.Serial(port=dev, baudrate=57600, xonxoff=True) @@ -113,12 +115,12 @@ def response(self, address: Optional[int], command: str) -> str: """ query_string = f'{"" if address is None else address}{command}' res = self.receive() - if res[: len(query_string)] != query_string: + if res is None or res[: len(query_string)] != query_string: raise ProtocolError(query_string, res) return res[len(query_string) :] -class State(Enum): +class State(int, Enum): """ Possible controller states. The values in this enumeration corresponds to the values returned by the @@ -148,7 +150,7 @@ class State(Enum): JOGGING_FROM_DISABLE = 0x47 -class Error(IntFlag): +class Error(int, Flag): """ Information returned when querying positioner error. """ @@ -171,8 +173,8 @@ class ErrorAndState: Information returned when querying positioner error and controller state. """ - state = None - error = None + state = State.NOT_REFERENCED_FROM_RESET + error = Error.NO_ERROR @property def is_referenced(self) -> bool: @@ -183,41 +185,39 @@ def is_referenced(self) -> bool: raise RuntimeError("state not available") else: return not ( - (self.state.value >= State.NOT_REFERENCED_FROM_RESET.value) - and (self.state.value <= State.NOT_REFERENCED_FROM_JOGGING.value) + (self.state >= State.NOT_REFERENCED_FROM_RESET) + and (self.state <= State.NOT_REFERENCED_FROM_JOGGING) ) @property def is_ready(self) -> bool: """:return: True if state is one of READY_x states.""" - return (self.state.value >= State.READY_FROM_HOMING.value) and ( - self.state.value <= State.READY_FROM_JOGGING.value + return (self.state >= State.READY_FROM_HOMING) and ( + self.state <= State.READY_FROM_JOGGING ) @property def is_moving(self) -> bool: """:return: True if state is MOVING.""" - return self.state.value == State.MOVING.value + return self.state == State.MOVING @property def is_homing(self) -> bool: """:return: True if state is one of HOMING_x states.""" - return (self.state.value >= State.HOMING_RS232.value) and ( - self.state.value <= State.HOMING_SMCRC.value - ) + return (self.state >= State.HOMING_RS232) and (self.state <= State.HOMING_SMCRC) @property - def is_jogging(self): + def is_jogging(self) -> bool: """:return: True if state is one of JOGGING_x states.""" - return (self.state.value >= State.JOGGING_FROM_READY.value) and ( - self.state.value <= State.JOGGING_FROM_DISABLE.value + return (self.state >= State.JOGGING_FROM_READY) and ( + self.state <= State.JOGGING_FROM_DISABLE ) @property - def is_disabled(self): + def is_disabled(self) -> bool: """:return: True if state is one of DISABLE_x states.""" - return (self.state.value >= State.DISABLE_FROM_READY.value) and ( - self.state.value <= State.DISABLE_FROM_JOGGING.value + return (self.state >= State.DISABLE_FROM_READY) and ( + self.state <= State.DISABLE_FROM_JOGGING ) @@ -226,11 +226,13 @@ class SMC100(Stage): Class to command Newport SMC100 controllers. """ - def __init__(self, dev, addresses: List[int]): + def __init__(self, dev: Optional[Union[str, Link, "SMC100"]], addresses: List[int]): """ - :param dev: Serial device string (for instance '/dev/ttyUSB0' or + :param dev: Serial device string (for instance `'/dev/ttyUSB0'` or 'COM0'), an instance of Link, or an instance of SMC100 sharing the same serial device. + If not provided, a suitable device is searched according to + according to vendor and product IDs :param addresses: An iterable of int controller addresses. """ super().__init__(num_axis=len(addresses)) @@ -261,23 +263,37 @@ def position(self): # It is faster to send all the request and then get all the responses. # This reduces a lot the latency. for i, addr in enumerate(self.addresses): - r = self.link.query(addr, "TP", lazy_res=False) - val = float(r) + res = self.link.query(addr, "TP", lazy_res=False) + if res is None: + raise ProtocolError("TP") + val = float(res) result[i] = val return result @position.setter def position(self, value: Vector): # To check dimension and range of the given value - super(__class__, self.__class__).position.fset(self, value) + pos_setter = Stage.position.fset + assert pos_setter is not None + pos_setter(self, value) # Enable the motors self.is_disabled = False commands = [] - for position, addr in zip(value, self.addresses): + for position, addr in zip(value.data, self.addresses): commands.append(f"{addr}PA{position:.5f}") self.link.send(None, "\r\n".join(commands)) + def home(self, wait=False): + """ + Perform home search. + + :param wait: Optionally waits for move operation to be done. + """ + self.home_search() + if wait: + self.wait_move_finished() + def home_search(self): """ Perform home search. @@ -308,7 +324,7 @@ def get_error_and_state(self, addr: int): :return: Current error and state, in a ErrorAndState instance. """ res = self.link.query(addr, "TS") - if len(res) != 6: + if res is None or len(res) != 6: raise ProtocolError("TS", res) result = ErrorAndState() result.error = Error(int(res[:4], 16)) @@ -330,7 +346,10 @@ def controller_address(self, addr: int): """ Get controller's RS-485 address. int in [2, 31]. """ - return int(self.link.query(addr, "SA")) + res = self.link.query(addr, "SA") + if res is None: + raise ProtocolError("SA") + return int(res) def set_controller_address(self, addr: int, value: int): """ @@ -392,6 +411,7 @@ def set_position(self, addr: int, value: float, blocking=True): def is_moving(self) -> bool: """ Indicates if the stage is currently moving due to MOVE, HOME or JOG operation. + :return: Moving state of the stage """ for addr in self.addresses: @@ -423,7 +443,7 @@ def enter_leave_disable_state(self, addr: Optional[int], enter: bool = True): DISABLE state makes the motor not energized and opens the control loop. :param addr: address of the axis to operate. - If None is passed, it applies to all controllers + If None is passed, it applies to all controllers :param enter: True to enter, False to leave DISABLE state """ # MM0 changes the controller’s state from READY to DISABLE (enter) diff --git a/pystages/stage.py b/pystages/stage.py index f79d142..8e16695 100644 --- a/pystages/stage.py +++ b/pystages/stage.py @@ -17,8 +17,9 @@ # Copyright 2018-2022 Ledger SAS, written by Michaël Mouchous from .vector import Vector -from typing import Optional +from typing import Optional, Callable from abc import ABC, abstractmethod +from serial.tools.list_ports import comports class Stage(ABC): @@ -29,18 +30,49 @@ class Stage(ABC): def __init__(self, num_axis=1): """ - :param num_axis: The number of axis of the stage, can be updated or set after initialisation - of the object. + :param num_axis: The number of axis of the stage, can be updated or set after initialization + of the object. """ self.num_axis = num_axis # The wait routine is a function that is called when the wait_move_finished is looping. - # It can be used to add some temporisation and/or UI updates. - self.wait_routine = None + # It can be used to add some temporization and/or UI updates. + self.wait_routine: Optional[Callable] = None # Minimum and maximum software limits self._minimums: Optional[Vector] = None self._maximums: Optional[Vector] = None + def find_device( + self, + pid: Optional[int] = None, + vid: Optional[int] = None, + serial_number: Optional[str] = None, + ) -> str: + """ + Find automatically one device according to given information in parameters + + :param pid: Product ID of the device. If None, not a limiting criteria. + :param vid: Vendor ID of the device. If None, not a limiting criterial. + :param serial_number: serial number of the device. If not None, the device + must have a serial number and matches the given value. + :return: The device path to the communication port. + """ + possible_ports = [] + for port in comports(): + if serial_number is not None: + if port.serial_number == serial_number: + possible_ports.append(port) + continue + if (port.pid, port.vid) == (pid, vid): + possible_ports.append(port) + if len(possible_ports) > 1: + raise RuntimeError("Multiple devices found! I don't know which one to use.") + elif len(possible_ports) == 1: + dev = possible_ports[0].device + else: + raise RuntimeError("No device found") + return dev + @property @abstractmethod def position(self) -> Vector: @@ -151,3 +183,10 @@ def maximums(self, value: Optional[Vector]): if value is not None: self.check_dimension(value) self._maximums = value + + def home(self, wait=False): + """Triggers a homing command. + + :param wait: Optionally waits for move operation to be done. + """ + self.move_to(Vector(dim=self.num_axis), wait=wait) diff --git a/pystages/tic.py b/pystages/tic.py index 578a99c..28ba9e0 100644 --- a/pystages/tic.py +++ b/pystages/tic.py @@ -23,6 +23,7 @@ from time import sleep from .stage import Stage from .vector import Vector +from .exceptions import ConnectionFailure class TicVariable(Enum): @@ -73,7 +74,7 @@ class TicVariable(Enum): LAST_HP_DRIVER_ERRORS = (0xFF, 1, False) -class TicCommand(Enum): +class TicCommand(int, Enum): """ Command codes for Polulu Tic Stepper Motor Controller. https://www.pololu.com/docs/0J71/8 @@ -107,7 +108,7 @@ class TicCommand(Enum): START_BOOTLOADER = 0xFF -class TicDirection(Enum): +class TicDirection(int, Enum): """Possible directions for homing""" REVERSE = 0 @@ -124,7 +125,11 @@ class Tic(Stage): def __init__(self): super().__init__() - self.dev = usb.core.find(idVendor=0x1FFB, idProduct=0x00B5) + dev = usb.core.find(idVendor=0x1FFB, idProduct=0x00B5) + if isinstance(dev, usb.core.Device): + self.dev = dev + else: + raise ConnectionFailure("Tic stepper motor not found.") self.dev.set_configuration() self.energize() self.poll_interval = 0.1 @@ -135,7 +140,7 @@ def quick(self, command: TicCommand): :param command: Command. """ - self.dev.ctrl_transfer(0x40, command.value, 0, 0, 0) + self.dev.ctrl_transfer(0x40, command, 0, 0, 0) def write_7(self, command: TicCommand, data: int): """ @@ -144,7 +149,7 @@ def write_7(self, command: TicCommand, data: int): :param command: Command. :param data: Value to be written. """ - self.dev.ctrl_transfer(0x40, command.value, data, 0, 0) + self.dev.ctrl_transfer(0x40, command, data, 0, 0) def write_32(self, command: TicCommand, data: int): """ @@ -153,7 +158,7 @@ def write_32(self, command: TicCommand, data: int): :param command: Command code. :param data: Value to be written. """ - self.dev.ctrl_transfer(0x40, command.value, data & 0xFFFF, data >> 16, 0) + self.dev.ctrl_transfer(0x40, command, data & 0xFFFF, data >> 16, 0) def block_read(self, command: TicCommand, offset, length) -> bytes: """ @@ -163,7 +168,7 @@ def block_read(self, command: TicCommand, offset, length) -> bytes: :param offset: Data offset. :param length: Data length. """ - return bytes(self.dev.ctrl_transfer(0xC0, command.value, 0, offset, length)) + return bytes(self.dev.ctrl_transfer(0xC0, command, 0, offset, length)) def set_setting(self, command: TicCommand, data, offset): """ @@ -173,7 +178,7 @@ def set_setting(self, command: TicCommand, data, offset): :param data: Value to be written. :param offset: Write offset. """ - self.dev.ctrl_transfer(0x40, command.value, data, offset, 0) + self.dev.ctrl_transfer(0x40, command, data, offset, 0) def energize(self): self.quick(TicCommand.ENERGIZE) @@ -187,6 +192,14 @@ def reset(self): def exit_safe_start(self): self.quick(TicCommand.EXIT_SAFE_START) + def home(self, wait=False): + """Triggers a Home command. + + :param wait: Optionally waits for move operation to be done.""" + self.go_home(TicDirection.REVERSE, False) + if wait: + self.wait_move_finished() + def go_home(self, direction: TicDirection, wait: bool = True): """ Run the homing procedure. @@ -194,7 +207,7 @@ def go_home(self, direction: TicDirection, wait: bool = True): :param wait: If True, wait for homing procedure end. """ self.exit_safe_start() - self.write_7(TicCommand.GO_HOME, direction.value) + self.write_7(TicCommand.GO_HOME, direction) if wait: while self.get_variable(TicVariable.MISC_FLAGS) & (1 << 4): self.exit_safe_start() @@ -227,7 +240,10 @@ def position(self) -> Vector: @position.setter def position(self, value: Vector): # To check dimension and range of the given value - super(__class__, self.__class__).position.fset(self, value) + pos_setter = Stage.position.fset + assert pos_setter is not None + pos_setter(self, value) + self.target_position = value.x while self.position.x != value.x: sleep(self.poll_interval) diff --git a/pystages/vector.py b/pystages/vector.py index f97083f..546cfa5 100644 --- a/pystages/vector.py +++ b/pystages/vector.py @@ -163,7 +163,7 @@ def __mul__(self, other: Union["Vector", int, float]): result = Vector(dim=dim) if len(other) != dim: - raise ValueError(f"Incorrect vector size") + raise ValueError("Incorrect vector size") for i in range(dim): result[i] = self[i] * other[i] return result @@ -173,7 +173,7 @@ def __truediv__(self, other): return self * (1.0 / other) else: raise TypeError( - f"Incorrect type for second operand. int or float is expected." + "Incorrect type for second operand. int or float is expected." ) diff --git a/requirements.txt b/requirements.txt index add796d..4a450cd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ pyserial pyusb numpy +pyqt6 \ No newline at end of file