1from statsmodels.compat.pandas import Substitution, is_int_index 2 3import datetime as dt 4from typing import Any, Dict, Optional, Union 5 6import numpy as np 7import pandas as pd 8 9from statsmodels.base.data import PandasData 10from statsmodels.iolib.summary import SimpleTable, Summary 11from statsmodels.tools.docstring import Docstring, Parameter, indent 12from statsmodels.tsa.base.prediction import PredictionResults 13from statsmodels.tsa.base.tsa_model import get_index_loc, get_prediction_index 14from statsmodels.tsa.seasonal import STL, DecomposeResult 15from statsmodels.tsa.statespace.kalman_filter import _check_dynamic 16 17DateLike = Union[int, str, dt.datetime, pd.Timestamp, np.datetime64] 18 19ds = Docstring(STL.__doc__) 20ds.insert_parameters( 21 "endog", 22 Parameter( 23 "model", 24 "Model", 25 [ 26 "The model used to forecast endog after the seasonality has been " 27 "removed using STL" 28 ], 29 ), 30) 31ds.insert_parameters( 32 "model", 33 Parameter( 34 "model_kwargs", 35 "Dict[str, Any]", 36 [ 37 "Any additional arguments needed to initialized the model using " 38 "the residuals produced by subtracting the seasonality." 39 ], 40 ), 41) 42_stl_forecast_params = ds.extract_parameters( 43 [ 44 "endog", 45 "model", 46 "model_kwargs", 47 "period", 48 "seasonal", 49 "trend", 50 "low_pass", 51 "seasonal_deg", 52 "trend_deg", 53 "low_pass_deg", 54 "robust", 55 "seasonal_jump", 56 "trend_jump", 57 "low_pass_jump", 58 ] 59) 60 61ds = Docstring(STL.fit.__doc__) 62_fit_params = ds.extract_parameters(["inner_iter", "outer_iter"]) 63 64 65@Substitution(stl_forecast_params=indent(_stl_forecast_params, " ")) 66class STLForecast: 67 r""" 68 Model-based forecasting using STL to remove seasonality 69 70 Forecasts are produced by first subtracting the seasonality 71 estimated using STL, then forecasting the deseasonalized 72 data using a time-series model, for example, ARIMA. 73 74 Parameters 75 ---------- 76 %(stl_forecast_params)s 77 78 See Also 79 -------- 80 statsmodels.tsa.arima.model.ARIMA 81 ARIMA modeling. 82 statsmodels.tsa.ar_model.AutoReg 83 Autoregressive modeling supporting complex deterministics. 84 statsmodels.tsa.exponential_smoothing.ets.ETSModel 85 Additive and multiplicative exponential smoothing with trend. 86 statsmodels.tsa.statespace.exponential_smoothing.ExponentialSmoothing 87 Additive exponential smoothing with trend. 88 89 Notes 90 ----- 91 If :math:`\hat{S}_t` is the seasonal component, then the deseasonalize 92 series is constructed as 93 94 .. math:: 95 96 Y_t - \hat{S}_t 97 98 The trend component is not removed, and so the time series model should 99 be capable of adequately fitting and forecasting the trend if present. The 100 out-of-sample forecasts of the seasonal component are produced as 101 102 .. math:: 103 104 \hat{S}_{T + h} = \hat{S}_{T - k} 105 106 where :math:`k = m - h + m \lfloor (h-1)/m \rfloor` tracks the period 107 offset in the full cycle of 1, 2, ..., m where m is the period length. 108 109 This class is mostly a convenience wrapper around ``STL`` and a 110 user-specified model. The model is assumed to follow the standard 111 statsmodels pattern: 112 113 * ``fit`` is used to estimate parameters and returns a results instance, 114 ``results``. 115 * ``results`` must exposes a method ``forecast(steps, **kwargs)`` that 116 produces out-of-sample forecasts. 117 * ``results`` may also exposes a method ``get_prediction`` that produces 118 both in- and out-of-sample predictions. 119 120 Examples 121 -------- 122 >>> import numpy as np 123 >>> import pandas as pd 124 >>> from statsmodels.tsa.api import STLForecast 125 >>> from statsmodels.tsa.arima.model import ARIMA 126 >>> from statsmodels.datasets import macrodata 127 >>> ds = macrodata.load_pandas() 128 >>> data = np.log(ds.data.m1) 129 >>> base_date = f"{int(ds.data.year[0])}-{3*int(ds.data.quarter[0])+1}-1" 130 >>> data.index = pd.date_range(base_date, periods=data.shape[0], freq="QS") 131 132 Generate forecasts from an ARIMA 133 134 >>> stlf = STLForecast(data, ARIMA, model_kwargs={"order": (2, 1, 0)}) 135 >>> res = stlf.fit() 136 >>> forecasts = res.forecast(12) 137 138 Generate forecasts from an Exponential Smoothing model with trend 139 >>> from statsmodels.tsa.statespace import exponential_smoothing 140 >>> ES = exponential_smoothing.ExponentialSmoothing 141 >>> config = {"trend": True} 142 >>> stlf = STLForecast(data, ES, model_kwargs=config) 143 >>> res = stlf.fit() 144 >>> forecasts = res.forecast(12) 145 """ 146 147 def __init__( 148 self, 149 endog, 150 model, 151 *, 152 model_kwargs=None, 153 period=None, 154 seasonal=7, 155 trend=None, 156 low_pass=None, 157 seasonal_deg=1, 158 trend_deg=1, 159 low_pass_deg=1, 160 robust=False, 161 seasonal_jump=1, 162 trend_jump=1, 163 low_pass_jump=1, 164 ): 165 self._endog = endog 166 self._stl_kwargs = dict( 167 period=period, 168 seasonal=seasonal, 169 trend=trend, 170 low_pass=low_pass, 171 seasonal_deg=seasonal_deg, 172 trend_deg=trend_deg, 173 low_pass_deg=low_pass_deg, 174 robust=robust, 175 seasonal_jump=seasonal_jump, 176 trend_jump=trend_jump, 177 low_pass_jump=low_pass_jump, 178 ) 179 self._model = model 180 self._model_kwargs = {} if model_kwargs is None else model_kwargs 181 if not hasattr(model, "fit"): 182 raise AttributeError("model must expose a ``fit`` method.") 183 184 @Substitution(fit_params=indent(_fit_params, " " * 8)) 185 def fit(self, *, inner_iter=None, outer_iter=None, fit_kwargs=None): 186 """ 187 Estimate STL and forecasting model parameters. 188 189 Parameters 190 ----------\n%(fit_params)s 191 fit_kwargs : Dict[str, Any] 192 Any additional keyword arguments to pass to ``model``'s ``fit`` 193 method when estimating the model on the decomposed residuals. 194 195 Returns 196 ------- 197 STLForecastResults 198 Results with forecasting methods. 199 """ 200 fit_kwargs = {} if fit_kwargs is None else fit_kwargs 201 stl = STL(self._endog, **self._stl_kwargs) 202 stl_fit: DecomposeResult = stl.fit( 203 inner_iter=inner_iter, outer_iter=outer_iter 204 ) 205 model_endog = stl_fit.trend + stl_fit.resid 206 mod = self._model(model_endog, **self._model_kwargs) 207 res = mod.fit(**fit_kwargs) 208 if not hasattr(res, "forecast"): 209 raise AttributeError( 210 "The model's result must expose a ``forecast`` method." 211 ) 212 return STLForecastResults(stl, stl_fit, mod, res, self._endog) 213 214 215class STLForecastResults: 216 """ 217 Results for forecasting using STL to remove seasonality 218 219 Parameters 220 ---------- 221 stl : STL 222 The STL instance used to decompose the data. 223 result : DecomposeResult 224 The result of applying STL to the data. 225 model : Model 226 The time series model used to model the non-seasonal dynamics. 227 model_result : Results 228 Model results instance supporting, at a minimum, ``forecast``. 229 """ 230 231 def __init__( 232 self, stl: STL, result: DecomposeResult, model, model_result, endog 233 ) -> None: 234 self._stl = stl 235 self._result = result 236 self._model = model 237 self._model_result = model_result 238 self._endog = np.asarray(endog) 239 self._nobs = self._endog.shape[0] 240 self._index = getattr(endog, "index", pd.RangeIndex(self._nobs)) 241 if not ( 242 isinstance(self._index, (pd.DatetimeIndex, pd.PeriodIndex)) 243 or is_int_index(self._index) 244 ): 245 try: 246 self._index = pd.to_datetime(self._index) 247 except ValueError: 248 self._index = pd.RangeIndex(self._nobs) 249 250 @property 251 def period(self) -> int: 252 """The period of the seasonal component""" 253 return self._stl.period 254 255 @property 256 def stl(self) -> STL: 257 """The STL instance used to decompose the time series""" 258 return self._stl 259 260 @property 261 def result(self) -> DecomposeResult: 262 """The result of applying STL to the data""" 263 return self._result 264 265 @property 266 def model(self) -> Any: 267 """The model fit to the additively deseasonalized data""" 268 return self._model 269 270 @property 271 def model_result(self) -> Any: 272 """The result class from the estimated model""" 273 return self._model_result 274 275 def summary(self) -> Summary: 276 """ 277 Summary of both the STL decomposition and the model fit. 278 279 Returns 280 ------- 281 Summary 282 The summary of the model fit and the STL decomposition. 283 284 Notes 285 ----- 286 Requires that the model's result class supports ``summary`` and 287 returns a ``Summary`` object. 288 """ 289 if not hasattr(self._model_result, "summary"): 290 raise AttributeError( 291 "The model result does not have a summary attribute." 292 ) 293 summary: Summary = self._model_result.summary() 294 if not isinstance(summary, Summary): 295 raise TypeError( 296 "The model result's summary is not a Summary object." 297 ) 298 summary.tables[0].title = ( 299 "STL Decomposition and " + summary.tables[0].title 300 ) 301 config = self._stl.config 302 left_keys = ("period", "seasonal", "robust") 303 left_data = [] 304 left_stubs = [] 305 right_data = [] 306 right_stubs = [] 307 for key in config: 308 new = key.capitalize() 309 new = new.replace("_", " ") 310 if new in ("Trend", "Low Pass"): 311 new += " Length" 312 is_left = any(key.startswith(val) for val in left_keys) 313 new += ":" 314 stub = f"{new:<23s}" 315 val = f"{str(config[key]):>13s}" 316 if is_left: 317 left_stubs.append(stub) 318 left_data.append([val]) 319 else: 320 right_stubs.append(" " * 6 + stub) 321 right_data.append([val]) 322 tab = SimpleTable( 323 left_data, stubs=tuple(left_stubs), title="STL Configuration" 324 ) 325 tab.extend_right(SimpleTable(right_data, stubs=right_stubs)) 326 summary.tables.append(tab) 327 return summary 328 329 def _get_seasonal_prediction( 330 self, 331 start: Optional[DateLike], 332 end: Optional[DateLike], 333 dynamic: Union[bool, DateLike], 334 ) -> np.ndarray: 335 """ 336 Get STLs seasonal in- and out-of-sample predictions 337 338 Parameters 339 ---------- 340 start : int, str, or datetime, optional 341 Zero-indexed observation number at which to start forecasting, 342 i.e., the first forecast is start. Can also be a date string to 343 parse or a datetime type. Default is the the zeroth observation. 344 end : int, str, or datetime, optional 345 Zero-indexed observation number at which to end forecasting, i.e., 346 the last forecast is end. Can also be a date string to 347 parse or a datetime type. However, if the dates index does not 348 have a fixed frequency, end must be an integer index if you 349 want out of sample prediction. Default is the last observation in 350 the sample. 351 dynamic : bool, int, str, or datetime, optional 352 Integer offset relative to `start` at which to begin dynamic 353 prediction. Can also be an absolute date string to parse or a 354 datetime type (these are not interpreted as offsets). 355 Prior to this observation, true endogenous values will be used for 356 prediction; starting with this observation and continuing through 357 the end of prediction, forecasted endogenous values will be used 358 instead. 359 360 Returns 361 ------- 362 ndarray 363 Array containing the seasibak predictions. 364 """ 365 data = PandasData(pd.Series(self._endog), index=self._index) 366 if start is None: 367 start = 0 368 (start, end, out_of_sample, prediction_index) = get_prediction_index( 369 start, end, self._nobs, self._index, data=data 370 ) 371 372 if isinstance(dynamic, (str, dt.datetime, pd.Timestamp)): 373 dynamic, _, _ = get_index_loc(dynamic, self._index) 374 dynamic = dynamic - start 375 elif dynamic is True: 376 dynamic = 0 377 elif dynamic is False: 378 # If `dynamic=False`, then no dynamic predictions 379 dynamic = None 380 nobs = self._nobs 381 dynamic, _ = _check_dynamic(dynamic, start, end, nobs) 382 in_sample_end = end + 1 if dynamic is None else dynamic 383 seasonal = np.asarray(self._result.seasonal) 384 predictions = seasonal[start:in_sample_end] 385 oos = np.empty((0,)) 386 if dynamic is not None: 387 num = out_of_sample + end + 1 - dynamic 388 oos = self._seasonal_forecast(num, None, offset=dynamic) 389 elif out_of_sample: 390 oos = self._seasonal_forecast(out_of_sample, None) 391 oos_start = max(start - nobs, 0) 392 oos = oos[oos_start:] 393 predictions = np.r_[predictions, oos] 394 return predictions 395 396 def _seasonal_forecast( 397 self, steps: int, index: Optional[pd.Index], offset=None 398 ) -> Union[pd.Series, np.ndarray]: 399 """ 400 Get the seasonal component of the forecast 401 402 Parameters 403 ---------- 404 steps : int 405 The number of steps required. 406 index : pd.Index 407 A pandas index to use. If None, returns an ndarray. 408 offset : int 409 The index of the first out-of-sample observation. If None, uses 410 nobs. 411 412 Returns 413 ------- 414 seasonal : {ndarray, Series} 415 The seasonal component. 416 """ 417 418 period = self.period 419 seasonal = np.asarray(self._result.seasonal) 420 offset = self._nobs if offset is None else offset 421 seasonal = seasonal[offset - period : offset] 422 seasonal = np.tile(seasonal, steps // period + ((steps % period) != 0)) 423 seasonal = seasonal[:steps] 424 if index is not None: 425 seasonal = pd.Series(seasonal, index=index) 426 return seasonal 427 428 def forecast( 429 self, steps: int = 1, **kwargs: Dict[str, Any] 430 ) -> Union[np.ndarray, pd.Series]: 431 """ 432 Out-of-sample forecasts 433 434 Parameters 435 ---------- 436 steps : int, str, or datetime, optional 437 If an integer, the number of steps to forecast from the end of the 438 sample. Can also be a date string to parse or a datetime type. 439 However, if the dates index does not have a fixed frequency, steps 440 must be an integer. Default 441 **kwargs 442 Additional arguments may required for forecasting beyond the end 443 of the sample. These arguments are passed into the time series 444 model results' ``forecast`` method. 445 446 Returns 447 ------- 448 forecast : {ndarray, Series} 449 Out of sample forecasts 450 """ 451 forecast = self._model_result.forecast(steps=steps, **kwargs) 452 index = forecast.index if isinstance(forecast, pd.Series) else None 453 return forecast + self._seasonal_forecast(steps, index) 454 455 def get_prediction( 456 self, 457 start: Optional[DateLike] = None, 458 end: Optional[DateLike] = None, 459 dynamic: Union[bool, DateLike] = False, 460 **kwargs: Dict[str, Any], 461 ): 462 """ 463 In-sample prediction and out-of-sample forecasting 464 465 Parameters 466 ---------- 467 start : int, str, or datetime, optional 468 Zero-indexed observation number at which to start forecasting, 469 i.e., the first forecast is start. Can also be a date string to 470 parse or a datetime type. Default is the the zeroth observation. 471 end : int, str, or datetime, optional 472 Zero-indexed observation number at which to end forecasting, i.e., 473 the last forecast is end. Can also be a date string to 474 parse or a datetime type. However, if the dates index does not 475 have a fixed frequency, end must be an integer index if you 476 want out of sample prediction. Default is the last observation in 477 the sample. 478 dynamic : bool, int, str, or datetime, optional 479 Integer offset relative to `start` at which to begin dynamic 480 prediction. Can also be an absolute date string to parse or a 481 datetime type (these are not interpreted as offsets). 482 Prior to this observation, true endogenous values will be used for 483 prediction; starting with this observation and continuing through 484 the end of prediction, forecasted endogenous values will be used 485 instead. 486 **kwargs 487 Additional arguments may required for forecasting beyond the end 488 of the sample. These arguments are passed into the time series 489 model results' ``get_prediction`` method. 490 491 Returns 492 ------- 493 PredictionResults 494 PredictionResults instance containing in-sample predictions, 495 out-of-sample forecasts, and prediction intervals. 496 """ 497 pred = self._model_result.get_prediction( 498 start=start, end=end, dynamic=dynamic, **kwargs 499 ) 500 seasonal_prediction = self._get_seasonal_prediction( 501 start, end, dynamic 502 ) 503 mean = pred.predicted_mean + seasonal_prediction 504 try: 505 var_pred_mean = pred.var_pred_mean 506 except (AttributeError, NotImplementedError): 507 # Allow models that do not return var_pred_mean 508 import warnings 509 510 warnings.warn( 511 f"The variance of the predicted mean is not available using " 512 f"the {self.model.__class__.__name__} model class.", 513 UserWarning, 514 ) 515 var_pred_mean = np.nan + mean.copy() 516 return PredictionResults( 517 mean, var_pred_mean, dist="norm", row_labels=pred.row_labels 518 ) 519