Commit

Tackled second round of reviews
jnunyez committed Sep 27, 2023
1 parent e811611 commit 7e3299e
Showing 1 changed file with 42 additions and 32 deletions: src/vse_sync_pp/analyzers/analyzer.py
@@ -13,6 +13,34 @@

from ..requirements import REQUIREMENTS

+def calculate_limit(accuracy, limit_percentage, tau):
+"""Calculate the upper limit for observation window `tau`
+`accuracy` maps (low, high) tau ranges to functions that compute upper limits
+`limit_percentage` is the inaccuracy percentage applied to the limit
+`tau` is the observation window interval
+Return the upper limit value for `tau`, or None if no range matches
+"""
+for (low, high), f in accuracy.items():
+if ((low is None or tau > low) and (tau <= high)):
+return f(tau) * (limit_percentage / 100)
+
+def out_of_range(taus, samples, accuracy, limit):
+"""Check whether any of the input samples are out of range
+`taus` is the list of observation window intervals
+`samples` is the list of input samples
+`accuracy` maps tau ranges to upper bound limit functions
+`limit` is the percentage applied to the upper limit
+Return `True` if any value in `samples` is out of range
+"""
+for tau, sample in zip(taus, samples):
+mask = calculate_limit(accuracy, limit, tau)
+if mask <= sample:
+return True
+return False
+
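For orientation (not part of the commit): a minimal usage sketch of the two helpers added above, assuming they are in scope and assuming a hypothetical `accuracy` map; in the analyzers the real map is the `self._accuracy` attribute initialized by the derived classes.

import numpy as np

# Hypothetical accuracy map: (low, high) tau ranges -> upper-bound functions.
# These numbers are made up for illustration only.
accuracy = {
    (None, 100): lambda tau: 100.0,        # flat bound for tau <= 100
    (100, 100000): lambda tau: 1.0 * tau,  # linear bound for 100 < tau <= 100000
}

taus = np.array([10, 500, 2000])
samples = np.array([20.0, 400.0, 2500.0])

print([calculate_limit(accuracy, 100, tau) for tau in taus])  # [100.0, 500.0, 2000.0]
print(out_of_range(taus, samples, accuracy, 100))             # True: 2500.0 >= 2000.0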
class Config():
"""Analyzer configuration"""
@@ -286,34 +314,16 @@ def __init__(self, config):
self._transient = config.parameter('transient-period/s')
# minimum test duration for a valid test
self._duration = config.parameter('min-test-duration/s')
-# limit initial tau observation windows from 1 to 9k taus
-taus_below_tenkey = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90,
+# limit initial tau observation windows from 1 to 10k taus
+taus_below_10k = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90,
100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 2000, 3000,
-4000, 5000, 6000, 7000, 8000, 10000])
+4000, 5000, 6000, 7000, 8000, 9000, 10000])
# observation window upper bound limited to 100k samples in 5k increments
-taus_above_tenkey = np.arange(10001, 100000, 5000)
+taus_above_10k = np.arange(10001, 100000, 5000)
# `taus_list` contains the range of observation window intervals for which to compute the statistic
-self._taus_list = np.concatenate((taus_below_tenkey, taus_above_tenkey))
+self._taus_list = np.concatenate((taus_below_10k, taus_above_10k))
self._rate = None
self._lpf_signal = None
# attributes to be initialized in derived class
self._accuracy = None

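As a side note (not part of the commit), the observation-window grid built above works out to 55 candidate taus; a small sketch copying the values from the diff:

import numpy as np

# Dense grid of taus up to 10k, then 5k steps up to 100k samples.
taus_below_10k = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90,
                           100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 2000, 3000,
                           4000, 5000, 6000, 7000, 8000, 9000, 10000])
taus_above_10k = np.arange(10001, 100000, 5000)  # 10001, 15001, ..., 95001
taus_list = np.concatenate((taus_below_10k, taus_above_10k))
print(len(taus_list))  # 37 + 18 = 55 candidate observation windows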
-def calculate_limit(self, tau):
-# return limit based on `tau`
-if self._accuracy is None:
-raise AttributeError('atributte not initialized')
-for (low, high), f in self._accuracy.items():
-if ((low is None or tau > low) and (tau <= high)):
-return f(tau) * (self._limit / 100)
-
-def out_of_range(self, taus, samples):
-# return true if any value in `tdevs` is out of range
-for tau, sample in zip(taus, samples):
-mask = self.calculate_limit(tau)
-if mask <= sample:
-return True
-return False

def prepare(self, rows):
idx = 0
Expand Down Expand Up @@ -353,7 +363,7 @@ def calculate_rate(self, data):
cumdelta = cumdelta + data.iloc[i].timestamp - data.iloc[i - 1].timestamp
return round((1 / (cumdelta / 100)))
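The visible tail of calculate_rate returns the inverse of the mean timestamp delta. The loop bounds are not shown in this hunk, so the 100-sample window is an assumption read off the division by 100; a hedged illustration of the arithmetic:

cumdelta = 10.0                      # e.g. 100 timestamp deltas summing to 10 seconds
rate = round(1 / (cumdelta / 100))   # mean delta = 0.1 s -> rate = 10 samples/s
print(rate)                          # 10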

-def test_common(self, data):
+def _test_common(self, data):
if len(data) == 0:
return (False, "no data")
if frozenset(data.state.unique()).difference(self.locked):
@@ -368,7 +378,7 @@ def test_common(self, data):
self._lpf_signal = self.calculate_filter(data)
return None

-def explain_common(self, data):
+def _explain_common(self, data):
if len(data) == 0:
return {}
if self._rate is None:
@@ -398,17 +408,17 @@ def __init__(self, config):
self._ns = None

def test(self, data):
-result = super().test_common(data)
+result = super()._test_common(data)
if result is None:
if self._samples is None:
self._taus, self._samples, self._errors, self._ns = allantools.tdev(self._lpf_signal, rate=self._rate, data_type="phase", taus=self._taus_list) # noqa
-if self.out_of_range(self._taus, self._samples):
+if out_of_range(self._taus, self._samples, self._accuracy, self._limit):
return (False, "unacceptable time deviation")
return (True, None)
return result

def explain(self, data):
-analysis = super().explain_common(data)
+analysis = super()._explain_common(data)
if analysis is None:
if self._samples is None:
self._taus, self._samples, self._errors, self._ns = allantools.tdev(self._lpf_signal, rate=self._rate, data_type="phase", taus=self._taus_list) # noqa
@@ -433,24 +443,24 @@ def __init__(self, config):
# limit of inaccuracy at observation point
self._limit = config.parameter('maximum-time-interval-error-limit/%')
# `taus` contains the list of observation windows intervals for which the metric will be actually computed
-# `taus` is a subset of `taus_list`
+# note that `taus` will be a subset of `taus_list`
self._taus = None
self._samples = None
self._errors = None
self._ns = None

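A brief aside on the comment above: the returned `taus` are a subset of the requested `taus_list`. A hedged sketch of that behavior, assuming allantools drops observation windows it cannot evaluate for the given data length (the sample data and rate below are made up):

import allantools
import numpy as np

rng = np.random.default_rng(0)
phase = rng.normal(scale=1e-9, size=2000)         # made-up phase samples
requested = np.array([1, 10, 100, 1000, 100000])  # last value exceeds the data span
taus, devs, errors, ns = allantools.mtie(phase, rate=1.0, data_type="phase",
                                         taus=requested)
print(taus)  # only the taus allantools could actually compute, e.g. without 100000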
def test(self, data):
-result = super().test_common(data)
+result = super()._test_common(data)
if result is None:
if self._samples is None:
self._taus, self._samples, self._errors, self._ns = allantools.mtie(self._lpf_signal, rate=self._rate, data_type="phase", taus=self._taus_list) # noqa
-if self.out_of_range(self._taus, self._samples):
+if out_of_range(self._taus, self._samples, self._accuracy, self._limit):
return (False, "unacceptable mtie")
return (True, None)
return result

def explain(self, data):
-analysis = super().explain_common(data)
+analysis = super()._explain_common(data)
if analysis is None:
if self._samples is None:
self._taus, self._samples, self._errors, self._ns = allantools.mtie(self._lpf_signal, rate=self._rate, data_type="phase", taus=self._taus_list) # noqa
