Skip to content

Commit

Permalink
[MRG+1] - Refactor seasonality module (#571)
Browse files Browse the repository at this point in the history
* Run `black` formatting

* Refactor `CHTest()` class to add new `_calc_ch_crit_val()` method

* Fix formatting issues

* Fix final formatting issue

* Enhance error handling
  • Loading branch information
chrimaho committed Feb 23, 2024
1 parent cdf5724 commit c33a017
Showing 1 changed file with 87 additions and 61 deletions.
148 changes: 87 additions & 61 deletions pmdarima/arima/seasonality.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@

from ._arima import C_canova_hansen_sd_test

__all__ = [
'CHTest',
'decompose',
'OCSBTest'
]
__all__ = ["CHTest", "decompose", "OCSBTest"]


def decompose(x, type_, m, filter_=None):
Expand Down Expand Up @@ -85,7 +81,7 @@ def decompose(x, type_, m, filter_=None):

multiplicative = "multiplicative"
additive = "additive"
is_m_odd = (m % 2 == 1)
is_m_odd = m % 2 == 1

# Helper function to stay consistent and concise based on 'type_'
def _decomposer_helper(a, b):
Expand All @@ -98,8 +94,8 @@ def _decomposer_helper(a, b):
# to ask the user for the frequency.
try:
assert isinstance(m, int) and m > 1
except (ValueError, AssertionError):
raise ValueError("'f' should be a positive integer")
except (ValueError, AssertionError) as err:
raise ValueError("'f' should be a positive integer") from err

if filter_ is None:
filter_ = np.ones((m,)) / m
Expand All @@ -116,7 +112,7 @@ def _decomposer_helper(a, b):

# Take half of m for the convolution / sma process.
half_m = m // 2
trend = np.convolve(x, filter_, mode='valid')
trend = np.convolve(x, filter_, mode="valid")

if not is_m_odd:
trend = trend[:-1] # we remove the final index if m is even.
Expand Down Expand Up @@ -153,7 +149,7 @@ def _decomposer_helper(a, b):
random = _decomposer_helper(_decomposer_helper(x, trend), seasonal)

# Create a namedtuple so the output mirrors the output of the R function.
decomposed = namedtuple('decomposed', 'x trend seasonal random')
decomposed = namedtuple("decomposed", "x trend seasonal random")
decomposed_tuple = decomposed(x, trend, seasonal, random)

return decomposed_tuple
Expand All @@ -164,10 +160,11 @@ class _SeasonalStationarityTest(_BaseStationarityTest, metaclass=ABCMeta):
Canova-Hansen test and the Osborn-Chui-Smith-Birchenhall tests. These tests
are used to determine the seasonal differencing term for a time-series.
"""

def __init__(self, m):
self.m = m
if m < 2:
raise ValueError('m must be > 1')
raise ValueError("m must be > 1")

@abstractmethod
def estimate_seasonal_differencing_term(self, x):
Expand Down Expand Up @@ -212,10 +209,20 @@ class CHTest(_SeasonalStationarityTest):
.. [2] R source code for CH test:
https://github.com/robjhyndman/forecast/blob/master/R/arima.R#L148
"""
crit_vals = c(0.4617146, 0.7479655, 1.0007818,
1.2375350, 1.4625240, 1.6920200,
1.9043096, 2.1169602, 2.3268562,
2.5406922, 2.7391007)

crit_vals = c(
0.4617146,
0.7479655,
1.0007818,
1.2375350,
1.4625240,
1.6920200,
1.9043096,
2.1169602,
2.3268562,
2.5406922,
2.7391007,
)

def __init__(self, m):
super(CHTest, self).__init__(m=m)
Expand All @@ -234,8 +241,7 @@ def _sd_test(wts, s):

# fit model, get residuals
lmch = make_pipeline(
StandardScaler(with_mean=False),
LinearRegression()
StandardScaler(with_mean=False), LinearRegression()
).fit(R1, wts)
# lmch = sm.OLS(wts, R1).fit(method='qr')
residuals = wts - lmch.predict(R1)
Expand Down Expand Up @@ -284,8 +290,9 @@ def _sd_test(wts, s):
# https://github.com/robjhyndman/forecast/blob/master/R/arima.R#L321
Fhat = Fhataux.cumsum(axis=0)
solved = solve(AtOmfhatA, np.identity(AtOmfhatA.shape[0]))
return (1.0 / n ** 2) * solved.dot(A.T).dot(
Fhat.T).dot(Fhat).dot(A).diagonal().sum()
return (1.0 / n**2) * solved.dot(A.T).dot(Fhat.T).dot(Fhat).dot(
A
).diagonal().sum()

@staticmethod
def _seas_dummy(x, m):
Expand All @@ -310,8 +317,10 @@ def _seas_dummy(x, m):
n = x.shape[0]

# assume m > 1 since this function called internally...
assert m > 1, 'This function is called internally and ' \
'should not encounter this issue'
assert m > 1, (
"This function is called internally and "
"should not encounter this issue"
)

tt = np.arange(n) + 1
fmat = np.ones((n, 2 * m)) * np.nan
Expand All @@ -327,7 +336,19 @@ def _seas_dummy(x, m):
# fmat[,2*(i-1)+1] <- cos(2*pi*i*tt/m)
fmat[:, 2 * (i - 1)] = np.cos(2 * pi * i * tt / m)

return fmat[:, :m - 1]
return fmat[:, : m - 1]

@staticmethod
def _calc_ch_crit_val(m):
if m <= 12:
return CHTest.crit_vals[m - 2]
if m == 24:
return 5.098624
if m == 52:
return 10.341416
if m == 365:
return 65.44445
return 0.269 * (m**0.928)

def estimate_seasonal_differencing_term(self, x):
"""Estimate the seasonal differencing term.
Expand Down Expand Up @@ -365,17 +386,8 @@ def estimate_seasonal_differencing_term(self, x):
return 0

chstat = self._sd_test(x, m)

if m <= 12:
return int(chstat > self.crit_vals[m - 2]) # R does m - 1...
if m == 24:
return int(chstat > 5.098624)
if m == 52:
return int(chstat > 10.341416)
if m == 365:
return int(chstat > 65.44445)

return int(chstat > 0.269 * (m ** 0.928))
crit_val = self._calc_ch_crit_val(m)
return int(chstat > crit_val)


class OCSBTest(_SeasonalStationarityTest):
Expand Down Expand Up @@ -414,13 +426,13 @@ class OCSBTest(_SeasonalStationarityTest):
.. [2] R's forecast::OCSB test source code: https://bit.ly/2QYQHno
"""

_ic_method_map = {
"aic": lambda fit: fit.aic,
"bic": lambda fit: fit.bic,

# TODO: confirm False for add_constant, since the model fit contains
# . a constant term
"aicc": lambda fit: _aicc(fit, fit.nobs, False)
"aicc": lambda fit: _aicc(fit, fit.nobs, False),
}

def __init__(self, m, lag_method="aic", max_lag=3):
Expand All @@ -436,9 +448,13 @@ def _calc_ocsb_crit_val(m):
# https://github.com/robjhyndman/forecast/blob/
# 8c6b63b1274b064c84d7514838b26dd0acb98aee/R/unitRoot.R#L409
log_m = np.log(m)
return -0.2937411 * \
np.exp(-0.2850853 * (log_m - 0.7656451) + (-0.05983644) *
((log_m - 0.7656451) ** 2)) - 1.652202
return (
-0.2937411 * np.exp(
-0.2850853 * (log_m - 0.7656451) + (-0.05983644) * (
(log_m - 0.7656451) ** 2
)
) - 1.652202
)

@staticmethod
def _do_lag(y, lag, omit_na=True):
Expand All @@ -451,7 +467,7 @@ def _do_lag(y, lag, omit_na=True):
# If there are tons of lags, this may not be super efficient...
out = np.ones((n + (lag - 1), lag)) * np.nan
for i in range(lag):
out[i:i + n, i] = y
out[i: i + n, i] = y

if omit_na:
out = out[~np.isnan(out).any(axis=1)]
Expand Down Expand Up @@ -489,12 +505,14 @@ def _fit_ocsb(x, m, lag, max_lag):

# A constant term is added in the R code's lm formula. We do that in
# the linear model's constructor
mf = ylag[:y.shape[0]]
ar_fit = sm.OLS(y, add_constant(mf)).fit(method='qr')
mf = ylag[: y.shape[0]]
ar_fit = sm.OLS(y, add_constant(mf)).fit(method="qr")

# Create Z4
z4_y = y_first_order_diff[lag:] # new endog
z4_lag = OCSBTest._gen_lags(y_first_order_diff, lag)[:z4_y.shape[0], :]
z4_lag = OCSBTest._gen_lags(y_first_order_diff, lag)[
: z4_y.shape[0], :
]
z4_preds = ar_fit.predict(add_constant(z4_lag)) # preds
z4 = z4_y - z4_preds # test residuals

Expand All @@ -503,18 +521,20 @@ def _fit_ocsb(x, m, lag, max_lag):
z5_y = diff(x)
z5_lag = OCSBTest._gen_lags(z5_y, lag)
z5_y = z5_y[lag:]
z5_lag = z5_lag[:z5_y.shape[0], :]
z5_lag = z5_lag[: z5_y.shape[0], :]
z5_preds = ar_fit.predict(add_constant(z5_lag))
z5 = z5_y - z5_preds

# Finally, fit a linear regression on mf with z4 & z5 features added
data = np.hstack((
mf,
z4[:mf.shape[0]].reshape(-1, 1),
z5[:mf.shape[0]].reshape(-1, 1)
))
data = np.hstack(
(
mf,
z4[: mf.shape[0]].reshape(-1, 1),
z5[: mf.shape[0]].reshape(-1, 1),
)
)

return sm.OLS(y, data).fit(method='qr')
return sm.OLS(y, data).fit(method="qr")

def _compute_test_statistic(self, x):
m = self.m
Expand All @@ -523,12 +543,14 @@ def _compute_test_statistic(self, x):

# We might try multiple lags in this case
crit_regression = None
if maxlag > 0 and method != 'fixed':
if maxlag > 0 and method != "fixed":
try:
icfunc = self._ic_method_map[method]
except KeyError:
raise ValueError("'%s' is an invalid method. Must be one "
"of ('aic', 'aicc', 'bic', 'fixed')")
except KeyError as err:
raise ValueError(
"'%s' is an invalid method. Must be one "
"of ('aic', 'aicc', 'bic', 'fixed')"
) from err

fits = []
icvals = []
Expand All @@ -543,10 +565,12 @@ def _compute_test_statistic(self, x):

# If they're all NaN, raise
if np.isnan(icvals).all():
raise ValueError("All lag values up to 'maxlag' produced "
"singular matrices. Consider using a longer "
"series, a different lag term or a different "
"test.")
raise ValueError(
"All lag values up to 'maxlag' produced "
"singular matrices. Consider using a longer "
"series, a different lag term or a different "
"test."
)

# Compute the information criterion vals
best_index = int(np.nanargmin(icvals))
Expand All @@ -558,14 +582,16 @@ def _compute_test_statistic(self, x):
# Compute the actual linear model used for determining the test stat
try:
regression = self._fit_ocsb(x, m, maxlag, maxlag)
except np.linalg.LinAlgError: # Singular matrix
except np.linalg.LinAlgError as err: # Singular matrix
if crit_regression is not None:
regression = crit_regression
# Otherwise we have no solution to fall back on
else:
raise ValueError("Could not find a solution. Try a longer "
"series, different lag term, or a different "
"test.")
raise ValueError(
"Could not find a solution. Try a longer "
"series, different lag term, or a different "
"test."
) from err

# Get the coefficients for the z4 and z5 matrices
tvals = regression.tvalues[-2:] # len 2
Expand Down

0 comments on commit c33a017

Please sign in to comment.