Skip to content

Commit

Permalink
Merge branch 'master' into Ticket8359_create_p4p_wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom-Willemsen authored Dec 9, 2024
2 parents ec0bc1e + 8fbc9f1 commit fbc81f3
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 81 deletions.
117 changes: 76 additions & 41 deletions common_tests/eurotherm.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import abc
import os
import time
import typing
import unittest
from typing import ContextManager

from parameterized import parameterized

from utils.calibration_utils import reset_calibration_file, use_calibration_file
from utils.channel_access import ChannelAccess
from utils.emulator_launcher import LewisLauncher
from utils.ioc_launcher import EPICS_TOP, IOCRegister
from utils.testing import get_running_lewis_and_ioc, parameterized_list, skip_if_recsim

Expand All @@ -19,69 +23,85 @@
# PIDs cannot be floating-point
PID_TEST_VALUES = [-50, 50, 3000]

SENSORS = ["01", "02", "03", "04", "05", "06"]

class EurothermBaseTests(metaclass=abc.ABCMeta):
PV_SENSORS = ["A01", "A02", "A03", "A04", "A05", "A06"]


# This class is only valid for classes which also derive from unittest.TestCase,
# and we can't derive from unittest.TestCase at runtime, because
# unittest would try to execute them as tests
class EurothermBaseTests(
unittest.TestCase if typing.TYPE_CHECKING else object, metaclass=abc.ABCMeta
):
"""
Tests for the Eurotherm temperature controller.
"""

@abc.abstractmethod
def get_device(self):
def get_device(self) -> str:
pass

@abc.abstractmethod
def get_emulator_device(self):
def get_emulator_device(self) -> str:
pass

@abc.abstractmethod
def _get_temperature_setter_wrapper(self):
def _get_temperature_setter_wrapper(self) -> ContextManager:
pass

@abc.abstractmethod
def get_scaling(self):
def get_scaling(self) -> str:
pass

def get_prefix(self):
def get_prefix(self) -> str:
return "{}:A01".format(self.get_device())

def setUp(self):
self._setup_lewis_and_channel_access()
self._reset_device_state()
self.ca_no_prefix = ChannelAccess()
self._lewis: LewisLauncher

def _setup_lewis_and_channel_access(self):
self._lewis, self._ioc = get_running_lewis_and_ioc("eurotherm", "EUROTHRM_01")
self._lewis, self._ioc = get_running_lewis_and_ioc("eurotherm", "EUROTHRM_01") # type:ignore
self.ca = ChannelAccess(device_prefix="EUROTHRM_01", default_wait_time=0)
self.ca.assert_that_pv_exists("A01:RBV", timeout=30)
self.ca.assert_that_pv_exists("A01:CAL:SEL", timeout=10)

def _reset_device_state(self, sensor="A01"):
self._lewis.backdoor_set_on_device("connected", True)
def _reset_device_state(self, sensor=PV_SENSORS[0]):
i = PV_SENSORS.index(sensor)
self._lewis.backdoor_run_function_on_device("set_connected", [SENSORS[i], True])
reset_calibration_file(self.ca, prefix=f"{sensor}:")

intial_temp = 0.0

self._set_setpoint_and_current_temperature(intial_temp)

self._lewis.backdoor_set_on_device("ramping_on", False)
self._lewis.backdoor_set_on_device("ramp_rate", 1.0)
self._lewis.backdoor_run_function_on_device("set_ramping_on", [SENSORS[0], True])
self._lewis.backdoor_run_function_on_device("set_ramp_rate", [SENSORS[0], 1.0])
self.ca.set_pv_value(f"{sensor}:RAMPON:SP", 0, sleep_after_set=0)

self._set_setpoint_and_current_temperature(intial_temp)
self.ca.assert_that_pv_is(f"{sensor}:TEMP", intial_temp)
# Ensure the temperature isn't being changed by a ramp any more
self.ca.assert_that_pv_value_is_unchanged(f"{sensor}:TEMP", 5)
self.ca.assert_that_pv_value_is_unchanged(f"{sensor}:TEMP", wait=3)

