From 819007af7975869727c0d33e0f54e2e0420d949b Mon Sep 17 00:00:00 2001 From: francdoc <32172349+francdoc@users.noreply.github.com> Date: Sat, 18 Nov 2023 17:28:22 -0300 Subject: [PATCH] Add functions to C library wrapper (#23) --- acspy/__init__.py | 2 +- acspy/acsc.py | 179 ++++++++++++++++++++++++++++++++-------------- acspy/control.py | 43 +++++------ 3 files changed, 149 insertions(+), 75 deletions(-) diff --git a/acspy/__init__.py b/acspy/__init__.py index 4638381..ff4637e 100644 --- a/acspy/__init__.py +++ b/acspy/__init__.py @@ -1,3 +1,3 @@ """A package for working with ACS motion controllers in Python.""" -__version__ = "0.0.5" +__version__ = "0.0.6" diff --git a/acspy/acsc.py b/acspy/acsc.py index a985680..dd6e9af 100644 --- a/acspy/acsc.py +++ b/acspy/acsc.py @@ -1,6 +1,6 @@ """This module is a wrapper for the ACS C library using ctypes.""" -from __future__ import division, print_function +from __future__ import annotations, division, print_function import ctypes import platform @@ -30,7 +30,6 @@ class AcscError(Exception): p = ctypes.pointer s = create_string_buffer(256) - # Define motion flags and constants AMF_WAIT = 0x00000001 AMF_RELATIVE = 0x00000002 @@ -106,7 +105,10 @@ class AcscError(Exception): def openCommDirect(): - """Open simulator. Returns communication handle.""" + """Open simulator. + + Returns communication handle. + """ # caution: acs does treat errors differently for openComm functions! hcomm = acs.acsc_OpenCommDirect() if hcomm == -1: @@ -127,8 +129,9 @@ def openCommDirect(): return hcomm -def openCommEthernetTCP(address="10.0.0.100", port=701): - """Address is a string. Port is an int. +def openCommEthernetTCP(address: str = "10.0.0.100", port: int = 701): + """Address is a string. + Returns communication handle.""" hcomm = acs.acsc_OpenCommEthernetTCP(address.encode(), port) if hcomm == -1: @@ -146,11 +149,11 @@ def openCommEthernetTCP(address="10.0.0.100", port=701): raise AcscError(str(err) + ": " + err_str) else: raise AcscError(err) - return hcomm def getSerialNumber(hcomm, wait=SYNCHRONOUS): + """Retrieves the controller serial number.""" s = create_string_buffer(256) buffer_size = int32(ctypes.sizeof(s)) ser_num_len = int32() @@ -166,31 +169,37 @@ def getSerialNumber(hcomm, wait=SYNCHRONOUS): return serial_number -def setVelocity(hcomm, axis, vel, wait=SYNCHRONOUS): +def setVelocity(hcomm, axis: int, vel: float, wait=SYNCHRONOUS): """Sets axis velocity.""" call_acsc(acs.acsc_SetVelocity, hcomm, axis, double(vel), wait) -def setAcceleration(hcomm, axis, acc, wait=SYNCHRONOUS): +def setAcceleration(hcomm, axis: int, acc: float, wait=SYNCHRONOUS): """Sets axis acceleration.""" call_acsc(acs.acsc_SetAcceleration, hcomm, axis, double(acc), wait) -def setDeceleration(hcomm, axis, dec, wait=SYNCHRONOUS): +def setDeceleration(hcomm, axis: int, dec: float, wait=SYNCHRONOUS): """Sets axis deceleration.""" call_acsc(acs.acsc_SetDeceleration, hcomm, axis, double(dec), wait) -def setKillDeceleration(hcomm, axis, dec, wait=SYNCHRONOUS): +def setKillDeceleration(hcomm, axis: int, dec: float, wait=SYNCHRONOUS): """Sets axis deceleration.""" - call_acsc(acs.acsc_SetKillDecelerations, hcomm, axis, double(dec), wait) + call_acsc(acs.acsc_SetKillDeceleration, hcomm, axis, double(dec), wait) -def setJerk(hcomm, axis, jerk, wait=SYNCHRONOUS): +def setJerk(hcomm, axis: int, jerk: float, wait=SYNCHRONOUS): + """Defines a value of motion jerk.""" call_acsc(acs.acsc_SetJerk, hcomm, axis, double(jerk), wait) -def getMotorEnabled(hcomm, axis, wait=SYNCHRONOUS): +def setRPosition(hcomm, axis: int, rpos: float, wait=SYNCHRONOUS): + """Asigns a current value of reference position.""" + call_acsc(acs.acsc_SetRPosition, hcomm, axis, double(rpos), wait) + + +def getMotorEnabled(hcomm, axis: int, wait=SYNCHRONOUS): """Checks if motor is enabled.""" state = ctypes.c_int() call_acsc(acs.acsc_GetMotorState, hcomm, axis, byref(state), wait) @@ -198,7 +207,7 @@ def getMotorEnabled(hcomm, axis, wait=SYNCHRONOUS): return hex(state)[-1] == "1" -def getMotorState(hcomm, axis, wait=SYNCHRONOUS): +def getMotorState(hcomm, axis: int, wait=SYNCHRONOUS): """Gets the motor state. Returns a dictionary with the following keys: * "enabled" * "in position" @@ -217,17 +226,43 @@ def getMotorState(hcomm, axis, wait=SYNCHRONOUS): return mst -def getAxisState(hcomm, axis, wait=SYNCHRONOUS): - """Gets the axis state. Returns a dictionary with the following keys - * "lead" - * "DC" - * "PEG" - * "PEGREADY" - * "moving" - * "accelerating" - * "segment" - * "vel lock" - * "pos lock" +def getMotorError(hcomm, axis: int, wait=SYNCHRONOUS): + """Get the motor error for disabling.""" + error = ctypes.c_int() + call_acsc(acs.acsc_GetMotorError, hcomm, axis, byref(error), wait) + error = error.value + return error + + +def getErrorString(hcomm, error: int): + """Retrieves the explanation of an error code.""" + err_lng = int32() + s = create_string_buffer(256) + call_acsc( + acs.acsc_GetErrorString, + hcomm, + error, + s, + int32(ctypes.sizeof(s)), + byref(err_lng), + ) + error_string = s.value.decode("ascii") + return error_string + + +def getAxisState(hcomm, axis: int, wait=SYNCHRONOUS): + """Gets the axis state. + + Returns a dictionary with the following keys + * "lead" + * "DC" + * "PEG" + * "PEGREADY" + * "moving" + * "accelerating" + * "segment" + * "vel lock" + * "pos lock" """ state = ctypes.c_int() call_acsc(acs.acsc_GetAxisState, hcomm, axis, byref(state), wait) @@ -246,22 +281,30 @@ def getAxisState(hcomm, axis, wait=SYNCHRONOUS): return ast +def getFPosition(hcomm, axis: int, wait=SYNCHRONOUS): + """Retrieves an instant value of the motor feedback position.""" + fposition = ctypes.c_double() + call_acsc(acs.acsc_GetFPosition, hcomm, axis, byref(fposition), wait) + fposition = fposition.value + return fposition + + def registerEmergencyStop(): """Register the software emergency stop.""" call_acsc(acs.acsc_RegisterEmergencyStop) -def jog(hcomm, flags, axis, vel, wait=SYNCHRONOUS): - """Jog move.""" +def jog(hcomm, flags: int, axis: int, vel: float, wait=SYNCHRONOUS): + """Initiates a single-axis jog motion.""" call_acsc(acs.acsc_Jog, hcomm, flags, axis, double(vel), wait) -def toPoint(hcomm, flags, axis, target, wait=SYNCHRONOUS): +def toPoint(hcomm, flags: int, axis: int, target: float, wait=SYNCHRONOUS): """Point to point move.""" call_acsc(acs.acsc_ToPoint, hcomm, flags, axis, double(target), wait) -def toPointM(hcomm, flags, axes, target, wait=SYNCHRONOUS): +def toPointM(hcomm, flags: int, axes: tuple, target: tuple, wait=SYNCHRONOUS): """Initiates a multi-axis move to the specified target. Axes and target are entered as tuples. Set flags as None for absolute coordinates.""" if len(axes) != len(target): @@ -277,10 +320,17 @@ def toPointM(hcomm, flags, axes, target, wait=SYNCHRONOUS): call_acsc(acs.acsc_ToPointM, hcomm, flags, axes_c, target_c, wait) -def enable(hcomm, axis, wait=SYNCHRONOUS): +def enable(hcomm, axis: int, wait=SYNCHRONOUS): + """The function activates a motor.""" call_acsc(acs.acsc_Enable, hcomm, int32(axis), wait) +def enableMotors(hcomm, axes: list, wait=SYNCHRONOUS): + """The function activates several motors.""" + axes_array = (ctypes.c_int * len(axes))(*axes) + call_acsc(acs.acsc_EnableM, hcomm, axes_array, wait) + + def commutate( hcomm, axis, @@ -300,7 +350,7 @@ def commutate( ) -def waitCommutated(hcomm, axis, timeout=INFINITE): +def waitCommutated(hcomm, axis: int, timeout=INFINITE): """Wait for commutation to finish. Default timeout is 30 seconds. @@ -310,56 +360,68 @@ def waitCommutated(hcomm, axis, timeout=INFINITE): ) -def disable(hcomm, axis, wait=SYNCHRONOUS): +def disable(hcomm, axis: int, wait=SYNCHRONOUS): + """The function shuts off a motor.""" call_acsc(acs.acsc_Disable, hcomm, int32(axis), wait) -def getRPosition(hcomm, axis, wait=SYNCHRONOUS): +def disableAllMotors(hcomm, wait=SYNCHRONOUS): + """The function shuts off all motors.""" + call_acsc(acs.acsc_DisableAll, hcomm, wait) + + +def disableMotors(hcomm, axes: list, wait=SYNCHRONOUS): + """The function shuts off several specified motors.""" + axes_array = (ctypes.c_int * len(axes))(*axes) + call_acsc(acs.acsc_DisableM, hcomm, axes_array, wait) + + +def getRPosition(hcomm, axis: int, wait=SYNCHRONOUS): pos = double() call_acsc(acs.acsc_GetRPosition, hcomm, axis, p(pos), wait) return pos.value -def getFPosition(hcomm, axis, wait=SYNCHRONOUS): +def getFPosition(hcomm, axis: int, wait=SYNCHRONOUS): pos = double() call_acsc(acs.acsc_GetFPosition, hcomm, axis, byref(pos), wait) return pos.value -def getRVelocity(hcomm, axis, wait=SYNCHRONOUS): +def getRVelocity(hcomm, axis: int, wait=SYNCHRONOUS): rvel = double() call_acsc(acs.acsc_GetRVelocity, hcomm, axis, byref(rvel), wait) return rvel.value -def getFVelocity(hcomm, axis, wait=SYNCHRONOUS): +def getFVelocity(hcomm, axis: int, wait=SYNCHRONOUS): vel = double() call_acsc(acs.acsc_GetFVelocity, hcomm, axis, byref(vel), wait) return vel.value -def getVelocity(hcomm, axis, wait=SYNCHRONOUS): +def getVelocity(hcomm, axis: int, wait=SYNCHRONOUS): """Returns current velocity for specified axis.""" vel = double() call_acsc(acs.acsc_GetVelocity, hcomm, axis, byref(vel), wait) return vel.value -def getAcceleration(hcomm, axis, wait=SYNCHRONOUS): +def getAcceleration(hcomm, axis: int, wait=SYNCHRONOUS): """Returns current acceleration for specified axis.""" val = double() call_acsc(acs.acsc_GetAcceleration, hcomm, axis, byref(val), wait) return val.value -def getDeceleration(hcomm, axis, wait=SYNCHRONOUS): +def getDeceleration(hcomm, axis: int, wait=SYNCHRONOUS): """Returns current deceleration for specified axis.""" val = double() call_acsc(acs.acsc_GetDeceleration, hcomm, axis, byref(val), wait) return val.value -def getKillDeceleration(hcomm, axis, wait=SYNCHRONOUS): +def getKillDeceleration(hcomm, axis: int, wait=SYNCHRONOUS): """Returns current deceleration for specified axis.""" val = double() call_acsc(acs.acsc_GetKillDeceleration, hcomm, axis, byref(val), wait) @@ -403,7 +465,7 @@ def getProgramState(hc, nbuf, wait=SYNCHRONOUS): return state.value -def halt(hcomm, axis, wait=SYNCHRONOUS): +def halt(hcomm, axis: int, wait=SYNCHRONOUS): """Halts motion on specified axis.""" call_acsc(acs.acsc_Halt, hcomm, axis, wait) @@ -467,20 +529,20 @@ def writeInteger( ) -def readMflag(hcomm, axis, flag_nm): +def readMflag(hcomm, axis: int, flag_nm): """read a Mflag. For definition refer to ax_mflags at the top""" allFlags = readInteger(hcomm, None, "MFLAGS", axis, axis + 1) return bool(((1 << ax_mflags[flag_nm]) & allFlags)) -def setMflag(hcomm, axis, flag_nm): +def setMflag(hcomm, axis: int, flag_nm): """Set a Mflag. For definition refer to ax_mflags at the top""" allFlags = readInteger(hcomm, None, "MFLAGS", axis, axis + 1) allFlags |= 2 ** (ax_mflags[flag_nm]) writeInteger(hcomm, "MFLAGS", allFlags) -def clearMflag(hcomm, axis, flag_nm): +def clearMflag(hcomm, axis: int, flag_nm): """Clear a Mflag. For definition refer to ax_mflags at the top""" allFlags = readInteger(hcomm, None, "MFLAGS", axis, axis + 1) allFlags &= ~(2 ** (ax_mflags[flag_nm])) @@ -627,17 +689,21 @@ def compileBuffer(hcomm, buffnumber, wait=SYNCHRONOUS): call_acsc(acs.acsc_CompileBuffer, hcomm, buffnumber, wait) -def spline(hcomm, flags, axis, period, wait=SYNCHRONOUS): +def spline(hcomm, flags: int, axis: int, period: float, wait=SYNCHRONOUS): call_acsc(acs.acsc_Spline, hcomm, flags, axis, double(period), wait) -def addPVPoint(hcomm, axis, point, velocity, wait=SYNCHRONOUS): +def addPVPoint( + hcomm, axis: int, point: float, velocity: float, wait=SYNCHRONOUS +): call_acsc( acs.acsc_AddPVPoint, hcomm, axis, double(point), double(velocity), wait ) -def addPVTPoint(hcomm, axis, point, velocity, dt, wait=SYNCHRONOUS): +def addPVTPoint( + hcomm, axis: int, point: float, velocity: float, dt, wait=SYNCHRONOUS +): call_acsc( acs.acsc_AddPVTPoint, hcomm, @@ -649,36 +715,36 @@ def addPVTPoint(hcomm, axis, point, velocity, dt, wait=SYNCHRONOUS): ) -def multiPoint(hcomm, flags, axis, dwell, wait=SYNCHRONOUS): +def multiPoint(hcomm, flags: int, axis: int, dwell: float, wait=SYNCHRONOUS): call_acsc(acs.acsc_MultiPoint, hcomm, flags, axis, double(dwell), wait) -def addPoint(hcomm, axis, point, wait=SYNCHRONOUS): +def addPoint(hcomm, axis: int, point: float, wait=SYNCHRONOUS): call_acsc(acs.acsc_AddPoint, hcomm, axis, double(point), wait) -def extAddPoint(hcomm, axis, point, rate, wait=SYNCHRONOUS): +def extAddPoint(hcomm, axis: int, point: float, rate: float, wait=SYNCHRONOUS): call_acsc( acs.acsc_ExtAddPoint, hcomm, axis, double(point), double(rate), wait ) -def endSequence(hcomm, axis, wait=SYNCHRONOUS): +def endSequence(hcomm, axis: int, wait=SYNCHRONOUS): call_acsc(acs.acsc_EndSequence, hcomm, axis, wait) -def go(hcomm, axis, wait=SYNCHRONOUS): +def go(hcomm, axis: int, wait=SYNCHRONOUS): call_acsc(acs.acsc_Go, hcomm, axis, wait) -def getOutput(hcomm, port, bit, wait=SYNCHRONOUS): +def getOutput(hcomm, port: int, bit: int, wait=SYNCHRONOUS): """Returns the value of a digital output.""" val = int32() call_acsc(acs.acsc_GetOutput, hcomm, port, bit, byref(val), wait) return val.value -def setOutput(hcomm, port, bit, val, wait=SYNCHRONOUS): +def setOutput(hcomm, port: int, bit: int, val: int, wait=SYNCHRONOUS): """Sets the value of a digital output.""" call_acsc(acs.acsc_SetOutput, hcomm, port, bit, val, wait) @@ -705,6 +771,11 @@ def call_acsc(func, *args, **kwargs): return rv +def cancelOperation(hcomm, wait=SYNCHRONOUS): + """Cancels all of the waiting and non-waiting calls.""" + call_acsc(acs.acsc_CancelOperation, hcomm, wait) + + if __name__ == "__main__": """Some testing can go here""" hc = openCommEthernetTCP() diff --git a/acspy/control.py b/acspy/control.py index f1aa70e..b4ca8f2 100644 --- a/acspy/control.py +++ b/acspy/control.py @@ -6,19 +6,20 @@ from __future__ import division, print_function from acspy import acsc + class Controller(object): def __init__(self, contype="simulator", n_axes=8): self.contype = contype self.axes = [] for n in range(n_axes): self.axes.append(Axis(self, n)) - + def connect(self, address="10.0.0.100", port=701): if self.contype == "simulator": self.hc = acsc.openCommDirect() elif self.contype == "ethernet": self.hc = acsc.openCommEthernetTCP(address=address, port=port) - + def enable_all(self, wait=acsc.SYNCHRONOUS): """Enables all axes.""" for a in self.axes: @@ -28,10 +29,10 @@ def disable_all(self, wait=acsc.SYNCHRONOUS): """Disables all axes.""" for a in self.axes: a.disable() - + def disconnect(self): acsc.closeComm(self.hc) - + class Axis(object): def __init__(self, controller, axisno, name=None): @@ -42,13 +43,13 @@ def __init__(self, controller, axisno, name=None): self.axisno = axisno if name: controller.axisdefs[name] = axisno - + def enable(self, wait=acsc.SYNCHRONOUS): acsc.enable(self.controller.hc, self.axisno, wait) def disable(self, wait=acsc.SYNCHRONOUS): acsc.disable(self.controller.hc, self.axisno, wait) - + def ptp(self, target, coordinates="absolute", wait=acsc.SYNCHRONOUS): """Performs a point to point move in either relative or absolute (default) coordinates.""" @@ -57,44 +58,44 @@ def ptp(self, target, coordinates="absolute", wait=acsc.SYNCHRONOUS): else: flags = None acsc.toPoint(self.controller.hc, flags, self.axisno, target, wait) - + def ptpr(self, distance, wait=acsc.SYNCHRONOUS): """Performance a point to point move in relative coordinates.""" self.ptp(distance, coordinates="relative", wait=wait) - + @property def axis_state(self): """Returns axis state dict.""" return acsc.getAxisState(self.controller.hc, self.axisno) - + @property def motor_state(self): """Returns motor state dict.""" return acsc.getMotorState(self.controller.hc, self.axisno) - + @property def moving(self): return self.motor_state["moving"] - + @property def enabled(self): return self.motor_state["enabled"] - + @enabled.setter def enabled(self, choice): if choice == True: self.enable() elif choice == False: self.disable() - + @property def in_position(self): return self.motor_state["in position"] - + @property def accelerating(self): return self.motor_state["accelerating"] - + @property def rpos(self): return acsc.getRPosition(self.controller.hc, self.axisno) @@ -102,7 +103,7 @@ def rpos(self): @property def fpos(self): return acsc.getFPosition(self.controller.hc, self.axisno) - + @property def rvel(self): return acsc.getRVelocity(self.controller.hc, self.axisno) @@ -110,28 +111,30 @@ def rvel(self): @property def fvel(self): return acsc.getFVelocity(self.controller.hc, self.axisno) - + @property def vel(self): return acsc.getVelocity(self.controller.hc, self.axisno) + @vel.setter def vel(self, velocity): """Sets axis velocity.""" acsc.setVelocity(self.controller.hc, self.axisno, velocity) - + @property def acc(self): return acsc.getAcceleration(self.controller.hc, self.axisno) + @acc.setter def acc(self, accel): """Sets axis velocity.""" acsc.setAcceleration(self.controller.hc, self.axisno, accel) - + @property def dec(self): return acsc.getDeceleration(self.controller.hc, self.axisno) + @dec.setter def dec(self, decel): """Sets axis velocity.""" acsc.setDeceleration(self.controller.hc, self.axisno, decel) -