From 88dfaebaf5c6e0000fb6fbe27d8104eb498c0a06 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Thu, 17 Oct 2024 14:33:50 +0100 Subject: [PATCH] Ruff/pyright --- utils/build_architectures.py | 2 +- utils/calibration_utils.py | 14 +- utils/device_launcher.py | 15 +- utils/emulator_exceptions.py | 4 +- utils/emulator_launcher.py | 448 ++++++++++++++++++++++++----------- utils/formatters.py | 4 +- utils/free_ports.py | 6 +- utils/ioc_launcher.py | 223 ++++++++++------- utils/log_file.py | 10 +- utils/testing.py | 159 ++++++++----- 10 files changed, 574 insertions(+), 311 deletions(-) diff --git a/utils/build_architectures.py b/utils/build_architectures.py index f54d3fc1a..4bbf346f2 100644 --- a/utils/build_architectures.py +++ b/utils/build_architectures.py @@ -14,7 +14,7 @@ class BuildArchitectures(Enum): _32BIT = 2 @staticmethod - def archname(arch): + def archname(arch: str) -> str: """ Returns: nice name of architecture """ diff --git a/utils/calibration_utils.py b/utils/calibration_utils.py index 73d154b88..ad8035cc7 100644 --- a/utils/calibration_utils.py +++ b/utils/calibration_utils.py @@ -1,10 +1,14 @@ import time +import typing from contextlib import contextmanager CAL_SEL_PV = "CAL:SEL" +if typing.TYPE_CHECKING: + from utils.channel_access import ChannelAccess -def set_calibration_file(channel_access, filename, prefix=""): + +def set_calibration_file(channel_access: "ChannelAccess", filename: str, prefix: str = "") -> None: """ Sets a calibration file. Retries if it didn't set properly first time. Args: @@ -29,12 +33,16 @@ def set_calibration_file(channel_access, filename, prefix=""): ) -def reset_calibration_file(channel_access, default_file="None.txt", prefix=""): +def reset_calibration_file( + channel_access: "ChannelAccess", default_file: str = "None.txt", prefix: str = "" +) -> None: set_calibration_file(channel_access, default_file, prefix) @contextmanager -def use_calibration_file(channel_access, filename, default_file="None.txt", prefix=""): +def use_calibration_file( + channel_access: "ChannelAccess", filename: str, default_file: str = "None.txt", prefix: str = "" +) -> typing.Generator[None, None, None]: set_calibration_file(channel_access, filename, prefix) try: yield diff --git a/utils/device_launcher.py b/utils/device_launcher.py index 5c015080e..3cb5296f3 100644 --- a/utils/device_launcher.py +++ b/utils/device_launcher.py @@ -1,13 +1,9 @@ -from contextlib import contextmanager - -try: - from contextlib import ExitStack # PY3 -except ImportError: - from contextlib2 import ExitStack # PY2 +from contextlib import ExitStack, contextmanager +from typing import ContextManager, Generator @contextmanager -def device_launcher(ioc, lewis): +def device_launcher(ioc: ContextManager, lewis: ContextManager) -> Generator[None, None, None]: """ Context manager that launches an ioc and emulator pair :param ioc: the ioc launcher @@ -22,10 +18,11 @@ def device_launcher(ioc, lewis): @contextmanager -def device_collection_launcher(devices): +def device_collection_launcher(devices: list[ContextManager]) -> Generator[None, None, None]: """ Context manager that launches a list of devices - :param devices: list of context managers representing the devices to launch (see device_launcher above) + :param devices: list of context managers representing the devices to launch + (see device_launcher above) """ with ExitStack() as stack: for device in devices: diff --git a/utils/emulator_exceptions.py b/utils/emulator_exceptions.py index 34c7c082c..6f181b666 100644 --- a/utils/emulator_exceptions.py +++ b/utils/emulator_exceptions.py @@ -1,9 +1,9 @@ -class UnableToConnectToEmulatorException(IOError): +class UnableToConnectToEmulatorException(IOError): # noqa: N818 (historic name) """ The system is unable to connect to the emulator for some reason. """ - def __init__(self, emulator_name, err): + def __init__(self, emulator_name: str, err: str | BaseException) -> None: super(UnableToConnectToEmulatorException, self).__init__( "Unable to connect to Emnulator {0}: {1}".format(emulator_name, err) ) diff --git a/utils/emulator_launcher.py b/utils/emulator_launcher.py index 73814be37..b5750df72 100644 --- a/utils/emulator_launcher.py +++ b/utils/emulator_launcher.py @@ -11,7 +11,8 @@ from datetime import datetime from functools import partial from time import sleep, time -from typing import Any, Dict, List +from types import TracebackType +from typing import Any, Callable, Dict, Generator, List, Self, Type, TypeAlias, TypeVar import psutil @@ -26,6 +27,11 @@ DEFAULT_PY_PATH = os.path.join("C:\\", "Instrument", "Apps", "Python3") +EmulatorValue: TypeAlias = int | float | str | bool + +T = TypeVar("T") + + class EmulatorRegister(object): """ A way of registering running emulators. @@ -35,7 +41,7 @@ class EmulatorRegister(object): RunningEmulators = {} @classmethod - def get_running(cls, name): + def get_running(cls, name: str) -> "EmulatorLauncher | MultiLewisLauncher | None": """ Get a running emulator by name, return None if not running. @@ -45,7 +51,7 @@ def get_running(cls, name): return cls.RunningEmulators.get(name) @classmethod - def add_emulator(cls, name, emulator): + def add_emulator(cls, name: str, emulator: "EmulatorLauncher | MultiLewisLauncher") -> None: """ Add a emulator to the running list. @@ -55,7 +61,7 @@ def add_emulator(cls, name, emulator): cls.RunningEmulators[name] = emulator @classmethod - def remove_emulator(cls, name): + def remove_emulator(cls, name: str) -> None: """ Removes an emulator from the running list. @@ -65,7 +71,15 @@ def remove_emulator(cls, name): class EmulatorLauncher(object, metaclass=abc.ABCMeta): - def __init__(self, test_name, device, emulator_path, var_dir, port, options): + def __init__( + self, + test_name: str, + device: str, + emulator_path: str, + var_dir: str, + port: int, + options: dict[str, Any], + ) -> None: """ Args: test_name: The name of the test we are creating a device emulator for @@ -83,36 +97,43 @@ def __init__(self, test_name, device, emulator_path, var_dir, port, options): self._test_name = test_name self._emulator_path = emulator_path - def __enter__(self): + def __enter__(self) -> Self: self._open() EmulatorRegister.add_emulator(self._emulator_id, self) return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__( + self, + exc_type: Type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: self._close() EmulatorRegister.remove_emulator(self._emulator_id) - def _get_device(self): + def _get_device(self) -> str: return self._device - def _get_var_dir(self): + def _get_var_dir(self) -> str: return self._var_dir @abc.abstractmethod - def _close(self): + def _close(self) -> None: """ Close the emulator. This should perform any cleanup required to kill the emulator. """ @abc.abstractmethod - def _open(self): + def _open(self) -> None: """ - Open the emulator. This should spawn the emulator process and return once the emulator is ready to - accept commands. + Open the emulator. This should spawn the emulator process and return once the emulator is + ready to accept commands. """ @abc.abstractmethod - def backdoor_get_from_device(self, variable, *args, **kwargs): + def backdoor_get_from_device( + self, variable: str, *args: list[Any], **kwargs: dict[str, Any] + ) -> str: """ Get a value from the device via the back door. @@ -126,7 +147,13 @@ def backdoor_get_from_device(self, variable, *args, **kwargs): """ @abc.abstractmethod - def backdoor_set_on_device(self, variable, value, *args, **kwargs): + def backdoor_set_on_device( + self, + variable: str, + value: EmulatorValue, + *args: list[Any], + **kwargs: dict[str, Any], + ) -> None: """ Set a value from the device via the back door. @@ -141,7 +168,9 @@ def backdoor_set_on_device(self, variable, value, *args, **kwargs): """ @abc.abstractmethod - def backdoor_emulator_disconnect_device(self, *args, **kwargs): + def backdoor_emulator_disconnect_device( + self, *args: list[Any], **kwargs: dict[str, Any] + ) -> None: """ Disconnect the device via the back door. @@ -154,7 +183,7 @@ def backdoor_emulator_disconnect_device(self, *args, **kwargs): """ @abc.abstractmethod - def backdoor_emulator_connect_device(self, *args, **kwargs): + def backdoor_emulator_connect_device(self, *args: list[Any], **kwargs: dict[str, Any]) -> None: """ Connect the device via the back door. @@ -167,7 +196,9 @@ def backdoor_emulator_connect_device(self, *args, **kwargs): """ @abc.abstractmethod - def backdoor_run_function_on_device(self, *args, **kwargs): + def backdoor_run_function_on_device( + self, function_name: str, arguments: list[Any] | None + ) -> list[bytes]: """ Runs a function on an emulator via the backdoor. @@ -179,7 +210,17 @@ def backdoor_run_function_on_device(self, *args, **kwargs): Nothing. """ - def backdoor_set_and_assert_set(self, variable, value, *args, **kwargs): + def backdoor_command(self, command: list[str]) -> list[bytes]: + """ + Runs a string command on an emulator via the backdoor. + + Precisely how this string is interpreted is dependent on the emulator launcher in use. + """ + raise NotImplementedError("This emulator launcher does not override backdoor_command.") + + def backdoor_set_and_assert_set( + self, variable: str, value: EmulatorValue, *args: list[Any], **kwargs: dict[str, Any] + ) -> None: """ Sets a value on the emulator via the backdoor and gets it back to assert it's been set @@ -196,15 +237,21 @@ def backdoor_set_and_assert_set(self, variable, value, *args, **kwargs): self.assert_that_emulator_value_is(variable, str(value)) def assert_that_emulator_value_is( - self, emulator_property, expected_value, timeout=None, message=None, cast=lambda val: val - ): + self, + emulator_property: str, + expected_value: T, + timeout: float | None = None, + message: str | None = None, + cast: Callable[[str], T] = lambda val: val, + ) -> None: """ - Assert that the emulator property has the expected value or that it becomes the expected value within the - timeout. + Assert that the emulator property has the expected value or that it becomes the expected + value within the timeout. Args: emulator_property (string): emulator property to check - expected_value: expected value. Emulator backdoor always returns a string, so the value should be a string. + expected_value: expected value. Emulator backdoor always returns a string, so the value + should be a string. timeout (float): if it hasn't changed within this time raise assertion error message (string): Extra message to print cast (callable): function which casts the returned value to an appropriate type before @@ -214,22 +261,28 @@ def assert_that_emulator_value_is( """ if message is None: - message = "Expected emulator to have value {}.".format(format_value(expected_value)) + message = f"Expected emulator to have value {format_value(expected_value)}." return self.assert_that_emulator_value_causes_func_to_return_true( emulator_property, lambda val: cast(val) == expected_value, timeout=timeout, msg=message ) def assert_that_emulator_value_is_not( - self, emulator_property, value, timeout=None, message=None, cast=lambda val: val - ): + self, + emulator_property: str, + value: T, + timeout: float | None = None, + message: str | None = None, + cast: Callable[[str], T] = lambda val: val, + ) -> None: """ - Assert that the emulator property does not have the passed value and that it does not become the passed value - within the timeout. + Assert that the emulator property does not have the passed value and that it does not become + the passed value within the timeout. Args: emulator_property (string): emulator property to check - value: value to check against. Emulator backdoor always returns a string, so the value should be a string. + value: value to check against. Emulator backdoor always returns a string, so the value + should be a string. timeout (float): if it hasn't changed within this time raise assertion error message (string): Extra message to print cast (callable): function which casts the returned value to an appropriate type before @@ -240,37 +293,46 @@ def assert_that_emulator_value_is_not( """ if message is None: - message = "Expected PV to *not* have value {}.".format(format_value(value)) + message = f"Expected PV to *not* have value {format_value(value)}." return self.assert_that_emulator_value_causes_func_to_return_false( emulator_property, lambda val: cast(val) == value, timeout=timeout, msg=message ) def assert_that_emulator_value_causes_func_to_return_true( - self, emulator_property, func, timeout=None, msg=None - ): + self, + emulator_property: str, + func: Callable[[str], bool], + timeout: float | None = None, + msg: str | None = None, + ) -> None: """ Check that an emulator property satisfies a given function within some timeout. Args: emulator_property (string): emulator property to check - func: a function that takes one argument, the emulator property value, and returns True if the value is - valid. + func: a function that takes one argument, the emulator property value, and returns True + if the value is valid. timeout: time to wait for the emulator to satisfy the function msg: custom message to print on failure Raises: AssertionError: If the function does not evaluate to true within the given timeout """ - def wrapper(msg): + def wrapper(msg: str) -> str | None: value = self.backdoor_get_from_device(emulator_property) try: return_value = func(value) except Exception as e: + try: + exception_message = e.message # type: ignore + except AttributeError: + exception_message = "" + return ( "Exception was thrown while evaluating function '{}' on emulator property {}. " "Exception was: {} {}".format( - func.__name__, format_value(value), e.__class__.__name__, e.message + func.__name__, format_value(value), e.__class__.__name__, exception_message ) ) if return_value: @@ -283,9 +345,9 @@ def wrapper(msg): ) if msg is None: - msg = "Expected function '{}' to evaluate to True when reading emulator property '{}'.".format( - func.__name__, emulator_property - ) + msg = ( + "Expected function '{}' to evaluate to True " "when reading emulator property '{}'." + ).format(func.__name__, emulator_property) err = self._wait_for_emulator_lambda(partial(wrapper, msg), timeout) @@ -293,30 +355,38 @@ def wrapper(msg): raise AssertionError(err) def assert_that_emulator_value_causes_func_to_return_false( - self, emulator_property, func, timeout=None, msg=None - ): + self, + emulator_property: str, + func: Callable[[str], bool], + timeout: float | None = None, + msg: str | None = None, + ) -> None: """ Check that an emulator property does not satisfy a given function within some timeout. Args: emulator_property (string): emulator property to check - func: a function that takes one argument, the emulator property value, and returns True if the value is - valid (i.e. *not* the value we're checking). + func: a function that takes one argument, the emulator property value, and returns True + if the value is valid (i.e. *not* the value we're checking). timeout: time to wait for the PV to satisfy the function msg: custom message to print on failure Raises: AssertionError: If the function does not evaluate to false within the given timeout """ - def wrapper(msg): + def wrapper(msg: str) -> str | None: value = self.backdoor_get_from_device(emulator_property) try: return_value = func(value) except Exception as e: + try: + exception_message = e.message # type: ignore + except AttributeError: + exception_message = "" return ( "Exception was thrown while evaluating function '{}' on emulator property {}. " "Exception was: {} {}".format( - func.__name__, format_value(value), e.__class__.__name__, e.message + func.__name__, format_value(value), e.__class__.__name__, exception_message ) ) if return_value: @@ -329,18 +399,22 @@ def wrapper(msg): return None if msg is None: - msg = "Expected function '{}' to evaluate to False when reading emulator property '{}'.".format( - func.__name__, emulator_property - ) + msg = ( + "Expected function '{}' to evaluate to False " + "when reading emulator property '{}'." + ).format(func.__name__, emulator_property) err = self._wait_for_emulator_lambda(partial(wrapper, msg), timeout) if err is not None: raise AssertionError(err) - def _wait_for_emulator_lambda(self, wait_for_lambda, timeout): + def _wait_for_emulator_lambda( + self, wait_for_lambda: Callable[[], T], timeout: float | None + ) -> T: """ - Wait for a lambda containing a emulator property to become None; return value or timeout and return actual value. + Wait for a lambda containing a emulator property to become None; return value or timeout and + return actual value. Args: wait_for_lambda: lambda we expect to be None @@ -348,10 +422,11 @@ def _wait_for_emulator_lambda(self, wait_for_lambda, timeout): Returns: final value of lambda """ - start_time = time() - current_time = start_time + start_time: float = time() + current_time: float = start_time if timeout is None: + assert isinstance(self, LewisLauncher), "No default timeout for non-lewis launchers" timeout = self._default_timeout while current_time - start_time < timeout: @@ -369,15 +444,15 @@ def _wait_for_emulator_lambda(self, wait_for_lambda, timeout): return wait_for_lambda() def assert_that_emulator_value_is_greater_than( - self, emulator_property, min_value, timeout=None - ): + self, emulator_property: str, min_value: float, timeout: float | None = None + ) -> None: """ Assert that an emulator property has a value greater than the expected value. Args: emulator_property (string): Name of the numerical emulator property. - min_value (float): Minimum value (inclusive).Emulator backdoor always returns a string, so the value - should be a string. + min_value (float): Minimum value (inclusive).Emulator backdoor always returns a string, + so the value should be a string. timeout: if it hasn't changed within this time raise assertion error Raises: AssertionError: if value does not become requested value @@ -393,7 +468,9 @@ def assert_that_emulator_value_is_greater_than( ) @contextlib.contextmanager - def backdoor_simulate_disconnected_device(self, emulator_property="connected"): + def backdoor_simulate_disconnected_device( + self, emulator_property: str = "connected" + ) -> Generator[None, None, None]: """ Simulate device disconnection """ @@ -409,26 +486,34 @@ class NullEmulatorLauncher(EmulatorLauncher): A null emulator launcher that does nothing. """ - def _open(self): + def _open(self) -> None: pass - def _close(self): + def _close(self) -> None: pass - def backdoor_get_from_device(self, variable, *args, **kwargs): - return None + def backdoor_get_from_device( + self, variable: str, *args: list[Any], **kwargs: dict[str, Any] + ) -> str: + return "" - def backdoor_set_on_device(self, variable, value, *args, **kwargs): + def backdoor_set_on_device( + self, variable: str, value: EmulatorValue, *args: list[Any], **kwargs: dict[str, Any] + ) -> None: pass - def backdoor_emulator_disconnect_device(self, *args, **kwargs): + def backdoor_emulator_disconnect_device( + self, *args: list[Any], **kwargs: dict[str, Any] + ) -> None: pass - def backdoor_emulator_connect_device(self, *args, **kwargs): + def backdoor_emulator_connect_device(self, *args: list[Any], **kwargs: dict[str, Any]) -> None: pass - def backdoor_run_function_on_device(self, *args, **kwargs): - pass + def backdoor_run_function_on_device( + self, function_name: str, arguments: list[Any] | None + ) -> list[bytes]: + return [] @dataclass @@ -462,7 +547,15 @@ class LewisLauncher(EmulatorLauncher): _DEFAULT_LEWIS_PATH = os.path.join(DEFAULT_PY_PATH, "scripts") - def __init__(self, test_name, device, emulator_path, var_dir, port, options): + def __init__( + self, + test_name: str, + device: str, + emulator_path: str, + var_dir: str, + port: int, + options: dict[str, Any], + ) -> None: """ Constructor that also launches Lewis. @@ -477,20 +570,22 @@ def __init__(self, test_name, device, emulator_path, var_dir, port, options): test_name, device, emulator_path, var_dir, port, options ) - self._lewis_path = options.get("lewis_path", LewisLauncher._DEFAULT_LEWIS_PATH) - self._python_path = options.get("python_path", os.path.join(DEFAULT_PY_PATH, "python.exe")) - self._lewis_protocol = options.get("lewis_protocol", "stream") - self._lewis_additional_path = options.get("lewis_additional_path", emulator_path) - self._lewis_package = options.get("lewis_package", "lewis_emulators") - self._default_timeout = options.get("default_timeout", 5) - self._speed = options.get("speed", 100) + self._lewis_path: str = options.get("lewis_path", LewisLauncher._DEFAULT_LEWIS_PATH) + self._python_path: str = options.get( + "python_path", os.path.join(DEFAULT_PY_PATH, "python.exe") + ) + self._lewis_protocol: str = options.get("lewis_protocol", "stream") + self._lewis_additional_path: str = options.get("lewis_additional_path", emulator_path) + self._lewis_package: str = options.get("lewis_package", "lewis_emulators") + self._default_timeout: float = options.get("default_timeout", 5) + self._speed: float = options.get("speed", 100) self._process = None self._logFile = None self._connected = None @classmethod - def from_emulator(cls, test_name, emulator: Emulator): + def from_emulator(cls, test_name: str, emulator: Emulator) -> Self: """ Constructor that also launches Lewis. @@ -498,9 +593,11 @@ def from_emulator(cls, test_name, emulator: Emulator): test_name: name of test we are creating device emulator for emulator: Information to launch the emulator with """ - return cls(test_name, emulator.device, emulator.var_dir, emulator.port, emulator.options) + return cls( + test_name, emulator.device, "", emulator.var_dir, emulator.port, emulator.options + ) - def _close(self): + def _close(self) -> None: """ Closes the Lewis session by killing the process. """ @@ -511,7 +608,7 @@ def _close(self): self._logFile.close() print("Lewis log written to {0}".format(self._log_filename())) - def _open(self): + def _open(self) -> None: """ Start the lewis emulator. @@ -557,29 +654,31 @@ def _open(self): ) self._connected = True - def _log_filename(self): + def _log_filename(self) -> str: return log_filename( self._test_name, "lewis", self._emulator_id, TestModes.DEVSIM, self._var_dir ) - def check(self): + def check(self) -> bool: """ Check that the lewis emulator is running. :return: True if it is running; False otherwise """ - if self._process.poll() is None: + proc = self._process + assert proc is not None + if proc.poll() is None: return True print("Lewis has terminated! It said:") - stdoutdata, stderrdata = self._process.communicate() - sys.stderr.write(stderrdata) - sys.stdout.write(stdoutdata) + stdoutdata, stderrdata = proc.communicate() + sys.stderr.write(stderrdata.decode("utf-8")) + sys.stdout.write(stdoutdata.decode("utf-8")) return False - def _convert_to_string_for_backdoor(self, value): + def _convert_to_string_for_backdoor(self, value: EmulatorValue) -> str: """ - Convert the value given to a string for the backdoor. If the type is a string suround with quotes otherwise - pass it raw, e.g. for a number. + Convert the value given to a string for the backdoor. If the type is a string suround with + quotes otherwise pass it raw, e.g. for a number. Args: value: value to convert @@ -588,25 +687,31 @@ def _convert_to_string_for_backdoor(self, value): """ return "'{}'".format(value) if isinstance(value, str) else str(value) - def backdoor_set_on_device(self, variable_name, value, *_, **__): + def backdoor_set_on_device( + self, variable: str, value: EmulatorValue, *args: list[Any], **kwargs: dict[str, Any] + ) -> None: """ Set a value in the device using the lewis backdoor. - :param variable_name: name of variable to set + :param variable: name of variable to set :param value: new value it should have :return: """ self.backdoor_command( - ["device", str(variable_name), self._convert_to_string_for_backdoor(value)] + ["device", str(variable), self._convert_to_string_for_backdoor(value)] ) - def backdoor_run_function_on_device(self, function_name, arguments=None): + def backdoor_run_function_on_device( + self, + function_name: str, + arguments: list[Any] | None = None, + ) -> list[bytes]: """ Run a function in lewis using the back door on a device. :param function_name: name of the function to call - :param arguments: an iterable of the arguments for the function; None means no arguments. Arguments will - automatically be turned into json + :param arguments: an iterable of the arguments for the function; None means no arguments. + Arguments will automatically be turned into json :return: """ command = ["device", function_name] @@ -617,11 +722,11 @@ def backdoor_run_function_on_device(self, function_name, arguments=None): return self.backdoor_command(command) - def backdoor_command(self, lewis_command): + def backdoor_command(self, command: list[str]) -> list[bytes]: """ Send a command to the backdoor of lewis. - :param lewis_command: array of command line arguments to send + :param command: array of command line arguments to send :return: lines from the command output """ lewis_command_line = [ @@ -629,12 +734,14 @@ def backdoor_command(self, lewis_command): "-r", "127.0.0.1:{control_port}".format(control_port=self._control_port), ] - lewis_command_line.extend(lewis_command) + lewis_command_line.extend(command) time_stamp = datetime.fromtimestamp(time()).strftime("%Y-%m-%d %H:%M:%S") - self._logFile.write( + log_file = self._logFile + assert log_file is not None + log_file.write( "{0}: lewis backdoor command: {1}\n".format(time_stamp, " ".join(lewis_command_line)) ) - self._logFile.flush() + log_file.flush() try: p = subprocess.Popen( lewis_command_line, stderr=subprocess.STDOUT, stdout=subprocess.PIPE @@ -647,26 +754,28 @@ def backdoor_command(self, lewis_command): else: p.terminate() print(f"Lewis backdoor command {lewis_command_line} did not finish!") - self._logFile.write(f"Lewis backdoor command {lewis_command_line} did not finish!") - self._logFile.flush() + log_file.write(f"Lewis backdoor command {lewis_command_line} did not finish!") + log_file.flush() - output = [line for line in p.stdout] + out = p.stdout + assert out is not None + output = [line for line in out] for line in output: if b"failed to create process" in line.lower(): - raise IOError( - f"Failed to spawn lewis-control.exe for backdoor set {lewis_command}." - ) + raise IOError(f"Failed to spawn lewis-control.exe for backdoor set {command}.") return [line.strip() for line in output] except subprocess.CalledProcessError as ex: - for loc in [sys.stderr, self._logFile]: + for loc in [sys.stderr, log_file]: loc.write(f"Error using backdoor: {ex.output}\n") loc.write(f"Error code {ex.returncode}\n") - self._logFile.flush() + log_file.flush() raise ex - def backdoor_emulator_disconnect_device(self): + def backdoor_emulator_disconnect_device( + self, *args: list[Any], **kwargs: dict[str, Any] + ) -> None: """ Disconnect the emulated device. @@ -676,7 +785,7 @@ def backdoor_emulator_disconnect_device(self): self.backdoor_command(["simulation", "disconnect_device"]) self._connected = False - def backdoor_emulator_connect_device(self): + def backdoor_emulator_connect_device(self, *args: list[Any], **kwargs: dict[str, Any]) -> None: """ Connect the emulated device. @@ -686,16 +795,16 @@ def backdoor_emulator_connect_device(self): self.backdoor_command(["simulation", "connect_device"]) self._connected = True - def backdoor_get_from_device(self, variable_name, *_, **__): + def backdoor_get_from_device( + self, variable: str, *args: list[Any], **kwargs: dict[str, Any] + ) -> str: """ Return the string of a value on a device from lewis. - :param variable_name: name of the variable + :param variable: name of the variable :return: the variables value """ # backdoor_command returns a list of bytes and join takes str so convert them here - return "".join( - i.decode("utf-8") for i in self.backdoor_command(["device", str(variable_name)]) - ) + return "".join(i.decode("utf-8") for i in self.backdoor_command(["device", str(variable)])) class MultiLewisLauncher(object): @@ -703,47 +812,62 @@ class MultiLewisLauncher(object): Launch multiple lewis emulators. """ - def __init__(self, test_name: str, emulators: List[Emulator]): + def __init__(self, test_name: str, emulators: List[Emulator]) -> None: self.test_name: str = test_name self.emulator_launchers: Dict[int, LewisLauncher] = { emulator.launcher_address: LewisLauncher.from_emulator(test_name, emulator) for emulator in emulators } - def __enter__(self): + def __enter__(self) -> Self: self._open() EmulatorRegister.add_emulator(self.test_name, self) return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__( + self, + exc_type: Type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: self._close() EmulatorRegister.remove_emulator(self.test_name) - def _close(self): + def _close(self) -> None: """ Stop the lewis emulators. """ for launcher in self.emulator_launchers.values(): launcher._close() - def _open(self): + def _open(self) -> None: """ Start the lewis emulators. """ for launcher in self.emulator_launchers.values(): launcher._open() - def backdoor_get_from_device(self, launcher_address, variable, *_, **__): + def backdoor_get_from_device( + self, launcher_address: int, variable: str, *args: list[Any], **kwargs: dict[str, Any] + ) -> str: """ Get the variable value from the emulator addressed with the given launcher address. - :param launcher_address: The address of the emulator to identify the device we want to get the value from. + :param launcher_address: The address of the emulator to identify the device we want to get + the value from. :param variable: The variable to obtain the value of from the device. :return: The variable's value. """ return self.emulator_launchers[launcher_address].backdoor_get_from_device(variable) - def backdoor_set_on_device(self, launcher_address, variable, value, *_, **__): + def backdoor_set_on_device( + self, + launcher_address: int, + variable: str, + value: EmulatorValue, + *args: list[Any], + **kwargs: dict[str, Any], + ) -> None: """ Set the variable to the given value on the emulator address with the given launcher address. @@ -753,7 +877,7 @@ def backdoor_set_on_device(self, launcher_address, variable, value, *_, **__): """ self.emulator_launchers[launcher_address].backdoor_set_on_device(variable, value) - def backdoor_emulator_disconnect_device(self, launcher_address): + def backdoor_emulator_disconnect_device(self, launcher_address: int) -> None: """ Disconnect the emulator addressed by the given launcher address. @@ -761,7 +885,7 @@ def backdoor_emulator_disconnect_device(self, launcher_address): """ self.emulator_launchers[launcher_address].backdoor_emulator_disconnect_device() - def backdoor_emulator_connect_device(self, launcher_address): + def backdoor_emulator_connect_device(self, launcher_address: int) -> None: """ Connect the emulator addressed by the given launcher address. @@ -769,7 +893,9 @@ def backdoor_emulator_connect_device(self, launcher_address): """ self.emulator_launchers[launcher_address].backdoor_emulator_connect_device() - def backdoor_run_function_on_device(self, launcher_address, function_name, arguments=None): + def backdoor_run_function_on_device( + self, launcher_address: int, function_name: str, arguments: list[Any] | None = None + ) -> list[bytes]: """ Run a function with the given arguments on the emulator addressed by the launcher address. @@ -783,7 +909,15 @@ def backdoor_run_function_on_device(self, launcher_address, function_name, argum class CommandLineEmulatorLauncher(EmulatorLauncher): - def __init__(self, test_name, device, emulator_path, var_dir, port, options): + def __init__( + self, + test_name: str, + device: str, + emulator_path: str, + var_dir: str, + port: int, + options: dict[str, Any], + ) -> None: super(CommandLineEmulatorLauncher, self).__init__( test_name, device, emulator_path, var_dir, port, options ) @@ -791,8 +925,8 @@ def __init__(self, test_name, device, emulator_path, var_dir, port, options): self.command_line = options["emulator_command_line"] except KeyError: raise KeyError( - "To use a command line emulator launcher, the 'emulator_command_line' option must be " - "provided as part of the options dictionary" + "To use a command line emulator launcher, the 'emulator_command_line' option " + "must be provided as part of the options dictionary" ) try: @@ -808,7 +942,7 @@ def __init__(self, test_name, device, emulator_path, var_dir, port, options): self._process = None self._log_file = None - def _open(self): + def _open(self) -> None: self._log_file = open( log_filename( self._test_name, "cmdemulator", self._device, TestModes.RECSIM, self._var_dir @@ -817,7 +951,7 @@ def _open(self): ) self._call_command_line(self.command_line.format(port=self._port)) - def _call_command_line(self, command_line): + def _call_command_line(self, command_line: str | list[str]) -> None: if self._cwd_emulator_path: cwd = self._emulator_path else: @@ -833,14 +967,16 @@ def _call_command_line(self, command_line): if self.wait: self._process.wait() - def _close(self): + def _close(self) -> None: print("Closing commandline emulator.") # We need to catch psutil.NoSuchProcess as it is possible: # * the main process may exit after the children have terminated # and before terminate() can be called by us on it # * terminating one child may lead to another exiting before # we call terminate() ourselves on it - children = self._process.children(recursive=True) + proc = self._process + assert proc is not None + children = proc.children(recursive=True) for child in children: if child is not None: try: @@ -859,30 +995,46 @@ def _close(self): if self._log_file is not None: self._log_file.close() - def backdoor_get_from_device(self, variable, *args, **kwargs): + def backdoor_get_from_device( + self, variable: str, *args: list[Any], **kwargs: dict[str, Any] + ) -> str: raise ValueError("Cannot use backdoor for an arbitrary command line launcher") - def backdoor_set_on_device(self, variable, value, *args, **kwargs): + def backdoor_set_on_device( + self, variable: str, value: EmulatorValue, *args: list[Any], **kwargs: dict[str, Any] + ) -> None: raise ValueError("Cannot use backdoor for an arbitrary command line launcher") - def backdoor_emulator_disconnect_device(self, *args, **kwargs): + def backdoor_emulator_disconnect_device( + self, *args: list[Any], **kwargs: dict[str, Any] + ) -> None: raise ValueError("Cannot use backdoor for an arbitrary command line launcher") - def backdoor_emulator_connect_device(self, *args, **kwargs): + def backdoor_emulator_connect_device(self, *args: list[Any], **kwargs: dict[str, Any]) -> None: raise ValueError("Cannot use backdoor for an arbitrary command line launcher") - def backdoor_run_function_on_device(self, *args, **kwargs): + def backdoor_run_function_on_device( + self, function_name: str, arguments: list[Any] | None + ) -> list[bytes]: raise ValueError("Cannot use backdoor for an arbitrary command line launcher") class BeckhoffEmulatorLauncher(CommandLineEmulatorLauncher): - def __init__(self, test_name, device, emulator_path, var_dir, port, options): + def __init__( + self, + test_name: str, + device: str, + emulator_path: str, + var_dir: str, + port: int, + options: dict[str, Any], + ) -> None: try: self.beckhoff_root = options["beckhoff_root"] except KeyError: raise KeyError( - "To use a beckhoff emulator launcher, the 'beckhoff_root' and 'tpy_file_path' options must" - " be provided as part of the options dictionary" + "To use a beckhoff emulator launcher, the 'beckhoff_root' and 'tpy_file_path' " + "options must be provided as part of the options dictionary" ) run_bat_file = os.path.join(self.beckhoff_root, "run.bat") @@ -900,7 +1052,15 @@ def __init__(self, test_name, device, emulator_path, var_dir, port, options): class DAQMxEmulatorLauncher(CommandLineEmulatorLauncher): - def __init__(self, test_name, device, emulator_path, var_dir, port, options): + def __init__( + self, + test_name: str, + device: str, + emulator_path: str, + var_dir: str, + port: int, + options: dict[str, Any], + ) -> None: labview_scripts_dir = os.path.join(DEVICE_EMULATOR_PATH, "other_emulators", "DAQmx") self.start_command = os.path.join(labview_scripts_dir, "start_sim.bat") self.stop_command = os.path.join(labview_scripts_dir, "stop_sim.bat") @@ -910,12 +1070,12 @@ def __init__(self, test_name, device, emulator_path, var_dir, port, options): test_name, device, emulator_path, var_dir, port, options ) - def _close(self): + def _close(self) -> None: self.disconnect_device() super(DAQMxEmulatorLauncher, self)._close() - def disconnect_device(self): + def disconnect_device(self) -> None: self._call_command_line(self.stop_command) - def reconnect_device(self): + def reconnect_device(self) -> None: self._call_command_line(self.start_command) diff --git a/utils/formatters.py b/utils/formatters.py index 144df469e..9f6f19c50 100644 --- a/utils/formatters.py +++ b/utils/formatters.py @@ -2,8 +2,10 @@ Formatters for data. """ +from typing import Any -def format_value(value): + +def format_value(value: Any) -> str: # noqa: ANN401 """ Formats a value for display. Includes type information to ease debugging. diff --git a/utils/free_ports.py b/utils/free_ports.py index bd3f322d4..c912cf13f 100644 --- a/utils/free_ports.py +++ b/utils/free_ports.py @@ -1,7 +1,7 @@ import socket -def get_free_ports(n): +def get_free_ports(n: int) -> tuple[int, ...]: """ Returns n free port numbers on the current machine. @@ -21,7 +21,7 @@ def get_free_ports(n): return tuple(ports) -def get_free_ports_from_list(n, port_low, port_high): +def get_free_ports_from_list(n: int, port_low: int, port_high: int) -> tuple[int, ...]: """ Return n free ports by testing specified range. @@ -41,7 +41,7 @@ def get_free_ports_from_list(n, port_low, port_high): ports.append(s.getsockname()[1]) socks.append(s) break - except: + except socket.error: pass for s in socks: s.close() diff --git a/utils/ioc_launcher.py b/utils/ioc_launcher.py index 69ee96edc..6f3521e06 100644 --- a/utils/ioc_launcher.py +++ b/utils/ioc_launcher.py @@ -2,6 +2,7 @@ Code that launches an IOC/application under test """ +import abc import os import subprocess import telnetlib @@ -11,6 +12,8 @@ from datetime import date from signal import SIGTERM from time import sleep +from types import TracebackType +from typing import Any, Callable, Generator, Self, Type import psutil @@ -35,7 +38,7 @@ } -def get_default_ioc_dir(iocname, iocnum=1): +def get_default_ioc_dir(iocname: str, iocnum: int = 1) -> str: """ Gets the default path to run the IOC given the name. Args: @@ -49,21 +52,23 @@ def get_default_ioc_dir(iocname, iocnum=1): ) -class check_existence_pv(object): +class CheckExistencePv(object): """ - Checks to see if a IOC has been started correctly by asserting that a pv does not exist on entry and does on exit + Checks to see if a IOC has been started correctly by asserting that a pv does not exist on entry + and does on exit + Args: ca: channel access device: the device test_pv: the name of the test pv, defaults to the DISABLE PV """ - def __init__(self, ca, device, test_pv="DISABLE"): + def __init__(self, ca: ChannelAccess, device: str, test_pv: str = "DISABLE") -> None: self.ca = ca self.device = device self.test_pv = test_pv - def __enter__(self): + def __enter__(self) -> None: if self.test_pv is None: print("No existence PV specified.") return @@ -76,7 +81,12 @@ def __enter__(self): "IOC '{}' appears to already be running: {}".format(self.device, ex) ) - def __exit__(self, type, value, traceback): + def __exit__( + self, + exc_type: Type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: if self.test_pv is None: return @@ -95,13 +105,13 @@ class IOCRegister(object): """ # Static dictionary of running iocs - RunningIOCs = {} + RunningIOCs: dict[str, "BaseLauncher"] = {} uses_rec_sim = False test_mode = TestModes.DEVSIM @classmethod - def get_running(cls, ioc_name): + def get_running(cls, ioc_name: str) -> "BaseLauncher | None": """ Get a running ioc by name, return None if not running. @@ -111,7 +121,7 @@ def get_running(cls, ioc_name): return cls.RunningIOCs.get(ioc_name) @classmethod - def add_ioc(cls, name, ioc): + def add_ioc(cls, name: str, ioc: "BaseLauncher") -> None: """ Add an ioc to the running list. @@ -127,7 +137,9 @@ class BaseLauncher(object, metaclass=ABCMeta): Launcher base, this is the base class for a launcher of application under test. """ - def __init__(self, test_name, ioc_config, test_mode, var_dir): + def __init__( + self, test_name: str, ioc_config: dict[str, Any], test_mode: TestModes, var_dir: str + ) -> None: """ Constructor which picks some generic things out of the config. Args: @@ -136,8 +148,10 @@ def __init__(self, test_name, ioc_config, test_mode, var_dir): name: String, Device name directory: String, the directory where st.cmd for the IOC is found custom_prefix: String, the prefix for the IOC PVs, default of IOC name - started_text: String, the text printed when the IOC has started, default of DEFAULT_IOC_START_TEXT - pv_for_existence: String, the PV to check for whether the IOC is running, default of DISABLE + started_text: String, the text printed when the IOC has started, default of + DEFAULT_IOC_START_TEXT + pv_for_existence: String, the PV to check for whether the IOC is running, default + of DISABLE macros: Dict, the macros that should be passed to this IOC var_dir: The directory into which the launcher will save log files. """ @@ -155,7 +169,7 @@ def __init__(self, test_name, ioc_config, test_mode, var_dir): self._var_dir = var_dir self._test_name = test_name self.ca = None - self.command_line = [] + self.command_line: list[str] = [] self.log_file_manager = None self._process = None self._test_mode = test_mode @@ -171,7 +185,7 @@ def __init__(self, test_name, ioc_config, test_mode, var_dir): self._test_name, "ioc", self._device, self._test_mode, self._var_dir ) - def open(self): + def open(self) -> None: """ Starts the application under test. """ @@ -184,7 +198,7 @@ def open(self): ca = self._get_channel_access() - with check_existence_pv(ca, self._device, self._pv_for_existence): + with CheckExistencePv(ca, self._device, self._pv_for_existence): print(f"Starting IOC ({self._device}), IOC log file is {self.log_file_name}") settings = self.get_environment_vars() @@ -196,7 +210,8 @@ def open(self): "Started IOC with '{0}'".format(" ".join(self.command_line)) ) - # To be able to see the IOC output for debugging, remove the redirection of stdin, stdout and stderr. + # To be able to see the IOC output for debugging, remove the redirection of stdin, + # stdout and stderr. # This does mean that the IOC will need to be closed manually after the tests. # Make sure to revert before checking code in self._process = subprocess.Popen( @@ -210,40 +225,47 @@ def open(self): ) # Write a return so that an epics terminal will appear after boot - self._process.stdin.write("\n".encode("utf-8")) - self._process.stdin.flush() + stdin = self._process.stdin + assert stdin is not None + stdin.write("\n".encode("utf-8")) + stdin.flush() self.log_file_manager.wait_for_console( MAX_TIME_TO_WAIT_FOR_IOC_TO_START, self._ioc_started_text ) for key, value in self._init_values.items(): print("Initialising PV {} to {}".format(key, value)) - self.ca.set_pv_value(key, value) + ca.set_pv_value(key, value) IOCRegister.add_ioc(self._device, self) sleep(self._delay_after_startup) - def _command_line(self): + @abc.abstractmethod + def _command_line(self) -> list[str]: """ The command line used to start an IOC that a subclass is expected to provide. """ - pass - def close(self): + def close(self) -> None: """ Exits the application under test """ pass - def __enter__(self): + def __enter__(self) -> Self: self.open() return self - def __exit__(self, *args, **kwargs): + def __exit__( + self, + exc_type: Type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: self.close() - def _get_channel_access(self): + def _get_channel_access(self) -> ChannelAccess: """ :return (ChannelAccess): the channel access component """ @@ -252,9 +274,10 @@ def _get_channel_access(self): return self.ca - def create_macros_file(self): + def create_macros_file(self) -> None: """ - Creates a temporary file that sets the EPICS macros, this file is called when the IOC first starts + Creates a temporary file that sets the EPICS macros, this file is called when the IOC first + starts """ full_dir = os.path.join(self._var_dir, "tmp") if not os.path.exists(full_dir): @@ -268,9 +291,10 @@ def create_macros_file(self): ) ) - def get_environment_vars(self): + def get_environment_vars(self) -> dict[str, str]: """ - Get the current environment variables and add in the extra ones needed for starting the IOC in DEVSIM/RECSIM. + Get the current environment variables and add in the extra ones needed for starting the IOC + in DEVSIM/RECSIM. :return: (Dict): The names and values of the environment variables. """ settings = os.environ.copy() @@ -298,7 +322,7 @@ def get_environment_vars(self): return settings - def set_simulated_value(self, pv_name, value): + def set_simulated_value(self, pv_name: str, value: float | int | str | bool) -> None: """ If this IOC is in rec sim set the PV value. @@ -317,7 +341,9 @@ class ProcServLauncher(BaseLauncher): ICPTOOLS = os.path.join(EPICS_TOP, "tools", "master") - def __init__(self, test_name, ioc, test_mode, var_dir): + def __init__( + self, test_name: str, ioc: dict[str, Any], test_mode: TestModes, var_dir: str + ) -> None: """ Constructor which calls ProcServ to boot an IOC @@ -336,11 +362,17 @@ def __init__(self, test_name, ioc, test_mode, var_dir): self.procserv_port = get_free_ports(1)[0] - self._telnet = None + self._telnet: telnetlib.Telnet | None = None self.autorestart = True self.original_macros = ioc.get("macros", {}) - def get_environment_vars(self): + def _get_telnet(self) -> telnetlib.Telnet: + tn = self._telnet + if tn is None: + raise ValueError("Attempted to use telnet before it was set up") + return tn + + def get_environment_vars(self) -> dict[str, str]: settings = super(ProcServLauncher, self).get_environment_vars() settings["CYGWIN"] = "nodosfilewarning" @@ -355,7 +387,7 @@ def get_environment_vars(self): return settings @staticmethod - def to_cygwin_address(win_filepath): + def to_cygwin_address(win_filepath: str) -> str: """ Converts a windows-style filepath to a / delimited path with cygdrive root Args: @@ -370,7 +402,7 @@ def to_cygwin_address(win_filepath): return cyg_address - def _command_line(self): + def _command_line(self) -> list[str]: comspec = os.getenv("ComSpec") cygwin_dir = self.to_cygwin_address(self._directory) return [ @@ -393,7 +425,7 @@ def _command_line(self): "st.cmd", ] - def open(self): + def open(self) -> None: """ Overrides the open function to create a procserv telnet connection once IOC opened. @@ -412,72 +444,76 @@ def open(self): self._telnet = telnetlib.Telnet("localhost", self.procserv_port, timeout=timeout) # Wait for procServ to become responsive by checking for the IOC started text - init_output = self._telnet.read_until( - self._ioc_started_text.encode("ascii"), timeout - ).decode("ascii") + init_output = ( + self._get_telnet() + .read_until(self._ioc_started_text.encode("ascii"), timeout) + .decode("ascii") + ) if "Welcome to procServ" not in init_output: raise OSError("Cannot connect to procServ over telnet") def send_telnet_command_and_retry_if_not_detected_condition_for_success( - self, command, condition_for_success, retry_limit - ): + self, command: str, condition_for_success: Callable[[], bool], retry_limit: int + ) -> None: """ Send a command over telnet and detect if the condition for success has been met. Retry until the limit is reached and if the condition is not met raise an AssertionError. Args: command (str): The command to send over telnet - condition_for_success (func): A function that returns True if condition met, and False if not + condition_for_success (func): A function that returns True if condition met, and False + if not retry_limit (int): The number of times you Raises: - AssertionError: If the text has not been detected in the log after the given number of retries + AssertionError: If the text has not been detected in the log after the given number of + retries """ for i in range(retry_limit): self.send_telnet_command(command) if condition_for_success(): break else: - self._telnet.close() - self._telnet.open("localhost", self.procserv_port, timeout=20) + self._get_telnet().close() + self._get_telnet().open("localhost", self.procserv_port, timeout=20) else: # If condition for success not detected, raise an assertion error raise AssertionError( "Sending telnet command {} failed {} times".format(command, retry_limit) ) - def send_telnet_command(self, command: str): + def send_telnet_command(self, command: str) -> None: """ Send a command to the ioc via telnet. Command is sent and newline is appended Args: command: command to set """ - self._telnet.write("{cmd}\n".format(cmd=command).encode("ascii")) + self._get_telnet().write("{cmd}\n".format(cmd=command).encode("ascii")) - def force_manual_save(self): + def force_manual_save(self) -> None: """ Force a manual save by sending requests to save the settings and positions files """ self.send_telnet_command("manual_save({}_info_settings.req)".format(self._device)) self.send_telnet_command("manual_save({}_info_positions.req)".format(self._device)) - def start_ioc(self, wait=False): + def start_ioc(self, wait: bool = False) -> None: """ Start/restart IOC over telnet. (^X) Args: - wait (bool): If this is true send the command and wait for the ioc started text to appear in the log, - if the text doesn't appear retry (retries at most 3 times). If false just send the command and - don't wait or retry. + wait (bool): If this is true send the command and wait for the ioc started text to + appear in the log, if the text doesn't appear retry (retries at most 3 times). If + false just send the command and don't wait or retry. """ start_command = "\x18" if wait: - def condition_for_success(): + def condition_for_success() -> bool: try: - self.log_file_manager.wait_for_console( - MAX_TIME_TO_WAIT_FOR_IOC_TO_START, self._ioc_started_text - ) + lfm = self.log_file_manager + assert lfm is not None + lfm.wait_for_console(MAX_TIME_TO_WAIT_FOR_IOC_TO_START, self._ioc_started_text) except AssertionError: return False else: @@ -489,7 +525,7 @@ def condition_for_success(): else: self.send_telnet_command(start_command) - def quit_ioc(self): + def quit_ioc(self) -> None: """ Sends the quit IOC command to procserv. (^Q) @@ -497,16 +533,16 @@ def quit_ioc(self): quit_command = "\x11" self.send_telnet_command(quit_command) - def toggle_autorestart(self): + def toggle_autorestart(self) -> None: """ Toggles whether the IOC is auto-restarts or not. """ - self._telnet.read_very_eager() + self._get_telnet().read_very_eager() autorestart_command = "-" self.send_telnet_command(autorestart_command) - response = self._telnet.read_very_eager().decode("ascii") + response = self._get_telnet().read_very_eager().decode("ascii") if "OFF" in response: self.autorestart = False @@ -515,14 +551,15 @@ def toggle_autorestart(self): else: raise OSError("No response from procserv") - def close(self): + def close(self) -> None: """ - Shuts telnet connection and kills IOC. Identifies the spawned procServ processes and kills them + Shuts telnet connection and kills IOC. + Identifies the spawned procServ processes and kills them """ print("\nTerminating IOC ({})".format(self._device)) if self._telnet is not None: - self._telnet.close() + self._get_telnet().close() at_least_one_killed = False while True: @@ -550,7 +587,7 @@ def close(self): ) ) - def _find_processes(self): + def _find_processes(self) -> list[int]: pid_list = [] for process in psutil.process_iter(attrs=["pid", "name"]): if process.info["name"] == "procServ.exe" and self.process_arguments_match_this_ioc( @@ -560,15 +597,17 @@ def _find_processes(self): pid_list.append(process.pid) return pid_list - def process_arguments_match_this_ioc(self, process_arguments): + def process_arguments_match_this_ioc(self, process_arguments: list[str]) -> bool: """ - Compares the arguments this IOC was started with to the arguments of a process. Returns True if the arguments match + Compares the arguments this IOC was started with to the arguments of a process. + Returns True if the arguments match Args: process_arguments: The command line arguments of the process to be considered Returns: - arguments_match: Boolean: True if the process command line arguments match the IOC boot arguments, else False + arguments_match: Boolean: True if the process command line arguments match the IOC boot + arguments, else False """ # PSUtil strips quote marks (") from the command line used to spawn a process, @@ -580,7 +619,9 @@ def process_arguments_match_this_ioc(self, process_arguments): return arguments_match @contextmanager - def start_with_macros(self, macros, pv_to_wait_for): + def start_with_macros( + self, macros: dict[str, str], pv_to_wait_for: str + ) -> Generator[None, None, None]: """ A context manager to start the ioc with the given macros and then at the end start the ioc again with the original macros. @@ -589,15 +630,17 @@ def start_with_macros(self, macros, pv_to_wait_for): macros (dict): A dictionary of macros to restart the ioc with. pv_to_wait_for (str): A pv to wait for 60 seconds to appear after starting the ioc. """ + ca = self.ca + assert ca is not None try: self._start_with_macros(macros) - self.ca.assert_that_pv_exists(pv_to_wait_for, timeout=60) + ca.assert_that_pv_exists(pv_to_wait_for, timeout=60) yield finally: self._start_with_original_macros() - self.ca.assert_that_pv_exists(pv_to_wait_for, timeout=60) + ca.assert_that_pv_exists(pv_to_wait_for, timeout=60) - def _start_with_macros(self, macros, wait=True): + def _start_with_macros(self, macros: dict[str, str], wait: bool = True) -> None: """ Restart the ioc with the given macros @@ -609,7 +652,7 @@ def _start_with_macros(self, macros, wait=True): time.sleep(1) self.start_ioc(wait) - def _start_with_original_macros(self, wait=True): + def _start_with_original_macros(self, wait: bool = True) -> None: """ Restart the ioc with the macros originally set. """ @@ -624,7 +667,9 @@ class IocLauncher(BaseLauncher): Launches an IOC for testing. """ - def __init__(self, test_name, ioc, test_mode, var_dir): + def __init__( + self, test_name: str, ioc: dict[str, Any], test_mode: TestModes, var_dir: str + ) -> None: """ Constructor that also launches the IOC. @@ -641,7 +686,7 @@ def __init__(self, test_name, ioc, test_mode, var_dir): """ super(IocLauncher, self).__init__(test_name, ioc, test_mode, var_dir) - def _command_line(self): + def _command_line(self) -> list[str]: run_ioc_path = os.path.join(self._directory, "runIOC.bat") st_cmd_path = os.path.join(self._directory, "st.cmd") @@ -652,7 +697,7 @@ def _command_line(self): return [run_ioc_path, st_cmd_path] - def close(self): + def close(self) -> None: """ Closes the IOC. """ @@ -660,8 +705,10 @@ def close(self): if self._process is not None: # use write not communicate so that we don't wait for exit before continuing - self._process.stdin.write("exit\n".encode("utf-8")) - self._process.stdin.flush() + stdin = self._process.stdin + assert stdin is not None + stdin.write("exit\n".encode("utf-8")) + stdin.flush() max_wait_for_ioc_to_die = 60 wait_per_loop = 0.1 @@ -688,13 +735,14 @@ def close(self): print("After killing process forcibly and waiting, IOC died correctly.") except AssertionError: print( - "After killing process forcibly and waiting, IOC was still up. Will continue anyway, but " - "the next set of tests to use this IOC are likely to fail" + "After killing process forcibly and waiting, IOC was still up. Will " + "continue anyway, but the next set of tests to use this IOC are likely to " + "fail" ) self._print_log_file_location() - def _print_log_file_location(self): + def _print_log_file_location(self) -> None: if self.log_file_manager is not None: self.log_file_manager.close() print("IOC log written to {0}".format(self.log_file_name)) @@ -705,11 +753,13 @@ class PythonIOCLauncher(IocLauncher): Launch a python ioc like REFL server. """ - def __init__(self, test_name, ioc, test_mode, var_dir): + def __init__( + self, test_name: str, ioc: dict[str, Any], test_mode: TestModes, var_dir: str + ) -> None: super(PythonIOCLauncher, self).__init__(test_name, ioc, test_mode, var_dir) self._python_script_commandline = ioc.get("python_script_commandline", None) - def _command_line(self): + def _command_line(self) -> list[str]: run_ioc_path = self._python_script_commandline[0] if not os.path.isfile(run_ioc_path): print("Command first argument path not found: '{0}'".format(run_ioc_path)) @@ -717,14 +767,7 @@ def _command_line(self): command_line.extend(self._python_script_commandline) return command_line - def _set_environment_vars(self): - settings = super(PythonIOCLauncher, self)._set_environment_vars() - settings["PYTHONUNBUFFERED"] = "TRUE" - settings.update(EPICS_CASE_ENVIRONMENT_VARS) - - return settings - - def close(self): + def close(self) -> None: """ Closes the IOC. """ diff --git a/utils/log_file.py b/utils/log_file.py index f917e9c1c..e86641d81 100644 --- a/utils/log_file.py +++ b/utils/log_file.py @@ -7,7 +7,7 @@ LOG_FILES_DIRECTORY = os.path.join("logs", "IOCTestFramework") -def log_filename(test_name, what, device, test_mode, var_dir): +def log_filename(test_name: str, what: str, device: str, test_mode: TestModes, var_dir: str) -> str: """ Log file name with path. Ensure path exists. @@ -41,11 +41,11 @@ class LogFileManager(object): Class to manage the access of log files """ - def __init__(self, filename): + def __init__(self, filename: str) -> None: self.log_file_w = open(filename, "w", 1) self.log_file_r = open(filename, "r") - def read_log(self): + def read_log(self) -> list[str]: """ Takes any new lines that have been written to the log and returns them @@ -63,7 +63,7 @@ def read_log(self): return new_messages - def wait_for_console(self, timeout, ioc_started_text): + def wait_for_console(self, timeout: int, ioc_started_text: str) -> None: """ Waits until the ioc has started. @@ -89,7 +89,7 @@ def wait_for_console(self, timeout, ioc_started_text): ) ) - def close(self): + def close(self) -> None: """ Returns: close the log file """ diff --git a/utils/testing.py b/utils/testing.py index 82e5fa53b..971d5c427 100644 --- a/utils/testing.py +++ b/utils/testing.py @@ -1,26 +1,42 @@ import functools import unittest from time import sleep +from types import TracebackType +from typing import TYPE_CHECKING, Callable, Concatenate, ParamSpec, Self, Type, TypeVar, overload from utils.emulator_launcher import EmulatorRegister from utils.ioc_launcher import IOCRegister from utils.test_modes import TestModes +P = ParamSpec("P") +T = TypeVar("T") + +if TYPE_CHECKING: + from utils.channel_access import ChannelAccess + from utils.emulator_launcher import EmulatorLauncher + from utils.ioc_launcher import IocLauncher + from utils.log_file import LogFileManager + class ManagerMode(object): """A context manager for switching manager mode on.""" MANAGER_MODE_PV = "CS:MANAGER" - def __init__(self, channel_access): + def __init__(self, channel_access: "ChannelAccess") -> None: self.channel_access = channel_access self.channel_access.assert_that_pv_exists(self.MANAGER_MODE_PV) - def __enter__(self): + def __enter__(self) -> None: self.channel_access.set_pv_value(self.MANAGER_MODE_PV, 1) self.channel_access.assert_that_pv_is(self.MANAGER_MODE_PV, "Yes", timeout=5) - def __exit__(self, *args): + def __exit__( + self, + exc_type: Type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: self.channel_access.set_pv_value(self.MANAGER_MODE_PV, 0) self.channel_access.assert_that_pv_is(self.MANAGER_MODE_PV, "No", timeout=5) @@ -28,25 +44,28 @@ def __exit__(self, *args): class _AssertLogContext(object): """A context manager used to implement assert_log_messages.""" - messages = list() + messages = [] first_message = 0 def __init__( self, - log_manager, - number_of_messages=None, - in_time=5, - must_contain=None, - ignore_log_client_failures=True, - ignore_autosave=True, - ): + log_manager: "LogFileManager", + number_of_messages: int | None = None, + in_time: float = 5, + must_contain: str | None = None, + ignore_log_client_failures: bool = True, + ignore_autosave: bool = True, + ) -> None: """ Args: log_manager: A reference to the IOC log object - number_of_messages: A number of log messages to expect (None to not check number of messages) + number_of_messages: A number of log messages to expect (None to not check number of + messages) in_time: The amount of time to wait for messages to be generated - must_contain: A string which must appear in the generated log messages (None to not check contents) - ignore_log_client_failures (bool): Whether to ignore messages about not being able to connect to logserver + must_contain: A string which must appear in the generated log messages (None to not + check contents) + ignore_log_client_failures (bool): Whether to ignore messages about not being able to + connect to logserver ignore_autosave (bool): Whether to ignore messages coming from autosave """ self.in_time = in_time @@ -56,11 +75,16 @@ def __init__( self.ignore_log_client_failures = ignore_log_client_failures self.ignore_autosave = ignore_autosave - def __enter__(self): + def __enter__(self) -> Self: self.log_manager.read_log() # Read any excess log return self - def __exit__(self, *args): + def __exit__( + self, + exc_type: Type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> bool: sleep(self.in_time) self.messages = self.log_manager.read_log() @@ -98,14 +122,25 @@ def __exit__(self, *args): return True -def get_running_lewis_and_ioc(emulator_name=None, ioc_name=None): +@overload +def get_running_lewis_and_ioc(emulator_name: None, ioc_name: str) -> tuple[None, "IocLauncher"]: ... + + +@overload +def get_running_lewis_and_ioc( + emulator_name: str, ioc_name: str +) -> tuple["EmulatorLauncher", "IocLauncher"]: ... + + +def get_running_lewis_and_ioc( + emulator_name: str | None = None, ioc_name: str | None = None +) -> tuple["EmulatorLauncher | None", "IocLauncher"]: """ Assert that the emulator and ioc have been started if needed. :param emulator_name: the name of the lewis emulator; None for don't check the emulator :param ioc_name: the name of the IOC :return: lewis launcher and ioc launcher tuple - :rtype: (LewisLauncher, IocLauncher) """ lewis = EmulatorRegister.get_running(emulator_name) ioc = IOCRegister.get_running(ioc_name) @@ -123,16 +158,16 @@ def get_running_lewis_and_ioc(emulator_name=None, ioc_name=None): def assert_log_messages( - ioc, - number_of_messages=None, - in_time=1, - must_contain=None, - ignore_log_client_failures=True, - ignore_autosave=True, -): + ioc: "IocLauncher", + number_of_messages: int | None = None, + in_time: float = 1, + must_contain: str | None = None, + ignore_log_client_failures: bool = True, + ignore_autosave: bool = True, +) -> _AssertLogContext: """ - A context object that asserts that the given code produces the given number of ioc log messages in the the given - amount of time. + A context object that asserts that the given code produces the given number of ioc log messages + in the the given amount of time. To assert that no more than 5 messages are produced in 5 seconds:: with assert_log_messages(self._ioc, 5, 5): @@ -146,14 +181,20 @@ def assert_log_messages( Args: ioc (IocLauncher): The IOC that we are checking the logs for. - number_of_messages (int): The maximum number of messages that are expected (None to not check number of messages) + number_of_messages (int): The maximum number of messages that are expected (None to not + check number of messages) in_time (int): The number of seconds to wait for messages - must_contain (str): a string which must be contained in at least one of the messages (None to not check) - ignore_log_client_failures (bool): Whether to ignore messages about not being able to connect to logserver + must_contain (str): a string which must be contained in at least one of the messages (None + to not check) + ignore_log_client_failures (bool): Whether to ignore messages about not being able to + connect to logserver ignore_autosave (bool): Whether to ignore messages coming from autosave """ + manager = ioc.log_file_manager + if manager is None: + raise ValueError("IOC does not have a log file manager, cannot use assert_log_messages") return _AssertLogContext( - ioc.log_file_manager, + manager, number_of_messages, in_time, must_contain, @@ -162,22 +203,24 @@ def assert_log_messages( ) -def skip_if_condition(condition, reason): +def skip_if_condition( + condition: Callable[[], bool], reason: str +) -> Callable[[Callable[P, T]], Callable[P, T]]: """ Decorator to skip tests given a particular condition. - This is similar to unittest's @skipIf decorator, but this one determines it's condition at runtime as opposed to - class load time. This is necessary because otherwise the decorators don't properly pick up changes in - IOCRegister.uses_rec_sim + This is similar to unittest's @skipIf decorator, but this one determines it's condition at + runtime as opposed to class load time. This is necessary because otherwise the decorators don't + properly pick up changes in IOCRegister.uses_rec_sim Args: condition (func): The condition on which to skip the test. Should be callable. reason (str): The reason for skipping the test """ - def decorator(func): + def decorator(func: Callable[P, T]) -> Callable[P, T]: @functools.wraps(func) - def wrapper(*args, **kwargs): + def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: if condition(): raise unittest.SkipTest(reason) return func(*args, **kwargs) @@ -204,26 +247,27 @@ def wrapper(*args, **kwargs): skip_always = functools.partial(skip_if_condition, lambda: True) -def add_method(method): +def add_method(method: Callable[P, T]) -> Callable[[Type[T]], Type[T]]: """ - Class decorator which as the method to the decorated class. + Class decorator which adds the method to the decorated class. - This is inspired by https://stackoverflow.com/questions/9443725/add-method-to-a-class-dynamically-with-decorator - and https://gist.github.com/victorlei/5968685. + This is inspired by + https://stackoverflow.com/questions/9443725/add-method-to-a-class-dynamically-with-decorator + and + https://gist.github.com/victorlei/5968685. Args: method (func): The method to add to the class decorated. Should be callable. """ - @functools.wraps(method) - def wrapper(class_to_decorate): + def wrapper(class_to_decorate: Type[T]) -> Type[T]: setattr(class_to_decorate, method.__name__, method) return class_to_decorate return wrapper -def parameterized_list(cases): +def parameterized_list(cases: list[T]) -> list[tuple[str, T]]: """ Creates a list of cases for parameterized to use to run tests. @@ -243,37 +287,46 @@ def parameterized_list(cases): for case in cases: test_case = (str(case),) try: - return_list.append(test_case + case) + return_list.append(test_case + case) # type: ignore except TypeError: return_list.append(test_case + (case,)) return return_list -def unstable_test(max_retries=2, error_class=AssertionError, wait_between_runs=0): +def unstable_test( + max_retries: int = 2, + error_class: Type[BaseException] = AssertionError, + wait_between_runs: float = 0, +) -> Callable[ + [Callable[Concatenate[unittest.TestCase, P], T]], Callable[Concatenate[unittest.TestCase, P], T] +]: """ - Decorator which will retry a test on failure. This decorator should not be required on most tests and should not - be included as standard when writing a test. + Decorator which will retry a test on failure. This decorator should not be required on most + tests and should not be included as standard when writing a test. Args: - max_retries: the max number of times to run the test before actually throwing an error (defaults to 2 retries) + max_retries: the max number of times to run the test before actually throwing an error + (defaults to 2 retries) error_class: the class of error to retry under (defaults to AssertionError) wait_between_runs: number of seconds to wait between each failed attempt at running the test """ - def decorator(func): + def decorator( + func: Callable[Concatenate[unittest.TestCase, P], T], + ) -> Callable[Concatenate[unittest.TestCase, P], T]: @functools.wraps(func) - def wrapper(self, *args, **kwargs): + def wrapper(self: unittest.TestCase, *args: P.args, **kwargs: P.kwargs) -> T: try: return func(self, *args, **kwargs) # Initial attempt to run the test "normally" - except error_class: + except (error_class,): last_error = None for _ in range(max_retries): sleep(wait_between_runs) try: self.setUp() # Need to rerun setup return func(self, *args, **kwargs) - except error_class as e: + except (error_class,) as e: last_error = e finally: self.tearDown() # Rerun tearDown regardless of success or not