def _set_setpoint_and_current_temperature(self, temperature, sensor="A01"):
def _set_setpoint_and_current_temperature(self, temperature, sensor=PV_SENSORS[0]):
if IOCRegister.uses_rec_sim:
self.ca.set_pv_value(f"{sensor}:SIM:TEMP:SP", temperature)
self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP", temperature)
self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP:SP", temperature)
self.ca.assert_that_pv_is(f"{sensor}:SIM:TEMP:SP:RBV", temperature)
else:
self._lewis.backdoor_set_on_device("current_temperature", temperature)
i = PV_SENSORS.index(sensor)
self._lewis.backdoor_run_function_on_device(
"set_current_temperature", [SENSORS[i], temperature]
)
self.ca.assert_that_pv_is_number(f"{sensor}:TEMP", temperature, 0.1, timeout=30)
self._lewis.backdoor_set_on_device("ramp_setpoint_temperature", temperature)
self._lewis.backdoor_run_function_on_device(
"set_ramp_setpoint_temperature", [SENSORS[i], temperature]
)
self.ca.assert_that_pv_is_number(f"{sensor}:TEMP:SP:RBV", temperature, 0.1, timeout=30)

def test_WHEN_read_rbv_temperature_THEN_rbv_value_is_same_as_backdoor(self):
Expand Down Expand Up @@ -114,21 +134,25 @@ def test_WHEN_set_ramp_rate_in_K_per_min_THEN_current_temperature_reaches_set_po
"A01:TEMP:SP:RBV", setpoint_temperature, tolerance=0.1, timeout=60
)
end = time.time()
self.assertAlmostEquals(
self.assertAlmostEqual(
end - start,
60.0 * (setpoint_temperature - start_temperature) / ramp_rate,
delta=0.1 * (end - start),
) # Lower tolerance will be too tight given scan rate

def test_WHEN_sensor_disconnected_THEN_ramp_setting_is_disabled(self):
self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE)
self._lewis.backdoor_run_function_on_device(
"set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]
)

self.ca.assert_that_pv_is_number("A01:RAMPON:SP.DISP", 1)

def test_GIVEN_sensor_disconnected_WHEN_sensor_reconnected_THEN_ramp_setting_is_enabled(self):
self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE)
self._lewis.backdoor_run_function_on_device(
"set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]
)

self._lewis.backdoor_set_on_device("current_temperature", 0)
self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], 0])

self.ca.assert_that_pv_is_number("A01:RAMPON:SP.DISP", 0)

Expand All @@ -137,7 +161,9 @@ def test_GIVEN_ramp_was_off_WHEN_sensor_disconnected_THEN_ramp_is_off_and_cached
):
self.ca.set_pv_value("A01:RAMPON:SP", 0)

self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE)
self._lewis.backdoor_run_function_on_device(
"set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]
)

self.ca.assert_that_pv_is("A01:RAMPON", "OFF")
self.ca.assert_that_pv_is("A01:RAMPON:CACHE", "OFF")
Expand All @@ -147,17 +173,21 @@ def test_GIVEN_ramp_was_on_WHEN_sensor_disconnected_THEN_ramp_is_off_and_cached_
):
self.ca.set_pv_value("A01:RAMPON:SP", 1)

self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE)
self._lewis.backdoor_run_function_on_device(
"set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]
)

self.ca.assert_that_pv_is("A01:RAMPON", "OFF")
self.ca.assert_that_pv_is("A01:RAMPON:CACHE", "ON")

def test_GIVEN_ramp_was_on_WHEN_sensor_disconnected_and_reconnected_THEN_ramp_is_on(self):
self.ca.set_pv_value("A01:RAMPON:SP", 1)

self._lewis.backdoor_set_on_device("current_temperature", SENSOR_DISCONNECTED_VALUE)
self._lewis.backdoor_run_function_on_device(
"set_current_temperature", [SENSORS[0], SENSOR_DISCONNECTED_VALUE]
)
self.ca.assert_that_pv_is("A01:RAMPON", "OFF")
self._lewis.backdoor_set_on_device("current_temperature", 0)
self._lewis.backdoor_run_function_on_device("set_current_temperature", [SENSORS[0], 0])

self.ca.assert_that_pv_is("A01:RAMPON", "ON")

Expand Down Expand Up @@ -265,7 +295,7 @@ def test_WHEN_config_file_and_temperature_unit_changed_THEN_then_ramp_rate_unit_
]
)
def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN(
self, _, temperature, expected_value_of_under_range_calc_pv
self, _, temperature, exp_val
):
# Arrange

