diff --git a/pmdarima/arima/seasonality.py b/pmdarima/arima/seasonality.py index c747f350..7c720d99 100644 --- a/pmdarima/arima/seasonality.py +++ b/pmdarima/arima/seasonality.py @@ -26,11 +26,7 @@ from ._arima import C_canova_hansen_sd_test -__all__ = [ - 'CHTest', - 'decompose', - 'OCSBTest' -] +__all__ = ["CHTest", "decompose", "OCSBTest"] def decompose(x, type_, m, filter_=None): @@ -85,7 +81,7 @@ def decompose(x, type_, m, filter_=None): multiplicative = "multiplicative" additive = "additive" - is_m_odd = (m % 2 == 1) + is_m_odd = m % 2 == 1 # Helper function to stay consistent and concise based on 'type_' def _decomposer_helper(a, b): @@ -98,8 +94,8 @@ def _decomposer_helper(a, b): # to ask the user for the frequency. try: assert isinstance(m, int) and m > 1 - except (ValueError, AssertionError): - raise ValueError("'f' should be a positive integer") + except (ValueError, AssertionError) as err: + raise ValueError("'f' should be a positive integer") from err if filter_ is None: filter_ = np.ones((m,)) / m @@ -116,7 +112,7 @@ def _decomposer_helper(a, b): # Take half of m for the convolution / sma process. half_m = m // 2 - trend = np.convolve(x, filter_, mode='valid') + trend = np.convolve(x, filter_, mode="valid") if not is_m_odd: trend = trend[:-1] # we remove the final index if m is even. @@ -153,7 +149,7 @@ def _decomposer_helper(a, b): random = _decomposer_helper(_decomposer_helper(x, trend), seasonal) # Create a namedtuple so the output mirrors the output of the R function. - decomposed = namedtuple('decomposed', 'x trend seasonal random') + decomposed = namedtuple("decomposed", "x trend seasonal random") decomposed_tuple = decomposed(x, trend, seasonal, random) return decomposed_tuple @@ -164,10 +160,11 @@ class _SeasonalStationarityTest(_BaseStationarityTest, metaclass=ABCMeta): Canova-Hansen test and the Osborn-Chui-Smith-Birchenhall tests. These tests are used to determine the seasonal differencing term for a time-series. """ + def __init__(self, m): self.m = m if m < 2: - raise ValueError('m must be > 1') + raise ValueError("m must be > 1") @abstractmethod def estimate_seasonal_differencing_term(self, x): @@ -212,10 +209,20 @@ class CHTest(_SeasonalStationarityTest): .. [2] R source code for CH test: https://github.com/robjhyndman/forecast/blob/master/R/arima.R#L148 """ - crit_vals = c(0.4617146, 0.7479655, 1.0007818, - 1.2375350, 1.4625240, 1.6920200, - 1.9043096, 2.1169602, 2.3268562, - 2.5406922, 2.7391007) + + crit_vals = c( + 0.4617146, + 0.7479655, + 1.0007818, + 1.2375350, + 1.4625240, + 1.6920200, + 1.9043096, + 2.1169602, + 2.3268562, + 2.5406922, + 2.7391007, + ) def __init__(self, m): super(CHTest, self).__init__(m=m) @@ -234,8 +241,7 @@ def _sd_test(wts, s): # fit model, get residuals lmch = make_pipeline( - StandardScaler(with_mean=False), - LinearRegression() + StandardScaler(with_mean=False), LinearRegression() ).fit(R1, wts) # lmch = sm.OLS(wts, R1).fit(method='qr') residuals = wts - lmch.predict(R1) @@ -284,8 +290,9 @@ def _sd_test(wts, s): # https://github.com/robjhyndman/forecast/blob/master/R/arima.R#L321 Fhat = Fhataux.cumsum(axis=0) solved = solve(AtOmfhatA, np.identity(AtOmfhatA.shape[0])) - return (1.0 / n ** 2) * solved.dot(A.T).dot( - Fhat.T).dot(Fhat).dot(A).diagonal().sum() + return (1.0 / n**2) * solved.dot(A.T).dot(Fhat.T).dot(Fhat).dot( + A + ).diagonal().sum() @staticmethod def _seas_dummy(x, m): @@ -310,8 +317,10 @@ def _seas_dummy(x, m): n = x.shape[0] # assume m > 1 since this function called internally... - assert m > 1, 'This function is called internally and ' \ - 'should not encounter this issue' + assert m > 1, ( + "This function is called internally and " + "should not encounter this issue" + ) tt = np.arange(n) + 1 fmat = np.ones((n, 2 * m)) * np.nan @@ -327,7 +336,19 @@ def _seas_dummy(x, m): # fmat[,2*(i-1)+1] <- cos(2*pi*i*tt/m) fmat[:, 2 * (i - 1)] = np.cos(2 * pi * i * tt / m) - return fmat[:, :m - 1] + return fmat[:, : m - 1] + + @staticmethod + def _calc_ch_crit_val(m): + if m <= 12: + return CHTest.crit_vals[m - 2] + if m == 24: + return 5.098624 + if m == 52: + return 10.341416 + if m == 365: + return 65.44445 + return 0.269 * (m**0.928) def estimate_seasonal_differencing_term(self, x): """Estimate the seasonal differencing term. @@ -365,17 +386,8 @@ def estimate_seasonal_differencing_term(self, x): return 0 chstat = self._sd_test(x, m) - - if m <= 12: - return int(chstat > self.crit_vals[m - 2]) # R does m - 1... - if m == 24: - return int(chstat > 5.098624) - if m == 52: - return int(chstat > 10.341416) - if m == 365: - return int(chstat > 65.44445) - - return int(chstat > 0.269 * (m ** 0.928)) + crit_val = self._calc_ch_crit_val(m) + return int(chstat > crit_val) class OCSBTest(_SeasonalStationarityTest): @@ -414,13 +426,13 @@ class OCSBTest(_SeasonalStationarityTest): .. [2] R's forecast::OCSB test source code: https://bit.ly/2QYQHno """ + _ic_method_map = { "aic": lambda fit: fit.aic, "bic": lambda fit: fit.bic, - # TODO: confirm False for add_constant, since the model fit contains # . a constant term - "aicc": lambda fit: _aicc(fit, fit.nobs, False) + "aicc": lambda fit: _aicc(fit, fit.nobs, False), } def __init__(self, m, lag_method="aic", max_lag=3): @@ -436,9 +448,13 @@ def _calc_ocsb_crit_val(m): # https://github.com/robjhyndman/forecast/blob/ # 8c6b63b1274b064c84d7514838b26dd0acb98aee/R/unitRoot.R#L409 log_m = np.log(m) - return -0.2937411 * \ - np.exp(-0.2850853 * (log_m - 0.7656451) + (-0.05983644) * - ((log_m - 0.7656451) ** 2)) - 1.652202 + return ( + -0.2937411 * np.exp( + -0.2850853 * (log_m - 0.7656451) + (-0.05983644) * ( + (log_m - 0.7656451) ** 2 + ) + ) - 1.652202 + ) @staticmethod def _do_lag(y, lag, omit_na=True): @@ -451,7 +467,7 @@ def _do_lag(y, lag, omit_na=True): # If there are tons of lags, this may not be super efficient... out = np.ones((n + (lag - 1), lag)) * np.nan for i in range(lag): - out[i:i + n, i] = y + out[i: i + n, i] = y if omit_na: out = out[~np.isnan(out).any(axis=1)] @@ -489,12 +505,14 @@ def _fit_ocsb(x, m, lag, max_lag): # A constant term is added in the R code's lm formula. We do that in # the linear model's constructor - mf = ylag[:y.shape[0]] - ar_fit = sm.OLS(y, add_constant(mf)).fit(method='qr') + mf = ylag[: y.shape[0]] + ar_fit = sm.OLS(y, add_constant(mf)).fit(method="qr") # Create Z4 z4_y = y_first_order_diff[lag:] # new endog - z4_lag = OCSBTest._gen_lags(y_first_order_diff, lag)[:z4_y.shape[0], :] + z4_lag = OCSBTest._gen_lags(y_first_order_diff, lag)[ + : z4_y.shape[0], : + ] z4_preds = ar_fit.predict(add_constant(z4_lag)) # preds z4 = z4_y - z4_preds # test residuals @@ -503,18 +521,20 @@ def _fit_ocsb(x, m, lag, max_lag): z5_y = diff(x) z5_lag = OCSBTest._gen_lags(z5_y, lag) z5_y = z5_y[lag:] - z5_lag = z5_lag[:z5_y.shape[0], :] + z5_lag = z5_lag[: z5_y.shape[0], :] z5_preds = ar_fit.predict(add_constant(z5_lag)) z5 = z5_y - z5_preds # Finally, fit a linear regression on mf with z4 & z5 features added - data = np.hstack(( - mf, - z4[:mf.shape[0]].reshape(-1, 1), - z5[:mf.shape[0]].reshape(-1, 1) - )) + data = np.hstack( + ( + mf, + z4[: mf.shape[0]].reshape(-1, 1), + z5[: mf.shape[0]].reshape(-1, 1), + ) + ) - return sm.OLS(y, data).fit(method='qr') + return sm.OLS(y, data).fit(method="qr") def _compute_test_statistic(self, x): m = self.m @@ -523,12 +543,14 @@ def _compute_test_statistic(self, x): # We might try multiple lags in this case crit_regression = None - if maxlag > 0 and method != 'fixed': + if maxlag > 0 and method != "fixed": try: icfunc = self._ic_method_map[method] - except KeyError: - raise ValueError("'%s' is an invalid method. Must be one " - "of ('aic', 'aicc', 'bic', 'fixed')") + except KeyError as err: + raise ValueError( + "'%s' is an invalid method. Must be one " + "of ('aic', 'aicc', 'bic', 'fixed')" + ) from err fits = [] icvals = [] @@ -543,10 +565,12 @@ def _compute_test_statistic(self, x): # If they're all NaN, raise if np.isnan(icvals).all(): - raise ValueError("All lag values up to 'maxlag' produced " - "singular matrices. Consider using a longer " - "series, a different lag term or a different " - "test.") + raise ValueError( + "All lag values up to 'maxlag' produced " + "singular matrices. Consider using a longer " + "series, a different lag term or a different " + "test." + ) # Compute the information criterion vals best_index = int(np.nanargmin(icvals)) @@ -558,14 +582,16 @@ def _compute_test_statistic(self, x): # Compute the actual linear model used for determining the test stat try: regression = self._fit_ocsb(x, m, maxlag, maxlag) - except np.linalg.LinAlgError: # Singular matrix + except np.linalg.LinAlgError as err: # Singular matrix if crit_regression is not None: regression = crit_regression # Otherwise we have no solution to fall back on else: - raise ValueError("Could not find a solution. Try a longer " - "series, different lag term, or a different " - "test.") + raise ValueError( + "Could not find a solution. Try a longer " + "series, different lag term, or a different " + "test." + ) from err # Get the coefficients for the z4 and z5 matrices tvals = regression.tvalues[-2:] # len 2