-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsigan_iface.py
135 lines (114 loc) · 4.06 KB
/
sigan_iface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import logging
import time
from abc import ABC, abstractmethod
from typing import Optional
from its_preselector.web_relay import WebRelay
from scos_actions.hardware.utils import power_cycle_sigan
logger = logging.getLogger(__name__)


class SignalAnalyzerInterface(ABC):
    """
    Abstract base class for signal analyzer interfaces.

    Concrete subclasses must implement ``is_available``, ``plugin_version``,
    ``plugin_name``, ``acquire_time_domain_samples`` and ``connect``. The
    health check, power cycling and status reporting implemented here are
    built on top of those abstract members.
    """

    def __init__(
        self,
        switches: Optional[dict[str, "WebRelay"]] = None,
    ):
        """
        Initialize the common signal analyzer state.

        :param switches: Optional mapping of names to ``WebRelay`` objects.
            It is stored as-is and later handed to ``power_cycle_sigan``
            by ``power_cycle_and_connect``.
        """
        # Placeholders; subclasses are expected to overwrite these
        # once the real values are known.
        self._model = "Unknown"
        self._api_version = "Unknown"
        self._firmware_version = "Unknown"
        self.switches = switches

    @property
    @abstractmethod
    def is_available(self) -> bool:
        """Returns True if sigan is initialized and ready for measurements."""
        pass

    @property
    @abstractmethod
    def plugin_version(self) -> str:
        """Returns the version of the SCOS plugin defining this interface."""
        pass

    @property
    @abstractmethod
    def plugin_name(self) -> str:
        """Returns the name of the SCOS plugin defining this interface."""
        pass

    @property
    def firmware_version(self) -> str:
        """Returns the version of the signal analyzer firmware."""
        return self._firmware_version

    @property
    def api_version(self) -> str:
        """Returns the version of the underlying signal analyzer API."""
        return self._api_version

    @abstractmethod
    def acquire_time_domain_samples(
        self,
        num_samples: int,
        num_samples_skip: int = 0,
    ) -> dict:
        """
        Acquire time domain IQ samples, scaled to Volts at
        the signal analyzer input.

        :param num_samples: Number of samples to acquire
        :param num_samples_skip: Number of samples to skip
        :return: dictionary containing data, sample_rate, frequency, capture_time, etc
        """
        pass

    @abstractmethod
    def connect(self) -> None:
        """
        Establish a connection to the signal analyzer.
        """
        pass

    def healthy(self, num_samples: int = 56000, retries: int = 3) -> bool:
        """
        Perform health check by collecting IQ samples.

        The check fails immediately if ``is_available`` is False. Otherwise
        samples are requested via ``acquire_time_domain_samples`` and the
        length of the returned ``"data"`` entry is compared to the request.

        :param num_samples: Number of samples to request, defaults to 56000
        :param retries: Maximum number of acquisition attempts, defaults
            to 3. Values below 1 are treated as a single attempt.
        :return: True if an acquisition succeeded and returned exactly
            ``num_samples`` samples, False otherwise.
        """
        logger.debug("Performing health check.")
        if not self.is_available:
            return False

        # Clamp so that zero or negative values still make exactly one
        # attempt instead of retrying without bound.
        attempts_left = max(retries, 1)
        while True:
            try:
                measurement_result = self.acquire_time_domain_samples(num_samples)
                data = measurement_result["data"]
                break
            # Only catch Exception: KeyboardInterrupt and SystemExit must
            # propagate rather than being swallowed and retried.
            except Exception:
                attempts_left -= 1
                if attempts_left <= 0:
                    logger.exception(
                        "Unable to acquire samples from device during health check."
                    )
                    return False
                logger.debug(
                    "Unable to acquire samples during health check. Retrying..."
                )

        if len(data) != num_samples:
            logger.error("Data length doesn't match request.")
            return False
        return True

    def power_cycle_and_connect(self, sleep_time: float = 2.0) -> None:
        """
        Attempt to cycle signal analyzer power then reconnect.

        Failures are logged rather than raised: if power cycling fails, no
        reconnection is attempted; if reconnecting fails, the exception is
        logged with its traceback.

        :param sleep_time: Time (s) to wait for power to cycle, defaults to 2.0
        """
        logger.info("Attempting to power cycle the signal analyzer and reconnect.")
        try:
            power_cycle_sigan(self.switches)
        except Exception as hce:
            # Without a successful power cycle there is nothing to reconnect to.
            logger.warning(f"Unable to power cycle sigan: {hce}")
            return
        try:
            # Wait for power cycle to complete
            logger.debug(f"Waiting {sleep_time} seconds before reconnecting...")
            time.sleep(sleep_time)
            logger.info("Power cycled signal analyzer. Reconnecting...")
            self.connect()
        except Exception:
            logger.exception(
                "Unable to reconnect to signal analyzer after power cycling"
            )
        return

    def get_status(self) -> dict:
        """
        Return a summary of the signal analyzer state.

        Note that this runs a full ``healthy()`` check with default arguments.

        :return: dictionary with the keys ``"model"`` and ``"healthy"``
        """
        return {"model": self._model, "healthy": self.healthy()}

    @property
    def model(self) -> str:
        """Returns the model of the signal analyzer."""
        return self._model

    @model.setter
    def model(self, value: str):
        """Set the model of the signal analyzer."""
        self._model = value