Expand All @@ -282,10 +312,10 @@ def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN(
# Assert

self.ca.assert_that_pv_is("A01:TEMP:RANGE:UNDER.A", temperature)
self.ca.assert_that_pv_is("A01:TEMP:RANGE:UNDER", expected_value_of_under_range_calc_pv)
self.ca.assert_that_pv_is("A01:TEMP:RANGE:UNDER", expected_value=exp_val)

@parameterized.expand(
[
def test_WHEN_disconnected_THEN_in_alarm(self):
records = [
"A01:TEMP",
"A01:TEMP:SP:RBV",
"A01:P",
Expand All @@ -295,43 +325,48 @@ def test_GIVEN_None_txt_calibration_file_WHEN_temperature_is_set_THEN(
"A01:MAX_OUTPUT",
"A01:LOWLIM",
]
)
def test_WHEN_disconnected_THEN_in_alarm(self, record):
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE)
with self._lewis.backdoor_simulate_disconnected_device():
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.INVALID, timeout=60)
for record in records:
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE)
with self._lewis.backdoor_simulate_disconnected_addr():
for record in records:
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.INVALID, timeout=60)
# Assert alarms clear on reconnection
with self._get_temperature_setter_wrapper():
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE, timeout=30)
for record in records:
self.ca.assert_that_pv_alarm_is(record, ChannelAccess.Alarms.NONE, timeout=30)

def test_WHEN_eurotherm_missing_THEN_updates_of_PVs_stop(self):
with self._lewis.backdoor_simulate_disconnected_addr():
self.ca.assert_that_pv_value_is_unchanged("A01:RBV", 20)

@parameterized.expand(parameterized_list(PID_TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_p_set_via_backdoor_THEN_p_updates(self, _, val):
self._lewis.backdoor_set_on_device("p", val)
self._lewis.backdoor_run_function_on_device("set_p", [SENSORS[0], val])
self.ca.assert_that_pv_is_number("A01:P", val, tolerance=0.05, timeout=15)

@parameterized.expand(parameterized_list(PID_TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_i_set_via_backdoor_THEN_i_updates(self, _, val):
self._lewis.backdoor_set_on_device("i", val)
self._lewis.backdoor_run_function_on_device("set_i", [SENSORS[0], val])
self.ca.assert_that_pv_is_number("A01:I", val, tolerance=0.05, timeout=15)

@parameterized.expand(parameterized_list(PID_TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_d_set_via_backdoor_THEN_d_updates(self, _, val):
self._lewis.backdoor_set_on_device("d", val)
self._lewis.backdoor_run_function_on_device("set_d", [SENSORS[0], val])
self.ca.assert_that_pv_is_number("A01:D", val, tolerance=0.05, timeout=15)

@parameterized.expand(parameterized_list(TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_output_set_via_backdoor_THEN_output_updates(self, _, val):
self._lewis.backdoor_set_on_device("output", val)
self._lewis.backdoor_run_function_on_device("set_output", [SENSORS[0], val])
self.ca.assert_that_pv_is_number("A01:OUTPUT", val, tolerance=0.05, timeout=15)

@parameterized.expand(parameterized_list(TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_output_set_via_backdoor_THEN_output_updates(self, _, val):
self._lewis.backdoor_set_on_device("max_output", val)
def test_WHEN_max_output_set_via_backdoor_THEN_output_updates(self, _, val):
self._lewis.backdoor_run_function_on_device("set_max_output", [SENSORS[0], val])
self.ca.assert_that_pv_is_number("A01:MAX_OUTPUT", val, tolerance=0.05, timeout=15)

@parameterized.expand(parameterized_list([0, 100, 3276]))
Expand All @@ -343,11 +378,11 @@ def test_WHEN_output_rate_set_THEN_output_rate_updates(self, _, val):
@parameterized.expand(parameterized_list(TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_high_limit_set_via_backdoor_THEN_high_lim_updates(self, _, val):
self._lewis.backdoor_set_on_device("high_lim", val)
self._lewis.backdoor_run_function_on_device("set_high_lim", [SENSORS[0], val])
self.ca.assert_that_pv_is_number("A01:HILIM", val, tolerance=0.05, timeout=15)

@parameterized.expand(parameterized_list(TEST_VALUES))
@skip_if_recsim("Backdoor not available in recsim")
def test_WHEN_low_limit_set_via_backdoor_THEN_low_lim_updates(self, _, val):
self._lewis.backdoor_set_on_device("low_lim", val)
self._lewis.backdoor_run_function_on_device("set_low_lim", [SENSORS[0], val])
self.ca.assert_that_pv_is_number("A01:LOWLIM", val, tolerance=0.05, timeout=15)
Loading

0 comments on commit fbc81f3

Please sign in to comment.